Browse Source

voice receive support (#101)

* initial voice

* some updates

fixed it

* some style updates

replaced the speaking when joining with an actual client_connect event

* please pass

* replaced channel with client in VoiceData and VoiceSpeaking events

Also changed some voice sending stuff

* added more debug logs

* fixed version not being set on the voice gateway
pull/109/head
Dan 7 years ago
committed by Andrei Zbikowski
parent
commit
9cc7ca7000
  1. 104
      disco/voice/client.py
  2. 2
      disco/voice/packets.py
  3. 182
      disco/voice/udp.py

104
disco/voice/client.py

@ -3,6 +3,8 @@ from __future__ import print_function
import gevent
import time
from collections import namedtuple
from holster.enum import Enum
from holster.emitter import Emitter
@ -11,7 +13,13 @@ from disco.util.websocket import Websocket
from disco.util.logging import LoggingClass
from disco.gateway.packets import OPCode
from disco.voice.packets import VoiceOPCode
from disco.voice.udp import UDPVoiceClient
from disco.voice.udp import AudioCodecs, PayloadTypes, UDPVoiceClient
SpeakingCodes = Enum(
NONE=0,
VOICE=1 << 0,
SOUNDSHARE=1 << 1,
)
VoiceState = Enum(
DISCONNECTED=0,
@ -25,6 +33,13 @@ VoiceState = Enum(
VOICE_CONNECTED=8,
)
VoiceSpeaking = namedtuple('VoiceSpeaking', [
'client',
'user_id',
'speaking',
'soundshare',
])
class VoiceException(Exception):
def __init__(self, msg, client):
@ -33,7 +48,7 @@ class VoiceException(Exception):
class VoiceClient(LoggingClass):
VOICE_GATEWAY_VERSION = 3
VOICE_GATEWAY_VERSION = 4
SUPPORTED_MODES = {
'xsalsa20_poly1305_lite',
@ -58,6 +73,10 @@ class VoiceClient(LoggingClass):
self.packets.on(VoiceOPCode.READY, self.on_voice_ready)
self.packets.on(VoiceOPCode.RESUMED, self.on_voice_resumed)
self.packets.on(VoiceOPCode.SESSION_DESCRIPTION, self.on_voice_sdp)
self.packets.on(VoiceOPCode.SPEAKING, self.on_voice_speaking)
self.packets.on(VoiceOPCode.CLIENT_CONNECT, self.on_voice_client_connect)
self.packets.on(VoiceOPCode.CLIENT_DISCONNECT, self.on_voice_client_disconnect)
self.packets.on(VoiceOPCode.CODECS, self.on_voice_codecs)
# State + state change emitter
self.state = VoiceState.DISCONNECTED
@ -71,6 +90,9 @@ class VoiceClient(LoggingClass):
self.port = None
self.mode = None
self.udp = None
self.audio_codec = None
self.video_codec = None
self.transport_id = None
# Websocket connection
self.ws = None
@ -80,6 +102,9 @@ class VoiceClient(LoggingClass):
self._update_listener = None
self._heartbeat_task = None
# SSRCs
self.audio_ssrcs = {}
def __repr__(self):
return u'<VoiceClient {}>'.format(self.channel)
@ -90,7 +115,7 @@ class VoiceClient(LoggingClass):
self.state_emitter.emit(state, prev_state)
def _connect_and_run(self):
self.ws = Websocket('wss://' + self.endpoint + '/v={}'.format(self.VOICE_GATEWAY_VERSION))
self.ws = Websocket('wss://' + self.endpoint + '/?v={}'.format(self.VOICE_GATEWAY_VERSION))
self.ws.emitter.on('on_open', self.on_open)
self.ws.emitter.on('on_error', self.on_error)
self.ws.emitter.on('on_close', self.on_close)
@ -102,10 +127,17 @@ class VoiceClient(LoggingClass):
self.send(VoiceOPCode.HEARTBEAT, time.time())
gevent.sleep(interval / 1000)
def set_speaking(self, value):
def set_speaking(self, voice=False, soundshare=False, delay=0):
value = SpeakingCodes.NONE.value
if voice:
value |= SpeakingCodes.VOICE.value
if soundshare:
value |= SpeakingCodes.SOUNDSHARE.value
self.send(VoiceOPCode.SPEAKING, {
'speaking': value,
'delay': 0,
'delay': delay,
'ssrc': self.ssrc,
})
def send(self, op, data):
@ -115,9 +147,27 @@ class VoiceClient(LoggingClass):
'd': data,
}), self.encoder.OPCODE)
def on_voice_client_connect(self, data):
self.audio_ssrcs[data['audio_ssrc']] = data['user_id']
# ignore data['voice_ssrc'] for now
def on_voice_client_disconnect(self, data):
for ssrc in self.audio_ssrcs.keys():
if self.audio_ssrcs[ssrc] == data['user_id']:
del self.audio_ssrcs[ssrc]
break
def on_voice_codecs(self, data):
self.audio_codec = data['audio_codec']
self.video_codec = data['video_codec']
self.transport_id = data['media_session_id']
# Set the UDP's RTP Audio Header's Payload Type
self.udp.set_audio_codec(data['audio_codec'])
def on_voice_hello(self, data):
self.log.info('[%s] Recieved Voice HELLO payload, starting heartbeater', self)
self._heartbeat_task = gevent.spawn(self._heartbeat, data['heartbeat_interval'] * 0.75)
self._heartbeat_task = gevent.spawn(self._heartbeat, data['heartbeat_interval'])
self.set_state(VoiceState.AUTHENTICATED)
def on_voice_ready(self, data):
@ -144,6 +194,17 @@ class VoiceClient(LoggingClass):
self.disconnect()
return
codecs = []
# Sending discord our available codecs and rtp payload type for it
for idx, codec in enumerate(AudioCodecs):
codecs.append({
'name': codec,
'type': 'audio',
'priority': (idx + 1) * 1000,
'payload_type': PayloadTypes.get(codec).value,
})
self.log.debug('[%s] IP discovery completed (ip = %s, port = %s), sending SELECT_PROTOCOL', self, ip, port)
self.send(VoiceOPCode.SELECT_PROTOCOL, {
'protocol': 'udp',
@ -152,6 +213,12 @@ class VoiceClient(LoggingClass):
'address': ip,
'mode': self.mode,
},
'codecs': codecs,
})
self.send(VoiceOPCode.CLIENT_CONNECT, {
'audio_ssrc': self.ssrc,
'video_ssrc': 0,
'rtx_ssrc': 0,
})
def on_voice_resumed(self, data):
@ -161,14 +228,17 @@ class VoiceClient(LoggingClass):
def on_voice_sdp(self, sdp):
self.log.info('[%s] Recieved session description, connection completed', self)
self.mode = sdp['mode']
self.audio_codec = sdp['audio_codec']
self.video_codec = sdp['video_codec']
self.transport_id = sdp['media_session_id']
# Set the UDP's RTP Audio Header's Payload Type
self.udp.set_audio_codec(sdp['audio_codec'])
# Create a secret box for encryption/decryption
self.udp.setup_encryption(bytes(bytearray(sdp['secret_key'])))
# Toggle speaking state so clients learn of our SSRC
self.set_speaking(True)
self.set_speaking(False)
gevent.sleep(0.25)
self.set_state(VoiceState.CONNECTED)
def on_voice_server_update(self, data):
@ -187,6 +257,18 @@ class VoiceClient(LoggingClass):
self._connect_and_run()
def on_voice_speaking(self, data):
self.audio_ssrcs[data['ssrc']] = data['user_id']
payload = VoiceSpeaking(
client=self,
user_id=data['user_id'],
speaking=bool(data['speaking'] & SpeakingCodes.VOICE.value),
soundshare=bool(data['speaking'] & SpeakingCodes.SOUNDSHARE.value),
)
self.client.gw.events.emit('VoiceSpeaking', payload)
def on_message(self, msg):
try:
data = self.encoder.decode(msg)

2
disco/voice/packets.py

@ -11,5 +11,7 @@ VoiceOPCode = Enum(
RESUME=7,
HELLO=8,
RESUMED=9,
CLIENT_CONNECT=12,
CLIENT_DISCONNECT=13,
CODECS=14,
)

182
disco/voice/udp.py

@ -2,16 +2,48 @@ import struct
import socket
import gevent
from collections import namedtuple
try:
import nacl.secret
except ImportError:
print('WARNING: nacl is not installed, voice support is disabled')
from holster.enum import Enum
from disco.util.logging import LoggingClass
AudioCodecs = ('opus',)
PayloadTypes = Enum(OPUS=0x78)
MAX_UINT32 = 4294967295
MAX_SEQUENCE = 65535
RTP_HEADER_VERSION = 0x80 # Only RTP Version is set here (value of 2 << 6)
RTP_EXTENSION_ONE_BYTE = (0xBE, 0xDE)
RTPHeader = namedtuple('RTPHeader', [
'version',
'padding',
'extension',
'csrc_count',
'marker',
'payload_type',
'sequence',
'timestamp',
'ssrc',
])
VoiceData = namedtuple('VoiceData', [
'client',
'user_id',
'payload_type',
'rtp',
'data',
])
class UDPVoiceClient(LoggingClass):
def __init__(self, vc):
@ -34,10 +66,17 @@ class UDPVoiceClient(LoggingClass):
self._run_task = None
self._secret_box = None
# Buffer used for encoding/sending frames
self._buffer = bytearray(24)
self._buffer[0] = 0x80
self._buffer[1] = 0x78
# RTP Header
self._rtp_audio_header = bytearray(12)
self._rtp_audio_header[0] = RTP_HEADER_VERSION
def set_audio_codec(self, codec):
ptype = PayloadTypes.get(codec)
if ptype:
self._rtp_audio_header[1] = ptype.value
self.log.debug('[%s] Set UDP\'s Audio Codec to %s, RTP payload type %s', self.vc, ptype.name, ptype.value)
else:
raise Exception('The voice codec, {}, isn\'t supported.'.format(codec))
def increment_timestamp(self, by):
self.timestamp += by
@ -52,27 +91,40 @@ class UDPVoiceClient(LoggingClass):
frame = bytearray(frame)
# Pack the rtc header into our buffer
struct.pack_into('>H', self._buffer, 2, sequence or self.sequence)
struct.pack_into('>I', self._buffer, 4, timestamp or self.timestamp)
struct.pack_into('>i', self._buffer, 8, self.vc.ssrc)
struct.pack_into('>H', self._rtp_audio_header, 2, sequence or self.sequence)
struct.pack_into('>I', self._rtp_audio_header, 4, timestamp or self.timestamp)
struct.pack_into('>i', self._rtp_audio_header, 8, self.vc.ssrc)
if self.vc.mode == 'xsalsa20_poly1305_lite':
# Use an incrementing number as a nonce, only first 4 bytes of the nonce is padded on
self._nonce += 1
if self._nonce > MAX_UINT32:
self._nonce = 0
nonce = bytearray(24)
struct.pack_into('>I', nonce, 0, self._nonce)
raw = self._secret_box.encrypt(bytes(frame), nonce).ciphertext + nonce[:4]
nonce_padding = nonce[:4]
elif self.vc.mode == 'xsalsa20_poly1305_suffix':
# Generate a nonce
nonce = nacl.utils.random(nacl.secret.SecretBox.NONCE_SIZE)
raw = self._secret_box.encrypt(bytes(frame), nonce).ciphertext + nonce
nonce_padding = nonce
elif self.vc.mode == 'xsalsa20_poly1305':
# Nonce is the header
nonce = bytearray(24)
nonce[:12] = self._rtp_audio_header
nonce_padding = None
else:
# Now encrypt the payload with the nonce as a header
raw = self._secret_box.encrypt(bytes(frame), bytes(self._buffer)).ciphertext
raise Exception('The voice mode, {}, isn\'t supported.'.format(self.vc.mode))
# Encrypt the payload with the nonce
payload = self._secret_box.encrypt(bytes(frame), bytes(nonce)).ciphertext
# Pad the payload with the nonce, if applicable
if nonce_padding:
payload += nonce_padding
# Send the header (sans nonce padding) plus the payload
self.send(self._buffer[:12] + raw)
self.send(self._rtp_audio_header + payload)
# Increment our sequence counter
self.sequence += 1
@ -85,7 +137,111 @@ class UDPVoiceClient(LoggingClass):
def run(self):
while True:
self.conn.recvfrom(4096)
data, addr = self.conn.recvfrom(4096)
# Data cannot be less than the bare minimum, just ignore
if len(data) <= 12:
self.log.debug('[%s] [VoiceData] Received voice data under 13 bytes', self.vc)
continue
first, second, sequence, timestamp, ssrc = struct.unpack_from('>BBHII', data)
rtp = RTPHeader(
version=first >> 6,
padding=(first >> 5) & 1,
extension=(first >> 4) & 1,
csrc_count=first & 0x0F,
marker=second >> 7,
payload_type=second & 0x7F,
sequence=sequence,
timestamp=timestamp,
ssrc=ssrc,
)
# Check if rtp version is 2
if rtp.version != 2:
self.log.debug('[%s] [VoiceData] Received an invalid RTP packet version, %s', self.vc, rtp.version)
continue
payload_type = PayloadTypes.get(rtp.payload_type)
# Unsupported payload type received
if not payload_type:
self.log.debug('[%s] [VoiceData] Received unsupported payload type, %s', self.vc, rtp.payload_type)
continue
nonce = bytearray(24)
if self.vc.mode == 'xsalsa20_poly1305_lite':
nonce[:4] = data[-4:]
data = data[:-4]
elif self.vc.mode == 'xsalsa20_poly1305_suffx':
nonce[:24] = data[-24:]
data = data[:-24]
elif self.vc.mode == 'xsalsa20_poly1305':
nonce[:12] = data[:12]
else:
self.log.debug('[%s] [VoiceData] Unsupported Encryption Mode, %s', self.vc, self.vc.mode)
continue
try:
data = self._secret_box.decrypt(bytes(data[12:]), bytes(nonce))
except Exception:
self.log.debug('[%s] [VoiceData] Failed to decode data from ssrc %s', self.vc, rtp.ssrc)
continue
# RFC3550 Section 5.1 (Padding)
if rtp.padding:
padding_amount, = struct.unpack_from('>B', data[:-1])
data = data[-padding_amount:]
if rtp.extension:
# RFC5285 Section 4.2: One-Byte Header
rtp_extension_header = struct.unpack_from('>BB', data)
if rtp_extension_header == RTP_EXTENSION_ONE_BYTE:
data = data[2:]
fields_amount, = struct.unpack_from('>H', data)
fields = []
offset = 4
for i in range(fields_amount):
first_byte, = struct.unpack_from('>B', data[offset])
offset += 1
rtp_extension_identifer = first_byte & 0xF
rtp_extension_len = ((first_byte >> 4) & 0xF) + 1
# Ignore data if identifer == 15, so skip if this is set as 0
if rtp_extension_identifer:
fields.append(data[offset:offset + rtp_extension_len])
offset += rtp_extension_len
# skip padding
while data[offset] == 0:
offset += 1
if len(fields):
fields.append(data[offset:])
data = b''.join(fields)
else:
data = data[offset:]
# RFC3550 Section 5.3: Profile-Specific Modifications to the RTP Header
# clients send it sometimes, definitely on fresh connects to a server, dunno what to do here
if rtp.marker:
self.log.debug('[%s] [VoiceData] Received RTP data with the marker set, skipping', self.vc)
continue
payload = VoiceData(
client=self.vc,
payload_type=payload_type.name,
user_id=self.vc.audio_ssrcs.get(rtp.ssrc, None),
rtp=rtp,
data=data,
)
self.vc.client.gw.events.emit('VoiceData', payload)
def send(self, data):
self.conn.sendto(data, (self.ip, self.port))

Loading…
Cancel
Save