From 3561ce9d5a5655d1549e3e1c9f89174ca0582de0 Mon Sep 17 00:00:00 2001 From: Imayhaveborkedit Date: Mon, 23 Aug 2021 21:05:31 -0400 Subject: [PATCH] Fix FFmpeg based audiosource input piping Due to an oversight that has existed since the very beginning, the pipe argument has been broken since there was nothing to actually write the data to the process's stdin. Now there is. Also josh made me add typings blegh --- discord/player.py | 42 ++++++++++++++++++++++++++++++++++-------- 1 file changed, 34 insertions(+), 8 deletions(-) diff --git a/discord/player.py b/discord/player.py index b752e6536..97a420022 100644 --- a/discord/player.py +++ b/discord/player.py @@ -140,13 +140,25 @@ class FFmpegAudio(AudioSource): .. versionadded:: 1.3 """ - def __init__(self, source: str, *, executable: str = 'ffmpeg', args: Any, **subprocess_kwargs: Any): + def __init__(self, source: Union[str, io.BufferedIOBase], *, executable: str = 'ffmpeg', args: Any, **subprocess_kwargs: Any): + piping = subprocess_kwargs.get('stdin') == subprocess.PIPE + if piping and isinstance(source, str): + raise TypeError("parameter conflict: 'source' parameter cannot be a string when piping to stdin") + args = [executable, *args] kwargs = {'stdout': subprocess.PIPE} kwargs.update(subprocess_kwargs) self._process: subprocess.Popen = self._spawn_process(args, **kwargs) self._stdout: IO[bytes] = self._process.stdout # type: ignore + self._stdin: Optional[IO[Bytes]] = None + self._pipe_thread: Optional[threading.Thread] = None + + if piping: + n = f'PopenStdinWriter:{id(self):#x}' + self._stdin = self._process.stdin + self._pipe_thread = threading.Thread(target=self._pipe_writer, args=(source,), daemon=True, name=n) + self._pipe_thread.start() def _spawn_process(self, args: Any, **subprocess_kwargs: Any) -> subprocess.Popen: process = None @@ -160,6 +172,21 @@ class FFmpegAudio(AudioSource): else: return process + def _pipe_writer(self, source: io.BufferedIOBase) -> None: + while self._process: + # arbitrarily large read size + data = source.read(8192) + if not data: + self._stdin.close() # EOF + break + try: + self._stdin.write(data) + except Exception: + _log.debug('Write error for %s, this is probably not a problem', self, exc_info=True) + # at this point the source data is either exhausted or the process is fubar + self._stdin.close() + break + def cleanup(self) -> None: proc = self._process if proc is MISSING: @@ -170,7 +197,7 @@ class FFmpegAudio(AudioSource): try: proc.kill() except Exception: - _log.exception("Ignoring error attempting to kill ffmpeg process %s", proc.pid) + _log.exception('Ignoring error attempting to kill ffmpeg process %s', proc.pid) if proc.poll() is None: _log.info('ffmpeg process %s has not terminated. Waiting to terminate...', proc.pid) @@ -218,16 +245,16 @@ class FFmpegPCMAudio(FFmpegAudio): def __init__( self, - source: str, + source: Union[str, io.BufferedIOBase], *, executable: str = 'ffmpeg', pipe: bool = False, stderr: Optional[IO[str]] = None, - before_options: Optional[str] = None, + before_options: Optional[str] = None, options: Optional[str] = None ) -> None: args = [] - subprocess_kwargs = {'stdin': source if pipe else subprocess.DEVNULL, 'stderr': stderr} + subprocess_kwargs = {'stdin': subprocess.PIPE if pipe else subprocess.DEVNULL, 'stderr': stderr} if isinstance(before_options, str): args.extend(shlex.split(before_options)) @@ -315,7 +342,7 @@ class FFmpegOpusAudio(FFmpegAudio): def __init__( self, - source: str, + source: Union[str, io.BufferedIOBase], *, bitrate: int = 128, codec: Optional[str] = None, @@ -327,7 +354,7 @@ class FFmpegOpusAudio(FFmpegAudio): ) -> None: args = [] - subprocess_kwargs = {'stdin': source if pipe else subprocess.DEVNULL, 'stderr': stderr} + subprocess_kwargs = {'stdin': subprocess.PIPE if pipe else subprocess.DEVNULL, 'stderr': stderr} if isinstance(before_options, str): args.extend(shlex.split(before_options)) @@ -384,7 +411,6 @@ class FFmpegOpusAudio(FFmpegAudio): def custom_probe(source, executable): # some analysis code here - return codec, bitrate source = await discord.FFmpegOpusAudio.from_probe("song.webm", method=custom_probe)