|
|
@ -1,6 +1,6 @@ |
|
|
|
import logging |
|
|
|
import pickle |
|
|
|
from typing import List |
|
|
|
|
|
|
|
|
|
|
|
try: |
|
|
|
import kafka |
|
|
@ -9,7 +9,7 @@ except ImportError: |
|
|
|
|
|
|
|
from .pubsub_manager import PubSubManager |
|
|
|
|
|
|
|
logger = logging.getLogger('socketio') |
|
|
|
logger = logging.getLogger("socketio") |
|
|
|
|
|
|
|
|
|
|
|
class KafkaManager(PubSubManager): # pragma: no cover |
|
|
@ -26,6 +26,8 @@ class KafkaManager(PubSubManager): # pragma: no cover |
|
|
|
|
|
|
|
:param url: The connection URL for the Kafka server. For a default Kafka |
|
|
|
store running on the same host, use ``kafka://``. |
|
|
|
From now on you can pass a list of URLs to use |
|
|
|
different Kafka servers. |
|
|
|
:param channel: The channel name (topic) on which the server sends and |
|
|
|
receives notifications. Must be the same in all the |
|
|
|
servers. |
|
|
@ -33,23 +35,29 @@ class KafkaManager(PubSubManager): # pragma: no cover |
|
|
|
default of ``False`` initializes the class for emitting |
|
|
|
and receiving. |
|
|
|
""" |
|
|
|
name = 'kafka' |
|
|
|
|
|
|
|
def __init__(self, url='kafka://localhost:9092',channel='socketio', |
|
|
|
write_only=False): |
|
|
|
name = "kafka" |
|
|
|
|
|
|
|
def __init__( |
|
|
|
self, url="kafka://localhost:9092", channel="socketio", write_only=False |
|
|
|
): |
|
|
|
if kafka is None: |
|
|
|
raise RuntimeError('kafka-python package is not installed ' |
|
|
|
'(Run "pip install kafka-python" in your ' |
|
|
|
'virtualenv).') |
|
|
|
raise RuntimeError( |
|
|
|
"kafka-python package is not installed " |
|
|
|
'(Run "pip install kafka-python" in your ' |
|
|
|
"virtualenv)." |
|
|
|
) |
|
|
|
|
|
|
|
super(KafkaManager, self).__init__(channel=channel, write_only=write_only) |
|
|
|
|
|
|
|
super(KafkaManager, self).__init__(channel=channel, |
|
|
|
write_only=write_only) |
|
|
|
|
|
|
|
urls = [url] if isinstance(url, str) else url |
|
|
|
self.kafka_urls = [url[8:] if url != 'kafka://' else 'localhost:9092' for url in urls] |
|
|
|
self.kafka_urls = [ |
|
|
|
url[8:] if url != "kafka://" else "localhost:9092" for url in urls |
|
|
|
] |
|
|
|
self.producer = kafka.KafkaProducer(bootstrap_servers=self.kafka_urls) |
|
|
|
self.consumer = kafka.KafkaConsumer(self.channel, |
|
|
|
bootstrap_servers=self.kafka_urls) |
|
|
|
self.consumer = kafka.KafkaConsumer( |
|
|
|
self.channel, bootstrap_servers=self.kafka_urls |
|
|
|
) |
|
|
|
|
|
|
|
def _publish(self, data): |
|
|
|
self.producer.send(self.channel, value=pickle.dumps(data)) |
|
|
|