From 8b4f53ee3973ab7ba7ee844d179c329b8a477b7b Mon Sep 17 00:00:00 2001 From: Richard Neumann Date: Sun, 13 Feb 2022 17:04:27 +0100 Subject: [PATCH] Add BattlEye client --- rcon/battleye/__init__.py | 1 + rcon/battleye/client.py | 30 +++++++++++ rcon/battleye/proto.py | 104 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 135 insertions(+) create mode 100644 rcon/battleye/__init__.py create mode 100644 rcon/battleye/client.py create mode 100644 rcon/battleye/proto.py diff --git a/rcon/battleye/__init__.py b/rcon/battleye/__init__.py new file mode 100644 index 0000000..51c33c9 --- /dev/null +++ b/rcon/battleye/__init__.py @@ -0,0 +1 @@ +"""BattlEye RCON implementation.""" \ No newline at end of file diff --git a/rcon/battleye/client.py b/rcon/battleye/client.py new file mode 100644 index 0000000..76b2e05 --- /dev/null +++ b/rcon/battleye/client.py @@ -0,0 +1,30 @@ +"""BattlEye RCon client.""" + +from ipaddress import IPv4Address +from typing import Union + +from rcon.battleye.proto import Command, LoginRequest +from rcon.client import BaseClient + + +__all__ = ['Client'] + + +Host = Union[str, IPv4Address] + + +class Client(BaseClient): + """BattlEye RCon client.""" + + def communicate(self, data: bytes, *, recv: int = 4096) -> bytes: + """Sends and receives packets.""" + self._socket.send(data) + return self._socket.recv(recv) + + def login(self, passwd: str) -> bytes: + """Logs the user in.""" + return self.communicate(bytes(LoginRequest.from_passwd(passwd))) + + def command(self, command: str) -> bytes: + """Executes a command.""" + return self.communicate(bytes(Command.from_command(command))) diff --git a/rcon/battleye/proto.py b/rcon/battleye/proto.py new file mode 100644 index 0000000..6019e94 --- /dev/null +++ b/rcon/battleye/proto.py @@ -0,0 +1,104 @@ +"""Low-level protocol stuff.""" + +from typing import IO, NamedTuple +from zlib import crc32 + + +__all__ = ['LoginRequest', 'Command'] + + +PREFIX = 'BE' +SUFFIX = 0xff + + +class Header(NamedTuple): + """Packet header.""" + + prefix: str + crc32: int + suffix: int + + def __bytes__(self): + return b''.join(( + self.prefix.encode('ascii'), + self.crc32.to_bytes(4, 'little'), + self.suffix.to_bytes(1, 'big') + )) + + @classmethod + def from_payload(cls, payload: bytes): + """Creates a header for the given payload.""" + return cls( + PREFIX, + crc32(SUFFIX.to_bytes(1, 'little') + payload), + SUFFIX + ) + + @classmethod + def from_bytes(cls, prefix: bytes, crc32sum: bytes, suffix: bytes): + """Creates a header from the given bytes.""" + return cls( + prefix.decode('ascii'), + int.from_bytes(crc32sum, 'little'), + int.from_bytes(suffix, 'big') + ) + + @classmethod + def read(cls, file: IO): + """Reads the packet from a socket.""" + return cls.from_bytes(file.read(2), file.read(4), file.read(1)) + + +class LoginRequest(NamedTuple): + """Login request packet.""" + + type: int + passwd: str + + def __bytes__(self): + return bytes(self.header) + self.payload + + @property + def payload(self) -> bytes: + """Returns the payload.""" + return self.type.to_bytes(1, 'little') + self.passwd.encode('ascii') + + @property + def header(self) -> Header: + """Returns the appropriate header.""" + return Header.from_payload(self.payload) + + @classmethod + def from_passwd(cls, passwd: str): + """Creates a login request with the given password.""" + return cls(0x00, passwd) + + +class Command(NamedTuple): + """Command packet.""" + + type: int + seq: int + command: str + + def __bytes__(self): + return bytes(self.header) + self.payload + + @property + def payload(self) -> bytes: + """Returns the payload.""" + return b''.join(( + self.type.to_bytes(1, 'little'), + self.seq.to_bytes(1, 'little'), + self.command.encode('ascii') + )) + + @property + def header(self) -> Header: + """Returns the appropriate header.""" + return Header.from_payload(self.payload) + + @classmethod + def from_command(cls, command: str): + """Creates a command packet from the given command.""" + return cls(0x01, 0x00, command)