Browse Source

Zlib streaming compression pt1

feature/zlib-stream-compression
Andrei 8 years ago
parent
commit
460b0eadb8
  1. 30
      disco/gateway/client.py

30
disco/gateway/client.py

@ -1,4 +1,6 @@
import struct
import gevent import gevent
import array
import zlib import zlib
import six import six
import ssl import ssl
@ -11,16 +13,18 @@ from disco.util.logging import LoggingClass
from disco.util.limiter import SimpleLimiter from disco.util.limiter import SimpleLimiter
TEN_MEGABYTES = 10490000 TEN_MEGABYTES = 10490000
ZLIB_SUFFIX = 0xffff
class GatewayClient(LoggingClass): class GatewayClient(LoggingClass):
GATEWAY_VERSION = 6 GATEWAY_VERSION = 6
def __init__(self, client, max_reconnects=5, encoder='json', ipc=None): def __init__(self, client, max_reconnects=5, encoder='json', zlib_enabled=True, ipc=None):
super(GatewayClient, self).__init__() super(GatewayClient, self).__init__()
self.client = client self.client = client
self.max_reconnects = max_reconnects self.max_reconnects = max_reconnects
self.encoder = ENCODERS[encoder] self.encoder = ENCODERS[encoder]
self.zlib_enabled = zlib_enabled
self.events = client.events self.events = client.events
self.packets = client.packets self.packets = client.packets
@ -47,6 +51,8 @@ class GatewayClient(LoggingClass):
# Websocket connection # Websocket connection
self.ws = None self.ws = None
self.ws_event = gevent.event.Event() self.ws_event = gevent.event.Event()
self._zlib = zlib.decompressobj()
self._buffer = array.array('c')
# State # State
self.seq = 0 self.seq = 0
@ -122,6 +128,9 @@ class GatewayClient(LoggingClass):
gateway_url += '?v={}&encoding={}'.format(self.GATEWAY_VERSION, self.encoder.TYPE) gateway_url += '?v={}&encoding={}'.format(self.GATEWAY_VERSION, self.encoder.TYPE)
if self.zlib_enabled:
gateway_url += '&compress=zlib-stream'
self.log.info('Opening websocket connection to URL `%s`', gateway_url) self.log.info('Opening websocket connection to URL `%s`', gateway_url)
self.ws = Websocket(gateway_url) self.ws = Websocket(gateway_url)
self.ws.emitter.on('on_open', self.on_open) self.ws.emitter.on('on_open', self.on_open)
@ -132,10 +141,21 @@ class GatewayClient(LoggingClass):
self.ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE}) self.ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
def on_message(self, msg): def on_message(self, msg):
# Detect zlib and decompress if self.zlib_enabled:
is_erlpack = ((six.PY2 and ord(msg[0]) == 131) or (six.PY3 and msg[0] == 131)) self._buffer.extend(msg)
if msg[0] != '{' and not is_erlpack:
msg = zlib.decompress(msg, 15, TEN_MEGABYTES).decode("utf-8") if len(msg) >= 4:
suffix = struct.unpack('>I', msg[-4:])[0]
if suffix == ZLIB_SUFFIX:
msg = self._zlib.decompress(self._buffer.tostring()).decode('utf-8')
self._buffer = array.array('c')
else:
return
else:
# Detect zlib and decompress
is_erlpack = ((six.PY2 and ord(msg[0]) == 131) or (six.PY3 and msg[0] == 131))
if msg[0] != '{' and not is_erlpack:
msg = zlib.decompress(msg, 15, TEN_MEGABYTES).decode("utf-8")
try: try:
data = self.encoder.decode(msg) data = self.encoder.decode(msg)

Loading…
Cancel
Save