From 250d7d0d8fba571788a8aa4b4b355b5c4cd91704 Mon Sep 17 00:00:00 2001 From: Andrei Zbikowski Date: Thu, 16 May 2019 15:55:56 -0700 Subject: [PATCH] [Feature] Telecom Voice (#135) * Nuke existing voice support in preparation for telecom support * Telecom voice v1 * ytdl + event pipe --- disco/state.py | 16 -- disco/types/channel.py | 12 -- disco/voice.py | 176 +++++++++++++++ disco/voice/__init__.py | 5 - disco/voice/client.py | 458 ---------------------------------------- disco/voice/opus.py | 148 ------------- disco/voice/packets.py | 14 -- disco/voice/playable.py | 357 ------------------------------- disco/voice/player.py | 123 ----------- disco/voice/queue.py | 52 ----- disco/voice/udp.py | 345 ------------------------------ examples/music.py | 74 +++---- setup.py | 2 +- tests/imports.py | 5 - tests/voice/__init__.py | 0 tests/voice/queue.py | 66 ------ 16 files changed, 202 insertions(+), 1651 deletions(-) create mode 100644 disco/voice.py delete mode 100644 disco/voice/__init__.py delete mode 100644 disco/voice/client.py delete mode 100644 disco/voice/opus.py delete mode 100644 disco/voice/packets.py delete mode 100644 disco/voice/playable.py delete mode 100644 disco/voice/player.py delete mode 100644 disco/voice/queue.py delete mode 100644 disco/voice/udp.py delete mode 100644 tests/voice/__init__.py delete mode 100644 tests/voice/queue.py diff --git a/disco/state.py b/disco/state.py index d2352a5..b78e2ef 100644 --- a/disco/state.py +++ b/disco/state.py @@ -9,7 +9,6 @@ from disco.util.config import Config from disco.util.string import underscore from disco.util.hashmap import HashMap, DefaultHashMap from disco.util.emitter import Priority -from disco.voice.client import VoiceState class StackMessage(namedtuple('StackMessage', ['id', 'channel_id', 'author_id'])): @@ -274,21 +273,6 @@ class State(object): del self.voice_states[expired_voice_state.session_id] self.voice_states[event.state.session_id] = event.state - if event.state.user_id != self.me.id: - return - - server_id = event.state.guild_id or event.state.channel_id - if server_id in self.voice_clients: - voice_client = self.voice_clients[server_id] - - voice_client.channel_id = event.state.channel_id - if not event.state.channel_id: - voice_client.disconnect() - return - - if voice_client.token: - voice_client.set_state(VoiceState.CONNECTED) - def on_guild_member_add(self, event): if event.member.user.id not in self.users: self.users[event.member.user.id] = event.member.user diff --git a/disco/types/channel.py b/disco/types/channel.py index f656e3a..b446663 100644 --- a/disco/types/channel.py +++ b/disco/types/channel.py @@ -361,18 +361,6 @@ class Channel(SlottedModel, Permissible): """ self.client.api.channels_typing(self.id) - def connect(self, *args, **kwargs): - """ - Connect to this channel over voice. - """ - from disco.voice.client import VoiceClient - assert self.is_voice, 'Channel must support voice to connect' - - server_id = self.guild_id or self.id - vc = self.client.state.voice_clients.get(server_id) or VoiceClient(self.client, server_id, is_dm=self.is_dm) - - return vc.connect(self.id, *args, **kwargs) - def create_overwrite(self, *args, **kwargs): """ Creates a `PermissionOverwrite` for this channel. See diff --git a/disco/voice.py b/disco/voice.py new file mode 100644 index 0000000..5497a13 --- /dev/null +++ b/disco/voice.py @@ -0,0 +1,176 @@ +import os +import json + +import gevent +from gevent.os import make_nonblocking, nb_read + +from disco.gateway.packets import OPCode +from disco.types.channel import Channel +from disco.util.emitter import Emitter +from telecom import TelecomConnection, AvConvPlayable + +try: + import youtube_dl + ytdl = youtube_dl.YoutubeDL() +except ImportError: + ytdl = None + + +class YoutubeDLPlayable(AvConvPlayable): + def __init__(self, url): + url = next(self.from_url(url), None) + if not url: + raise Exception('No result found for URL {}'.format(url)) + super(YoutubeDLPlayable, self).__init__(url) + + @classmethod + def from_url(cls, url): + assert ytdl is not None, 'YoutubeDL isn\'t installed' + + results = ytdl.extract_info(url, download=False) + if 'entries' not in results: + results = [results] + else: + results = results['entries'] + + for result in results: + audio_formats = [fmt for fmt in result['formats'] if fmt['vcodec'] == 'none' and fmt['acodec'] == 'opus'] + if not audio_formats: + raise Exception("Couldn't find valid audio format for {}".format(url)) + + best_audio_format = sorted(audio_formats, key=lambda i: i['abr'], reverse=True)[0] + yield AvConvPlayable(best_audio_format['url']) + + +class VoiceConnection(object): + def __init__(self, client, guild_id, enable_events=False): + self.client = client + self.guild_id = guild_id + self.channel_id = None + self.enable_events = enable_events + self._conn = None + self._voice_server_update_listener = self.client.events.on( + 'VoiceServerUpdate', + self._on_voice_server_update, + ) + self._event_reader_greenlet = None + + self.events = None + if self.enable_events: + self.events = Emitter() + + self._mute = False + self._deaf = False + + def __del__(self): + if self._event_reader_greenlet: + self._event_reader_greenlet.kill() + + @property + def mute(self): + return self._mute + + @property + def deaf(self): + return self._deaf + + @mute.setter + def mute(self, value): + if value is self._mute: + return + + self._mute = value + self._send_voice_state_update() + + @deaf.setter + def deaf(self, value): + if value is self._deaf: + return + + self._deaf = value + self._send_voice_state_update() + + @classmethod + def from_channel(self, channel, **kwargs): + assert channel.is_voice, 'Cannot connect to a non voice channel' + conn = VoiceConnection(channel.client, channel.guild_id, **kwargs) + conn.connect(channel.id) + return conn + + def set_channel(self, channel_or_id): + if channel_or_id and isinstance(channel_or_id, Channel): + channel_or_id = channel_or_id.id + + self.channel_id = channel_or_id + self._send_voice_state_update() + + def connect(self, channel_id): + assert self._conn is None, 'Already connected' + + self.set_channel(channel_id) + + self._conn = TelecomConnection( + self.client.state.me.id, + self.guild_id, + self.client.gw.session_id, + ) + + if self.enable_events: + r, w = os.pipe() + + self._event_reader_greenlet = gevent.spawn(self._event_reader, r) + self._conn.set_event_pipe(w) + + def disconnect(self): + assert self._conn is not None, 'Not connected' + + # Send disconnection + self.set_channel(None) + + # If we have an event reader, kill it + if self._event_reader_greenlet: + self._event_reader_greenlet.kill() + self._event_reader_greenlet = None + + # Delete our connection so it will get GC'd + del self._conn + self._conn = None + + def play(self, playable): + self._conn.play(playable) + + def play_file(self, url): + self._conn.play(AvConvPlayable(url)) + + def _on_voice_server_update(self, event): + if not self._conn or event.guild_id != self.guild_id: + return + + self._conn.update_server_info(event.endpoint, event.token) + + def _send_voice_state_update(self): + self.client.gw.send(OPCode.VOICE_STATE_UPDATE, { + 'self_mute': self._mute, + 'self_deaf': self._deaf, + 'self_video': False, + 'guild_id': self.guild_id, + 'channel_id': self.channel_id, + }) + + def _event_reader(self, fd): + if not make_nonblocking(fd): + raise Exception('failed to make event pipe nonblocking') + + buff = "" + while True: + buff += nb_read(fd, 2048).decode('utf-8') + + parts = buff.split('\n') + for message in parts[:-1]: + event = json.loads(message) + self.events.emit(event['e'], event['d']) + + if len(parts) > 1: + buff = parts[-1] + else: + buff = "" diff --git a/disco/voice/__init__.py b/disco/voice/__init__.py deleted file mode 100644 index 437f477..0000000 --- a/disco/voice/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -from disco.voice.client import * # noqa: F401,F403 -from disco.voice.player import * # noqa: F401,F403 -from disco.voice.playable import * # noqa: F401,F403 - -# TODO: deprecate this file diff --git a/disco/voice/client.py b/disco/voice/client.py deleted file mode 100644 index 459d4bc..0000000 --- a/disco/voice/client.py +++ /dev/null @@ -1,458 +0,0 @@ -from __future__ import print_function - -import gevent -import time - -from collections import namedtuple - -from disco.gateway.encoding.json import JSONEncoder -from disco.util.websocket import Websocket -from disco.util.logging import LoggingClass -from disco.util.emitter import Emitter -from disco.gateway.packets import OPCode -from disco.types.base import cached_property -from disco.voice.packets import VoiceOPCode -from disco.voice.udp import AudioCodecs, RTPPayloadTypes, UDPVoiceClient - - -class SpeakingFlags(object): - NONE = 0 - VOICE = 1 << 0 - SOUNDSHARE = 1 << 1 - PRIORITY = 1 << 2 - - -class VoiceState(object): - DISCONNECTED = 0 - RECONNECTING = 1 - AWAITING_ENDPOINT = 2 - AUTHENTICATING = 3 - AUTHENTICATED = 4 - CONNECTING = 5 - CONNECTED = 6 - VOICE_CONNECTING = 7 - VOICE_CONNECTED = 8 - - -VoiceSpeaking = namedtuple('VoiceSpeaking', [ - 'client', - 'user_id', - 'speaking', - 'soundshare', - 'priority', -]) - - -class VoiceException(Exception): - def __init__(self, msg, client): - self.voice_client = client - super(VoiceException, self).__init__(msg) - - -class VoiceClient(LoggingClass): - VOICE_GATEWAY_VERSION = 4 - - SUPPORTED_MODES = { - 'xsalsa20_poly1305_lite', - 'xsalsa20_poly1305_suffix', - 'xsalsa20_poly1305', - } - - def __init__(self, client, server_id, is_dm=False, encoder=None, max_reconnects=5): - super(VoiceClient, self).__init__() - - self.client = client - self.server_id = server_id - self.channel_id = None - self.is_dm = is_dm - self.encoder = encoder or JSONEncoder - self.max_reconnects = max_reconnects - self.video_enabled = False - - # Set the VoiceClient in the state's voice clients - self.client.state.voice_clients[self.server_id] = self - - # Bind to some WS packets - self.packets = Emitter() - self.packets.on(VoiceOPCode.HELLO, self.on_voice_hello) - self.packets.on(VoiceOPCode.READY, self.on_voice_ready) - self.packets.on(VoiceOPCode.RESUMED, self.on_voice_resumed) - self.packets.on(VoiceOPCode.SESSION_DESCRIPTION, self.on_voice_sdp) - self.packets.on(VoiceOPCode.SPEAKING, self.on_voice_speaking) - self.packets.on(VoiceOPCode.CLIENT_CONNECT, self.on_voice_client_connect) - self.packets.on(VoiceOPCode.CLIENT_DISCONNECT, self.on_voice_client_disconnect) - self.packets.on(VoiceOPCode.CODECS, self.on_voice_codecs) - - # State + state change emitter - self.state = VoiceState.DISCONNECTED - self.state_emitter = Emitter() - - # Connection metadata - self.token = None - self.endpoint = None - self.ssrc = None - self.ip = None - self.port = None - self.mode = None - self.udp = None - self.audio_codec = None - self.video_codec = None - self.transport_id = None - - # Websocket connection - self.ws = None - - self._session_id = self.client.gw.session_id - self._reconnects = 0 - self._heartbeat_task = None - self._identified = False - - # SSRCs - self.audio_ssrcs = {} - - def __repr__(self): - return u''.format(self.server_id) - - @cached_property - def guild(self): - return self.client.state.guilds.get(self.server_id) if not self.is_dm else None - - @cached_property - def channel(self): - return self.client.state.channels.get(self.channel_id) - - @property - def user_id(self): - return self.client.state.me.id - - @property - def ssrc_audio(self): - return self.ssrc - - @property - def ssrc_video(self): - return self.ssrc + 1 - - @property - def ssrc_rtx(self): - return self.ssrc + 2 - - @property - def ssrc_rtcp(self): - return self.ssrc + 3 - - def set_state(self, state): - self.log.debug('[%s] state %s -> %s', self, self.state, state) - prev_state = self.state - self.state = state - self.state_emitter.emit(state, prev_state) - - def set_endpoint(self, endpoint): - endpoint = endpoint.split(':', 1)[0] - if self.endpoint == endpoint: - return - - self.log.info( - '[%s] Set endpoint from VOICE_SERVER_UPDATE (state = %s / endpoint = %s)', self, self.state, endpoint) - - self.endpoint = endpoint - - if self.ws and self.ws.sock and self.ws.sock.connected: - self.ws.close() - self.ws = None - - self._identified = False - - def set_token(self, token): - if self.token == token: - return - self.token = token - if not self._identified: - self._connect_and_run() - - def _connect_and_run(self): - self.ws = Websocket('wss://' + self.endpoint + '/?v={}'.format(self.VOICE_GATEWAY_VERSION)) - self.ws.emitter.on('on_open', self.on_open) - self.ws.emitter.on('on_error', self.on_error) - self.ws.emitter.on('on_close', self.on_close) - self.ws.emitter.on('on_message', self.on_message) - self.ws.run_forever() - - def _heartbeat(self, interval): - while True: - self.send(VoiceOPCode.HEARTBEAT, time.time()) - gevent.sleep(interval / 1000) - - def set_speaking(self, voice=False, soundshare=False, priority=False, delay=0): - value = SpeakingFlags.NONE.value - if voice: - value |= SpeakingFlags.VOICE.value - if soundshare: - value |= SpeakingFlags.SOUNDSHARE.value - if priority: - value |= SpeakingFlags.PRIORITY.value - - self.send(VoiceOPCode.SPEAKING, { - 'speaking': value, - 'delay': delay, - 'ssrc': self.ssrc, - }) - - def set_voice_state(self, channel_id, mute=False, deaf=False, video=False): - self.client.gw.send(OPCode.VOICE_STATE_UPDATE, { - 'self_mute': bool(mute), - 'self_deaf': bool(deaf), - 'self_video': bool(video), - 'guild_id': None if self.is_dm else self.server_id, - 'channel_id': channel_id, - }) - - def send(self, op, data): - if self.ws and self.ws.sock and self.ws.sock.connected: - self.log.debug('[%s] sending OP %s (data = %s)', self, op, data) - self.ws.send(self.encoder.encode({ - 'op': op.value, - 'd': data, - }), self.encoder.OPCODE) - else: - self.log.debug('[%s] dropping because ws is closed OP %s (data = %s)', self, op, data) - - def on_voice_client_connect(self, data): - user_id = int(data['user_id']) - - self.audio_ssrcs[data['audio_ssrc']] = user_id - # ignore data['voice_ssrc'] for now - - def on_voice_client_disconnect(self, data): - user_id = int(data['user_id']) - - for ssrc in self.audio_ssrcs.keys(): - if self.audio_ssrcs[ssrc] == user_id: - del self.audio_ssrcs[ssrc] - break - - def on_voice_codecs(self, data): - self.audio_codec = data['audio_codec'] - self.video_codec = data['video_codec'] - self.transport_id = data['media_session_id'] - - # Set the UDP's RTP Audio Header's Payload Type - self.udp.set_audio_codec(data['audio_codec']) - - def on_voice_hello(self, data): - self.log.info('[%s] Received Voice HELLO payload, starting heartbeater', self) - self._heartbeat_task = gevent.spawn(self._heartbeat, data['heartbeat_interval']) - self.set_state(VoiceState.AUTHENTICATED) - - def on_voice_ready(self, data): - self.log.info('[%s] Received Voice READY payload, attempting to negotiate voice connection w/ remote', self) - self.set_state(VoiceState.CONNECTING) - self.ssrc = data['ssrc'] - self.ip = data['ip'] - self.port = data['port'] - self._identified = True - - for mode in self.SUPPORTED_MODES: - if mode in data['modes']: - self.mode = mode - self.log.debug('[%s] Selected mode %s', self, mode) - break - else: - raise Exception('Failed to find a supported voice mode') - - self.log.debug('[%s] Attempting IP discovery over UDP to %s:%s', self, self.ip, self.port) - self.udp = UDPVoiceClient(self) - ip, port = self.udp.connect(self.ip, self.port) - - if not ip: - self.log.error('Failed to discover our IP, perhaps a NAT or firewall is fucking us') - self.disconnect() - return - - codecs = [] - - # Sending discord our available codecs and rtp payload type for it - for idx, codec in enumerate(AudioCodecs): - codecs.append({ - 'name': codec, - 'type': 'audio', - 'priority': (idx + 1) * 1000, - 'payload_type': RTPPayloadTypes.get(codec).value, - }) - - self.log.debug('[%s] IP discovery completed (ip = %s, port = %s), sending SELECT_PROTOCOL', self, ip, port) - self.send(VoiceOPCode.SELECT_PROTOCOL, { - 'protocol': 'udp', - 'data': { - 'port': port, - 'address': ip, - 'mode': self.mode, - }, - 'codecs': codecs, - }) - self.send(VoiceOPCode.CLIENT_CONNECT, { - 'audio_ssrc': self.ssrc, - 'video_ssrc': 0, - 'rtx_ssrc': 0, - }) - - def on_voice_resumed(self, data): - self.log.info('[%s] Received resumed', self) - self.set_state(VoiceState.CONNECTED) - - def on_voice_sdp(self, sdp): - self.log.info('[%s] Received session description, connection completed', self) - - self.mode = sdp['mode'] - self.audio_codec = sdp['audio_codec'] - self.video_codec = sdp['video_codec'] - self.transport_id = sdp['media_session_id'] - - # Set the UDP's RTP Audio Header's Payload Type - self.udp.set_audio_codec(sdp['audio_codec']) - - # Create a secret box for encryption/decryption - self.udp.setup_encryption(bytes(bytearray(sdp['secret_key']))) - - self.set_state(VoiceState.CONNECTED) - - def on_voice_speaking(self, data): - user_id = int(data['user_id']) - - self.audio_ssrcs[data['ssrc']] = user_id - - # Maybe rename speaking to voice in future - payload = VoiceSpeaking( - client=self, - user_id=user_id, - speaking=bool(data['speaking'] & SpeakingFlags.VOICE.value), - soundshare=bool(data['speaking'] & SpeakingFlags.SOUNDSHARE.value), - priority=bool(data['speaking'] & SpeakingFlags.PRIORITY.value), - ) - - self.client.gw.events.emit('VoiceSpeaking', payload) - - def on_message(self, msg): - try: - data = self.encoder.decode(msg) - self.packets.emit(VoiceOPCode[data['op']], data['d']) - except Exception: - self.log.exception('Failed to parse voice gateway message: ') - - def on_error(self, err): - self.log.error('[%s] Voice websocket error: %s', self, err) - - def on_open(self): - if self._identified: - self.send(VoiceOPCode.RESUME, { - 'server_id': self.server_id, - 'session_id': self._session_id, - 'token': self.token, - }) - else: - self.send(VoiceOPCode.IDENTIFY, { - 'server_id': self.server_id, - 'user_id': self.user_id, - 'session_id': self._session_id, - 'token': self.token, - 'video': self.video_enabled, - }) - - def on_close(self, code, reason): - self.log.warning('[%s] Voice websocket closed: [%s] %s (%s)', self, code, reason, self._reconnects) - - if self._heartbeat_task: - self._heartbeat_task.kill() - self._heartbeat_task = None - - self.ws = None - - # If we killed the connection, don't try resuming - if self.state == VoiceState.DISCONNECTED: - return - - self.log.info('[%s] Attempting Websocket Resumption', self) - - self.set_state(VoiceState.RECONNECTING) - - # Check if code is not None, was not from us - if code is not None: - self._reconnects += 1 - - if self.max_reconnects and self._reconnects > self.max_reconnects: - raise VoiceException( - 'Failed to reconnect after {} attempts, giving up'.format(self.max_reconnects), self) - - # Don't resume for these error codes: - if 4000 <= code <= 4016: - self._identified = False - - if self.udp and self.udp.connected: - self.udp.disconnect() - - wait_time = 5 - else: - wait_time = 1 - - self.log.info( - '[%s] Will attempt to %s after %s seconds', self, 'resume' if self._identified else 'reconnect', wait_time) - gevent.sleep(wait_time) - - self._connect_and_run() - - def connect(self, channel_id, timeout=10, **kwargs): - if self.is_dm: - channel_id = self.server_id - - if not channel_id: - raise VoiceException('[{}] cannot connect to an empty channel id'.format(self)) - - if self.channel_id == channel_id: - if self.state == VoiceState.CONNECTED: - self.log.debug('[%s] Already connected to %s, returning', self, self.channel) - return self - else: - if self.state == VoiceState.CONNECTED: - self.log.debug('[%s] Moving to channel %s', self, channel_id) - else: - self.log.debug('[%s] Attempting connection to channel id %s', self, channel_id) - self.set_state(VoiceState.AWAITING_ENDPOINT) - - self.set_voice_state(channel_id, **kwargs) - - if not self.state_emitter.once(VoiceState.CONNECTED, timeout=timeout): - self.disconnect() - raise VoiceException('Failed to connect to voice', self) - else: - return self - - def disconnect(self): - if self.state == VoiceState.DISCONNECTED: - return - - self.log.debug('[%s] disconnect called', self) - self.set_state(VoiceState.DISCONNECTED) - - del self.client.state.voice_clients[self.server_id] - - if self._heartbeat_task: - self._heartbeat_task.kill() - self._heartbeat_task = None - - if self.ws and self.ws.sock and self.ws.sock.connected: - self.ws.close() - self.ws = None - - if self.udp and self.udp.connected: - self.udp.disconnect() - - if self.channel_id: - self.set_voice_state(None) - - self.client.gw.events.emit('VoiceDisconnect', self) - - def send_frame(self, *args, **kwargs): - self.udp.send_frame(*args, **kwargs) - - def increment_timestamp(self, *args, **kwargs): - self.udp.increment_timestamp(*args, **kwargs) diff --git a/disco/voice/opus.py b/disco/voice/opus.py deleted file mode 100644 index 3d58c90..0000000 --- a/disco/voice/opus.py +++ /dev/null @@ -1,148 +0,0 @@ -import six -import sys -import array -import ctypes -import ctypes.util - -from disco.util.logging import LoggingClass - - -c_int_ptr = ctypes.POINTER(ctypes.c_int) -c_int16_ptr = ctypes.POINTER(ctypes.c_int16) -c_float_ptr = ctypes.POINTER(ctypes.c_float) - - -class EncoderStruct(ctypes.Structure): - pass - - -class DecoderStruct(ctypes.Structure): - pass - - -EncoderStructPtr = ctypes.POINTER(EncoderStruct) -DecoderStructPtr = ctypes.POINTER(DecoderStruct) - - -class BaseOpus(LoggingClass): - BASE_EXPORTED = { - 'opus_strerror': ([ctypes.c_int], ctypes.c_char_p), - } - - EXPORTED = {} - - def __init__(self, library_path=None): - self.path = library_path or self.find_library() - self.lib = ctypes.cdll.LoadLibrary(self.path) - - methods = {} - methods.update(self.BASE_EXPORTED) - methods.update(self.EXPORTED) - - for name, item in methods.items(): - func = getattr(self.lib, name) - - if item[0]: - func.argtypes = item[0] - - func.restype = item[1] - - setattr(self, name, func) - - @staticmethod - def find_library(): - if sys.platform == 'win32': - raise Exception('Cannot auto-load opus on Windows, please specify full library path') - - return ctypes.util.find_library('opus') - - -class Application(object): - AUDIO = 2049 - VOIP = 2048 - LOWDELAY = 2051 - - -class Control(object): - SET_BITRATE = 4002 - SET_BANDWIDTH = 4008 - SET_FEC = 4012 - SET_PLP = 4014 - - -class OpusEncoder(BaseOpus): - EXPORTED = { - 'opus_encoder_get_size': ([ctypes.c_int], ctypes.c_int), - 'opus_encoder_create': ([ctypes.c_int, ctypes.c_int, ctypes.c_int, c_int_ptr], EncoderStructPtr), - 'opus_encode': ([EncoderStructPtr, c_int16_ptr, ctypes.c_int, ctypes.c_char_p, ctypes.c_int32], ctypes.c_int32), - 'opus_encoder_ctl': (None, ctypes.c_int32), - 'opus_encoder_destroy': ([EncoderStructPtr], None), - } - - def __init__(self, sampling_rate, channels, application=Application.AUDIO, library_path=None): - super(OpusEncoder, self).__init__(library_path) - self.sampling_rate = sampling_rate - self.channels = channels - self.application = application - - self._inst = None - - @property - def inst(self): - if not self._inst: - self._inst = self.create() - self.set_bitrate(128) - self.set_fec(True) - self.set_expected_packet_loss_percent(0.15) - return self._inst - - def set_bitrate(self, kbps): - kbps = min(128, max(16, int(kbps))) - ret = self.opus_encoder_ctl(self.inst, Control.SET_BITRATE, kbps * 1024) - - if ret < 0: - raise Exception('Failed to set bitrate to {}: {}'.format(kbps, ret)) - - def set_fec(self, value): - ret = self.opus_encoder_ctl(self.inst, Control.SET_FEC, int(value)) - - if ret < 0: - raise Exception('Failed to set FEC to {}: {}'.format(value, ret)) - - def set_expected_packet_loss_percent(self, perc): - ret = self.opus_encoder_ctl(self.inst, Control.SET_PLP, min(100, max(0, int(perc * 100)))) - - if ret < 0: - raise Exception('Failed to set PLP to {}: {}'.format(perc, ret)) - - def create(self): - ret = ctypes.c_int() - result = self.opus_encoder_create(self.sampling_rate, self.channels, self.application, ctypes.byref(ret)) - - if ret.value != 0: - raise Exception('Failed to create opus encoder: {}'.format(ret.value)) - - return result - - def __del__(self): - if hasattr(self, '_inst') and self._inst: - self.opus_encoder_destroy(self._inst) - self._inst = None - - def encode(self, pcm, frame_size): - max_data_bytes = len(pcm) - pcm = ctypes.cast(pcm, c_int16_ptr) - data = (ctypes.c_char * max_data_bytes)() - - ret = self.opus_encode(self.inst, pcm, frame_size, data, max_data_bytes) - if ret < 0: - raise Exception('Failed to encode: {}'.format(ret)) - - if six.PY3: - return array.array('b', data[:ret]).tobytes() - else: - return array.array('b', data[:ret]).tostring() - - -class OpusDecoder(BaseOpus): - pass diff --git a/disco/voice/packets.py b/disco/voice/packets.py deleted file mode 100644 index a63ff98..0000000 --- a/disco/voice/packets.py +++ /dev/null @@ -1,14 +0,0 @@ -class VoiceOPCode(object): - IDENTIFY = 0 - SELECT_PROTOCOL = 1 - READY = 2 - HEARTBEAT = 3 - SESSION_DESCRIPTION = 4 - SPEAKING = 5 - HEARTBEAT_ACK = 6 - RESUME = 7 - HELLO = 8 - RESUMED = 9 - CLIENT_CONNECT = 12 - CLIENT_DISCONNECT = 13 - CODECS = 14 diff --git a/disco/voice/playable.py b/disco/voice/playable.py deleted file mode 100644 index a1eff5b..0000000 --- a/disco/voice/playable.py +++ /dev/null @@ -1,357 +0,0 @@ -import abc -import six -import types -import gevent -import struct -import subprocess - -from gevent.lock import Semaphore -from gevent.queue import Queue - -from disco.voice.opus import OpusEncoder - - -try: - from io import StringIO as BufferedIO -except ImportError: - if six.PY2: - from StringIO import StringIO as BufferedIO - else: - from io import BytesIO as BufferedIO - - -OPUS_HEADER_SIZE = struct.calcsize('0]/bestaudio/best'}) - - if self._url: - obj = ydl.extract_info(self._url, download=False, process=False) - if 'entries' in obj: - self._ie_info = list(obj['entries'])[0] - else: - self._ie_info = obj - - self._info = ydl.process_ie_result(self._ie_info, download=False) - return self._info - - @property - def _metadata(self): - return self.info - - @classmethod - def many(cls, url, *args, **kwargs): - import youtube_dl - - ydl = youtube_dl.YoutubeDL({'format': 'webm[abr>0]/bestaudio/best'}) - info = ydl.extract_info(url, download=False, process=False) - - if 'entries' not in info: - yield cls(ie_info=info, *args, **kwargs) - return - - for item in info['entries']: - yield cls(ie_info=item, *args, **kwargs) - - @property - def source(self): - return self.info['url'] - - -class BufferedOpusEncoderPlayable(BasePlayable, OpusEncoder, AbstractOpus): - def __init__(self, source, *args, **kwargs): - self.source = source - self.frames = Queue(kwargs.pop('queue_size', 4096)) - - # Call the AbstractOpus constructor, as we need properties it sets - AbstractOpus.__init__(self, *args, **kwargs) - - # Then call the OpusEncoder constructor, which requires some properties - # that AbstractOpus sets up - OpusEncoder.__init__(self, self.sampling_rate, self.channels) - - # Spawn the encoder loop - gevent.spawn(self._encoder_loop) - - def _encoder_loop(self): - while self.source: - raw = self.source.read(self.frame_size) - if len(raw) < self.frame_size: - break - - self.frames.put(self.encode(raw, self.samples_per_frame)) - gevent.idle() - self.source = None - self.frames.put(None) - - def next_frame(self): - return self.frames.get() - - -class DCADOpusEncoderPlayable(BasePlayable, AbstractOpus, OpusEncoder): - def __init__(self, source, *args, **kwargs): - self.source = source - self.command = kwargs.pop('command', 'dcad') - self.on_complete = kwargs.pop('on_complete', None) - super(DCADOpusEncoderPlayable, self).__init__(*args, **kwargs) - - self._done = False - self._proc = None - - @property - def proc(self): - if not self._proc: - source = obj = self.source.fileobj() - if not hasattr(obj, 'fileno'): - source = subprocess.PIPE - - self._proc = subprocess.Popen([ - self.command, - '--channels', str(self.channels), - '--rate', str(self.sampling_rate), - '--size', str(self.samples_per_frame), - '--bitrate', '128', - '--fec', - '--packet-loss-percent', '30', - '--input', 'pipe:0', - '--output', 'pipe:1', - ], stdin=source, stdout=subprocess.PIPE) - - def writer(): - while True: - data = obj.read(2048) - if len(data) > 0: - self._proc.stdin.write(data) - if len(data) < 2048: - break - - if source == subprocess.PIPE: - gevent.spawn(writer) - return self._proc - - def next_frame(self): - if self._done: - return None - - header = self.proc.stdout.read(OPUS_HEADER_SIZE) - if len(header) < OPUS_HEADER_SIZE: - self._done = True - self.on_complete() - return - - size = struct.unpack(' MAX_UINT32: - self.timestamp = 0 - - def setup_encryption(self, encryption_key): - self._secret_box = nacl.secret.SecretBox(encryption_key) - - def send_frame(self, frame, sequence=None, timestamp=None, incr_timestamp=None): - # Convert the frame to a bytearray - frame = bytearray(frame) - - # Pack the rtc header into our buffer - struct.pack_into('>H', self._rtp_audio_header, 2, sequence or self.sequence) - struct.pack_into('>I', self._rtp_audio_header, 4, timestamp or self.timestamp) - struct.pack_into('>i', self._rtp_audio_header, 8, self.vc.ssrc_audio) - - if self.vc.mode == 'xsalsa20_poly1305_lite': - # Use an incrementing number as a nonce, only first 4 bytes of the nonce is padded on - self._nonce += 1 - if self._nonce > MAX_UINT32: - self._nonce = 0 - - nonce = bytearray(24) - struct.pack_into('>I', nonce, 0, self._nonce) - nonce_padding = nonce[:4] - elif self.vc.mode == 'xsalsa20_poly1305_suffix': - # Generate a nonce - nonce = nacl.utils.random(nacl.secret.SecretBox.NONCE_SIZE) - nonce_padding = nonce - elif self.vc.mode == 'xsalsa20_poly1305': - # Nonce is the header - nonce = bytearray(24) - nonce[:12] = self._rtp_audio_header - nonce_padding = None - else: - raise Exception('The voice mode, {}, isn\'t supported.'.format(self.vc.mode)) - - # Encrypt the payload with the nonce - payload = self._secret_box.encrypt(bytes(frame), bytes(nonce)).ciphertext - - # Pad the payload with the nonce, if applicable - if nonce_padding: - payload += nonce_padding - - # Send the header (sans nonce padding) plus the payload - self.send(self._rtp_audio_header + payload) - - # Increment our sequence counter - self.sequence += 1 - if self.sequence >= MAX_SEQUENCE: - self.sequence = 0 - - # Increment our timestamp (if applicable) - if incr_timestamp: - self.timestamp += incr_timestamp - - def run(self): - while True: - data, addr = self.conn.recvfrom(4096) - - # Data cannot be less than the bare minimum, just ignore - if len(data) <= 12: - self.log.debug('[%s] [VoiceData] Received voice data under 13 bytes', self.vc) - continue - - first, second = struct.unpack_from('>BB', data) - - if second in RTCPPayloadTypes.ALL: - length, ssrc = struct.unpack_from('>HI', data, 2) - - rtcp = RTCPHeader( - version=first >> 6, - padding=(first >> 5) & 1, - reception_count=first & 0x1F, - packet_type=second, - length=length, - ssrc=ssrc, - ) - - if rtcp.ssrc == self.vc.ssrc_rtcp: - user_id = self.vc.user_id - else: - rtcp_ssrc = rtcp.ssrc - if rtcp_ssrc: - rtcp_ssrc -= 3 - user_id = self.vc.audio_ssrcs.get(rtcp_ssrc, None) - - payload = RTCPData( - client=self.vc, - user_id=user_id, - payload_type=second, - header=rtcp, - data=data[8:], - ) - - self.vc.client.gw.events.emit('RTCPData', payload) - else: - sequence, timestamp, ssrc = struct.unpack_from('>HII', data, 2) - - rtp = RTPHeader( - version=first >> 6, - padding=(first >> 5) & 1, - extension=(first >> 4) & 1, - csrc_count=first & 0x0F, - marker=second >> 7, - payload_type=second & 0x7F, - sequence=sequence, - timestamp=timestamp, - ssrc=ssrc, - ) - - # Check if rtp version is 2 - if rtp.version != 2: - self.log.debug('[%s] [VoiceData] Received an invalid RTP packet version, %s', self.vc, rtp.version) - continue - - # Unsupported payload type received - if rtp.payload_type not in RTPPayloadTypes.ALL: - self.log.debug('[%s] [VoiceData] Received unsupported payload type, %s', self.vc, rtp.payload_type) - continue - - nonce = bytearray(24) - if self.vc.mode == 'xsalsa20_poly1305_lite': - nonce[:4] = data[-4:] - data = data[:-4] - elif self.vc.mode == 'xsalsa20_poly1305_suffx': - nonce[:24] = data[-24:] - data = data[:-24] - elif self.vc.mode == 'xsalsa20_poly1305': - nonce[:12] = data[:12] - else: - self.log.debug('[%s] [VoiceData] Unsupported Encryption Mode, %s', self.vc, self.vc.mode) - continue - - try: - data = self._secret_box.decrypt(bytes(data[12:]), bytes(nonce)) - except Exception: - self.log.debug('[%s] [VoiceData] Failed to decode data from ssrc %s', self.vc, rtp.ssrc) - continue - - # RFC3550 Section 5.1 (Padding) - if rtp.padding: - padding_amount, = struct.unpack_from('>B', data[:-1]) - data = data[-padding_amount:] - - if rtp.extension: - # RFC5285 Section 4.2: One-Byte Header - rtp_extension_header = struct.unpack_from('>BB', data) - if rtp_extension_header == RTP_EXTENSION_ONE_BYTE: - data = data[2:] - - fields_amount, = struct.unpack_from('>H', data) - fields = [] - - offset = 4 - for i in range(fields_amount): - first_byte, = struct.unpack_from('>B', data[:offset]) - offset += 1 - - rtp_extension_identifer = first_byte & 0xF - rtp_extension_len = ((first_byte >> 4) & 0xF) + 1 - - # Ignore data if identifer == 15, so skip if this is set as 0 - if rtp_extension_identifer: - fields.append(data[offset:offset + rtp_extension_len]) - - offset += rtp_extension_len - - # skip padding - while data[offset] == 0: - offset += 1 - - if len(fields): - fields.append(data[offset:]) - data = b''.join(fields) - else: - data = data[offset:] - - # RFC3550 Section 5.3: Profile-Specific Modifications to the RTP Header - # clients send it sometimes, definitely on fresh connects to a server, dunno what to do here - if rtp.marker: - self.log.debug('[%s] [VoiceData] Received RTP data with the marker set, skipping', self.vc) - continue - - payload = VoiceData( - client=self.vc, - user_id=self.vc.audio_ssrcs.get(rtp.ssrc, None), - payload_type=second, - rtp=rtp, - nonce=nonce, - data=data, - ) - - self.vc.client.gw.events.emit('VoiceData', payload) - - def send(self, data): - self.conn.sendto(data, (self.ip, self.port)) - - def disconnect(self): - self._run_task.kill() - - def connect(self, host, port, timeout=10, addrinfo=None): - self.ip = socket.gethostbyname(host) - self.port = port - - self.conn = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - - if addrinfo: - ip, port = addrinfo - else: - # Send discovery packet - packet = bytearray(70) - struct.pack_into('>I', packet, 0, self.vc.ssrc) - self.send(packet) - - # Wait for a response - try: - data, addr = gevent.spawn(lambda: self.conn.recvfrom(70)).get(timeout=timeout) - except gevent.Timeout: - return (None, None) - - # Read IP and port - ip = str(data[4:]).split('\x00', 1)[0] - port = struct.unpack('') - def on_play(self, event, url): - item = YoutubeDLInput(url).pipe(BufferedOpusEncoderPlayable) - self.get_player(event.guild.id).queue.append(item) - - @Plugin.command('pause') - def on_pause(self, event): - self.get_player(event.guild.id).pause() - - @Plugin.command('resume') - def on_resume(self, event): - self.get_player(event.guild.id).resume() - - @Plugin.command('kill') - def on_kill(self, event): - self.get_player(event.guild.id).client.ws.sock.shutdown() + vs = event.guild.get_member(event.author).get_voice_state() + if not vs: + return event.msg.reply('you are not in a voice channel') + + if event.guild.id in self._connections: + if self._connections[event.guild.id].channel_id == vs.channel_id: + return event.msg.reply('already in that channel') + else: + self._connections[event.guild.id].set_channel(vs.channel) + return + + self._connections[event.guild.id] = VoiceConnection.from_channel(vs.channel, enable_events=True) + + @Plugin.command('play', '') + def on_play(self, event, song): + if event.guild.id not in self._connections: + return event.msg.reply('not in voice here') + + playables = list(YoutubeDLPlayable.from_url(song)) + for playable in playables: + self._connections[event.guild.id].play(playable) diff --git a/setup.py b/setup.py index eae8156..4978681 100644 --- a/setup.py +++ b/setup.py @@ -10,7 +10,7 @@ with open('README.md') as f: readme = f.read() extras_require = { - 'voice': ['pynacl==1.2.1'], + 'voice': ['telecom-py==0.0.4'], 'http': ['flask==0.12.2'], 'yaml': ['pyyaml==3.12'], 'music': ['youtube_dl>=2018.1.21'], diff --git a/tests/imports.py b/tests/imports.py index 2d00f68..04b3a54 100644 --- a/tests/imports.py +++ b/tests/imports.py @@ -34,8 +34,3 @@ from disco.util.logging import * from disco.util.serializer import * from disco.util.snowflake import * from disco.util.websocket import * -from disco.voice.client import * -from disco.voice.opus import * -from disco.voice.packets import * -from disco.voice.playable import * -from disco.voice.player import * diff --git a/tests/voice/__init__.py b/tests/voice/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/tests/voice/queue.py b/tests/voice/queue.py deleted file mode 100644 index 914e8ea..0000000 --- a/tests/voice/queue.py +++ /dev/null @@ -1,66 +0,0 @@ -import gevent -from unittest import TestCase - -from disco.voice.queue import PlayableQueue - - -class TestPlayableQueue(TestCase): - def test_append(self): - q = PlayableQueue() - q.append(1) - q.append(2) - q.append(3) - - self.assertEqual(q._data, [1, 2, 3]) - self.assertEqual(q.get(), 1) - self.assertEqual(q.get(), 2) - self.assertEqual(q.get(), 3) - - def test_len(self): - q = PlayableQueue() - - for idx in range(1234): - q.append(idx) - - self.assertEqual(len(q), 1234) - - def test_iter(self): - q = PlayableQueue() - - for idx in range(5): - q.append(idx) - - self.assertEqual(sum(q), 10) - - def test_blocking_get(self): - q = PlayableQueue() - result = gevent.event.AsyncResult() - - def get(): - result.set(q.get()) - - gevent.spawn(get) - q.append(5) - self.assertEqual(result.get(), 5) - - def test_shuffle(self): - q = PlayableQueue() - - for idx in range(10000): - q.append(idx) - - self.assertEqual(q._data[0], 0) - q.shuffle() - self.assertNotEqual(q._data[0], 0) - - def test_clear(self): - q = PlayableQueue() - - for idx in range(100): - q.append(idx) - - self.assertEqual(q._data[0], 0) - self.assertEqual(q._data[-1], 99) - self.assertEqual(len(q), 100) - q.clear() - self.assertEqual(len(q), 0)