|
|
|
@ -40,7 +40,7 @@ import io |
|
|
|
from typing import Any, Callable, Generic, IO, Optional, TYPE_CHECKING, Tuple, TypeVar, Union |
|
|
|
|
|
|
|
from .enums import SpeakingState |
|
|
|
from .errors import ClientException |
|
|
|
from .errors import ClientException, FFmpegProcessError |
|
|
|
from .opus import Encoder as OpusEncoder, OPUS_SILENCE |
|
|
|
from .oggparse import OggStream |
|
|
|
from .utils import MISSING |
|
|
|
@ -186,6 +186,8 @@ class FFmpegAudio(AudioSource): |
|
|
|
self._stderr: Optional[IO[bytes]] = None |
|
|
|
self._pipe_writer_thread: Optional[threading.Thread] = None |
|
|
|
self._pipe_reader_thread: Optional[threading.Thread] = None |
|
|
|
self._current_error: Optional[Exception] = None |
|
|
|
self._stopped: bool = False |
|
|
|
|
|
|
|
if piping_stdin: |
|
|
|
n = f'popen-stdin-writer:pid-{self._process.pid}' |
|
|
|
@ -212,25 +214,72 @@ class FFmpegAudio(AudioSource): |
|
|
|
else: |
|
|
|
return process |
|
|
|
|
|
|
|
def _check_process_returncode(self) -> None: |
|
|
|
"""Set _current_error if FFmpeg exited with a non-zero code.""" |
|
|
|
if self._process is MISSING: |
|
|
|
return |
|
|
|
|
|
|
|
ret = self._process.poll() |
|
|
|
if ret is None: |
|
|
|
return # still running |
|
|
|
|
|
|
|
if self._stopped: |
|
|
|
return # intentionally stopped |
|
|
|
|
|
|
|
if ret != 0 and self._current_error is None: |
|
|
|
# Only set error once, on first detection |
|
|
|
# read stderr if available |
|
|
|
stderr_text = None |
|
|
|
if self._stderr: |
|
|
|
try: |
|
|
|
stderr_text = self._stderr.read(8192).decode(errors='ignore') |
|
|
|
except Exception: |
|
|
|
stderr_text = '<failed to read stderr>' |
|
|
|
|
|
|
|
stderr_info = stderr_text if stderr_text else '<no stderr>' |
|
|
|
self._current_error = FFmpegProcessError(f'FFmpeg exited with code {ret}. Stderr: {stderr_info}') |
|
|
|
|
|
|
|
def _kill_process(self) -> None: |
|
|
|
# check if FFmpeg process failed |
|
|
|
self._check_process_returncode() |
|
|
|
|
|
|
|
# this function gets called in __del__ so instance attributes might not even exist |
|
|
|
proc = getattr(self, '_process', MISSING) |
|
|
|
# Only proceed if proc is a subprocess.Popen instance |
|
|
|
if proc is MISSING: |
|
|
|
return |
|
|
|
|
|
|
|
_log.debug('Preparing to terminate ffmpeg process %s.', proc.pid) |
|
|
|
pid = getattr(proc, 'pid', 'unknown') |
|
|
|
_log.debug('Preparing to terminate ffmpeg process %s.', pid) |
|
|
|
|
|
|
|
try: |
|
|
|
proc.kill() |
|
|
|
except Exception: |
|
|
|
_log.exception('Ignoring error attempting to kill ffmpeg process %s', proc.pid) |
|
|
|
_log.exception('Ignoring error attempting to kill ffmpeg process %s', pid) |
|
|
|
|
|
|
|
try: |
|
|
|
still_running = proc.poll() is None |
|
|
|
except Exception: |
|
|
|
_log.exception('Error checking poll() on ffmpeg process %s', pid) |
|
|
|
still_running = False |
|
|
|
|
|
|
|
if proc.poll() is None: |
|
|
|
_log.info('ffmpeg process %s has not terminated. Waiting to terminate...', proc.pid) |
|
|
|
proc.communicate() |
|
|
|
_log.info('ffmpeg process %s should have terminated with a return code of %s.', proc.pid, proc.returncode) |
|
|
|
if still_running: |
|
|
|
_log.info('ffmpeg process %s has not terminated. Waiting to terminate...', pid) |
|
|
|
try: |
|
|
|
proc.communicate() |
|
|
|
except Exception: |
|
|
|
pass |
|
|
|
_log.info( |
|
|
|
'ffmpeg process %s should have terminated with a return code of %s.', |
|
|
|
pid, |
|
|
|
getattr(proc, 'returncode', 'unknown'), |
|
|
|
) |
|
|
|
else: |
|
|
|
_log.info('ffmpeg process %s successfully terminated with return code of %s.', proc.pid, proc.returncode) |
|
|
|
_log.info( |
|
|
|
'ffmpeg process %s successfully terminated with return code of %s.', |
|
|
|
pid, |
|
|
|
getattr(proc, 'returncode', 'unknown'), |
|
|
|
) |
|
|
|
|
|
|
|
def _pipe_writer(self, source: io.BufferedIOBase) -> None: |
|
|
|
while self._process: |
|
|
|
@ -267,6 +316,7 @@ class FFmpegAudio(AudioSource): |
|
|
|
return |
|
|
|
|
|
|
|
def cleanup(self) -> None: |
|
|
|
self._stopped = True |
|
|
|
self._kill_process() |
|
|
|
self._process = self._stdout = self._stdin = self._stderr = MISSING |
|
|
|
|
|
|
|
@ -348,6 +398,8 @@ class FFmpegPCMAudio(FFmpegAudio): |
|
|
|
def read(self) -> bytes: |
|
|
|
ret = self._stdout.read(OpusEncoder.FRAME_SIZE) |
|
|
|
if len(ret) != OpusEncoder.FRAME_SIZE: |
|
|
|
# Check for FFmpeg process failure when read returns incomplete data |
|
|
|
self._check_process_returncode() |
|
|
|
return b'' |
|
|
|
return ret |
|
|
|
|
|
|
|
@ -646,7 +698,11 @@ class FFmpegOpusAudio(FFmpegAudio): |
|
|
|
return codec, bitrate |
|
|
|
|
|
|
|
def read(self) -> bytes: |
|
|
|
return next(self._packet_iter, b'') |
|
|
|
data = next(self._packet_iter, b'') |
|
|
|
if not data: |
|
|
|
# Check for FFmpeg process failure when read returns empty |
|
|
|
self._check_process_returncode() |
|
|
|
return data |
|
|
|
|
|
|
|
def is_opus(self) -> bool: |
|
|
|
return True |
|
|
|
@ -745,6 +801,11 @@ class AudioPlayer(threading.Thread): |
|
|
|
data = self.source.read() |
|
|
|
|
|
|
|
if not data: |
|
|
|
# Check if the source has an error (e.g., from FFmpegAudio process failure) |
|
|
|
if self._current_error is None: |
|
|
|
source_error = getattr(self.source, '_current_error', None) |
|
|
|
if source_error: |
|
|
|
self._current_error = source_error |
|
|
|
self.stop() |
|
|
|
break |
|
|
|
|
|
|
|
|