# disco/gateway/events.py -- gateway events emitted by the voice client.


class UserSpeaking(GatewayEvent):
    """
    Sent when a user starts or stops speaking in a voice channel

    Emitted by ``VoiceClient.on_voice_speaking`` for every voice SPEAKING
    payload.

    Attributes
    -----
    member : :class:`disco.types.guild.GuildMember`
        The member speaking
    speaking : `bool`
        Is the member speaking?
    channel : :class:`disco.types.channel.Channel`
        The channel the member is speaking in
    """
    member = Field(GuildMember)
    speaking = Field(bool)
    channel = Field(Channel)


class VoiceReceived(GatewayEvent):
    """
    Sent for every voice packet that was received and decoded

    Emitted by ``UDPVoiceClient.run`` once per incoming packet, after the
    payload has been decrypted and run through the Opus decoder.

    Attributes
    -----
    member : :class:`disco.types.guild.GuildMember`
        The member the audio belongs to. This is `None` when the SSRC of the
        packet has not been associated with a user yet (no SPEAKING payload
        has been seen for it).
    channel : :class:`disco.types.channel.Channel`
        The channel the audio was received in
    voice_data : `bytes`
        The decoded PCM data (16 bit samples, as produced by
        ``OpusDecoder.decode``)
    timestamp : `int`
        The timestamp taken from the packet header
    sequence : `int`
        The sequence number taken from the packet header
    """
    member = Field(GuildMember)
    channel = Field(Channel)
    voice_data = Field(bytes)
    timestamp = Field(int)
    sequence = Field(int)
# disco/voice/client.py -- voice receive path.
#
# ``run`` is a method of ``UDPVoiceClient``; ``pipe_voice_into_file`` and
# ``on_voice_speaking`` are methods of ``VoiceClient``.


def run(self):
    """
    Receive, decrypt and decode voice packets until the greenlet is killed.

    Every datagram read from ``self.conn`` is expected to start with a 12
    byte header laid out as ``self._FORMAT`` (check bytes, sequence,
    timestamp, SSRC). The payload is decrypted with ``self.vc.secret_box``
    using the header (zero padded to 24 bytes) as nonce, decoded with a per
    SSRC ``OpusDecoder`` and published both as a ``VoiceReceived`` gateway
    event and to every callback in ``self.vc.pcm_listeners``.

    Datagrams that are too short, do not start with one of the two known
    check values, or fail to decrypt/decode are logged and skipped. A single
    bad datagram must not terminate the receive loop.
    """
    while True:
        data, _ = self.conn.recvfrom(4096)

        # 12 byte header plus at least one byte of payload
        if len(data) < 13:
            self.log.debug('Dropping undersized voice packet (%s bytes)', len(data))
            continue

        check, seq, ts, ssrc = struct.unpack_from(self._FORMAT, data)

        if check not in (self._CHECK, self._CHECK2):
            self.log.debug('Dropping packet with unexpected leading bytes: %r', data[:2])
            continue

        nonce = bytearray(24)
        nonce[:12] = data[:12]

        # ``except Exception`` leaves BaseException subclasses alone, so the
        # loop can still be stopped from the outside.
        try:
            payload = self.vc.secret_box.decrypt(bytes(data[12:]), bytes(nonce))

            # Packets starting with b'\x90' need the first 8 bytes ignored
            if check == self._CHECK2:
                payload = payload[8:]

            if ssrc not in self._decoders:
                self._decoders[ssrc] = OpusDecoder(48000, 2)

            pcm = self._decoders[ssrc].decode(payload)
        except Exception:
            self.log.exception('Failed to decrypt/decode voice packet (ssrc=%s, seq=%s)', ssrc, seq)
            continue

        # Resolve the SSRC to a guild member; unknown SSRCs yield no member
        member = None
        if ssrc in self.vc.ssrc_lookup:
            member = self.vc.channel.guild.get_member(int(self.vc.ssrc_lookup[ssrc]))

        obj = VoiceReceived()
        obj.member = member
        obj.channel = self.vc.channel
        obj.voice_data = pcm
        obj.timestamp = ts
        obj.sequence = seq

        self.vc.client.gw.events.emit('VoiceReceived', obj)

        for cb in self.vc.pcm_listeners:
            cb(obj)


def pipe_voice_into_file(self, member, file_object, buffer_size=0.2):
    """
    Write the decoded audio of one member into a file-like object.

    Registers a PCM listener and a speaking listener on this client. Incoming
    packets of ``member`` are held in a small reorder buffer and written in
    timestamp order; packets missing inside an utterance are replaced by
    silence, and the pause between two utterances is filled with silence
    based on wall-clock time.

    Parameters
    -----
    member
        The member to record; only its ``id`` is used.
    file_object
        Object with a ``write`` method accepting a ``bytearray``.
    buffer_size : `float`
        Size of the reorder buffer in seconds, converted to a packet count
        assuming 20ms per packet.
    """
    # Audio format of the PCM data handed to the listeners (the receive loop
    # creates its decoders as ``OpusDecoder(48000, 2)``, 16 bit samples).
    SAMPLE_RATE = 48000
    SAMPLE_SIZE = 2
    CHANNELS = 2
    FRAME_BYTES = SAMPLE_SIZE * CHANNELS

    # A timestamp jump of more than one second is treated as a restart of the
    # stream, not as packet loss, so a bogus timestamp can not trigger a huge
    # silence allocation.
    MAX_GAP = SAMPLE_RATE

    # Convert buffer_size from seconds to packets.
    max_buffered = int(round((buffer_size * 1000) / 20))

    user_id = member.id

    # ``cursor`` is the timestamp expected for the next packet written,
    # ``pushed`` a running count used as heap tie breaker.
    state = {'cursor': None, 'pushed': 0, 'voice_buffer': [], 'last_speaking': 0}

    def write_packet(packet):
        cursor = state['cursor']
        gap = 0 if cursor is None else packet.timestamp - cursor

        # Late or duplicate packet: its slot has already been written
        if -MAX_GAP < gap < 0:
            return

        # Stream discontinuity: resynchronise without padding
        if gap < 0 or gap > MAX_GAP:
            gap = 0

        data = bytearray(gap * FRAME_BYTES)
        data += packet.voice_data
        file_object.write(data)

        state['cursor'] = packet.timestamp + len(packet.voice_data) // FRAME_BYTES

    def drain(keep):
        buffered = state['voice_buffer']
        while len(buffered) > keep:
            write_packet(heapq.heappop(buffered)[2])

    def listener(vp):
        if vp.member is None or vp.member.id != user_id:
            return

        # The running count keeps equal timestamps from falling through to a
        # comparison of the packet objects themselves.
        state['pushed'] += 1
        heapq.heappush(state['voice_buffer'], (vp.timestamp, state['pushed'], vp))
        drain(max_buffered)

    def speaking_listener(uid, speaking):
        if uid != user_id:
            return

        if not speaking:
            # Flush the tail of the utterance so it ends up in front of the
            # silence written when the member starts speaking again.
            drain(0)
            state['last_speaking'] = time.time()
            return

        if state['last_speaking'] != 0:
            silence = max(0.0, time.time() - state['last_speaking'])
            file_object.write(bytearray(int(silence * SAMPLE_RATE) * FRAME_BYTES))
            # The pause has been accounted for; do not pad it a second time
            # from the packet timestamps.
            state['cursor'] = None

        state['last_speaking'] = 0

    self.pcm_listeners.append(listener)
    self.speaking_listeners.append(speaking_listener)


def on_voice_speaking(self, data):
    """
    Handle a voice SPEAKING payload.

    Remembers which user the SSRC belongs to (used by the UDP receive loop),
    notifies every callback in ``self.speaking_listeners`` with the user id
    and speaking flag, and emits a ``UserSpeaking`` gateway event.
    """
    self.ssrc_lookup[data['ssrc']] = data['user_id']

    user_id = int(data['user_id'])

    obj = UserSpeaking()
    obj.member = self.channel.guild.get_member(user_id)
    obj.speaking = data['speaking']
    obj.channel = self.channel

    for cb in self.speaking_listeners:
        cb(user_id, data['speaking'])

    self.client.gw.events.emit('UserSpeaking', obj)
# disco/voice/opus.py -- library lookup and Opus decoder.
#
# ``find_library`` is a static method of ``BaseOpus``.


@staticmethod
def find_library():
    """
    Locate the opus shared library.

    On Windows the DLL shipped in the ``bin`` directory next to this module
    is preferred (``libopus-0.x64.dll`` or ``libopus-0.x86.dll``, chosen by
    the bitness of the running interpreter). If that file is missing, or on
    any other platform, the lookup is left to ``ctypes.util.find_library``.
    """
    if sys.platform == 'win32':
        basedir = os.path.dirname(os.path.abspath(__file__))
        bitness = 'x64' if sys.maxsize > 2 ** 32 else 'x86'
        bundled = os.path.join(basedir, 'bin', 'libopus-0.{}.dll'.format(bitness))

        # Only hand out the bundled path when the file is really there
        if os.path.isfile(bundled):
            return bundled

    return ctypes.util.find_library('opus')


class OpusDecoder(BaseOpus):
    """
    Decoder turning Opus packets into 16 bit PCM.

    The ``opus_*`` callables used below are the library functions listed in
    ``EXPORTED``; they are presumably bound onto the instance by ``BaseOpus``
    (not visible here -- confirm against the base class).
    """
    EXPORTED = {
        'opus_decoder_get_size': ([ctypes.c_int], ctypes.c_int),
        'opus_decoder_create': ([ctypes.c_int, ctypes.c_int, c_int_ptr], DecoderStructPtr),
        'opus_packet_get_bandwidth': ([ctypes.c_char_p], ctypes.c_int),
        'opus_packet_get_nb_channels': ([ctypes.c_char_p], ctypes.c_int),
        'opus_packet_get_nb_frames': ([ctypes.c_char_p, ctypes.c_int], ctypes.c_int),
        'opus_packet_get_samples_per_frame': ([ctypes.c_char_p, ctypes.c_int], ctypes.c_int),
        'opus_decoder_get_nb_samples': ([DecoderStructPtr, ctypes.c_char_p, ctypes.c_int32], ctypes.c_int),
        'opus_decode': ([DecoderStructPtr, ctypes.c_char_p, ctypes.c_int32, c_int16_ptr, ctypes.c_int, ctypes.c_int], ctypes.c_int),
        'opus_decode_float': ([DecoderStructPtr, ctypes.c_char_p, ctypes.c_int32, c_float_ptr, ctypes.c_int, ctypes.c_int], ctypes.c_int),
        'opus_decoder_destroy': ([DecoderStructPtr], None),
    }

    def __init__(self, sampling_rate, channels, application=Application.AUDIO, library_path=None):
        """
        Parameters
        -----
        sampling_rate : `int`
            Output sampling rate handed to ``opus_decoder_create``.
        channels : `int`
            Number of output channels handed to ``opus_decoder_create``.
        application
            Stored on the instance; not used by the decoder itself.
        library_path
            Passed through to ``BaseOpus``.
        """
        super(OpusDecoder, self).__init__(library_path)
        self.sampling_rate = sampling_rate
        self.channels = channels
        self.application = application

        self._inst = None

    @property
    def inst(self):
        """The native decoder state, created on first access."""
        if not self._inst:
            self._inst = self.create()
        return self._inst

    def create(self):
        """Create a native decoder state, raising when the library reports an error."""
        ret = ctypes.c_int()
        result = self.opus_decoder_create(self.sampling_rate, self.channels, ctypes.byref(ret))

        if ret.value != 0:
            raise Exception('Failed to create opus decoder: {}'.format(ret.value))

        return result

    def __del__(self):
        # ``_inst`` is missing when __init__ failed before assigning it
        if hasattr(self, '_inst') and self._inst:
            self.opus_decoder_destroy(self._inst)
            self._inst = None

    def _packet_get_nb_frames(self, data):
        """Gets the number of frames in an Opus packet"""
        result = self.opus_packet_get_nb_frames(data, len(data))
        if result < 0:
            raise Exception('Error in opus_packet_get_nb_frames: {}'.format(result))

        return result

    def _packet_get_nb_channels(self, data):
        """Gets the number of channels in an Opus packet"""
        result = self.opus_packet_get_nb_channels(data)
        if result < 0:
            raise Exception('Error in packet_get_nb_channels: {}'.format(result))

        return result

    def _packet_get_samples_per_frame(self, data):
        """Gets the number of samples per frame from an Opus packet"""
        result = self.opus_packet_get_samples_per_frame(data, self.sampling_rate)
        if result < 0:
            raise Exception('Error in packet_get_samples_per_frame: {}'.format(result))

        return result

    def decode(self, data, frame_size=None, decode_fec=False):
        """
        Decode one Opus packet into interleaved 16 bit PCM.

        Parameters
        -----
        data : `bytes`
            The Opus packet. Other bytes-like objects are converted to
            ``bytes`` first.
        frame_size : `int`
            Capacity of the output buffer in samples per channel. When
            omitted it is computed from the packet (frames in the packet
            times samples per frame).
        decode_fec : `bool`
            Forwarded to ``opus_decode`` as 0/1.

        Returns
        -----
        `bytes`
            The decoded samples in native byte order. Only the samples the
            decoder reported as written are returned, never the unused
            remainder of the scratch buffer.

        Raises
        -----
        Exception
            When the packet can not be inspected or decoded.
        """
        # The native functions are declared with c_char_p, which only
        # accepts bytes
        data = bytes(data)

        if frame_size is None:
            frame_size = self._packet_get_nb_frames(data) * self._packet_get_samples_per_frame(data)

        pcm = (ctypes.c_int16 * (frame_size * self.channels))()
        pcm_ptr = ctypes.cast(pcm, ctypes.POINTER(ctypes.c_int16))

        result = self.opus_decode(self.inst, data, len(data), pcm_ptr, frame_size, int(bool(decode_fec)))
        if result < 0:
            raise Exception('Failed to decode: {}'.format(result))

        # ``result`` is the number of samples decoded per channel
        decoded_bytes = result * self.channels * ctypes.sizeof(ctypes.c_int16)
        return ctypes.string_at(ctypes.addressof(pcm), decoded_bytes)