Compare commits
38 Commits
master
...
staging/v1
Author | SHA1 | Date |
---|---|---|
|
b5d26190a6 | 5 years ago |
|
4bed5b8494 | 5 years ago |
|
29ade49209 | 5 years ago |
|
76a4b8df94 | 5 years ago |
|
8dbe0421f6 | 5 years ago |
|
bbddbc4956 | 5 years ago |
|
9984edcbf3 | 5 years ago |
|
d89562204f | 5 years ago |
|
425a2f9bc4 | 5 years ago |
|
40d22921fa | 5 years ago |
|
688f2d3378 | 6 years ago |
|
af03a114d0 | 6 years ago |
|
5864160139 | 6 years ago |
|
7a1c1a193d | 6 years ago |
|
6f4d982aca | 6 years ago |
|
ced24273ce | 6 years ago |
|
d4c9f961de | 6 years ago |
|
dc752d248a | 6 years ago |
|
f19a14f6b5 | 6 years ago |
|
4fb136ee08 | 6 years ago |
|
1c23c1b9a9 | 6 years ago |
|
f3e36be931 | 6 years ago |
|
ca9378c833 | 6 years ago |
|
70ed08c7f5 | 6 years ago |
|
5223f886b2 | 6 years ago |
|
ca087ea8f6 | 6 years ago |
|
0082ad9a15 | 6 years ago |
|
405488c728 | 6 years ago |
|
4a3295e63d | 6 years ago |
|
746d0d13e5 | 6 years ago |
|
250d7d0d8f | 6 years ago |
|
7420d170b7 | 6 years ago |
|
92e940fcf9 | 6 years ago |
|
1fcb01dcea | 6 years ago |
|
2f44aebb6a | 6 years ago |
|
5bd1b92e60 | 6 years ago |
|
839682291b | 6 years ago |
|
bb9e6bb356 | 6 years ago |
59 changed files with 1618 additions and 2295 deletions
@ -1,118 +0,0 @@ |
|||
# CHANGELOG |
|||
|
|||
## v0.0.12 |
|||
|
|||
### Additions |
|||
|
|||
- **MAJOR** Added voice gateway v3 support. This will result in increased stability for voice connections |
|||
- **BREAKING** Updated holster to v2.0.0 which changes the way emitters work (and removes the previous priorities). A migration guide will be provided post-RC cycle. |
|||
- Added support for ETF on Python 3.x via `earl-etf` (@GiovanniMCMXCIX) |
|||
- Supported detecting dead/inactive/zombied Gateway websocket connections via tracking `HEARTBEAT_ACK` (@PixeLInc) |
|||
- Added support for animated emoji (@Seklfreak) |
|||
- Added support for `LISTENING` and `WATCHING` game statuses (@PixeLInc) |
|||
- Added `wsaccel` package within the `performance` pack, should improve websocket performance |
|||
- Added the concept of a `shared_config` which propgates its options to all plugin configs (@enkoder) |
|||
- Added support for streaming zlib compression to our gateway socket. This is enabled by default and provides significant performance improvements on startup and overall bandwidth usage |
|||
- Added support for `Guild.system_channel_id` and `GUILD_MEMBER_JOIN` system message |
|||
- Added `Guild.create_category`, `Guild.create_text_channel` and `Guild.create_voice_channel` |
|||
- Added `Channel.create_text_channel` and `Channel.create_voice_channel` which can be called only on category channels to add sub-channels |
|||
|
|||
### Fixes |
|||
|
|||
- Fixed 'Invalid token passed' errors from showing up (via removal of token validation) |
|||
- Fixed `IndexError` being raised when `MessageIterator` was done iterating (@Majora320) |
|||
- Fixed overwrite calculations in `Channel.get_permissions` (@cookkkie) |
|||
- A plethora of PEP8 and general syntax changes have been made to cleanup the code |
|||
- Fixed a bug with `Emoji.custom` |
|||
- Fixed a bug in the typing system that would not allow Field's to have a `default` of `None` |
|||
- Fixed the `__str__` method for Channel's displaying (useless) unset data for DMs |
|||
- Fixed a bug with `MessageIterator` related to iterating before or after an ID of 0 |
|||
- Fixed incorrect field name (`icon_proxy_url` vs `proxy_icon_url`) in MessageEmbedAuthor model |
|||
- Fixed bugs related to creating and deleting pinned messages |
|||
- Fixed `GuildBan.reason` incorrectly handling unicode reasons |
|||
- Fixed `Paginator` throwing an exception when reaching the end of pagination, instead of just ending its iteration |
|||
- Fixed `Paginator` defaulting to start at 0 for all iterations |
|||
|
|||
### Etc |
|||
|
|||
- **BREAKING** Refactor the way Role's are managed and updated. You should update your code to use `Role.update` |
|||
- **BREAKING** Renamed `Model.update` to `Model.inplace_update`. You should not have to worry about this change unless you explicitly call that method |
|||
- **DEPRECATION** Deprecated the use of `Guild.create_channel`. You should use the explicit channel type creation methods added in this release |
|||
- Cleaned up various documentation |
|||
- Removed some outdated storage/etc examples |
|||
- Expanded `APIClient.guilds_roles_create` to handle more attributes |
|||
- Bumped various requirement versions |
|||
|
|||
## v0.0.11 |
|||
|
|||
### Additions |
|||
|
|||
- Added support for Guild audit logs, exposed via `Guild.get_audit_log_entries`, `Guild.audit_log` and `Guild.audit_log_iter`. For more information see the `AuditLogEntry` model |
|||
- Added built-in Flask HTTP server which can be enabled via `http_enabled` and configured via `http_host`/`http_port` config options. The server allows plugins to define routes which can be called externally. |
|||
- Added support for capturing the raw responses returned from API requests via the `APIClient.capture` contextmanager |
|||
- Added support for NSFW channels via `Channel.nsfw` and `Channel.is_nsfw` |
|||
- Added initial support for channel categories via `Channel.parent_id` and `Channel.parent` |
|||
- Added various setters for updating Channel properties, e.g. `Channel.set_topic` |
|||
- Added support for audit log reasons, accessible through passing `reason` to various methods |
|||
- Added `disco.util.snowflake.from_timestamp_ms` |
|||
- Added support for `on_complete` callback within DCADOpusEncoderPlayable |
|||
- **BREAKING** Added new custom queue types `BaseQueue`/`PlayableQueue` for use w/ `Player`. |
|||
- `queue` can be passed when creating a `Player`, should inherit from BaseQueue |
|||
- Users who previously utilized the `put` method of the old `Player.queue` must move to using `Player.queue.append`, or providing a custom queue implementation. |
|||
- Added `Emoji.custom` property |
|||
|
|||
### Fixes |
|||
|
|||
- Fixed GuildRoleCreate missing guild\_id, resulting in incorrect state |
|||
- Fixed SimpleLimiter behaving incorrectly (causing GW socket to be ratelimited in some cases) |
|||
- Fixed the shortest possible match for a single command being an empty string |
|||
- Fixed group matching being overly greedy, which allowed for extra characters to be allowed at the end of a group match |
|||
- Fixed errors thrown when not enabling manhole via cli |
|||
- Fixed various warnings emitted due to useage of StopIteration |
|||
- Fixed warnings about missing voice libs when importing `disco.types.channel` |
|||
- Fixed `Bot.get_commands_for_message` returning None (instead of empty list) in some cases |
|||
|
|||
### Etc |
|||
|
|||
- Greatly imrpoved the performance of `HashMap` |
|||
- **BREAKING** Increased the weight of group matches over command argument matches, and limited the number of commands executed per message to one. |
|||
- Reuse a buffer in voice code to slightly improve performance |
|||
|
|||
## v0.0.11-rc.8 |
|||
|
|||
### Additions |
|||
|
|||
- Added support for capturing the raw responses returned from the API via `APIClient.capture` contextmanager |
|||
- Added various pieces of documentation |
|||
|
|||
### Fixes |
|||
|
|||
- Fixed Python 3 errors and Python 2 deprecation warnings for CommandError using `.message` attribute |
|||
|
|||
### ETC |
|||
|
|||
- Grealty improved the performance of the custom HashMap |
|||
- Moved tests around and added pytest as the testing framework of choice |
|||
|
|||
|
|||
## v0.0.11-rc.7 |
|||
|
|||
### Additions |
|||
|
|||
- Added support for new NSFW attribute of channels |
|||
- `Channel.nsfw` |
|||
- `Channel.set_nsfw` |
|||
- `Channel.is_nsfw` behaves correctly, checking both the deprecated `nsfw-` prefix and the new attribute |
|||
- Added support for `on_complete` callback within DCADOpusEncoderPlayable |
|||
- **BREAKING** Added new custom queue types `BaseQueue`/`PlayableQueue` for use w/ `Player`. |
|||
- `queue` can be passed when creating a `Player`, should inherit from BaseQueue |
|||
- Users who previously utilized the `put` method of the old `Player.queue` must move to using `Player.queue.append`, or providing a custom queue implementation. |
|||
|
|||
### Fixes |
|||
|
|||
- Fixed bug within SimpleLimiter which would cause all events after a quiescent period to be immedietly dispatched. This would cause gateway disconnects w/ RATE\_LIMITED on clients with many Guilds and member sync enabled. |
|||
|
|||
### ETC |
|||
|
|||
- Improved log messages within GatewayClient |
|||
- Log voice endpoint within VoiceClient |
@ -1,20 +1,17 @@ |
|||
from holster.enum import Enum |
|||
|
|||
SEND = 1 |
|||
RECV = 2 |
|||
|
|||
OPCode = Enum( |
|||
DISPATCH=0, |
|||
HEARTBEAT=1, |
|||
IDENTIFY=2, |
|||
STATUS_UPDATE=3, |
|||
VOICE_STATE_UPDATE=4, |
|||
VOICE_SERVER_PING=5, |
|||
RESUME=6, |
|||
RECONNECT=7, |
|||
REQUEST_GUILD_MEMBERS=8, |
|||
INVALID_SESSION=9, |
|||
HELLO=10, |
|||
HEARTBEAT_ACK=11, |
|||
GUILD_SYNC=12, |
|||
) |
|||
|
|||
class OPCode(object): |
|||
DISPATCH = 0 |
|||
HEARTBEAT = 1 |
|||
IDENTIFY = 2 |
|||
STATUS_UPDATE = 3 |
|||
VOICE_STATE_UPDATE = 4 |
|||
VOICE_SERVER_PING = 5 |
|||
RESUME = 6 |
|||
RECONNECT = 7 |
|||
REQUEST_GUILD_MEMBERS = 8 |
|||
INVALID_SESSION = 9 |
|||
HELLO = 10 |
|||
HEARTBEAT_ACK = 11 |
|||
|
@ -0,0 +1,85 @@ |
|||
from disco.types.base import SlottedModel, Field, ListField, snowflake, text, enum |
|||
from disco.types.guild import Integration |
|||
from disco.types.user import User |
|||
from disco.util.snowflake import to_snowflake |
|||
|
|||
|
|||
class TeamMembershipState(object): |
|||
INVITED = 1 |
|||
ACCEPTED = 2 |
|||
|
|||
|
|||
class TeamMember(SlottedModel): |
|||
membership_state = Field(enum(TeamMembershipState)) |
|||
permissions = Field(text) |
|||
team_id = Field(snowflake) |
|||
user = Field(User) |
|||
|
|||
|
|||
class Team(SlottedModel): |
|||
icon = Field(text) |
|||
id = Field(snowflake) |
|||
members = ListField(TeamMember) |
|||
owner_user_id = Field(snowflake) |
|||
|
|||
|
|||
class Application(SlottedModel): |
|||
id = Field(snowflake) |
|||
name = Field(text) |
|||
icon = Field(text) |
|||
description = Field(text) |
|||
rpc_origins = ListField(text) |
|||
bot_public = Field(bool) |
|||
bot_require_code_grant = Field(bool) |
|||
owner = Field(User) |
|||
summary = Field(text) |
|||
verify_key = Field(text) |
|||
team = Field(Team) |
|||
guild_id = Field(snowflake) |
|||
primary_sku_id = Field(snowflake) |
|||
slug = Field(text) |
|||
cover_image = Field(text) |
|||
|
|||
def user_is_owner(self, user): |
|||
user_id = to_snowflake(user) |
|||
if user_id == self.owner.id: |
|||
return True |
|||
|
|||
return any(user_id == member.user.id for member in self.team.members) |
|||
|
|||
def get_icon_url(self, fmt='webp', size=1024): |
|||
if not self.icon: |
|||
return '' |
|||
|
|||
return 'https://cdn.discordapp.com/app-icons/{}/{}.{}?size={}'.format(self.id, self.icon, fmt, size) |
|||
|
|||
def get_cover_image_url(self, fmt='webp', size=1024): |
|||
if not self.cover_image: |
|||
return '' |
|||
|
|||
return 'https://cdn.discordapp.com/app-icons/{}/{}.{}?size={}'.format(self.id, self.cover_image, fmt, size) |
|||
|
|||
@property |
|||
def icon_url(self): |
|||
return self.get_icon_url() |
|||
|
|||
@property |
|||
def cover_image_url(self): |
|||
return self.get_cover_image_url() |
|||
|
|||
|
|||
class ConnectionVisibility(object): |
|||
NOBODY = 0 |
|||
EVERYONE = 1 |
|||
|
|||
|
|||
class Connection(SlottedModel): |
|||
id = Field(text) |
|||
name = Field(text) |
|||
type = Field(text) |
|||
revoked = Field(bool) |
|||
integrations = ListField(Integration) |
|||
verified = Field(bool) |
|||
friend_sync = Field(bool) |
|||
show_activity = Field(bool) |
|||
visibility = Field(enum(ConnectionVisibility)) |
@ -0,0 +1,175 @@ |
|||
import gevent |
|||
|
|||
from collections import defaultdict |
|||
from gevent.event import AsyncResult |
|||
from gevent.queue import Queue, Full |
|||
|
|||
|
|||
from disco.util.logging import LoggingClass |
|||
|
|||
|
|||
class Priority(object): |
|||
# BEFORE is the most dangerous priority level. Every event that flows through |
|||
# the given emitter instance will be dispatched _sequentially_ to all BEFORE |
|||
# handlers. Until these before handlers complete execution, no other event |
|||
# will be allowed to continue. Any exceptions raised will be ignored. |
|||
BEFORE = 1 |
|||
|
|||
# AFTER has the same behavior as before with regards to dispatching events, |
|||
# with the one difference being it executes after all the BEFORE listeners. |
|||
AFTER = 2 |
|||
|
|||
# SEQUENTIAL guarantees that all events your handler receives will be ordered |
|||
# when looked at in isolation. SEQUENTIAL handlers will not block other handlers, |
|||
# but do use a queue internally and thus can fall behind. |
|||
SEQUENTIAL = 3 |
|||
|
|||
# NONE provides no guarantees around the ordering or execution of events, sans |
|||
# that BEFORE handlers will always complete before any NONE handlers are called. |
|||
NONE = 4 |
|||
|
|||
ALL = {BEFORE, AFTER, SEQUENTIAL, NONE} |
|||
|
|||
|
|||
class Event(object): |
|||
def __init__(self, parent, data): |
|||
self.parent = parent |
|||
self.data = data |
|||
|
|||
def __getattr__(self, name): |
|||
if hasattr(self.data, name): |
|||
return getattr(self.data, name) |
|||
raise AttributeError |
|||
|
|||
|
|||
class EmitterSubscription(object): |
|||
def __init__(self, events, callback, priority=Priority.NONE, conditional=None, metadata=None, max_queue_size=8096): |
|||
self.events = events |
|||
self.callback = callback |
|||
self.priority = priority |
|||
self.conditional = conditional |
|||
self.metadata = metadata or {} |
|||
self.max_queue_size = max_queue_size |
|||
|
|||
self._emitter = None |
|||
self._queue = None |
|||
self._queue_greenlet = None |
|||
|
|||
if priority == Priority.SEQUENTIAL: |
|||
self._queue_greenlet = gevent.spawn(self._queue_handler) |
|||
|
|||
def __del__(self): |
|||
if self._emitter: |
|||
self.detach() |
|||
|
|||
if self._queue_greenlet: |
|||
self._queue_greenlet.kill() |
|||
|
|||
def __call__(self, *args, **kwargs): |
|||
if self._queue is not None: |
|||
try: |
|||
self._queue.put_nowait((args, kwargs)) |
|||
except Full: |
|||
# TODO: warning |
|||
pass |
|||
return |
|||
|
|||
if callable(self.conditional): |
|||
if not self.conditional(*args, **kwargs): |
|||
return |
|||
return self.callback(*args, **kwargs) |
|||
|
|||
def _queue_handler(self): |
|||
self._queue = Queue(self.max_queue_size) |
|||
|
|||
while True: |
|||
args, kwargs = self._queue.get() |
|||
try: |
|||
self.callback(*args, **kwargs) |
|||
except Exception: |
|||
# TODO: warning |
|||
pass |
|||
|
|||
def attach(self, emitter): |
|||
self._emitter = emitter |
|||
|
|||
for event in self.events: |
|||
self._emitter.event_handlers[self.priority][event].append(self) |
|||
|
|||
return self |
|||
|
|||
def detach(self, emitter=None): |
|||
emitter = emitter or self._emitter |
|||
|
|||
for event in self.events: |
|||
if self in emitter.event_handlers[self.priority][event]: |
|||
emitter.event_handlers[self.priority][event].remove(self) |
|||
|
|||
def remove(self, emitter=None): |
|||
self.detach(emitter) |
|||
|
|||
|
|||
class Emitter(LoggingClass): |
|||
def __init__(self): |
|||
self.event_handlers = { |
|||
k: defaultdict(list) for k in Priority.ALL |
|||
} |
|||
|
|||
def emit(self, name, *args, **kwargs): |
|||
# First execute all BEFORE handlers sequentially |
|||
for listener in self.event_handlers[Priority.BEFORE].get(name, []): |
|||
try: |
|||
listener(*args, **kwargs) |
|||
except Exception as e: |
|||
self.log.warning('BEFORE {} event handler `{}` raised {}: {}'.format( |
|||
name, |
|||
listener.callback.__name__, |
|||
e.__class__.__name__, |
|||
e, |
|||
)) |
|||
|
|||
# Next execute all AFTER handlers sequentially |
|||
for listener in self.event_handlers[Priority.AFTER].get(name, []): |
|||
try: |
|||
listener(*args, **kwargs) |
|||
except Exception as e: |
|||
self.log.warning('AFTER {} event handler `{}` raised {}: {}'.format( |
|||
name, |
|||
listener.callback.__name__, |
|||
e.__class__.__name__, |
|||
e, |
|||
)) |
|||
|
|||
# Next enqueue all sequential handlers. This just puts stuff into a queue |
|||
# without blocking, so we don't have to worry too much |
|||
for listener in self.event_handlers[Priority.SEQUENTIAL].get(name, []): |
|||
listener(*args, **kwargs) |
|||
|
|||
# Finally just spawn for everything else |
|||
for listener in self.event_handlers[Priority.NONE].get(name, []): |
|||
gevent.spawn(listener, *args, **kwargs) |
|||
|
|||
def on(self, *args, **kwargs): |
|||
return EmitterSubscription(args[:-1], args[-1], **kwargs).attach(self) |
|||
|
|||
def once(self, *args, **kwargs): |
|||
result = AsyncResult() |
|||
li = None |
|||
|
|||
def _f(e): |
|||
result.set(e) |
|||
li.detach() |
|||
|
|||
li = self.on(*args + (_f, )) |
|||
|
|||
return result.wait(kwargs.pop('timeout', None)) |
|||
|
|||
def wait(self, *args, **kwargs): |
|||
result = AsyncResult() |
|||
match = args[-1] |
|||
|
|||
def _f(e): |
|||
if match(e): |
|||
result.set(e) |
|||
|
|||
return result.wait(kwargs.pop('timeout', None)) |
@ -0,0 +1,20 @@ |
|||
import six |
|||
|
|||
|
|||
def get_enum_members(enum): |
|||
for k, v in six.iteritems(enum.__dict__): |
|||
if not isinstance(k, six.string_types): |
|||
continue |
|||
|
|||
if k.startswith('_') or not k.isupper(): |
|||
continue |
|||
|
|||
yield k, v |
|||
|
|||
|
|||
def get_enum_value_by_name(enum, name): |
|||
name = name.lower() |
|||
|
|||
for k, v in get_enum_members(enum): |
|||
if k.lower() == name: |
|||
return v |
@ -0,0 +1,27 @@ |
|||
import gevent |
|||
import weakref |
|||
|
|||
|
|||
class ThreadLocal(object): |
|||
___slots__ = ['storage'] |
|||
|
|||
def __init__(self): |
|||
self.storage = weakref.WeakKeyDictionary() |
|||
|
|||
def get(self): |
|||
if gevent.getcurrent() not in self.storage: |
|||
self.storage[gevent.getcurrent()] = {} |
|||
return self.storage[gevent.getcurrent()] |
|||
|
|||
def drop(self): |
|||
if gevent.getcurrent() in self.storage: |
|||
del self.storage[gevent.getcurrent()] |
|||
|
|||
def __contains__(self, key): |
|||
return key in self.get() |
|||
|
|||
def __getitem__(self, item): |
|||
return self.get()[item] |
|||
|
|||
def __setitem__(self, item, value): |
|||
self.get()[item] = value |
@ -0,0 +1,176 @@ |
|||
import os |
|||
import json |
|||
|
|||
import gevent |
|||
from gevent.os import make_nonblocking, nb_read |
|||
|
|||
from disco.gateway.packets import OPCode |
|||
from disco.types.channel import Channel |
|||
from disco.util.emitter import Emitter |
|||
from telecom import TelecomConnection, AvConvPlayable |
|||
|
|||
try: |
|||
import youtube_dl |
|||
ytdl = youtube_dl.YoutubeDL() |
|||
except ImportError: |
|||
ytdl = None |
|||
|
|||
|
|||
class YoutubeDLPlayable(AvConvPlayable): |
|||
def __init__(self, url): |
|||
url = next(self.from_url(url), None) |
|||
if not url: |
|||
raise Exception('No result found for URL {}'.format(url)) |
|||
super(YoutubeDLPlayable, self).__init__(url) |
|||
|
|||
@classmethod |
|||
def from_url(cls, url): |
|||
assert ytdl is not None, 'YoutubeDL isn\'t installed' |
|||
|
|||
results = ytdl.extract_info(url, download=False) |
|||
if 'entries' not in results: |
|||
results = [results] |
|||
else: |
|||
results = results['entries'] |
|||
|
|||
for result in results: |
|||
audio_formats = [fmt for fmt in result['formats'] if fmt['vcodec'] == 'none' and fmt['acodec'] == 'opus'] |
|||
if not audio_formats: |
|||
raise Exception("Couldn't find valid audio format for {}".format(url)) |
|||
|
|||
best_audio_format = sorted(audio_formats, key=lambda i: i['abr'], reverse=True)[0] |
|||
yield AvConvPlayable(best_audio_format['url']) |
|||
|
|||
|
|||
class VoiceConnection(object): |
|||
def __init__(self, client, guild_id, enable_events=False): |
|||
self.client = client |
|||
self.guild_id = guild_id |
|||
self.channel_id = None |
|||
self.enable_events = enable_events |
|||
self._conn = None |
|||
self._voice_server_update_listener = self.client.events.on( |
|||
'VoiceServerUpdate', |
|||
self._on_voice_server_update, |
|||
) |
|||
self._event_reader_greenlet = None |
|||
|
|||
self.events = None |
|||
if self.enable_events: |
|||
self.events = Emitter() |
|||
|
|||
self._mute = False |
|||
self._deaf = False |
|||
|
|||
def __del__(self): |
|||
if self._event_reader_greenlet: |
|||
self._event_reader_greenlet.kill() |
|||
|
|||
@property |
|||
def mute(self): |
|||
return self._mute |
|||
|
|||
@property |
|||
def deaf(self): |
|||
return self._deaf |
|||
|
|||
@mute.setter |
|||
def mute(self, value): |
|||
if value is self._mute: |
|||
return |
|||
|
|||
self._mute = value |
|||
self._send_voice_state_update() |
|||
|
|||
@deaf.setter |
|||
def deaf(self, value): |
|||
if value is self._deaf: |
|||
return |
|||
|
|||
self._deaf = value |
|||
self._send_voice_state_update() |
|||
|
|||
@classmethod |
|||
def from_channel(self, channel, **kwargs): |
|||
assert channel.is_voice, 'Cannot connect to a non voice channel' |
|||
conn = VoiceConnection(channel.client, channel.guild_id, **kwargs) |
|||
conn.connect(channel.id) |
|||
return conn |
|||
|
|||
def set_channel(self, channel_or_id): |
|||
if channel_or_id and isinstance(channel_or_id, Channel): |
|||
channel_or_id = channel_or_id.id |
|||
|
|||
self.channel_id = channel_or_id |
|||
self._send_voice_state_update() |
|||
|
|||
def connect(self, channel_id): |
|||
assert self._conn is None, 'Already connected' |
|||
|
|||
self.set_channel(channel_id) |
|||
|
|||
self._conn = TelecomConnection( |
|||
self.client.state.me.id, |
|||
self.guild_id, |
|||
self.client.gw.session_id, |
|||
) |
|||
|
|||
if self.enable_events: |
|||
r, w = os.pipe() |
|||
|
|||
self._event_reader_greenlet = gevent.spawn(self._event_reader, r) |
|||
self._conn.set_event_pipe(w) |
|||
|
|||
def disconnect(self): |
|||
assert self._conn is not None, 'Not connected' |
|||
|
|||
# Send disconnection |
|||
self.set_channel(None) |
|||
|
|||
# If we have an event reader, kill it |
|||
if self._event_reader_greenlet: |
|||
self._event_reader_greenlet.kill() |
|||
self._event_reader_greenlet = None |
|||
|
|||
# Delete our connection so it will get GC'd |
|||
del self._conn |
|||
self._conn = None |
|||
|
|||
def play(self, playable): |
|||
self._conn.play(playable) |
|||
|
|||
def play_file(self, url): |
|||
self._conn.play(AvConvPlayable(url)) |
|||
|
|||
def _on_voice_server_update(self, event): |
|||
if not self._conn or event.guild_id != self.guild_id: |
|||
return |
|||
|
|||
self._conn.update_server_info(event.endpoint, event.token) |
|||
|
|||
def _send_voice_state_update(self): |
|||
self.client.gw.send(OPCode.VOICE_STATE_UPDATE, { |
|||
'self_mute': self._mute, |
|||
'self_deaf': self._deaf, |
|||
'self_video': False, |
|||
'guild_id': self.guild_id, |
|||
'channel_id': self.channel_id, |
|||
}) |
|||
|
|||
def _event_reader(self, fd): |
|||
if not make_nonblocking(fd): |
|||
raise Exception('failed to make event pipe non-blocking') |
|||
|
|||
buff = "" |
|||
while True: |
|||
buff += nb_read(fd, 2048).decode('utf-8') |
|||
|
|||
parts = buff.split('\n') |
|||
for message in parts[:-1]: |
|||
event = json.loads(message) |
|||
self.events.emit(event['e'], event['d']) |
|||
|
|||
if len(parts) > 1: |
|||
buff = parts[-1] |
|||
else: |
|||
buff = "" |
@ -1,5 +0,0 @@ |
|||
from disco.voice.client import * # noqa: F401,F403 |
|||
from disco.voice.player import * # noqa: F401,F403 |
|||
from disco.voice.playable import * # noqa: F401,F403 |
|||
|
|||
# TODO: deprecate this file |
@ -1,459 +0,0 @@ |
|||
from __future__ import print_function |
|||
|
|||
import gevent |
|||
import time |
|||
|
|||
from collections import namedtuple |
|||
|
|||
from holster.enum import Enum |
|||
from holster.emitter import Emitter |
|||
|
|||
from disco.gateway.encoding.json import JSONEncoder |
|||
from disco.util.websocket import Websocket |
|||
from disco.util.logging import LoggingClass |
|||
from disco.gateway.packets import OPCode |
|||
from disco.types.base import cached_property |
|||
from disco.voice.packets import VoiceOPCode |
|||
from disco.voice.udp import AudioCodecs, RTPPayloadTypes, UDPVoiceClient |
|||
|
|||
SpeakingFlags = Enum( |
|||
NONE=0, |
|||
VOICE=1 << 0, |
|||
SOUNDSHARE=1 << 1, |
|||
PRIORITY=1 << 2, |
|||
) |
|||
|
|||
VoiceState = Enum( |
|||
DISCONNECTED=0, |
|||
RECONNECTING=1, |
|||
AWAITING_ENDPOINT=2, |
|||
AUTHENTICATING=3, |
|||
AUTHENTICATED=4, |
|||
CONNECTING=5, |
|||
CONNECTED=6, |
|||
VOICE_CONNECTING=7, |
|||
VOICE_CONNECTED=8, |
|||
) |
|||
|
|||
VoiceSpeaking = namedtuple('VoiceSpeaking', [ |
|||
'client', |
|||
'user_id', |
|||
'speaking', |
|||
'soundshare', |
|||
'priority', |
|||
]) |
|||
|
|||
|
|||
class VoiceException(Exception): |
|||
def __init__(self, msg, client): |
|||
self.voice_client = client |
|||
super(VoiceException, self).__init__(msg) |
|||
|
|||
|
|||
class VoiceClient(LoggingClass): |
|||
VOICE_GATEWAY_VERSION = 4 |
|||
|
|||
SUPPORTED_MODES = { |
|||
'xsalsa20_poly1305_lite', |
|||
'xsalsa20_poly1305_suffix', |
|||
'xsalsa20_poly1305', |
|||
} |
|||
|
|||
def __init__(self, client, server_id, is_dm=False, encoder=None, max_reconnects=5): |
|||
super(VoiceClient, self).__init__() |
|||
|
|||
self.client = client |
|||
self.server_id = server_id |
|||
self.channel_id = None |
|||
self.is_dm = is_dm |
|||
self.encoder = encoder or JSONEncoder |
|||
self.max_reconnects = max_reconnects |
|||
self.video_enabled = False |
|||
|
|||
# Set the VoiceClient in the state's voice clients |
|||
self.client.state.voice_clients[self.server_id] = self |
|||
|
|||
# Bind to some WS packets |
|||
self.packets = Emitter() |
|||
self.packets.on(VoiceOPCode.HELLO, self.on_voice_hello) |
|||
self.packets.on(VoiceOPCode.READY, self.on_voice_ready) |
|||
self.packets.on(VoiceOPCode.RESUMED, self.on_voice_resumed) |
|||
self.packets.on(VoiceOPCode.SESSION_DESCRIPTION, self.on_voice_sdp) |
|||
self.packets.on(VoiceOPCode.SPEAKING, self.on_voice_speaking) |
|||
self.packets.on(VoiceOPCode.CLIENT_CONNECT, self.on_voice_client_connect) |
|||
self.packets.on(VoiceOPCode.CLIENT_DISCONNECT, self.on_voice_client_disconnect) |
|||
self.packets.on(VoiceOPCode.CODECS, self.on_voice_codecs) |
|||
|
|||
# State + state change emitter |
|||
self.state = VoiceState.DISCONNECTED |
|||
self.state_emitter = Emitter() |
|||
|
|||
# Connection metadata |
|||
self.token = None |
|||
self.endpoint = None |
|||
self.ssrc = None |
|||
self.ip = None |
|||
self.port = None |
|||
self.mode = None |
|||
self.udp = None |
|||
self.audio_codec = None |
|||
self.video_codec = None |
|||
self.transport_id = None |
|||
|
|||
# Websocket connection |
|||
self.ws = None |
|||
|
|||
self._session_id = self.client.gw.session_id |
|||
self._reconnects = 0 |
|||
self._heartbeat_task = None |
|||
self._identified = False |
|||
|
|||
# SSRCs |
|||
self.audio_ssrcs = {} |
|||
|
|||
def __repr__(self): |
|||
return u'<VoiceClient {}>'.format(self.server_id) |
|||
|
|||
@cached_property |
|||
def guild(self): |
|||
return self.client.state.guilds.get(self.server_id) if not self.is_dm else None |
|||
|
|||
@cached_property |
|||
def channel(self): |
|||
return self.client.state.channels.get(self.channel_id) |
|||
|
|||
@property |
|||
def user_id(self): |
|||
return self.client.state.me.id |
|||
|
|||
@property |
|||
def ssrc_audio(self): |
|||
return self.ssrc |
|||
|
|||
@property |
|||
def ssrc_video(self): |
|||
return self.ssrc + 1 |
|||
|
|||
@property |
|||
def ssrc_rtx(self): |
|||
return self.ssrc + 2 |
|||
|
|||
@property |
|||
def ssrc_rtcp(self): |
|||
return self.ssrc + 3 |
|||
|
|||
def set_state(self, state): |
|||
self.log.debug('[%s] state %s -> %s', self, self.state, state) |
|||
prev_state = self.state |
|||
self.state = state |
|||
self.state_emitter.emit(state, prev_state) |
|||
|
|||
def set_endpoint(self, endpoint): |
|||
endpoint = endpoint.split(':', 1)[0] |
|||
if self.endpoint == endpoint: |
|||
return |
|||
|
|||
self.log.info( |
|||
'[%s] Set endpoint from VOICE_SERVER_UPDATE (state = %s / endpoint = %s)', self, self.state, endpoint) |
|||
|
|||
self.endpoint = endpoint |
|||
|
|||
if self.ws and self.ws.sock and self.ws.sock.connected: |
|||
self.ws.close() |
|||
self.ws = None |
|||
|
|||
self._identified = False |
|||
|
|||
def set_token(self, token): |
|||
if self.token == token: |
|||
return |
|||
self.token = token |
|||
if not self._identified: |
|||
self._connect_and_run() |
|||
|
|||
def _connect_and_run(self): |
|||
self.ws = Websocket('wss://' + self.endpoint + '/?v={}'.format(self.VOICE_GATEWAY_VERSION)) |
|||
self.ws.emitter.on('on_open', self.on_open) |
|||
self.ws.emitter.on('on_error', self.on_error) |
|||
self.ws.emitter.on('on_close', self.on_close) |
|||
self.ws.emitter.on('on_message', self.on_message) |
|||
self.ws.run_forever() |
|||
|
|||
def _heartbeat(self, interval): |
|||
while True: |
|||
self.send(VoiceOPCode.HEARTBEAT, time.time()) |
|||
gevent.sleep(interval / 1000) |
|||
|
|||
def set_speaking(self, voice=False, soundshare=False, priority=False, delay=0): |
|||
value = SpeakingFlags.NONE.value |
|||
if voice: |
|||
value |= SpeakingFlags.VOICE.value |
|||
if soundshare: |
|||
value |= SpeakingFlags.SOUNDSHARE.value |
|||
if priority: |
|||
value |= SpeakingFlags.PRIORITY.value |
|||
|
|||
self.send(VoiceOPCode.SPEAKING, { |
|||
'speaking': value, |
|||
'delay': delay, |
|||
'ssrc': self.ssrc, |
|||
}) |
|||
|
|||
def set_voice_state(self, channel_id, mute=False, deaf=False, video=False): |
|||
self.client.gw.send(OPCode.VOICE_STATE_UPDATE, { |
|||
'self_mute': bool(mute), |
|||
'self_deaf': bool(deaf), |
|||
'self_video': bool(video), |
|||
'guild_id': None if self.is_dm else self.server_id, |
|||
'channel_id': channel_id, |
|||
}) |
|||
|
|||
def send(self, op, data): |
|||
if self.ws and self.ws.sock and self.ws.sock.connected: |
|||
self.log.debug('[%s] sending OP %s (data = %s)', self, op, data) |
|||
self.ws.send(self.encoder.encode({ |
|||
'op': op.value, |
|||
'd': data, |
|||
}), self.encoder.OPCODE) |
|||
else: |
|||
self.log.debug('[%s] dropping because ws is closed OP %s (data = %s)', self, op, data) |
|||
|
|||
def on_voice_client_connect(self, data): |
|||
user_id = int(data['user_id']) |
|||
|
|||
self.audio_ssrcs[data['audio_ssrc']] = user_id |
|||
# ignore data['voice_ssrc'] for now |
|||
|
|||
def on_voice_client_disconnect(self, data): |
|||
user_id = int(data['user_id']) |
|||
|
|||
for ssrc in self.audio_ssrcs.keys(): |
|||
if self.audio_ssrcs[ssrc] == user_id: |
|||
del self.audio_ssrcs[ssrc] |
|||
break |
|||
|
|||
def on_voice_codecs(self, data): |
|||
self.audio_codec = data['audio_codec'] |
|||
self.video_codec = data['video_codec'] |
|||
self.transport_id = data['media_session_id'] |
|||
|
|||
# Set the UDP's RTP Audio Header's Payload Type |
|||
self.udp.set_audio_codec(data['audio_codec']) |
|||
|
|||
def on_voice_hello(self, data): |
|||
self.log.info('[%s] Received Voice HELLO payload, starting heartbeater', self) |
|||
self._heartbeat_task = gevent.spawn(self._heartbeat, data['heartbeat_interval']) |
|||
self.set_state(VoiceState.AUTHENTICATED) |
|||
|
|||
def on_voice_ready(self, data): |
|||
self.log.info('[%s] Received Voice READY payload, attempting to negotiate voice connection w/ remote', self) |
|||
self.set_state(VoiceState.CONNECTING) |
|||
self.ssrc = data['ssrc'] |
|||
self.ip = data['ip'] |
|||
self.port = data['port'] |
|||
self._identified = True |
|||
|
|||
for mode in self.SUPPORTED_MODES: |
|||
if mode in data['modes']: |
|||
self.mode = mode |
|||
self.log.debug('[%s] Selected mode %s', self, mode) |
|||
break |
|||
else: |
|||
raise Exception('Failed to find a supported voice mode') |
|||
|
|||
self.log.debug('[%s] Attempting IP discovery over UDP to %s:%s', self, self.ip, self.port) |
|||
self.udp = UDPVoiceClient(self) |
|||
ip, port = self.udp.connect(self.ip, self.port) |
|||
|
|||
if not ip: |
|||
self.log.error('Failed to discover our IP, perhaps a NAT or firewall is fucking us') |
|||
self.disconnect() |
|||
return |
|||
|
|||
codecs = [] |
|||
|
|||
# Sending discord our available codecs and rtp payload type for it |
|||
for idx, codec in enumerate(AudioCodecs): |
|||
codecs.append({ |
|||
'name': codec, |
|||
'type': 'audio', |
|||
'priority': (idx + 1) * 1000, |
|||
'payload_type': RTPPayloadTypes.get(codec).value, |
|||
}) |
|||
|
|||
self.log.debug('[%s] IP discovery completed (ip = %s, port = %s), sending SELECT_PROTOCOL', self, ip, port) |
|||
self.send(VoiceOPCode.SELECT_PROTOCOL, { |
|||
'protocol': 'udp', |
|||
'data': { |
|||
'port': port, |
|||
'address': ip, |
|||
'mode': self.mode, |
|||
}, |
|||
'codecs': codecs, |
|||
}) |
|||
self.send(VoiceOPCode.CLIENT_CONNECT, { |
|||
'audio_ssrc': self.ssrc, |
|||
'video_ssrc': 0, |
|||
'rtx_ssrc': 0, |
|||
}) |
|||
|
|||
def on_voice_resumed(self, data): |
|||
self.log.info('[%s] Received resumed', self) |
|||
self.set_state(VoiceState.CONNECTED) |
|||
|
|||
def on_voice_sdp(self, sdp): |
|||
self.log.info('[%s] Received session description, connection completed', self) |
|||
|
|||
self.mode = sdp['mode'] |
|||
self.audio_codec = sdp['audio_codec'] |
|||
self.video_codec = sdp['video_codec'] |
|||
self.transport_id = sdp['media_session_id'] |
|||
|
|||
# Set the UDP's RTP Audio Header's Payload Type |
|||
self.udp.set_audio_codec(sdp['audio_codec']) |
|||
|
|||
# Create a secret box for encryption/decryption |
|||
self.udp.setup_encryption(bytes(bytearray(sdp['secret_key']))) |
|||
|
|||
self.set_state(VoiceState.CONNECTED) |
|||
|
|||
def on_voice_speaking(self, data): |
|||
user_id = int(data['user_id']) |
|||
|
|||
self.audio_ssrcs[data['ssrc']] = user_id |
|||
|
|||
# Maybe rename speaking to voice in future |
|||
payload = VoiceSpeaking( |
|||
client=self, |
|||
user_id=user_id, |
|||
speaking=bool(data['speaking'] & SpeakingFlags.VOICE.value), |
|||
soundshare=bool(data['speaking'] & SpeakingFlags.SOUNDSHARE.value), |
|||
priority=bool(data['speaking'] & SpeakingFlags.PRIORITY.value), |
|||
) |
|||
|
|||
self.client.gw.events.emit('VoiceSpeaking', payload) |
|||
|
|||
def on_message(self, msg): |
|||
try: |
|||
data = self.encoder.decode(msg) |
|||
self.packets.emit(VoiceOPCode[data['op']], data['d']) |
|||
except Exception: |
|||
self.log.exception('Failed to parse voice gateway message: ') |
|||
|
|||
def on_error(self, err): |
|||
self.log.error('[%s] Voice websocket error: %s', self, err) |
|||
|
|||
def on_open(self): |
|||
if self._identified: |
|||
self.send(VoiceOPCode.RESUME, { |
|||
'server_id': self.server_id, |
|||
'session_id': self._session_id, |
|||
'token': self.token, |
|||
}) |
|||
else: |
|||
self.send(VoiceOPCode.IDENTIFY, { |
|||
'server_id': self.server_id, |
|||
'user_id': self.user_id, |
|||
'session_id': self._session_id, |
|||
'token': self.token, |
|||
'video': self.video_enabled, |
|||
}) |
|||
|
|||
def on_close(self, code, reason): |
|||
self.log.warning('[%s] Voice websocket closed: [%s] %s (%s)', self, code, reason, self._reconnects) |
|||
|
|||
if self._heartbeat_task: |
|||
self._heartbeat_task.kill() |
|||
self._heartbeat_task = None |
|||
|
|||
self.ws = None |
|||
|
|||
# If we killed the connection, don't try resuming |
|||
if self.state == VoiceState.DISCONNECTED: |
|||
return |
|||
|
|||
self.log.info('[%s] Attempting Websocket Resumption', self) |
|||
|
|||
self.set_state(VoiceState.RECONNECTING) |
|||
|
|||
# Check if code is not None, was not from us |
|||
if code is not None: |
|||
self._reconnects += 1 |
|||
|
|||
if self.max_reconnects and self._reconnects > self.max_reconnects: |
|||
raise VoiceException( |
|||
'Failed to reconnect after {} attempts, giving up'.format(self.max_reconnects), self) |
|||
|
|||
# Don't resume for these error codes: |
|||
if 4000 <= code <= 4016: |
|||
self._identified = False |
|||
|
|||
if self.udp and self.udp.connected: |
|||
self.udp.disconnect() |
|||
|
|||
wait_time = 5 |
|||
else: |
|||
wait_time = 1 |
|||
|
|||
self.log.info( |
|||
'[%s] Will attempt to %s after %s seconds', self, 'resume' if self._identified else 'reconnect', wait_time) |
|||
gevent.sleep(wait_time) |
|||
|
|||
self._connect_and_run() |
|||
|
|||
def connect(self, channel_id, timeout=10, **kwargs): |
|||
if self.is_dm: |
|||
channel_id = self.server_id |
|||
|
|||
if not channel_id: |
|||
raise VoiceException('[{}] cannot connect to an empty channel id'.format(self)) |
|||
|
|||
if self.channel_id == channel_id: |
|||
if self.state == VoiceState.CONNECTED: |
|||
self.log.debug('[%s] Already connected to %s, returning', self, self.channel) |
|||
return self |
|||
else: |
|||
if self.state == VoiceState.CONNECTED: |
|||
self.log.debug('[%s] Moving to channel %s', self, channel_id) |
|||
else: |
|||
self.log.debug('[%s] Attempting connection to channel id %s', self, channel_id) |
|||
self.set_state(VoiceState.AWAITING_ENDPOINT) |
|||
|
|||
self.set_voice_state(channel_id, **kwargs) |
|||
|
|||
if not self.state_emitter.once(VoiceState.CONNECTED, timeout=timeout): |
|||
self.disconnect() |
|||
raise VoiceException('Failed to connect to voice', self) |
|||
else: |
|||
return self |
|||
|
|||
def disconnect(self): |
|||
if self.state == VoiceState.DISCONNECTED: |
|||
return |
|||
|
|||
self.log.debug('[%s] disconnect called', self) |
|||
self.set_state(VoiceState.DISCONNECTED) |
|||
|
|||
del self.client.state.voice_clients[self.server_id] |
|||
|
|||
if self._heartbeat_task: |
|||
self._heartbeat_task.kill() |
|||
self._heartbeat_task = None |
|||
|
|||
if self.ws and self.ws.sock and self.ws.sock.connected: |
|||
self.ws.close() |
|||
self.ws = None |
|||
|
|||
if self.udp and self.udp.connected: |
|||
self.udp.disconnect() |
|||
|
|||
if self.channel_id: |
|||
self.set_voice_state(None) |
|||
|
|||
self.client.gw.events.emit('VoiceDisconnect', self) |
|||
|
|||
def send_frame(self, *args, **kwargs): |
|||
self.udp.send_frame(*args, **kwargs) |
|||
|
|||
def increment_timestamp(self, *args, **kwargs): |
|||
self.udp.increment_timestamp(*args, **kwargs) |
@ -1,152 +0,0 @@ |
|||
import six |
|||
import sys |
|||
import array |
|||
import ctypes |
|||
import ctypes.util |
|||
|
|||
from holster.enum import Enum |
|||
|
|||
from disco.util.logging import LoggingClass |
|||
|
|||
|
|||
c_int_ptr = ctypes.POINTER(ctypes.c_int) |
|||
c_int16_ptr = ctypes.POINTER(ctypes.c_int16) |
|||
c_float_ptr = ctypes.POINTER(ctypes.c_float) |
|||
|
|||
|
|||
class EncoderStruct(ctypes.Structure): |
|||
pass |
|||
|
|||
|
|||
class DecoderStruct(ctypes.Structure): |
|||
pass |
|||
|
|||
|
|||
EncoderStructPtr = ctypes.POINTER(EncoderStruct) |
|||
DecoderStructPtr = ctypes.POINTER(DecoderStruct) |
|||
|
|||
|
|||
class BaseOpus(LoggingClass): |
|||
BASE_EXPORTED = { |
|||
'opus_strerror': ([ctypes.c_int], ctypes.c_char_p), |
|||
} |
|||
|
|||
EXPORTED = {} |
|||
|
|||
def __init__(self, library_path=None): |
|||
self.path = library_path or self.find_library() |
|||
self.lib = ctypes.cdll.LoadLibrary(self.path) |
|||
|
|||
methods = {} |
|||
methods.update(self.BASE_EXPORTED) |
|||
methods.update(self.EXPORTED) |
|||
|
|||
for name, item in methods.items(): |
|||
func = getattr(self.lib, name) |
|||
|
|||
if item[0]: |
|||
func.argtypes = item[0] |
|||
|
|||
func.restype = item[1] |
|||
|
|||
setattr(self, name, func) |
|||
|
|||
@staticmethod |
|||
def find_library(): |
|||
if sys.platform == 'win32': |
|||
raise Exception('Cannot auto-load opus on Windows, please specify full library path') |
|||
|
|||
return ctypes.util.find_library('opus') |
|||
|
|||
|
|||
Application = Enum( |
|||
AUDIO=2049, |
|||
VOIP=2048, |
|||
LOWDELAY=2051, |
|||
) |
|||
|
|||
|
|||
Control = Enum( |
|||
SET_BITRATE=4002, |
|||
SET_BANDWIDTH=4008, |
|||
SET_FEC=4012, |
|||
SET_PLP=4014, |
|||
) |
|||
|
|||
|
|||
class OpusEncoder(BaseOpus): |
|||
EXPORTED = { |
|||
'opus_encoder_get_size': ([ctypes.c_int], ctypes.c_int), |
|||
'opus_encoder_create': ([ctypes.c_int, ctypes.c_int, ctypes.c_int, c_int_ptr], EncoderStructPtr), |
|||
'opus_encode': ([EncoderStructPtr, c_int16_ptr, ctypes.c_int, ctypes.c_char_p, ctypes.c_int32], ctypes.c_int32), |
|||
'opus_encoder_ctl': (None, ctypes.c_int32), |
|||
'opus_encoder_destroy': ([EncoderStructPtr], None), |
|||
} |
|||
|
|||
def __init__(self, sampling_rate, channels, application=Application.AUDIO, library_path=None): |
|||
super(OpusEncoder, self).__init__(library_path) |
|||
self.sampling_rate = sampling_rate |
|||
self.channels = channels |
|||
self.application = application |
|||
|
|||
self._inst = None |
|||
|
|||
@property |
|||
def inst(self): |
|||
if not self._inst: |
|||
self._inst = self.create() |
|||
self.set_bitrate(128) |
|||
self.set_fec(True) |
|||
self.set_expected_packet_loss_percent(0.15) |
|||
return self._inst |
|||
|
|||
def set_bitrate(self, kbps): |
|||
kbps = min(128, max(16, int(kbps))) |
|||
ret = self.opus_encoder_ctl(self.inst, int(Control.SET_BITRATE), kbps * 1024) |
|||
|
|||
if ret < 0: |
|||
raise Exception('Failed to set bitrate to {}: {}'.format(kbps, ret)) |
|||
|
|||
def set_fec(self, value): |
|||
ret = self.opus_encoder_ctl(self.inst, int(Control.SET_FEC), int(value)) |
|||
|
|||
if ret < 0: |
|||
raise Exception('Failed to set FEC to {}: {}'.format(value, ret)) |
|||
|
|||
def set_expected_packet_loss_percent(self, perc): |
|||
ret = self.opus_encoder_ctl(self.inst, int(Control.SET_PLP), min(100, max(0, int(perc * 100)))) |
|||
|
|||
if ret < 0: |
|||
raise Exception('Failed to set PLP to {}: {}'.format(perc, ret)) |
|||
|
|||
def create(self): |
|||
ret = ctypes.c_int() |
|||
result = self.opus_encoder_create(self.sampling_rate, self.channels, self.application.value, ctypes.byref(ret)) |
|||
|
|||
if ret.value != 0: |
|||
raise Exception('Failed to create opus encoder: {}'.format(ret.value)) |
|||
|
|||
return result |
|||
|
|||
def __del__(self): |
|||
if hasattr(self, '_inst') and self._inst: |
|||
self.opus_encoder_destroy(self._inst) |
|||
self._inst = None |
|||
|
|||
def encode(self, pcm, frame_size): |
|||
max_data_bytes = len(pcm) |
|||
pcm = ctypes.cast(pcm, c_int16_ptr) |
|||
data = (ctypes.c_char * max_data_bytes)() |
|||
|
|||
ret = self.opus_encode(self.inst, pcm, frame_size, data, max_data_bytes) |
|||
if ret < 0: |
|||
raise Exception('Failed to encode: {}'.format(ret)) |
|||
|
|||
if six.PY3: |
|||
return array.array('b', data[:ret]).tobytes() |
|||
else: |
|||
return array.array('b', data[:ret]).tostring() |
|||
|
|||
|
|||
class OpusDecoder(BaseOpus): |
|||
pass |
@ -1,17 +0,0 @@ |
|||
from holster.enum import Enum |
|||
|
|||
VoiceOPCode = Enum( |
|||
IDENTIFY=0, |
|||
SELECT_PROTOCOL=1, |
|||
READY=2, |
|||
HEARTBEAT=3, |
|||
SESSION_DESCRIPTION=4, |
|||
SPEAKING=5, |
|||
HEARTBEAT_ACK=6, |
|||
RESUME=7, |
|||
HELLO=8, |
|||
RESUMED=9, |
|||
CLIENT_CONNECT=12, |
|||
CLIENT_DISCONNECT=13, |
|||
CODECS=14, |
|||
) |
@ -1,357 +0,0 @@ |
|||
import abc |
|||
import six |
|||
import types |
|||
import gevent |
|||
import struct |
|||
import subprocess |
|||
|
|||
from gevent.lock import Semaphore |
|||
from gevent.queue import Queue |
|||
|
|||
from disco.voice.opus import OpusEncoder |
|||
|
|||
|
|||
try: |
|||
from io import StringIO as BufferedIO |
|||
except ImportError: |
|||
if six.PY2: |
|||
from StringIO import StringIO as BufferedIO |
|||
else: |
|||
from io import BytesIO as BufferedIO |
|||
|
|||
|
|||
OPUS_HEADER_SIZE = struct.calcsize('<h') |
|||
|
|||
|
|||
class AbstractOpus(object): |
|||
def __init__(self, sampling_rate=48000, frame_length=20, channels=2): |
|||
self.sampling_rate = sampling_rate |
|||
self.frame_length = frame_length |
|||
self.channels = 2 |
|||
self.sample_size = 2 * self.channels |
|||
self.samples_per_frame = int(self.sampling_rate / 1000 * self.frame_length) |
|||
self.frame_size = self.samples_per_frame * self.sample_size |
|||
|
|||
|
|||
class BaseUtil(object): |
|||
def pipe(self, other, *args, **kwargs): |
|||
child = other(self, *args, **kwargs) |
|||
setattr(child, 'metadata', self.metadata) |
|||
setattr(child, '_parent', self) |
|||
return child |
|||
|
|||
@property |
|||
def metadata(self): |
|||
return getattr(self, '_metadata', None) |
|||
|
|||
@metadata.setter |
|||
def metadata(self, value): |
|||
self._metadata = value |
|||
|
|||
|
|||
@six.add_metaclass(abc.ABCMeta) |
|||
class BasePlayable(BaseUtil): |
|||
@abc.abstractmethod |
|||
def next_frame(self): |
|||
raise NotImplementedError |
|||
|
|||
|
|||
@six.add_metaclass(abc.ABCMeta) |
|||
class BaseInput(BaseUtil): |
|||
@abc.abstractmethod |
|||
def read(self, size): |
|||
raise NotImplementedError |
|||
|
|||
@abc.abstractmethod |
|||
def fileobj(self): |
|||
raise NotImplementedError |
|||
|
|||
|
|||
class OpusFilePlayable(BasePlayable, AbstractOpus): |
|||
""" |
|||
An input which reads opus data from a file or file-like object. |
|||
""" |
|||
def __init__(self, fobj, *args, **kwargs): |
|||
super(OpusFilePlayable, self).__init__(*args, **kwargs) |
|||
self.fobj = fobj |
|||
self.done = False |
|||
|
|||
def next_frame(self): |
|||
if self.done: |
|||
return None |
|||
|
|||
header = self.fobj.read(OPUS_HEADER_SIZE) |
|||
if len(header) < OPUS_HEADER_SIZE: |
|||
self.done = True |
|||
return None |
|||
|
|||
data_size = struct.unpack('<h', header)[0] |
|||
data = self.fobj.read(data_size) |
|||
if len(data) < data_size: |
|||
self.done = True |
|||
return None |
|||
|
|||
return data |
|||
|
|||
|
|||
class FFmpegInput(BaseInput, AbstractOpus): |
|||
def __init__(self, source='-', command='avconv', streaming=False, **kwargs): |
|||
super(FFmpegInput, self).__init__(**kwargs) |
|||
if source: |
|||
self.source = source |
|||
self.streaming = streaming |
|||
self.command = command |
|||
|
|||
self._buffer = None |
|||
self._proc = None |
|||
|
|||
def read(self, sz): |
|||
if self.streaming: |
|||
raise TypeError('Cannot read from a streaming FFmpegInput') |
|||
|
|||
# First read blocks until the subprocess finishes |
|||
if not self._buffer: |
|||
data, _ = self.proc.communicate() |
|||
self._buffer = BufferedIO(data) |
|||
|
|||
# Subsequent reads can just do dis thang |
|||
return self._buffer.read(sz) |
|||
|
|||
def fileobj(self): |
|||
if self.streaming: |
|||
return self.proc.stdout |
|||
else: |
|||
return self |
|||
|
|||
@property |
|||
def proc(self): |
|||
if not self._proc: |
|||
if callable(self.source): |
|||
self.source = self.source(self) |
|||
|
|||
if isinstance(self.source, (tuple, list)): |
|||
self.source, self.metadata = self.source |
|||
|
|||
args = [ |
|||
self.command, |
|||
'-i', str(self.source), |
|||
'-f', 's16le', |
|||
'-ar', str(self.sampling_rate), |
|||
'-ac', str(self.channels), |
|||
'-loglevel', 'warning', |
|||
'pipe:1', |
|||
] |
|||
self._proc = subprocess.Popen(args, stdin=None, stdout=subprocess.PIPE) |
|||
return self._proc |
|||
|
|||
|
|||
class YoutubeDLInput(FFmpegInput): |
|||
def __init__(self, url=None, ie_info=None, *args, **kwargs): |
|||
super(YoutubeDLInput, self).__init__(None, *args, **kwargs) |
|||
self._url = url |
|||
self._ie_info = ie_info |
|||
self._info = None |
|||
self._info_lock = Semaphore() |
|||
|
|||
@property |
|||
def info(self): |
|||
with self._info_lock: |
|||
if not self._info: |
|||
import youtube_dl |
|||
ydl = youtube_dl.YoutubeDL({'format': 'webm[abr>0]/bestaudio/best'}) |
|||
|
|||
if self._url: |
|||
obj = ydl.extract_info(self._url, download=False, process=False) |
|||
if 'entries' in obj: |
|||
self._ie_info = list(obj['entries'])[0] |
|||
else: |
|||
self._ie_info = obj |
|||
|
|||
self._info = ydl.process_ie_result(self._ie_info, download=False) |
|||
return self._info |
|||
|
|||
@property |
|||
def _metadata(self): |
|||
return self.info |
|||
|
|||
@classmethod |
|||
def many(cls, url, *args, **kwargs): |
|||
import youtube_dl |
|||
|
|||
ydl = youtube_dl.YoutubeDL({'format': 'webm[abr>0]/bestaudio/best'}) |
|||
info = ydl.extract_info(url, download=False, process=False) |
|||
|
|||
if 'entries' not in info: |
|||
yield cls(ie_info=info, *args, **kwargs) |
|||
return |
|||
|
|||
for item in info['entries']: |
|||
yield cls(ie_info=item, *args, **kwargs) |
|||
|
|||
@property |
|||
def source(self): |
|||
return self.info['url'] |
|||
|
|||
|
|||
class BufferedOpusEncoderPlayable(BasePlayable, OpusEncoder, AbstractOpus): |
|||
def __init__(self, source, *args, **kwargs): |
|||
self.source = source |
|||
self.frames = Queue(kwargs.pop('queue_size', 4096)) |
|||
|
|||
# Call the AbstractOpus constructor, as we need properties it sets |
|||
AbstractOpus.__init__(self, *args, **kwargs) |
|||
|
|||
# Then call the OpusEncoder constructor, which requires some properties |
|||
# that AbstractOpus sets up |
|||
OpusEncoder.__init__(self, self.sampling_rate, self.channels) |
|||
|
|||
# Spawn the encoder loop |
|||
gevent.spawn(self._encoder_loop) |
|||
|
|||
def _encoder_loop(self): |
|||
while self.source: |
|||
raw = self.source.read(self.frame_size) |
|||
if len(raw) < self.frame_size: |
|||
break |
|||
|
|||
self.frames.put(self.encode(raw, self.samples_per_frame)) |
|||
gevent.idle() |
|||
self.source = None |
|||
self.frames.put(None) |
|||
|
|||
def next_frame(self): |
|||
return self.frames.get() |
|||
|
|||
|
|||
class DCADOpusEncoderPlayable(BasePlayable, AbstractOpus, OpusEncoder): |
|||
def __init__(self, source, *args, **kwargs): |
|||
self.source = source |
|||
self.command = kwargs.pop('command', 'dcad') |
|||
self.on_complete = kwargs.pop('on_complete', None) |
|||
super(DCADOpusEncoderPlayable, self).__init__(*args, **kwargs) |
|||
|
|||
self._done = False |
|||
self._proc = None |
|||
|
|||
@property |
|||
def proc(self): |
|||
if not self._proc: |
|||
source = obj = self.source.fileobj() |
|||
if not hasattr(obj, 'fileno'): |
|||
source = subprocess.PIPE |
|||
|
|||
self._proc = subprocess.Popen([ |
|||
self.command, |
|||
'--channels', str(self.channels), |
|||
'--rate', str(self.sampling_rate), |
|||
'--size', str(self.samples_per_frame), |
|||
'--bitrate', '128', |
|||
'--fec', |
|||
'--packet-loss-percent', '30', |
|||
'--input', 'pipe:0', |
|||
'--output', 'pipe:1', |
|||
], stdin=source, stdout=subprocess.PIPE) |
|||
|
|||
def writer(): |
|||
while True: |
|||
data = obj.read(2048) |
|||
if len(data) > 0: |
|||
self._proc.stdin.write(data) |
|||
if len(data) < 2048: |
|||
break |
|||
|
|||
if source == subprocess.PIPE: |
|||
gevent.spawn(writer) |
|||
return self._proc |
|||
|
|||
def next_frame(self): |
|||
if self._done: |
|||
return None |
|||
|
|||
header = self.proc.stdout.read(OPUS_HEADER_SIZE) |
|||
if len(header) < OPUS_HEADER_SIZE: |
|||
self._done = True |
|||
self.on_complete() |
|||
return |
|||
|
|||
size = struct.unpack('<h', header)[0] |
|||
|
|||
data = self.proc.stdout.read(size) |
|||
if len(data) < size: |
|||
self._done = True |
|||
self.on_complete() |
|||
return |
|||
|
|||
return data |
|||
|
|||
|
|||
class FileProxyPlayable(BasePlayable, AbstractOpus): |
|||
def __init__(self, other, output, *args, **kwargs): |
|||
self.flush = kwargs.pop('flush', False) |
|||
self.on_complete = kwargs.pop('on_complete', None) |
|||
super(FileProxyPlayable, self).__init__(*args, **kwargs) |
|||
self.other = other |
|||
self.output = output |
|||
|
|||
def next_frame(self): |
|||
frame = self.other.next_frame() |
|||
|
|||
if frame: |
|||
self.output.write(struct.pack('<h', len(frame))) |
|||
self.output.write(frame) |
|||
|
|||
if self.flush: |
|||
self.output.flush() |
|||
else: |
|||
self.output.flush() |
|||
self.on_complete() |
|||
self.output.close() |
|||
return frame |
|||
|
|||
|
|||
class PlaylistPlayable(BasePlayable, AbstractOpus): |
|||
def __init__(self, items, *args, **kwargs): |
|||
super(PlaylistPlayable, self).__init__(*args, **kwargs) |
|||
self.items = items |
|||
self.now_playing = None |
|||
|
|||
def _get_next(self): |
|||
if isinstance(self.items, types.GeneratorType): |
|||
return next(self.items, None) |
|||
return self.items.pop() |
|||
|
|||
def next_frame(self): |
|||
if not self.items: |
|||
return |
|||
|
|||
if not self.now_playing: |
|||
self.now_playing = self._get_next() |
|||
if not self.now_playing: |
|||
return |
|||
|
|||
frame = self.now_playing.next_frame() |
|||
if not frame: |
|||
return self.next_frame() |
|||
|
|||
return frame |
|||
|
|||
|
|||
class MemoryBufferedPlayable(BasePlayable, AbstractOpus): |
|||
def __init__(self, other, *args, **kwargs): |
|||
from gevent.queue import Queue |
|||
|
|||
super(MemoryBufferedPlayable, self).__init__(*args, **kwargs) |
|||
self.frames = Queue() |
|||
self.other = other |
|||
gevent.spawn(self._buffer) |
|||
|
|||
def _buffer(self): |
|||
while True: |
|||
frame = self.other.next_frame() |
|||
if not frame: |
|||
break |
|||
self.frames.put(frame) |
|||
self.frames.put(None) |
|||
|
|||
def next_frame(self): |
|||
return self.frames.get() |
@ -1,126 +0,0 @@ |
|||
import time |
|||
import gevent |
|||
|
|||
from holster.enum import Enum |
|||
from holster.emitter import Emitter |
|||
|
|||
from disco.voice.client import VoiceState |
|||
from disco.voice.queue import PlayableQueue |
|||
from disco.util.logging import LoggingClass |
|||
|
|||
|
|||
class Player(LoggingClass): |
|||
Events = Enum( |
|||
'START_PLAY', |
|||
'STOP_PLAY', |
|||
'PAUSE_PLAY', |
|||
'RESUME_PLAY', |
|||
'DISCONNECT', |
|||
) |
|||
|
|||
def __init__(self, client, queue=None): |
|||
super(Player, self).__init__() |
|||
self.client = client |
|||
|
|||
# Queue contains playable items |
|||
self.queue = queue or PlayableQueue() |
|||
|
|||
# Whether we're playing music (true for lifetime) |
|||
self.playing = True |
|||
|
|||
# Set to an event when playback is paused |
|||
self.paused = None |
|||
|
|||
# Current playing item |
|||
self.now_playing = None |
|||
|
|||
# Current play task |
|||
self.play_task = None |
|||
|
|||
# Core task |
|||
self.run_task = gevent.spawn(self.run) |
|||
|
|||
# Event triggered when playback is complete |
|||
self.complete = gevent.event.Event() |
|||
|
|||
# Event emitter for metadata |
|||
self.events = Emitter() |
|||
|
|||
def disconnect(self): |
|||
self.client.disconnect() |
|||
self.events.emit(self.Events.DISCONNECT) |
|||
|
|||
def skip(self): |
|||
self.play_task.kill() |
|||
|
|||
def pause(self): |
|||
if self.paused: |
|||
return |
|||
self.paused = gevent.event.Event() |
|||
self.events.emit(self.Events.PAUSE_PLAY) |
|||
|
|||
def resume(self): |
|||
if self.paused: |
|||
self.paused.set() |
|||
self.paused = None |
|||
self.events.emit(self.Events.RESUME_PLAY) |
|||
|
|||
def play(self, item): |
|||
# Grab the first frame before we start anything else, sometimes playables |
|||
# can do some lengthy async tasks here to setup the playable and we |
|||
# don't want that lerp the first N frames of the playable into playing |
|||
# faster |
|||
frame = item.next_frame() |
|||
if frame is None: |
|||
return |
|||
|
|||
start = time.time() |
|||
loops = 0 |
|||
|
|||
while True: |
|||
loops += 1 |
|||
|
|||
if self.paused: |
|||
self.client.set_speaking(False) |
|||
self.paused.wait() |
|||
gevent.sleep(2) |
|||
self.client.set_speaking(True) |
|||
start = time.time() |
|||
loops = 0 |
|||
|
|||
if self.client.state == VoiceState.DISCONNECTED: |
|||
return |
|||
|
|||
if self.client.state != VoiceState.CONNECTED: |
|||
self.client.state_emitter.once(VoiceState.CONNECTED, timeout=30) |
|||
|
|||
# Send the voice frame and increment our timestamp |
|||
self.client.send_frame(frame) |
|||
self.client.increment_timestamp(item.samples_per_frame) |
|||
|
|||
frame = item.next_frame() |
|||
if frame is None: |
|||
return |
|||
|
|||
next_time = start + 0.02 * loops |
|||
delay = max(0, 0.02 + (next_time - time.time())) |
|||
gevent.sleep(delay) |
|||
|
|||
def run(self): |
|||
self.client.set_speaking(True) |
|||
|
|||
while self.playing: |
|||
self.now_playing = self.queue.get() |
|||
|
|||
self.events.emit(self.Events.START_PLAY, self.now_playing) |
|||
self.play_task = gevent.spawn(self.play, self.now_playing) |
|||
self.play_task.join() |
|||
self.events.emit(self.Events.STOP_PLAY, self.now_playing) |
|||
|
|||
if self.client.state == VoiceState.DISCONNECTED: |
|||
self.playing = False |
|||
self.complete.set() |
|||
return |
|||
|
|||
self.client.set_speaking(False) |
|||
self.disconnect() |
@ -1,52 +0,0 @@ |
|||
import abc |
|||
import six |
|||
import gevent |
|||
import random |
|||
|
|||
|
|||
@six.add_metaclass(abc.ABCMeta) |
|||
class BaseQueue(object): |
|||
@abc.abstractmethod |
|||
def get(self): |
|||
raise NotImplementedError |
|||
|
|||
|
|||
class PlayableQueue(BaseQueue): |
|||
def __init__(self): |
|||
self._data = [] |
|||
self._event = gevent.event.Event() |
|||
|
|||
def append(self, item): |
|||
self._data.append(item) |
|||
|
|||
if self._event: |
|||
self._event.set() |
|||
self._event = None |
|||
|
|||
def _get(self): |
|||
if not len(self._data): |
|||
if not self._event: |
|||
self._event = gevent.event.Event() |
|||
self._event.wait() |
|||
return self._get() |
|||
return self._data.pop(0) |
|||
|
|||
def get(self): |
|||
return self._get() |
|||
|
|||
def shuffle(self): |
|||
random.shuffle(self._data) |
|||
|
|||
def clear(self): |
|||
self._data = [] |
|||
|
|||
def __len__(self): |
|||
return len(self._data) |
|||
|
|||
def __iter__(self): |
|||
return self._data.__iter__() |
|||
|
|||
def __nonzero__(self): |
|||
return True |
|||
|
|||
__bool__ = __nonzero__ |
@ -1,341 +0,0 @@ |
|||
import struct |
|||
import socket |
|||
import gevent |
|||
|
|||
from collections import namedtuple |
|||
|
|||
try: |
|||
import nacl.secret |
|||
except ImportError: |
|||
print('WARNING: nacl is not installed, voice support is disabled') |
|||
|
|||
from holster.enum import Enum |
|||
|
|||
from disco.util.logging import LoggingClass |
|||
|
|||
AudioCodecs = ('opus',) |
|||
|
|||
RTPPayloadTypes = Enum(OPUS=0x78) |
|||
|
|||
RTCPPayloadTypes = Enum( |
|||
SENDER_REPORT=200, |
|||
RECEIVER_REPORT=201, |
|||
SOURCE_DESCRIPTION=202, |
|||
BYE=203, |
|||
APP=204, |
|||
RTPFB=205, |
|||
PSFB=206, |
|||
) |
|||
|
|||
MAX_UINT32 = 4294967295 |
|||
MAX_SEQUENCE = 65535 |
|||
|
|||
RTP_HEADER_VERSION = 0x80 # Only RTP Version is set here (value of 2 << 6) |
|||
RTP_EXTENSION_ONE_BYTE = (0xBE, 0xDE) |
|||
|
|||
RTPHeader = namedtuple('RTPHeader', [ |
|||
'version', |
|||
'padding', |
|||
'extension', |
|||
'csrc_count', |
|||
'marker', |
|||
'payload_type', |
|||
'sequence', |
|||
'timestamp', |
|||
'ssrc', |
|||
]) |
|||
|
|||
RTCPHeader = namedtuple('RTCPHeader', [ |
|||
'version', |
|||
'padding', |
|||
'reception_count', |
|||
'packet_type', |
|||
'length', |
|||
'ssrc', |
|||
]) |
|||
|
|||
RTCPData = namedtuple('RTCPData', [ |
|||
'client', |
|||
'user_id', |
|||
'payload_type', |
|||
'header', |
|||
'data', |
|||
]) |
|||
|
|||
VoiceData = namedtuple('VoiceData', [ |
|||
'client', |
|||
'user_id', |
|||
'payload_type', |
|||
'rtp', |
|||
'nonce', |
|||
'data', |
|||
]) |
|||
|
|||
|
|||
class UDPVoiceClient(LoggingClass): |
|||
def __init__(self, vc): |
|||
super(UDPVoiceClient, self).__init__() |
|||
self.vc = vc |
|||
|
|||
# The underlying UDP socket |
|||
self.conn = None |
|||
|
|||
# Connection information |
|||
self.ip = None |
|||
self.port = None |
|||
self.connected = False |
|||
|
|||
# Voice information |
|||
self.sequence = 0 |
|||
self.timestamp = 0 |
|||
|
|||
self._nonce = 0 |
|||
self._run_task = None |
|||
self._secret_box = None |
|||
|
|||
# RTP Header |
|||
self._rtp_audio_header = bytearray(12) |
|||
self._rtp_audio_header[0] = RTP_HEADER_VERSION |
|||
|
|||
def set_audio_codec(self, codec): |
|||
if codec not in AudioCodecs: |
|||
raise Exception('Unsupported audio codec received, {}'.format(codec)) |
|||
|
|||
ptype = RTPPayloadTypes.get(codec) |
|||
self._rtp_audio_header[1] = ptype.value |
|||
self.log.debug('[%s] Set UDP\'s Audio Codec to %s, RTP payload type %s', self.vc, ptype.name, ptype.value) |
|||
|
|||
def increment_timestamp(self, by): |
|||
self.timestamp += by |
|||
if self.timestamp > MAX_UINT32: |
|||
self.timestamp = 0 |
|||
|
|||
def setup_encryption(self, encryption_key): |
|||
self._secret_box = nacl.secret.SecretBox(encryption_key) |
|||
|
|||
def send_frame(self, frame, sequence=None, timestamp=None, incr_timestamp=None): |
|||
# Convert the frame to a bytearray |
|||
frame = bytearray(frame) |
|||
|
|||
# Pack the rtc header into our buffer |
|||
struct.pack_into('>H', self._rtp_audio_header, 2, sequence or self.sequence) |
|||
struct.pack_into('>I', self._rtp_audio_header, 4, timestamp or self.timestamp) |
|||
struct.pack_into('>i', self._rtp_audio_header, 8, self.vc.ssrc_audio) |
|||
|
|||
if self.vc.mode == 'xsalsa20_poly1305_lite': |
|||
# Use an incrementing number as a nonce, only first 4 bytes of the nonce is padded on |
|||
self._nonce += 1 |
|||
if self._nonce > MAX_UINT32: |
|||
self._nonce = 0 |
|||
|
|||
nonce = bytearray(24) |
|||
struct.pack_into('>I', nonce, 0, self._nonce) |
|||
nonce_padding = nonce[:4] |
|||
elif self.vc.mode == 'xsalsa20_poly1305_suffix': |
|||
# Generate a nonce |
|||
nonce = nacl.utils.random(nacl.secret.SecretBox.NONCE_SIZE) |
|||
nonce_padding = nonce |
|||
elif self.vc.mode == 'xsalsa20_poly1305': |
|||
# Nonce is the header |
|||
nonce = bytearray(24) |
|||
nonce[:12] = self._rtp_audio_header |
|||
nonce_padding = None |
|||
else: |
|||
raise Exception('The voice mode, {}, isn\'t supported.'.format(self.vc.mode)) |
|||
|
|||
# Encrypt the payload with the nonce |
|||
payload = self._secret_box.encrypt(bytes(frame), bytes(nonce)).ciphertext |
|||
|
|||
# Pad the payload with the nonce, if applicable |
|||
if nonce_padding: |
|||
payload += nonce_padding |
|||
|
|||
# Send the header (sans nonce padding) plus the payload |
|||
self.send(self._rtp_audio_header + payload) |
|||
|
|||
# Increment our sequence counter |
|||
self.sequence += 1 |
|||
if self.sequence >= MAX_SEQUENCE: |
|||
self.sequence = 0 |
|||
|
|||
# Increment our timestamp (if applicable) |
|||
if incr_timestamp: |
|||
self.timestamp += incr_timestamp |
|||
|
|||
def run(self): |
|||
while True: |
|||
data, addr = self.conn.recvfrom(4096) |
|||
|
|||
# Data cannot be less than the bare minimum, just ignore |
|||
if len(data) <= 12: |
|||
self.log.debug('[%s] [VoiceData] Received voice data under 13 bytes', self.vc) |
|||
continue |
|||
|
|||
first, second = struct.unpack_from('>BB', data) |
|||
|
|||
payload_type = RTCPPayloadTypes.get(second) |
|||
if payload_type: |
|||
length, ssrc = struct.unpack_from('>HI', data, 2) |
|||
|
|||
rtcp = RTCPHeader( |
|||
version=first >> 6, |
|||
padding=(first >> 5) & 1, |
|||
reception_count=first & 0x1F, |
|||
packet_type=second, |
|||
length=length, |
|||
ssrc=ssrc, |
|||
) |
|||
|
|||
if rtcp.ssrc == self.vc.ssrc_rtcp: |
|||
user_id = self.vc.user_id |
|||
else: |
|||
rtcp_ssrc = rtcp.ssrc |
|||
if rtcp_ssrc: |
|||
rtcp_ssrc -= 3 |
|||
user_id = self.vc.audio_ssrcs.get(rtcp_ssrc, None) |
|||
|
|||
payload = RTCPData( |
|||
client=self.vc, |
|||
user_id=user_id, |
|||
payload_type=payload_type.name, |
|||
header=rtcp, |
|||
data=data[8:], |
|||
) |
|||
|
|||
self.vc.client.gw.events.emit('RTCPData', payload) |
|||
else: |
|||
sequence, timestamp, ssrc = struct.unpack_from('>HII', data, 2) |
|||
|
|||
rtp = RTPHeader( |
|||
version=first >> 6, |
|||
padding=(first >> 5) & 1, |
|||
extension=(first >> 4) & 1, |
|||
csrc_count=first & 0x0F, |
|||
marker=second >> 7, |
|||
payload_type=second & 0x7F, |
|||
sequence=sequence, |
|||
timestamp=timestamp, |
|||
ssrc=ssrc, |
|||
) |
|||
|
|||
# Check if rtp version is 2 |
|||
if rtp.version != 2: |
|||
self.log.debug('[%s] [VoiceData] Received an invalid RTP packet version, %s', self.vc, rtp.version) |
|||
continue |
|||
|
|||
payload_type = RTPPayloadTypes.get(rtp.payload_type) |
|||
|
|||
# Unsupported payload type received |
|||
if not payload_type: |
|||
self.log.debug('[%s] [VoiceData] Received unsupported payload type, %s', self.vc, rtp.payload_type) |
|||
continue |
|||
|
|||
nonce = bytearray(24) |
|||
if self.vc.mode == 'xsalsa20_poly1305_lite': |
|||
nonce[:4] = data[-4:] |
|||
data = data[:-4] |
|||
elif self.vc.mode == 'xsalsa20_poly1305_suffix': |
|||
nonce[:24] = data[-24:] |
|||
data = data[:-24] |
|||
elif self.vc.mode == 'xsalsa20_poly1305': |
|||
nonce[:12] = data[:12] |
|||
else: |
|||
self.log.debug('[%s] [VoiceData] Unsupported Encryption Mode, %s', self.vc, self.vc.mode) |
|||
continue |
|||
|
|||
try: |
|||
data = self._secret_box.decrypt(bytes(data[12:]), bytes(nonce)) |
|||
except Exception: |
|||
self.log.debug('[%s] [VoiceData] Failed to decode data from ssrc %s', self.vc, rtp.ssrc) |
|||
continue |
|||
|
|||
# RFC3550 Section 5.1 (Padding) |
|||
if rtp.padding: |
|||
padding_amount, = struct.unpack_from('>B', data[:-1]) |
|||
data = data[-padding_amount:] |
|||
|
|||
if rtp.extension: |
|||
# RFC5285 Section 4.2: One-Byte Header |
|||
rtp_extension_header = struct.unpack_from('>BB', data) |
|||
if rtp_extension_header == RTP_EXTENSION_ONE_BYTE: |
|||
data = data[2:] |
|||
|
|||
fields_amount, = struct.unpack_from('>H', data) |
|||
fields = [] |
|||
|
|||
offset = 4 |
|||
for i in range(fields_amount): |
|||
first_byte, = struct.unpack_from('>B', data[:offset]) |
|||
offset += 1 |
|||
|
|||
rtp_extension_identifer = first_byte & 0xF |
|||
rtp_extension_len = ((first_byte >> 4) & 0xF) + 1 |
|||
|
|||
# Ignore data if identifer == 15, so skip if this is set as 0 |
|||
if rtp_extension_identifer: |
|||
fields.append(data[offset:offset + rtp_extension_len]) |
|||
|
|||
offset += rtp_extension_len |
|||
|
|||
# skip padding |
|||
while data[offset] == 0: |
|||
offset += 1 |
|||
|
|||
if len(fields): |
|||
fields.append(data[offset:]) |
|||
data = b''.join(fields) |
|||
else: |
|||
data = data[offset:] |
|||
|
|||
# RFC3550 Section 5.3: Profile-Specific Modifications to the RTP Header |
|||
# clients send it sometimes, definitely on fresh connects to a server, dunno what to do here |
|||
if rtp.marker: |
|||
self.log.debug('[%s] [VoiceData] Received RTP data with the marker set, skipping', self.vc) |
|||
continue |
|||
|
|||
payload = VoiceData( |
|||
client=self.vc, |
|||
user_id=self.vc.audio_ssrcs.get(rtp.ssrc, None), |
|||
payload_type=payload_type.name, |
|||
rtp=rtp, |
|||
nonce=nonce, |
|||
data=data, |
|||
) |
|||
|
|||
self.vc.client.gw.events.emit('VoiceData', payload) |
|||
|
|||
def send(self, data): |
|||
self.conn.sendto(data, (self.ip, self.port)) |
|||
|
|||
def disconnect(self): |
|||
self._run_task.kill() |
|||
|
|||
def connect(self, host, port, timeout=10, addrinfo=None): |
|||
self.ip = socket.gethostbyname(host) |
|||
self.port = port |
|||
|
|||
self.conn = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) |
|||
|
|||
if addrinfo: |
|||
ip, port = addrinfo |
|||
else: |
|||
# Send discovery packet |
|||
packet = bytearray(70) |
|||
struct.pack_into('>I', packet, 0, self.vc.ssrc) |
|||
self.send(packet) |
|||
|
|||
# Wait for a response |
|||
try: |
|||
data, addr = gevent.spawn(lambda: self.conn.recvfrom(70)).get(timeout=timeout) |
|||
except gevent.Timeout: |
|||
return (None, None) |
|||
|
|||
# Read IP and port |
|||
ip = str(data[4:]).split('\x00', 1)[0] |
|||
port = struct.unpack('<H', data[-2:])[0] |
|||
|
|||
# Spawn read thread so we don't max buffers |
|||
self.connected = True |
|||
self._run_task = gevent.spawn(self.run) |
|||
|
|||
return (ip, port) |
@ -1,56 +1,32 @@ |
|||
from disco.bot import Plugin |
|||
from disco.bot.command import CommandError |
|||
from disco.voice.player import Player |
|||
from disco.voice.playable import YoutubeDLInput, BufferedOpusEncoderPlayable |
|||
from disco.voice.client import VoiceException |
|||
from disco.voice import VoiceConnection, YoutubeDLPlayable |
|||
|
|||
|
|||
class MusicPlugin(Plugin): |
|||
def load(self, ctx): |
|||
super(MusicPlugin, self).load(ctx) |
|||
self.guilds = {} |
|||
def load(self, data): |
|||
super(MusicPlugin, self).load(data) |
|||
self._connections = {} |
|||
|
|||
@Plugin.command('join') |
|||
def on_join(self, event): |
|||
if event.guild.id in self.guilds: |
|||
return event.msg.reply("I'm already playing music here.") |
|||
|
|||
state = event.guild.get_member(event.author).get_voice_state() |
|||
if not state: |
|||
return event.msg.reply('You must be connected to voice to use that command.') |
|||
|
|||
try: |
|||
client = state.channel.connect() |
|||
except VoiceException as e: |
|||
return event.msg.reply('Failed to connect to voice: `{}`'.format(e)) |
|||
|
|||
self.guilds[event.guild.id] = Player(client) |
|||
self.guilds[event.guild.id].complete.wait() |
|||
del self.guilds[event.guild.id] |
|||
|
|||
def get_player(self, guild_id): |
|||
if guild_id not in self.guilds: |
|||
raise CommandError("I'm not currently playing music here.") |
|||
return self.guilds.get(guild_id) |
|||
|
|||
@Plugin.command('leave') |
|||
def on_leave(self, event): |
|||
player = self.get_player(event.guild.id) |
|||
player.disconnect() |
|||
|
|||
@Plugin.command('play', '<url:str>') |
|||
def on_play(self, event, url): |
|||
item = YoutubeDLInput(url).pipe(BufferedOpusEncoderPlayable) |
|||
self.get_player(event.guild.id).queue.append(item) |
|||
|
|||
@Plugin.command('pause') |
|||
def on_pause(self, event): |
|||
self.get_player(event.guild.id).pause() |
|||
|
|||
@Plugin.command('resume') |
|||
def on_resume(self, event): |
|||
self.get_player(event.guild.id).resume() |
|||
|
|||
@Plugin.command('kill') |
|||
def on_kill(self, event): |
|||
self.get_player(event.guild.id).client.ws.sock.shutdown() |
|||
vs = event.guild.get_member(event.author).get_voice_state() |
|||
if not vs: |
|||
return event.msg.reply('you are not in a voice channel') |
|||
|
|||
if event.guild.id in self._connections: |
|||
if self._connections[event.guild.id].channel_id == vs.channel_id: |
|||
return event.msg.reply('already in that channel') |
|||
else: |
|||
self._connections[event.guild.id].set_channel(vs.channel) |
|||
return |
|||
|
|||
self._connections[event.guild.id] = VoiceConnection.from_channel(vs.channel, enable_events=True) |
|||
|
|||
@Plugin.command('play', '<song:str>') |
|||
def on_play(self, event, song): |
|||
if event.guild.id not in self._connections: |
|||
return event.msg.reply('not in voice here') |
|||
|
|||
playables = list(YoutubeDLPlayable.from_url(song)) |
|||
for playable in playables: |
|||
self._connections[event.guild.id].play(playable) |
|||
|
@ -1,5 +1,4 @@ |
|||
gevent==1.3.7 |
|||
holster==2.0.0 |
|||
requests==2.20.1 |
|||
six==1.11.0 |
|||
websocket-client==0.44.0 |
|||
|
@ -0,0 +1,42 @@ |
|||
from disco.types.permissions import Permissions, PermissionValue |
|||
|
|||
|
|||
def test_permission_value_can(): |
|||
admin_perms = PermissionValue( |
|||
Permissions.ADMINISTRATOR |
|||
) |
|||
|
|||
assert admin_perms.administrator |
|||
|
|||
# Admin can do everything |
|||
for key in Permissions.keys(): |
|||
assert admin_perms.can(getattr(Permissions, key)) |
|||
|
|||
manage_channels_perms = PermissionValue( |
|||
Permissions.MANAGE_CHANNELS, |
|||
) |
|||
|
|||
assert not manage_channels_perms.administrator |
|||
assert manage_channels_perms.manage_channels |
|||
|
|||
|
|||
def test_permission_value_mutation(): |
|||
no_perms = PermissionValue() |
|||
assert not no_perms.can(Permissions.SEND_MESSAGES) |
|||
|
|||
no_perms.send_messages = True |
|||
assert no_perms.can(Permissions.SEND_MESSAGES) |
|||
|
|||
|
|||
def test_permission_value_accepts_permission_value(): |
|||
perms = PermissionValue(Permissions.ADMINISTRATOR) |
|||
|
|||
new_perms = PermissionValue(perms) |
|||
assert new_perms.administrator |
|||
|
|||
assert not new_perms.manage_channels |
|||
new_perms.add(PermissionValue(Permissions.MANAGE_CHANNELS)) |
|||
assert new_perms.manage_channels |
|||
|
|||
new_perms.sub(PermissionValue(Permissions.MANAGE_CHANNELS)) |
|||
assert not new_perms.manage_channels |
@ -1,22 +1,21 @@ |
|||
from unittest import TestCase |
|||
|
|||
from disco.types.user import User, DefaultAvatars |
|||
|
|||
|
|||
class TestChannel(TestCase): |
|||
def test_user_avatar(self): |
|||
u = User( |
|||
id=12345, |
|||
username='test123', |
|||
avatar='1234567890abcdefghijkl', |
|||
discriminator='1234', |
|||
bot=False) |
|||
def test_user_avatar_url(): |
|||
u = User(id=12345, avatar='1234567890abcdefghijkl') |
|||
assert u.avatar_url == 'https://cdn.discordapp.com/avatars/12345/1234567890abcdefghijkl.webp?size=1024' |
|||
avatar_url = u.get_avatar_url(still_format='png') |
|||
assert avatar_url == 'https://cdn.discordapp.com/avatars/12345/1234567890abcdefghijkl.png?size=1024' |
|||
|
|||
|
|||
def test_user_animated_avatar_url(): |
|||
u = User(id=12345, avatar='a_1234567890abcdefghijkl') |
|||
assert u.avatar_url == 'https://cdn.discordapp.com/avatars/12345/a_1234567890abcdefghijkl.gif?size=1024' |
|||
avatar_url = u.get_avatar_url(animated_format='webp') |
|||
assert avatar_url == 'https://cdn.discordapp.com/avatars/12345/a_1234567890abcdefghijkl.webp?size=1024' |
|||
|
|||
self.assertEqual( |
|||
u.avatar_url, 'https://cdn.discordapp.com/avatars/12345/1234567890abcdefghijkl.webp?size=1024' |
|||
) |
|||
|
|||
def test_user_default_avatar(self): |
|||
u = User(id=123456, discriminator='1234') |
|||
self.assertEqual(u.default_avatar, DefaultAvatars.RED) |
|||
self.assertEqual(u.avatar_url, 'https://cdn.discordapp.com/embed/avatars/4.png') |
|||
def test_user_default_avatar_url(): |
|||
u = User(id=12345, discriminator='1234') |
|||
assert u.default_avatar == DefaultAvatars.RED |
|||
assert u.avatar_url == 'https://cdn.discordapp.com/embed/avatars/4.png' |
|||
|
@ -1,66 +0,0 @@ |
|||
import gevent |
|||
from unittest import TestCase |
|||
|
|||
from disco.voice.queue import PlayableQueue |
|||
|
|||
|
|||
class TestPlayableQueue(TestCase): |
|||
def test_append(self): |
|||
q = PlayableQueue() |
|||
q.append(1) |
|||
q.append(2) |
|||
q.append(3) |
|||
|
|||
self.assertEqual(q._data, [1, 2, 3]) |
|||
self.assertEqual(q.get(), 1) |
|||
self.assertEqual(q.get(), 2) |
|||
self.assertEqual(q.get(), 3) |
|||
|
|||
def test_len(self): |
|||
q = PlayableQueue() |
|||
|
|||
for idx in range(1234): |
|||
q.append(idx) |
|||
|
|||
self.assertEqual(len(q), 1234) |
|||
|
|||
def test_iter(self): |
|||
q = PlayableQueue() |
|||
|
|||
for idx in range(5): |
|||
q.append(idx) |
|||
|
|||
self.assertEqual(sum(q), 10) |
|||
|
|||
def test_blocking_get(self): |
|||
q = PlayableQueue() |
|||
result = gevent.event.AsyncResult() |
|||
|
|||
def get(): |
|||
result.set(q.get()) |
|||
|
|||
gevent.spawn(get) |
|||
q.append(5) |
|||
self.assertEqual(result.get(), 5) |
|||
|
|||
def test_shuffle(self): |
|||
q = PlayableQueue() |
|||
|
|||
for idx in range(10000): |
|||
q.append(idx) |
|||
|
|||
self.assertEqual(q._data[0], 0) |
|||
q.shuffle() |
|||
self.assertNotEqual(q._data[0], 0) |
|||
|
|||
def test_clear(self): |
|||
q = PlayableQueue() |
|||
|
|||
for idx in range(100): |
|||
q.append(idx) |
|||
|
|||
self.assertEqual(q._data[0], 0) |
|||
self.assertEqual(q._data[-1], 99) |
|||
self.assertEqual(len(q), 100) |
|||
q.clear() |
|||
self.assertEqual(len(q), 0) |
Loading…
Reference in new issue