|
|
@ -22,8 +22,12 @@ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER |
|
|
|
DEALINGS IN THE SOFTWARE. |
|
|
|
""" |
|
|
|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
|
|
import struct |
|
|
|
|
|
|
|
from typing import TYPE_CHECKING, ClassVar, IO, Generator, Tuple, Optional |
|
|
|
|
|
|
|
from .errors import DiscordException |
|
|
|
|
|
|
|
__all__ = ( |
|
|
@ -40,22 +44,29 @@ class OggError(DiscordException): |
|
|
|
# https://tools.ietf.org/html/rfc7845 |
|
|
|
|
|
|
|
class OggPage: |
|
|
|
_header = struct.Struct('<xBQIIIB') |
|
|
|
|
|
|
|
def __init__(self, stream): |
|
|
|
_header: ClassVar[struct.Struct] = struct.Struct('<xBQIIIB') |
|
|
|
if TYPE_CHECKING: |
|
|
|
flag: int |
|
|
|
gran_pos: int |
|
|
|
serial: int |
|
|
|
pagenum: int |
|
|
|
crc: int |
|
|
|
segnum: int |
|
|
|
|
|
|
|
def __init__(self, stream: IO[bytes]) -> None: |
|
|
|
try: |
|
|
|
header = stream.read(struct.calcsize(self._header.format)) |
|
|
|
|
|
|
|
self.flag, self.gran_pos, self.serial, \ |
|
|
|
self.pagenum, self.crc, self.segnum = self._header.unpack(header) |
|
|
|
|
|
|
|
self.segtable = stream.read(self.segnum) |
|
|
|
self.segtable: bytes = stream.read(self.segnum) |
|
|
|
bodylen = sum(struct.unpack('B'*self.segnum, self.segtable)) |
|
|
|
self.data = stream.read(bodylen) |
|
|
|
self.data: bytes = stream.read(bodylen) |
|
|
|
except Exception: |
|
|
|
raise OggError('bad data stream') from None |
|
|
|
|
|
|
|
def iter_packets(self): |
|
|
|
def iter_packets(self) -> Generator[Tuple[bytes, bool], None, None]: |
|
|
|
packetlen = offset = 0 |
|
|
|
partial = True |
|
|
|
|
|
|
@ -74,10 +85,10 @@ class OggPage: |
|
|
|
yield self.data[offset:], False |
|
|
|
|
|
|
|
class OggStream: |
|
|
|
def __init__(self, stream): |
|
|
|
self.stream = stream |
|
|
|
def __init__(self, stream: IO[bytes]) -> None: |
|
|
|
self.stream: IO[bytes] = stream |
|
|
|
|
|
|
|
def _next_page(self): |
|
|
|
def _next_page(self) -> Optional[OggPage]: |
|
|
|
head = self.stream.read(4) |
|
|
|
if head == b'OggS': |
|
|
|
return OggPage(self.stream) |
|
|
@ -86,13 +97,13 @@ class OggStream: |
|
|
|
else: |
|
|
|
raise OggError('invalid header magic') |
|
|
|
|
|
|
|
def _iter_pages(self): |
|
|
|
def _iter_pages(self) -> Generator[OggPage, None, None]: |
|
|
|
page = self._next_page() |
|
|
|
while page: |
|
|
|
yield page |
|
|
|
page = self._next_page() |
|
|
|
|
|
|
|
def iter_packets(self): |
|
|
|
def iter_packets(self) -> Generator[bytes, None, None]: |
|
|
|
partial = b'' |
|
|
|
for page in self._iter_pages(): |
|
|
|
for data, complete in page.iter_packets(): |
|
|
|