diff --git a/README.rst b/README.rst
index a2f9bb9..57fdd94 100644
--- a/README.rst
+++ b/README.rst
@@ -9,20 +9,33 @@ Python implementation of the `Socket.IO`_ realtime server.
 Features
 --------
 
--  Fully compatible with the Javascript `socket.io-client`_ library.
--  Compatible with Python 2.7 and Python 3.3+.
--  Based on `Eventlet`_, enabling large number of clients even on modest
-   hardware.
--  Includes a WSGI middleware that integrates Socket.IO traffic with
-   standard WSGI applications.
--  Uses an event-based architecture implemented with decorators that
-   hides the details of the protocol.
--  Implements HTTP long-polling and WebSocket transports.
--  Supports XHR2 and XHR browsers as clients.
--  Supports text and binary messages.
--  Supports gzip and deflate HTTP compression.
--  Configurable CORS responses to avoid cross-origin problems with
-   browsers.
+- Fully compatible with the 
+  `Javascript <https://github.com/Automattic/socket.io-client>`_,
+  `Swift <https://github.com/socketio/socket.io-client-swift>`_,
+  `C++ <https://github.com/socketio/socket.io-client-cpp>`_ and
+  `Java <https://github.com/socketio/socket.io-client-java>`_ official
+  Socket.IO clients, plus any third party clients that comply with the
+  Socket.IO specification.
+- Compatible with Python 2.7 and Python 3.3+.
+- Supports large number of clients even on modest hardware when used with an
+  asynchronous server based on `eventlet <http://eventlet.net/>`_ or
+  `gevent <http://gevent.org/>`_. For development and testing, any WSGI
+  complaint multi-threaded server can be used.
+- Includes a WSGI middleware that integrates Socket.IO traffic with standard
+  WSGI applications.
+- Broadcasting of messages to all connected clients, or to subsets of them
+  assigned to "rooms".
+- Optional support for multiple servers, connected through a messaging queue
+  such as Redis or RabbitMQ.
+- Send messages to clients from external processes, such as Celery workers or
+  auxiliary scripts.
+- Event-based architecture implemented with decorators that hides the details
+  of the protocol.
+- Support for HTTP long-polling and WebSocket transports.
+- Support for XHR2 and XHR browsers.
+- Support for text and binary messages.
+- Support for gzip and deflate HTTP compression.
+- Configurable CORS responses, to avoid cross-origin problems with browsers.
 
 Example
 -------
diff --git a/docs/index.rst b/docs/index.rst
index 9254da6..dbb0e63 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -8,13 +8,17 @@ socketio documentation
 
 :ref:`genindex` | :ref:`modindex` | :ref:`search`
 
-This project implements an Socket.IO server that can run standalone or
+This project implements a Socket.IO server that can run standalone or
 integrated with a Python WSGI application. The following are some of its
 features:
 
-- Fully compatible with the Javascript
-  `socket.io-client <https://github.com/Automattic/socket.io-client>`_ library,
-  versions 1.3.5 and up.
+- Fully compatible with the 
+  `Javascript <https://github.com/Automattic/socket.io-client>`_,
+  `Swift <https://github.com/socketio/socket.io-client-swift>`_,
+  `C++ <https://github.com/socketio/socket.io-client-cpp>`_ and
+  `Java <https://github.com/socketio/socket.io-client-java>`_ official
+  Socket.IO clients, plus any third party clients that comply with the
+  Socket.IO specification.
 - Compatible with Python 2.7 and Python 3.3+.
 - Supports large number of clients even on modest hardware when used with an
   asynchronous server based on `eventlet <http://eventlet.net/>`_ or
@@ -24,25 +28,33 @@ features:
   WSGI applications.
 - Broadcasting of messages to all connected clients, or to subsets of them
   assigned to "rooms".
-- Uses an event-based architecture implemented with decorators that hides the
-  details of the protocol.
+- Optional support for multiple servers, connected through a messaging queue
+  such as Redis or RabbitMQ.
+- Send messages to clients from external processes, such as Celery workers or
+  auxiliary scripts.
+- Event-based architecture implemented with decorators that hides the details
+  of the protocol.
 - Support for HTTP long-polling and WebSocket transports.
 - Support for XHR2 and XHR browsers.
 - Support for text and binary messages.
 - Support for gzip and deflate HTTP compression.
-- Configurable CORS responses to avoid cross-origin problems with browsers.
+- Configurable CORS responses, to avoid cross-origin problems with browsers.
 
 What is Socket.IO?
 ------------------
 
 Socket.IO is a transport protocol that enables real-time bidirectional
 event-based communication between clients (typically web browsers) and a
-server. The official implementations of the client and server components are
+server. The original implementations of the client and server components are
 written in JavaScript.
 
 Getting Started
 ---------------
 
+The Socket.IO server can be installed with pip::
+
+    pip install python-socketio
+
 The following is a basic example of a Socket.IO server that uses Flask to
 deploy the client code to the browser::
 
@@ -100,8 +112,8 @@ Rooms
 
 Because Socket.IO is a bidirectional protocol, the server can send messages to
 any connected client at any time. To make it easy to address groups of clients,
-the application can put clients into rooms, and then address messages to all
-the clients in a room.
+the application can put clients into rooms, and then address messages to the
+entire room.
 
 When clients first connect, they are assigned to their own rooms, named with
 the session ID (the ``sid`` argument passed to all event handlers). The
@@ -198,6 +210,66 @@ methods in the :class:`socketio.Server` class.
 When the ``namespace`` argument is omitted, set to ``None`` or to ``'/'``, the
 default namespace, representing the physical connection, is used.
 
+Using a Message Queue
+---------------------
+
+The Socket.IO server owns the socket connections to all the clients, so it is
+the only process that can emit events to them. Unfortunately this becomes a
+limitation for many applications, as a common need is to emit events to
+clients from a different process, like a
+`Celery <http://www.celeryproject.org/>`_ worker, or any other auxiliary
+process or script that works in conjunction with the server.
+
+To enable these other processes to emit events, the server can be configured
+to listen for externally issued events on a message queue such as
+`Redis <http://redis.io/>`_ or `RabbitMQ <https://www.rabbitmq.com/>`_.
+Processes that need to emit events to client then post these events to the
+queue.
+
+Another situation in which the use of a message queue is necessary is with
+high traffic applications that work with large number of clients. To support
+these clients, it may be necessary to horizontally scale the Socket.IO
+server by splitting the client list among multiple server processes. For this
+type of installation, the server processes communicate with each other through
+ta message queue.
+
+The message queue service needs to be installed and configured separately. By
+default, the server uses `Kombu <http://kombu.readthedocs.org/en/latest/>`_
+to access the message queue, so any message queue supported by this package
+can be used. Kombu can be installed with pip::
+
+    pip install kombu
+
+To configure a Socket.IO server to connect to a message queue, the
+``client_manager`` argument must be passed in the server creation. The
+following example instructs the server to connect to a Redis service running
+on the same host and on the default port::
+
+    redis = socketio.KombuManager('redis://localhost:6379/')
+    sio = socketio.Server(client_manager=redis)
+
+For a RabbitMQ queue also running on the local server, the configuration is
+as follows::
+
+    amqp = socketio.KombuManager('amqp://guest:guest@localhost:5672//')
+    sio = socketio.Server(client_manager=amqp)
+
+The arguments passed to the ``KombuManager`` constructor are passed directly
+to Kombu's `Connection object
+<http://kombu.readthedocs.org/en/latest/userguide/connections.html>`_.
+
+If multiple Sokcet.IO servers are connected to a message queue, they
+automatically communicate with each other and manage a combine client list,
+without any need for additional configuration. To have a process other than
+the server connect to the queue to emit a message, the same ``KombuManager``
+class can be used. For example::
+
+    # connect to the redis queue
+    redis = socketio.KombuManager('redis://localhost:6379/')
+    
+    # emit an event
+    redis.emit('my event', data={'foo': 'bar'}, room='my room')
+
 Deployment
 ----------
 
@@ -233,16 +305,14 @@ command to launch the application under gunicorn is shown below::
     $ gunicorn -k eventlet -w 1 module:app
 
 Due to limitations in its load balancing algorithm, gunicorn can only be used
-with one worker process, so the ``-w 1`` option is required. Note that a
-single eventlet worker can handle a large number of concurrent clients.
+with one worker process, so the ``-w`` option cannot be set to a value higher
+than 1. A single eventlet worker can handle a large number of concurrent
+clients, each handled by a greenlet.
 
-Another limitation when using gunicorn is that the WebSocket transport is not
-available, because this transport it requires extensions to the WSGI standard.
-
-Note: Eventlet provides a ``monkey_patch()`` function that replaces all the
-blocking functions in the standard library with equivalent asynchronous
-versions. While python-socketio does not require monkey patching, other
-libraries such as database drivers are likely to require it.
+Eventlet provides a ``monkey_patch()`` function that replaces all the blocking
+functions in the standard library with equivalent asynchronous versions. While
+python-socketio does not require monkey patching, other libraries such as
+database drivers are likely to require it.
 
 Gevent
 ~~~~~~
@@ -287,14 +357,14 @@ Or to include WebSocket::
     $ gunicorn -k geventwebsocket.gunicorn.workers.GeventWebSocketWorker -w 1 module: app
 
 Same as with eventlet, due to limitations in its load balancing algorithm,
-gunicorn can only be used with one worker process, so the ``-w 1`` option is
-required. Note that a single eventlet worker can handle a large number of
-concurrent clients.
+gunicorn can only be used with one worker process, so the ``-w`` option cannot
+be higher than 1. A single gevent worker can handle a large number of
+concurrent clients through the use of greenlets.
 
-Note: Gevent provides a ``monkey_patch()`` function that replaces all the
-blocking functions in the standard library with equivalent asynchronous
-versions. While python-socketio does not require monkey patching, other
-libraries such as database drivers are likely to require it.
+Gevent provides a ``monkey_patch()`` function that replaces all the blocking
+functions in the standard library with equivalent asynchronous versions. While
+python-socketio does not require monkey patching, other libraries such as
+database drivers are likely to require it.
 
 Standard Threading Library
 ~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -345,25 +415,25 @@ multiple servers), the following conditions must be met:
   using eventlet, gevent, or standard threads. Worker processes that only
   handle one request at a time are not supported.
 - The load balancer must be configured to always forward requests from a
-  client to the same process. Load balancers call this *sticky sessions*, or
-  *session affinity*.
-
-A limitation in the current release of the Socket.IO server is that because
-the clients are randomly assigned to different server processes, any form of
-broadcasting is not supported. A storage backend that enables multiple
-processes to share information about clients is currently in development to
-address this important limitation.
+  client to the same worker process. Load balancers call this *sticky
+  sessions*, or *session affinity*.
+- The worker processes communicate with each other through a message queue,
+  which must be installed and configured. See the section on using message
+  queues above for instructions.
 
 API Reference
 -------------
 
-.. toctree::
-   :maxdepth: 2
-
 .. module:: socketio
-
 .. autoclass:: Middleware
    :members:
-
 .. autoclass:: Server
    :members:
+.. autoclass:: BaseManager
+   :members:
+.. autoclass:: PubSubManager
+   :members:
+.. autoclass:: KombuManager
+   :members:
+.. autoclass:: RedisManager
+   :members:
diff --git a/socketio/__init__.py b/socketio/__init__.py
index 6e95349..be5882f 100644
--- a/socketio/__init__.py
+++ b/socketio/__init__.py
@@ -1,4 +1,9 @@
 from .middleware import Middleware
+from .base_manager import BaseManager
+from .pubsub_manager import PubSubManager
+from .kombu_manager import KombuManager
+from .redis_manager import RedisManager
 from .server import Server
 
-__all__ = [Middleware, Server]
+__all__ = [Middleware, Server, BaseManager, PubSubManager, KombuManager,
+           RedisManager]
diff --git a/socketio/base_manager.py b/socketio/base_manager.py
index 7d13f1f..e0f35e0 100644
--- a/socketio/base_manager.py
+++ b/socketio/base_manager.py
@@ -12,12 +12,15 @@ class BaseManager(object):
     services. More sophisticated storage backends can be implemented by
     subclasses.
     """
-    def __init__(self, server):
-        self.server = server
+    def __init__(self):
+        self.server = None
         self.rooms = {}
         self.pending_removals = []
         self.callbacks = {}
 
+    def initialize(self, server):
+        self.server = server
+
     def get_namespaces(self):
         """Return an iterable with the active namespace names."""
         return six.iterkeys(self.rooms)
@@ -69,7 +72,7 @@ class BaseManager(object):
         except KeyError:
             pass
 
-    def close_room(self, namespace, room):
+    def close_room(self, room, namespace):
         """Remove all participants from a room."""
         try:
             for sid in self.get_participants(namespace, room):
@@ -101,12 +104,16 @@ class BaseManager(object):
 
     def trigger_callback(self, sid, namespace, id, data):
         """Invoke an application callback."""
+        callback = None
         try:
             callback = self.callbacks[sid][namespace][id]
         except KeyError:
-            raise ValueError('Unknown callback')
-        del self.callbacks[sid][namespace][id]
-        callback(*data)
+            # if we get an unknown callback we just ignore it
+            self.server.logger.warning('Unknown callback received, ignoring.')
+        else:
+            del self.callbacks[sid][namespace][id]
+        if callback is not None:
+            callback(*data)
 
     def _generate_ack_id(self, sid, namespace, callback):
         """Generate a unique identifier for an ACK packet."""
diff --git a/socketio/kombu_manager.py b/socketio/kombu_manager.py
new file mode 100644
index 0000000..05d256d
--- /dev/null
+++ b/socketio/kombu_manager.py
@@ -0,0 +1,54 @@
+import pickle
+
+try:
+    import kombu
+except ImportError:
+    kombu = None
+
+from .pubsub_manager import PubSubManager
+
+
+class KombuManager(PubSubManager):  # pragma: no cover
+    """Client manager that uses kombu for inter-process messaging.
+
+    This class implements a client manager backend for event sharing across
+    multiple processes, using RabbitMQ, Redis or any other messaging mechanism
+    supported by `kombu <http://kombu.readthedocs.org/en/latest/>`_.
+
+    To use a kombu backend, initialize the :class:`Server` instance as
+    follows::
+
+        url = 'amqp://user:password@hostname:port//'
+        server = socketio.Server(client_manager=socketio.KombuManager(url))
+
+    :param url: The connection URL for the backend messaging queue. Example
+                connection URLs are ``'amqp://guest:guest@localhost:5672//'``
+                and ``'redis://localhost:6379/'`` for RabbitMQ and Redis
+                respectively. Consult the `kombu documentation
+                <http://kombu.readthedocs.org/en/latest/userguide\
+                /connections.html#urls>`_ for more on how to construct
+                connection URLs.
+    :param channel: The channel name on which the server sends and receives
+                    notifications. Must be the same in all the servers.
+    """
+    name = 'kombu'
+
+    def __init__(self, url='amqp://guest:guest@localhost:5672//',
+                 channel='socketio'):
+        if kombu is None:
+            raise RuntimeError('Kombu package is not installed '
+                               '(Run "pip install kombu" in your '
+                               'virtualenv).')
+        self.kombu = kombu.Connection(url)
+        self.queue = self.kombu.SimpleQueue(channel)
+        super(KombuManager, self).__init__(channel=channel)
+
+    def _publish(self, data):
+        return self.queue.put(pickle.dumps(data))
+
+    def _listen(self):
+        listen_queue = self.kombu.SimpleQueue(self.channel)
+        while True:
+            message = listen_queue.get(block=True)
+            message.ack()
+            yield message.payload
diff --git a/socketio/pubsub_manager.py b/socketio/pubsub_manager.py
new file mode 100644
index 0000000..c454540
--- /dev/null
+++ b/socketio/pubsub_manager.py
@@ -0,0 +1,145 @@
+from functools import partial
+import uuid
+
+import json
+import pickle
+import six
+
+from .base_manager import BaseManager
+
+
+class PubSubManager(BaseManager):
+    """Manage a client list attached to a pub/sub backend.
+
+    This is a base class that enables multiple servers to share the list of
+    clients, with the servers communicating events through a pub/sub backend.
+    The use of a pub/sub backend also allows any client connected to the
+    backend to emit events addressed to Socket.IO clients.
+
+    The actual backends must be implemented by subclasses, this class only
+    provides a pub/sub generic framework.
+
+    :param channel: The channel name on which the server sends and receives
+                    notifications.
+    """
+    name = 'pubsub'
+
+    def __init__(self, channel='socketio'):
+        super(PubSubManager, self).__init__()
+        self.channel = channel
+        self.host_id = uuid.uuid4().hex
+
+    def initialize(self, server):
+        super(PubSubManager, self).initialize(server)
+        self.thread = self.server.start_background_task(self._thread)
+        self.server.logger.info(self.name + ' backend initialized.')
+
+    def emit(self, event, data, namespace=None, room=None, skip_sid=None,
+             callback=None):
+        """Emit a message to a single client, a room, or all the clients
+        connected to the namespace.
+
+        This method takes care or propagating the message to all the servers
+        that are connected through the message queue.
+
+        The parameters are the same as in :meth:`.Server.emit`.
+        """
+        namespace = namespace or '/'
+        if callback is not None:
+            if self.server is None:
+                raise RuntimeError('Callbacks can only be issued from the '
+                                   'context of a server.')
+            if room is None:
+                raise ValueError('Cannot use callback without a room set.')
+            id = self._generate_ack_id(room, namespace, callback)
+            callback = (room, namespace, id)
+        else:
+            callback = None
+        self._publish({'method': 'emit', 'event': event, 'data': data,
+                       'namespace': namespace, 'room': room,
+                       'skip_sid': skip_sid, 'callback': callback})
+
+    def close_room(self, room, namespace=None):
+        self._publish({'method': 'close_room', 'room': room,
+                       'namespace': namespace or '/'})
+
+    def _publish(self, data):
+        """Publish a message on the Socket.IO channel.
+
+        This method needs to be implemented by the different subclasses that
+        support pub/sub backends.
+        """
+        raise NotImplementedError('This method must be implemented in a '
+                                  'subclass.')  # pragma: no cover
+
+    def _listen(self):
+        """Return the next message published on the Socket.IO channel,
+        blocking until a message is available.
+
+        This method needs to be implemented by the different subclasses that
+        support pub/sub backends.
+        """
+        raise NotImplementedError('This method must be implemented in a '
+                                  'subclass.')  # pragma: no cover
+
+    def _handle_emit(self, message):
+        # Events with callbacks are very tricky to handle across hosts
+        # Here in the receiving end we set up a local callback that preserves
+        # the callback host and id from the sender
+        remote_callback = message.get('callback')
+        if remote_callback is not None and len(remote_callback) == 3:
+            callback = partial(self._return_callback, self.host_id,
+                               *remote_callback)
+        else:
+            callback = None
+        super(PubSubManager, self).emit(message['event'], message['data'],
+                                        namespace=message.get('namespace'),
+                                        room=message.get('room'),
+                                        skip_sid=message.get('skip_sid'),
+                                        callback=callback)
+
+    def _handle_callback(self, message):
+        if self.host_id == message.get('host_id'):
+            try:
+                sid = message['sid']
+                namespace = message['namespace']
+                id = message['id']
+                args = message['args']
+            except KeyError:
+                return
+            self.trigger_callback(sid, namespace, id, args)
+
+    def _return_callback(self, host_id, sid, namespace, callback_id, *args):
+        # When an event callback is received, the callback is returned back
+        # the sender, which is identified by the host_id
+        self._publish({'method': 'callback', 'host_id': host_id,
+                       'sid': sid, 'namespace': namespace, 'id': callback_id,
+                       'args': args})
+
+    def _handle_close_room(self, message):
+        super(PubSubManager, self).close_room(
+            room=message.get('room'), namespace=message.get('namespace'))
+
+    def _thread(self):
+        for message in self._listen():
+            data = None
+            if isinstance(message, dict):
+                data = message
+            else:
+                if isinstance(message, six.binary_type):  # pragma: no cover
+                    try:
+                        data = pickle.loads(message)
+                    except:
+                        pass
+                if data is None:
+                    try:
+                        data = json.loads(message)
+                    except:
+                        pass
+            if data and 'method' in data:
+                if data['method'] == 'emit':
+                    self._handle_emit(data)
+                elif data['method'] == 'callback':
+                    self._handle_callback(data)
+                elif data['method'] == 'close_room':
+                    self._handle_close_room(data)
diff --git a/socketio/redis_manager.py b/socketio/redis_manager.py
new file mode 100644
index 0000000..900baaf
--- /dev/null
+++ b/socketio/redis_manager.py
@@ -0,0 +1,50 @@
+import pickle
+
+try:
+    import redis
+except ImportError:
+    redis = None
+
+from .pubsub_manager import PubSubManager
+
+
+class RedisManager(PubSubManager):  # pragma: no cover
+    """Redis based client manager.
+
+    This class implements a Redis backend for event sharing across multiple
+    processes. Only kept here as one more example of how to build a custom
+    backend, since the kombu backend is perfectly adequate to support a Redis
+    message queue.
+
+    To use a Redis backend, initialize the :class:`Server` instance as
+    follows::
+
+        url = 'redis://hostname:port/0'
+        server = socketio.Server(client_manager=socketio.RedisManager(url))
+
+    :param url: The connection URL for the Redis server.
+    :param channel: The channel name on which the server sends and receives
+                    notifications. Must be the same in all the servers.
+    """
+    name = 'redis'
+
+    def __init__(self, url='redis://localhost:6379/0', channel='socketio'):
+        if redis is None:
+            raise RuntimeError('Redis package is not installed '
+                               '(Run "pip install redis" in your '
+                               'virtualenv).')
+        self.redis = redis.Redis.from_url(url)
+        self.pubsub = self.redis.pubsub()
+        super(RedisManager, self).__init__(channel=channel)
+
+    def _publish(self, data):
+        return self.redis.publish(self.channel, pickle.dumps(data))
+
+    def _listen(self):
+        channel = self.channel.encode('utf-8')
+        self.pubsub.subscribe(self.channel)
+        for message in self.pubsub.listen():
+            if message['channel'] == channel and \
+                    message['type'] == 'message' and 'data' in message:
+                yield message['data']
+        self.pubsub.unsubscribe(self.channel)
diff --git a/socketio/server.py b/socketio/server.py
index dcb1bae..0f22f98 100644
--- a/socketio/server.py
+++ b/socketio/server.py
@@ -14,9 +14,9 @@ class Server(object):
     for websocket and long-polling transports.
 
     :param client_manager: The client manager instance that will manage the
-                           client list. By default the client list is stored
-                           in an in-memory structure, which prevents the use
-                           of multiple worker processes.
+                           client list. When this is omitted, the client list
+                           is stored in an in-memory structure, so the use of
+                           multiple connected servers is not possible.
     :param logger: To enable logging set to ``True`` or pass a logger object to
                    use. To disable logging set to ``False``.
     :param binary: ``True`` to support binary payloads, ``False`` to treat all
@@ -62,9 +62,6 @@ class Server(object):
     """
     def __init__(self, client_manager=None, logger=False, binary=False,
                  json=None, **kwargs):
-        if client_manager is None:
-            client_manager = base_manager.BaseManager(self)
-        self.manager = client_manager
         engineio_options = kwargs
         engineio_logger = engineio_options.pop('engineio_logger', None)
         if engineio_logger is not None:
@@ -97,6 +94,11 @@ class Server(object):
                     self.logger.setLevel(logging.ERROR)
                 self.logger.addHandler(logging.StreamHandler())
 
+        if client_manager is None:
+            client_manager = base_manager.BaseManager()
+        client_manager.initialize(self)
+        self.manager = client_manager
+
     def on(self, event, handler=None, namespace=None):
         """Register an event handler.
 
@@ -248,7 +250,7 @@ class Server(object):
         """
         namespace = namespace or '/'
         self.logger.info('room %s is closing [%s]', room, namespace)
-        self.manager.close_room(namespace, room)
+        self.manager.close_room(room, namespace)
 
     def rooms(self, sid, namespace=None):
         """Return the rooms a client is in.
@@ -300,6 +302,9 @@ class Server(object):
         """
         return self.eio.handle_request(environ, start_response)
 
+    def start_background_task(self, target, *args, **kwargs):
+        self.eio.start_background_task(target, *args, **kwargs)
+
     def _emit_internal(self, sid, event, data, namespace=None, id=None):
         """Send a message to a client."""
         if six.PY2 and not self.binary:
diff --git a/tests/test_base_manager.py b/tests/test_base_manager.py
index 1cdfa6b..8ff9369 100644
--- a/tests/test_base_manager.py
+++ b/tests/test_base_manager.py
@@ -12,7 +12,8 @@ from socketio import base_manager
 class TestBaseManager(unittest.TestCase):
     def setUp(self):
         mock_server = mock.MagicMock()
-        self.bm = base_manager.BaseManager(mock_server)
+        self.bm = base_manager.BaseManager()
+        self.bm.initialize(mock_server)
 
     def test_connect(self):
         self.bm.connect('123', '/foo')
@@ -103,12 +104,11 @@ class TestBaseManager(unittest.TestCase):
         self.bm.connect('123', '/')
         cb = mock.MagicMock()
         id = self.bm._generate_ack_id('123', '/', cb)
-        self.assertRaises(ValueError, self.bm.trigger_callback,
-                          '124', '/', id, ['foo'])
-        self.assertRaises(ValueError, self.bm.trigger_callback,
-                          '123', '/foo', id, ['foo'])
-        self.assertRaises(ValueError, self.bm.trigger_callback,
-                          '123', '/', id + 1, ['foo'])
+
+        # these should not raise an exception
+        self.bm.trigger_callback('124', '/', id, ['foo'])
+        self.bm.trigger_callback('123', '/foo', id, ['foo'])
+        self.bm.trigger_callback('123', '/', id + 1, ['foo'])
         self.assertEqual(cb.call_count, 0)
 
     def test_get_namespaces(self):
@@ -142,11 +142,11 @@ class TestBaseManager(unittest.TestCase):
         self.bm.connect('789', '/foo')
         self.bm.enter_room('123', '/foo', 'bar')
         self.bm.enter_room('123', '/foo', 'bar')
-        self.bm.close_room('/foo', 'bar')
+        self.bm.close_room('bar', '/foo')
         self.assertNotIn('bar', self.bm.rooms['/foo'])
 
     def test_close_invalid_room(self):
-        self.bm.close_room('/foo', 'bar')
+        self.bm.close_room('bar', '/foo')
 
     def test_rooms(self):
         self.bm.connect('123', '/foo')
diff --git a/tests/test_pubsub_manager.py b/tests/test_pubsub_manager.py
new file mode 100644
index 0000000..0345eb2
--- /dev/null
+++ b/tests/test_pubsub_manager.py
@@ -0,0 +1,214 @@
+import functools
+import unittest
+
+import six
+if six.PY3:
+    from unittest import mock
+else:
+    import mock
+
+from socketio import base_manager
+from socketio import pubsub_manager
+
+
+class TestBaseManager(unittest.TestCase):
+    def setUp(self):
+        mock_server = mock.MagicMock()
+        self.pm = pubsub_manager.PubSubManager()
+        self.pm._publish = mock.MagicMock()
+        self.pm.initialize(mock_server)
+
+    def test_default_init(self):
+        self.assertEqual(self.pm.channel, 'socketio')
+        self.assertEqual(len(self.pm.host_id), 32)
+        self.pm.server.start_background_task.assert_called_once_with(
+            self.pm._thread)
+
+    def test_custom_init(self):
+        pubsub = pubsub_manager.PubSubManager(channel='foo')
+        self.assertEqual(pubsub.channel, 'foo')
+        self.assertEqual(len(pubsub.host_id), 32)
+
+    def test_emit(self):
+        self.pm.emit('foo', 'bar')
+        self.pm._publish.assert_called_once_with(
+            {'method': 'emit', 'event': 'foo', 'data': 'bar',
+             'namespace': '/', 'room': None, 'skip_sid': None,
+             'callback': None})
+
+    def test_emit_with_namespace(self):
+        self.pm.emit('foo', 'bar', namespace='/baz')
+        self.pm._publish.assert_called_once_with(
+            {'method': 'emit', 'event': 'foo', 'data': 'bar',
+             'namespace': '/baz', 'room': None, 'skip_sid': None,
+             'callback': None})
+
+    def test_emit_with_room(self):
+        self.pm.emit('foo', 'bar', room='baz')
+        self.pm._publish.assert_called_once_with(
+            {'method': 'emit', 'event': 'foo', 'data': 'bar',
+             'namespace': '/', 'room': 'baz', 'skip_sid': None,
+             'callback': None})
+
+    def test_emit_with_skip_sid(self):
+        self.pm.emit('foo', 'bar', skip_sid='baz')
+        self.pm._publish.assert_called_once_with(
+            {'method': 'emit', 'event': 'foo', 'data': 'bar',
+             'namespace': '/', 'room': None, 'skip_sid': 'baz',
+             'callback': None})
+
+    def test_emit_with_callback(self):
+        with mock.patch.object(self.pm, '_generate_ack_id',
+                               return_value='123'):
+            self.pm.emit('foo', 'bar', room='baz', callback='cb')
+            self.pm._publish.assert_called_once_with(
+                {'method': 'emit', 'event': 'foo', 'data': 'bar',
+                 'namespace': '/', 'room': 'baz', 'skip_sid': None,
+                 'callback': ('baz', '/', '123')})
+
+    def test_emit_with_callback_without_server(self):
+        standalone_pm = pubsub_manager.PubSubManager()
+        self.assertRaises(RuntimeError, standalone_pm.emit, 'foo', 'bar',
+                          callback='cb')
+
+    def test_emit_with_callback_missing_room(self):
+        with mock.patch.object(self.pm, '_generate_ack_id',
+                               return_value='123'):
+            self.assertRaises(ValueError, self.pm.emit, 'foo', 'bar',
+                              callback='cb')
+
+    def test_close_room(self):
+        self.pm.close_room('foo')
+        self.pm._publish.assert_called_once_with(
+            {'method': 'close_room', 'room': 'foo', 'namespace': '/'})
+
+    def test_close_room_with_namespace(self):
+        self.pm.close_room('foo', '/bar')
+        self.pm._publish.assert_called_once_with(
+            {'method': 'close_room', 'room': 'foo', 'namespace': '/bar'})
+
+    def test_handle_emit(self):
+        with mock.patch.object(base_manager.BaseManager, 'emit') as super_emit:
+            self.pm._handle_emit({'event': 'foo', 'data': 'bar'})
+            super_emit.assert_called_once_with('foo', 'bar', namespace=None,
+                                               room=None, skip_sid=None,
+                                               callback=None)
+
+    def test_handle_emit_with_namespace(self):
+        with mock.patch.object(base_manager.BaseManager, 'emit') as super_emit:
+            self.pm._handle_emit({'event': 'foo', 'data': 'bar',
+                                  'namespace': '/baz'})
+            super_emit.assert_called_once_with('foo', 'bar', namespace='/baz',
+                                               room=None, skip_sid=None,
+                                               callback=None)
+
+    def test_handle_emiti_with_room(self):
+        with mock.patch.object(base_manager.BaseManager, 'emit') as super_emit:
+            self.pm._handle_emit({'event': 'foo', 'data': 'bar',
+                                  'room': 'baz'})
+            super_emit.assert_called_once_with('foo', 'bar', namespace=None,
+                                               room='baz', skip_sid=None,
+                                               callback=None)
+
+    def test_handle_emit_with_skip_sid(self):
+        with mock.patch.object(base_manager.BaseManager, 'emit') as super_emit:
+            self.pm._handle_emit({'event': 'foo', 'data': 'bar',
+                                  'skip_sid': '123'})
+            super_emit.assert_called_once_with('foo', 'bar', namespace=None,
+                                               room=None, skip_sid='123',
+                                               callback=None)
+
+    def test_handle_emit_with_callback(self):
+        host_id = self.pm.host_id
+        with mock.patch.object(base_manager.BaseManager, 'emit') as super_emit:
+            self.pm._handle_emit({'event': 'foo', 'data': 'bar',
+                                  'namespace': '/baz',
+                                  'callback': ('sid', '/baz', 123)})
+            self.assertEqual(super_emit.call_count, 1)
+            self.assertEqual(super_emit.call_args[0], ('foo', 'bar'))
+            self.assertEqual(super_emit.call_args[1]['namespace'], '/baz')
+            self.assertIsNone(super_emit.call_args[1]['room'])
+            self.assertIsNone(super_emit.call_args[1]['skip_sid'])
+            self.assertIsInstance(super_emit.call_args[1]['callback'],
+                                  functools.partial)
+            super_emit.call_args[1]['callback']('one', 2, 'three')
+            self.pm._publish.assert_called_once_with(
+                {'method': 'callback', 'host_id': host_id, 'sid': 'sid',
+                 'namespace': '/baz', 'id': 123, 'args': ('one', 2, 'three')})
+
+    def test_handle_callback(self):
+        host_id = self.pm.host_id
+        with mock.patch.object(self.pm, 'trigger_callback') as trigger:
+            self.pm._handle_callback({'method': 'callback',
+                                      'host_id': host_id, 'sid': 'sid',
+                                      'namespace': '/', 'id': 123,
+                                      'args': ('one', 2)})
+            trigger.assert_called_once_with('sid', '/', 123, ('one', 2))
+
+    def test_handle_callback_bad_host_id(self):
+        with mock.patch.object(self.pm, 'trigger_callback') as trigger:
+            self.pm._handle_callback({'method': 'callback',
+                                      'host_id': 'bad', 'sid': 'sid',
+                                      'namespace': '/', 'id': 123,
+                                      'args': ('one', 2)})
+            self.assertEqual(trigger.call_count, 0)
+
+    def test_handle_callback_missing_args(self):
+        host_id = self.pm.host_id
+        with mock.patch.object(self.pm, 'trigger_callback') as trigger:
+            self.pm._handle_callback({'method': 'callback',
+                                      'host_id': host_id, 'sid': 'sid',
+                                      'namespace': '/', 'id': 123})
+            self.pm._handle_callback({'method': 'callback',
+                                      'host_id': host_id, 'sid': 'sid',
+                                      'namespace': '/'})
+            self.pm._handle_callback({'method': 'callback',
+                                      'host_id': host_id, 'sid': 'sid'})
+            self.pm._handle_callback({'method': 'callback',
+                                      'host_id': host_id})
+            self.assertEqual(trigger.call_count, 0)
+
+    def test_handle_close_room(self):
+        with mock.patch.object(base_manager.BaseManager, 'close_room') \
+                as super_close_room:
+            self.pm._handle_close_room({'method': 'close_room',
+                                        'room': 'foo'})
+            super_close_room.assert_called_once_with(room='foo',
+                                                     namespace=None)
+
+    def test_handle_close_room_with_namespace(self):
+        with mock.patch.object(base_manager.BaseManager, 'close_room') \
+                as super_close_room:
+            self.pm._handle_close_room({'method': 'close_room',
+                                        'room': 'foo', 'namespace': '/bar'})
+            super_close_room.assert_called_once_with(room='foo',
+                                                     namespace='/bar')
+
+    def test_background_thread(self):
+        self.pm._handle_emit = mock.MagicMock()
+        self.pm._handle_callback = mock.MagicMock()
+        self.pm._handle_close_room = mock.MagicMock()
+
+        def messages():
+            import pickle
+            yield {'method': 'emit', 'value': 'foo'}
+            yield {'missing': 'method'}
+            yield '{"method": "callback", "value": "bar"}'
+            yield {'method': 'bogus'}
+            yield pickle.dumps({'method': 'close_room', 'value': 'baz'})
+            yield 'bad json'
+            yield b'bad pickled'
+            raise KeyboardInterrupt
+
+        self.pm._listen = mock.MagicMock(side_effect=messages)
+        try:
+            self.pm._thread()
+        except KeyboardInterrupt:
+            pass
+
+        self.pm._handle_emit.assert_called_once_with(
+            {'method': 'emit', 'value': 'foo'})
+        self.pm._handle_callback.assert_called_once_with(
+            {'method': 'callback', 'value': 'bar'})
+        self.pm._handle_close_room.assert_called_once_with(
+            {'method': 'close_room', 'value': 'baz'})
diff --git a/tests/test_server.py b/tests/test_server.py
index 142e207..6f2c7ca 100644
--- a/tests/test_server.py
+++ b/tests/test_server.py
@@ -92,13 +92,13 @@ class TestServer(unittest.TestCase):
         mgr = mock.MagicMock()
         s = server.Server(client_manager=mgr)
         s.close_room('room', namespace='/foo')
-        s.manager.close_room.assert_called_once_with('/foo', 'room')
+        s.manager.close_room.assert_called_once_with('room', '/foo')
 
     def test_close_room_default_namespace(self, eio):
         mgr = mock.MagicMock()
         s = server.Server(client_manager=mgr)
         s.close_room('room')
-        s.manager.close_room.assert_called_once_with('/', 'room')
+        s.manager.close_room.assert_called_once_with('room', '/')
 
     def test_rooms(self, eio):
         mgr = mock.MagicMock()
@@ -274,7 +274,9 @@ class TestServer(unittest.TestCase):
         s._handle_eio_message('123', '61-1["my message","a",'
                                      '{"_placeholder":true,"num":0}]')
         self.assertEqual(s._attachment_count, 1)
-        self.assertRaises(ValueError, s._handle_eio_message, '123', b'foo')
+        # the following call should not raise an exception in spite of the
+        # callback id being invalid
+        s._handle_eio_message('123', b'foo')
 
     def test_handle_event_with_ack(self, eio):
         s = server.Server()
@@ -397,3 +399,9 @@ class TestServer(unittest.TestCase):
 
         # restore the default JSON module
         packet.Packet.json = json
+
+    def test_start_background_task(self, eio):
+        s = server.Server()
+        s.start_background_task('foo', 'bar', baz='baz')
+        s.eio.start_background_task.assert_called_once_with('foo', 'bar',
+                                                            baz='baz')