Browse Source

Voice Gateway v3 (#80)

* [reqs] add wsaccel to performance reqs

* Support voice gateway v3
pull/82/head
Andrei Zbikowski 7 years ago
committed by GitHub
parent
commit
88504f7aa6
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 223
      disco/voice/client.py
  2. 5
      disco/voice/packets.py
  3. 13
      disco/voice/player.py
  4. 115
      disco/voice/udp.py
  5. 4
      examples/music.py
  6. 2
      setup.py

223
disco/voice/client.py

@ -1,32 +1,28 @@
from __future__ import print_function
import gevent
import socket
import struct
import time
try:
import nacl.secret
except ImportError:
print('WARNING: nacl is not installed, voice support is disabled')
from holster.enum import Enum
from holster.emitter import Emitter
from disco.gateway.encoding.json import JSONEncoder
from disco.util.websocket import Websocket
from disco.util.logging import LoggingClass
from disco.voice.packets import VoiceOPCode
from disco.gateway.packets import OPCode
from disco.voice.packets import VoiceOPCode
from disco.voice.udp import UDPVoiceClient
VoiceState = Enum(
DISCONNECTED=0,
AWAITING_ENDPOINT=1,
AUTHENTICATING=2,
CONNECTING=3,
CONNECTED=4,
VOICE_CONNECTING=5,
VOICE_CONNECTED=6,
RECONNECTING=1,
AWAITING_ENDPOINT=2,
AUTHENTICATING=3,
AUTHENTICATED=4,
CONNECTING=5,
CONNECTED=6,
VOICE_CONNECTING=7,
VOICE_CONNECTED=8,
)
@ -36,98 +32,15 @@ class VoiceException(Exception):
super(VoiceException, self).__init__(msg)
class UDPVoiceClient(LoggingClass):
def __init__(self, vc):
super(UDPVoiceClient, self).__init__()
self.vc = vc
# The underlying UDP socket
self.conn = None
# Connection information
self.ip = None
self.port = None
self.run_task = None
self.connected = False
# Buffer used for encoding/sending frames
self._buffer = bytearray(24)
self._buffer[0] = 0x80
self._buffer[1] = 0x78
def send_frame(self, frame, sequence=None, timestamp=None):
# Convert the frame to a bytearray
frame = bytearray(frame)
# Pack the rtc header into our buffer
struct.pack_into('>H', self._buffer, 2, sequence or self.vc.sequence)
struct.pack_into('>I', self._buffer, 4, timestamp or self.vc.timestamp)
struct.pack_into('>i', self._buffer, 8, self.vc.ssrc)
if self.vc.mode == 'xsalsa20_poly1305_suffix':
nonce = nacl.utils.random(nacl.secret.SecretBox.NONCE_SIZE)
raw = self.vc.secret_box.encrypt(bytes(frame), nonce).ciphertext + nonce
else:
# Now encrypt the payload with the nonce as a header
raw = self.vc.secret_box.encrypt(bytes(frame), bytes(self._buffer)).ciphertext
# Send the header (sans nonce padding) plus the payload
self.send(self._buffer[:12] + raw)
# Increment our sequence counter
self.vc.sequence += 1
if self.vc.sequence >= 65535:
self.vc.sequence = 0
def run(self):
while True:
self.conn.recvfrom(4096)
def send(self, data):
self.conn.sendto(data, (self.ip, self.port))
def disconnect(self):
self.run_task.kill()
def connect(self, host, port, timeout=10, addrinfo=None):
self.ip = socket.gethostbyname(host)
self.port = port
self.conn = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
if addrinfo:
ip, port = addrinfo
else:
# Send discovery packet
packet = bytearray(70)
struct.pack_into('>I', packet, 0, self.vc.ssrc)
self.send(packet)
# Wait for a response
try:
data, addr = gevent.spawn(lambda: self.conn.recvfrom(70)).get(timeout=timeout)
except gevent.Timeout:
return (None, None)
# Read IP and port
ip = str(data[4:]).split('\x00', 1)[0]
port = struct.unpack('<H', data[-2:])[0]
# Spawn read thread so we don't max buffers
self.connected = True
self.run_task = gevent.spawn(self.run)
return (ip, port)
class VoiceClient(LoggingClass):
VOICE_GATEWAY_VERSION = 3
SUPPORTED_MODES = {
'xsalsa20_poly1305_suffix',
'xsalsa20_poly1305',
}
def __init__(self, channel, encoder=None):
def __init__(self, channel, encoder=None, max_reconnects=5):
super(VoiceClient, self).__init__()
if not channel.is_voice:
@ -136,10 +49,13 @@ class VoiceClient(LoggingClass):
self.channel = channel
self.client = self.channel.client
self.encoder = encoder or JSONEncoder
self.max_reconnects = max_reconnects
# Bind to some WS packets
self.packets = Emitter()
self.packets.on(VoiceOPCode.HELLO, self.on_voice_hello)
self.packets.on(VoiceOPCode.READY, self.on_voice_ready)
self.packets.on(VoiceOPCode.RESUMED, self.on_voice_resumed)
self.packets.on(VoiceOPCode.SESSION_DESCRIPTION, self.on_voice_sdp)
# State + state change emitter
@ -152,18 +68,15 @@ class VoiceClient(LoggingClass):
self.ssrc = None
self.port = None
self.mode = None
self.secret_box = None
self.udp = None
# Voice data state
self.sequence = 0
self.timestamp = 0
self.update_listener = None
# Websocket connection
self.ws = None
self.heartbeat_task = None
self._session_id = None
self._reconnects = 0
self._update_listener = None
self._heartbeat_task = None
def __repr__(self):
return u'<VoiceClient {}>'.format(self.channel)
@ -174,9 +87,17 @@ class VoiceClient(LoggingClass):
self.state = state
self.state_emitter.emit(state, prev_state)
def heartbeat(self, interval):
def _connect_and_run(self):
self.ws = Websocket('wss://' + self.endpoint + '/v={}'.format(self.VOICE_GATEWAY_VERSION))
self.ws.emitter.on('on_open', self.on_open)
self.ws.emitter.on('on_error', self.on_error)
self.ws.emitter.on('on_close', self.on_close)
self.ws.emitter.on('on_message', self.on_message)
self.ws.run_forever()
def _heartbeat(self, interval):
while True:
self.send(VoiceOPCode.HEARTBEAT, time.time() * 1000)
self.send(VoiceOPCode.HEARTBEAT, time.time())
gevent.sleep(interval / 1000)
def set_speaking(self, value):
@ -192,6 +113,11 @@ class VoiceClient(LoggingClass):
'd': data,
}), self.encoder.OPCODE)
def on_voice_hello(self, data):
self.log.info('[%s] Recieved Voice HELLO payload, starting heartbeater', self)
self._heartbeat_task = gevent.spawn(self._heartbeat, data['heartbeat_interval'] * 0.75)
self.set_state(VoiceState.AUTHENTICATED)
def on_voice_ready(self, data):
self.log.info('[%s] Recived Voice READY payload, attempting to negotiate voice connection w/ remote', self)
self.set_state(VoiceState.CONNECTING)
@ -206,8 +132,6 @@ class VoiceClient(LoggingClass):
else:
raise Exception('Failed to find a supported voice mode')
self.heartbeat_task = gevent.spawn(self.heartbeat, data['heartbeat_interval'])
self.log.debug('[%s] Attempting IP discovery over UDP to %s:%s', self, self.endpoint, self.port)
self.udp = UDPVoiceClient(self)
ip, port = self.udp.connect(self.endpoint, self.port)
@ -227,11 +151,15 @@ class VoiceClient(LoggingClass):
},
})
def on_voice_resumed(self, data):
self.log.info('[%s] Recieved resumed', self)
self.set_state(VoiceState.CONNECTED)
def on_voice_sdp(self, sdp):
self.log.info('[%s] Recieved session description, connection completed', self)
# Create a secret box for encryption/decryption
self.secret_box = nacl.secret.SecretBox(bytes(bytearray(sdp['secret_key'])))
self.udp.setup_encryption(bytes(bytearray(sdp['secret_key'])))
# Toggle speaking state so clients learn of our SSRC
self.set_speaking(True)
@ -253,12 +181,8 @@ class VoiceClient(LoggingClass):
self.set_state(VoiceState.AUTHENTICATING)
self.endpoint = data.endpoint.split(':', 1)[0]
self.ws = Websocket('wss://' + self.endpoint)
self.ws.emitter.on('on_open', self.on_open)
self.ws.emitter.on('on_error', self.on_error)
self.ws.emitter.on('on_close', self.on_close)
self.ws.emitter.on('on_message', self.on_message)
self.ws.run_forever()
self._connect_and_run()
def on_message(self, msg):
try:
@ -271,25 +195,60 @@ class VoiceClient(LoggingClass):
self.log.error('[%s] Voice websocket error: %s', self, err)
def on_open(self):
if self._session_id:
return self.send(VoiceOPCode.RESUME, {
'server_id': self.channel.guild_id,
'user_id': self.client.state.me.id,
'session_id': self._session_id,
'token': self.token,
})
self._session_id = self.client.gw.session_id
self.send(VoiceOPCode.IDENTIFY, {
'server_id': self.channel.guild_id,
'user_id': self.client.state.me.id,
'session_id': self.client.gw.session_id,
'session_id': self._session_id,
'token': self.token,
})
def on_close(self, code, error):
self.log.warning('[%s] Voice websocket disconnected (%s, %s)', self, code, error)
def on_close(self, code, reason):
self.log.warning('[%s] Voice websocket closed: [%s] %s (%s)', self, code, reason, self._reconnects)
if self._heartbeat_task:
self._heartbeat_task.kill()
# If we're not in a connected state, don't try to resume/reconnect
if self.state != VoiceState.CONNECTED:
return
self.log.info('[%s] Attempting Websocket Resumption', self)
self._reconnects += 1
if self.state == VoiceState.CONNECTED:
self.log.info('Attempting voice reconnection')
self.connect()
if self.max_reconnects and self._reconnects > self.max_reconnects:
raise VoiceException('Failed to reconnect after {} attempts, giving up'.format(self.max_reconnects))
self.set_state(VoiceState.RECONNECTING)
# Don't resume for these error codes:
if code and 4000 <= code <= 4016:
self._session_id = None
if self.udp and self.udp.connected:
self.udp.disconnect()
wait_time = (self._reconnects - 1) * 5
self.log.info(
'[%s] Will attempt to %s after %s seconds', self, 'resume' if self._session_id else 'reconnect', wait_time)
gevent.sleep(wait_time)
self._connect_and_run()
def connect(self, timeout=5, mute=False, deaf=False):
self.log.debug('[%s] Attempting connection', self)
self.set_state(VoiceState.AWAITING_ENDPOINT)
self.update_listener = self.client.events.on('VoiceServerUpdate', self.on_voice_server_update)
self._update_listener = self.client.events.on('VoiceServerUpdate', self.on_voice_server_update)
self.client.gw.send(OPCode.VOICE_STATE_UPDATE, {
'self_mute': mute,
@ -299,17 +258,18 @@ class VoiceClient(LoggingClass):
})
if not self.state_emitter.once(VoiceState.CONNECTED, timeout=timeout):
self.disconnect()
raise VoiceException('Failed to connect to voice', self)
def disconnect(self):
self.log.debug('[%s] disconnect called', self)
self.set_state(VoiceState.DISCONNECTED)
if self.heartbeat_task:
self.heartbeat_task.kill()
self.heartbeat_task = None
if self._heartbeat_task:
self._heartbeat_task.kill()
self._heartbeat_task = None
if self.ws and self.ws.sock.connected:
if self.ws and self.ws.sock and self.ws.sock.connected:
self.ws.close()
if self.udp and self.udp.connected:
@ -324,3 +284,6 @@ class VoiceClient(LoggingClass):
def send_frame(self, *args, **kwargs):
self.udp.send_frame(*args, **kwargs)
def increment_timestamp(self, *args, **kwargs):
self.udp.increment_timestamp(*args, **kwargs)

5
disco/voice/packets.py

@ -7,4 +7,9 @@ VoiceOPCode = Enum(
HEARTBEAT=3,
SESSION_DESCRIPTION=4,
SPEAKING=5,
HEARTBEAT_ACK=6,
RESUME=7,
HELLO=8,
RESUMED=9,
CLIENT_DISCONNECT=13,
)

13
disco/voice/player.py

@ -6,11 +6,10 @@ from holster.emitter import Emitter
from disco.voice.client import VoiceState
from disco.voice.queue import PlayableQueue
from disco.util.logging import LoggingClass
MAX_TIMESTAMP = 4294967295
class Player(object):
class Player(LoggingClass):
Events = Enum(
'START_PLAY',
'STOP_PLAY',
@ -20,6 +19,7 @@ class Player(object):
)
def __init__(self, client, queue=None):
super(Player, self).__init__()
self.client = client
# Queue contains playable items
@ -92,12 +92,11 @@ class Player(object):
return
if self.client.state != VoiceState.CONNECTED:
self.client.state_emitter.wait(VoiceState.CONNECTED)
self.client.state_emitter.once(VoiceState.CONNECTED, timeout=30)
# Send the voice frame and increment our timestamp
self.client.send_frame(frame)
self.client.timestamp += item.samples_per_frame
if self.client.timestamp > MAX_TIMESTAMP:
self.client.timestamp = 0
self.client.increment_timestamp(item.samples_per_frame)
frame = item.next_frame()
if frame is None:

115
disco/voice/udp.py

@ -0,0 +1,115 @@
import struct
import socket
import gevent
try:
import nacl.secret
except ImportError:
print('WARNING: nacl is not installed, voice support is disabled')
from disco.util.logging import LoggingClass
MAX_TIMESTAMP = 4294967295
MAX_SEQUENCE = 65535
class UDPVoiceClient(LoggingClass):
def __init__(self, vc):
super(UDPVoiceClient, self).__init__()
self.vc = vc
# The underlying UDP socket
self.conn = None
# Connection information
self.ip = None
self.port = None
self.connected = False
# Voice information
self.sequence = 0
self.timestamp = 0
self._run_task = None
self._secret_box = None
# Buffer used for encoding/sending frames
self._buffer = bytearray(24)
self._buffer[0] = 0x80
self._buffer[1] = 0x78
def increment_timestamp(self, by):
self.timestamp += by
if self.timestamp > MAX_TIMESTAMP:
self.timestamp = 0
def setup_encryption(self, encryption_key):
self._secret_box = nacl.secret.SecretBox(encryption_key)
def send_frame(self, frame, sequence=None, timestamp=None, incr_timestamp=None):
# Convert the frame to a bytearray
frame = bytearray(frame)
# Pack the rtc header into our buffer
struct.pack_into('>H', self._buffer, 2, sequence or self.sequence)
struct.pack_into('>I', self._buffer, 4, timestamp or self.timestamp)
struct.pack_into('>i', self._buffer, 8, self.vc.ssrc)
if self.vc.mode == 'xsalsa20_poly1305_suffix':
nonce = nacl.utils.random(nacl.secret.SecretBox.NONCE_SIZE)
raw = self._secret_box.encrypt(bytes(frame), nonce).ciphertext + nonce
else:
# Now encrypt the payload with the nonce as a header
raw = self._secret_box.encrypt(bytes(frame), bytes(self._buffer)).ciphertext
# Send the header (sans nonce padding) plus the payload
self.send(self._buffer[:12] + raw)
# Increment our sequence counter
self.sequence += 1
if self.sequence >= MAX_SEQUENCE:
self.sequence = 0
# Increment our timestamp (if applicable)
if incr_timestamp:
self.timestamp += incr_timestamp
def run(self):
while True:
self.conn.recvfrom(4096)
def send(self, data):
self.conn.sendto(data, (self.ip, self.port))
def disconnect(self):
self._run_task.kill()
def connect(self, host, port, timeout=10, addrinfo=None):
self.ip = socket.gethostbyname(host)
self.port = port
self.conn = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
if addrinfo:
ip, port = addrinfo
else:
# Send discovery packet
packet = bytearray(70)
struct.pack_into('>I', packet, 0, self.vc.ssrc)
self.send(packet)
# Wait for a response
try:
data, addr = gevent.spawn(lambda: self.conn.recvfrom(70)).get(timeout=timeout)
except gevent.Timeout:
return (None, None)
# Read IP and port
ip = str(data[4:]).split('\x00', 1)[0]
port = struct.unpack('<H', data[-2:])[0]
# Spawn read thread so we don't max buffers
self.connected = True
self._run_task = gevent.spawn(self.run)
return (ip, port)

4
examples/music.py

@ -50,3 +50,7 @@ class MusicPlugin(Plugin):
@Plugin.command('resume')
def on_resume(self, event):
self.get_player(event.guild.id).resume()
@Plugin.command('kill')
def on_kill(self, event):
self.get_player(event.guild.id).client.ws.sock.shutdown()

2
setup.py

@ -13,7 +13,7 @@ extras_require = {
'voice': ['pynacl==1.2.1'],
'http': ['flask==0.12.2'],
'yaml': ['pyyaml==3.12'],
'music': ['youtube_dl>=2018.1.14'],
'music': ['youtube_dl>=2018.1.21'],
'performance': [
'erlpack==0.3.2' if sys.version_info.major == 2 else 'earl-etf==2.1.2',
'ujson==1.35',

Loading…
Cancel
Save