Browse Source

Merge remote-tracking branch 'origin/feature/zlib-stream-compression'

pull/60/head
andrei 8 years ago
parent
commit
77f43c3f6a
  1. 30
      disco/gateway/client.py

30
disco/gateway/client.py

@ -11,16 +11,18 @@ from disco.util.logging import LoggingClass
from disco.util.limiter import SimpleLimiter
TEN_MEGABYTES = 10490000
ZLIB_SUFFIX = b'\x00\x00\xff\xff'
class GatewayClient(LoggingClass):
GATEWAY_VERSION = 6
def __init__(self, client, max_reconnects=5, encoder='json', ipc=None):
def __init__(self, client, max_reconnects=5, encoder='json', zlib_stream_enabled=True, ipc=None):
super(GatewayClient, self).__init__()
self.client = client
self.max_reconnects = max_reconnects
self.encoder = ENCODERS[encoder]
self.zlib_stream_enabled = zlib_stream_enabled
self.events = client.events
self.packets = client.packets
@ -47,6 +49,8 @@ class GatewayClient(LoggingClass):
# Websocket connection
self.ws = None
self.ws_event = gevent.event.Event()
self._zlib = None
self._buffer = None
# State
self.seq = 0
@ -122,6 +126,9 @@ class GatewayClient(LoggingClass):
gateway_url += '?v={}&encoding={}'.format(self.GATEWAY_VERSION, self.encoder.TYPE)
if self.zlib_stream_enabled:
gateway_url += '&compress=zlib-stream'
self.log.info('Opening websocket connection to URL `%s`', gateway_url)
self.ws = Websocket(gateway_url)
self.ws.emitter.on('on_open', self.on_open)
@ -132,6 +139,21 @@ class GatewayClient(LoggingClass):
self.ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
def on_message(self, msg):
if self.zlib_stream_enabled:
if not self._buffer:
self._buffer = bytearray()
self._buffer.extend(msg)
if len(msg) < 4:
return
if msg[-4:] != ZLIB_SUFFIX:
return
msg = self._zlib.decompress(self._buffer if six.PY3 else str(self._buffer)).decode('utf-8')
self._buffer = None
else:
# Detect zlib and decompress
is_erlpack = ((six.PY2 and ord(msg[0]) == 131) or (six.PY3 and msg[0] == 131))
if msg[0] != '{' and not is_erlpack:
@ -157,6 +179,9 @@ class GatewayClient(LoggingClass):
raise Exception('WS recieved error: %s', error)
def on_open(self):
if self.zlib_stream_enabled:
self._zlib = zlib.decompressobj()
if self.seq and self.session_id:
self.log.info('WS Opened: attempting resume w/ SID: %s SEQ: %s', self.session_id, self.seq)
self.replaying = True
@ -184,6 +209,9 @@ class GatewayClient(LoggingClass):
})
def on_close(self, code, reason):
# Make sure we cleanup any old data
self._buffer = None
# Kill heartbeater, a reconnect/resume will trigger a HELLO which will
# respawn it
if self._heartbeat_task:

Loading…
Cancel
Save