diff --git a/socketio/__init__.py b/socketio/__init__.py
index b97db6f..b77e23d 100644
--- a/socketio/__init__.py
+++ b/socketio/__init__.py
@@ -5,6 +5,7 @@ from .base_manager import BaseManager
 from .pubsub_manager import PubSubManager
 from .kombu_manager import KombuManager
 from .redis_manager import RedisManager
+from .kafka_manager import KafkaManager
 from .zmq_manager import ZmqManager
 from .server import Server
 from .namespace import Namespace, ClientNamespace
@@ -29,8 +30,8 @@ else:  # pragma: no cover
 __version__ = '4.3.1dev'
 
 __all__ = ['__version__', 'Client', 'Server', 'BaseManager', 'PubSubManager',
-           'KombuManager', 'RedisManager', 'ZmqManager', 'Namespace',
-           'ClientNamespace', 'WSGIApp', 'Middleware']
+           'KombuManager', 'RedisManager', 'ZmqManager', 'KafkaManager',
+           'Namespace', 'ClientNamespace', 'WSGIApp', 'Middleware']
 if AsyncServer is not None:  # pragma: no cover
     __all__ += ['AsyncClient', 'AsyncServer', 'AsyncNamespace',
                 'AsyncClientNamespace', 'AsyncManager', 'AsyncRedisManager',
diff --git a/socketio/kafka_manager.py b/socketio/kafka_manager.py
new file mode 100644
index 0000000..00a2e7f
--- /dev/null
+++ b/socketio/kafka_manager.py
@@ -0,0 +1,63 @@
+import logging
+import pickle
+
+try:
+    import kafka
+except ImportError:
+    kafka = None
+
+from .pubsub_manager import PubSubManager
+
+logger = logging.getLogger('socketio')
+
+
+class KafkaManager(PubSubManager):  # pragma: no cover
+    """Kafka based client manager.
+
+    This class implements a Kafka backend for event sharing across multiple
+    processes.
+
+    To use a Kafka backend, initialize the :class:`Server` instance as
+    follows::
+
+        url = 'kafka://hostname:port'
+        server = socketio.Server(client_manager=socketio.KafkaManager(url))
+
+    :param url: The connection URL for the Kafka server. For a default Kafka
+                store running on the same host, use ``kafka://``.
+    :param channel: The channel name (topic) on which the server sends and
+                    receives notifications. Must be the same in all the
+                    servers.
+    :param write_only: If set to ``True``, only initialize to emit events. The
+                       default of ``False`` initializes the class for emitting
+                       and receiving.
+    """
+    name = 'kafka'
+
+    def __init__(self, url='kafka://localhost:9092', channel='socketio',
+                 write_only=False):
+        if kafka is None:
+            raise RuntimeError('kafka-python package is not installed '
+                               '(Run "pip install kafka-python" in your '
+                               'virtualenv).')
+
+        super(KafkaManager, self).__init__(channel=channel,
+                                           write_only=write_only)
+
+        self.kafka_url = url[8:] if url != 'kafka://' else 'localhost:9092'
+        self.producer = kafka.KafkaProducer(bootstrap_servers=self.kafka_url)
+        self.consumer = kafka.KafkaConsumer(self.channel,
+                                            bootstrap_servers=self.kafka_url)
+
+    def _publish(self, data):
+        self.producer.send(self.channel, value=pickle.dumps(data))
+        self.producer.flush()
+
+    def _kafka_listen(self):
+        for message in self.consumer:
+            yield message
+
+    def _listen(self):
+        for message in self._kafka_listen():
+            if message.topic == self.channel:
+                yield pickle.loads(message.value)  # NOTE: trusted peers only