diff --git a/discord/player.py b/discord/player.py index b752e6536..97a420022 100644 --- a/discord/player.py +++ b/discord/player.py @@ -140,13 +140,25 @@ class FFmpegAudio(AudioSource): .. versionadded:: 1.3 """ - def __init__(self, source: str, *, executable: str = 'ffmpeg', args: Any, **subprocess_kwargs: Any): + def __init__(self, source: Union[str, io.BufferedIOBase], *, executable: str = 'ffmpeg', args: Any, **subprocess_kwargs: Any): + piping = subprocess_kwargs.get('stdin') == subprocess.PIPE + if piping and isinstance(source, str): + raise TypeError("parameter conflict: 'source' parameter cannot be a string when piping to stdin") + args = [executable, *args] kwargs = {'stdout': subprocess.PIPE} kwargs.update(subprocess_kwargs) self._process: subprocess.Popen = self._spawn_process(args, **kwargs) self._stdout: IO[bytes] = self._process.stdout # type: ignore + self._stdin: Optional[IO[Bytes]] = None + self._pipe_thread: Optional[threading.Thread] = None + + if piping: + n = f'PopenStdinWriter:{id(self):#x}' + self._stdin = self._process.stdin + self._pipe_thread = threading.Thread(target=self._pipe_writer, args=(source,), daemon=True, name=n) + self._pipe_thread.start() def _spawn_process(self, args: Any, **subprocess_kwargs: Any) -> subprocess.Popen: process = None @@ -160,6 +172,21 @@ class FFmpegAudio(AudioSource): else: return process + def _pipe_writer(self, source: io.BufferedIOBase) -> None: + while self._process: + # arbitrarily large read size + data = source.read(8192) + if not data: + self._stdin.close() # EOF + break + try: + self._stdin.write(data) + except Exception: + _log.debug('Write error for %s, this is probably not a problem', self, exc_info=True) + # at this point the source data is either exhausted or the process is fubar + self._stdin.close() + break + def cleanup(self) -> None: proc = self._process if proc is MISSING: @@ -170,7 +197,7 @@ class FFmpegAudio(AudioSource): try: proc.kill() except Exception: - _log.exception("Ignoring error attempting to kill ffmpeg process %s", proc.pid) + _log.exception('Ignoring error attempting to kill ffmpeg process %s', proc.pid) if proc.poll() is None: _log.info('ffmpeg process %s has not terminated. Waiting to terminate...', proc.pid) @@ -218,16 +245,16 @@ class FFmpegPCMAudio(FFmpegAudio): def __init__( self, - source: str, + source: Union[str, io.BufferedIOBase], *, executable: str = 'ffmpeg', pipe: bool = False, stderr: Optional[IO[str]] = None, - before_options: Optional[str] = None, + before_options: Optional[str] = None, options: Optional[str] = None ) -> None: args = [] - subprocess_kwargs = {'stdin': source if pipe else subprocess.DEVNULL, 'stderr': stderr} + subprocess_kwargs = {'stdin': subprocess.PIPE if pipe else subprocess.DEVNULL, 'stderr': stderr} if isinstance(before_options, str): args.extend(shlex.split(before_options)) @@ -315,7 +342,7 @@ class FFmpegOpusAudio(FFmpegAudio): def __init__( self, - source: str, + source: Union[str, io.BufferedIOBase], *, bitrate: int = 128, codec: Optional[str] = None, @@ -327,7 +354,7 @@ class FFmpegOpusAudio(FFmpegAudio): ) -> None: args = [] - subprocess_kwargs = {'stdin': source if pipe else subprocess.DEVNULL, 'stderr': stderr} + subprocess_kwargs = {'stdin': subprocess.PIPE if pipe else subprocess.DEVNULL, 'stderr': stderr} if isinstance(before_options, str): args.extend(shlex.split(before_options)) @@ -384,7 +411,6 @@ class FFmpegOpusAudio(FFmpegAudio): def custom_probe(source, executable): # some analysis code here - return codec, bitrate source = await discord.FFmpegOpusAudio.from_probe("song.webm", method=custom_probe)