|
@ -19,12 +19,12 @@ ZLIB_SUFFIX = 0xffff |
|
|
class GatewayClient(LoggingClass): |
|
|
class GatewayClient(LoggingClass): |
|
|
GATEWAY_VERSION = 6 |
|
|
GATEWAY_VERSION = 6 |
|
|
|
|
|
|
|
|
def __init__(self, client, max_reconnects=5, encoder='json', zlib_enabled=True, ipc=None): |
|
|
def __init__(self, client, max_reconnects=5, encoder='json', zlib_stream_enabled=True, ipc=None): |
|
|
super(GatewayClient, self).__init__() |
|
|
super(GatewayClient, self).__init__() |
|
|
self.client = client |
|
|
self.client = client |
|
|
self.max_reconnects = max_reconnects |
|
|
self.max_reconnects = max_reconnects |
|
|
self.encoder = ENCODERS[encoder] |
|
|
self.encoder = ENCODERS[encoder] |
|
|
self.zlib_enabled = zlib_enabled |
|
|
self.zlib_stream_enabled = zlib_stream_enabled |
|
|
|
|
|
|
|
|
self.events = client.events |
|
|
self.events = client.events |
|
|
self.packets = client.packets |
|
|
self.packets = client.packets |
|
@ -52,7 +52,7 @@ class GatewayClient(LoggingClass): |
|
|
self.ws = None |
|
|
self.ws = None |
|
|
self.ws_event = gevent.event.Event() |
|
|
self.ws_event = gevent.event.Event() |
|
|
self._zlib = zlib.decompressobj() |
|
|
self._zlib = zlib.decompressobj() |
|
|
self._buffer = array.array('c') |
|
|
self._buffer = None |
|
|
|
|
|
|
|
|
# State |
|
|
# State |
|
|
self.seq = 0 |
|
|
self.seq = 0 |
|
@ -128,7 +128,7 @@ class GatewayClient(LoggingClass): |
|
|
|
|
|
|
|
|
gateway_url += '?v={}&encoding={}'.format(self.GATEWAY_VERSION, self.encoder.TYPE) |
|
|
gateway_url += '?v={}&encoding={}'.format(self.GATEWAY_VERSION, self.encoder.TYPE) |
|
|
|
|
|
|
|
|
if self.zlib_enabled: |
|
|
if self.zlib_stream_enabled: |
|
|
gateway_url += '&compress=zlib-stream' |
|
|
gateway_url += '&compress=zlib-stream' |
|
|
|
|
|
|
|
|
self.log.info('Opening websocket connection to URL `%s`', gateway_url) |
|
|
self.log.info('Opening websocket connection to URL `%s`', gateway_url) |
|
@ -141,16 +141,21 @@ class GatewayClient(LoggingClass): |
|
|
self.ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE}) |
|
|
self.ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE}) |
|
|
|
|
|
|
|
|
def on_message(self, msg): |
|
|
def on_message(self, msg): |
|
|
if self.zlib_enabled: |
|
|
if self.zlib_stream_enabled: |
|
|
|
|
|
if not self._buffer: |
|
|
|
|
|
self._buffer = array.array('c' if six.PY2 else 'b') |
|
|
|
|
|
|
|
|
self._buffer.extend(msg) |
|
|
self._buffer.extend(msg) |
|
|
|
|
|
|
|
|
if len(msg) >= 4: |
|
|
if len(msg) < 4: |
|
|
suffix = struct.unpack('>I', msg[-4:])[0] |
|
|
return |
|
|
if suffix == ZLIB_SUFFIX: |
|
|
|
|
|
msg = self._zlib.decompress(self._buffer.tostring()).decode('utf-8') |
|
|
suffix = struct.unpack('>I', msg[-4:])[0] |
|
|
self._buffer = array.array('c') |
|
|
if suffix != ZLIB_SUFFIX: |
|
|
else: |
|
|
|
|
|
return |
|
|
return |
|
|
|
|
|
|
|
|
|
|
|
msg = self._zlib.decompress(self._buffer.tostring()).decode('utf-8') |
|
|
|
|
|
self._buffer = None |
|
|
else: |
|
|
else: |
|
|
# Detect zlib and decompress |
|
|
# Detect zlib and decompress |
|
|
is_erlpack = ((six.PY2 and ord(msg[0]) == 131) or (six.PY3 and msg[0] == 131)) |
|
|
is_erlpack = ((six.PY2 and ord(msg[0]) == 131) or (six.PY3 and msg[0] == 131)) |
|
|