diff --git a/rcon/__init__.py b/rcon/__init__.py index b49b5ff..c2bd8b9 100644 --- a/rcon/__init__.py +++ b/rcon/__init__.py @@ -8,13 +8,7 @@ from rcon.source import rcon as _rcon from rcon.source import Client as _Client -__all__ = [ - 'EmptyResponse', - 'SessionTimeout', - 'WrongPassword', - 'Client', - 'rcon' -] +__all__ = ["EmptyResponse", "SessionTimeout", "WrongPassword", "Client", "rcon"] class Client(_Client): @@ -22,9 +16,9 @@ class Client(_Client): def __init__(self, *args, **kwargs): warn( - 'rcon.Client() is deprecated. Use rcon.source.Client() instead.', + "rcon.Client() is deprecated. Use rcon.source.Client() instead.", DeprecationWarning, - stacklevel=2 + stacklevel=2, ) super().__init__(*args, **kwargs) @@ -33,8 +27,8 @@ def rcon(*args, **kwargs) -> Coroutine[Any, Any, str]: """Wrapper for rcon.source.rcon() for backwards compatibility.""" warn( - 'rcon.rcon() is deprecated. Use rcon.source.rcon() instead.', + "rcon.rcon() is deprecated. Use rcon.source.rcon() instead.", DeprecationWarning, - stacklevel=2 + stacklevel=2, ) return _rcon(*args, **kwargs) diff --git a/rcon/battleye/__init__.py b/rcon/battleye/__init__.py index 5aea643..d912392 100644 --- a/rcon/battleye/__init__.py +++ b/rcon/battleye/__init__.py @@ -4,4 +4,4 @@ from rcon.battleye.client import Client from rcon.battleye.proto import ServerMessage -__all__ = ['Client', 'ServerMessage'] +__all__ = ["Client", "ServerMessage"] diff --git a/rcon/battleye/client.py b/rcon/battleye/client.py index 7570428..d2eb77a 100644 --- a/rcon/battleye/client.py +++ b/rcon/battleye/client.py @@ -18,7 +18,7 @@ from rcon.client import BaseClient from rcon.exceptions import WrongPassword -__all__ = ['Client'] +__all__ = ["Client"] MessageHandler = Callable[[ServerMessage], None] @@ -27,18 +27,18 @@ MessageHandler = Callable[[ServerMessage], None] def log_message(server_message: ServerMessage) -> None: """Default handler, logging the server message.""" - getLogger('Server message').info(server_message.message) + getLogger("Server message").info(server_message.message) class Client(BaseClient, socket_type=SOCK_DGRAM): """BattlEye RCon client.""" def __init__( - self, - *args, - max_length: int = 4096, - message_handler: MessageHandler = log_message, - **kwargs + self, + *args, + max_length: int = 4096, + message_handler: MessageHandler = log_message, + **kwargs, ): super().__init__(*args, **kwargs) self.max_length = max_length @@ -46,7 +46,7 @@ class Client(BaseClient, socket_type=SOCK_DGRAM): def handle_server_message(self, message: ServerMessage) -> None: """Handle the respective server message.""" - with self._socket.makefile('wb') as file: + with self._socket.makefile("wb") as file: file.write(bytes(ServerMessageAck(message.seq))) self.message_handler(message) @@ -54,9 +54,11 @@ class Client(BaseClient, socket_type=SOCK_DGRAM): def receive(self) -> Response: """Receive a packet.""" return RESPONSE_TYPES[ - (header := Header.from_bytes( - (data := self._socket.recv(self.max_length))[:8] - )).type + ( + header := Header.from_bytes( + (data := self._socket.recv(self.max_length))[:8] + ) + ).type ].from_bytes(header, data[8:]) def communicate(self, request: Request) -> Response | str: @@ -64,7 +66,7 @@ class Client(BaseClient, socket_type=SOCK_DGRAM): acknowledged = defaultdict(set) command_responses = [] - with self._socket.makefile('wb') as file: + with self._socket.makefile("wb") as file: file.write(bytes(request)) while True: @@ -87,9 +89,9 @@ class Client(BaseClient, socket_type=SOCK_DGRAM): if isinstance(response, ServerMessage): self.handle_server_message(response) - return ''.join( - command_response.message for command_response in - sorted(command_responses, key=lambda cr: cr.seq) + return "".join( + command_response.message + for command_response in sorted(command_responses, key=lambda cr: cr.seq) ) def login(self, passwd: str) -> bool: diff --git a/rcon/battleye/proto.py b/rcon/battleye/proto.py index 66d7eac..2f7c078 100644 --- a/rcon/battleye/proto.py +++ b/rcon/battleye/proto.py @@ -6,21 +6,21 @@ from zlib import crc32 __all__ = [ - 'RESPONSE_TYPES', - 'Header', - 'LoginRequest', - 'LoginResponse', - 'CommandRequest', - 'CommandResponse', - 'ServerMessage', - 'ServerMessageAck', - 'Request', - 'Response' + "RESPONSE_TYPES", + "Header", + "LoginRequest", + "LoginResponse", + "CommandRequest", + "CommandResponse", + "ServerMessage", + "ServerMessageAck", + "Request", + "Response", ] -PREFIX = 'BE' -INFIX = 0xff +PREFIX = "BE" +INFIX = 0xFF class Header(NamedTuple): @@ -30,40 +30,42 @@ class Header(NamedTuple): type: int def __bytes__(self): - return b''.join(( - PREFIX.encode('ascii'), - self.crc32.to_bytes(4, 'little'), - INFIX.to_bytes(1, 'little'), - self.type.to_bytes(1, 'little') - )) + return b"".join( + ( + PREFIX.encode("ascii"), + self.crc32.to_bytes(4, "little"), + INFIX.to_bytes(1, "little"), + self.type.to_bytes(1, "little"), + ) + ) @classmethod def create(cls, typ: int, payload: bytes) -> Header: """Create a header for the given payload.""" return cls( - crc32(b''.join(( - INFIX.to_bytes(1, 'little'), - typ.to_bytes(1, 'little'), - payload - ))), - typ + crc32( + b"".join( + (INFIX.to_bytes(1, "little"), typ.to_bytes(1, "little"), payload) + ) + ), + typ, ) @classmethod def from_bytes(cls, payload: bytes) -> Header: """Create a header from the given bytes.""" if (size := len(payload)) != 8: - raise ValueError('Invalid payload size', size) + raise ValueError("Invalid payload size", size) - if (prefix := payload[:2].decode('ascii')) != PREFIX: - raise ValueError('Invalid prefix', prefix) + if (prefix := payload[:2].decode("ascii")) != PREFIX: + raise ValueError("Invalid prefix", prefix) - if (infix := int.from_bytes(payload[6:7], 'little')) != INFIX: - raise ValueError('Invalid infix', infix) + if (infix := int.from_bytes(payload[6:7], "little")) != INFIX: + raise ValueError("Invalid infix", infix) return cls( - int.from_bytes(payload[2:6], 'little'), - int.from_bytes(payload[7:8], 'little') + int.from_bytes(payload[2:6], "little"), + int.from_bytes(payload[7:8], "little"), ) @@ -76,7 +78,7 @@ class LoginRequest(str): @property def payload(self) -> bytes: """Return the payload.""" - return self.encode('ascii') + return self.encode("ascii") @property def header(self) -> Header: @@ -93,7 +95,7 @@ class LoginResponse(NamedTuple): @classmethod def from_bytes(cls, header: Header, payload: bytes) -> LoginResponse: """Create a login response from the given bytes.""" - return cls(header, bool(int.from_bytes(payload[:1], 'little'))) + return cls(header, bool(int.from_bytes(payload[:1], "little"))) class CommandRequest(NamedTuple): @@ -108,10 +110,7 @@ class CommandRequest(NamedTuple): @property def payload(self) -> bytes: """Return the payload.""" - return b''.join(( - self.seq.to_bytes(1, 'little'), - self.command.encode('ascii') - )) + return b"".join((self.seq.to_bytes(1, "little"), self.command.encode("ascii"))) @property def header(self) -> Header: @@ -126,7 +125,7 @@ class CommandRequest(NamedTuple): @classmethod def from_command(cls, command: str, *args: str) -> CommandRequest: """Create a command packet from the command and arguments.""" - return cls.from_string(' '.join([command, *args])) + return cls.from_string(" ".join([command, *args])) class CommandResponse(NamedTuple): @@ -139,16 +138,12 @@ class CommandResponse(NamedTuple): @classmethod def from_bytes(cls, header: Header, payload: bytes) -> CommandResponse: """Create a command response from the given bytes.""" - return cls( - header, - int.from_bytes(payload[:1], 'little'), - payload[1:] - ) + return cls(header, int.from_bytes(payload[:1], "little"), payload[1:]) @property def message(self) -> str: """Return the text message.""" - return self.payload.decode('ascii') + return self.payload.decode("ascii") class ServerMessage(NamedTuple): @@ -161,16 +156,12 @@ class ServerMessage(NamedTuple): @classmethod def from_bytes(cls, header: Header, payload: bytes) -> ServerMessage: """Create a server message from the given bytes.""" - return cls( - header, - int.from_bytes(payload[:1], 'little'), - payload[1:] - ) + return cls(header, int.from_bytes(payload[:1], "little"), payload[1:]) @property def message(self) -> str: """Return the text message.""" - return self.payload.decode('ascii') + return self.payload.decode("ascii") class ServerMessageAck(NamedTuple): @@ -179,19 +170,15 @@ class ServerMessageAck(NamedTuple): seq: int def __bytes__(self): - return (0x02).to_bytes(1, 'little') + self.payload + return (0x02).to_bytes(1, "little") + self.payload @property def payload(self) -> bytes: """Return the payload.""" - return self.seq.to_bytes(1, 'little') + return self.seq.to_bytes(1, "little") Request = LoginRequest | CommandRequest | ServerMessageAck Response = LoginResponse | CommandResponse | ServerMessage -RESPONSE_TYPES = { - 0x00: LoginResponse, - 0x01: CommandResponse, - 0x02: ServerMessage -} +RESPONSE_TYPES = {0x00: LoginResponse, 0x01: CommandResponse, 0x02: ServerMessage} diff --git a/rcon/client.py b/rcon/client.py index 6308d50..6296f5a 100644 --- a/rcon/client.py +++ b/rcon/client.py @@ -3,16 +3,19 @@ from socket import SocketKind, socket -__all__ = ['BaseClient'] +__all__ = ["BaseClient"] class BaseClient: """A common RCON client.""" def __init__( - self, host: str, port: int, *, - timeout: float | None = None, - passwd: str | None = None + self, + host: str, + port: int, + *, + timeout: float | None = None, + passwd: str | None = None ): """Initialize the base client.""" self._socket = socket(type=self._socket_type) diff --git a/rcon/config.py b/rcon/config.py index d3f7943..7bcf1b7 100644 --- a/rcon/config.py +++ b/rcon/config.py @@ -12,27 +12,27 @@ from typing import Iterable, NamedTuple from rcon.exceptions import ConfigReadError, UserAbort -__all__ = ['CONFIG_FILES', 'LOG_FORMAT', 'SERVERS', 'Config', 'from_args'] +__all__ = ["CONFIG_FILES", "LOG_FORMAT", "SERVERS", "Config", "from_args"] CONFIG = ConfigParser() -if name == 'posix': +if name == "posix": CONFIG_FILES = ( - Path('/etc/rcon.conf'), - Path('/usr/local/etc/rcon.conf'), - Path.home().joinpath('.rcon.conf') + Path("/etc/rcon.conf"), + Path("/usr/local/etc/rcon.conf"), + Path.home().joinpath(".rcon.conf"), ) -elif name == 'nt': +elif name == "nt": CONFIG_FILES = ( - Path(getenv('LOCALAPPDATA')).joinpath('rcon.conf'), - Path.home().joinpath('.rcon.conf') + Path(getenv("LOCALAPPDATA")).joinpath("rcon.conf"), + Path.home().joinpath(".rcon.conf"), ) else: - raise NotImplementedError(f'Unsupported operating system: {name}') + raise NotImplementedError(f"Unsupported operating system: {name}") -LOG_FORMAT = '[%(levelname)s] %(name)s: %(message)s' -LOGGER = getLogger('RCON Config') +LOG_FORMAT = "[%(levelname)s] %(name)s: %(message)s" +LOGGER = getLogger("RCON Config") SERVERS = {} @@ -47,14 +47,14 @@ class Config(NamedTuple): def from_string(cls, string: str) -> Config: """Read the credentials from the given string.""" try: - host, port = string.rsplit(':', maxsplit=1) + host, port = string.rsplit(":", maxsplit=1) except ValueError: - raise ValueError(f'Invalid socket: {string}.') from None + raise ValueError(f"Invalid socket: {string}.") from None port = int(port) try: - passwd, host = host.rsplit('@', maxsplit=1) + passwd, host = host.rsplit("@", maxsplit=1) except ValueError: passwd = None @@ -65,9 +65,9 @@ class Config(NamedTuple): """Create a credentials tuple from the respective config section. """ - host = section['host'] - port = section.getint('port') - passwd = section.get('passwd') + host = section["host"] + port = section.getint("port") + passwd = section.get("passwd") return cls(host, port, passwd) @@ -92,15 +92,15 @@ def from_args(args: Namespace) -> Config: try: host, port, passwd = SERVERS[args.server] except KeyError: - LOGGER.error('No such server: %s.', args.server) + LOGGER.error("No such server: %s.", args.server) raise ConfigReadError() from None if passwd is None: try: - passwd = getpass('Password: ') + passwd = getpass("Password: ") except (KeyboardInterrupt, EOFError): print() - LOGGER.error('Aborted by user.') + LOGGER.error("Aborted by user.") raise UserAbort() from None return Config(host, port, passwd) diff --git a/rcon/console.py b/rcon/console.py index faf21e8..9aec392 100644 --- a/rcon/console.py +++ b/rcon/console.py @@ -8,15 +8,15 @@ from rcon.config import Config from rcon.exceptions import EmptyResponse, SessionTimeout, WrongPassword -__all__ = ['PROMPT', 'rconcmd'] +__all__ = ["PROMPT", "rconcmd"] -EXIT_COMMANDS = {'exit', 'quit'} -MSG_LOGIN_ABORTED = '\nLogin aborted. Bye.' -MSG_EXIT = '\nBye.' -MSG_SERVER_GONE = 'Server has gone away.' -MSG_SESSION_TIMEOUT = 'Session timed out. Please login again.' -PROMPT = 'RCON {host}:{port}> ' +EXIT_COMMANDS = {"exit", "quit"} +MSG_LOGIN_ABORTED = "\nLogin aborted. Bye." +MSG_EXIT = "\nBye." +MSG_SERVER_GONE = "Server has gone away." +MSG_SESSION_TIMEOUT = "Session timed out. Please login again." +PROMPT = "RCON {host}:{port}> " VALID_PORTS = range(0, 65536) @@ -25,7 +25,7 @@ def read_host() -> str: while True: try: - return input('Host: ') + return input("Host: ") except KeyboardInterrupt: print() continue @@ -36,7 +36,7 @@ def read_port() -> int: while True: try: - port = input('Port: ') + port = input("Port: ") except KeyboardInterrupt: print() continue @@ -44,13 +44,13 @@ def read_port() -> int: try: port = int(port) except ValueError: - print(f'Invalid integer: {port}') + print(f"Invalid integer: {port}") continue if port in VALID_PORTS: return port - print(f'Invalid port: {port}') + print(f"Invalid port: {port}") def read_passwd() -> str: @@ -58,7 +58,7 @@ def read_passwd() -> str: while True: try: - return getpass('Password: ') + return getpass("Password: ") except KeyboardInterrupt: print() @@ -85,7 +85,7 @@ def login(client: BaseClient, passwd: str) -> str: try: client.login(passwd) except WrongPassword: - print('Wrong password.') + print("Wrong password.") passwd = read_passwd() continue @@ -135,13 +135,13 @@ def process_input(client: BaseClient, passwd: str, prompt: str) -> bool: def rconcmd( - client_cls: Type[BaseClient], - host: str, - port: int, - passwd: str, - *, - timeout: float | None = None, - prompt: str = PROMPT + client_cls: Type[BaseClient], + host: str, + port: int, + passwd: str, + *, + timeout: float | None = None, + prompt: str = PROMPT, ): """Initialize the console.""" diff --git a/rcon/errorhandler.py b/rcon/errorhandler.py index aef32c9..e15d6af 100644 --- a/rcon/errorhandler.py +++ b/rcon/errorhandler.py @@ -9,23 +9,23 @@ from rcon.exceptions import UserAbort from rcon.exceptions import WrongPassword -__all__ = ['ErrorHandler'] +__all__ = ["ErrorHandler"] ERRORS = { UserAbort: (1, None), ConfigReadError: (2, None), - ConnectionRefusedError: (3, 'Connection refused.'), - (TimeoutError, timeout): (4, 'Connection timed out.'), - WrongPassword: (5, 'Wrong password.'), - SessionTimeout: (6, 'Session timed out.') + ConnectionRefusedError: (3, "Connection refused."), + (TimeoutError, timeout): (4, "Connection timed out."), + WrongPassword: (5, "Wrong password."), + SessionTimeout: (6, "Session timed out."), } class ErrorHandler: """Handles common errors and exits.""" - __slots__ = ('logger', 'exit_code') + __slots__ = ("logger", "exit_code") def __init__(self, logger: Logger): """Set the logger.""" diff --git a/rcon/exceptions.py b/rcon/exceptions.py index 6bf6d56..0ada898 100644 --- a/rcon/exceptions.py +++ b/rcon/exceptions.py @@ -1,11 +1,11 @@ """Common exceptions.""" __all__ = [ - 'ConfigReadError', - 'EmptyResponse', - 'SessionTimeout', - 'UserAbort', - 'WrongPassword' + "ConfigReadError", + "EmptyResponse", + "SessionTimeout", + "UserAbort", + "WrongPassword", ] diff --git a/rcon/gui.py b/rcon/gui.py index ff515e1..b4789b0 100644 --- a/rcon/gui.py +++ b/rcon/gui.py @@ -9,7 +9,8 @@ from socket import gaierror, timeout from typing import Iterable, NamedTuple, Type from gi import require_version -require_version('Gtk', '3.0') + +require_version("Gtk", "3.0") from gi.repository import Gtk from rcon import battleye, source @@ -18,36 +19,40 @@ from rcon.config import LOG_FORMAT from rcon.exceptions import SessionTimeout, WrongPassword -__all__ = ['main'] +__all__ = ["main"] -if name == 'posix': - CACHE_DIR = Path.home().joinpath('.cache') -elif name == 'nt': - CACHE_DIR = Path(getenv('TEMP') or getenv('TMP')) +if name == "posix": + CACHE_DIR = Path.home().joinpath(".cache") +elif name == "nt": + CACHE_DIR = Path(getenv("TEMP") or getenv("TMP")) else: - raise NotImplementedError('Unsupported operating system.') + raise NotImplementedError("Unsupported operating system.") -CACHE_FILE = CACHE_DIR.joinpath('rcongui.json') -LOGGER = getLogger('rcongui') +CACHE_FILE = CACHE_DIR.joinpath("rcongui.json") +LOGGER = getLogger("rcongui") def get_args() -> Namespace: """Parse and return the command line arguments.""" - parser = ArgumentParser(description='A minimalistic, GTK-based RCON GUI.') + parser = ArgumentParser(description="A minimalistic, GTK-based RCON GUI.") parser.add_argument( - '-B', '--battleye', action='store_true', - help='use BattlEye RCon instead of Source RCON' + "-B", + "--battleye", + action="store_true", + help="use BattlEye RCon instead of Source RCON", ) parser.add_argument( - '-d', '--debug', action='store_true', - help='print additional debug information' + "-d", "--debug", action="store_true", help="print additional debug information" ) parser.add_argument( - '-t', '--timeout', type=float, metavar='seconds', - help='connection timeout in seconds' + "-t", + "--timeout", + type=float, + metavar="seconds", + help="connection timeout in seconds", ) return parser.parse_args() @@ -66,7 +71,7 @@ class GUI(Gtk.Window): def __init__(self, args: Namespace): """Initialize the GUI.""" - super().__init__(title='RCON GUI') + super().__init__(title="RCON GUI") self.args = args self.set_position(Gtk.WindowPosition.CENTER) @@ -75,32 +80,32 @@ class GUI(Gtk.Window): self.add(self.grid) self.host = Gtk.Entry() - self.host.set_placeholder_text('Host') + self.host.set_placeholder_text("Host") self.grid.attach(self.host, 0, 0, 1, 1) self.port = Gtk.SpinButton.new_with_range(0, 65535, 1) - self.port.set_placeholder_text('Port') + self.port.set_placeholder_text("Port") self.grid.attach(self.port, 1, 0, 1, 1) self.passwd = Gtk.Entry() - self.passwd.set_placeholder_text('Password') + self.passwd.set_placeholder_text("Password") self.passwd.set_visibility(False) self.grid.attach(self.passwd, 2, 0, 1, 1) self.command = Gtk.Entry() - self.command.set_placeholder_text('Command') + self.command.set_placeholder_text("Command") self.grid.attach(self.command, 0, 1, 2, 1) - self.button = Gtk.Button(label='Run') - self.button.connect('clicked', self.on_button_clicked) + self.button = Gtk.Button(label="Run") + self.button.connect("clicked", self.on_button_clicked) self.grid.attach(self.button, 2, 1, 1, 1) self.result = Gtk.TextView() self.result.set_wrap_mode(Gtk.WrapMode.WORD) - self.result.set_property('editable', False) + self.result.set_property("editable", False) self.grid.attach(self.result, 0, 2, 2, 1) - self.savepw = Gtk.CheckButton(label='Save password') + self.savepw = Gtk.CheckButton(label="Save password") self.grid.attach(self.savepw, 2, 2, 1, 1) self.load_gui_settings() @@ -117,10 +122,10 @@ class GUI(Gtk.Window): return buf.get_text( buf.get_iter_at_line(0), buf.get_iter_at_line(buf.get_line_count()), - True + True, ) - return '' + return "" @result_text.setter def result_text(self, text: str): @@ -132,47 +137,47 @@ class GUI(Gtk.Window): def gui_settings(self) -> dict: """Return the GUI settings as a dict.""" json = { - 'host': self.host.get_text(), - 'port': self.port.get_value_as_int(), - 'command': self.command.get_text(), - 'result': self.result_text, - 'savepw': (savepw := self.savepw.get_active()) + "host": self.host.get_text(), + "port": self.port.get_value_as_int(), + "command": self.command.get_text(), + "result": self.result_text, + "savepw": (savepw := self.savepw.get_active()), } if savepw: - json['passwd'] = self.passwd.get_text() + json["passwd"] = self.passwd.get_text() return json @gui_settings.setter def gui_settings(self, json: dict): """Set the GUI settings.""" - self.host.set_text(json.get('host', '')) - self.port.set_value(json.get('port', 0)) - self.passwd.set_text(json.get('passwd', '')) - self.command.set_text(json.get('command', '')) - self.result_text = json.get('result', '') - self.savepw.set_active(json.get('savepw', False)) + self.host.set_text(json.get("host", "")) + self.port.set_value(json.get("port", 0)) + self.passwd.set_text(json.get("passwd", "")) + self.command.set_text(json.get("command", "")) + self.result_text = json.get("result", "") + self.savepw.set_active(json.get("savepw", False)) def load_gui_settings(self) -> None: """Load the GUI settings from the cache file.""" try: - with CACHE_FILE.open('rb') as cache: + with CACHE_FILE.open("rb") as cache: self.gui_settings = load(cache) except FileNotFoundError: - LOGGER.warning('Cache file not found: %s', CACHE_FILE) + LOGGER.warning("Cache file not found: %s", CACHE_FILE) except PermissionError: - LOGGER.error('Insufficient permissions to read: %s', CACHE_FILE) + LOGGER.error("Insufficient permissions to read: %s", CACHE_FILE) except ValueError: - LOGGER.error('Cache file contains garbage: %s', CACHE_FILE) + LOGGER.error("Cache file contains garbage: %s", CACHE_FILE) def save_gui_settings(self): """Save the GUI settings to the cache file.""" try: - with CACHE_FILE.open('w', encoding='utf-8') as cache: + with CACHE_FILE.open("w", encoding="utf-8") as cache: dump(self.gui_settings, cache, indent=2) except PermissionError: - LOGGER.error('Insufficient permissions to read: %s', CACHE_FILE) + LOGGER.error("Insufficient permissions to read: %s", CACHE_FILE) def show_error(self, message: str): """Show an error message.""" @@ -180,7 +185,7 @@ class GUI(Gtk.Window): transient_for=self, message_type=Gtk.MessageType.ERROR, buttons=Gtk.ButtonsType.OK, - text=message + text=message, ) message_dialog.run() message_dialog.destroy() @@ -188,10 +193,10 @@ class GUI(Gtk.Window): def run_rcon(self) -> str: """Return the current RCON settings.""" with self.client_cls( - self.host.get_text().strip(), - self.port.get_value_as_int(), - timeout=self.args.timeout, - passwd=self.passwd.get_text() + self.host.get_text().strip(), + self.port.get_value_as_int(), + timeout=self.args.timeout, + passwd=self.passwd.get_text(), ) as client: return client.run(*self.command.get_text().strip().split()) @@ -204,13 +209,13 @@ class GUI(Gtk.Window): except gaierror as error: self.show_error(error.strerror) except ConnectionRefusedError: - self.show_error('Connection refused.') + self.show_error("Connection refused.") except (TimeoutError, timeout): - self.show_error('Connection timed out.') + self.show_error("Connection timed out.") except WrongPassword: - self.show_error('Wrong password.') + self.show_error("Wrong password.") except SessionTimeout: - self.show_error('Session timed out.') + self.show_error("Session timed out.") else: self.result_text = result @@ -226,6 +231,6 @@ def main() -> None: args = get_args() basicConfig(format=LOG_FORMAT, level=DEBUG if args.debug else INFO) win = GUI(args) - win.connect('destroy', win.terminate) + win.connect("destroy", win.terminate) win.show_all() Gtk.main() diff --git a/rcon/rconclt.py b/rcon/rconclt.py index c8ca215..b731f53 100644 --- a/rcon/rconclt.py +++ b/rcon/rconclt.py @@ -9,36 +9,44 @@ from rcon.config import CONFIG_FILES, LOG_FORMAT, from_args from rcon.errorhandler import ErrorHandler -__all__ = ['main'] +__all__ = ["main"] -LOGGER = getLogger('rconclt') +LOGGER = getLogger("rconclt") def get_args() -> Namespace: """Parse and return the command line arguments.""" - parser = ArgumentParser(description='A Minecraft RCON client.') - parser.add_argument('server', help='the server to connect to') + parser = ArgumentParser(description="A Minecraft RCON client.") + parser.add_argument("server", help="the server to connect to") parser.add_argument( - '-B', '--battleye', action='store_true', - help='use BattlEye RCon instead of Source RCON' + "-B", + "--battleye", + action="store_true", + help="use BattlEye RCon instead of Source RCON", ) parser.add_argument( - '-c', '--config', type=Path, metavar='file', default=CONFIG_FILES, - help='the configuration file' + "-c", + "--config", + type=Path, + metavar="file", + default=CONFIG_FILES, + help="the configuration file", ) parser.add_argument( - '-d', '--debug', action='store_true', - help='print additional debug information' + "-d", "--debug", action="store_true", help="print additional debug information" ) parser.add_argument( - '-t', '--timeout', type=float, metavar='seconds', - help='connection timeout in seconds' + "-t", + "--timeout", + type=float, + metavar="seconds", + help="connection timeout in seconds", ) - parser.add_argument('command', help='command to execute on the server') + parser.add_argument("command", help="command to execute on the server") parser.add_argument( - 'argument', nargs='*', default=(), help='arguments for the command' + "argument", nargs="*", default=(), help="arguments for the command" ) return parser.parse_args() diff --git a/rcon/rconshell.py b/rcon/rconshell.py index c5d5d6e..a40c157 100644 --- a/rcon/rconshell.py +++ b/rcon/rconshell.py @@ -11,32 +11,40 @@ from rcon.console import PROMPT, rconcmd from rcon.errorhandler import ErrorHandler -__all__ = ['get_args', 'main'] +__all__ = ["get_args", "main"] -LOGGER = getLogger('rconshell') +LOGGER = getLogger("rconshell") def get_args() -> Namespace: """Parse and returns the CLI arguments.""" - parser = ArgumentParser(description='An interactive RCON shell.') - parser.add_argument('server', nargs='?', help='the server to connect to') + parser = ArgumentParser(description="An interactive RCON shell.") + parser.add_argument("server", nargs="?", help="the server to connect to") parser.add_argument( - '-B', '--battleye', action='store_true', - help='use BattlEye RCon instead of Source RCON' + "-B", + "--battleye", + action="store_true", + help="use BattlEye RCon instead of Source RCON", ) parser.add_argument( - '-c', '--config', type=Path, metavar='file', default=CONFIG_FILES, - help='the configuration file' + "-c", + "--config", + type=Path, + metavar="file", + default=CONFIG_FILES, + help="the configuration file", ) parser.add_argument( - '-p', '--prompt', default=PROMPT, metavar='PS1', - help='the shell prompt' + "-p", "--prompt", default=PROMPT, metavar="PS1", help="the shell prompt" ) parser.add_argument( - '-t', '--timeout', type=float, metavar='seconds', - help='connection timeout in seconds' + "-t", + "--timeout", + type=float, + metavar="seconds", + help="connection timeout in seconds", ) return parser.parse_args() @@ -55,12 +63,7 @@ def run() -> None: with CommandHistory(LOGGER): rconcmd( - client_cls, - host, - port, - passwd, - timeout=args.timeout, - prompt=args.prompt + client_cls, host, port, passwd, timeout=args.timeout, prompt=args.prompt ) diff --git a/rcon/readline.py b/rcon/readline.py index c354ff8..d421dec 100644 --- a/rcon/readline.py +++ b/rcon/readline.py @@ -9,10 +9,10 @@ except ModuleNotFoundError: read_history_file = write_history_file = lambda _: None -__all__ = ['CommandHistory'] +__all__ = ["CommandHistory"] -HIST_FILE = Path.home() / '.rconshell_history' +HIST_FILE = Path.home() / ".rconshell_history" class CommandHistory: @@ -28,13 +28,9 @@ class CommandHistory: try: read_history_file(self.file) except FileNotFoundError: - self.logger.warning( - 'Could not find history file: %s', self.file - ) + self.logger.warning("Could not find history file: %s", self.file) except PermissionError: - self.logger.error( - 'Insufficient permissions to read: %s', self.file - ) + self.logger.error("Insufficient permissions to read: %s", self.file) return self @@ -43,6 +39,4 @@ class CommandHistory: try: write_history_file(self.file) except PermissionError: - self.logger.error( - 'Insufficient permissions to write: %s', self.file - ) + self.logger.error("Insufficient permissions to write: %s", self.file) diff --git a/rcon/source/__init__.py b/rcon/source/__init__.py index 4d7e143..ed9a7f8 100644 --- a/rcon/source/__init__.py +++ b/rcon/source/__init__.py @@ -4,4 +4,4 @@ from rcon.source.async_rcon import rcon from rcon.source.client import Client -__all__ = ['Client', 'rcon'] +__all__ = ["Client", "rcon"] diff --git a/rcon/source/async_rcon.py b/rcon/source/async_rcon.py index e5dcebe..a012f0f 100644 --- a/rcon/source/async_rcon.py +++ b/rcon/source/async_rcon.py @@ -6,7 +6,7 @@ from rcon.exceptions import SessionTimeout, WrongPassword from rcon.source.proto import Packet, Type -__all__ = ['rcon'] +__all__ = ["rcon"] async def close(writer: StreamWriter) -> None: @@ -17,12 +17,12 @@ async def close(writer: StreamWriter) -> None: async def communicate( - reader: StreamReader, - writer: StreamWriter, - packet: Packet, - *, - frag_threshold: int = 4096, - frag_detect_cmd: str = '' + reader: StreamReader, + writer: StreamWriter, + packet: Packet, + *, + frag_threshold: int = 4096, + frag_detect_cmd: str = "", ) -> Packet: """Make an asynchronous request.""" @@ -43,14 +43,14 @@ async def communicate( async def rcon( - command: str, - *arguments: str, - host: str, - port: int, - passwd: str, - encoding: str = 'utf-8', - frag_threshold: int = 4096, - frag_detect_cmd: str = '' + command: str, + *arguments: str, + host: str, + port: int, + passwd: str, + encoding: str = "utf-8", + frag_threshold: int = 4096, + frag_detect_cmd: str = "", ) -> str: """Run a command asynchronously.""" @@ -60,7 +60,7 @@ async def rcon( writer, Packet.make_login(passwd, encoding=encoding), frag_threshold=frag_threshold, - frag_detect_cmd=frag_detect_cmd + frag_detect_cmd=frag_detect_cmd, ) # Wait for SERVERDATA_AUTH_RESPONSE according to: diff --git a/rcon/source/client.py b/rcon/source/client.py index 47de7e0..5a06086 100644 --- a/rcon/source/client.py +++ b/rcon/source/client.py @@ -7,18 +7,14 @@ from rcon.exceptions import SessionTimeout, WrongPassword from rcon.source.proto import Packet, Type -__all__ = ['Client'] +__all__ = ["Client"] class Client(BaseClient, socket_type=SOCK_STREAM): """An RCON client.""" def __init__( - self, - *args, - frag_threshold: int = 4096, - frag_detect_cmd: str = '', - **kwargs + self, *args, frag_threshold: int = 4096, frag_detect_cmd: str = "", **kwargs ): """Set an optional fragmentation threshold and command in order to detect fragmented packets. @@ -36,12 +32,12 @@ class Client(BaseClient, socket_type=SOCK_STREAM): def send(self, packet: Packet) -> None: """Send a packet to the server.""" - with self._socket.makefile('wb') as file: + with self._socket.makefile("wb") as file: file.write(bytes(packet)) def read(self) -> Packet: """Read a packet from the server.""" - with self._socket.makefile('rb') as file: + with self._socket.makefile("rb") as file: response = Packet.read(file) if len(response.payload) < self.frag_threshold: @@ -54,7 +50,7 @@ class Client(BaseClient, socket_type=SOCK_STREAM): return response - def login(self, passwd: str, *, encoding: str = 'utf-8') -> bool: + def login(self, passwd: str, *, encoding: str = "utf-8") -> bool: """Perform a login.""" self.send(Packet.make_login(passwd, encoding=encoding)) @@ -68,7 +64,7 @@ class Client(BaseClient, socket_type=SOCK_STREAM): return True - def run(self, command: str, *args: str, encoding: str = 'utf-8') -> str: + def run(self, command: str, *args: str, encoding: str = "utf-8") -> str: """Run a command.""" request = Packet.make_command(command, *args, encoding=encoding) response = self.communicate(request) diff --git a/rcon/source/proto.py b/rcon/source/proto.py index a1cdb7b..fa2312f 100644 --- a/rcon/source/proto.py +++ b/rcon/source/proto.py @@ -11,11 +11,11 @@ from typing import IO, NamedTuple from rcon.exceptions import EmptyResponse -__all__ = ['LittleEndianSignedInt32', 'Type', 'Packet', 'random_request_id'] +__all__ = ["LittleEndianSignedInt32", "Type", "Packet", "random_request_id"] LOGGER = getLogger(__file__) -TERMINATOR = b'\x00\x00' +TERMINATOR = b"\x00\x00" class LittleEndianSignedInt32(int): @@ -29,21 +29,21 @@ class LittleEndianSignedInt32(int): super().__init__() if not self.MIN <= self <= self.MAX: - raise ValueError('Signed int32 out of bounds:', int(self)) + raise ValueError("Signed int32 out of bounds:", int(self)) def __bytes__(self): """Return the integer as signed little endian.""" - return self.to_bytes(4, 'little', signed=True) + return self.to_bytes(4, "little", signed=True) @classmethod async def aread(cls, reader: StreamReader) -> LittleEndianSignedInt32: """Read the integer from an asynchronous file-like object.""" - return cls.from_bytes(await reader.read(4), 'little', signed=True) + return cls.from_bytes(await reader.read(4), "little", signed=True) @classmethod def read(cls, file: IO) -> LittleEndianSignedInt32: """Read the integer from a file-like object.""" - return cls.from_bytes(file.read(4), 'little', signed=True) + return cls.from_bytes(file.read(4), "little", signed=True) class Type(LittleEndianSignedInt32, Enum): @@ -63,19 +63,19 @@ class Type(LittleEndianSignedInt32, Enum): return bytes(self.value) @classmethod - async def aread(cls, reader: StreamReader, *, prefix: str = '') -> Type: + async def aread(cls, reader: StreamReader, *, prefix: str = "") -> Type: """Read the type from an asynchronous file-like object.""" - LOGGER.debug('%sReading type asynchronously.', prefix) + LOGGER.debug("%sReading type asynchronously.", prefix) value = await LittleEndianSignedInt32.aread(reader) - LOGGER.debug('%s => value: %i', prefix, value) + LOGGER.debug("%s => value: %i", prefix, value) return cls(value) @classmethod - def read(cls, file: IO, *, prefix: str = '') -> Type: + def read(cls, file: IO, *, prefix: str = "") -> Type: """Read the type from a file-like object.""" - LOGGER.debug('%sReading type.', prefix) + LOGGER.debug("%sReading type.", prefix) value = LittleEndianSignedInt32.read(file) - LOGGER.debug('%s => value: %i', prefix, value) + LOGGER.debug("%s => value: %i", prefix, value) return cls(value) @@ -92,20 +92,15 @@ class Packet(NamedTuple): return self if other.id != self.id: - raise ValueError('Can only add packages with same id.') + raise ValueError("Can only add packages with same id.") if other.type != self.type: - raise ValueError('Can only add packages of same type.') + raise ValueError("Can only add packages of same type.") if other.terminator != self.terminator: - raise ValueError('Can only add packages with same terminator.') + raise ValueError("Can only add packages with same terminator.") - return Packet( - self.id, - self.type, - self.payload + other.payload, - self.terminator - ) + return Packet(self.id, self.type, self.payload + other.payload, self.terminator) def __radd__(self, other: Packet | None) -> Packet: if other is None: @@ -125,65 +120,64 @@ class Packet(NamedTuple): @classmethod async def aread(cls, reader: StreamReader) -> Packet: """Read a packet from an asynchronous file-like object.""" - LOGGER.debug('Reading packet asynchronously.') + LOGGER.debug("Reading packet asynchronously.") size = await LittleEndianSignedInt32.aread(reader) - LOGGER.debug(' => size: %i', size) + LOGGER.debug(" => size: %i", size) if not size: raise EmptyResponse() id_ = await LittleEndianSignedInt32.aread(reader) - LOGGER.debug(' => id: %i', id_) - type_ = await Type.aread(reader, prefix=' ') - LOGGER.debug(' => type: %i', type_) + LOGGER.debug(" => id: %i", id_) + type_ = await Type.aread(reader, prefix=" ") + LOGGER.debug(" => type: %i", type_) payload = await reader.read(size - 10) - LOGGER.debug(' => payload: %s', payload) + LOGGER.debug(" => payload: %s", payload) terminator = await reader.read(2) - LOGGER.debug(' => terminator: %s', terminator) + LOGGER.debug(" => terminator: %s", terminator) if terminator != TERMINATOR: - LOGGER.warning('Unexpected terminator: %s', terminator) + LOGGER.warning("Unexpected terminator: %s", terminator) return cls(id_, type_, payload, terminator) @classmethod def read(cls, file: IO) -> Packet: """Read a packet from a file-like object.""" - LOGGER.debug('Reading packet.') + LOGGER.debug("Reading packet.") size = LittleEndianSignedInt32.read(file) - LOGGER.debug(' => size: %i', size) + LOGGER.debug(" => size: %i", size) if not size: raise EmptyResponse() id_ = LittleEndianSignedInt32.read(file) - LOGGER.debug(' => id: %i', id_) - type_ = Type.read(file, prefix=' ') - LOGGER.debug(' => type: %i', type_) + LOGGER.debug(" => id: %i", id_) + type_ = Type.read(file, prefix=" ") + LOGGER.debug(" => type: %i", type_) payload = file.read(size - 10) - LOGGER.debug(' => payload: %s', payload) + LOGGER.debug(" => payload: %s", payload) terminator = file.read(2) - LOGGER.debug(' => terminator: %s', terminator) + LOGGER.debug(" => terminator: %s", terminator) if terminator != TERMINATOR: - LOGGER.warning('Unexpected terminator: %s', terminator) + LOGGER.warning("Unexpected terminator: %s", terminator) return cls(id_, type_, payload, terminator) @classmethod - def make_command(cls, *args: str, encoding: str = 'utf-8') -> Packet: + def make_command(cls, *args: str, encoding: str = "utf-8") -> Packet: """Create a command packet.""" return cls( - random_request_id(), Type.SERVERDATA_EXECCOMMAND, - b' '.join(map(partial(str.encode, encoding=encoding), args)) + random_request_id(), + Type.SERVERDATA_EXECCOMMAND, + b" ".join(map(partial(str.encode, encoding=encoding), args)), ) @classmethod - def make_login(cls, passwd: str, *, encoding: str = 'utf-8') -> Packet: + def make_login(cls, passwd: str, *, encoding: str = "utf-8") -> Packet: """Create a login packet.""" - return cls( - random_request_id(), Type.SERVERDATA_AUTH, passwd.encode(encoding) - ) + return cls(random_request_id(), Type.SERVERDATA_AUTH, passwd.encode(encoding)) def random_request_id() -> LittleEndianSignedInt32: diff --git a/setup.py b/setup.py index 40eaed8..554cb43 100755 --- a/setup.py +++ b/setup.py @@ -4,29 +4,25 @@ from setuptools import setup setup( - name='rcon', + name="rcon", use_scm_version=True, - setup_requires=['setuptools_scm'], - author='Richard Neumann', - author_email='mail@richard-neumann.de', - python_requires='>=3.10', - packages=[ - 'rcon', - 'rcon.battleye', - 'rcon.source' - ], - extras_require={'GUI': ['pygobject', 'pygtk']}, + setup_requires=["setuptools_scm"], + author="Richard Neumann", + author_email="mail@richard-neumann.de", + python_requires=">=3.10", + packages=["rcon", "rcon.battleye", "rcon.source"], + extras_require={"GUI": ["pygobject", "pygtk"]}, entry_points={ - 'console_scripts': [ - 'rcongui = rcon.gui:main', - 'rconclt = rcon.rconclt:main', - 'rconshell = rcon.rconshell:main', + "console_scripts": [ + "rcongui = rcon.gui:main", + "rconclt = rcon.rconclt:main", + "rconshell = rcon.rconshell:main", ], }, - url='https://github.com/conqp/rcon', - license='GPLv3', - description='An RCON client library.', - long_description=open('README.md').read(), + url="https://github.com/conqp/rcon", + license="GPLv3", + description="An RCON client library.", + long_description=open("README.md").read(), long_description_content_type="text/markdown", - keywords='python rcon client' + keywords="python rcon client", )