@ -43,18 +43,24 @@ class KombuManager(PubSubManager): # pragma: no cover
raise RuntimeError ( ' Kombu package is not installed '
' (Run " pip install kombu " in your '
' virtualenv). ' )
self . kombu = kombu . Connection ( url )
self . exchange = kombu . Exchange ( channel , type = ' fanout ' , durable = False )
self . queue = kombu . Queue ( str ( uuid . uuid4 ( ) ) , self . exchange )
super ( KombuManager , self ) . __init__ ( channel = channel ,
write_only = write_only )
super ( KombuManager , self ) . __init__ ( channel = channel )
self . url = url
self . writer_conn = kombu . Connection ( self . url )
self . writer_queue = self . _queue ( self . writer_conn )
def _queue ( self , conn = None ) :
exchange = kombu . Exchange ( self . channel , type = ' fanout ' , durable = False )
queue = kombu . Queue ( str ( uuid . uuid4 ( ) ) , exchange )
return queue
def _publish ( self , data ) :
with self . kombu . SimpleQueue ( self . queue ) as queue :
with self . writer_conn . SimpleQueue ( self . writer_ queue) as queue :
queue . put ( pickle . dumps ( data ) )
def _listen ( self ) :
with self . kombu . SimpleQueue ( self . queue ) as queue :
reader_conn = kombu . Connection ( self . url )
reader_queue = self . _queue ( reader_conn )
with reader_conn . SimpleQueue ( reader_queue ) as queue :
while True :
message = queue . get ( block = True )
message . ack ( )