diff --git a/discord/__init__.py b/discord/__init__.py index 6f0b735e7..fe3e97900 100644 --- a/discord/__init__.py +++ b/discord/__init__.py @@ -40,6 +40,8 @@ from .enums import * from collections import namedtuple from .embeds import Embed from .shard import AutoShardedClient +from .player import * +from .voice_client import VoiceClient import logging diff --git a/discord/abc.py b/discord/abc.py index 9c94c5dcc..07d720937 100644 --- a/discord/abc.py +++ b/discord/abc.py @@ -25,19 +25,18 @@ DEALINGS IN THE SOFTWARE. """ import abc -import io -import os import asyncio from collections import namedtuple from .iterators import HistoryIterator from .context_managers import Typing -from .errors import InvalidArgument +from .errors import InvalidArgument, ClientException from .permissions import PermissionOverwrite, Permissions from .role import Role from .invite import Invite from .file import File +from .voice_client import VoiceClient from . import utils, compat class _Undefined: @@ -783,3 +782,67 @@ class Messageable(metaclass=abc.ABCMeta): counter += 1 """ return HistoryIterator(self, limit=limit, before=before, after=after, around=around, reverse=reverse) + + +class Callable(metaclass=abc.ABCMeta): + __slots__ = () + + @abc.abstractmethod + def _get_voice_client_key(self): + raise NotImplementedError + + @abc.abstractmethod + def _get_voice_state_pair(self): + raise NotImplementedError + + @asyncio.coroutine + def connect(self, *, timeout=10.0, reconnect=True): + """|coro| + + Connects to voice and creates a :class:`VoiceClient` to establish + your connection to the voice server. + + Parameters + ----------- + timeout: float + The timeout in seconds to wait for the + initial handshake to be completed. + reconnect: bool + Whether the bot should automatically attempt + a reconnect if a part of the handshake fails + or the gateway goes down. + + Raises + ------- + asyncio.TimeoutError + Could not connect to the voice channel in time. + ClientException + You are already connected to a voice channel. + OpusNotLoaded + The opus library has not been loaded. + + Returns + ------- + :class:`VoiceClient` + A voice client that is fully connected to the voice server. + """ + key_id, key_name = self._get_voice_client_key() + state = self._state + + if state._get_voice_client(key_id): + raise ClientException('Already connected to a voice channel.') + + voice = VoiceClient(state=state, timeout=timeout, channel=self) + + try: + yield from voice.connect(reconnect=reconnect) + except asyncio.TimeoutError as e: + try: + yield from voice.disconnect() + except: + # we don't care if disconnect failed because connection failed + pass + raise e # re-raise + + state._add_voice_client(key_id, voice) + return voice diff --git a/discord/channel.py b/discord/channel.py index 67259f44d..6478c47be 100644 --- a/discord/channel.py +++ b/discord/channel.py @@ -290,7 +290,7 @@ class TextChannel(discord.abc.Messageable, discord.abc.GuildChannel, Hashable): count += 1 ret.append(msg) -class VoiceChannel(discord.abc.GuildChannel, Hashable): +class VoiceChannel(discord.abc.Callable, discord.abc.GuildChannel, Hashable): """Represents a Discord guild voice channel. Supported Operations: @@ -335,6 +335,12 @@ class VoiceChannel(discord.abc.GuildChannel, Hashable): def __repr__(self): return ''.format(self) + def _get_voice_client_key(self): + return self.guild.id, 'guild_id' + + def _get_voice_state_pair(self): + return self.guild.id, self.id + def _update(self, guild, data): self.guild = guild self.name = data['name'] diff --git a/discord/client.py b/discord/client.py index dca01a4ea..c8a81d333 100644 --- a/discord/client.py +++ b/discord/client.py @@ -31,6 +31,7 @@ from .errors import * from .permissions import Permissions, PermissionOverwrite from .enums import ChannelType, Status from .gateway import * +from .voice_client import VoiceClient from .emoji import Emoji from .http import HTTPClient from .state import ConnectionState @@ -119,10 +120,11 @@ class Client: self.connection.shard_count = self.shard_count self._closed = asyncio.Event(loop=self.loop) self._ready = asyncio.Event(loop=self.loop) + self.connection._get_websocket = lambda g: self.ws - # if VoiceClient.warn_nacl: - # VoiceClient.warn_nacl = False - # log.warning("PyNaCl is not installed, voice will NOT be supported") + if VoiceClient.warn_nacl: + VoiceClient.warn_nacl = False + log.warning("PyNaCl is not installed, voice will NOT be supported") # internals diff --git a/discord/gateway.py b/discord/gateway.py index 9a6991e8d..f8e500617 100644 --- a/discord/gateway.py +++ b/discord/gateway.py @@ -109,7 +109,6 @@ class VoiceKeepAliveHandler(KeepAliveHandler): self.msg = 'Keeping voice websocket alive with timestamp {0[d]}' def get_payload(self): - self.ack() return { 'op': self.ws.HEARTBEAT, 'd': int(time.time() * 1000) @@ -481,12 +480,9 @@ class DiscordWebSocket(websockets.client.WebSocketClientProtocol): } } + log.debug('Updating our voice state to %s.', payload) yield from self.send_as_json(payload) - # we're leaving a voice channel so remove it from the client list - if channel_id is None: - self._connection._remove_voice_client(guild_id) - @asyncio.coroutine def close_connection(self, force=False): if self._keep_alive: @@ -511,6 +507,14 @@ class DiscordVoiceWebSocket(websockets.client.WebSocketClientProtocol): Receive only. Gives you the secret key required for voice. SPEAKING Send only. Notifies the client if you are currently speaking. + HEARTBEAT_ACK + Receive only. Tells you your heartbeat has been acknowledged. + RESUME + Sent only. Tells the client to resume its session. + HELLO + Receive only. Tells you that your websocket connection was acknowledged. + INVALIDATE_SESSION + Sent only. Tells you that your RESUME request has failed and to re-IDENTIFY. """ IDENTIFY = 0 @@ -519,6 +523,10 @@ class DiscordVoiceWebSocket(websockets.client.WebSocketClientProtocol): HEARTBEAT = 3 SESSION_DESCRIPTION = 4 SPEAKING = 5 + HEARTBEAT_ACK = 6 + RESUME = 7 + HELLO = 8 + INVALIDATE_SESSION = 9 def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -527,28 +535,50 @@ class DiscordVoiceWebSocket(websockets.client.WebSocketClientProtocol): @asyncio.coroutine def send_as_json(self, data): + log.debug('Sending voice websocket frame: %s.', data) yield from self.send(utils.to_json(data)) + @asyncio.coroutine + def resume(self): + state = self._connection + payload = { + 'op': self.RESUME, + 'd': { + 'token': state.token, + 'server_id': str(state.server_id), + 'session_id': state.session_id + } + } + yield from self.send_as_json(payload) + + @asyncio.coroutine + def identify(self): + state = self._connection + payload = { + 'op': self.IDENTIFY, + 'd': { + 'server_id': str(state.server_id), + 'user_id': str(state.user.id), + 'session_id': state.session_id, + 'token': state.token + } + } + yield from self.send_as_json(payload) + @classmethod @asyncio.coroutine - def from_client(cls, client): + def from_client(cls, client, *, resume=False): """Creates a voice websocket for the :class:`VoiceClient`.""" - gateway = 'wss://' + client.endpoint + gateway = 'wss://' + client.endpoint + '/?v=3' ws = yield from websockets.connect(gateway, loop=client.loop, klass=cls) ws.gateway = gateway ws._connection = client - identify = { - 'op': cls.IDENTIFY, - 'd': { - 'guild_id': client.guild_id, - 'user_id': client.user.id, - 'session_id': client.session_id, - 'token': client.token - } - } + if resume: + yield from ws.resume() + else: + yield from ws.identify() - yield from ws.send_as_json(identify) return ws @asyncio.coroutine @@ -566,7 +596,6 @@ class DiscordVoiceWebSocket(websockets.client.WebSocketClientProtocol): } yield from self.send_as_json(payload) - log.debug('Selected protocol as {}'.format(payload)) @asyncio.coroutine def speak(self, is_speaking=True): @@ -579,12 +608,11 @@ class DiscordVoiceWebSocket(websockets.client.WebSocketClientProtocol): } yield from self.send_as_json(payload) - log.debug('Voice speaking now set to {}'.format(is_speaking)) @asyncio.coroutine def received_message(self, msg): log.debug('Voice websocket frame received: {}'.format(msg)) - op = msg.get('op') + op = msg['op'] data = msg.get('d') if op == self.READY: @@ -592,14 +620,20 @@ class DiscordVoiceWebSocket(websockets.client.WebSocketClientProtocol): self._keep_alive = VoiceKeepAliveHandler(ws=self, interval=interval) self._keep_alive.start() yield from self.initial_connection(data) + elif op == self.HEARTBEAT_ACK: + self._keep_alive.ack() + elif op == self.INVALIDATE_SESSION: + log.info('Voice RESUME failed.') + yield from self.identify() elif op == self.SESSION_DESCRIPTION: yield from self.load_secret_key(data) @asyncio.coroutine def initial_connection(self, data): state = self._connection - state.ssrc = data.get('ssrc') - state.voice_port = data.get('port') + state.ssrc = data['ssrc'] + state.voice_port = data['port'] + packet = bytearray(70) struct.pack_into('>I', packet, 0, state.ssrc) state.socket.sendto(packet, (state.endpoint_ip, state.voice_port)) diff --git a/discord/opus.py b/discord/opus.py index 911501c13..fcf27a721 100644 --- a/discord/opus.py +++ b/discord/opus.py @@ -183,15 +183,16 @@ signal_ctl = { } class Encoder: - def __init__(self, sampling, channels, application=APPLICATION_AUDIO): - self.sampling_rate = sampling - self.channels = channels - self.application = application + SAMPLING_RATE = 48000 + CHANNELS = 2 + FRAME_LENGTH = 20 + SAMPLE_SIZE = 4 # (bit_rate / 8) * CHANNELS (bit_rate == 16) + SAMPLES_PER_FRAME = int(SAMPLING_RATE / 1000 * FRAME_LENGTH) + + FRAME_SIZE = SAMPLES_PER_FRAME * SAMPLE_SIZE - self.frame_length = 20 - self.sample_size = 2 * self.channels # (bit_rate / 8) but bit_rate == 16 - self.samples_per_frame = int(self.sampling_rate / 1000 * self.frame_length) - self.frame_size = self.samples_per_frame * self.sample_size + def __init__(self, application=APPLICATION_AUDIO): + self.application = application if not is_loaded(): raise OpusNotLoaded() @@ -210,7 +211,7 @@ class Encoder: def _create_state(self): ret = ctypes.c_int() - result = _lib.opus_encoder_create(self.sampling_rate, self.channels, self.application, ctypes.byref(ret)) + result = _lib.opus_encoder_create(self.SAMPLING_RATE, self.CHANNELS, self.application, ctypes.byref(ret)) if ret.value != 0: log.info('error has happened in state creation') diff --git a/discord/player.py b/discord/player.py new file mode 100644 index 000000000..adda3584a --- /dev/null +++ b/discord/player.py @@ -0,0 +1,248 @@ +# -*- coding: utf-8 -*- + +""" +The MIT License (MIT) + +Copyright (c) 2015-2017 Rapptz + +Permission is hereby granted, free of charge, to any person obtaining a +copy of this software and associated documentation files (the "Software"), +to deal in the Software without restriction, including without limitation +the rights to use, copy, modify, merge, publish, distribute, sublicense, +and/or sell copies of the Software, and to permit persons to whom the +Software is furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. +""" + +import threading +import subprocess +import shlex +import time + +from .errors import ClientException +from .opus import Encoder as OpusEncoder + +__all__ = [ 'AudioSource', 'PCMAudio', 'FFmpegPCMAudio' ] + +class AudioSource: + """Represents an audio stream. + + The audio stream can be Opus encoded or not, however if the audio stream + is not Opus encoded then the audio format must be 16-bit 48KHz stereo PCM. + + .. warning:: + + The audio source reads are done in a separate thread. + """ + + def read(self): + """Reads 20ms worth of audio. + + Subclasses must implement this. + + If the audio is complete, then returning an empty *bytes-like* object + to signal this is the way to do so. + + If :meth:`is_opus` method returns ``True``, then it must return + 20ms worth of Opus encoded audio. Otherwise, it must be 20ms + worth of 16-bit 48KHz stereo PCM, which is about 3,840 bytes + per frame (20ms worth of audio). + + Returns + -------- + bytes + A bytes like object that represents the PCM or Opus data. + """ + raise NotImplementedError + + def is_opus(self): + """Checks if the audio source is already encoded in Opus. + + Defaults to ``False``. + """ + return False + + def cleanup(self): + """Called when clean-up is needed to be done. + + Useful for clearing buffer data or processes after + it is done playing audio. + """ + pass + +class PCMAudio(AudioSource): + """Represents raw 16-bit 48KHz stereo PCM audio source. + + Attributes + ----------- + stream: file-like object + A file-like object that reads byte data representing raw PCM. + """ + def __init__(self, stream): + self.stream = stream + + def read(self): + return self.stream.read(OpusEncoder.FRAME_SIZE) + +class FFmpegPCMAudio(AudioSource): + """An audio source from FFmpeg (or AVConv). + + This launches a sub-process to a specific input file given. + + .. warning:: + + You must have the ffmpeg or avconv executable in your path environment + variable in order for this to work. + + Parameters + ------------ + source: Union[str, BinaryIO] + The input that ffmpeg will take and convert to PCM bytes. + If ``pipe`` is True then this is a file-like object that is + passed to the stdin of ffmpeg. + executable: str + The executable name (and path) to use. Defaults to ``ffmpeg``. + pipe: bool + If true, denotes that ``source`` parameter will be passed + to the stdin of ffmpeg. Defaults to ``False``. + stderr: Optional[BinaryIO] + A file-like object to pass to the Popen constructor. + Could also be an instance of ``subprocess.PIPE``. + options: Optional[str] + Extra command line arguments to pass to ffmpeg after the ``-i`` flag. + before_options: Optional[str] + Extra command line arguments to pass to ffmpeg before the ``-i`` flag. + + Raises + -------- + ClientException + The subprocess failed to be created. + """ + + def __init__(self, source, *, executable='ffmpeg', pipe=False, stderr=None, before_options=None, options=None): + stdin = None if not pipe else source + + args = [executable] + + if isinstance(before_options, str): + args.extend(shlex.split(before_options)) + + args.append('-i') + args.append('-' if pipe else shlex.quote(source)) + args.extend(('-f', 's16le', '-ar', '48000', '-ac', '2', '-loglevel', 'warning')) + + if isinstance(options, str): + args.extend(shlex.split(options)) + + args.append('pipe:1') + + try: + self._process = subprocess.Popen(args, stdin=stdin, stdout=subprocess.PIPE, stderr=stderr) + self._stdout = self._process.stdout + except FileNotFoundError: + raise ClientException(executable + ' was not found.') from None + except subprocess.SubprocessError as e: + raise ClientException('Popen failed: {0.__class__.__name__}: {0}'.format(e)) from e + + def read(self): + return self._stdout.read(OpusEncoder.FRAME_SIZE) + + def cleanup(self): + proc = self._process + proc.kill() + if proc.poll() is None: + proc.communicate() + +class AudioPlayer(threading.Thread): + DELAY = OpusEncoder.FRAME_LENGTH / 1000.0 + + def __init__(self, source, client, *, after=None): + threading.Thread.__init__(self) + self.daemon = True + self.source = source + self.client = client + self.after = after + + self._end = threading.Event() + self._resumed = threading.Event() + self._resumed.set() # we are not paused + self._current_error = None + self._connected = client._connected + + if after is not None and not callable(after): + raise TypeError('Expected a callable for the "after" parameter.') + + def _do_run(self): + self.loops = 0 + self._start = time.time() + is_opus = self.source.is_opus() + + # getattr lookup speed ups + play_audio = self.client.send_audio_packet + + while not self._end.is_set(): + # are we paused? + if not self._resumed.is_set(): + # wait until we aren't + self._resumed.wait() + + # are we disconnected from voice? + if not self._connected.is_set(): + # wait until we are connected + self._connected.wait() + # reset our internal data + self.loops = 0 + self._start = time.time() + + self.loops += 1 + data = self.source.read() + + if not data: + self.stop() + break + + play_audio(data, encode=not is_opus) + next_time = self._start + self.DELAY * self.loops + delay = max(0, self.DELAY + (next_time - time.time())) + time.sleep(delay) + + def run(self): + try: + self._do_run() + except Exception as e: + self._current_error = e + self.stop() + finally: + self._call_after() + self.source.cleanup() + + def _call_after(self): + if self.after is not None: + try: + self.after(self._current_error) + except: + pass + + def stop(self): + self._end.set() + + def pause(self): + self._resumed.clear() + + def resume(self): + self.loops = 0 + self._start = time.time() + self._resumed.set() + + def is_playing(self): + return self._resumed.is_set() and not self._end.is_set() diff --git a/discord/shard.py b/discord/shard.py index 62bd9a3ca..d3d54ab5a 100644 --- a/discord/shard.py +++ b/discord/shard.py @@ -110,6 +110,11 @@ class AutoShardedClient(Client): # the key is the shard_id self.shards = {} + def _get_websocket(guild_id): + i = (guild_id >> 22) % self.shard_count + return self.shards[i].ws + + self.connection._get_websocket = _get_websocket self._still_sharding = True @asyncio.coroutine diff --git a/discord/state.py b/discord/state.py index 8d0757ddb..10f6d16e9 100644 --- a/discord/state.py +++ b/discord/state.py @@ -688,6 +688,16 @@ class ConnectionState: if call is not None: call._update_voice_state(data) + def parse_voice_server_update(self, data): + try: + key_id = int(data['guild_id']) + except KeyError: + key_id = int(data['channel_id']) + + vc = self._get_voice_client(key_id) + if vc is not None and vc.is_connected(): + compat.create_task(vc._switch_regions()) + def parse_typing_start(self, data): channel = self.get_channel(int(data['channel_id'])) if channel is not None: diff --git a/discord/voice_client.py b/discord/voice_client.py index 757637940..89f5ab0aa 100644 --- a/discord/voice_client.py +++ b/discord/voice_client.py @@ -29,9 +29,9 @@ DEALINGS IN THE SOFTWARE. - Our main web socket (mWS) sends opcode 4 with a guild ID and channel ID. - The mWS receives VOICE_STATE_UPDATE and VOICE_SERVER_UPDATE. - We pull the session_id from VOICE_STATE_UPDATE. -- We pull the token, endpoint and guild_id from VOICE_SERVER_UPDATE. +- We pull the token, endpoint and server_id from VOICE_SERVER_UPDATE. - Then we initiate the voice web socket (vWS) pointing to the endpoint. -- We send opcode 0 with the user_id, guild_id, session_id and token using the vWS. +- We send opcode 0 with the user_id, server_id, session_id and token using the vWS. - The vWS sends back opcode 2 with an ssrc, port, modes(array) and hearbeat_interval. - We send a UDP discovery packet to endpoint:port and receive our IP and our port in LE. - Then we send our IP and port via vWS with opcode 1. @@ -40,18 +40,10 @@ DEALINGS IN THE SOFTWARE. """ import asyncio -import websockets import socket -import json, time import logging import struct import threading -import subprocess -import shlex -import functools -import datetime -import audioop -import inspect log = logging.getLogger(__name__) @@ -62,123 +54,10 @@ except ImportError: has_nacl = False from . import opus +from .backoff import ExponentialBackoff from .gateway import * -from .errors import ClientException, InvalidArgument, ConnectionClosed - -class StreamPlayer(threading.Thread): - def __init__(self, stream, encoder, connected, player, after, **kwargs): - threading.Thread.__init__(self, **kwargs) - self.daemon = True - self.buff = stream - self.frame_size = encoder.frame_size - self.player = player - self._end = threading.Event() - self._resumed = threading.Event() - self._resumed.set() # we are not paused - self._connected = connected - self.after = after - self.delay = encoder.frame_length / 1000.0 - self._volume = 1.0 - self._current_error = None - - if after is not None and not callable(after): - raise TypeError('Expected a callable for the "after" parameter.') - - def _do_run(self): - self.loops = 0 - self._start = time.time() - while not self._end.is_set(): - # are we paused? - if not self._resumed.is_set(): - # wait until we aren't - self._resumed.wait() - - if not self._connected.is_set(): - self.stop() - break - - self.loops += 1 - data = self.buff.read(self.frame_size) - - if self._volume != 1.0: - data = audioop.mul(data, 2, min(self._volume, 2.0)) - - if len(data) != self.frame_size: - self.stop() - break - - self.player(data) - next_time = self._start + self.delay * self.loops - delay = max(0, self.delay + (next_time - time.time())) - time.sleep(delay) - - def run(self): - try: - self._do_run() - except Exception as e: - self._current_error = e - self.stop() - finally: - self._call_after() - - def _call_after(self): - if self.after is not None: - try: - arg_count = len(inspect.signature(self.after).parameters) - except: - # if this ended up happening, a mistake was made. - arg_count = 0 - - try: - if arg_count == 0: - self.after() - else: - self.after(self) - except: - pass - - def stop(self): - self._end.set() - - @property - def error(self): - return self._current_error - - @property - def volume(self): - return self._volume - - @volume.setter - def volume(self, value): - self._volume = max(value, 0.0) - - def pause(self): - self._resumed.clear() - - def resume(self): - self.loops = 0 - self._start = time.time() - self._resumed.set() - - def is_playing(self): - return self._resumed.is_set() and not self.is_done() - - def is_done(self): - return not self._connected.is_set() or self._end.is_set() - -class ProcessPlayer(StreamPlayer): - def __init__(self, process, client, after, **kwargs): - super().__init__(process.stdout, client.encoder, - client._connected, client.play_audio, after, **kwargs) - self.process = process - - def run(self): - super().run() - - self.process.kill() - if self.process.poll() is None: - self.process.communicate() - +from .errors import ClientException, ConnectionClosed +from .player import AudioPlayer, AudioSource class VoiceClient: """Represents a Discord voice connection. @@ -196,45 +75,46 @@ class VoiceClient: Attributes ----------- - session_id : str + session_id: str The voice connection session ID. - token : str + token: str The voice connection token. - user : :class:`User` - The user connected to voice. - endpoint : str + endpoint: str The endpoint we are connecting to. - channel : :class:`Channel` + channel: :class:`Channel` The voice channel connected to. - guild : :class:`Guild` - The guild the voice channel is connected to. - Shorthand for ``channel.guild``. loop The event loop that the voice client is running on. """ - def __init__(self, user, main_ws, session_id, channel, data, loop): + def __init__(self, state, timeout, channel): if not has_nacl: raise RuntimeError("PyNaCl library needed in order to use voice") - self.user = user - self.main_ws = main_ws self.channel = channel - self.session_id = session_id - self.loop = loop - self._connected = asyncio.Event(loop=self.loop) - self.token = data.get('token') - self.guild_id = data.get('guild_id') - self.endpoint = data.get('endpoint') + self.main_ws = None + self.timeout = timeout + self.loop = state.loop + self._state = state + # this will be used in the AudioPlayer thread + self._connected = threading.Event() + self._connections = 0 self.sequence = 0 self.timestamp = 0 - self.encoder = opus.Encoder(48000, 2) - log.info('created opus encoder with {0.__dict__}'.format(self.encoder)) + self._runner = None + self._player = None + self.encoder = opus.Encoder() warn_nacl = not has_nacl @property def guild(self): - return self.channel.guild + """Optional[:class:`Guild`]: The guild we're connected to, if applicable.""" + return getattr(self.channel, 'guild', None) + + @property + def user(self): + """:class:`ClientUser`: The user connected to voice (i.e. ourselves).""" + return self._state.user def checked_add(self, attr, value, limit): val = getattr(self, attr) @@ -246,56 +126,127 @@ class VoiceClient: # connection related @asyncio.coroutine - def connect(self): - log.info('voice connection is connecting...') - self.endpoint = self.endpoint.replace(':80', '') + def start_handshake(self): + log.info('Starting voice handshake...') + + key_id, key_name = self.channel._get_voice_client_key() + guild_id, channel_id = self.channel._get_voice_state_pair() + state = self._state + self.main_ws = ws = state._get_websocket(guild_id) + self._connections += 1 + + def session_id_found(data): + user_id = data.get('user_id', 0) + _guild_id = data.get(key_name) + return int(user_id) == state.self_id and int(_guild_id) == key_id + + # register the futures for waiting + session_id_future = ws.wait_for('VOICE_STATE_UPDATE', session_id_found) + voice_data_future = ws.wait_for('VOICE_SERVER_UPDATE', lambda d: int(d.get(key_name, 0)) == key_id) + + # request joining + yield from ws.voice_state(guild_id, channel_id) + + try: + session_id_data = yield from asyncio.wait_for(session_id_future, timeout=self.timeout, loop=self.loop) + data = yield from asyncio.wait_for(voice_data_future, timeout=self.timeout, loop=state.loop) + except asyncio.TimeoutError as e: + yield from ws.voice_state(guild_id, None, self_mute=True) + raise e + + self.session_id = session_id_data.get('session_id') + self.server_id = data.get(key_name) + self.token = data.get('token') + self.endpoint = data.get('endpoint', '').replace(':80', '') self.endpoint_ip = socket.gethostbyname(self.endpoint) self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.socket.setblocking(False) - log.info('Voice endpoint found {0.endpoint} (IP: {0.endpoint_ip})'.format(self)) + log.info('Voice handshake complete. Endpoint found %s (IP: %s)', self.endpoint, self.endpoint_ip) - self.ws = yield from DiscordVoiceWebSocket.from_client(self) - while not self._connected.is_set(): - yield from self.ws.poll_event() - if hasattr(self, 'secret_key'): - # we have a secret key, so we don't need to poll - # websocket events anymore - self._connected.set() - break + @asyncio.coroutine + def terminate_handshake(self, *, remove=False): + guild_id, _ = self.channel._get_voice_state_pair() + yield from self.main_ws.voice_state(guild_id, None, self_mute=True) - self.loop.create_task(self.poll_voice_ws()) + if remove: + key_id, _ = self.channel._get_voice_client_key() + self._state._remove_voice_client(key_id) @asyncio.coroutine - def poll_voice_ws(self): - """|coro| - Reads from the voice websocket while connected. - """ - while self._connected.is_set(): + def _switch_regions(self): + # just reconnect when we're requested to switch voice regions + # signal the reconnect loop + yield from self.ws.close(1006) + + @asyncio.coroutine + def connect(self, *, reconnect=True, _tries=0, do_handshake=True): + log.info('Connecting to voice...') + try: + del self.secret_key + except AttributeError: + pass + + if do_handshake: + yield from self.start_handshake() + + try: + self.ws = yield from DiscordVoiceWebSocket.from_client(self) + self._connected.clear() + while not hasattr(self, 'secret_key'): + yield from self.ws.poll_event() + self._connected.set() + except (ConnectionClosed, asyncio.TimeoutError): + if reconnect and _tries < 5: + log.exception('Failed to connect to voice... Retrying...') + yield from asyncio.sleep(1 + _tries * 2.0, loop=self.loop) + yield from self.terminate_handshake() + yield from self.connect(reconnect=reconnect, _tries=_tries + 1) + else: + raise + + if self._runner is None: + self._runner = self.loop.create_task(self.poll_voice_ws(reconnect)) + + @asyncio.coroutine + def poll_voice_ws(self, reconnect): + backoff = ExponentialBackoff() + fmt = 'Disconnected from voice... Reconnecting in {:.2f}s.' + while True: try: yield from self.ws.poll_event() - except ConnectionClosed as e: - if e.code == 1000: - break - else: - raise + except (ConnectionClosed, asyncio.TimeoutError) as e: + if isinstance(e, ConnectionClosed): + if e.code == 1000: + yield from self.disconnect() + break + + if not reconnect: + yield from self.disconnect() + raise e + + retry = backoff.delay() + log.exception(fmt.format(retry)) + self._connected.clear() + yield from asyncio.sleep(retry, loop=self.loop) + yield from self.terminate_handshake() + yield from self.connect(reconnect=True) @asyncio.coroutine def disconnect(self): """|coro| Disconnects all connections to the voice client. - - In order to reconnect, you must create another voice client - using :meth:`Client.join_voice_channel`. """ if not self._connected.is_set(): return + self.stop() self._connected.clear() + try: yield from self.ws.close() - yield from self.main_ws.voice_state(self.guild_id, None, self_mute=True) + yield from self.terminate_handshake(remove=True) finally: self.socket.close() @@ -305,28 +256,16 @@ class VoiceClient: Moves you to a different voice channel. - .. warning:: - - :class:`Object` instances do not work with this function. - Parameters ----------- - channel : :class:`Channel` + channel: :class:`abc.Snowflake` The channel to move to. Must be a voice channel. - - Raises - ------- - InvalidArgument - Not a voice channel. """ - - if str(getattr(channel, 'type', 'text')) != 'voice': - raise InvalidArgument('Must be a voice channel.') - - yield from self.main_ws.voice_state(self.guild_id, channel.id) + guild_id, _ = self.channel._get_voice_state_pair() + yield from self.main_ws.voice_state(guild_id, channel.id) def is_connected(self): - """bool : Indicates if the voice client is connected to voice.""" + """bool: Indicates if the voice client is connected to voice.""" return self._connected.is_set() # audio related @@ -349,328 +288,75 @@ class VoiceClient: # Encrypt and return the data return header + box.encrypt(bytes(data), bytes(nonce)).ciphertext - def create_ffmpeg_player(self, filename, *, use_avconv=False, pipe=False, stderr=None, options=None, before_options=None, headers=None, after=None): - """Creates a stream player for ffmpeg that launches in a separate thread to play - audio. - - The ffmpeg player launches a subprocess of ``ffmpeg`` to a specific - filename and then plays that file. - - You must have the ffmpeg or avconv executable in your path environment variable - in order for this to work. - - The operations that can be done on the player are the same as those in - :meth:`create_stream_player`. - - Examples - ---------- - - Basic usage: :: - - voice = yield from client.join_voice_channel(channel) - player = voice.create_ffmpeg_player('cool.mp3') - player.start() - - Parameters - ----------- - filename - The filename that ffmpeg will take and convert to PCM bytes. - If ``pipe`` is True then this is a file-like object that is - passed to the stdin of ``ffmpeg``. - use_avconv: bool - Use ``avconv`` instead of ``ffmpeg``. - pipe : bool - If true, denotes that ``filename`` parameter will be passed - to the stdin of ffmpeg. - stderr - A file-like object or ``subprocess.PIPE`` to pass to the Popen - constructor. - options : str - Extra command line flags to pass to ``ffmpeg`` after the ``-i`` flag. - before_options : str - Command line flags to pass to ``ffmpeg`` before the ``-i`` flag. - headers: dict - HTTP headers dictionary to pass to ``-headers`` command line option - after : callable - The finalizer that is called after the stream is done being - played. All exceptions the finalizer throws are silently discarded. - - Raises - ------- - ClientException - Popen failed to due to an error in ``ffmpeg`` or ``avconv``. - - Returns - -------- - StreamPlayer - A stream player with specific operations. - See :meth:`create_stream_player`. - """ - command = 'ffmpeg' if not use_avconv else 'avconv' - input_name = '-' if pipe else shlex.quote(filename) - before_args = "" - if isinstance(headers, dict): - for key, value in headers.items(): - before_args += "{}: {}\r\n".format(key, value) - before_args = ' -headers ' + shlex.quote(before_args) - - if isinstance(before_options, str): - before_args += ' ' + before_options - - cmd = command + '{} -i {} -f s16le -ar {} -ac {} -loglevel warning' - cmd = cmd.format(before_args, input_name, self.encoder.sampling_rate, self.encoder.channels) - - if isinstance(options, str): - cmd = cmd + ' ' + options - - cmd += ' pipe:1' - - stdin = None if not pipe else filename - args = shlex.split(cmd) - try: - p = subprocess.Popen(args, stdin=stdin, stdout=subprocess.PIPE, stderr=stderr) - return ProcessPlayer(p, self, after) - except FileNotFoundError as e: - raise ClientException('ffmpeg/avconv was not found in your PATH environment variable') from e - except subprocess.SubprocessError as e: - raise ClientException('Popen failed: {0.__name__} {1}'.format(type(e), str(e))) from e - + def play(self, source, *, after=None): + """Plays an :class:`AudioSource`. - @asyncio.coroutine - def create_ytdl_player(self, url, *, ytdl_options=None, **kwargs): - """|coro| - - Creates a stream player for youtube or other services that launches - in a separate thread to play the audio. - - The player uses the ``youtube_dl`` python library to get the information - required to get audio from the URL. Since this uses an external library, - you must install it yourself. You can do so by calling - ``pip install youtube_dl``. - - You must have the ffmpeg or avconv executable in your path environment - variable in order for this to work. - - The operations that can be done on the player are the same as those in - :meth:`create_stream_player`. The player has been augmented and enhanced - to have some info extracted from the URL. If youtube-dl fails to extract - the information then the attribute is ``None``. The ``yt``, ``url``, and - ``download_url`` attributes are always available. - - +---------------------+---------------------------------------------------------+ - | Operation | Description | - +=====================+=========================================================+ - | player.yt | The `YoutubeDL ` instance. | - +---------------------+---------------------------------------------------------+ - | player.url | The URL that is currently playing. | - +---------------------+---------------------------------------------------------+ - | player.download_url | The URL that is currently being downloaded to ffmpeg. | - +---------------------+---------------------------------------------------------+ - | player.title | The title of the audio stream. | - +---------------------+---------------------------------------------------------+ - | player.description | The description of the audio stream. | - +---------------------+---------------------------------------------------------+ - | player.uploader | The uploader of the audio stream. | - +---------------------+---------------------------------------------------------+ - | player.upload_date | A datetime.date object of when the stream was uploaded. | - +---------------------+---------------------------------------------------------+ - | player.duration | The duration of the audio in seconds. | - +---------------------+---------------------------------------------------------+ - | player.likes | How many likes the audio stream has. | - +---------------------+---------------------------------------------------------+ - | player.dislikes | How many dislikes the audio stream has. | - +---------------------+---------------------------------------------------------+ - | player.is_live | Checks if the audio stream is currently livestreaming. | - +---------------------+---------------------------------------------------------+ - | player.views | How many views the audio stream has. | - +---------------------+---------------------------------------------------------+ - - .. _ytdl: https://github.com/rg3/youtube-dl/blob/master/youtube_dl/YoutubeDL.py#L128-L278 - - Examples - ---------- + The finalizer, ``after`` is called after the source has been exhausted + or an error occurred. - Basic usage: :: - - voice = await client.join_voice_channel(channel) - player = await voice.create_ytdl_player('https://www.youtube.com/watch?v=d62TYemN6MQ') - player.start() + If an error happens while the audio player is running, the exception is + caught and the audio player is then stopped. Parameters ----------- - url : str - The URL that ``youtube_dl`` will take and download audio to pass - to ``ffmpeg`` or ``avconv`` to convert to PCM bytes. - ytdl_options : dict - A dictionary of options to pass into the ``YoutubeDL`` instance. - See `the documentation `_ for more details. - \*\*kwargs - The rest of the keyword arguments are forwarded to - :func:`create_ffmpeg_player`. + source: :class:`AudioSource` + The audio source we're reading from. + after + The finalizer that is called after the stream is exhausted. + All exceptions it throws are silently discarded. This function + must have a single parameter, ``error``, that denotes an + optional exception that was raised during playing. Raises ------- ClientException - Popen failure from either ``ffmpeg``/``avconv``. - - Returns - -------- - StreamPlayer - An augmented StreamPlayer that uses ffmpeg. - See :meth:`create_stream_player` for base operations. + Already playing audio or not connected. + TypeError + source is not a :class:`AudioSource` or after is not a callable. """ - import youtube_dl - - use_avconv = kwargs.get('use_avconv', False) - opts = { - 'format': 'webm[abr>0]/bestaudio/best', - 'prefer_ffmpeg': not use_avconv - } - - if ytdl_options is not None and isinstance(ytdl_options, dict): - opts.update(ytdl_options) - - ydl = youtube_dl.YoutubeDL(opts) - func = functools.partial(ydl.extract_info, url, download=False) - info = yield from self.loop.run_in_executor(None, func) - if "entries" in info: - info = info['entries'][0] - - log.info('playing URL {}'.format(url)) - download_url = info['url'] - player = self.create_ffmpeg_player(download_url, **kwargs) - - # set the dynamic attributes from the info extraction - player.download_url = download_url - player.url = url - player.yt = ydl - player.views = info.get('view_count') - player.is_live = bool(info.get('is_live')) - player.likes = info.get('like_count') - player.dislikes = info.get('dislike_count') - player.duration = info.get('duration') - player.uploader = info.get('uploader') - - is_twitch = 'twitch' in url - if is_twitch: - # twitch has 'title' and 'description' sort of mixed up. - player.title = info.get('description') - player.description = None - else: - player.title = info.get('title') - player.description = info.get('description') - # upload date handling - date = info.get('upload_date') - if date: - try: - date = datetime.datetime.strptime(date, '%Y%M%d').date() - except ValueError: - date = None + if not self._connected: + raise ClientException('Not connected to voice.') - player.upload_date = date - return player + if self.is_playing(): + raise ClientException('Already playing audio.') - def encoder_options(self, *, sample_rate, channels=2): - """Sets the encoder options for the OpusEncoder. + if not isinstance(source, AudioSource): + raise TypeError('source must an AudioSource not {0.__class__.__name__}'.format(source)) - Calling this after you create a stream player - via :meth:`create_ffmpeg_player` or :meth:`create_stream_player` - has no effect. + self._player = AudioPlayer(source, self, after=after) + self._player.start() - Parameters - ---------- - sample_rate : int - Sets the sample rate of the OpusEncoder. The unit is in Hz. - channels : int - Sets the number of channels for the OpusEncoder. - 2 for stereo, 1 for mono. + def is_playing(self): + """Indicates if we're currently playing audio.""" + return self._player is not None and self._player.is_playing() - Raises - ------- - InvalidArgument - The values provided are invalid. - """ - if sample_rate not in (8000, 12000, 16000, 24000, 48000): - raise InvalidArgument('Sample rate out of range. Valid: [8000, 12000, 16000, 24000, 48000]') - if channels not in (1, 2): - raise InvalidArgument('Channels must be either 1 or 2.') - - self.encoder = opus.Encoder(sample_rate, channels) - log.info('created opus encoder with {0.__dict__}'.format(self.encoder)) - - def create_stream_player(self, stream, *, after=None): - """Creates a stream player that launches in a separate thread to - play audio. - - The stream player assumes that ``stream.read`` is a valid function - that returns a *bytes-like* object. - - The finalizer, ``after`` is called after the stream has been exhausted - or an error occurred (see below). - - The following operations are valid on the ``StreamPlayer`` object: - - +---------------------+-----------------------------------------------------+ - | Operation | Description | - +=====================+=====================================================+ - | player.start() | Starts the audio stream. | - +---------------------+-----------------------------------------------------+ - | player.stop() | Stops the audio stream. | - +---------------------+-----------------------------------------------------+ - | player.is_done() | Returns a bool indicating if the stream is done. | - +---------------------+-----------------------------------------------------+ - | player.is_playing() | Returns a bool indicating if the stream is playing. | - +---------------------+-----------------------------------------------------+ - | player.pause() | Pauses the audio stream. | - +---------------------+-----------------------------------------------------+ - | player.resume() | Resumes the audio stream. | - +---------------------+-----------------------------------------------------+ - | player.volume | Allows you to set the volume of the stream. 1.0 is | - | | equivalent to 100% and 0.0 is equal to 0%. The | - | | maximum the volume can be set to is 2.0 for 200%. | - +---------------------+-----------------------------------------------------+ - | player.error | The exception that stopped the player. If no error | - | | happened, then this returns None. | - +---------------------+-----------------------------------------------------+ - - The stream must have the same sampling rate as the encoder and the same - number of channels. The defaults are 48000 Hz and 2 channels. You - could change the encoder options by using :meth:`encoder_options` - but this must be called **before** this function. - - If an error happens while the player is running, the exception is caught and - the player is then stopped. The caught exception could then be retrieved - via ``player.error``\. When the player is stopped in this matter, the - finalizer under ``after`` is called. + def stop(self): + """Stops playing audio.""" + if self._player: + self._player.stop() + self._player = None - Parameters - ----------- - stream - The stream object to read from. - after - The finalizer that is called after the stream is exhausted. - All exceptions it throws are silently discarded. This function - can have either no parameters or a single parameter taking in the - current player. + def pause(self): + """Pauses the audio playing.""" + if self._player: + self._player.pause() - Returns - -------- - StreamPlayer - A stream player with the operations noted above. - """ - return StreamPlayer(stream, self.encoder, self._connected, self.play_audio, after) + def resume(self): + """Resumes the audio playing.""" + if self._player: + self._player.resume() - def play_audio(self, data, *, encode=True): + def send_audio_packet(self, data, *, encode=True): """Sends an audio packet composed of the data. You must be connected to play audio. Parameters ---------- - data : bytes + data: bytes The *bytes-like object* denoting PCM or Opus voice data. - encode : bool + encode: bool Indicates if ``data`` should be encoded into Opus. Raises @@ -683,13 +369,13 @@ class VoiceClient: self.checked_add('sequence', 1, 65535) if encode: - encoded_data = self.encoder.encode(data, self.encoder.samples_per_frame) + encoded_data = self.encoder.encode(data, self.encoder.SAMPLES_PER_FRAME) else: encoded_data = data packet = self._get_voice_packet(encoded_data) try: - sent = self.socket.sendto(packet, (self.endpoint_ip, self.voice_port)) + self.socket.sendto(packet, (self.endpoint_ip, self.voice_port)) except BlockingIOError: log.warning('A packet has been dropped (seq: {0.sequence}, timestamp: {0.timestamp})'.format(self)) - self.checked_add('timestamp', self.encoder.samples_per_frame, 4294967295) + self.checked_add('timestamp', self.encoder.SAMPLES_PER_FRAME, 4294967295) diff --git a/docs/api.rst b/docs/api.rst index 09f5be778..2177a2707 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -40,6 +40,21 @@ Client .. autoclass:: AutoShardedClient :members: +Voice +------ + +.. autoclass:: VoiceClient + :members: + +.. autoclass:: AudioSource + :members: + +.. autoclass:: PCMAudio + :members: + +.. autoclass:: FFmpegPCMAudio + :members: + Opus Library ~~~~~~~~~~~~~