"""Low-level protocol stuff.""" from __future__ import annotations from asyncio import StreamReader from contextlib import suppress from enum import Enum from functools import partial from logging import getLogger from random import randint from typing import IO, NamedTuple __all__ = ['LittleEndianSignedInt32', 'Type', 'Packet', 'random_request_id'] LOGGER = getLogger(__file__) TERMINATOR = b'\x00\x00' class LittleEndianSignedInt32(int): """A little-endian, signed int32.""" MIN = -2_147_483_648 MAX = 2_147_483_647 def __init__(self, *_): """Check the boundaries.""" super().__init__() if not self.MIN <= self <= self.MAX: raise ValueError('Signed int32 out of bounds:', int(self)) def __bytes__(self): """Return the integer as signed little endian.""" return self.to_bytes(4, 'little', signed=True) @classmethod async def aread(cls, reader: StreamReader) -> LittleEndianSignedInt32: """Read the integer from an asynchronous file-like object.""" return cls.from_bytes(await reader.read(4), 'little', signed=True) @classmethod def read(cls, file: IO) -> LittleEndianSignedInt32: """Read the integer from a file-like object.""" return cls.from_bytes(file.read(4), 'little', signed=True) class Type(Enum): """RCON packet types.""" SERVERDATA_AUTH = LittleEndianSignedInt32(3) SERVERDATA_AUTH_RESPONSE = LittleEndianSignedInt32(2) SERVERDATA_EXECCOMMAND = LittleEndianSignedInt32(2) SERVERDATA_RESPONSE_VALUE = LittleEndianSignedInt32(0) def __int__(self): """Return the actual integer value.""" return int(self.value) def __bytes__(self): """Return the integer value as little endian.""" return bytes(self.value) @classmethod async def aread(cls, reader: StreamReader) -> Type: """Read the type from an asynchronous file-like object.""" return cls(await LittleEndianSignedInt32.aread(reader)) @classmethod def read(cls, file: IO) -> Type: """Read the type from a file-like object.""" return cls(LittleEndianSignedInt32.read(file)) class Packet(NamedTuple): """An RCON packet.""" id: LittleEndianSignedInt32 type: Type payload: bytes terminator: bytes = TERMINATOR def __bytes__(self): """Return the packet as bytes with prepended length.""" payload = bytes(self.id) payload += bytes(self.type) payload += self.payload payload += self.terminator size = bytes(LittleEndianSignedInt32(len(payload))) return size + payload @classmethod async def aread(cls, reader: StreamReader) -> Packet: """Read a packet from an asynchronous file-like object.""" size = await LittleEndianSignedInt32.aread(reader) id_ = await LittleEndianSignedInt32.aread(reader) type_ = await Type.aread(reader) payload = await reader.read(size - 10) terminator = await reader.read(2) if terminator != TERMINATOR: LOGGER.warning('Unexpected terminator: %s', terminator) return cls(id_, type_, payload, terminator) @classmethod def read(cls, file: IO, *, max_pkg_size: int | None = None) -> Packet: """Read a packet from a file-like object.""" size = LittleEndianSignedInt32.read(file) id_ = LittleEndianSignedInt32.read(file) type_ = Type.read(file) payload = file.read(size := size - 10) terminator = file.read(2) if terminator != TERMINATOR: LOGGER.warning('Unexpected terminator: %s', terminator) # Attempt to read following packets on large responses. if size >= max_pkg_size: with suppress(TimeoutError): payload += cls.read(file, max_pkg_size=max_pkg_size).payload return cls(id_, type_, payload, terminator) @classmethod def make_command(cls, *args: str, encoding: str = 'utf-8') -> Packet: """Create a command packet.""" return cls( random_request_id(), Type.SERVERDATA_EXECCOMMAND, b' '.join(map(partial(str.encode, encoding=encoding), args)) ) @classmethod def make_login(cls, passwd: str, *, encoding: str = 'utf-8') -> Packet: """Create a login packet.""" return cls( random_request_id(), Type.SERVERDATA_AUTH, passwd.encode(encoding) ) def random_request_id() -> LittleEndianSignedInt32: """Generate a random request ID.""" return LittleEndianSignedInt32(randint(0, LittleEndianSignedInt32.MAX))