
Write KafkaManager

pull/181/head
Vincent Mézino, 7 years ago, commit 610c229fd9
1 changed file: socketio/kafka_manager.py (101 lines)

@@ -1,11 +1,12 @@
 import logging
 import pickle
 import time
+import json

 try:
-    import redis
+    import kafka
 except ImportError:
-    redis = None
+    kafka = None

 from .pubsub_manager import PubSubManager
@@ -13,43 +14,41 @@ logger = logging.getLogger('socketio')

 class KafkaManager(PubSubManager):  # pragma: no cover
-    """Redis based client manager.
+    """Kafka based client manager.

-    This class implements a Redis backend for event sharing across multiple
-    processes. Only kept here as one more example of how to build a custom
-    backend, since the kombu backend is perfectly adequate to support a Redis
-    message queue.
+    This class implements a Kafka backend for event sharing across multiple
+    processes.

-    To use a Redis backend, initialize the :class:`Server` instance as
+    To use a Kafka backend, initialize the :class:`Server` instance as
     follows::

-        url = 'redis://hostname:port/0'
-        server = socketio.Server(client_manager=socketio.RedisManager(url))
+        url = 'kafka://hostname:port/0'
+        server = socketio.Server(client_manager=socketio.KafkaManager(url))

-    :param url: The connection URL for the Redis server. For a default Redis
-                store running on the same host, use ``redis://``.
-    :param channel: The channel name on which the server sends and receives
+    :param url: The connection URL for the Kafka server. For a default Kafka
+                store running on the same host, use ``kafka://``.
+    :param channel: The channel name (topic) on which the server sends and receives
                     notifications. Must be the same in all the servers.
     :param write_only: If set to ``True``, only initialize to emit events. The
                        default of ``False`` initializes the class for emitting
                        and receiving.
     """
-    name = 'redis'
+    name = 'kafka'

-    def __init__(self, url='redis://localhost:6379/0', channel='socketio',
+    def __init__(self, url='kafka://localhost:9092/0', channel='socketio',
                  write_only=False):
-        if redis is None:
-            raise RuntimeError('Redis package is not installed '
-                               '(Run "pip install redis" in your '
+        if kafka is None:
+            raise RuntimeError('kafka-python package is not installed '
+                               '(Run "pip install kafka-python" in your '
                                'virtualenv).')
-        self.redis_url = url
-        self._redis_connect()
-        super(RedisManager, self).__init__(channel=channel,
+        self.kafka_url = url
+        self.producer = kafka.KafkaProducer(bootstrap_servers='localhost:9092')
+        super(KafkaManager, self).__init__(channel=channel,
                                            write_only=write_only)

     def initialize(self):
-        super(RedisManager, self).initialize()
+        super(KafkaManager, self).initialize()

         monkey_patched = True
         if self.server.async_mode == 'eventlet':
             from eventlet.patcher import is_monkey_patched
@@ -59,52 +58,22 @@ class KafkaManager(PubSubManager):  # pragma: no cover
             monkey_patched = is_module_patched('socket')
         if not monkey_patched:
             raise RuntimeError(
-                'Redis requires a monkey patched socket library to work '
+                'Kafka requires a monkey patched socket library to work '
                 'with ' + self.server.async_mode)

-    def _redis_connect(self):
-        self.redis = redis.Redis.from_url(self.redis_url)
-        self.pubsub = self.redis.pubsub()
-
     def _publish(self, data):
-        retry = True
-        while True:
-            try:
-                if not retry:
-                    self._redis_connect()
-                return self.redis.publish(self.channel, pickle.dumps(data))
-            except redis.exceptions.ConnectionError:
-                if retry:
-                    logger.error('Cannot publish to redis... retrying')
-                    retry = False
-                else:
-                    logger.error('Cannot publish to redis... giving up')
-                    break
+        self.producer.send(self.channel, value=pickle.dumps(data))
+        self.producer.flush()

-    def _redis_listen_with_retries(self):
-        retry_sleep = 1
-        connect = False
-        while True:
-            try:
-                if connect:
-                    self._redis_connect()
-                    self.pubsub.subscribe(self.channel)
-                for message in self.pubsub.listen():
-                    yield message
-            except redis.exceptions.ConnectionError:
-                logger.error('Cannot receive from redis... '
-                             'retrying in {} secs'.format(retry_sleep))
-                connect = True
-                time.sleep(retry_sleep)
-                retry_sleep *= 2
-                if retry_sleep > 60:
-                    retry_sleep = 60
+    def _kafka_listen(self):
+        self.consumer = kafka.KafkaConsumer(self.channel,
+                                            bootstrap_servers='localhost:9092')
+        for message in self.consumer:
+            yield message

     def _listen(self):
-        channel = self.channel.encode('utf-8')
-        self.pubsub.subscribe(self.channel)
-        for message in self._redis_listen_with_retries():
-            if message['channel'] == channel and \
-                    message['type'] == 'message' and 'data' in message:
-                yield message['data']
-        self.pubsub.unsubscribe(self.channel)
+        for message in self._kafka_listen():
+            if message.topic == self.channel:
+                yield pickle.loads(message.value)

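One thing worth noting in the diff: both the KafkaProducer and the KafkaConsumer are created with a hard-coded bootstrap_servers='localhost:9092', so the host and port in the url argument are effectively ignored. A possible follow-up, sketched here only as an assumption (the _bootstrap_servers helper is hypothetical, not part of the commit), would derive the broker address from the URL:

```python
from urllib.parse import urlparse

def _bootstrap_servers(url):
    # 'kafka://hostname:port/0' -> 'hostname:port'; fall back to the
    # kafka-python default when the URL carries no network location.
    parsed = urlparse(url)
    return parsed.netloc or 'localhost:9092'
```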
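The wire format itself is just a pickled payload on a single topic, which the publish and listen paths above mirror. Below is a self-contained sketch of that round trip with kafka-python, assuming a broker on localhost:9092 and the default 'socketio' channel. The dict payload is a stand-in (the real payload format comes from PubSubManager), and the extra consumer kwargs are added so the script terminates; the commit itself uses the consumer defaults.

```python
import pickle
import kafka

# Producer side: what _publish() does for each event.
producer = kafka.KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('socketio', value=pickle.dumps({'event': 'ping'}))
producer.flush()

# Consumer side: what _kafka_listen() and _listen() do together. Each
# server process in the cluster runs its own consumer on the shared topic.
# auto_offset_reset='earliest' lets this script see the message sent above;
# consumer_timeout_ms stops the iterator instead of blocking forever.
consumer = kafka.KafkaConsumer('socketio',
                               bootstrap_servers='localhost:9092',
                               auto_offset_reset='earliest',
                               consumer_timeout_ms=5000)
for message in consumer:
    print(pickle.loads(message.value))
    break
```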