Browse Source

[Feature] Telecom Voice (#135)

* Nuke existing voice support in preparation for telecom support

* Telecom voice v1

* ytdl + event pipe
pull/140/head
Andrei Zbikowski 6 years ago
committed by GitHub
parent
commit
250d7d0d8f
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 16
      disco/state.py
  2. 12
      disco/types/channel.py
  3. 176
      disco/voice.py
  4. 5
      disco/voice/__init__.py
  5. 458
      disco/voice/client.py
  6. 148
      disco/voice/opus.py
  7. 14
      disco/voice/packets.py
  8. 357
      disco/voice/playable.py
  9. 123
      disco/voice/player.py
  10. 52
      disco/voice/queue.py
  11. 345
      disco/voice/udp.py
  12. 74
      examples/music.py
  13. 2
      setup.py
  14. 5
      tests/imports.py
  15. 0
      tests/voice/__init__.py
  16. 66
      tests/voice/queue.py

16
disco/state.py

@ -9,7 +9,6 @@ from disco.util.config import Config
from disco.util.string import underscore
from disco.util.hashmap import HashMap, DefaultHashMap
from disco.util.emitter import Priority
from disco.voice.client import VoiceState
class StackMessage(namedtuple('StackMessage', ['id', 'channel_id', 'author_id'])):
@ -274,21 +273,6 @@ class State(object):
del self.voice_states[expired_voice_state.session_id]
self.voice_states[event.state.session_id] = event.state
if event.state.user_id != self.me.id:
return
server_id = event.state.guild_id or event.state.channel_id
if server_id in self.voice_clients:
voice_client = self.voice_clients[server_id]
voice_client.channel_id = event.state.channel_id
if not event.state.channel_id:
voice_client.disconnect()
return
if voice_client.token:
voice_client.set_state(VoiceState.CONNECTED)
def on_guild_member_add(self, event):
if event.member.user.id not in self.users:
self.users[event.member.user.id] = event.member.user

12
disco/types/channel.py

@ -361,18 +361,6 @@ class Channel(SlottedModel, Permissible):
"""
self.client.api.channels_typing(self.id)
def connect(self, *args, **kwargs):
"""
Connect to this channel over voice.
"""
from disco.voice.client import VoiceClient
assert self.is_voice, 'Channel must support voice to connect'
server_id = self.guild_id or self.id
vc = self.client.state.voice_clients.get(server_id) or VoiceClient(self.client, server_id, is_dm=self.is_dm)
return vc.connect(self.id, *args, **kwargs)
def create_overwrite(self, *args, **kwargs):
"""
Creates a `PermissionOverwrite` for this channel. See

176
disco/voice.py

@ -0,0 +1,176 @@
import os
import json
import gevent
from gevent.os import make_nonblocking, nb_read
from disco.gateway.packets import OPCode
from disco.types.channel import Channel
from disco.util.emitter import Emitter
from telecom import TelecomConnection, AvConvPlayable
try:
import youtube_dl
ytdl = youtube_dl.YoutubeDL()
except ImportError:
ytdl = None
class YoutubeDLPlayable(AvConvPlayable):
def __init__(self, url):
url = next(self.from_url(url), None)
if not url:
raise Exception('No result found for URL {}'.format(url))
super(YoutubeDLPlayable, self).__init__(url)
@classmethod
def from_url(cls, url):
assert ytdl is not None, 'YoutubeDL isn\'t installed'
results = ytdl.extract_info(url, download=False)
if 'entries' not in results:
results = [results]
else:
results = results['entries']
for result in results:
audio_formats = [fmt for fmt in result['formats'] if fmt['vcodec'] == 'none' and fmt['acodec'] == 'opus']
if not audio_formats:
raise Exception("Couldn't find valid audio format for {}".format(url))
best_audio_format = sorted(audio_formats, key=lambda i: i['abr'], reverse=True)[0]
yield AvConvPlayable(best_audio_format['url'])
class VoiceConnection(object):
def __init__(self, client, guild_id, enable_events=False):
self.client = client
self.guild_id = guild_id
self.channel_id = None
self.enable_events = enable_events
self._conn = None
self._voice_server_update_listener = self.client.events.on(
'VoiceServerUpdate',
self._on_voice_server_update,
)
self._event_reader_greenlet = None
self.events = None
if self.enable_events:
self.events = Emitter()
self._mute = False
self._deaf = False
def __del__(self):
if self._event_reader_greenlet:
self._event_reader_greenlet.kill()
@property
def mute(self):
return self._mute
@property
def deaf(self):
return self._deaf
@mute.setter
def mute(self, value):
if value is self._mute:
return
self._mute = value
self._send_voice_state_update()
@deaf.setter
def deaf(self, value):
if value is self._deaf:
return
self._deaf = value
self._send_voice_state_update()
@classmethod
def from_channel(self, channel, **kwargs):
assert channel.is_voice, 'Cannot connect to a non voice channel'
conn = VoiceConnection(channel.client, channel.guild_id, **kwargs)
conn.connect(channel.id)
return conn
def set_channel(self, channel_or_id):
if channel_or_id and isinstance(channel_or_id, Channel):
channel_or_id = channel_or_id.id
self.channel_id = channel_or_id
self._send_voice_state_update()
def connect(self, channel_id):
assert self._conn is None, 'Already connected'
self.set_channel(channel_id)
self._conn = TelecomConnection(
self.client.state.me.id,
self.guild_id,
self.client.gw.session_id,
)
if self.enable_events:
r, w = os.pipe()
self._event_reader_greenlet = gevent.spawn(self._event_reader, r)
self._conn.set_event_pipe(w)
def disconnect(self):
assert self._conn is not None, 'Not connected'
# Send disconnection
self.set_channel(None)
# If we have an event reader, kill it
if self._event_reader_greenlet:
self._event_reader_greenlet.kill()
self._event_reader_greenlet = None
# Delete our connection so it will get GC'd
del self._conn
self._conn = None
def play(self, playable):
self._conn.play(playable)
def play_file(self, url):
self._conn.play(AvConvPlayable(url))
def _on_voice_server_update(self, event):
if not self._conn or event.guild_id != self.guild_id:
return
self._conn.update_server_info(event.endpoint, event.token)
def _send_voice_state_update(self):
self.client.gw.send(OPCode.VOICE_STATE_UPDATE, {
'self_mute': self._mute,
'self_deaf': self._deaf,
'self_video': False,
'guild_id': self.guild_id,
'channel_id': self.channel_id,
})
def _event_reader(self, fd):
if not make_nonblocking(fd):
raise Exception('failed to make event pipe nonblocking')
buff = ""
while True:
buff += nb_read(fd, 2048).decode('utf-8')
parts = buff.split('\n')
for message in parts[:-1]:
event = json.loads(message)
self.events.emit(event['e'], event['d'])
if len(parts) > 1:
buff = parts[-1]
else:
buff = ""

5
disco/voice/__init__.py

@ -1,5 +0,0 @@
from disco.voice.client import * # noqa: F401,F403
from disco.voice.player import * # noqa: F401,F403
from disco.voice.playable import * # noqa: F401,F403
# TODO: deprecate this file

458
disco/voice/client.py

@ -1,458 +0,0 @@
from __future__ import print_function
import gevent
import time
from collections import namedtuple
from disco.gateway.encoding.json import JSONEncoder
from disco.util.websocket import Websocket
from disco.util.logging import LoggingClass
from disco.util.emitter import Emitter
from disco.gateway.packets import OPCode
from disco.types.base import cached_property
from disco.voice.packets import VoiceOPCode
from disco.voice.udp import AudioCodecs, RTPPayloadTypes, UDPVoiceClient
class SpeakingFlags(object):
NONE = 0
VOICE = 1 << 0
SOUNDSHARE = 1 << 1
PRIORITY = 1 << 2
class VoiceState(object):
DISCONNECTED = 0
RECONNECTING = 1
AWAITING_ENDPOINT = 2
AUTHENTICATING = 3
AUTHENTICATED = 4
CONNECTING = 5
CONNECTED = 6
VOICE_CONNECTING = 7
VOICE_CONNECTED = 8
VoiceSpeaking = namedtuple('VoiceSpeaking', [
'client',
'user_id',
'speaking',
'soundshare',
'priority',
])
class VoiceException(Exception):
def __init__(self, msg, client):
self.voice_client = client
super(VoiceException, self).__init__(msg)
class VoiceClient(LoggingClass):
VOICE_GATEWAY_VERSION = 4
SUPPORTED_MODES = {
'xsalsa20_poly1305_lite',
'xsalsa20_poly1305_suffix',
'xsalsa20_poly1305',
}
def __init__(self, client, server_id, is_dm=False, encoder=None, max_reconnects=5):
super(VoiceClient, self).__init__()
self.client = client
self.server_id = server_id
self.channel_id = None
self.is_dm = is_dm
self.encoder = encoder or JSONEncoder
self.max_reconnects = max_reconnects
self.video_enabled = False
# Set the VoiceClient in the state's voice clients
self.client.state.voice_clients[self.server_id] = self
# Bind to some WS packets
self.packets = Emitter()
self.packets.on(VoiceOPCode.HELLO, self.on_voice_hello)
self.packets.on(VoiceOPCode.READY, self.on_voice_ready)
self.packets.on(VoiceOPCode.RESUMED, self.on_voice_resumed)
self.packets.on(VoiceOPCode.SESSION_DESCRIPTION, self.on_voice_sdp)
self.packets.on(VoiceOPCode.SPEAKING, self.on_voice_speaking)
self.packets.on(VoiceOPCode.CLIENT_CONNECT, self.on_voice_client_connect)
self.packets.on(VoiceOPCode.CLIENT_DISCONNECT, self.on_voice_client_disconnect)
self.packets.on(VoiceOPCode.CODECS, self.on_voice_codecs)
# State + state change emitter
self.state = VoiceState.DISCONNECTED
self.state_emitter = Emitter()
# Connection metadata
self.token = None
self.endpoint = None
self.ssrc = None
self.ip = None
self.port = None
self.mode = None
self.udp = None
self.audio_codec = None
self.video_codec = None
self.transport_id = None
# Websocket connection
self.ws = None
self._session_id = self.client.gw.session_id
self._reconnects = 0
self._heartbeat_task = None
self._identified = False
# SSRCs
self.audio_ssrcs = {}
def __repr__(self):
return u'<VoiceClient {}>'.format(self.server_id)
@cached_property
def guild(self):
return self.client.state.guilds.get(self.server_id) if not self.is_dm else None
@cached_property
def channel(self):
return self.client.state.channels.get(self.channel_id)
@property
def user_id(self):
return self.client.state.me.id
@property
def ssrc_audio(self):
return self.ssrc
@property
def ssrc_video(self):
return self.ssrc + 1
@property
def ssrc_rtx(self):
return self.ssrc + 2
@property
def ssrc_rtcp(self):
return self.ssrc + 3
def set_state(self, state):
self.log.debug('[%s] state %s -> %s', self, self.state, state)
prev_state = self.state
self.state = state
self.state_emitter.emit(state, prev_state)
def set_endpoint(self, endpoint):
endpoint = endpoint.split(':', 1)[0]
if self.endpoint == endpoint:
return
self.log.info(
'[%s] Set endpoint from VOICE_SERVER_UPDATE (state = %s / endpoint = %s)', self, self.state, endpoint)
self.endpoint = endpoint
if self.ws and self.ws.sock and self.ws.sock.connected:
self.ws.close()
self.ws = None
self._identified = False
def set_token(self, token):
if self.token == token:
return
self.token = token
if not self._identified:
self._connect_and_run()
def _connect_and_run(self):
self.ws = Websocket('wss://' + self.endpoint + '/?v={}'.format(self.VOICE_GATEWAY_VERSION))
self.ws.emitter.on('on_open', self.on_open)
self.ws.emitter.on('on_error', self.on_error)
self.ws.emitter.on('on_close', self.on_close)
self.ws.emitter.on('on_message', self.on_message)
self.ws.run_forever()
def _heartbeat(self, interval):
while True:
self.send(VoiceOPCode.HEARTBEAT, time.time())
gevent.sleep(interval / 1000)
def set_speaking(self, voice=False, soundshare=False, priority=False, delay=0):
value = SpeakingFlags.NONE.value
if voice:
value |= SpeakingFlags.VOICE.value
if soundshare:
value |= SpeakingFlags.SOUNDSHARE.value
if priority:
value |= SpeakingFlags.PRIORITY.value
self.send(VoiceOPCode.SPEAKING, {
'speaking': value,
'delay': delay,
'ssrc': self.ssrc,
})
def set_voice_state(self, channel_id, mute=False, deaf=False, video=False):
self.client.gw.send(OPCode.VOICE_STATE_UPDATE, {
'self_mute': bool(mute),
'self_deaf': bool(deaf),
'self_video': bool(video),
'guild_id': None if self.is_dm else self.server_id,
'channel_id': channel_id,
})
def send(self, op, data):
if self.ws and self.ws.sock and self.ws.sock.connected:
self.log.debug('[%s] sending OP %s (data = %s)', self, op, data)
self.ws.send(self.encoder.encode({
'op': op.value,
'd': data,
}), self.encoder.OPCODE)
else:
self.log.debug('[%s] dropping because ws is closed OP %s (data = %s)', self, op, data)
def on_voice_client_connect(self, data):
user_id = int(data['user_id'])
self.audio_ssrcs[data['audio_ssrc']] = user_id
# ignore data['voice_ssrc'] for now
def on_voice_client_disconnect(self, data):
user_id = int(data['user_id'])
for ssrc in self.audio_ssrcs.keys():
if self.audio_ssrcs[ssrc] == user_id:
del self.audio_ssrcs[ssrc]
break
def on_voice_codecs(self, data):
self.audio_codec = data['audio_codec']
self.video_codec = data['video_codec']
self.transport_id = data['media_session_id']
# Set the UDP's RTP Audio Header's Payload Type
self.udp.set_audio_codec(data['audio_codec'])
def on_voice_hello(self, data):
self.log.info('[%s] Received Voice HELLO payload, starting heartbeater', self)
self._heartbeat_task = gevent.spawn(self._heartbeat, data['heartbeat_interval'])
self.set_state(VoiceState.AUTHENTICATED)
def on_voice_ready(self, data):
self.log.info('[%s] Received Voice READY payload, attempting to negotiate voice connection w/ remote', self)
self.set_state(VoiceState.CONNECTING)
self.ssrc = data['ssrc']
self.ip = data['ip']
self.port = data['port']
self._identified = True
for mode in self.SUPPORTED_MODES:
if mode in data['modes']:
self.mode = mode
self.log.debug('[%s] Selected mode %s', self, mode)
break
else:
raise Exception('Failed to find a supported voice mode')
self.log.debug('[%s] Attempting IP discovery over UDP to %s:%s', self, self.ip, self.port)
self.udp = UDPVoiceClient(self)
ip, port = self.udp.connect(self.ip, self.port)
if not ip:
self.log.error('Failed to discover our IP, perhaps a NAT or firewall is fucking us')
self.disconnect()
return
codecs = []
# Sending discord our available codecs and rtp payload type for it
for idx, codec in enumerate(AudioCodecs):
codecs.append({
'name': codec,
'type': 'audio',
'priority': (idx + 1) * 1000,
'payload_type': RTPPayloadTypes.get(codec).value,
})
self.log.debug('[%s] IP discovery completed (ip = %s, port = %s), sending SELECT_PROTOCOL', self, ip, port)
self.send(VoiceOPCode.SELECT_PROTOCOL, {
'protocol': 'udp',
'data': {
'port': port,
'address': ip,
'mode': self.mode,
},
'codecs': codecs,
})
self.send(VoiceOPCode.CLIENT_CONNECT, {
'audio_ssrc': self.ssrc,
'video_ssrc': 0,
'rtx_ssrc': 0,
})
def on_voice_resumed(self, data):
self.log.info('[%s] Received resumed', self)
self.set_state(VoiceState.CONNECTED)
def on_voice_sdp(self, sdp):
self.log.info('[%s] Received session description, connection completed', self)
self.mode = sdp['mode']
self.audio_codec = sdp['audio_codec']
self.video_codec = sdp['video_codec']
self.transport_id = sdp['media_session_id']
# Set the UDP's RTP Audio Header's Payload Type
self.udp.set_audio_codec(sdp['audio_codec'])
# Create a secret box for encryption/decryption
self.udp.setup_encryption(bytes(bytearray(sdp['secret_key'])))
self.set_state(VoiceState.CONNECTED)
def on_voice_speaking(self, data):
user_id = int(data['user_id'])
self.audio_ssrcs[data['ssrc']] = user_id
# Maybe rename speaking to voice in future
payload = VoiceSpeaking(
client=self,
user_id=user_id,
speaking=bool(data['speaking'] & SpeakingFlags.VOICE.value),
soundshare=bool(data['speaking'] & SpeakingFlags.SOUNDSHARE.value),
priority=bool(data['speaking'] & SpeakingFlags.PRIORITY.value),
)
self.client.gw.events.emit('VoiceSpeaking', payload)
def on_message(self, msg):
try:
data = self.encoder.decode(msg)
self.packets.emit(VoiceOPCode[data['op']], data['d'])
except Exception:
self.log.exception('Failed to parse voice gateway message: ')
def on_error(self, err):
self.log.error('[%s] Voice websocket error: %s', self, err)
def on_open(self):
if self._identified:
self.send(VoiceOPCode.RESUME, {
'server_id': self.server_id,
'session_id': self._session_id,
'token': self.token,
})
else:
self.send(VoiceOPCode.IDENTIFY, {
'server_id': self.server_id,
'user_id': self.user_id,
'session_id': self._session_id,
'token': self.token,
'video': self.video_enabled,
})
def on_close(self, code, reason):
self.log.warning('[%s] Voice websocket closed: [%s] %s (%s)', self, code, reason, self._reconnects)
if self._heartbeat_task:
self._heartbeat_task.kill()
self._heartbeat_task = None
self.ws = None
# If we killed the connection, don't try resuming
if self.state == VoiceState.DISCONNECTED:
return
self.log.info('[%s] Attempting Websocket Resumption', self)
self.set_state(VoiceState.RECONNECTING)
# Check if code is not None, was not from us
if code is not None:
self._reconnects += 1
if self.max_reconnects and self._reconnects > self.max_reconnects:
raise VoiceException(
'Failed to reconnect after {} attempts, giving up'.format(self.max_reconnects), self)
# Don't resume for these error codes:
if 4000 <= code <= 4016:
self._identified = False
if self.udp and self.udp.connected:
self.udp.disconnect()
wait_time = 5
else:
wait_time = 1
self.log.info(
'[%s] Will attempt to %s after %s seconds', self, 'resume' if self._identified else 'reconnect', wait_time)
gevent.sleep(wait_time)
self._connect_and_run()
def connect(self, channel_id, timeout=10, **kwargs):
if self.is_dm:
channel_id = self.server_id
if not channel_id:
raise VoiceException('[{}] cannot connect to an empty channel id'.format(self))
if self.channel_id == channel_id:
if self.state == VoiceState.CONNECTED:
self.log.debug('[%s] Already connected to %s, returning', self, self.channel)
return self
else:
if self.state == VoiceState.CONNECTED:
self.log.debug('[%s] Moving to channel %s', self, channel_id)
else:
self.log.debug('[%s] Attempting connection to channel id %s', self, channel_id)
self.set_state(VoiceState.AWAITING_ENDPOINT)
self.set_voice_state(channel_id, **kwargs)
if not self.state_emitter.once(VoiceState.CONNECTED, timeout=timeout):
self.disconnect()
raise VoiceException('Failed to connect to voice', self)
else:
return self
def disconnect(self):
if self.state == VoiceState.DISCONNECTED:
return
self.log.debug('[%s] disconnect called', self)
self.set_state(VoiceState.DISCONNECTED)
del self.client.state.voice_clients[self.server_id]
if self._heartbeat_task:
self._heartbeat_task.kill()
self._heartbeat_task = None
if self.ws and self.ws.sock and self.ws.sock.connected:
self.ws.close()
self.ws = None
if self.udp and self.udp.connected:
self.udp.disconnect()
if self.channel_id:
self.set_voice_state(None)
self.client.gw.events.emit('VoiceDisconnect', self)
def send_frame(self, *args, **kwargs):
self.udp.send_frame(*args, **kwargs)
def increment_timestamp(self, *args, **kwargs):
self.udp.increment_timestamp(*args, **kwargs)

148
disco/voice/opus.py

@ -1,148 +0,0 @@
import six
import sys
import array
import ctypes
import ctypes.util
from disco.util.logging import LoggingClass
c_int_ptr = ctypes.POINTER(ctypes.c_int)
c_int16_ptr = ctypes.POINTER(ctypes.c_int16)
c_float_ptr = ctypes.POINTER(ctypes.c_float)
class EncoderStruct(ctypes.Structure):
pass
class DecoderStruct(ctypes.Structure):
pass
EncoderStructPtr = ctypes.POINTER(EncoderStruct)
DecoderStructPtr = ctypes.POINTER(DecoderStruct)
class BaseOpus(LoggingClass):
BASE_EXPORTED = {
'opus_strerror': ([ctypes.c_int], ctypes.c_char_p),
}
EXPORTED = {}
def __init__(self, library_path=None):
self.path = library_path or self.find_library()
self.lib = ctypes.cdll.LoadLibrary(self.path)
methods = {}
methods.update(self.BASE_EXPORTED)
methods.update(self.EXPORTED)
for name, item in methods.items():
func = getattr(self.lib, name)
if item[0]:
func.argtypes = item[0]
func.restype = item[1]
setattr(self, name, func)
@staticmethod
def find_library():
if sys.platform == 'win32':
raise Exception('Cannot auto-load opus on Windows, please specify full library path')
return ctypes.util.find_library('opus')
class Application(object):
AUDIO = 2049
VOIP = 2048
LOWDELAY = 2051
class Control(object):
SET_BITRATE = 4002
SET_BANDWIDTH = 4008
SET_FEC = 4012
SET_PLP = 4014
class OpusEncoder(BaseOpus):
EXPORTED = {
'opus_encoder_get_size': ([ctypes.c_int], ctypes.c_int),
'opus_encoder_create': ([ctypes.c_int, ctypes.c_int, ctypes.c_int, c_int_ptr], EncoderStructPtr),
'opus_encode': ([EncoderStructPtr, c_int16_ptr, ctypes.c_int, ctypes.c_char_p, ctypes.c_int32], ctypes.c_int32),
'opus_encoder_ctl': (None, ctypes.c_int32),
'opus_encoder_destroy': ([EncoderStructPtr], None),
}
def __init__(self, sampling_rate, channels, application=Application.AUDIO, library_path=None):
super(OpusEncoder, self).__init__(library_path)
self.sampling_rate = sampling_rate
self.channels = channels
self.application = application
self._inst = None
@property
def inst(self):
if not self._inst:
self._inst = self.create()
self.set_bitrate(128)
self.set_fec(True)
self.set_expected_packet_loss_percent(0.15)
return self._inst
def set_bitrate(self, kbps):
kbps = min(128, max(16, int(kbps)))
ret = self.opus_encoder_ctl(self.inst, Control.SET_BITRATE, kbps * 1024)
if ret < 0:
raise Exception('Failed to set bitrate to {}: {}'.format(kbps, ret))
def set_fec(self, value):
ret = self.opus_encoder_ctl(self.inst, Control.SET_FEC, int(value))
if ret < 0:
raise Exception('Failed to set FEC to {}: {}'.format(value, ret))
def set_expected_packet_loss_percent(self, perc):
ret = self.opus_encoder_ctl(self.inst, Control.SET_PLP, min(100, max(0, int(perc * 100))))
if ret < 0:
raise Exception('Failed to set PLP to {}: {}'.format(perc, ret))
def create(self):
ret = ctypes.c_int()
result = self.opus_encoder_create(self.sampling_rate, self.channels, self.application, ctypes.byref(ret))
if ret.value != 0:
raise Exception('Failed to create opus encoder: {}'.format(ret.value))
return result
def __del__(self):
if hasattr(self, '_inst') and self._inst:
self.opus_encoder_destroy(self._inst)
self._inst = None
def encode(self, pcm, frame_size):
max_data_bytes = len(pcm)
pcm = ctypes.cast(pcm, c_int16_ptr)
data = (ctypes.c_char * max_data_bytes)()
ret = self.opus_encode(self.inst, pcm, frame_size, data, max_data_bytes)
if ret < 0:
raise Exception('Failed to encode: {}'.format(ret))
if six.PY3:
return array.array('b', data[:ret]).tobytes()
else:
return array.array('b', data[:ret]).tostring()
class OpusDecoder(BaseOpus):
pass

14
disco/voice/packets.py

@ -1,14 +0,0 @@
class VoiceOPCode(object):
IDENTIFY = 0
SELECT_PROTOCOL = 1
READY = 2
HEARTBEAT = 3
SESSION_DESCRIPTION = 4
SPEAKING = 5
HEARTBEAT_ACK = 6
RESUME = 7
HELLO = 8
RESUMED = 9
CLIENT_CONNECT = 12
CLIENT_DISCONNECT = 13
CODECS = 14

357
disco/voice/playable.py

@ -1,357 +0,0 @@
import abc
import six
import types
import gevent
import struct
import subprocess
from gevent.lock import Semaphore
from gevent.queue import Queue
from disco.voice.opus import OpusEncoder
try:
from io import StringIO as BufferedIO
except ImportError:
if six.PY2:
from StringIO import StringIO as BufferedIO
else:
from io import BytesIO as BufferedIO
OPUS_HEADER_SIZE = struct.calcsize('<h')
class AbstractOpus(object):
def __init__(self, sampling_rate=48000, frame_length=20, channels=2):
self.sampling_rate = sampling_rate
self.frame_length = frame_length
self.channels = 2
self.sample_size = 2 * self.channels
self.samples_per_frame = int(self.sampling_rate / 1000 * self.frame_length)
self.frame_size = self.samples_per_frame * self.sample_size
class BaseUtil(object):
def pipe(self, other, *args, **kwargs):
child = other(self, *args, **kwargs)
setattr(child, 'metadata', self.metadata)
setattr(child, '_parent', self)
return child
@property
def metadata(self):
return getattr(self, '_metadata', None)
@metadata.setter
def metadata(self, value):
self._metadata = value
@six.add_metaclass(abc.ABCMeta)
class BasePlayable(BaseUtil):
@abc.abstractmethod
def next_frame(self):
raise NotImplementedError
@six.add_metaclass(abc.ABCMeta)
class BaseInput(BaseUtil):
@abc.abstractmethod
def read(self, size):
raise NotImplementedError
@abc.abstractmethod
def fileobj(self):
raise NotImplementedError
class OpusFilePlayable(BasePlayable, AbstractOpus):
"""
An input which reads opus data from a file or file-like object.
"""
def __init__(self, fobj, *args, **kwargs):
super(OpusFilePlayable, self).__init__(*args, **kwargs)
self.fobj = fobj
self.done = False
def next_frame(self):
if self.done:
return None
header = self.fobj.read(OPUS_HEADER_SIZE)
if len(header) < OPUS_HEADER_SIZE:
self.done = True
return None
data_size = struct.unpack('<h', header)[0]
data = self.fobj.read(data_size)
if len(data) < data_size:
self.done = True
return None
return data
class FFmpegInput(BaseInput, AbstractOpus):
def __init__(self, source='-', command='avconv', streaming=False, **kwargs):
super(FFmpegInput, self).__init__(**kwargs)
if source:
self.source = source
self.streaming = streaming
self.command = command
self._buffer = None
self._proc = None
def read(self, sz):
if self.streaming:
raise TypeError('Cannot read from a streaming FFmpegInput')
# First read blocks until the subprocess finishes
if not self._buffer:
data, _ = self.proc.communicate()
self._buffer = BufferedIO(data)
# Subsequent reads can just do dis thang
return self._buffer.read(sz)
def fileobj(self):
if self.streaming:
return self.proc.stdout
else:
return self
@property
def proc(self):
if not self._proc:
if callable(self.source):
self.source = self.source(self)
if isinstance(self.source, (tuple, list)):
self.source, self.metadata = self.source
args = [
self.command,
'-i', str(self.source),
'-f', 's16le',
'-ar', str(self.sampling_rate),
'-ac', str(self.channels),
'-loglevel', 'warning',
'pipe:1',
]
self._proc = subprocess.Popen(args, stdin=None, stdout=subprocess.PIPE)
return self._proc
class YoutubeDLInput(FFmpegInput):
def __init__(self, url=None, ie_info=None, *args, **kwargs):
super(YoutubeDLInput, self).__init__(None, *args, **kwargs)
self._url = url
self._ie_info = ie_info
self._info = None
self._info_lock = Semaphore()
@property
def info(self):
with self._info_lock:
if not self._info:
import youtube_dl
ydl = youtube_dl.YoutubeDL({'format': 'webm[abr>0]/bestaudio/best'})
if self._url:
obj = ydl.extract_info(self._url, download=False, process=False)
if 'entries' in obj:
self._ie_info = list(obj['entries'])[0]
else:
self._ie_info = obj
self._info = ydl.process_ie_result(self._ie_info, download=False)
return self._info
@property
def _metadata(self):
return self.info
@classmethod
def many(cls, url, *args, **kwargs):
import youtube_dl
ydl = youtube_dl.YoutubeDL({'format': 'webm[abr>0]/bestaudio/best'})
info = ydl.extract_info(url, download=False, process=False)
if 'entries' not in info:
yield cls(ie_info=info, *args, **kwargs)
return
for item in info['entries']:
yield cls(ie_info=item, *args, **kwargs)
@property
def source(self):
return self.info['url']
class BufferedOpusEncoderPlayable(BasePlayable, OpusEncoder, AbstractOpus):
def __init__(self, source, *args, **kwargs):
self.source = source
self.frames = Queue(kwargs.pop('queue_size', 4096))
# Call the AbstractOpus constructor, as we need properties it sets
AbstractOpus.__init__(self, *args, **kwargs)
# Then call the OpusEncoder constructor, which requires some properties
# that AbstractOpus sets up
OpusEncoder.__init__(self, self.sampling_rate, self.channels)
# Spawn the encoder loop
gevent.spawn(self._encoder_loop)
def _encoder_loop(self):
while self.source:
raw = self.source.read(self.frame_size)
if len(raw) < self.frame_size:
break
self.frames.put(self.encode(raw, self.samples_per_frame))
gevent.idle()
self.source = None
self.frames.put(None)
def next_frame(self):
return self.frames.get()
class DCADOpusEncoderPlayable(BasePlayable, AbstractOpus, OpusEncoder):
def __init__(self, source, *args, **kwargs):
self.source = source
self.command = kwargs.pop('command', 'dcad')
self.on_complete = kwargs.pop('on_complete', None)
super(DCADOpusEncoderPlayable, self).__init__(*args, **kwargs)
self._done = False
self._proc = None
@property
def proc(self):
if not self._proc:
source = obj = self.source.fileobj()
if not hasattr(obj, 'fileno'):
source = subprocess.PIPE
self._proc = subprocess.Popen([
self.command,
'--channels', str(self.channels),
'--rate', str(self.sampling_rate),
'--size', str(self.samples_per_frame),
'--bitrate', '128',
'--fec',
'--packet-loss-percent', '30',
'--input', 'pipe:0',
'--output', 'pipe:1',
], stdin=source, stdout=subprocess.PIPE)
def writer():
while True:
data = obj.read(2048)
if len(data) > 0:
self._proc.stdin.write(data)
if len(data) < 2048:
break
if source == subprocess.PIPE:
gevent.spawn(writer)
return self._proc
def next_frame(self):
if self._done:
return None
header = self.proc.stdout.read(OPUS_HEADER_SIZE)
if len(header) < OPUS_HEADER_SIZE:
self._done = True
self.on_complete()
return
size = struct.unpack('<h', header)[0]
data = self.proc.stdout.read(size)
if len(data) < size:
self._done = True
self.on_complete()
return
return data
class FileProxyPlayable(BasePlayable, AbstractOpus):
def __init__(self, other, output, *args, **kwargs):
self.flush = kwargs.pop('flush', False)
self.on_complete = kwargs.pop('on_complete', None)
super(FileProxyPlayable, self).__init__(*args, **kwargs)
self.other = other
self.output = output
def next_frame(self):
frame = self.other.next_frame()
if frame:
self.output.write(struct.pack('<h', len(frame)))
self.output.write(frame)
if self.flush:
self.output.flush()
else:
self.output.flush()
self.on_complete()
self.output.close()
return frame
class PlaylistPlayable(BasePlayable, AbstractOpus):
def __init__(self, items, *args, **kwargs):
super(PlaylistPlayable, self).__init__(*args, **kwargs)
self.items = items
self.now_playing = None
def _get_next(self):
if isinstance(self.items, types.GeneratorType):
return next(self.items, None)
return self.items.pop()
def next_frame(self):
if not self.items:
return
if not self.now_playing:
self.now_playing = self._get_next()
if not self.now_playing:
return
frame = self.now_playing.next_frame()
if not frame:
return self.next_frame()
return frame
class MemoryBufferedPlayable(BasePlayable, AbstractOpus):
def __init__(self, other, *args, **kwargs):
from gevent.queue import Queue
super(MemoryBufferedPlayable, self).__init__(*args, **kwargs)
self.frames = Queue()
self.other = other
gevent.spawn(self._buffer)
def _buffer(self):
while True:
frame = self.other.next_frame()
if not frame:
break
self.frames.put(frame)
self.frames.put(None)
def next_frame(self):
return self.frames.get()

123
disco/voice/player.py

@ -1,123 +0,0 @@
import time
import gevent
from disco.voice.client import VoiceState
from disco.voice.queue import PlayableQueue
from disco.util.emitter import Emitter
from disco.util.logging import LoggingClass
class Player(LoggingClass):
class Events(object):
START_PLAY = 1
STOP_PLAY = 2
PAUSE_PLAY = 3
RESUME_PLAY = 4
DISCONNECT = 5
def __init__(self, client, queue=None):
super(Player, self).__init__()
self.client = client
# Queue contains playable items
self.queue = queue or PlayableQueue()
# Whether we're playing music (true for lifetime)
self.playing = True
# Set to an event when playback is paused
self.paused = None
# Current playing item
self.now_playing = None
# Current play task
self.play_task = None
# Core task
self.run_task = gevent.spawn(self.run)
# Event triggered when playback is complete
self.complete = gevent.event.Event()
# Event emitter for metadata
self.events = Emitter()
def disconnect(self):
self.client.disconnect()
self.events.emit(self.Events.DISCONNECT)
def skip(self):
self.play_task.kill()
def pause(self):
if self.paused:
return
self.paused = gevent.event.Event()
self.events.emit(self.Events.PAUSE_PLAY)
def resume(self):
if self.paused:
self.paused.set()
self.paused = None
self.events.emit(self.Events.RESUME_PLAY)
def play(self, item):
# Grab the first frame before we start anything else, sometimes playables
# can do some lengthy async tasks here to setup the playable and we
# don't want that lerp the first N frames of the playable into playing
# faster
frame = item.next_frame()
if frame is None:
return
start = time.time()
loops = 0
while True:
loops += 1
if self.paused:
self.client.set_speaking(False)
self.paused.wait()
gevent.sleep(2)
self.client.set_speaking(True)
start = time.time()
loops = 0
if self.client.state == VoiceState.DISCONNECTED:
return
if self.client.state != VoiceState.CONNECTED:
self.client.state_emitter.once(VoiceState.CONNECTED, timeout=30)
# Send the voice frame and increment our timestamp
self.client.send_frame(frame)
self.client.increment_timestamp(item.samples_per_frame)
frame = item.next_frame()
if frame is None:
return
next_time = start + 0.02 * loops
delay = max(0, 0.02 + (next_time - time.time()))
gevent.sleep(delay)
def run(self):
self.client.set_speaking(True)
while self.playing:
self.now_playing = self.queue.get()
self.events.emit(self.Events.START_PLAY, self.now_playing)
self.play_task = gevent.spawn(self.play, self.now_playing)
self.play_task.join()
self.events.emit(self.Events.STOP_PLAY, self.now_playing)
if self.client.state == VoiceState.DISCONNECTED:
self.playing = False
self.complete.set()
return
self.client.set_speaking(False)
self.disconnect()

52
disco/voice/queue.py

@ -1,52 +0,0 @@
import abc
import six
import gevent
import random
@six.add_metaclass(abc.ABCMeta)
class BaseQueue(object):
@abc.abstractmethod
def get(self):
raise NotImplementedError
class PlayableQueue(BaseQueue):
def __init__(self):
self._data = []
self._event = gevent.event.Event()
def append(self, item):
self._data.append(item)
if self._event:
self._event.set()
self._event = None
def _get(self):
if not len(self._data):
if not self._event:
self._event = gevent.event.Event()
self._event.wait()
return self._get()
return self._data.pop(0)
def get(self):
return self._get()
def shuffle(self):
random.shuffle(self._data)
def clear(self):
self._data = []
def __len__(self):
return len(self._data)
def __iter__(self):
return self._data.__iter__()
def __nonzero__(self):
return True
__bool__ = __nonzero__

345
disco/voice/udp.py

@ -1,345 +0,0 @@
import struct
import socket
import gevent
from collections import namedtuple
try:
import nacl.secret
except ImportError:
print('WARNING: nacl is not installed, voice support is disabled')
from disco.util.logging import LoggingClass
AudioCodecs = ('opus',)
class RTPPayloadTypes(object):
OPUS = 0x78
ALL = {OPUS}
class RTCPPayloadTypes(object):
SENDER_REPORT = 200
RECEIVER_REPORT = 201
SOURCE_DESCRIPTION = 202
BYE = 203
APP = 204
RTPFB = 205
PSFB = 206
ALL = {
SENDER_REPORT, RECEIVER_REPORT, SOURCE_DESCRIPTION, BYE, APP, RTPFB, PSFB,
}
MAX_UINT32 = 4294967295
MAX_SEQUENCE = 65535
RTP_HEADER_VERSION = 0x80 # Only RTP Version is set here (value of 2 << 6)
RTP_EXTENSION_ONE_BYTE = (0xBE, 0xDE)
RTPHeader = namedtuple('RTPHeader', [
'version',
'padding',
'extension',
'csrc_count',
'marker',
'payload_type',
'sequence',
'timestamp',
'ssrc',
])
RTCPHeader = namedtuple('RTCPHeader', [
'version',
'padding',
'reception_count',
'packet_type',
'length',
'ssrc',
])
RTCPData = namedtuple('RTCPData', [
'client',
'user_id',
'payload_type',
'header',
'data',
])
VoiceData = namedtuple('VoiceData', [
'client',
'user_id',
'payload_type',
'rtp',
'nonce',
'data',
])
class UDPVoiceClient(LoggingClass):
def __init__(self, vc):
super(UDPVoiceClient, self).__init__()
self.vc = vc
# The underlying UDP socket
self.conn = None
# Connection information
self.ip = None
self.port = None
self.connected = False
# Voice information
self.sequence = 0
self.timestamp = 0
self._nonce = 0
self._run_task = None
self._secret_box = None
# RTP Header
self._rtp_audio_header = bytearray(12)
self._rtp_audio_header[0] = RTP_HEADER_VERSION
def set_audio_codec(self, codec):
if codec not in AudioCodecs:
raise Exception('Unsupported audio codec received, {}'.format(codec))
ptype = getattr(RTPPayloadTypes, codec.upper())
self._rtp_audio_header[1] = ptype
self.log.debug('[%s] Set UDP\'s Audio Codec to %s, RTP payload type %s', self.vc, codec, ptype)
def increment_timestamp(self, by):
self.timestamp += by
if self.timestamp > MAX_UINT32:
self.timestamp = 0
def setup_encryption(self, encryption_key):
self._secret_box = nacl.secret.SecretBox(encryption_key)
def send_frame(self, frame, sequence=None, timestamp=None, incr_timestamp=None):
# Convert the frame to a bytearray
frame = bytearray(frame)
# Pack the rtc header into our buffer
struct.pack_into('>H', self._rtp_audio_header, 2, sequence or self.sequence)
struct.pack_into('>I', self._rtp_audio_header, 4, timestamp or self.timestamp)
struct.pack_into('>i', self._rtp_audio_header, 8, self.vc.ssrc_audio)
if self.vc.mode == 'xsalsa20_poly1305_lite':
# Use an incrementing number as a nonce, only first 4 bytes of the nonce is padded on
self._nonce += 1
if self._nonce > MAX_UINT32:
self._nonce = 0
nonce = bytearray(24)
struct.pack_into('>I', nonce, 0, self._nonce)
nonce_padding = nonce[:4]
elif self.vc.mode == 'xsalsa20_poly1305_suffix':
# Generate a nonce
nonce = nacl.utils.random(nacl.secret.SecretBox.NONCE_SIZE)
nonce_padding = nonce
elif self.vc.mode == 'xsalsa20_poly1305':
# Nonce is the header
nonce = bytearray(24)
nonce[:12] = self._rtp_audio_header
nonce_padding = None
else:
raise Exception('The voice mode, {}, isn\'t supported.'.format(self.vc.mode))
# Encrypt the payload with the nonce
payload = self._secret_box.encrypt(bytes(frame), bytes(nonce)).ciphertext
# Pad the payload with the nonce, if applicable
if nonce_padding:
payload += nonce_padding
# Send the header (sans nonce padding) plus the payload
self.send(self._rtp_audio_header + payload)
# Increment our sequence counter
self.sequence += 1
if self.sequence >= MAX_SEQUENCE:
self.sequence = 0
# Increment our timestamp (if applicable)
if incr_timestamp:
self.timestamp += incr_timestamp
def run(self):
while True:
data, addr = self.conn.recvfrom(4096)
# Data cannot be less than the bare minimum, just ignore
if len(data) <= 12:
self.log.debug('[%s] [VoiceData] Received voice data under 13 bytes', self.vc)
continue
first, second = struct.unpack_from('>BB', data)
if second in RTCPPayloadTypes.ALL:
length, ssrc = struct.unpack_from('>HI', data, 2)
rtcp = RTCPHeader(
version=first >> 6,
padding=(first >> 5) & 1,
reception_count=first & 0x1F,
packet_type=second,
length=length,
ssrc=ssrc,
)
if rtcp.ssrc == self.vc.ssrc_rtcp:
user_id = self.vc.user_id
else:
rtcp_ssrc = rtcp.ssrc
if rtcp_ssrc:
rtcp_ssrc -= 3
user_id = self.vc.audio_ssrcs.get(rtcp_ssrc, None)
payload = RTCPData(
client=self.vc,
user_id=user_id,
payload_type=second,
header=rtcp,
data=data[8:],
)
self.vc.client.gw.events.emit('RTCPData', payload)
else:
sequence, timestamp, ssrc = struct.unpack_from('>HII', data, 2)
rtp = RTPHeader(
version=first >> 6,
padding=(first >> 5) & 1,
extension=(first >> 4) & 1,
csrc_count=first & 0x0F,
marker=second >> 7,
payload_type=second & 0x7F,
sequence=sequence,
timestamp=timestamp,
ssrc=ssrc,
)
# Check if rtp version is 2
if rtp.version != 2:
self.log.debug('[%s] [VoiceData] Received an invalid RTP packet version, %s', self.vc, rtp.version)
continue
# Unsupported payload type received
if rtp.payload_type not in RTPPayloadTypes.ALL:
self.log.debug('[%s] [VoiceData] Received unsupported payload type, %s', self.vc, rtp.payload_type)
continue
nonce = bytearray(24)
if self.vc.mode == 'xsalsa20_poly1305_lite':
nonce[:4] = data[-4:]
data = data[:-4]
elif self.vc.mode == 'xsalsa20_poly1305_suffx':
nonce[:24] = data[-24:]
data = data[:-24]
elif self.vc.mode == 'xsalsa20_poly1305':
nonce[:12] = data[:12]
else:
self.log.debug('[%s] [VoiceData] Unsupported Encryption Mode, %s', self.vc, self.vc.mode)
continue
try:
data = self._secret_box.decrypt(bytes(data[12:]), bytes(nonce))
except Exception:
self.log.debug('[%s] [VoiceData] Failed to decode data from ssrc %s', self.vc, rtp.ssrc)
continue
# RFC3550 Section 5.1 (Padding)
if rtp.padding:
padding_amount, = struct.unpack_from('>B', data[:-1])
data = data[-padding_amount:]
if rtp.extension:
# RFC5285 Section 4.2: One-Byte Header
rtp_extension_header = struct.unpack_from('>BB', data)
if rtp_extension_header == RTP_EXTENSION_ONE_BYTE:
data = data[2:]
fields_amount, = struct.unpack_from('>H', data)
fields = []
offset = 4
for i in range(fields_amount):
first_byte, = struct.unpack_from('>B', data[:offset])
offset += 1
rtp_extension_identifer = first_byte & 0xF
rtp_extension_len = ((first_byte >> 4) & 0xF) + 1
# Ignore data if identifer == 15, so skip if this is set as 0
if rtp_extension_identifer:
fields.append(data[offset:offset + rtp_extension_len])
offset += rtp_extension_len
# skip padding
while data[offset] == 0:
offset += 1
if len(fields):
fields.append(data[offset:])
data = b''.join(fields)
else:
data = data[offset:]
# RFC3550 Section 5.3: Profile-Specific Modifications to the RTP Header
# clients send it sometimes, definitely on fresh connects to a server, dunno what to do here
if rtp.marker:
self.log.debug('[%s] [VoiceData] Received RTP data with the marker set, skipping', self.vc)
continue
payload = VoiceData(
client=self.vc,
user_id=self.vc.audio_ssrcs.get(rtp.ssrc, None),
payload_type=second,
rtp=rtp,
nonce=nonce,
data=data,
)
self.vc.client.gw.events.emit('VoiceData', payload)
def send(self, data):
self.conn.sendto(data, (self.ip, self.port))
def disconnect(self):
self._run_task.kill()
def connect(self, host, port, timeout=10, addrinfo=None):
self.ip = socket.gethostbyname(host)
self.port = port
self.conn = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
if addrinfo:
ip, port = addrinfo
else:
# Send discovery packet
packet = bytearray(70)
struct.pack_into('>I', packet, 0, self.vc.ssrc)
self.send(packet)
# Wait for a response
try:
data, addr = gevent.spawn(lambda: self.conn.recvfrom(70)).get(timeout=timeout)
except gevent.Timeout:
return (None, None)
# Read IP and port
ip = str(data[4:]).split('\x00', 1)[0]
port = struct.unpack('<H', data[-2:])[0]
# Spawn read thread so we don't max buffers
self.connected = True
self._run_task = gevent.spawn(self.run)
return (ip, port)

74
examples/music.py

@ -1,56 +1,32 @@
from disco.bot import Plugin
from disco.bot.command import CommandError
from disco.voice.player import Player
from disco.voice.playable import YoutubeDLInput, BufferedOpusEncoderPlayable
from disco.voice.client import VoiceException
from disco.voice import VoiceConnection, YoutubeDLPlayable
class MusicPlugin(Plugin):
def load(self, ctx):
super(MusicPlugin, self).load(ctx)
self.guilds = {}
def load(self, data):
super(MusicPlugin, self).load(data)
self._connections = {}
@Plugin.command('join')
def on_join(self, event):
if event.guild.id in self.guilds:
return event.msg.reply("I'm already playing music here.")
state = event.guild.get_member(event.author).get_voice_state()
if not state:
return event.msg.reply('You must be connected to voice to use that command.')
try:
client = state.channel.connect()
except VoiceException as e:
return event.msg.reply('Failed to connect to voice: `{}`'.format(e))
self.guilds[event.guild.id] = Player(client)
self.guilds[event.guild.id].complete.wait()
del self.guilds[event.guild.id]
def get_player(self, guild_id):
if guild_id not in self.guilds:
raise CommandError("I'm not currently playing music here.")
return self.guilds.get(guild_id)
@Plugin.command('leave')
def on_leave(self, event):
player = self.get_player(event.guild.id)
player.disconnect()
@Plugin.command('play', '<url:str>')
def on_play(self, event, url):
item = YoutubeDLInput(url).pipe(BufferedOpusEncoderPlayable)
self.get_player(event.guild.id).queue.append(item)
@Plugin.command('pause')
def on_pause(self, event):
self.get_player(event.guild.id).pause()
@Plugin.command('resume')
def on_resume(self, event):
self.get_player(event.guild.id).resume()
@Plugin.command('kill')
def on_kill(self, event):
self.get_player(event.guild.id).client.ws.sock.shutdown()
vs = event.guild.get_member(event.author).get_voice_state()
if not vs:
return event.msg.reply('you are not in a voice channel')
if event.guild.id in self._connections:
if self._connections[event.guild.id].channel_id == vs.channel_id:
return event.msg.reply('already in that channel')
else:
self._connections[event.guild.id].set_channel(vs.channel)
return
self._connections[event.guild.id] = VoiceConnection.from_channel(vs.channel, enable_events=True)
@Plugin.command('play', '<song:str>')
def on_play(self, event, song):
if event.guild.id not in self._connections:
return event.msg.reply('not in voice here')
playables = list(YoutubeDLPlayable.from_url(song))
for playable in playables:
self._connections[event.guild.id].play(playable)

2
setup.py

@ -10,7 +10,7 @@ with open('README.md') as f:
readme = f.read()
extras_require = {
'voice': ['pynacl==1.2.1'],
'voice': ['telecom-py==0.0.4'],
'http': ['flask==0.12.2'],
'yaml': ['pyyaml==3.12'],
'music': ['youtube_dl>=2018.1.21'],

5
tests/imports.py

@ -34,8 +34,3 @@ from disco.util.logging import *
from disco.util.serializer import *
from disco.util.snowflake import *
from disco.util.websocket import *
from disco.voice.client import *
from disco.voice.opus import *
from disco.voice.packets import *
from disco.voice.playable import *
from disco.voice.player import *

0
tests/voice/__init__.py

66
tests/voice/queue.py

@ -1,66 +0,0 @@
import gevent
from unittest import TestCase
from disco.voice.queue import PlayableQueue
class TestPlayableQueue(TestCase):
def test_append(self):
q = PlayableQueue()
q.append(1)
q.append(2)
q.append(3)
self.assertEqual(q._data, [1, 2, 3])
self.assertEqual(q.get(), 1)
self.assertEqual(q.get(), 2)
self.assertEqual(q.get(), 3)
def test_len(self):
q = PlayableQueue()
for idx in range(1234):
q.append(idx)
self.assertEqual(len(q), 1234)
def test_iter(self):
q = PlayableQueue()
for idx in range(5):
q.append(idx)
self.assertEqual(sum(q), 10)
def test_blocking_get(self):
q = PlayableQueue()
result = gevent.event.AsyncResult()
def get():
result.set(q.get())
gevent.spawn(get)
q.append(5)
self.assertEqual(result.get(), 5)
def test_shuffle(self):
q = PlayableQueue()
for idx in range(10000):
q.append(idx)
self.assertEqual(q._data[0], 0)
q.shuffle()
self.assertNotEqual(q._data[0], 0)
def test_clear(self):
q = PlayableQueue()
for idx in range(100):
q.append(idx)
self.assertEqual(q._data[0], 0)
self.assertEqual(q._data[-1], 99)
self.assertEqual(len(q), 100)
q.clear()
self.assertEqual(len(q), 0)
Loading…
Cancel
Save