You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 

119 lines
3.7 KiB

"""BattlEye RCon client."""
from collections import defaultdict
from logging import getLogger
from socket import SOCK_DGRAM
from typing import Callable
from rcon.battleye.proto import HEADER_SIZE
from rcon.battleye.proto import RESPONSE_TYPES
from rcon.battleye.proto import CommandRequest
from rcon.battleye.proto import CommandResponse
from rcon.battleye.proto import Header
from rcon.battleye.proto import LoginRequest
from rcon.battleye.proto import LoginResponse
from rcon.battleye.proto import Request
from rcon.battleye.proto import Response
from rcon.battleye.proto import ServerMessage
from rcon.battleye.proto import ServerMessageAck
from rcon.client import BaseClient
from rcon.exceptions import WrongPassword
# Names exported by ``from <module> import *``.
__all__ = ["Client"]

# Type of the callback that is invoked with each received ServerMessage.
MessageHandler = Callable[[ServerMessage], None]
def log_message(server_message: ServerMessage) -> None:
    """Write the text of a server message to the log at INFO level.

    The record is emitted through the logger named ``"Server message"``.
    """
    logger = getLogger("Server message")
    logger.info(server_message.message)
class Client(BaseClient, socket_type=SOCK_DGRAM):
    """BattlEye RCon client.

    Communicates over a datagram socket. Server messages that arrive
    while a reply is awaited are acknowledged and forwarded to the
    configured message handler.
    """

    def __init__(
        self,
        *args,
        max_length: int = 4096,
        message_handler: MessageHandler = log_message,
        **kwargs,
    ):
        """Initialize the client.

        Args:
            *args: Positional arguments forwarded to the base client.
            max_length: Maximum number of bytes read per received packet.
            message_handler: Callback invoked with each server message.
            **kwargs: Keyword arguments forwarded to the base client.
        """
        super().__init__(*args, **kwargs)
        self.max_length = max_length
        self.message_handler = message_handler

    def handle_server_message(self, message: ServerMessage) -> None:
        """Acknowledge a server message, then pass it to the handler."""
        # Leaving the ``with`` block closes the file object, which flushes
        # the acknowledgement before the handler runs.
        with self._socket.makefile("wb") as file:
            file.write(bytes(ServerMessageAck(message.seq)))

        self.message_handler(message)

    def receive(self) -> Response:
        """Receive one packet and parse it into its response object.

        The first ``HEADER_SIZE`` bytes are parsed as the header; the
        header's type selects the response class from ``RESPONSE_TYPES``,
        which then parses the remaining payload.
        """
        data = self._socket.recv(self.max_length)
        header = Header.from_bytes(data[:HEADER_SIZE])
        return RESPONSE_TYPES[header.type].from_bytes(header, data[HEADER_SIZE:])

    def communicate(self, request: Request) -> Response | str:
        """Send a request and receive a response.

        A ``LoginResponse`` is returned as soon as it arrives. Otherwise
        packets are read until a receive times out (after at least one
        packet has arrived) or a non-command packet repeats a sequence
        number already seen for its type. The messages of all collected
        command responses are then joined, ordered by sequence number.

        Args:
            request: The request to send.

        Returns:
            The login response, or the joined command response text.

        Raises:
            TimeoutError: If no packet at all arrives for the request.
        """
        acknowledged = defaultdict(set)
        command_responses = []
        # Must start out True: a timeout before *any* packet has arrived
        # means the request went unanswered and has to propagate, whereas a
        # timeout after the first packet merely ends the collection loop.
        first = True

        # The request is flushed when the file object is closed, so the
        # receive loop has to start after the ``with`` block.
        with self._socket.makefile("wb") as file:
            file.write(bytes(request))

        while True:
            # FIXME: Can we have a better way to detect whether a
            # command packet does or doesn't have a successor?
            try:
                response = self.receive()
            except TimeoutError:
                if first:
                    raise

                break

            first = False

            if isinstance(response, LoginResponse):
                return response

            seq = response.seq

            if isinstance(response, CommandResponse):
                # Collect fragmented command responses with the same seq
                command_responses.append(response)
            else:
                response_type = type(response)

                # A repeated (type, seq) pair ends the collection.
                if seq in acknowledged[response_type]:
                    break

                acknowledged[response_type].add(seq)

            if isinstance(response, ServerMessage):
                self.handle_server_message(response)

        return "".join(
            command_response.message
            for command_response in sorted(command_responses, key=lambda cr: cr.seq)
        )

    def login(self, passwd: str) -> bool:
        """Log-in the user.

        Args:
            passwd: The RCon password.

        Returns:
            True if the login response reports success.

        Raises:
            WrongPassword: If the login response does not report success.
        """
        if not self.communicate(LoginRequest(passwd)).success:
            raise WrongPassword()

        return True

    def run(self, command: str, *args: str) -> str:
        """Execute a command and return the text message.

        Args:
            command: The command to execute.
            *args: Arguments passed along with the command.

        Returns:
            The text collected from the command responses.
        """
        return self.communicate(CommandRequest.from_command(command, *args))