From 9cc7ca7000c7935026848c4251c899f8abd4ea81 Mon Sep 17 00:00:00 2001 From: Dan <31395415+cakedan@users.noreply.github.com> Date: Wed, 1 Aug 2018 14:46:57 -0400 Subject: [PATCH] voice receive support (#101) * initial voice * some updates fixed it * some style updates replaced the speaking when joining with an actual client_connect event * please pass * replaced channel with client in VoiceData and VoiceSpeaking events Also changed some voice sending stuff * added more debug logs * fixed version not being set on the voice gateway --- disco/voice/client.py | 104 ++++++++++++++++++++--- disco/voice/packets.py | 2 + disco/voice/udp.py | 182 ++++++++++++++++++++++++++++++++++++++--- 3 files changed, 264 insertions(+), 24 deletions(-) diff --git a/disco/voice/client.py b/disco/voice/client.py index cfa999a..cb78785 100644 --- a/disco/voice/client.py +++ b/disco/voice/client.py @@ -3,6 +3,8 @@ from __future__ import print_function import gevent import time +from collections import namedtuple + from holster.enum import Enum from holster.emitter import Emitter @@ -11,7 +13,13 @@ from disco.util.websocket import Websocket from disco.util.logging import LoggingClass from disco.gateway.packets import OPCode from disco.voice.packets import VoiceOPCode -from disco.voice.udp import UDPVoiceClient +from disco.voice.udp import AudioCodecs, PayloadTypes, UDPVoiceClient + +SpeakingCodes = Enum( + NONE=0, + VOICE=1 << 0, + SOUNDSHARE=1 << 1, +) VoiceState = Enum( DISCONNECTED=0, @@ -25,6 +33,13 @@ VoiceState = Enum( VOICE_CONNECTED=8, ) +VoiceSpeaking = namedtuple('VoiceSpeaking', [ + 'client', + 'user_id', + 'speaking', + 'soundshare', +]) + class VoiceException(Exception): def __init__(self, msg, client): @@ -33,7 +48,7 @@ class VoiceException(Exception): class VoiceClient(LoggingClass): - VOICE_GATEWAY_VERSION = 3 + VOICE_GATEWAY_VERSION = 4 SUPPORTED_MODES = { 'xsalsa20_poly1305_lite', @@ -58,6 +73,10 @@ class VoiceClient(LoggingClass): self.packets.on(VoiceOPCode.READY, self.on_voice_ready) self.packets.on(VoiceOPCode.RESUMED, self.on_voice_resumed) self.packets.on(VoiceOPCode.SESSION_DESCRIPTION, self.on_voice_sdp) + self.packets.on(VoiceOPCode.SPEAKING, self.on_voice_speaking) + self.packets.on(VoiceOPCode.CLIENT_CONNECT, self.on_voice_client_connect) + self.packets.on(VoiceOPCode.CLIENT_DISCONNECT, self.on_voice_client_disconnect) + self.packets.on(VoiceOPCode.CODECS, self.on_voice_codecs) # State + state change emitter self.state = VoiceState.DISCONNECTED @@ -71,6 +90,9 @@ class VoiceClient(LoggingClass): self.port = None self.mode = None self.udp = None + self.audio_codec = None + self.video_codec = None + self.transport_id = None # Websocket connection self.ws = None @@ -80,6 +102,9 @@ class VoiceClient(LoggingClass): self._update_listener = None self._heartbeat_task = None + # SSRCs + self.audio_ssrcs = {} + def __repr__(self): return u''.format(self.channel) @@ -90,7 +115,7 @@ class VoiceClient(LoggingClass): self.state_emitter.emit(state, prev_state) def _connect_and_run(self): - self.ws = Websocket('wss://' + self.endpoint + '/v={}'.format(self.VOICE_GATEWAY_VERSION)) + self.ws = Websocket('wss://' + self.endpoint + '/?v={}'.format(self.VOICE_GATEWAY_VERSION)) self.ws.emitter.on('on_open', self.on_open) self.ws.emitter.on('on_error', self.on_error) self.ws.emitter.on('on_close', self.on_close) @@ -102,10 +127,17 @@ class VoiceClient(LoggingClass): self.send(VoiceOPCode.HEARTBEAT, time.time()) gevent.sleep(interval / 1000) - def set_speaking(self, value): + def set_speaking(self, voice=False, soundshare=False, delay=0): + value = SpeakingCodes.NONE.value + if voice: + value |= SpeakingCodes.VOICE.value + if soundshare: + value |= SpeakingCodes.SOUNDSHARE.value + self.send(VoiceOPCode.SPEAKING, { 'speaking': value, - 'delay': 0, + 'delay': delay, + 'ssrc': self.ssrc, }) def send(self, op, data): @@ -115,9 +147,27 @@ class VoiceClient(LoggingClass): 'd': data, }), self.encoder.OPCODE) + def on_voice_client_connect(self, data): + self.audio_ssrcs[data['audio_ssrc']] = data['user_id'] + # ignore data['voice_ssrc'] for now + + def on_voice_client_disconnect(self, data): + for ssrc in self.audio_ssrcs.keys(): + if self.audio_ssrcs[ssrc] == data['user_id']: + del self.audio_ssrcs[ssrc] + break + + def on_voice_codecs(self, data): + self.audio_codec = data['audio_codec'] + self.video_codec = data['video_codec'] + self.transport_id = data['media_session_id'] + + # Set the UDP's RTP Audio Header's Payload Type + self.udp.set_audio_codec(data['audio_codec']) + def on_voice_hello(self, data): self.log.info('[%s] Recieved Voice HELLO payload, starting heartbeater', self) - self._heartbeat_task = gevent.spawn(self._heartbeat, data['heartbeat_interval'] * 0.75) + self._heartbeat_task = gevent.spawn(self._heartbeat, data['heartbeat_interval']) self.set_state(VoiceState.AUTHENTICATED) def on_voice_ready(self, data): @@ -144,6 +194,17 @@ class VoiceClient(LoggingClass): self.disconnect() return + codecs = [] + + # Sending discord our available codecs and rtp payload type for it + for idx, codec in enumerate(AudioCodecs): + codecs.append({ + 'name': codec, + 'type': 'audio', + 'priority': (idx + 1) * 1000, + 'payload_type': PayloadTypes.get(codec).value, + }) + self.log.debug('[%s] IP discovery completed (ip = %s, port = %s), sending SELECT_PROTOCOL', self, ip, port) self.send(VoiceOPCode.SELECT_PROTOCOL, { 'protocol': 'udp', @@ -152,6 +213,12 @@ class VoiceClient(LoggingClass): 'address': ip, 'mode': self.mode, }, + 'codecs': codecs, + }) + self.send(VoiceOPCode.CLIENT_CONNECT, { + 'audio_ssrc': self.ssrc, + 'video_ssrc': 0, + 'rtx_ssrc': 0, }) def on_voice_resumed(self, data): @@ -161,14 +228,17 @@ class VoiceClient(LoggingClass): def on_voice_sdp(self, sdp): self.log.info('[%s] Recieved session description, connection completed', self) + self.mode = sdp['mode'] + self.audio_codec = sdp['audio_codec'] + self.video_codec = sdp['video_codec'] + self.transport_id = sdp['media_session_id'] + + # Set the UDP's RTP Audio Header's Payload Type + self.udp.set_audio_codec(sdp['audio_codec']) + # Create a secret box for encryption/decryption self.udp.setup_encryption(bytes(bytearray(sdp['secret_key']))) - # Toggle speaking state so clients learn of our SSRC - self.set_speaking(True) - self.set_speaking(False) - gevent.sleep(0.25) - self.set_state(VoiceState.CONNECTED) def on_voice_server_update(self, data): @@ -187,6 +257,18 @@ class VoiceClient(LoggingClass): self._connect_and_run() + def on_voice_speaking(self, data): + self.audio_ssrcs[data['ssrc']] = data['user_id'] + + payload = VoiceSpeaking( + client=self, + user_id=data['user_id'], + speaking=bool(data['speaking'] & SpeakingCodes.VOICE.value), + soundshare=bool(data['speaking'] & SpeakingCodes.SOUNDSHARE.value), + ) + + self.client.gw.events.emit('VoiceSpeaking', payload) + def on_message(self, msg): try: data = self.encoder.decode(msg) diff --git a/disco/voice/packets.py b/disco/voice/packets.py index 0ffb992..fec6391 100644 --- a/disco/voice/packets.py +++ b/disco/voice/packets.py @@ -11,5 +11,7 @@ VoiceOPCode = Enum( RESUME=7, HELLO=8, RESUMED=9, + CLIENT_CONNECT=12, CLIENT_DISCONNECT=13, + CODECS=14, ) diff --git a/disco/voice/udp.py b/disco/voice/udp.py index de13376..840f4c1 100644 --- a/disco/voice/udp.py +++ b/disco/voice/udp.py @@ -2,16 +2,48 @@ import struct import socket import gevent +from collections import namedtuple + try: import nacl.secret except ImportError: print('WARNING: nacl is not installed, voice support is disabled') +from holster.enum import Enum + from disco.util.logging import LoggingClass +AudioCodecs = ('opus',) + +PayloadTypes = Enum(OPUS=0x78) + MAX_UINT32 = 4294967295 MAX_SEQUENCE = 65535 +RTP_HEADER_VERSION = 0x80 # Only RTP Version is set here (value of 2 << 6) +RTP_EXTENSION_ONE_BYTE = (0xBE, 0xDE) + + +RTPHeader = namedtuple('RTPHeader', [ + 'version', + 'padding', + 'extension', + 'csrc_count', + 'marker', + 'payload_type', + 'sequence', + 'timestamp', + 'ssrc', +]) + +VoiceData = namedtuple('VoiceData', [ + 'client', + 'user_id', + 'payload_type', + 'rtp', + 'data', +]) + class UDPVoiceClient(LoggingClass): def __init__(self, vc): @@ -34,10 +66,17 @@ class UDPVoiceClient(LoggingClass): self._run_task = None self._secret_box = None - # Buffer used for encoding/sending frames - self._buffer = bytearray(24) - self._buffer[0] = 0x80 - self._buffer[1] = 0x78 + # RTP Header + self._rtp_audio_header = bytearray(12) + self._rtp_audio_header[0] = RTP_HEADER_VERSION + + def set_audio_codec(self, codec): + ptype = PayloadTypes.get(codec) + if ptype: + self._rtp_audio_header[1] = ptype.value + self.log.debug('[%s] Set UDP\'s Audio Codec to %s, RTP payload type %s', self.vc, ptype.name, ptype.value) + else: + raise Exception('The voice codec, {}, isn\'t supported.'.format(codec)) def increment_timestamp(self, by): self.timestamp += by @@ -52,27 +91,40 @@ class UDPVoiceClient(LoggingClass): frame = bytearray(frame) # Pack the rtc header into our buffer - struct.pack_into('>H', self._buffer, 2, sequence or self.sequence) - struct.pack_into('>I', self._buffer, 4, timestamp or self.timestamp) - struct.pack_into('>i', self._buffer, 8, self.vc.ssrc) + struct.pack_into('>H', self._rtp_audio_header, 2, sequence or self.sequence) + struct.pack_into('>I', self._rtp_audio_header, 4, timestamp or self.timestamp) + struct.pack_into('>i', self._rtp_audio_header, 8, self.vc.ssrc) if self.vc.mode == 'xsalsa20_poly1305_lite': + # Use an incrementing number as a nonce, only first 4 bytes of the nonce is padded on self._nonce += 1 if self._nonce > MAX_UINT32: self._nonce = 0 nonce = bytearray(24) struct.pack_into('>I', nonce, 0, self._nonce) - raw = self._secret_box.encrypt(bytes(frame), nonce).ciphertext + nonce[:4] + nonce_padding = nonce[:4] elif self.vc.mode == 'xsalsa20_poly1305_suffix': + # Generate a nonce nonce = nacl.utils.random(nacl.secret.SecretBox.NONCE_SIZE) - raw = self._secret_box.encrypt(bytes(frame), nonce).ciphertext + nonce + nonce_padding = nonce + elif self.vc.mode == 'xsalsa20_poly1305': + # Nonce is the header + nonce = bytearray(24) + nonce[:12] = self._rtp_audio_header + nonce_padding = None else: - # Now encrypt the payload with the nonce as a header - raw = self._secret_box.encrypt(bytes(frame), bytes(self._buffer)).ciphertext + raise Exception('The voice mode, {}, isn\'t supported.'.format(self.vc.mode)) + + # Encrypt the payload with the nonce + payload = self._secret_box.encrypt(bytes(frame), bytes(nonce)).ciphertext + + # Pad the payload with the nonce, if applicable + if nonce_padding: + payload += nonce_padding # Send the header (sans nonce padding) plus the payload - self.send(self._buffer[:12] + raw) + self.send(self._rtp_audio_header + payload) # Increment our sequence counter self.sequence += 1 @@ -85,7 +137,111 @@ class UDPVoiceClient(LoggingClass): def run(self): while True: - self.conn.recvfrom(4096) + data, addr = self.conn.recvfrom(4096) + + # Data cannot be less than the bare minimum, just ignore + if len(data) <= 12: + self.log.debug('[%s] [VoiceData] Received voice data under 13 bytes', self.vc) + continue + + first, second, sequence, timestamp, ssrc = struct.unpack_from('>BBHII', data) + + rtp = RTPHeader( + version=first >> 6, + padding=(first >> 5) & 1, + extension=(first >> 4) & 1, + csrc_count=first & 0x0F, + marker=second >> 7, + payload_type=second & 0x7F, + sequence=sequence, + timestamp=timestamp, + ssrc=ssrc, + ) + + # Check if rtp version is 2 + if rtp.version != 2: + self.log.debug('[%s] [VoiceData] Received an invalid RTP packet version, %s', self.vc, rtp.version) + continue + + payload_type = PayloadTypes.get(rtp.payload_type) + + # Unsupported payload type received + if not payload_type: + self.log.debug('[%s] [VoiceData] Received unsupported payload type, %s', self.vc, rtp.payload_type) + continue + + nonce = bytearray(24) + if self.vc.mode == 'xsalsa20_poly1305_lite': + nonce[:4] = data[-4:] + data = data[:-4] + elif self.vc.mode == 'xsalsa20_poly1305_suffx': + nonce[:24] = data[-24:] + data = data[:-24] + elif self.vc.mode == 'xsalsa20_poly1305': + nonce[:12] = data[:12] + else: + self.log.debug('[%s] [VoiceData] Unsupported Encryption Mode, %s', self.vc, self.vc.mode) + continue + + try: + data = self._secret_box.decrypt(bytes(data[12:]), bytes(nonce)) + except Exception: + self.log.debug('[%s] [VoiceData] Failed to decode data from ssrc %s', self.vc, rtp.ssrc) + continue + + # RFC3550 Section 5.1 (Padding) + if rtp.padding: + padding_amount, = struct.unpack_from('>B', data[:-1]) + data = data[-padding_amount:] + + if rtp.extension: + # RFC5285 Section 4.2: One-Byte Header + rtp_extension_header = struct.unpack_from('>BB', data) + if rtp_extension_header == RTP_EXTENSION_ONE_BYTE: + data = data[2:] + + fields_amount, = struct.unpack_from('>H', data) + fields = [] + + offset = 4 + for i in range(fields_amount): + first_byte, = struct.unpack_from('>B', data[offset]) + offset += 1 + + rtp_extension_identifer = first_byte & 0xF + rtp_extension_len = ((first_byte >> 4) & 0xF) + 1 + + # Ignore data if identifer == 15, so skip if this is set as 0 + if rtp_extension_identifer: + fields.append(data[offset:offset + rtp_extension_len]) + + offset += rtp_extension_len + + # skip padding + while data[offset] == 0: + offset += 1 + + if len(fields): + fields.append(data[offset:]) + data = b''.join(fields) + else: + data = data[offset:] + + # RFC3550 Section 5.3: Profile-Specific Modifications to the RTP Header + # clients send it sometimes, definitely on fresh connects to a server, dunno what to do here + if rtp.marker: + self.log.debug('[%s] [VoiceData] Received RTP data with the marker set, skipping', self.vc) + continue + + payload = VoiceData( + client=self.vc, + payload_type=payload_type.name, + user_id=self.vc.audio_ssrcs.get(rtp.ssrc, None), + rtp=rtp, + data=data, + ) + + self.vc.client.gw.events.emit('VoiceData', payload) def send(self, data): self.conn.sendto(data, (self.ip, self.port))