diff --git a/disco/state.py b/disco/state.py index cdcdfe8..45824e0 100644 --- a/disco/state.py +++ b/disco/state.py @@ -9,6 +9,7 @@ from disco.types.base import UNSET from disco.util.config import Config from disco.util.string import underscore from disco.util.hashmap import HashMap, DefaultHashMap +from disco.voice.client import VoiceState class StackMessage(namedtuple('StackMessage', ['id', 'channel_id', 'author_id'])): @@ -82,6 +83,8 @@ class State(object): Weak mapping of all known/loaded Channels users : dict(snowflake, `User`) Weak mapping of all known/loaded Users + voice_clients : dict(str, 'VoiceClient') + Weak mapping of all known voice clients voice_states : dict(str, `VoiceState`) Weak mapping of all known/active Voice States messages : Optional[dict(snowflake, deque)] @@ -90,8 +93,8 @@ class State(object): EVENTS = [ 'Ready', 'GuildCreate', 'GuildUpdate', 'GuildDelete', 'GuildMemberAdd', 'GuildMemberRemove', 'GuildMemberUpdate', 'GuildMembersChunk', 'GuildRoleCreate', 'GuildRoleUpdate', 'GuildRoleDelete', - 'GuildEmojisUpdate', 'ChannelCreate', 'ChannelUpdate', 'ChannelDelete', 'VoiceStateUpdate', 'MessageCreate', - 'PresenceUpdate', + 'GuildEmojisUpdate', 'ChannelCreate', 'ChannelUpdate', 'ChannelDelete', 'VoiceServerUpdate', 'VoiceStateUpdate', + 'MessageCreate', 'PresenceUpdate', ] def __init__(self, client, config): @@ -106,6 +109,7 @@ class State(object): self.guilds = HashMap() self.channels = HashMap(weakref.WeakValueDictionary()) self.users = HashMap(weakref.WeakValueDictionary()) + self.voice_clients = HashMap(weakref.WeakValueDictionary()) self.voice_states = HashMap(weakref.WeakValueDictionary()) # If message tracking is enabled, listen to those events @@ -211,6 +215,9 @@ class State(object): # Just delete the guild, channel references will fall del self.guilds[event.id] + if event.id in self.voice_clients: + self.voice_clients[event.id].disconnect() + def on_channel_create(self, event): if event.channel.is_guild and event.channel.guild_id in self.guilds: self.guilds[event.channel.guild_id].channels[event.channel.id] = event.channel @@ -233,6 +240,14 @@ class State(object): elif event.channel.is_dm and event.channel.id in self.dms: del self.dms[event.channel.id] + def on_voice_server_update(self, event): + if event.guild_id not in self.voice_clients: + return + + voice_client = self.voice_clients.get(event.guild_id) + voice_client.set_endpoint(event.endpoint) + voice_client.set_token(event.token) + def on_voice_state_update(self, event): # Existing connection, we are either moving channels or disconnecting if event.state.session_id in self.voice_states: @@ -257,6 +272,21 @@ class State(object): del self.voice_states[expired_voice_state.session_id] self.voice_states[event.state.session_id] = event.state + if event.state.user_id != self.me.id: + return + + server_id = event.state.guild_id or event.state.channel_id + if server_id in self.voice_clients: + voice_client = self.voice_clients[server_id] + + voice_client.channel_id = event.state.channel_id + if not event.state.channel_id: + voice_client.disconnect() + return + + if voice_client.token: + voice_client.set_state(VoiceState.CONNECTED) + def on_guild_member_add(self, event): if event.member.user.id not in self.users: self.users[event.member.user.id] = event.member.user diff --git a/disco/types/channel.py b/disco/types/channel.py index 9f4a9a5..ced2608 100644 --- a/disco/types/channel.py +++ b/disco/types/channel.py @@ -359,9 +359,11 @@ class Channel(SlottedModel, Permissible): """ from disco.voice.client import VoiceClient assert self.is_voice, 'Channel must support voice to connect' - vc = VoiceClient(self) - vc.connect(*args, **kwargs) - return vc + + server_id = self.guild_id or self.id + vc = self.client.state.voice_clients.get(server_id) or VoiceClient(self.client, server_id, is_dm=self.is_dm) + + return vc.connect(self.id, *args, **kwargs) def create_overwrite(self, *args, **kwargs): """ diff --git a/disco/voice/client.py b/disco/voice/client.py index cb78785..2a48346 100644 --- a/disco/voice/client.py +++ b/disco/voice/client.py @@ -12,13 +12,15 @@ from disco.gateway.encoding.json import JSONEncoder from disco.util.websocket import Websocket from disco.util.logging import LoggingClass from disco.gateway.packets import OPCode +from disco.types.base import cached_property from disco.voice.packets import VoiceOPCode -from disco.voice.udp import AudioCodecs, PayloadTypes, UDPVoiceClient +from disco.voice.udp import AudioCodecs, RTPPayloadTypes, UDPVoiceClient -SpeakingCodes = Enum( +SpeakingFlags = Enum( NONE=0, VOICE=1 << 0, SOUNDSHARE=1 << 1, + PRIORITY=1 << 2, ) VoiceState = Enum( @@ -38,6 +40,7 @@ VoiceSpeaking = namedtuple('VoiceSpeaking', [ 'user_id', 'speaking', 'soundshare', + 'priority', ]) @@ -56,16 +59,19 @@ class VoiceClient(LoggingClass): 'xsalsa20_poly1305', } - def __init__(self, channel, encoder=None, max_reconnects=5): + def __init__(self, client, server_id, is_dm=False, encoder=None, max_reconnects=5): super(VoiceClient, self).__init__() - if not channel.is_voice: - raise ValueError('Cannot spawn a VoiceClient for a non-voice channel') - - self.channel = channel - self.client = self.channel.client + self.client = client + self.server_id = server_id + self.channel_id = None + self.is_dm = is_dm self.encoder = encoder or JSONEncoder self.max_reconnects = max_reconnects + self.video_enabled = False + + # Set the VoiceClient in the state's voice clients + self.client.state.voice_clients[self.server_id] = self # Bind to some WS packets self.packets = Emitter() @@ -97,16 +103,44 @@ class VoiceClient(LoggingClass): # Websocket connection self.ws = None - self._session_id = None + self._session_id = self.client.gw.session_id self._reconnects = 0 - self._update_listener = None self._heartbeat_task = None + self._identified = False # SSRCs self.audio_ssrcs = {} def __repr__(self): - return u''.format(self.channel) + return u''.format(self.server_id) + + @cached_property + def guild(self): + return self.client.state.guilds.get(self.server_id) if not self.is_dm else None + + @cached_property + def channel(self): + return self.client.state.channels.get(self.channel_id) + + @property + def user_id(self): + return self.client.state.me.id + + @property + def ssrc_audio(self): + return self.ssrc + + @property + def ssrc_video(self): + return self.ssrc + 1 + + @property + def ssrc_rtx(self): + return self.ssrc + 2 + + @property + def ssrc_rtcp(self): + return self.ssrc + 3 def set_state(self, state): self.log.debug('[%s] state %s -> %s', self, self.state, state) @@ -114,6 +148,29 @@ class VoiceClient(LoggingClass): self.state = state self.state_emitter.emit(state, prev_state) + def set_endpoint(self, endpoint): + endpoint = endpoint.split(':', 1)[0] + if self.endpoint == endpoint: + return + + self.log.info( + '[%s] Set endpoint from VOICE_SERVER_UPDATE (state = %s / endpoint = %s)', self, self.state, endpoint) + + self.endpoint = endpoint + + if self.ws and self.ws.sock and self.ws.sock.connected: + self.ws.close() + self.ws = None + + self._identified = False + + def set_token(self, token): + if self.token == token: + return + self.token = token + if not self._identified: + self._connect_and_run() + def _connect_and_run(self): self.ws = Websocket('wss://' + self.endpoint + '/?v={}'.format(self.VOICE_GATEWAY_VERSION)) self.ws.emitter.on('on_open', self.on_open) @@ -127,12 +184,14 @@ class VoiceClient(LoggingClass): self.send(VoiceOPCode.HEARTBEAT, time.time()) gevent.sleep(interval / 1000) - def set_speaking(self, voice=False, soundshare=False, delay=0): - value = SpeakingCodes.NONE.value + def set_speaking(self, voice=False, soundshare=False, priority=False, delay=0): + value = SpeakingFlags.NONE.value if voice: - value |= SpeakingCodes.VOICE.value + value |= SpeakingFlags.VOICE.value if soundshare: - value |= SpeakingCodes.SOUNDSHARE.value + value |= SpeakingFlags.SOUNDSHARE.value + if priority: + value |= SpeakingFlags.PRIORITY.value self.send(VoiceOPCode.SPEAKING, { 'speaking': value, @@ -140,20 +199,36 @@ class VoiceClient(LoggingClass): 'ssrc': self.ssrc, }) + def set_voice_state(self, channel_id, mute=False, deaf=False, video=False): + self.client.gw.send(OPCode.VOICE_STATE_UPDATE, { + 'self_mute': bool(mute), + 'self_deaf': bool(deaf), + 'self_video': bool(video), + 'guild_id': None if self.is_dm else self.server_id, + 'channel_id': channel_id, + }) + def send(self, op, data): - self.log.debug('[%s] sending OP %s (data = %s)', self, op, data) - self.ws.send(self.encoder.encode({ - 'op': op.value, - 'd': data, - }), self.encoder.OPCODE) + if self.ws and self.ws.sock and self.ws.sock.connected: + self.log.debug('[%s] sending OP %s (data = %s)', self, op, data) + self.ws.send(self.encoder.encode({ + 'op': op.value, + 'd': data, + }), self.encoder.OPCODE) + else: + self.log.debug('[%s] dropping because ws is closed OP %s (data = %s)', self, op, data) def on_voice_client_connect(self, data): - self.audio_ssrcs[data['audio_ssrc']] = data['user_id'] + user_id = int(data['user_id']) + + self.audio_ssrcs[data['audio_ssrc']] = user_id # ignore data['voice_ssrc'] for now def on_voice_client_disconnect(self, data): + user_id = int(data['user_id']) + for ssrc in self.audio_ssrcs.keys(): - if self.audio_ssrcs[ssrc] == data['user_id']: + if self.audio_ssrcs[ssrc] == user_id: del self.audio_ssrcs[ssrc] break @@ -166,16 +241,17 @@ class VoiceClient(LoggingClass): self.udp.set_audio_codec(data['audio_codec']) def on_voice_hello(self, data): - self.log.info('[%s] Recieved Voice HELLO payload, starting heartbeater', self) + self.log.info('[%s] Received Voice HELLO payload, starting heartbeater', self) self._heartbeat_task = gevent.spawn(self._heartbeat, data['heartbeat_interval']) self.set_state(VoiceState.AUTHENTICATED) def on_voice_ready(self, data): - self.log.info('[%s] Recived Voice READY payload, attempting to negotiate voice connection w/ remote', self) + self.log.info('[%s] Received Voice READY payload, attempting to negotiate voice connection w/ remote', self) self.set_state(VoiceState.CONNECTING) self.ssrc = data['ssrc'] self.ip = data['ip'] self.port = data['port'] + self._identified = True for mode in self.SUPPORTED_MODES: if mode in data['modes']: @@ -202,7 +278,7 @@ class VoiceClient(LoggingClass): 'name': codec, 'type': 'audio', 'priority': (idx + 1) * 1000, - 'payload_type': PayloadTypes.get(codec).value, + 'payload_type': RTPPayloadTypes.get(codec).value, }) self.log.debug('[%s] IP discovery completed (ip = %s, port = %s), sending SELECT_PROTOCOL', self, ip, port) @@ -222,11 +298,11 @@ class VoiceClient(LoggingClass): }) def on_voice_resumed(self, data): - self.log.info('[%s] Recieved resumed', self) + self.log.info('[%s] Received resumed', self) self.set_state(VoiceState.CONNECTED) def on_voice_sdp(self, sdp): - self.log.info('[%s] Recieved session description, connection completed', self) + self.log.info('[%s] Received session description, connection completed', self) self.mode = sdp['mode'] self.audio_codec = sdp['audio_codec'] @@ -241,30 +317,18 @@ class VoiceClient(LoggingClass): self.set_state(VoiceState.CONNECTED) - def on_voice_server_update(self, data): - if self.channel.guild_id != data.guild_id or not data.token: - return - - if self.token and self.token != data.token: - return - - self.log.info('[%s] Recieved VOICE_SERVER_UPDATE (state = %s / endpoint = %s)', self, self.state, data.endpoint) - - self.token = data.token - self.set_state(VoiceState.AUTHENTICATING) - - self.endpoint = data.endpoint.split(':', 1)[0] - - self._connect_and_run() - def on_voice_speaking(self, data): - self.audio_ssrcs[data['ssrc']] = data['user_id'] + user_id = int(data['user_id']) + self.audio_ssrcs[data['ssrc']] = user_id + + # Maybe rename speaking to voice in future payload = VoiceSpeaking( client=self, - user_id=data['user_id'], - speaking=bool(data['speaking'] & SpeakingCodes.VOICE.value), - soundshare=bool(data['speaking'] & SpeakingCodes.SOUNDSHARE.value), + user_id=user_id, + speaking=bool(data['speaking'] & SpeakingFlags.VOICE.value), + soundshare=bool(data['speaking'] & SpeakingFlags.SOUNDSHARE.value), + priority=bool(data['speaking'] & SpeakingFlags.PRIORITY.value), ) self.client.gw.events.emit('VoiceSpeaking', payload) @@ -280,92 +344,113 @@ class VoiceClient(LoggingClass): self.log.error('[%s] Voice websocket error: %s', self, err) def on_open(self): - if self._session_id: - return self.send(VoiceOPCode.RESUME, { - 'server_id': self.channel.guild_id, - 'user_id': self.client.state.me.id, + if self._identified: + self.send(VoiceOPCode.RESUME, { + 'server_id': self.server_id, 'session_id': self._session_id, 'token': self.token, }) - - self._session_id = self.client.gw.session_id - - self.send(VoiceOPCode.IDENTIFY, { - 'server_id': self.channel.guild_id, - 'user_id': self.client.state.me.id, - 'session_id': self._session_id, - 'token': self.token, - }) + else: + self.send(VoiceOPCode.IDENTIFY, { + 'server_id': self.server_id, + 'user_id': self.user_id, + 'session_id': self._session_id, + 'token': self.token, + 'video': self.video_enabled, + }) def on_close(self, code, reason): self.log.warning('[%s] Voice websocket closed: [%s] %s (%s)', self, code, reason, self._reconnects) if self._heartbeat_task: self._heartbeat_task.kill() + self._heartbeat_task = None + + self.ws = None - # If we're not in a connected state, don't try to resume/reconnect - if self.state != VoiceState.CONNECTED: + # If we killed the connection, don't try resuming + if self.state == VoiceState.DISCONNECTED: return self.log.info('[%s] Attempting Websocket Resumption', self) - self._reconnects += 1 - - if self.max_reconnects and self._reconnects > self.max_reconnects: - raise VoiceException('Failed to reconnect after {} attempts, giving up'.format(self.max_reconnects)) self.set_state(VoiceState.RECONNECTING) - # Don't resume for these error codes: - if code and 4000 <= code <= 4016: - self._session_id = None + # Check if code is not None, was not from us + if code is not None: + self._reconnects += 1 + + if self.max_reconnects and self._reconnects > self.max_reconnects: + raise VoiceException( + 'Failed to reconnect after {} attempts, giving up'.format(self.max_reconnects), self) + + # Don't resume for these error codes: + if 4000 <= code <= 4016: + self._identified = False + + if self.udp and self.udp.connected: + self.udp.disconnect() - if self.udp and self.udp.connected: - self.udp.disconnect() + wait_time = 5 + else: + wait_time = 1 - wait_time = (self._reconnects - 1) * 5 self.log.info( - '[%s] Will attempt to %s after %s seconds', self, 'resume' if self._session_id else 'reconnect', wait_time) + '[%s] Will attempt to %s after %s seconds', self, 'resume' if self._identified else 'reconnect', wait_time) gevent.sleep(wait_time) self._connect_and_run() - def connect(self, timeout=5, mute=False, deaf=False): - self.log.debug('[%s] Attempting connection', self) - self.set_state(VoiceState.AWAITING_ENDPOINT) + def connect(self, channel_id, timeout=10, **kwargs): + if self.is_dm: + channel_id = self.server_id - self._update_listener = self.client.events.on('VoiceServerUpdate', self.on_voice_server_update) + if not channel_id: + raise VoiceException('[%s] cannot connect to an empty channel id', self) - self.client.gw.send(OPCode.VOICE_STATE_UPDATE, { - 'self_mute': mute, - 'self_deaf': deaf, - 'guild_id': int(self.channel.guild_id), - 'channel_id': int(self.channel.id), - }) + if self.channel_id == channel_id: + if self.state == VoiceState.CONNECTED: + self.log.debug('[%s] Already connected to %s, returning', self, self.channel) + return self + else: + if self.state == VoiceState.CONNECTED: + self.log.debug('[%s] Moving to channel %s', self, channel_id) + else: + self.log.debug('[%s] Attempting connection to channel id %s', self, channel_id) + self.set_state(VoiceState.AWAITING_ENDPOINT) + + self.set_voice_state(channel_id, **kwargs) if not self.state_emitter.once(VoiceState.CONNECTED, timeout=timeout): self.disconnect() raise VoiceException('Failed to connect to voice', self) + else: + return self def disconnect(self): + if self.state == VoiceState.DISCONNECTED: + return + self.log.debug('[%s] disconnect called', self) self.set_state(VoiceState.DISCONNECTED) + del self.client.state.voice_clients[self.server_id] + if self._heartbeat_task: self._heartbeat_task.kill() self._heartbeat_task = None if self.ws and self.ws.sock and self.ws.sock.connected: self.ws.close() + self.ws = None if self.udp and self.udp.connected: self.udp.disconnect() - self.client.gw.send(OPCode.VOICE_STATE_UPDATE, { - 'self_mute': False, - 'self_deaf': False, - 'guild_id': int(self.channel.guild_id), - 'channel_id': None, - }) + if self.channel_id: + self.set_voice_state(None) + + self.client.gw.events.emit('VoiceDisconnect', self) def send_frame(self, *args, **kwargs): self.udp.send_frame(*args, **kwargs) diff --git a/disco/voice/udp.py b/disco/voice/udp.py index 840f4c1..b269dd9 100644 --- a/disco/voice/udp.py +++ b/disco/voice/udp.py @@ -15,7 +15,17 @@ from disco.util.logging import LoggingClass AudioCodecs = ('opus',) -PayloadTypes = Enum(OPUS=0x78) +RTPPayloadTypes = Enum(OPUS=0x78) + +RTCPPayloadTypes = Enum( + SENDER_REPORT=200, + RECEIVER_REPORT=201, + SOURCE_DESCRIPTION=202, + BYE=203, + APP=204, + RTPFB=205, + PSFB=206, +) MAX_UINT32 = 4294967295 MAX_SEQUENCE = 65535 @@ -23,7 +33,6 @@ MAX_SEQUENCE = 65535 RTP_HEADER_VERSION = 0x80 # Only RTP Version is set here (value of 2 << 6) RTP_EXTENSION_ONE_BYTE = (0xBE, 0xDE) - RTPHeader = namedtuple('RTPHeader', [ 'version', 'padding', @@ -36,11 +45,29 @@ RTPHeader = namedtuple('RTPHeader', [ 'ssrc', ]) +RTCPHeader = namedtuple('RTCPHeader', [ + 'version', + 'padding', + 'reception_count', + 'packet_type', + 'length', + 'ssrc', +]) + +RTCPData = namedtuple('RTCPData', [ + 'client', + 'user_id', + 'payload_type', + 'header', + 'data', +]) + VoiceData = namedtuple('VoiceData', [ 'client', 'user_id', 'payload_type', 'rtp', + 'nonce', 'data', ]) @@ -71,12 +98,12 @@ class UDPVoiceClient(LoggingClass): self._rtp_audio_header[0] = RTP_HEADER_VERSION def set_audio_codec(self, codec): - ptype = PayloadTypes.get(codec) - if ptype: - self._rtp_audio_header[1] = ptype.value - self.log.debug('[%s] Set UDP\'s Audio Codec to %s, RTP payload type %s', self.vc, ptype.name, ptype.value) - else: - raise Exception('The voice codec, {}, isn\'t supported.'.format(codec)) + if codec not in AudioCodecs: + raise Exception('Unsupported audio codec received, {}'.format(codec)) + + ptype = RTPPayloadTypes.get(codec) + self._rtp_audio_header[1] = ptype.value + self.log.debug('[%s] Set UDP\'s Audio Codec to %s, RTP payload type %s', self.vc, ptype.name, ptype.value) def increment_timestamp(self, by): self.timestamp += by @@ -93,7 +120,7 @@ class UDPVoiceClient(LoggingClass): # Pack the rtc header into our buffer struct.pack_into('>H', self._rtp_audio_header, 2, sequence or self.sequence) struct.pack_into('>I', self._rtp_audio_header, 4, timestamp or self.timestamp) - struct.pack_into('>i', self._rtp_audio_header, 8, self.vc.ssrc) + struct.pack_into('>i', self._rtp_audio_header, 8, self.vc.ssrc_audio) if self.vc.mode == 'xsalsa20_poly1305_lite': # Use an incrementing number as a nonce, only first 4 bytes of the nonce is padded on @@ -144,104 +171,138 @@ class UDPVoiceClient(LoggingClass): self.log.debug('[%s] [VoiceData] Received voice data under 13 bytes', self.vc) continue - first, second, sequence, timestamp, ssrc = struct.unpack_from('>BBHII', data) - - rtp = RTPHeader( - version=first >> 6, - padding=(first >> 5) & 1, - extension=(first >> 4) & 1, - csrc_count=first & 0x0F, - marker=second >> 7, - payload_type=second & 0x7F, - sequence=sequence, - timestamp=timestamp, - ssrc=ssrc, - ) - - # Check if rtp version is 2 - if rtp.version != 2: - self.log.debug('[%s] [VoiceData] Received an invalid RTP packet version, %s', self.vc, rtp.version) - continue - - payload_type = PayloadTypes.get(rtp.payload_type) - - # Unsupported payload type received - if not payload_type: - self.log.debug('[%s] [VoiceData] Received unsupported payload type, %s', self.vc, rtp.payload_type) - continue - - nonce = bytearray(24) - if self.vc.mode == 'xsalsa20_poly1305_lite': - nonce[:4] = data[-4:] - data = data[:-4] - elif self.vc.mode == 'xsalsa20_poly1305_suffx': - nonce[:24] = data[-24:] - data = data[:-24] - elif self.vc.mode == 'xsalsa20_poly1305': - nonce[:12] = data[:12] + first, second = struct.unpack_from('>BB', data) + + payload_type = RTCPPayloadTypes.get(second) + if payload_type: + length, ssrc = struct.unpack_from('>HI', data, 2) + + rtcp = RTCPHeader( + version=first >> 6, + padding=(first >> 5) & 1, + reception_count=first & 0x1F, + packet_type=second, + length=length, + ssrc=ssrc, + ) + + if rtcp.ssrc == self.vc.ssrc_rtcp: + user_id = self.vc.user_id + else: + rtcp_ssrc = rtcp.ssrc + if rtcp_ssrc: + rtcp_ssrc -= 3 + user_id = self.vc.audio_ssrcs.get(rtcp_ssrc, None) + + payload = RTCPData( + client=self.vc, + user_id=user_id, + payload_type=payload_type.name, + header=rtcp, + data=data[8:], + ) + + self.vc.client.gw.events.emit('RTCPData', payload) else: - self.log.debug('[%s] [VoiceData] Unsupported Encryption Mode, %s', self.vc, self.vc.mode) - continue - - try: - data = self._secret_box.decrypt(bytes(data[12:]), bytes(nonce)) - except Exception: - self.log.debug('[%s] [VoiceData] Failed to decode data from ssrc %s', self.vc, rtp.ssrc) - continue - - # RFC3550 Section 5.1 (Padding) - if rtp.padding: - padding_amount, = struct.unpack_from('>B', data[:-1]) - data = data[-padding_amount:] - - if rtp.extension: - # RFC5285 Section 4.2: One-Byte Header - rtp_extension_header = struct.unpack_from('>BB', data) - if rtp_extension_header == RTP_EXTENSION_ONE_BYTE: - data = data[2:] - - fields_amount, = struct.unpack_from('>H', data) - fields = [] - - offset = 4 - for i in range(fields_amount): - first_byte, = struct.unpack_from('>B', data[offset]) - offset += 1 - - rtp_extension_identifer = first_byte & 0xF - rtp_extension_len = ((first_byte >> 4) & 0xF) + 1 - - # Ignore data if identifer == 15, so skip if this is set as 0 - if rtp_extension_identifer: - fields.append(data[offset:offset + rtp_extension_len]) - - offset += rtp_extension_len - - # skip padding - while data[offset] == 0: + sequence, timestamp, ssrc = struct.unpack_from('>HII', data, 2) + + rtp = RTPHeader( + version=first >> 6, + padding=(first >> 5) & 1, + extension=(first >> 4) & 1, + csrc_count=first & 0x0F, + marker=second >> 7, + payload_type=second & 0x7F, + sequence=sequence, + timestamp=timestamp, + ssrc=ssrc, + ) + + # Check if rtp version is 2 + if rtp.version != 2: + self.log.debug('[%s] [VoiceData] Received an invalid RTP packet version, %s', self.vc, rtp.version) + continue + + payload_type = RTPPayloadTypes.get(rtp.payload_type) + + # Unsupported payload type received + if not payload_type: + self.log.debug('[%s] [VoiceData] Received unsupported payload type, %s', self.vc, rtp.payload_type) + continue + + nonce = bytearray(24) + if self.vc.mode == 'xsalsa20_poly1305_lite': + nonce[:4] = data[-4:] + data = data[:-4] + elif self.vc.mode == 'xsalsa20_poly1305_suffx': + nonce[:24] = data[-24:] + data = data[:-24] + elif self.vc.mode == 'xsalsa20_poly1305': + nonce[:12] = data[:12] + else: + self.log.debug('[%s] [VoiceData] Unsupported Encryption Mode, %s', self.vc, self.vc.mode) + continue + + try: + data = self._secret_box.decrypt(bytes(data[12:]), bytes(nonce)) + except Exception: + self.log.debug('[%s] [VoiceData] Failed to decode data from ssrc %s', self.vc, rtp.ssrc) + continue + + # RFC3550 Section 5.1 (Padding) + if rtp.padding: + padding_amount, = struct.unpack_from('>B', data[:-1]) + data = data[-padding_amount:] + + if rtp.extension: + # RFC5285 Section 4.2: One-Byte Header + rtp_extension_header = struct.unpack_from('>BB', data) + if rtp_extension_header == RTP_EXTENSION_ONE_BYTE: + data = data[2:] + + fields_amount, = struct.unpack_from('>H', data) + fields = [] + + offset = 4 + for i in range(fields_amount): + first_byte, = struct.unpack_from('>B', data[offset]) offset += 1 - if len(fields): - fields.append(data[offset:]) - data = b''.join(fields) - else: - data = data[offset:] - - # RFC3550 Section 5.3: Profile-Specific Modifications to the RTP Header - # clients send it sometimes, definitely on fresh connects to a server, dunno what to do here - if rtp.marker: - self.log.debug('[%s] [VoiceData] Received RTP data with the marker set, skipping', self.vc) - continue - - payload = VoiceData( - client=self.vc, - payload_type=payload_type.name, - user_id=self.vc.audio_ssrcs.get(rtp.ssrc, None), - rtp=rtp, - data=data, - ) - - self.vc.client.gw.events.emit('VoiceData', payload) + rtp_extension_identifer = first_byte & 0xF + rtp_extension_len = ((first_byte >> 4) & 0xF) + 1 + + # Ignore data if identifer == 15, so skip if this is set as 0 + if rtp_extension_identifer: + fields.append(data[offset:offset + rtp_extension_len]) + + offset += rtp_extension_len + + # skip padding + while data[offset] == 0: + offset += 1 + + if len(fields): + fields.append(data[offset:]) + data = b''.join(fields) + else: + data = data[offset:] + + # RFC3550 Section 5.3: Profile-Specific Modifications to the RTP Header + # clients send it sometimes, definitely on fresh connects to a server, dunno what to do here + if rtp.marker: + self.log.debug('[%s] [VoiceData] Received RTP data with the marker set, skipping', self.vc) + continue + + payload = VoiceData( + client=self.vc, + user_id=self.vc.audio_ssrcs.get(rtp.ssrc, None), + payload_type=payload_type.name, + rtp=rtp, + nonce=nonce, + data=data, + ) + + self.vc.client.gw.events.emit('VoiceData', payload) def send(self, data): self.conn.sendto(data, (self.ip, self.port))