|
|
@ -28,6 +28,9 @@ from typing import Any, Dict, Optional, Tuple, Union |
|
|
|
import os |
|
|
|
import io |
|
|
|
import base64 |
|
|
|
from .oggparse import OggStream |
|
|
|
from .opus import Decoder |
|
|
|
import struct |
|
|
|
|
|
|
|
from .utils import MISSING |
|
|
|
|
|
|
@ -85,8 +88,8 @@ class File: |
|
|
|
|
|
|
|
Voice files must be an audio only format. |
|
|
|
|
|
|
|
A *non-exhaustive* list of supported formats are: `mp3`, `ogg`, `wav`, `aac`, and `flac`. |
|
|
|
|
|
|
|
A *non-exhaustive* list of supported formats are: `ogg`, `mp3`, `wav`, `aac`, and `flac`. |
|
|
|
|
|
|
|
.. versionadded:: 2.6 |
|
|
|
|
|
|
|
duration: Optional[:class:`float`] |
|
|
@ -171,9 +174,18 @@ class File: |
|
|
|
def waveform(self) -> str: |
|
|
|
""":class:`str`: The waveform data for the voice message. |
|
|
|
|
|
|
|
.. note:: |
|
|
|
If a waveform was not given, it will be generated |
|
|
|
|
|
|
|
Only supports generating the waveform for Opus format files, other files will be given a random waveform |
|
|
|
|
|
|
|
.. versionadded:: 2.6""" |
|
|
|
if self._waveform is None: |
|
|
|
return base64.b64encode(os.urandom(256)).decode('utf-8') |
|
|
|
try: |
|
|
|
self._waveform = self.generate_waveform() |
|
|
|
except Exception: |
|
|
|
self._waveform = base64.b64encode(os.urandom(256)).decode('utf-8') |
|
|
|
self.reset() |
|
|
|
return self._waveform |
|
|
|
|
|
|
|
@filename.setter |
|
|
@ -206,8 +218,60 @@ class File: |
|
|
|
if self.description is not None: |
|
|
|
payload['description'] = self.description |
|
|
|
|
|
|
|
if self.duration is not None: |
|
|
|
if self.voice: |
|
|
|
payload['duration_secs'] = self.duration |
|
|
|
payload['waveform'] = self.waveform |
|
|
|
|
|
|
|
return payload |
|
|
|
|
|
|
|
def generate_waveform(self) -> str: |
|
|
|
self.reset() |
|
|
|
ogg = OggStream(self.fp) # type: ignore |
|
|
|
decoder = Decoder() |
|
|
|
waveform: list[int] = [] |
|
|
|
prefixes = [b'OpusHead', b'OpusTags'] |
|
|
|
for packet in ogg.iter_packets(): |
|
|
|
if packet[:8] in prefixes: |
|
|
|
continue |
|
|
|
|
|
|
|
if b'vorbis' in packet: |
|
|
|
raise TypeError("File format is 'vorbis'. Format of 'opus' is required for waveform generation") |
|
|
|
|
|
|
|
# these are PCM bytes in 16-bit signed little-endian form |
|
|
|
decoded = decoder.decode(packet, fec=False) |
|
|
|
|
|
|
|
# 16 bits -> 2 bytes per sample |
|
|
|
num_samples = len(decoded) // 2 |
|
|
|
|
|
|
|
# https://docs.python.org/3/library/struct.html#byte-order-size-and-alignment |
|
|
|
format = '<' + 'h' * num_samples |
|
|
|
samples: tuple[int] = struct.unpack(format, decoded) |
|
|
|
|
|
|
|
waveform.extend(samples) |
|
|
|
|
|
|
|
# Make sure all values are positive |
|
|
|
for i in range(len(waveform)): |
|
|
|
if waveform[i] < 0: |
|
|
|
waveform[i] = -waveform[i] |
|
|
|
|
|
|
|
# TODO: Figure out how discord sets the sample count |
|
|
|
# Voice message I've been using has 40 samples, so using that for now |
|
|
|
points_per_sample = len(waveform) // 40 |
|
|
|
sample_waveform: list[int] = [] |
|
|
|
|
|
|
|
total, count = 0, 0 |
|
|
|
# Average out the amplitudes for each point within a sample |
|
|
|
for i in range(len(waveform)): |
|
|
|
total += waveform[i] |
|
|
|
count += 1 |
|
|
|
if i % points_per_sample == 0: |
|
|
|
sample_waveform.append(total // count) |
|
|
|
total, count = 0, 0 |
|
|
|
|
|
|
|
# Maximum value of a waveform is 0xff (255) |
|
|
|
highest = max(sample_waveform) |
|
|
|
mult = 255 / highest |
|
|
|
for i in range(len(sample_waveform)): |
|
|
|
sample_waveform[i] = int(sample_waveform[i] * mult) |
|
|
|
|
|
|
|
return base64.b64encode(bytes(sample_waveform)).decode('utf-8') |
|
|
|