From 88504f7aa6a02cf86ce3a54f255b315b36c2881e Mon Sep 17 00:00:00 2001 From: Andrei Zbikowski Date: Mon, 22 Jan 2018 08:08:35 -0800 Subject: [PATCH] Voice Gateway v3 (#80) * [reqs] add wsaccel to performance reqs * Support voice gateway v3 --- disco/voice/client.py | 223 +++++++++++++++++------------------------ disco/voice/packets.py | 5 + disco/voice/player.py | 13 ++- disco/voice/udp.py | 115 +++++++++++++++++++++ examples/music.py | 4 + setup.py | 2 +- 6 files changed, 224 insertions(+), 138 deletions(-) create mode 100644 disco/voice/udp.py diff --git a/disco/voice/client.py b/disco/voice/client.py index 993555b..4f24ad2 100644 --- a/disco/voice/client.py +++ b/disco/voice/client.py @@ -1,32 +1,28 @@ from __future__ import print_function import gevent -import socket -import struct import time -try: - import nacl.secret -except ImportError: - print('WARNING: nacl is not installed, voice support is disabled') - from holster.enum import Enum from holster.emitter import Emitter from disco.gateway.encoding.json import JSONEncoder from disco.util.websocket import Websocket from disco.util.logging import LoggingClass -from disco.voice.packets import VoiceOPCode from disco.gateway.packets import OPCode +from disco.voice.packets import VoiceOPCode +from disco.voice.udp import UDPVoiceClient VoiceState = Enum( DISCONNECTED=0, - AWAITING_ENDPOINT=1, - AUTHENTICATING=2, - CONNECTING=3, - CONNECTED=4, - VOICE_CONNECTING=5, - VOICE_CONNECTED=6, + RECONNECTING=1, + AWAITING_ENDPOINT=2, + AUTHENTICATING=3, + AUTHENTICATED=4, + CONNECTING=5, + CONNECTED=6, + VOICE_CONNECTING=7, + VOICE_CONNECTED=8, ) @@ -36,98 +32,15 @@ class VoiceException(Exception): super(VoiceException, self).__init__(msg) -class UDPVoiceClient(LoggingClass): - def __init__(self, vc): - super(UDPVoiceClient, self).__init__() - self.vc = vc - - # The underlying UDP socket - self.conn = None - - # Connection information - self.ip = None - self.port = None - - self.run_task = None - self.connected = False - - # Buffer used for encoding/sending frames - self._buffer = bytearray(24) - self._buffer[0] = 0x80 - self._buffer[1] = 0x78 - - def send_frame(self, frame, sequence=None, timestamp=None): - # Convert the frame to a bytearray - frame = bytearray(frame) - - # Pack the rtc header into our buffer - struct.pack_into('>H', self._buffer, 2, sequence or self.vc.sequence) - struct.pack_into('>I', self._buffer, 4, timestamp or self.vc.timestamp) - struct.pack_into('>i', self._buffer, 8, self.vc.ssrc) - - if self.vc.mode == 'xsalsa20_poly1305_suffix': - nonce = nacl.utils.random(nacl.secret.SecretBox.NONCE_SIZE) - raw = self.vc.secret_box.encrypt(bytes(frame), nonce).ciphertext + nonce - else: - # Now encrypt the payload with the nonce as a header - raw = self.vc.secret_box.encrypt(bytes(frame), bytes(self._buffer)).ciphertext - - # Send the header (sans nonce padding) plus the payload - self.send(self._buffer[:12] + raw) - - # Increment our sequence counter - self.vc.sequence += 1 - if self.vc.sequence >= 65535: - self.vc.sequence = 0 - - def run(self): - while True: - self.conn.recvfrom(4096) - - def send(self, data): - self.conn.sendto(data, (self.ip, self.port)) - - def disconnect(self): - self.run_task.kill() - - def connect(self, host, port, timeout=10, addrinfo=None): - self.ip = socket.gethostbyname(host) - self.port = port - - self.conn = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - - if addrinfo: - ip, port = addrinfo - else: - # Send discovery packet - packet = bytearray(70) - struct.pack_into('>I', packet, 0, self.vc.ssrc) - self.send(packet) - - # Wait for a response - try: - data, addr = gevent.spawn(lambda: self.conn.recvfrom(70)).get(timeout=timeout) - except gevent.Timeout: - return (None, None) - - # Read IP and port - ip = str(data[4:]).split('\x00', 1)[0] - port = struct.unpack(''.format(self.channel) @@ -174,9 +87,17 @@ class VoiceClient(LoggingClass): self.state = state self.state_emitter.emit(state, prev_state) - def heartbeat(self, interval): + def _connect_and_run(self): + self.ws = Websocket('wss://' + self.endpoint + '/v={}'.format(self.VOICE_GATEWAY_VERSION)) + self.ws.emitter.on('on_open', self.on_open) + self.ws.emitter.on('on_error', self.on_error) + self.ws.emitter.on('on_close', self.on_close) + self.ws.emitter.on('on_message', self.on_message) + self.ws.run_forever() + + def _heartbeat(self, interval): while True: - self.send(VoiceOPCode.HEARTBEAT, time.time() * 1000) + self.send(VoiceOPCode.HEARTBEAT, time.time()) gevent.sleep(interval / 1000) def set_speaking(self, value): @@ -192,6 +113,11 @@ class VoiceClient(LoggingClass): 'd': data, }), self.encoder.OPCODE) + def on_voice_hello(self, data): + self.log.info('[%s] Recieved Voice HELLO payload, starting heartbeater', self) + self._heartbeat_task = gevent.spawn(self._heartbeat, data['heartbeat_interval'] * 0.75) + self.set_state(VoiceState.AUTHENTICATED) + def on_voice_ready(self, data): self.log.info('[%s] Recived Voice READY payload, attempting to negotiate voice connection w/ remote', self) self.set_state(VoiceState.CONNECTING) @@ -206,8 +132,6 @@ class VoiceClient(LoggingClass): else: raise Exception('Failed to find a supported voice mode') - self.heartbeat_task = gevent.spawn(self.heartbeat, data['heartbeat_interval']) - self.log.debug('[%s] Attempting IP discovery over UDP to %s:%s', self, self.endpoint, self.port) self.udp = UDPVoiceClient(self) ip, port = self.udp.connect(self.endpoint, self.port) @@ -227,11 +151,15 @@ class VoiceClient(LoggingClass): }, }) + def on_voice_resumed(self, data): + self.log.info('[%s] Recieved resumed', self) + self.set_state(VoiceState.CONNECTED) + def on_voice_sdp(self, sdp): self.log.info('[%s] Recieved session description, connection completed', self) # Create a secret box for encryption/decryption - self.secret_box = nacl.secret.SecretBox(bytes(bytearray(sdp['secret_key']))) + self.udp.setup_encryption(bytes(bytearray(sdp['secret_key']))) # Toggle speaking state so clients learn of our SSRC self.set_speaking(True) @@ -253,12 +181,8 @@ class VoiceClient(LoggingClass): self.set_state(VoiceState.AUTHENTICATING) self.endpoint = data.endpoint.split(':', 1)[0] - self.ws = Websocket('wss://' + self.endpoint) - self.ws.emitter.on('on_open', self.on_open) - self.ws.emitter.on('on_error', self.on_error) - self.ws.emitter.on('on_close', self.on_close) - self.ws.emitter.on('on_message', self.on_message) - self.ws.run_forever() + + self._connect_and_run() def on_message(self, msg): try: @@ -271,25 +195,60 @@ class VoiceClient(LoggingClass): self.log.error('[%s] Voice websocket error: %s', self, err) def on_open(self): + if self._session_id: + return self.send(VoiceOPCode.RESUME, { + 'server_id': self.channel.guild_id, + 'user_id': self.client.state.me.id, + 'session_id': self._session_id, + 'token': self.token, + }) + + self._session_id = self.client.gw.session_id + self.send(VoiceOPCode.IDENTIFY, { 'server_id': self.channel.guild_id, 'user_id': self.client.state.me.id, - 'session_id': self.client.gw.session_id, + 'session_id': self._session_id, 'token': self.token, }) - def on_close(self, code, error): - self.log.warning('[%s] Voice websocket disconnected (%s, %s)', self, code, error) + def on_close(self, code, reason): + self.log.warning('[%s] Voice websocket closed: [%s] %s (%s)', self, code, reason, self._reconnects) + + if self._heartbeat_task: + self._heartbeat_task.kill() + + # If we're not in a connected state, don't try to resume/reconnect + if self.state != VoiceState.CONNECTED: + return + + self.log.info('[%s] Attempting Websocket Resumption', self) + self._reconnects += 1 - if self.state == VoiceState.CONNECTED: - self.log.info('Attempting voice reconnection') - self.connect() + if self.max_reconnects and self._reconnects > self.max_reconnects: + raise VoiceException('Failed to reconnect after {} attempts, giving up'.format(self.max_reconnects)) + + self.set_state(VoiceState.RECONNECTING) + + # Don't resume for these error codes: + if code and 4000 <= code <= 4016: + self._session_id = None + + if self.udp and self.udp.connected: + self.udp.disconnect() + + wait_time = (self._reconnects - 1) * 5 + self.log.info( + '[%s] Will attempt to %s after %s seconds', self, 'resume' if self._session_id else 'reconnect', wait_time) + gevent.sleep(wait_time) + + self._connect_and_run() def connect(self, timeout=5, mute=False, deaf=False): self.log.debug('[%s] Attempting connection', self) self.set_state(VoiceState.AWAITING_ENDPOINT) - self.update_listener = self.client.events.on('VoiceServerUpdate', self.on_voice_server_update) + self._update_listener = self.client.events.on('VoiceServerUpdate', self.on_voice_server_update) self.client.gw.send(OPCode.VOICE_STATE_UPDATE, { 'self_mute': mute, @@ -299,17 +258,18 @@ class VoiceClient(LoggingClass): }) if not self.state_emitter.once(VoiceState.CONNECTED, timeout=timeout): + self.disconnect() raise VoiceException('Failed to connect to voice', self) def disconnect(self): self.log.debug('[%s] disconnect called', self) self.set_state(VoiceState.DISCONNECTED) - if self.heartbeat_task: - self.heartbeat_task.kill() - self.heartbeat_task = None + if self._heartbeat_task: + self._heartbeat_task.kill() + self._heartbeat_task = None - if self.ws and self.ws.sock.connected: + if self.ws and self.ws.sock and self.ws.sock.connected: self.ws.close() if self.udp and self.udp.connected: @@ -324,3 +284,6 @@ class VoiceClient(LoggingClass): def send_frame(self, *args, **kwargs): self.udp.send_frame(*args, **kwargs) + + def increment_timestamp(self, *args, **kwargs): + self.udp.increment_timestamp(*args, **kwargs) diff --git a/disco/voice/packets.py b/disco/voice/packets.py index 95d6118..0ffb992 100644 --- a/disco/voice/packets.py +++ b/disco/voice/packets.py @@ -7,4 +7,9 @@ VoiceOPCode = Enum( HEARTBEAT=3, SESSION_DESCRIPTION=4, SPEAKING=5, + HEARTBEAT_ACK=6, + RESUME=7, + HELLO=8, + RESUMED=9, + CLIENT_DISCONNECT=13, ) diff --git a/disco/voice/player.py b/disco/voice/player.py index 12b4a72..489793d 100644 --- a/disco/voice/player.py +++ b/disco/voice/player.py @@ -6,11 +6,10 @@ from holster.emitter import Emitter from disco.voice.client import VoiceState from disco.voice.queue import PlayableQueue +from disco.util.logging import LoggingClass -MAX_TIMESTAMP = 4294967295 - -class Player(object): +class Player(LoggingClass): Events = Enum( 'START_PLAY', 'STOP_PLAY', @@ -20,6 +19,7 @@ class Player(object): ) def __init__(self, client, queue=None): + super(Player, self).__init__() self.client = client # Queue contains playable items @@ -92,12 +92,11 @@ class Player(object): return if self.client.state != VoiceState.CONNECTED: - self.client.state_emitter.wait(VoiceState.CONNECTED) + self.client.state_emitter.once(VoiceState.CONNECTED, timeout=30) + # Send the voice frame and increment our timestamp self.client.send_frame(frame) - self.client.timestamp += item.samples_per_frame - if self.client.timestamp > MAX_TIMESTAMP: - self.client.timestamp = 0 + self.client.increment_timestamp(item.samples_per_frame) frame = item.next_frame() if frame is None: diff --git a/disco/voice/udp.py b/disco/voice/udp.py new file mode 100644 index 0000000..b605d17 --- /dev/null +++ b/disco/voice/udp.py @@ -0,0 +1,115 @@ +import struct +import socket +import gevent + +try: + import nacl.secret +except ImportError: + print('WARNING: nacl is not installed, voice support is disabled') + +from disco.util.logging import LoggingClass + +MAX_TIMESTAMP = 4294967295 +MAX_SEQUENCE = 65535 + + +class UDPVoiceClient(LoggingClass): + def __init__(self, vc): + super(UDPVoiceClient, self).__init__() + self.vc = vc + + # The underlying UDP socket + self.conn = None + + # Connection information + self.ip = None + self.port = None + self.connected = False + + # Voice information + self.sequence = 0 + self.timestamp = 0 + + self._run_task = None + self._secret_box = None + + # Buffer used for encoding/sending frames + self._buffer = bytearray(24) + self._buffer[0] = 0x80 + self._buffer[1] = 0x78 + + def increment_timestamp(self, by): + self.timestamp += by + if self.timestamp > MAX_TIMESTAMP: + self.timestamp = 0 + + def setup_encryption(self, encryption_key): + self._secret_box = nacl.secret.SecretBox(encryption_key) + + def send_frame(self, frame, sequence=None, timestamp=None, incr_timestamp=None): + # Convert the frame to a bytearray + frame = bytearray(frame) + + # Pack the rtc header into our buffer + struct.pack_into('>H', self._buffer, 2, sequence or self.sequence) + struct.pack_into('>I', self._buffer, 4, timestamp or self.timestamp) + struct.pack_into('>i', self._buffer, 8, self.vc.ssrc) + + if self.vc.mode == 'xsalsa20_poly1305_suffix': + nonce = nacl.utils.random(nacl.secret.SecretBox.NONCE_SIZE) + raw = self._secret_box.encrypt(bytes(frame), nonce).ciphertext + nonce + else: + # Now encrypt the payload with the nonce as a header + raw = self._secret_box.encrypt(bytes(frame), bytes(self._buffer)).ciphertext + + # Send the header (sans nonce padding) plus the payload + self.send(self._buffer[:12] + raw) + + # Increment our sequence counter + self.sequence += 1 + if self.sequence >= MAX_SEQUENCE: + self.sequence = 0 + + # Increment our timestamp (if applicable) + if incr_timestamp: + self.timestamp += incr_timestamp + + def run(self): + while True: + self.conn.recvfrom(4096) + + def send(self, data): + self.conn.sendto(data, (self.ip, self.port)) + + def disconnect(self): + self._run_task.kill() + + def connect(self, host, port, timeout=10, addrinfo=None): + self.ip = socket.gethostbyname(host) + self.port = port + + self.conn = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + + if addrinfo: + ip, port = addrinfo + else: + # Send discovery packet + packet = bytearray(70) + struct.pack_into('>I', packet, 0, self.vc.ssrc) + self.send(packet) + + # Wait for a response + try: + data, addr = gevent.spawn(lambda: self.conn.recvfrom(70)).get(timeout=timeout) + except gevent.Timeout: + return (None, None) + + # Read IP and port + ip = str(data[4:]).split('\x00', 1)[0] + port = struct.unpack('=2018.1.14'], + 'music': ['youtube_dl>=2018.1.21'], 'performance': [ 'erlpack==0.3.2' if sys.version_info.major == 2 else 'earl-etf==2.1.2', 'ujson==1.35',