From a7379e0e89030e3fedc76360e3b4b1abf085985b Mon Sep 17 00:00:00 2001 From: Dan <31395415+cakedan@users.noreply.github.com> Date: Thu, 14 Jun 2018 20:31:48 -0400 Subject: [PATCH] initial voice --- disco/gateway/packets.py | 2 +- disco/voice/client.py | 86 +++++++++++++++++++++++-- disco/voice/packets.py | 4 +- disco/voice/udp.py | 132 ++++++++++++++++++++++++++++++++++++++- 4 files changed, 215 insertions(+), 9 deletions(-) diff --git a/disco/gateway/packets.py b/disco/gateway/packets.py index a15bfd8..2f0bf82 100644 --- a/disco/gateway/packets.py +++ b/disco/gateway/packets.py @@ -16,5 +16,5 @@ OPCode = Enum( INVALID_SESSION=9, HELLO=10, HEARTBEAT_ACK=11, - GUILD_SYNC=12, + GUILD_SYNC=12 ) diff --git a/disco/voice/client.py b/disco/voice/client.py index cfa999a..8f4214a 100644 --- a/disco/voice/client.py +++ b/disco/voice/client.py @@ -3,6 +3,8 @@ from __future__ import print_function import gevent import time +from collections import namedtuple + from holster.enum import Enum from holster.emitter import Emitter @@ -13,6 +15,16 @@ from disco.gateway.packets import OPCode from disco.voice.packets import VoiceOPCode from disco.voice.udp import UDPVoiceClient +AudioCodecs = ('opus',) + +PayloadTypes = Enum(OPUS=0x78) + +SpeakingCodes = Enum( + NONE=0 + VOICE=1 << 0, + SOUNDSHARE=1 << 1 +) + VoiceState = Enum( DISCONNECTED=0, RECONNECTING=1, @@ -25,6 +37,18 @@ VoiceState = Enum( VOICE_CONNECTED=8, ) +class VoiceSpeaking(namedtuple('VoiceSpeaking', ['user_id', 'speaking', 'soundshare'])): + """ + Voice Speaking Event + Attributes + --------- + user_id : snowflake + the id of the user + speaking : bool + if they are speaking + soundshare : bool + if they are using soundshare + """ class VoiceException(Exception): def __init__(self, msg, client): @@ -33,7 +57,7 @@ class VoiceException(Exception): class VoiceClient(LoggingClass): - VOICE_GATEWAY_VERSION = 3 + VOICE_GATEWAY_VERSION = 4 SUPPORTED_MODES = { 'xsalsa20_poly1305_lite', @@ -58,6 +82,10 @@ class VoiceClient(LoggingClass): self.packets.on(VoiceOPCode.READY, self.on_voice_ready) self.packets.on(VoiceOPCode.RESUMED, self.on_voice_resumed) self.packets.on(VoiceOPCode.SESSION_DESCRIPTION, self.on_voice_sdp) + self.packets.on(VoiceOPCode.SPEAKING, self.on_voice_speaking) + self.packets.on(VoiceOPCode.CLIENT_CONNECT, self.on_voice_client_connect) + self.packets.on(VoiceOPCode.CLIENT_DISCONNECT, self.on_voice_client_disconnect) + self.packets.on(VoiceOPCode.CODECS, self.on_voice_codecs) # State + state change emitter self.state = VoiceState.DISCONNECTED @@ -71,6 +99,9 @@ class VoiceClient(LoggingClass): self.port = None self.mode = None self.udp = None + self.audio_codec = None + self.video_codec = None + self.transport_id = None # Websocket connection self.ws = None @@ -80,6 +111,10 @@ class VoiceClient(LoggingClass): self._update_listener = None self._heartbeat_task = None + # SSRCs + + self.audio_ssrcs = {} + def __repr__(self): return u''.format(self.channel) @@ -102,10 +137,11 @@ class VoiceClient(LoggingClass): self.send(VoiceOPCode.HEARTBEAT, time.time()) gevent.sleep(interval / 1000) - def set_speaking(self, value): + def set_speaking(self, value, delay=0): self.send(VoiceOPCode.SPEAKING, { - 'speaking': value, - 'delay': 0, + 'speaking': int(value), + 'delay': delay, + 'ssrc': self.ssrc }) def send(self, op, data): @@ -114,6 +150,21 @@ class VoiceClient(LoggingClass): 'op': op.value, 'd': data, }), self.encoder.OPCODE) + + def on_voice_client_connect(self, data): + self.audio_ssrcs[data['audio_ssrc']] = data['user_id'] + # ignore data['voice_ssrc'] for now + + def on_voice_client_disconnect(self, data): + for ssrc in self.audio_ssrcs.keys(): + if self.audio_ssrcs[ssrc] == data['user_id']: + del self.audio_ssrcs[ssrc] + break + + def on_voice_codecs(self, data): + self.audio_codec = data['audio_codec'] + self.video_codec = data['video_codec'] + self.transport_id = data['media_session_id'] def on_voice_hello(self, data): self.log.info('[%s] Recieved Voice HELLO payload, starting heartbeater', self) @@ -143,6 +194,17 @@ class VoiceClient(LoggingClass): self.log.error('Failed to discover our IP, perhaps a NAT or firewall is fucking us') self.disconnect() return + + codecs = [] + + for i in range(len(AudioCodecs)): + codec = AudioCodecs[i] + codecs.append({ + 'name': codec, + 'type': 'audio', + 'priority': i * 1000, + 'payload_type': PayloadTypes.get(codec.upper()) + }) self.log.debug('[%s] IP discovery completed (ip = %s, port = %s), sending SELECT_PROTOCOL', self, ip, port) self.send(VoiceOPCode.SELECT_PROTOCOL, { @@ -152,6 +214,7 @@ class VoiceClient(LoggingClass): 'address': ip, 'mode': self.mode, }, + 'codecs': codecs }) def on_voice_resumed(self, data): @@ -161,6 +224,11 @@ class VoiceClient(LoggingClass): def on_voice_sdp(self, sdp): self.log.info('[%s] Recieved session description, connection completed', self) + self.mode = sdp['mode'] + self.audio_codec = sdp['audio_codec'] + self.video_codec = sdp['video_codec'] + self.transport_id = sdp['media_session_id'] + # Create a secret box for encryption/decryption self.udp.setup_encryption(bytes(bytearray(sdp['secret_key']))) @@ -187,6 +255,16 @@ class VoiceClient(LoggingClass): self._connect_and_run() + def on_voice_speaking(self, data): + self.audio_ssrcs[data['ssrc']] = data['user_id'] + + payload = VoiceSpeaking() + payload.user_id = data['user_id'] + payload.speaking = (data['speaking'] & SpeakingCodes.VOICE) == SpeakingCodes.VOICE + payload.soundshare = (data['speaking'] & SpeakingCodes.SOUNDSHARE) == SpeakingCodes.SOUNDSHARE + + self.client.gw.events.emit('VoiceSpeaking', payload) + def on_message(self, msg): try: data = self.encoder.decode(msg) diff --git a/disco/voice/packets.py b/disco/voice/packets.py index 0ffb992..dfca477 100644 --- a/disco/voice/packets.py +++ b/disco/voice/packets.py @@ -11,5 +11,7 @@ VoiceOPCode = Enum( RESUME=7, HELLO=8, RESUMED=9, + CLIENT_CONNECT=12, CLIENT_DISCONNECT=13, -) + CODECS=14 +) \ No newline at end of file diff --git a/disco/voice/udp.py b/disco/voice/udp.py index de13376..b74c549 100644 --- a/disco/voice/udp.py +++ b/disco/voice/udp.py @@ -2,16 +2,59 @@ import struct import socket import gevent +from collections import namedtuple + try: import nacl.secret except ImportError: print('WARNING: nacl is not installed, voice support is disabled') +from holster.enum import Enum + from disco.util.logging import LoggingClass +from disco.voice.client import PayloadTypes MAX_UINT32 = 4294967295 MAX_SEQUENCE = 65535 +RTP_HEADER_ONE_BYTE = (0xBE, 0xDE) + +class RTPHeader(namedtuple('RTPHeader', ['version', 'padding', 'extension', 'csrc_count', 'marker', 'payload_type', 'sequence', 'timestamp', 'ssrc'])): + """ + RTP Packet's Header information + Attributes + --------- + version : integer + the RTP version the packet's using + padding : integer + is this RTP packet using padding + extension : integer + is this RTP packet using extension + csrc_count : integer + marker : integer + is this RTP packet having a marker + payload_type : integer + RTP packet's payload type, currently should only be OPUS data + sequence : integer + RTP packet's sequence + timestamp : integer + RTP packet's timestamp + ssrc : integer + RTP packet's SSRC, the person talking + """ + +class VoiceData(namedtuple('VoiceData', ['data', 'user_id', 'rtp'])): + """ + Voice Data received from the UDP socket + Attributes + --------- + data : bytes + the decrypted data + user_id: snowflake + the id of the user who sent this data + rtp : RTPHeader + the rtp packet's header data + """ class UDPVoiceClient(LoggingClass): def __init__(self, vc): @@ -36,8 +79,8 @@ class UDPVoiceClient(LoggingClass): # Buffer used for encoding/sending frames self._buffer = bytearray(24) - self._buffer[0] = 0x80 - self._buffer[1] = 0x78 + self._buffer[0] = 2 << 6 # Only RTP Version set in the first byte of the header, 0x80 + self._buffer[1] = PayloadTypes.OPUS def increment_timestamp(self, by): self.timestamp += by @@ -85,7 +128,90 @@ class UDPVoiceClient(LoggingClass): def run(self): while True: - self.conn.recvfrom(4096) + data, addr = self.conn.recvfrom(4096) + + # Data cannot be less than the bare minimum, just ignore + if len(data) <= 12: + continue + + rtp = RTPHeader() + rtp.version = data[1] >> 6 + rtp.padding = (data[1] >> 5) & 1 + rtp.extension = (data[1] >> 4) & 1 + rtp.csrc_count = data[1] & 0x0F + + rtp.marker = data[2] >> 7 + rtp.payload_type = data[2] & 0x7F + + rtp.sequence = struct.unpack('>H', data[2:]) + rtp.timestamp = struct.unpack('>I', data[4:]) + rtp.ssrc = struct.unpack('>I', data[8:]) + + # Check if rtp version is 2 + if rtp.version != 2: + continue + + payload_type = PayloadTypes.get(rtp.payload_type) + + # Unsupported payload type received + if not payload_type: + continue + + nonce = bytearray(24) + if self.vc.mode == 'xsalsa20_poly1305_lite': + struct.pack_into('>I', nonce, 0, data[-4:]) + data = data[-4:] + elif self.vc.mode == 'xsalsa20_poly1305_suffx': + struct.pack_into('>I', nonce, 0, data[-24:]) + data = data[-24:] + else: + struct.pack_into('>I', nonce, 0, data[:12]) + + data = self._secret_box.decrypt(bytes(data[12:]), bytes(nonce)) + + # RFC3550 Section 5.1 (Padding) + if rtp.padding: + padding_amount = data[:-1] + data = data[-padding_amount:] + + if rtp.extension: + # RFC5285 Section 4.2: One-Byte Header + if all(data[i] == RTP_HEADER_ONE_BYTE[i] for i in range(len(RTP_HEADER_ONE_BYTE))): + fields_amount = struct.unpack_from('>H', data) + fields = [] + + offset = 4 + for i in range(fields_amount): + offset += 1 + first_byte = data[offset] + + rtp_extension_identifer = first_byte & 0xF + rtp_extension_len = ((first_byte >> 4) & 0xF) + 1 + + # Ignore data if identifer == 15, so skip if this is set as 0 + if rtp_extension_identifer: + fields.append(data[offset:offset + rtp_extension_len]) + + offset += rtp_extension_len + + # skip padding + while data[offset] == 0: + offset += 1 + + if len(fields): + data = b''.join(fields + [data[offset:]]) + else: + data = data[offset:] + + # RFC3550 Section 5.3: Profile-Specific Modifications to the RTP Header + # clients send it sometimes, definitely on fresh connects to a server, dunno what to do here + if rtp.marker: + continue + + user_id = self.vc.audio_ssrcs.get(rtp.ssrc, None) + payload = VoiceData(data=data, user_id=user_id, rtp=rtp) + + self.vc.client.gw.events.emit('VoiceReceived', payload) def send(self, data): self.conn.sendto(data, (self.ip, self.port))