Browse Source

Add errcheck functions to opus foreign functions

pull/1052/head
Daniel 7 years ago
parent
commit
881e598a6f
  1. 78
      discord/opus.py

78
discord/opus.py

@ -42,17 +42,36 @@ class EncoderStruct(ctypes.Structure):
EncoderStructPtr = ctypes.POINTER(EncoderStruct)
def _err_lt(result, func, args):
if result < 0:
log.info('error has happened in {0.__name__}'.format(func))
raise OpusError(result)
return result
def _err_ne(result, func, args):
if result.value != 0:
log.info('error has happened in {0.__name__}'.format(func))
raise OpusError(result.value)
return result
# A list of exported functions.
# The first argument is obviously the name.
# The second one are the types of arguments it takes.
# The third is the result type.
# The fourth is the error handler.
exported_functions = [
('opus_strerror', [ctypes.c_int], ctypes.c_char_p),
('opus_encoder_get_size', [ctypes.c_int], ctypes.c_int),
('opus_encoder_create', [ctypes.c_int, ctypes.c_int, ctypes.c_int, c_int_ptr], EncoderStructPtr),
('opus_encode', [EncoderStructPtr, c_int16_ptr, ctypes.c_int, ctypes.c_char_p, ctypes.c_int32], ctypes.c_int32),
('opus_encoder_ctl', None, ctypes.c_int32),
('opus_encoder_destroy', [EncoderStructPtr], None)
('opus_strerror',
[ctypes.c_int], ctypes.c_char_p, None),
('opus_encoder_get_size',
[ctypes.c_int], ctypes.c_int, None),
('opus_encoder_create',
[ctypes.c_int, ctypes.c_int, ctypes.c_int, c_int_ptr], EncoderStructPtr, _err_ne),
('opus_encode',
[EncoderStructPtr, c_int16_ptr, ctypes.c_int, ctypes.c_char_p, ctypes.c_int32], ctypes.c_int32, _err_lt),
('opus_encoder_ctl',
None, ctypes.c_int32, _err_lt),
('opus_encoder_destroy',
[EncoderStructPtr], None, None),
]
def libopus_loader(name):
@ -74,6 +93,12 @@ def libopus_loader(name):
except KeyError:
pass
try:
if item[3]:
func.errcheck = item[3]
except KeyError:
log.exception("Error assigning check function to %s", func)
return lib
try:
@ -211,22 +236,12 @@ class Encoder:
def _create_state(self):
ret = ctypes.c_int()
result = _lib.opus_encoder_create(self.SAMPLING_RATE, self.CHANNELS, self.application, ctypes.byref(ret))
if ret.value != 0:
log.info('error has happened in state creation')
raise OpusError(ret.value)
return result
return _lib.opus_encoder_create(self.SAMPLING_RATE, self.CHANNELS, self.application, ctypes.byref(ret))
def set_bitrate(self, kbps):
kbps = min(128, max(16, int(kbps)))
ret = _lib.opus_encoder_ctl(self._state, CTL_SET_BITRATE, kbps * 1024)
if ret < 0:
log.info('error has happened in set_bitrate')
raise OpusError(ret)
_lib.opus_encoder_ctl(self._state, CTL_SET_BITRATE, kbps * 1024)
return kbps
def set_bandwidth(self, req):
@ -234,36 +249,20 @@ class Encoder:
raise KeyError('%r is not a valid bandwidth setting. Try one of: %s' % (req, ','.join(band_ctl)))
k = band_ctl[req]
ret = _lib.opus_encoder_ctl(self._state, CTL_SET_BANDWIDTH, k)
if ret < 0:
log.info('error has happened in set_bandwidth')
raise OpusError(ret)
_lib.opus_encoder_ctl(self._state, CTL_SET_BANDWIDTH, k)
def set_signal_type(self, req):
if req not in signal_ctl:
raise KeyError('%r is not a valid signal setting. Try one of: %s' % (req, ','.join(signal_ctl)))
k = signal_ctl[req]
ret = _lib.opus_encoder_ctl(self._state, CTL_SET_SIGNAL, k)
if ret < 0:
log.info('error has happened in set_signal_type')
raise OpusError(ret)
_lib.opus_encoder_ctl(self._state, CTL_SET_SIGNAL, k)
def set_fec(self, enabled=True):
ret = _lib.opus_encoder_ctl(self._state, CTL_SET_FEC, 1 if enabled else 0)
if ret < 0:
log.info('error has happened in set_fec')
raise OpusError(ret)
_lib.opus_encoder_ctl(self._state, CTL_SET_FEC, 1 if enabled else 0)
def set_expected_packet_loss_percent(self, percentage):
ret = _lib.opus_encoder_ctl(self._state, CTL_SET_PLP, min(100, max(0, int(percentage * 100))))
if ret < 0:
log.info('error has happened in set_expected_packet_loss_percent')
raise OpusError(ret)
_lib.opus_encoder_ctl(self._state, CTL_SET_PLP, min(100, max(0, int(percentage * 100))))
def encode(self, pcm, frame_size):
max_data_bytes = len(pcm)
@ -271,8 +270,5 @@ class Encoder:
data = (ctypes.c_char * max_data_bytes)()
ret = _lib.opus_encode(self._state, pcm, frame_size, data, max_data_bytes)
if ret < 0:
log.info('error has happened in encode')
raise OpusError(ret)
return array.array('b', data[:ret]).tobytes()

Loading…
Cancel
Save