You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

347 lines
9.6 KiB

import abc
import six
import types
import gevent
import struct
import subprocess
from gevent.queue import Queue
from disco.voice.opus import OpusEncoder
try:
from cStringIO import cStringIO as StringIO
except:
from StringIO import StringIO
OPUS_HEADER_SIZE = struct.calcsize('<h')
# Play from file:
# OpusFilePlayable(open('myfile.opus', 'r'))
# PCMFileInput(open('myfile.pcm', 'r')).pipe(DCADOpusEncoder) => OpusPlayable
# FFMpegInput.youtube_dl('youtube.com/yolo').pipe(DCADOpusEncoder) => OpusPlayable
# FFMpegInput.youtube_dl('youtube.com/yolo').pipe(OpusEncoder).pipe(DuplexStream, open('cache_file.opus', 'w')) => OpusPlayable
class AbstractOpus(object):
def __init__(self, sampling_rate=48000, frame_length=20, channels=2):
self.sampling_rate = sampling_rate
self.frame_length = frame_length
self.channels = 2
self.sample_size = 2 * self.channels
self.samples_per_frame = int(self.sampling_rate / 1000 * self.frame_length)
self.frame_size = self.samples_per_frame * self.sample_size
class BaseUtil(object):
def pipe(self, other, *args, **kwargs):
child = other(self, *args, **kwargs)
setattr(child, 'metadata', self.metadata)
setattr(child, '_parent', self)
return child
@property
def metadata(self):
return self._metadata
@metadata.setter
def metadata(self, value):
self._metadata = value
@six.add_metaclass(abc.ABCMeta)
class BasePlayable(BaseUtil):
@abc.abstractmethod
def next_frame(self):
raise NotImplementedError
@six.add_metaclass(abc.ABCMeta)
class BaseInput(BaseUtil):
@abc.abstractmethod
def read(self, size):
raise NotImplementedError
@abc.abstractmethod
def fileobj(self):
raise NotImplementedError
class OpusFilePlayable(BasePlayable, AbstractOpus):
"""
An input which reads opus data from a file or file-like object.
"""
def __init__(self, fobj, *args, **kwargs):
super(OpusFilePlayable, self).__init__(*args, **kwargs)
self.fobj = fobj
self.done = False
def next_frame(self):
if self.done:
return None
header = self.fobj.read(OPUS_HEADER_SIZE)
if len(header) < OPUS_HEADER_SIZE:
self.done = True
return None
data_size = struct.unpack('<h', header)[0]
data = self.fobj.read(data_size)
if len(data) < data_size:
self.done = True
return None
return data
class FFmpegInput(BaseInput, AbstractOpus):
def __init__(self, source='-', command='avconv', streaming=False, **kwargs):
super(FFmpegInput, self).__init__(**kwargs)
if source:
self.source = source
self.streaming = streaming
self.command = command
self._buffer = None
self._proc = None
def read(self, sz):
if self.streaming:
raise TypeError('Cannot read from a streaming FFmpegInput')
# First read blocks until the subprocess finishes
if not self._buffer:
data, _ = self.proc.communicate()
self._buffer = StringIO(data)
# Subsequent reads can just do dis thang
return self._buffer.read(sz)
def fileobj(self):
if self.streaming:
return self.proc.stdout
else:
return self
@property
def proc(self):
if not self._proc:
if callable(self.source):
self.source = self.source(self)
if isinstance(self.source, (tuple, list)):
self.source, self.metadata = self.source
args = [
self.command,
'-i', str(self.source),
'-f', 's16le',
'-ar', str(self.sampling_rate),
'-ac', str(self.channels),
'-loglevel', 'warning',
'pipe:1'
]
self._proc = subprocess.Popen(args, stdin=None, stdout=subprocess.PIPE)
return self._proc
class YoutubeDLInput(FFmpegInput):
def __init__(self, url=None, ie_info=None, *args, **kwargs):
super(YoutubeDLInput, self).__init__(None, *args, **kwargs)
self._url = url
self._ie_info = ie_info
self._info = None
@property
def info(self):
if not self._info:
import youtube_dl
ydl = youtube_dl.YoutubeDL({'format': 'webm[abr>0]/bestaudio/best'})
if self._url:
obj = ydl.extract_info(self._url, download=False, process=False)
if 'entries' in obj:
self._ie_info = obj['entries']
else:
self._ie_info = [obj]
self._info = ydl.process_ie_result(self._ie_info, download=False)
return self._info
@property
def _metadata(self):
return self.info
@classmethod
def many(cls, url, *args, **kwargs):
import youtube_dl
ydl = youtube_dl.YoutubeDL({'format': 'webm[abr>0]/bestaudio/best'})
info = ydl.extract_info(url, download=False, process=False)
if 'entries' not in info:
yield cls(ie_info=info, *args, **kwargs)
raise StopIteration
for item in info['entries']:
yield cls(ie_info=item, *args, **kwargs)
@property
def source(self):
return self.info['url']
class BufferedOpusEncoderPlayable(BasePlayable, AbstractOpus, OpusEncoder):
def __init__(self, source, *args, **kwargs):
self.source = source
self.frames = Queue(kwargs.pop('queue_size', 4096))
super(BufferedOpusEncoderPlayable, self).__init__(*args, **kwargs)
gevent.spawn(self._encoder_loop)
def _encoder_loop(self):
while self.source:
raw = self.source.read(self.frame_size)
if len(raw) < self.frame_size:
break
self.frames.put(self.encode(raw, self.samples_per_frame))
gevent.idle()
self.source = None
def next_frame(self):
if not self.source:
return None
return self.frames.get()
class DCADOpusEncoderPlayable(BasePlayable, AbstractOpus, OpusEncoder):
def __init__(self, source, *args, **kwargs):
self.source = source
self.command = kwargs.pop('command', 'dcad')
super(DCADOpusEncoderPlayable, self).__init__(*args, **kwargs)
self._done = False
self._proc = None
@property
def proc(self):
if not self._proc:
source = obj = self.source.fileobj()
if not hasattr(obj, 'fileno'):
source = subprocess.PIPE
self._proc = subprocess.Popen([
self.command,
'--channels', str(self.channels),
'--rate', str(self.sampling_rate),
'--size', str(self.samples_per_frame),
'--bitrate', '128',
'--fec',
'--packet-loss-percent', '30',
'--input', 'pipe:0',
'--output', 'pipe:1',
], stdin=source, stdout=subprocess.PIPE)
def writer():
while True:
data = obj.read(2048)
if len(data) > 0:
self._proc.stdin.write(data)
if len(data) < 2048:
break
if source == subprocess.PIPE:
gevent.spawn(writer)
return self._proc
def next_frame(self):
if self._done:
return None
header = self.proc.stdout.read(OPUS_HEADER_SIZE)
if len(header) < OPUS_HEADER_SIZE:
self._done = True
return
size = struct.unpack('<h', header)[0]
data = self.proc.stdout.read(size)
if len(data) < size:
self._done = True
return
return data
class FileProxyPlayable(BasePlayable, AbstractOpus):
def __init__(self, other, output, *args, **kwargs):
self.flush = kwargs.pop('flush', False)
super(FileProxyPlayable, self).__init__(*args, **kwargs)
self.other = other
self.output = output
def next_frame(self):
frame = self.other.next_frame()
if frame:
self.output.write(struct.pack('<h', len(frame)))
self.output.write(frame)
if self.flush:
self.output.flush()
else:
self.output.flush()
self.output.close()
return frame
class PlaylistPlayable(BasePlayable, AbstractOpus):
def __init__(self, items, *args, **kwargs):
super(PlaylistPlayable, self).__init__(*args, **kwargs)
self.items = items
self.now_playing = None
def _get_next(self):
if isinstance(self.items, types.GeneratorType):
return next(self.items, None)
return self.items.pop()
def next_frame(self):
if not self.items:
return
if not self.now_playing:
self.now_playing = self._get_next()
if not self.now_playing:
return
frame = self.now_playing.next_frame()
if not frame:
return self.next_frame()
return frame
class MemoryBufferedPlayable(BasePlayable, AbstractOpus):
def __init__(self, other, *args, **kwargs):
from gevent.queue import Queue
super(MemoryBufferedPlayable, self).__init__(*args, **kwargs)
self.frames = Queue()
self.other = other
gevent.spawn(self._buffer)
def _buffer(self):
while True:
frame = self.other.next_frame()
if not frame:
break
self.frames.put(frame)
self.frames.put(None)
def next_frame(self):
return self.frames.get()