diff --git a/disco/gateway/client.py b/disco/gateway/client.py index 46d5fa6..5ff6956 100644 --- a/disco/gateway/client.py +++ b/disco/gateway/client.py @@ -147,6 +147,8 @@ class GatewayClient(LoggingClass): raise Exception('WS recieved error: %s', error) def on_open(self): + self.log.info('Opened, headers: %s', self.ws.sock.headers) + if self.seq and self.session_id: self.log.info('WS Opened: attempting resume w/ SID: %s SEQ: %s', self.session_id, self.seq) self.send(OPCode.RESUME, { @@ -175,7 +177,8 @@ class GatewayClient(LoggingClass): def on_close(self, code, reason): # Kill heartbeater, a reconnect/resume will trigger a HELLO which will # respawn it - self._heartbeat_task.kill() + if self._heartbeat_task: + self._heartbeat_task.kill() # If we're quitting, just break out of here if self.shutting_down: diff --git a/disco/gateway/sharder.py b/disco/gateway/sharder.py index 45e756a..0321cee 100644 --- a/disco/gateway/sharder.py +++ b/disco/gateway/sharder.py @@ -2,6 +2,7 @@ from __future__ import absolute_import import gipc import gevent +import pickle import logging import marshal @@ -82,7 +83,24 @@ class AutoSharder(object): format='{} [%(levelname)s] %(asctime)s - %(name)s:%(lineno)d - %(message)s'.format(id) ) + @staticmethod + def dumps(data): + if isinstance(data, (basestring, int, long, bool, list, set, dict)): + return '\x01' + marshal.dumps(data) + elif isinstance(data, object) and data.__class__.__name__ == 'code': + return '\x01' + marshal.dumps(data) + else: + return '\x02' + pickle.dumps(data) + + @staticmethod + def loads(data): + enc_type = data[0] + if enc_type == '\x01': + return marshal.loads(data[1:]) + elif enc_type == '\x02': + return pickle.loads(data[1:]) + def start_shard(self, sid): - cpipe, ppipe = gipc.pipe(duplex=True, encoder=marshal.dumps, decoder=marshal.loads) + cpipe, ppipe = gipc.pipe(duplex=True, encoder=self.dumps, decoder=self.loads) gipc.start_process(run_shard, (self.config, sid, cpipe)) self.shards[sid] = GIPCProxy(self, ppipe)