|
|
@ -27,6 +27,10 @@ from typing import Any, Dict, Optional, Tuple, Union |
|
|
|
|
|
|
|
import os |
|
|
|
import io |
|
|
|
import base64 |
|
|
|
from .oggparse import OggStream |
|
|
|
from .opus import Decoder |
|
|
|
import struct |
|
|
|
|
|
|
|
from .utils import MISSING |
|
|
|
|
|
|
@ -75,9 +79,37 @@ class File: |
|
|
|
The file description to display, currently only supported for images. |
|
|
|
|
|
|
|
.. versionadded:: 2.0 |
|
|
|
|
|
|
|
voice: :class:`bool` |
|
|
|
Whether the file is a voice message. If left unspecified, the :attr:`~File.duration` is used |
|
|
|
to determine if the file is a voice message. |
|
|
|
|
|
|
|
.. note:: |
|
|
|
|
|
|
|
Voice files must be an audio only format. |
|
|
|
|
|
|
|
A *non-exhaustive* list of supported formats are: `ogg`, `mp3`, `wav`, `aac`, and `flac`. |
|
|
|
|
|
|
|
.. versionadded:: 2.7 |
|
|
|
|
|
|
|
duration: Optional[:class:`float`] |
|
|
|
The duration of the voice message in seconds |
|
|
|
|
|
|
|
.. versionadded:: 2.7 |
|
|
|
""" |
|
|
|
|
|
|
|
__slots__ = ('fp', '_filename', 'spoiler', 'description', '_original_pos', '_owner', '_closer') |
|
|
|
__slots__ = ( |
|
|
|
'fp', |
|
|
|
'_filename', |
|
|
|
'spoiler', |
|
|
|
'description', |
|
|
|
'_original_pos', |
|
|
|
'_owner', |
|
|
|
'_closer', |
|
|
|
'duration', |
|
|
|
'_waveform', |
|
|
|
'voice', |
|
|
|
) |
|
|
|
|
|
|
|
def __init__( |
|
|
|
self, |
|
|
@ -86,6 +118,9 @@ class File: |
|
|
|
*, |
|
|
|
spoiler: bool = MISSING, |
|
|
|
description: Optional[str] = None, |
|
|
|
voice: bool = MISSING, |
|
|
|
duration: Optional[float] = None, |
|
|
|
waveform: Optional[list[int]] = None, |
|
|
|
): |
|
|
|
if isinstance(fp, io.IOBase): |
|
|
|
if not (fp.seekable() and fp.readable()): |
|
|
@ -117,6 +152,22 @@ class File: |
|
|
|
|
|
|
|
self.spoiler: bool = spoiler |
|
|
|
self.description: Optional[str] = description |
|
|
|
self.duration = duration |
|
|
|
if waveform is not None: |
|
|
|
if len(waveform) > 256: |
|
|
|
raise ValueError('Waveforms have a maximum of 256 values') |
|
|
|
elif max(waveform) > 255: |
|
|
|
raise ValueError('Maximum value of ints is 255 for waveforms') |
|
|
|
elif min(waveform) < 0: |
|
|
|
raise ValueError('Minimum value of ints is 0 for waveforms') |
|
|
|
self._waveform = waveform |
|
|
|
|
|
|
|
if voice is MISSING: |
|
|
|
voice = duration is not None |
|
|
|
self.voice = voice |
|
|
|
|
|
|
|
if duration is None and voice: |
|
|
|
raise TypeError('Voice messages must have a duration') |
|
|
|
|
|
|
|
@property |
|
|
|
def filename(self) -> str: |
|
|
@ -126,6 +177,24 @@ class File: |
|
|
|
""" |
|
|
|
return 'SPOILER_' + self._filename if self.spoiler else self._filename |
|
|
|
|
|
|
|
@property |
|
|
|
def waveform(self) -> list[int]: |
|
|
|
"""List[:class:`int`]: The waveform data for the voice message. |
|
|
|
|
|
|
|
.. note:: |
|
|
|
If a waveform was not given, it will be generated |
|
|
|
|
|
|
|
Only supports generating the waveform for Opus format files, other files will be given a random waveform |
|
|
|
|
|
|
|
.. versionadded:: 2.7""" |
|
|
|
if self._waveform is None: |
|
|
|
try: |
|
|
|
self._waveform = self.generate_waveform() |
|
|
|
except Exception: |
|
|
|
self._waveform = list(os.urandom(256)) |
|
|
|
self.reset() |
|
|
|
return self._waveform |
|
|
|
|
|
|
|
@filename.setter |
|
|
|
def filename(self, value: str) -> None: |
|
|
|
self._filename, self.spoiler = _strip_spoiler(value) |
|
|
@ -170,4 +239,62 @@ class File: |
|
|
|
if self.description is not None: |
|
|
|
payload['description'] = self.description |
|
|
|
|
|
|
|
if self.voice: |
|
|
|
payload['duration_secs'] = self.duration |
|
|
|
payload['waveform'] = base64.b64encode(bytes(self.waveform)).decode('utf-8') |
|
|
|
|
|
|
|
return payload |
|
|
|
|
|
|
|
def generate_waveform(self) -> list[int]: |
|
|
|
if not self.voice: |
|
|
|
raise ValueError('Cannot produce waveform for non voice file') |
|
|
|
self.reset() |
|
|
|
ogg = OggStream(self.fp) # type: ignore |
|
|
|
decoder = Decoder() |
|
|
|
waveform: list[int] = [] |
|
|
|
prefixes = [b'OpusHead', b'OpusTags'] |
|
|
|
for packet in ogg.iter_packets(): |
|
|
|
if packet[:8] in prefixes: |
|
|
|
continue |
|
|
|
|
|
|
|
if b'vorbis' in packet: |
|
|
|
raise ValueError("File format is 'vorbis'. Format of 'opus' is required for waveform generation") |
|
|
|
|
|
|
|
# these are PCM bytes in 16-bit signed little-endian form |
|
|
|
decoded = decoder.decode(packet, fec=False) |
|
|
|
|
|
|
|
# 16 bits -> 2 bytes per sample |
|
|
|
num_samples = len(decoded) // 2 |
|
|
|
|
|
|
|
# https://docs.python.org/3/library/struct.html#byte-order-size-and-alignment |
|
|
|
format = '<' + 'h' * num_samples |
|
|
|
samples: tuple[int] = struct.unpack(format, decoded) |
|
|
|
|
|
|
|
waveform.extend(samples) |
|
|
|
|
|
|
|
# Make sure all values are positive |
|
|
|
for i in range(len(waveform)): |
|
|
|
if waveform[i] < 0: |
|
|
|
waveform[i] = -waveform[i] |
|
|
|
|
|
|
|
point_count: int = self.duration * 10 # type: ignore |
|
|
|
point_count = min(point_count, 255) |
|
|
|
points_per_sample: int = len(waveform) // point_count |
|
|
|
sample_waveform: list[int] = [] |
|
|
|
|
|
|
|
total, count = 0, 0 |
|
|
|
# Average out the amplitudes for each point within a sample |
|
|
|
for i in range(len(waveform)): |
|
|
|
total += waveform[i] |
|
|
|
count += 1 |
|
|
|
if i % points_per_sample == 0: |
|
|
|
sample_waveform.append(total // count) |
|
|
|
total, count = 0, 0 |
|
|
|
|
|
|
|
# Maximum value of a waveform is 0xff (255) |
|
|
|
highest = max(sample_waveform) |
|
|
|
mult = 255 / highest |
|
|
|
for i in range(len(sample_waveform)): |
|
|
|
sample_waveform[i] = int(sample_waveform[i] * mult) |
|
|
|
|
|
|
|
return sample_waveform |
|
|
|