diff --git a/socketio/base_manager.py b/socketio/base_manager.py index 19e22f7..e0f35e0 100644 --- a/socketio/base_manager.py +++ b/socketio/base_manager.py @@ -104,12 +104,16 @@ class BaseManager(object): def trigger_callback(self, sid, namespace, id, data): """Invoke an application callback.""" + callback = None try: callback = self.callbacks[sid][namespace][id] except KeyError: - raise ValueError('Unknown callback') - del self.callbacks[sid][namespace][id] - callback(*data) + # if we get an unknown callback we just ignore it + self.server.logger.warning('Unknown callback received, ignoring.') + else: + del self.callbacks[sid][namespace][id] + if callback is not None: + callback(*data) def _generate_ack_id(self, sid, namespace, callback): """Generate a unique identifier for an ACK packet.""" diff --git a/socketio/kombu_manager.py b/socketio/kombu_manager.py index 837f054..c5e7298 100644 --- a/socketio/kombu_manager.py +++ b/socketio/kombu_manager.py @@ -57,8 +57,12 @@ class KombuManager(PubSubManager): if isinstance(message.payload, six.binary_type): try: data = pickle.loads(message.payload) - except pickle.PickleError: + except: pass if data is None: - data = json.loads(message.payload) - yield data + try: + data = json.loads(message.payload) + except: + pass + if data: + yield data diff --git a/socketio/pubsub_manager.py b/socketio/pubsub_manager.py index 01a03ec..bb21e0e 100644 --- a/socketio/pubsub_manager.py +++ b/socketio/pubsub_manager.py @@ -1,3 +1,6 @@ +from functools import partial +import uuid + from .base_manager import BaseManager @@ -18,6 +21,7 @@ class PubSubManager(BaseManager): def __init__(self, channel='socketio'): super(PubSubManager, self).__init__() self.channel = channel + self.host_id = uuid.uuid4().hex def initialize(self, server): super(PubSubManager, self).initialize(server) @@ -34,8 +38,14 @@ class PubSubManager(BaseManager): The parameters are the same as in :meth:`.Server.emit`. """ + namespace = namespace or '/' + if callback is not None: + id = self._generate_ack_id(room, namespace, callback) + callback = (room, namespace, id) + else: + callback = None self._publish({'method': 'emit', 'event': event, 'data': data, - 'namespace': namespace or '/', 'room': room, + 'namespace': namespace, 'room': room, 'skip_sid': skip_sid, 'callback': callback}) def close_room(self, room, namespace=None): @@ -61,17 +71,50 @@ class PubSubManager(BaseManager): raise NotImplementedError('This method must be implemented in a ' 'subclass.') + def _handle_emit(self, message): + # Events with callbacks are very tricky to handle across hosts + # Here in the receiving end we set up a local callback that preserves + # the callback host and id from the sender + remote_callback = message.get('callback') + if remote_callback is not None and len(remote_callback) == 3: + callback = partial(self._return_callback, self.host_id, + *remote_callback) + else: + callback = None + super(PubSubManager, self).emit(message['event'], message['data'], + namespace=message.get('namespace'), + room=message.get('room'), + skip_sid=message.get('skip_sid'), + callback=callback) + + def _handle_callback(self, message): + if self.host_id == message.get('host_id'): + try: + sid = message['sid'] + namespace = message['namespace'] + id = message['id'] + args = message['args'] + except KeyError: + return + self.trigger_callback(sid, namespace, id, args) + + def _return_callback(self, host_id, sid, namespace, callback_id, *args): + # When an event callback is received, the callback is returned back + # the sender, which is identified by the host_id + self._publish({'method': 'callback', 'host_id': host_id, + 'sid': sid, 'namespace': namespace, 'id': callback_id, + 'args': args}) + + def _handle_close_room(self, message): + super(PubSubManager, self).close_room( + room=message.get('room'), namespace=message.get('namespace')) + def _thread(self): for message in self._listen(): if 'method' in message: if message['method'] == 'emit': - super(PubSubManager, self).emit( - message['event'], message['data'], - namespace=message.get('namespace'), - room=message.get('room'), - skip_sid=message.get('skip_sid'), - callback=message.get('callback')) + self._handle_emit(message) + elif message['method'] == 'callback': + self._handle_callback(message) elif message['method'] == 'close_room': - super(PubSubManager, self).close_room( - room=message.get('room'), - namespace=message.get('namespace')) + self._handle_close_room(message) diff --git a/socketio/redis_manager.py b/socketio/redis_manager.py index 735bbcf..9004fae 100644 --- a/socketio/redis_manager.py +++ b/socketio/redis_manager.py @@ -52,9 +52,13 @@ class RedisManager(PubSubManager): if isinstance(message['data'], six.binary_type): try: data = pickle.loads(message['data']) - except pickle.PickleError: + except: pass if data is None: - data = json.loads(message['data']) - yield data + try: + data = json.loads(message['data']) + except: + pass + if data: + yield data self.pubsub.unsubscribe(self.channel) diff --git a/tests/test_base_manager.py b/tests/test_base_manager.py index 0badf69..8ff9369 100644 --- a/tests/test_base_manager.py +++ b/tests/test_base_manager.py @@ -104,12 +104,11 @@ class TestBaseManager(unittest.TestCase): self.bm.connect('123', '/') cb = mock.MagicMock() id = self.bm._generate_ack_id('123', '/', cb) - self.assertRaises(ValueError, self.bm.trigger_callback, - '124', '/', id, ['foo']) - self.assertRaises(ValueError, self.bm.trigger_callback, - '123', '/foo', id, ['foo']) - self.assertRaises(ValueError, self.bm.trigger_callback, - '123', '/', id + 1, ['foo']) + + # these should not raise an exception + self.bm.trigger_callback('124', '/', id, ['foo']) + self.bm.trigger_callback('123', '/foo', id, ['foo']) + self.bm.trigger_callback('123', '/', id + 1, ['foo']) self.assertEqual(cb.call_count, 0) def test_get_namespaces(self): diff --git a/tests/test_server.py b/tests/test_server.py index c7a7a6d..6f2c7ca 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -274,7 +274,9 @@ class TestServer(unittest.TestCase): s._handle_eio_message('123', '61-1["my message","a",' '{"_placeholder":true,"num":0}]') self.assertEqual(s._attachment_count, 1) - self.assertRaises(ValueError, s._handle_eio_message, '123', b'foo') + # the following call should not raise an exception in spite of the + # callback id being invalid + s._handle_eio_message('123', b'foo') def test_handle_event_with_ack(self, eio): s = server.Server()