You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

292 lines
9.5 KiB

from __future__ import print_function
import gevent
import time
from holster.enum import Enum
from holster.emitter import Emitter
from disco.gateway.encoding.json import JSONEncoder
from disco.util.websocket import Websocket
from disco.util.logging import LoggingClass
from disco.gateway.packets import OPCode
from disco.voice.packets import VoiceOPCode
from disco.voice.udp import UDPVoiceClient
VoiceState = Enum(
DISCONNECTED=0,
RECONNECTING=1,
AWAITING_ENDPOINT=2,
AUTHENTICATING=3,
AUTHENTICATED=4,
CONNECTING=5,
CONNECTED=6,
VOICE_CONNECTING=7,
VOICE_CONNECTED=8,
)
class VoiceException(Exception):
def __init__(self, msg, client):
self.voice_client = client
super(VoiceException, self).__init__(msg)
class VoiceClient(LoggingClass):
VOICE_GATEWAY_VERSION = 3
SUPPORTED_MODES = {
'xsalsa20_poly1305_lite',
'xsalsa20_poly1305_suffix',
'xsalsa20_poly1305',
}
def __init__(self, channel, encoder=None, max_reconnects=5):
super(VoiceClient, self).__init__()
if not channel.is_voice:
raise ValueError('Cannot spawn a VoiceClient for a non-voice channel')
self.channel = channel
self.client = self.channel.client
self.encoder = encoder or JSONEncoder
self.max_reconnects = max_reconnects
# Bind to some WS packets
self.packets = Emitter()
self.packets.on(VoiceOPCode.HELLO, self.on_voice_hello)
self.packets.on(VoiceOPCode.READY, self.on_voice_ready)
self.packets.on(VoiceOPCode.RESUMED, self.on_voice_resumed)
self.packets.on(VoiceOPCode.SESSION_DESCRIPTION, self.on_voice_sdp)
# State + state change emitter
self.state = VoiceState.DISCONNECTED
self.state_emitter = Emitter()
# Connection metadata
self.token = None
self.endpoint = None
self.ssrc = None
self.ip = None
self.port = None
self.mode = None
self.udp = None
# Websocket connection
self.ws = None
self._session_id = None
self._reconnects = 0
self._update_listener = None
self._heartbeat_task = None
def __repr__(self):
return u'<VoiceClient {}>'.format(self.channel)
def set_state(self, state):
self.log.debug('[%s] state %s -> %s', self, self.state, state)
prev_state = self.state
self.state = state
self.state_emitter.emit(state, prev_state)
def _connect_and_run(self):
self.ws = Websocket('wss://' + self.endpoint + '/v={}'.format(self.VOICE_GATEWAY_VERSION))
self.ws.emitter.on('on_open', self.on_open)
self.ws.emitter.on('on_error', self.on_error)
self.ws.emitter.on('on_close', self.on_close)
self.ws.emitter.on('on_message', self.on_message)
self.ws.run_forever()
def _heartbeat(self, interval):
while True:
self.send(VoiceOPCode.HEARTBEAT, time.time())
gevent.sleep(interval / 1000)
def set_speaking(self, value):
self.send(VoiceOPCode.SPEAKING, {
'speaking': value,
'delay': 0,
})
def send(self, op, data):
self.log.debug('[%s] sending OP %s (data = %s)', self, op, data)
self.ws.send(self.encoder.encode({
'op': op.value,
'd': data,
}), self.encoder.OPCODE)
def on_voice_hello(self, data):
self.log.info('[%s] Recieved Voice HELLO payload, starting heartbeater', self)
self._heartbeat_task = gevent.spawn(self._heartbeat, data['heartbeat_interval'] * 0.75)
self.set_state(VoiceState.AUTHENTICATED)
def on_voice_ready(self, data):
self.log.info('[%s] Recived Voice READY payload, attempting to negotiate voice connection w/ remote', self)
self.set_state(VoiceState.CONNECTING)
self.ssrc = data['ssrc']
self.ip = data['ip']
self.port = data['port']
for mode in self.SUPPORTED_MODES:
if mode in data['modes']:
self.mode = mode
self.log.debug('[%s] Selected mode %s', self, mode)
break
else:
raise Exception('Failed to find a supported voice mode')
self.log.debug('[%s] Attempting IP discovery over UDP to %s:%s', self, self.ip, self.port)
self.udp = UDPVoiceClient(self)
ip, port = self.udp.connect(self.ip, self.port)
if not ip:
self.log.error('Failed to discover our IP, perhaps a NAT or firewall is fucking us')
self.disconnect()
return
self.log.debug('[%s] IP discovery completed (ip = %s, port = %s), sending SELECT_PROTOCOL', self, ip, port)
self.send(VoiceOPCode.SELECT_PROTOCOL, {
'protocol': 'udp',
'data': {
'port': port,
'address': ip,
'mode': self.mode,
},
})
def on_voice_resumed(self, data):
self.log.info('[%s] Recieved resumed', self)
self.set_state(VoiceState.CONNECTED)
def on_voice_sdp(self, sdp):
self.log.info('[%s] Recieved session description, connection completed', self)
# Create a secret box for encryption/decryption
self.udp.setup_encryption(bytes(bytearray(sdp['secret_key'])))
# Toggle speaking state so clients learn of our SSRC
self.set_speaking(True)
self.set_speaking(False)
gevent.sleep(0.25)
self.set_state(VoiceState.CONNECTED)
def on_voice_server_update(self, data):
if self.channel.guild_id != data.guild_id or not data.token:
return
if self.token and self.token != data.token:
return
self.log.info('[%s] Recieved VOICE_SERVER_UPDATE (state = %s / endpoint = %s)', self, self.state, data.endpoint)
self.token = data.token
self.set_state(VoiceState.AUTHENTICATING)
self.endpoint = data.endpoint.split(':', 1)[0]
self._connect_and_run()
def on_message(self, msg):
try:
data = self.encoder.decode(msg)
self.packets.emit(VoiceOPCode[data['op']], data['d'])
except Exception:
self.log.exception('Failed to parse voice gateway message: ')
def on_error(self, err):
self.log.error('[%s] Voice websocket error: %s', self, err)
def on_open(self):
if self._session_id:
return self.send(VoiceOPCode.RESUME, {
'server_id': self.channel.guild_id,
'user_id': self.client.state.me.id,
'session_id': self._session_id,
'token': self.token,
})
self._session_id = self.client.gw.session_id
self.send(VoiceOPCode.IDENTIFY, {
'server_id': self.channel.guild_id,
'user_id': self.client.state.me.id,
'session_id': self._session_id,
'token': self.token,
})
def on_close(self, code, reason):
self.log.warning('[%s] Voice websocket closed: [%s] %s (%s)', self, code, reason, self._reconnects)
if self._heartbeat_task:
self._heartbeat_task.kill()
# If we're not in a connected state, don't try to resume/reconnect
if self.state != VoiceState.CONNECTED:
return
self.log.info('[%s] Attempting Websocket Resumption', self)
self._reconnects += 1
if self.max_reconnects and self._reconnects > self.max_reconnects:
raise VoiceException('Failed to reconnect after {} attempts, giving up'.format(self.max_reconnects))
self.set_state(VoiceState.RECONNECTING)
# Don't resume for these error codes:
if code and 4000 <= code <= 4016:
self._session_id = None
if self.udp and self.udp.connected:
self.udp.disconnect()
wait_time = (self._reconnects - 1) * 5
self.log.info(
'[%s] Will attempt to %s after %s seconds', self, 'resume' if self._session_id else 'reconnect', wait_time)
gevent.sleep(wait_time)
self._connect_and_run()
def connect(self, timeout=5, mute=False, deaf=False):
self.log.debug('[%s] Attempting connection', self)
self.set_state(VoiceState.AWAITING_ENDPOINT)
self._update_listener = self.client.events.on('VoiceServerUpdate', self.on_voice_server_update)
self.client.gw.send(OPCode.VOICE_STATE_UPDATE, {
'self_mute': mute,
'self_deaf': deaf,
'guild_id': int(self.channel.guild_id),
'channel_id': int(self.channel.id),
})
if not self.state_emitter.once(VoiceState.CONNECTED, timeout=timeout):
self.disconnect()
raise VoiceException('Failed to connect to voice', self)
def disconnect(self):
self.log.debug('[%s] disconnect called', self)
self.set_state(VoiceState.DISCONNECTED)
if self._heartbeat_task:
self._heartbeat_task.kill()
self._heartbeat_task = None
if self.ws and self.ws.sock and self.ws.sock.connected:
self.ws.close()
if self.udp and self.udp.connected:
self.udp.disconnect()
self.client.gw.send(OPCode.VOICE_STATE_UPDATE, {
'self_mute': False,
'self_deaf': False,
'guild_id': int(self.channel.guild_id),
'channel_id': None,
})
def send_frame(self, *args, **kwargs):
self.udp.send_frame(*args, **kwargs)
def increment_timestamp(self, *args, **kwargs):
self.udp.increment_timestamp(*args, **kwargs)