Browse Source

Voice encryption, dep version bump, etc fixes

feature/voice
Andrei 8 years ago
parent
commit
641724a63d
  1. 6
      disco/bot/bot.py
  2. 56
      disco/voice/client.py
  3. 68
      disco/voice/opus.py
  4. 51
      disco/voice/player.py
  5. 7
      examples/music.py
  6. 5
      requirements.txt

6
disco/bot/bot.py

@ -355,10 +355,10 @@ class Bot(LoggingClass):
if event.message.author.id == self.client.state.me.id:
return
if self.config.commands_allow_edit:
self.last_message_cache[event.message.channel_id] = (event.message, False)
result = self.handle_message(event.message)
self.handle_message(event.message)
if self.config.commands_allow_edit:
self.last_message_cache[event.message.channel_id] = (event.message, result)
def on_message_update(self, event):
if self.config.commands_allow_edit:

56
disco/voice/client.py

@ -3,6 +3,8 @@ import socket
import struct
import time
import nacl.secret
from holster.enum import Enum
from holster.emitter import Emitter
@ -40,21 +42,32 @@ class UDPVoiceClient(LoggingClass):
# Connection information
self.ip = None
self.port = None
self.sequence = 0
self.timestamp = 0
self.run_task = None
self.connected = False
def send_frame(self, frame, sequence=None, timestamp=None):
data = bytearray(12)
data[0] = 0x80
data[1] = 0x78
struct.pack_into('>H', data, 2, sequence or self.sequence)
struct.pack_into('>I', data, 4, timestamp or self.timestamp)
struct.pack_into('>i', data, 8, self.vc.ssrc)
self.send(data + ''.join(frame))
self.sequence += 1
# Convert the frame to a bytearray
frame = bytearray(frame)
# First, pack the header (TODO: reuse bytearray?)
header = bytearray(24)
header[0] = 0x80
header[1] = 0x78
struct.pack_into('>H', header, 2, sequence or self.vc.sequence)
struct.pack_into('>I', header, 4, timestamp or self.vc.timestamp)
struct.pack_into('>i', header, 8, self.vc.ssrc)
# Now encrypt the payload with the nonce as a header
raw = self.vc.secret_box.encrypt(bytes(frame), bytes(header)).ciphertext
# Send the header (sans nonce padding) plus the payload
self.send(header[:12] + raw)
# Increment our sequence counter
self.vc.sequence += 1
if self.vc.sequence >= 65535:
self.vc.sequence = 0
def run(self):
while True:
@ -101,24 +114,34 @@ class VoiceClient(LoggingClass):
def __init__(self, channel, encoder=None):
super(VoiceClient, self).__init__()
assert channel.is_voice, 'Cannot spawn a VoiceClient for a non-voice channel'
if not channel.is_voice:
raise ValueError('Cannot spawn a VoiceClient for a non-voice channel')
self.channel = channel
self.client = self.channel.client
self.encoder = encoder or JSONEncoder
# Bind to some WS packets
self.packets = Emitter(gevent.spawn)
self.packets.on(VoiceOPCode.READY, self.on_voice_ready)
self.packets.on(VoiceOPCode.SESSION_DESCRIPTION, self.on_voice_sdp)
# State
# State + state change emitter
self.state = VoiceState.DISCONNECTED
self.state_emitter = Emitter(gevent.spawn)
# Connection metadata
self.token = None
self.endpoint = None
self.ssrc = None
self.port = None
self.secret_box = None
self.udp = None
# Voice data state
self.sequence = 0
self.timestamp = 0
self.update_listener = None
# Websocket connection
@ -166,11 +189,14 @@ class VoiceClient(LoggingClass):
'data': {
'port': port,
'address': ip,
'mode': 'plain'
'mode': 'xsalsa20_poly1305'
}
})
def on_voice_sdp(self, _):
def on_voice_sdp(self, sdp):
# Create a secret box for encryption/decryption
self.secret_box = nacl.secret.SecretBox(bytes(bytearray(sdp['secret_key'])))
# Toggle speaking state so clients learn of our SSRC
self.set_speaking(True)
self.set_speaking(False)
@ -204,7 +230,7 @@ class VoiceClient(LoggingClass):
self.log.exception('Failed to parse voice gateway message: ')
def on_error(self, err):
# TODO
# TODO: raise an exception here
self.log.warning('Voice websocket error: {}'.format(err))
def on_open(self):

68
disco/voice/opus.py

@ -137,9 +137,9 @@ class OpusEncoder(BaseOpus):
return result
def __del__(self):
if self.inst:
self.opus_encoder_destroy(self.inst)
self.inst = None
if self._inst:
self.opus_encoder_destroy(self._inst)
self._inst = None
def encode(self, pcm, frame_size):
max_data_bytes = len(pcm)
@ -159,24 +159,24 @@ class OpusDecoder(BaseOpus):
class BufferedOpusEncoder(OpusEncoder):
def __init__(self, f, *args, **kwargs):
self.data = f
def __init__(self, source, *args, **kwargs):
self.source = source
self.frames = Queue(kwargs.pop('queue_size', 4096))
super(BufferedOpusEncoder, self).__init__(*args, **kwargs)
gevent.spawn(self._encoder_loop)
def _encoder_loop(self):
while self.data:
raw = self.data.read(self.frame_size)
while self.source:
raw = self.source.read(self.frame_size)
if len(raw) < self.frame_size:
break
self.frames.put(self.encode(raw, self.samples_per_frame))
gevent.idle()
self.data = None
self.source = None
def have_frame(self):
return self.data or not self.frames.empty()
return self.source or not self.frames.empty()
def next_frame(self):
return self.frames.get()
@ -185,10 +185,10 @@ class BufferedOpusEncoder(OpusEncoder):
class GIPCBufferedOpusEncoder(OpusEncoder):
FIN = 1
def __init__(self, f, *args, **kwargs):
def __init__(self, source, *args, **kwargs):
import gipc
self.data = f
self.source = source
self.parent_pipe, self.child_pipe = gipc.pipe(duplex=True)
self.frames = Queue(kwargs.pop('queue_size', 4096))
super(GIPCBufferedOpusEncoder, self).__init__(*args, **kwargs)
@ -209,7 +209,7 @@ class GIPCBufferedOpusEncoder(OpusEncoder):
def _writer(self):
while self.data:
raw = self.data.read(self.frame_size)
raw = self.source.read(self.frame_size)
if len(raw) < self.frame_size:
break
@ -238,36 +238,48 @@ class GIPCBufferedOpusEncoder(OpusEncoder):
class DCADOpusEncoder(OpusEncoder):
def __init__(self, pipe, *args, **kwargs):
command = kwargs.pop('command', 'dcad')
def __init__(self, source, *args, **kwargs):
self.source = source
self.command = kwargs.pop('command', 'dcad')
super(DCADOpusEncoder, self).__init__(*args, **kwargs)
self.proc = subprocess.Popen([
command,
# '--channels', str(self.channels),
# '--rate', str(self.sampling_rate),
# '--size', str(self.frame_length),
'--bitrate', '128',
'--fec',
'--packet-loss-percent', '30',
'--input', 'pipe:0',
'--output', 'pipe:1',
], stdin=pipe, stdout=subprocess.PIPE)
self._proc = None
self.header_size = struct.calcsize('<h')
@property
def proc(self):
if not self._proc:
self._proc = subprocess.Popen([
self.command,
'--channels', str(self.channels),
'--rate', str(self.sampling_rate),
'--size', str(self.samples_per_frame),
'--bitrate', '128',
'--fec',
'--packet-loss-percent', '30',
'--input', 'pipe:0',
'--output', 'pipe:1',
], stdin=self.source.stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
return self._proc
def have_frame(self):
return bool(self.proc)
def next_frame(self):
header = self.proc.stdout.read(self.header_size)
if len(header) < self.header_size:
self.proc = None
print 'read less than required header size'
print self.proc.poll()
self._proc = None
return
if self.proc.poll() is not None:
print 'read that data when she dead n gone: %s' % self.proc.poll()
size = struct.unpack('<h', header)[0]
data = self.proc.stdout.read(size)
if len(data) < size:
self.proc = None
if len(data) == 0:
self._proc = None
return
return data

51
disco/voice/player.py

@ -24,16 +24,28 @@ class FFmpegPlayable(object):
self.channels = channels
self.kwargs = kwargs
self._buffer = None
self._proc = None
self._child = None
@property
def stdout(self):
return self.proc.stdout
def read(self, sz):
if self.streaming:
return self.proc.stdout.read(sz)
else:
if not self._buffer:
data, _ = self.proc.communicate()
self._buffer = StringIO(data)
return self._buffer.read(sz)
def pipe(self, other, streaming=True):
if issubclass(other, OpusEncoder):
if not streaming:
stdout, _ = self._proc.communicate()
self._child = other(StringIO(stdout), self.sampling_rate, self.channels, **self.kwargs)
else:
self._child = other(self.out_pipe, self.sampling_rate, self.channels, **self.kwargs)
self._child = other(self, self.sampling_rate, self.channels, **self.kwargs)
else:
raise TypeError('Invalid pipe target')
@property
def samples_per_frame(self):
@ -51,17 +63,9 @@ class FFmpegPlayable(object):
'-loglevel', 'warning',
'pipe:1'
]
self._proc = subprocess.Popen(args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
self._proc = subprocess.Popen(args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
return self._proc
@property
def out_pipe(self):
return self.proc.stdout
@property
def in_pipe(self):
return self.proc.stdin
def have_frame(self):
return self._child and self._child.have_frame()
@ -76,18 +80,17 @@ def create_ffmpeg_playable(*args, **kwargs):
return playable
def create_youtube_dl_playable(url, *args, **kwargs):
def create_youtube_dl_playables(url, *args, **kwargs):
import youtube_dl
ydl = youtube_dl.YoutubeDL({'format': 'webm[abr>0]/bestaudio/best'})
info = ydl.extract_info(url, download=False)
entries = [info] if 'entries' not in info else info['entries']
if 'entries' in info:
info = info['entries'][0]
playable = create_ffmpeg_playable(info['url'], *args, **kwargs)
playable.info = info
return playable
for entry in entries:
playable = create_ffmpeg_playable(entry['url'], *args, **kwargs)
playable.info = entry
yield playable
class OpusPlayable(object):
@ -199,7 +202,11 @@ class Player(object):
if not item.have_frame():
return
self.client.send_frame(item.next_frame())
frame = item.next_frame()
if frame is None:
return
self.client.send_frame(frame)
self.client.timestamp += item.samples_per_frame
next_time = start + 0.02 * loops

7
examples/music.py

@ -1,11 +1,11 @@
from disco.bot import Plugin
from disco.bot.command import CommandError
from disco.voice.player import Player, create_youtube_dl_playable
from disco.voice.player import Player, create_youtube_dl_playables
from disco.voice.client import VoiceException
def download(url):
return create_youtube_dl_playable(url)
return create_youtube_dl_playables(url)
class MusicPlugin(Plugin):
@ -43,7 +43,8 @@ class MusicPlugin(Plugin):
@Plugin.command('play', '<url:str>')
def on_play(self, event, url):
self.get_player(event.guild.id).queue.put(download(url))
item = list(create_youtube_dl_playables(url))[0]
self.get_player(event.guild.id).queue.put(item)
@Plugin.command('pause')
def on_pause(self, event):

5
requirements.txt

@ -1,6 +1,7 @@
gevent==1.1.2
gevent==1.2.1
holster==1.0.11
inflection==0.3.1
requests==2.11.1
requests==2.13.0
six==1.10.0
websocket-client==0.40.0
pynacl==1.1.2

Loading…
Cancel
Save