|
|
@ -49,6 +49,7 @@ exported_functions = [ |
|
|
|
('opus_encoder_get_size', [ctypes.c_int], ctypes.c_int), |
|
|
|
('opus_encoder_create', [ctypes.c_int, ctypes.c_int, ctypes.c_int, c_int_ptr], EncoderStructPtr), |
|
|
|
('opus_encode', [EncoderStructPtr, c_int16_ptr, ctypes.c_int, ctypes.c_char_p, ctypes.c_int32], ctypes.c_int32), |
|
|
|
('opus_encoder_ctl', None, ctypes.c_int32), |
|
|
|
('opus_encoder_destroy', [EncoderStructPtr], None) |
|
|
|
] |
|
|
|
|
|
|
@ -64,7 +65,9 @@ def libopus_loader(name): |
|
|
|
raise e |
|
|
|
|
|
|
|
try: |
|
|
|
func.argtypes = item[1] |
|
|
|
if item[1]: |
|
|
|
func.argtypes = item[1] |
|
|
|
|
|
|
|
func.restype = item[2] |
|
|
|
except KeyError: |
|
|
|
pass |
|
|
@ -134,6 +137,7 @@ class OpusError(DiscordException): |
|
|
|
code : int |
|
|
|
The error code returned. |
|
|
|
""" |
|
|
|
|
|
|
|
def __init__(self, code): |
|
|
|
self.code = code |
|
|
|
msg = _lib.opus_strerror(self.code).decode('utf-8') |
|
|
@ -150,6 +154,16 @@ OK = 0 |
|
|
|
APPLICATION_AUDIO = 2049 |
|
|
|
APPLICATION_VOIP = 2048 |
|
|
|
APPLICATION_LOWDELAY = 2051 |
|
|
|
CTL_SET_BITRATE = 4002 |
|
|
|
CTL_SET_BANDWIDTH = 4008 |
|
|
|
|
|
|
|
band_ctl = { |
|
|
|
'narrow': 1101, |
|
|
|
'medium': 1102, |
|
|
|
'wide': 1103, |
|
|
|
'superwide': 1104, |
|
|
|
'full': 1105, |
|
|
|
} |
|
|
|
|
|
|
|
class Encoder: |
|
|
|
def __init__(self, sampling, channels, application=APPLICATION_AUDIO): |
|
|
@ -158,7 +172,7 @@ class Encoder: |
|
|
|
self.application = application |
|
|
|
|
|
|
|
self.frame_length = 20 |
|
|
|
self.sample_size = 2 * self.channels # (bit_rate / 8) but bit_rate == 16 |
|
|
|
self.sample_size = 2 * self.channels # (bit_rate / 8) but bit_rate == 16 |
|
|
|
self.samples_per_frame = int(self.sampling_rate / 1000 * self.frame_length) |
|
|
|
self.frame_size = self.samples_per_frame * self.sample_size |
|
|
|
|
|
|
@ -166,6 +180,8 @@ class Encoder: |
|
|
|
raise OpusNotLoaded() |
|
|
|
|
|
|
|
self._state = self._create_state() |
|
|
|
self.set_bitrate(128) |
|
|
|
self.set_bandwidth('full') |
|
|
|
|
|
|
|
def __del__(self): |
|
|
|
if hasattr(self, '_state'): |
|
|
@ -182,6 +198,27 @@ class Encoder: |
|
|
|
|
|
|
|
return result |
|
|
|
|
|
|
|
def set_bitrate(self, kbps): |
|
|
|
kbps = min(128, max(16, int(kbps))) |
|
|
|
|
|
|
|
ret = _lib.opus_encoder_ctl(self._state, CTL_SET_BITRATE, kbps * 1024) |
|
|
|
if ret < 0: |
|
|
|
log.info('error has happened in set_bitrate') |
|
|
|
raise OpusError(ret) |
|
|
|
|
|
|
|
return kbps |
|
|
|
|
|
|
|
def set_bandwidth(self, req): |
|
|
|
if req not in band_ctl: |
|
|
|
raise KeyError('%r is not a valid bandwidth setting. Try one of: %s' % (req, ','.join(band_ctl))) |
|
|
|
|
|
|
|
k = band_ctl[req] |
|
|
|
ret = _lib.opus_encoder_ctl(self._state, CTL_SET_BANDWIDTH, k) |
|
|
|
|
|
|
|
if ret < 0: |
|
|
|
log.info('error has happened in set_bandwidth') |
|
|
|
raise OpusError(ret) |
|
|
|
|
|
|
|
def encode(self, pcm, frame_size): |
|
|
|
max_data_bytes = len(pcm) |
|
|
|
pcm = ctypes.cast(pcm, c_int16_ptr) |
|
|
|