From 460b0eadb8956442ff9140fcfaea1a22c8b8c939 Mon Sep 17 00:00:00 2001 From: Andrei Date: Thu, 12 Oct 2017 19:02:37 -0700 Subject: [PATCH 1/4] Zlib streaming compression pt1 --- disco/gateway/client.py | 30 +++++++++++++++++++++++++----- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/disco/gateway/client.py b/disco/gateway/client.py index f0e8869..03a9681 100644 --- a/disco/gateway/client.py +++ b/disco/gateway/client.py @@ -1,4 +1,6 @@ +import struct import gevent +import array import zlib import six import ssl @@ -11,16 +13,18 @@ from disco.util.logging import LoggingClass from disco.util.limiter import SimpleLimiter TEN_MEGABYTES = 10490000 +ZLIB_SUFFIX = 0xffff class GatewayClient(LoggingClass): GATEWAY_VERSION = 6 - def __init__(self, client, max_reconnects=5, encoder='json', ipc=None): + def __init__(self, client, max_reconnects=5, encoder='json', zlib_enabled=True, ipc=None): super(GatewayClient, self).__init__() self.client = client self.max_reconnects = max_reconnects self.encoder = ENCODERS[encoder] + self.zlib_enabled = zlib_enabled self.events = client.events self.packets = client.packets @@ -47,6 +51,8 @@ class GatewayClient(LoggingClass): # Websocket connection self.ws = None self.ws_event = gevent.event.Event() + self._zlib = zlib.decompressobj() + self._buffer = array.array('c') # State self.seq = 0 @@ -122,6 +128,9 @@ class GatewayClient(LoggingClass): gateway_url += '?v={}&encoding={}'.format(self.GATEWAY_VERSION, self.encoder.TYPE) + if self.zlib_enabled: + gateway_url += '&compress=zlib-stream' + self.log.info('Opening websocket connection to URL `%s`', gateway_url) self.ws = Websocket(gateway_url) self.ws.emitter.on('on_open', self.on_open) @@ -132,10 +141,21 @@ class GatewayClient(LoggingClass): self.ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE}) def on_message(self, msg): - # Detect zlib and decompress - is_erlpack = ((six.PY2 and ord(msg[0]) == 131) or (six.PY3 and msg[0] == 131)) - if msg[0] != '{' and not is_erlpack: - msg = zlib.decompress(msg, 15, TEN_MEGABYTES).decode("utf-8") + if self.zlib_enabled: + self._buffer.extend(msg) + + if len(msg) >= 4: + suffix = struct.unpack('>I', msg[-4:])[0] + if suffix == ZLIB_SUFFIX: + msg = self._zlib.decompress(self._buffer.tostring()).decode('utf-8') + self._buffer = array.array('c') + else: + return + else: + # Detect zlib and decompress + is_erlpack = ((six.PY2 and ord(msg[0]) == 131) or (six.PY3 and msg[0] == 131)) + if msg[0] != '{' and not is_erlpack: + msg = zlib.decompress(msg, 15, TEN_MEGABYTES).decode("utf-8") try: data = self.encoder.decode(msg) From be20c2d82213aaf78dd962b4e50a4370d7b54e95 Mon Sep 17 00:00:00 2001 From: Andrei Date: Thu, 12 Oct 2017 19:20:40 -0700 Subject: [PATCH 2/4] Cleanup --- disco/gateway/client.py | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/disco/gateway/client.py b/disco/gateway/client.py index 03a9681..1d996c0 100644 --- a/disco/gateway/client.py +++ b/disco/gateway/client.py @@ -19,12 +19,12 @@ ZLIB_SUFFIX = 0xffff class GatewayClient(LoggingClass): GATEWAY_VERSION = 6 - def __init__(self, client, max_reconnects=5, encoder='json', zlib_enabled=True, ipc=None): + def __init__(self, client, max_reconnects=5, encoder='json', zlib_stream_enabled=True, ipc=None): super(GatewayClient, self).__init__() self.client = client self.max_reconnects = max_reconnects self.encoder = ENCODERS[encoder] - self.zlib_enabled = zlib_enabled + self.zlib_stream_enabled = zlib_stream_enabled self.events = client.events self.packets = client.packets @@ -52,7 +52,7 @@ class GatewayClient(LoggingClass): self.ws = None self.ws_event = gevent.event.Event() self._zlib = zlib.decompressobj() - self._buffer = array.array('c') + self._buffer = None # State self.seq = 0 @@ -128,7 +128,7 @@ class GatewayClient(LoggingClass): gateway_url += '?v={}&encoding={}'.format(self.GATEWAY_VERSION, self.encoder.TYPE) - if self.zlib_enabled: + if self.zlib_stream_enabled: gateway_url += '&compress=zlib-stream' self.log.info('Opening websocket connection to URL `%s`', gateway_url) @@ -141,16 +141,21 @@ class GatewayClient(LoggingClass): self.ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE}) def on_message(self, msg): - if self.zlib_enabled: + if self.zlib_stream_enabled: + if not self._buffer: + self._buffer = array.array('c' if six.PY2 else 'b') + self._buffer.extend(msg) - if len(msg) >= 4: - suffix = struct.unpack('>I', msg[-4:])[0] - if suffix == ZLIB_SUFFIX: - msg = self._zlib.decompress(self._buffer.tostring()).decode('utf-8') - self._buffer = array.array('c') - else: + if len(msg) < 4: + return + + suffix = struct.unpack('>I', msg[-4:])[0] + if suffix != ZLIB_SUFFIX: return + + msg = self._zlib.decompress(self._buffer.tostring()).decode('utf-8') + self._buffer = None else: # Detect zlib and decompress is_erlpack = ((six.PY2 and ord(msg[0]) == 131) or (six.PY3 and msg[0] == 131)) From 8bcbdcf39af2388c629a7ae11085bffc1936e2b9 Mon Sep 17 00:00:00 2001 From: Andrei Date: Thu, 12 Oct 2017 20:24:26 -0700 Subject: [PATCH 3/4] Create a new zlib compressobj every connection, cleanup buffers --- disco/gateway/client.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/disco/gateway/client.py b/disco/gateway/client.py index 1d996c0..ace992f 100644 --- a/disco/gateway/client.py +++ b/disco/gateway/client.py @@ -51,7 +51,7 @@ class GatewayClient(LoggingClass): # Websocket connection self.ws = None self.ws_event = gevent.event.Event() - self._zlib = zlib.decompressobj() + self._zlib = None self._buffer = None # State @@ -182,6 +182,9 @@ class GatewayClient(LoggingClass): raise Exception('WS recieved error: %s', error) def on_open(self): + if self.zlib_stream_enabled: + self._zlib = zlib.decompressobj() + if self.seq and self.session_id: self.log.info('WS Opened: attempting resume w/ SID: %s SEQ: %s', self.session_id, self.seq) self.replaying = True @@ -209,6 +212,9 @@ class GatewayClient(LoggingClass): }) def on_close(self, code, reason): + # Make sure we cleanup any old data + self._buffer = None + # Kill heartbeater, a reconnect/resume will trigger a HELLO which will # respawn it if self._heartbeat_task: From c836b6e8656a12fd0768254095a66883e560cc58 Mon Sep 17 00:00:00 2001 From: Andrei Date: Fri, 13 Oct 2017 11:57:09 -0700 Subject: [PATCH 4/4] Cleanup / performance stuff --- disco/gateway/client.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/disco/gateway/client.py b/disco/gateway/client.py index ace992f..212195f 100644 --- a/disco/gateway/client.py +++ b/disco/gateway/client.py @@ -1,6 +1,4 @@ -import struct import gevent -import array import zlib import six import ssl @@ -13,7 +11,7 @@ from disco.util.logging import LoggingClass from disco.util.limiter import SimpleLimiter TEN_MEGABYTES = 10490000 -ZLIB_SUFFIX = 0xffff +ZLIB_SUFFIX = b'\x00\x00\xff\xff' class GatewayClient(LoggingClass): @@ -143,18 +141,17 @@ class GatewayClient(LoggingClass): def on_message(self, msg): if self.zlib_stream_enabled: if not self._buffer: - self._buffer = array.array('c' if six.PY2 else 'b') + self._buffer = bytearray() self._buffer.extend(msg) if len(msg) < 4: return - suffix = struct.unpack('>I', msg[-4:])[0] - if suffix != ZLIB_SUFFIX: + if msg[-4:] != ZLIB_SUFFIX: return - msg = self._zlib.decompress(self._buffer.tostring()).decode('utf-8') + msg = self._zlib.decompress(self._buffer if six.PY3 else str(self._buffer)).decode('utf-8') self._buffer = None else: # Detect zlib and decompress