|
|
@ -1,3 +1,5 @@ |
|
|
|
import abc |
|
|
|
import six |
|
|
|
import gevent |
|
|
|
import struct |
|
|
|
import subprocess |
|
|
@ -17,65 +19,111 @@ except: |
|
|
|
OPUS_HEADER_SIZE = struct.calcsize('<h') |
|
|
|
|
|
|
|
|
|
|
|
# Play from file: |
|
|
|
# OpusFilePlayable(open('myfile.opus', 'r')) |
|
|
|
# PCMFileInput(open('myfile.pcm', 'r')).pipe(DCADOpusEncoder) => OpusPlayable |
|
|
|
# FFMpegInput.youtube_dl('youtube.com/yolo').pipe(DCADOpusEncoder) => OpusPlayable |
|
|
|
# FFMpegInput.youtube_dl('youtube.com/yolo').pipe(OpusEncoder).pipe(DuplexStream, open('cache_file.opus', 'w')) => OpusPlayable |
|
|
|
|
|
|
|
|
|
|
|
class AbstractOpus(object): |
|
|
|
def __init__(self, sampling_rate=48000, frame_length=20, channels=2): |
|
|
|
self.sampling_rate = sampling_rate |
|
|
|
self.frame_length = frame_length |
|
|
|
self.channels = 2 |
|
|
|
self.sample_size = 2 * self.channels |
|
|
|
self.samples_per_frame = int(self.sampling_rate / 1000 * self.frame_length) |
|
|
|
self.frame_size = self.samples_per_frame * self.sample_size |
|
|
|
|
|
|
|
|
|
|
|
@six.add_metaclass(abc.ABCMeta) |
|
|
|
class BasePlayable(object): |
|
|
|
pass |
|
|
|
@abc.abstractmethod |
|
|
|
def next_frame(self): |
|
|
|
raise NotImplementedError |
|
|
|
|
|
|
|
|
|
|
|
@six.add_metaclass(abc.ABCMeta) |
|
|
|
class BaseInput(object): |
|
|
|
@abc.abstractmethod |
|
|
|
def read(self, size): |
|
|
|
raise NotImplementedError |
|
|
|
|
|
|
|
@abc.abstractmethod |
|
|
|
def fileobj(self): |
|
|
|
raise NotImplementedError |
|
|
|
|
|
|
|
def pipe(self, other, *args, **kwargs): |
|
|
|
return other(self, *args, **kwargs) |
|
|
|
|
|
|
|
class FFmpegPlayable(BasePlayable): |
|
|
|
def __init__(self, source='-', command='avconv', sampling_rate=48000, channels=2, **kwargs): |
|
|
|
|
|
|
|
class OpusFilePlayable(BasePlayable): |
|
|
|
""" |
|
|
|
An input which reads opus data from a file or file-like object. |
|
|
|
""" |
|
|
|
def __init__(self, fobj): |
|
|
|
self.fobj = fobj |
|
|
|
self.done = False |
|
|
|
|
|
|
|
def next_frame(self): |
|
|
|
if self.done: |
|
|
|
return None |
|
|
|
|
|
|
|
header = self.fobj.read(OPUS_HEADER_SIZE) |
|
|
|
if len(header) < OPUS_HEADER_SIZE: |
|
|
|
self.done = True |
|
|
|
return None |
|
|
|
|
|
|
|
data_size = struct.unpack('<h', header)[0] |
|
|
|
data = self.fobj.read(data_size) |
|
|
|
if len(data) < data_size: |
|
|
|
self.done = True |
|
|
|
return None |
|
|
|
|
|
|
|
return data |
|
|
|
|
|
|
|
|
|
|
|
class FFmpegInput(BaseInput, AbstractOpus): |
|
|
|
def __init__(self, source='-', command='avconv', streaming=False, **kwargs): |
|
|
|
super(FFmpegInput, self).__init__(**kwargs) |
|
|
|
self.streaming = streaming |
|
|
|
self.source = source |
|
|
|
self.command = command |
|
|
|
self.sampling_rate = sampling_rate |
|
|
|
self.channels = channels |
|
|
|
self.kwargs = kwargs |
|
|
|
|
|
|
|
self._buffer = None |
|
|
|
self._proc = None |
|
|
|
self._child = None |
|
|
|
|
|
|
|
@classmethod |
|
|
|
def create(_cls, *args, **kwargs): |
|
|
|
cls = kwargs.pop('cls', BufferedOpusEncoder) |
|
|
|
playable = _cls(*args, **kwargs) |
|
|
|
playable.pipe(cls) |
|
|
|
return playable |
|
|
|
|
|
|
|
@classmethod |
|
|
|
def create_youtube_dl(_cls, url, *args, **kwargs): |
|
|
|
def youtube_dl(cls, url, *args, **kwargs): |
|
|
|
import youtube_dl |
|
|
|
|
|
|
|
ydl = youtube_dl.YoutubeDL({'format': 'webm[abr>0]/bestaudio/best'}) |
|
|
|
info = ydl.extract_info(url, download=False) |
|
|
|
entries = [info] if 'entries' not in info else info['entries'] |
|
|
|
|
|
|
|
for entry in entries: |
|
|
|
playable = _cls.create(entry['url'], *args, **kwargs) |
|
|
|
playable.info = entry |
|
|
|
yield playable |
|
|
|
if 'entries' in info: |
|
|
|
info = info['entries'][0] |
|
|
|
|
|
|
|
@property |
|
|
|
def stdout(self): |
|
|
|
return self.proc.stdout |
|
|
|
result = cls(source=info['url'], *args, **kwargs) |
|
|
|
result.info = info |
|
|
|
return result |
|
|
|
|
|
|
|
def read(self, sz): |
|
|
|
if self.streaming: |
|
|
|
return self.proc.stdout.read(sz) |
|
|
|
else: |
|
|
|
if not self._buffer: |
|
|
|
data, _ = self.proc.communicate() |
|
|
|
self._buffer = StringIO(data) |
|
|
|
return self._buffer.read(sz) |
|
|
|
|
|
|
|
def pipe(self, other, streaming=True): |
|
|
|
if issubclass(other, OpusEncoder): |
|
|
|
self._child = other(self, self.sampling_rate, self.channels, **self.kwargs) |
|
|
|
else: |
|
|
|
raise TypeError('Invalid pipe target') |
|
|
|
raise TypeError('Cannot read from a streaming FFmpegInput') |
|
|
|
|
|
|
|
@property |
|
|
|
def samples_per_frame(self): |
|
|
|
return self._child.samples_per_frame |
|
|
|
# First read blocks until the subprocess finishes |
|
|
|
if not self._buffer: |
|
|
|
data, _ = self.proc.communicate() |
|
|
|
self._buffer = StringIO(data) |
|
|
|
|
|
|
|
# Subsequent reads can just do dis thang |
|
|
|
return self._buffer.read(sz) |
|
|
|
|
|
|
|
def fileobj(self): |
|
|
|
if self.streaming: |
|
|
|
return self.proc.stdout |
|
|
|
else: |
|
|
|
return self |
|
|
|
|
|
|
|
@property |
|
|
|
def proc(self): |
|
|
@ -92,50 +140,12 @@ class FFmpegPlayable(BasePlayable): |
|
|
|
self._proc = subprocess.Popen(args, stdin=None, stdout=subprocess.PIPE) |
|
|
|
return self._proc |
|
|
|
|
|
|
|
def have_frame(self): |
|
|
|
return self._child and self._child.have_frame() |
|
|
|
|
|
|
|
def next_frame(self): |
|
|
|
return self._child.next_frame() |
|
|
|
|
|
|
|
|
|
|
|
class OpusFilePlayable(BasePlayable): |
|
|
|
""" |
|
|
|
A Playable which supports reading from an on-disk opus-format file. This is |
|
|
|
useful in combination with other playables and the OpusFileOutputDuplex. |
|
|
|
""" |
|
|
|
|
|
|
|
def __init__(self, obj, sampling_rate=48000, frame_length=20, channels=2): |
|
|
|
super(OpusFilePlayable, self).__init__() |
|
|
|
self.obj = obj |
|
|
|
self.sampling_rate = sampling_rate |
|
|
|
self.frame_length = frame_length |
|
|
|
self.channels = channels |
|
|
|
self.samples_per_frame = int(self.sampling_rate / 1000 * self.frame_length) |
|
|
|
|
|
|
|
def have_frame(self): |
|
|
|
return self.obj |
|
|
|
|
|
|
|
def next_frame(self): |
|
|
|
header = self.obj.read(OPUS_HEADER_SIZE) |
|
|
|
if len(header) < OPUS_HEADER_SIZE: |
|
|
|
self.obj = None |
|
|
|
return |
|
|
|
|
|
|
|
size = struct.unpack('<h', header)[0] |
|
|
|
raw = self.obj.read(size) |
|
|
|
if len(raw) < size: |
|
|
|
self.obj = None |
|
|
|
return |
|
|
|
|
|
|
|
return raw |
|
|
|
|
|
|
|
|
|
|
|
class BufferedOpusEncoder(BasePlayable, OpusEncoder): |
|
|
|
class BufferedOpusEncoderPlayable(BasePlayable, AbstractOpus, OpusEncoder): |
|
|
|
def __init__(self, source, *args, **kwargs): |
|
|
|
self.source = source |
|
|
|
self.frames = Queue(kwargs.pop('queue_size', 4096)) |
|
|
|
super(BufferedOpusEncoder, self).__init__(*args, **kwargs) |
|
|
|
super(BufferedOpusEncoderPlayable, self).__init__(*args, **kwargs) |
|
|
|
gevent.spawn(self._encoder_loop) |
|
|
|
|
|
|
|
def _encoder_loop(self): |
|
|
@ -148,78 +158,28 @@ class BufferedOpusEncoder(BasePlayable, OpusEncoder): |
|
|
|
gevent.idle() |
|
|
|
self.source = None |
|
|
|
|
|
|
|
def have_frame(self): |
|
|
|
return self.source or not self.frames.empty() |
|
|
|
|
|
|
|
def next_frame(self): |
|
|
|
if not self.source: |
|
|
|
return None |
|
|
|
return self.frames.get() |
|
|
|
|
|
|
|
|
|
|
|
class GIPCBufferedOpusEncoder(BasePlayable, OpusEncoder): |
|
|
|
FIN = 1 |
|
|
|
|
|
|
|
def __init__(self, source, *args, **kwargs): |
|
|
|
import gipc |
|
|
|
|
|
|
|
self.source = source |
|
|
|
self.parent_pipe, self.child_pipe = gipc.pipe(duplex=True) |
|
|
|
self.frames = Queue(kwargs.pop('queue_size', 4096)) |
|
|
|
super(GIPCBufferedOpusEncoder, self).__init__(*args, **kwargs) |
|
|
|
|
|
|
|
gipc.start_process(target=self._encoder_loop, args=(self.child_pipe, (args, kwargs))) |
|
|
|
|
|
|
|
gevent.spawn(self._writer) |
|
|
|
gevent.spawn(self._reader) |
|
|
|
|
|
|
|
def _reader(self): |
|
|
|
while True: |
|
|
|
data = self.parent_pipe.get() |
|
|
|
if data == self.FIN: |
|
|
|
return |
|
|
|
|
|
|
|
self.frames.put(data) |
|
|
|
self.parent_pipe = None |
|
|
|
|
|
|
|
def _writer(self): |
|
|
|
while self.data: |
|
|
|
raw = self.source.read(self.frame_size) |
|
|
|
if len(raw) < self.frame_size: |
|
|
|
break |
|
|
|
|
|
|
|
self.parent_pipe.put(raw) |
|
|
|
gevent.idle() |
|
|
|
|
|
|
|
self.parent_pipe.put(self.FIN) |
|
|
|
|
|
|
|
def have_frame(self): |
|
|
|
return self.parent_pipe |
|
|
|
|
|
|
|
def next_frame(self): |
|
|
|
return self.frames.get() |
|
|
|
|
|
|
|
@classmethod |
|
|
|
def _encoder_loop(cls, pipe, (args, kwargs)): |
|
|
|
encoder = OpusEncoder(*args, **kwargs) |
|
|
|
|
|
|
|
while True: |
|
|
|
data = pipe.get() |
|
|
|
if data == cls.FIN: |
|
|
|
pipe.put(cls.FIN) |
|
|
|
return |
|
|
|
|
|
|
|
pipe.put(encoder.encode(data, encoder.samples_per_frame)) |
|
|
|
|
|
|
|
|
|
|
|
class DCADOpusEncoder(BasePlayable, OpusEncoder): |
|
|
|
class DCADOpusEncoderPlayable(BasePlayable, AbstractOpus, OpusEncoder): |
|
|
|
def __init__(self, source, *args, **kwargs): |
|
|
|
self.source = source |
|
|
|
self.command = kwargs.pop('command', 'dcad') |
|
|
|
super(DCADOpusEncoder, self).__init__(*args, **kwargs) |
|
|
|
super(DCADOpusEncoderPlayable, self).__init__(*args, **kwargs) |
|
|
|
|
|
|
|
self._done = False |
|
|
|
self._proc = None |
|
|
|
|
|
|
|
@property |
|
|
|
def proc(self): |
|
|
|
if not self._proc: |
|
|
|
source = obj = self.source.fileobj() |
|
|
|
if not hasattr(obj, 'fileno'): |
|
|
|
source = subprocess.PIPE |
|
|
|
|
|
|
|
self._proc = subprocess.Popen([ |
|
|
|
self.command, |
|
|
|
'--channels', str(self.channels), |
|
|
@ -230,61 +190,34 @@ class DCADOpusEncoder(BasePlayable, OpusEncoder): |
|
|
|
'--packet-loss-percent', '30', |
|
|
|
'--input', 'pipe:0', |
|
|
|
'--output', 'pipe:1', |
|
|
|
], stdin=self.source.stdout, stdout=subprocess.PIPE) |
|
|
|
], stdin=source, stdout=subprocess.PIPE) |
|
|
|
|
|
|
|
def writer(): |
|
|
|
while True: |
|
|
|
data = obj.read(2048) |
|
|
|
if data > 0: |
|
|
|
self._proc.stdin.write(data) |
|
|
|
if data < 2048: |
|
|
|
break |
|
|
|
|
|
|
|
if source == subprocess.PIPE: |
|
|
|
gevent.spawn(writer) |
|
|
|
return self._proc |
|
|
|
|
|
|
|
def have_frame(self): |
|
|
|
return bool(self.proc) |
|
|
|
|
|
|
|
def next_frame(self): |
|
|
|
if self._done: |
|
|
|
return None |
|
|
|
|
|
|
|
header = self.proc.stdout.read(OPUS_HEADER_SIZE) |
|
|
|
if len(header) < OPUS_HEADER_SIZE: |
|
|
|
self._proc = None |
|
|
|
self._done = True |
|
|
|
return |
|
|
|
|
|
|
|
size = struct.unpack('<h', header)[0] |
|
|
|
|
|
|
|
data = self.proc.stdout.read(size) |
|
|
|
if len(data) == 0: |
|
|
|
self._proc = None |
|
|
|
self._done = True |
|
|
|
return |
|
|
|
|
|
|
|
return data |
|
|
|
|
|
|
|
|
|
|
|
class OpusDuplexEncoder(BasePlayable): |
|
|
|
def __init__(self, out, encoder, flush=False): |
|
|
|
self.out = out |
|
|
|
self.flush = flush |
|
|
|
self.encoder = encoder |
|
|
|
self.closed = False |
|
|
|
|
|
|
|
def __getattr__(self, attr): |
|
|
|
return getattr(self.encoder, attr) |
|
|
|
|
|
|
|
def close(self): |
|
|
|
if self.closed: |
|
|
|
return |
|
|
|
|
|
|
|
self.closed = True |
|
|
|
self.out.flush() |
|
|
|
self.out.close() |
|
|
|
|
|
|
|
def have_frame(self): |
|
|
|
have = self.encoder.have_frame() |
|
|
|
if not have: |
|
|
|
self.close() |
|
|
|
return have |
|
|
|
|
|
|
|
def next_frame(self): |
|
|
|
frame = self.encoder.next_frame() |
|
|
|
|
|
|
|
if frame: |
|
|
|
self.out.write(struct.pack('<h', len(frame))) |
|
|
|
self.out.write(frame) |
|
|
|
|
|
|
|
if self.flush: |
|
|
|
self.out.flush() |
|
|
|
else: |
|
|
|
self.close() |
|
|
|
return frame |
|
|
|