Browse Source

Black

pull/20/head
Richard Neumann 2 years ago
parent
commit
b7181c3dd4
  1. 16
      rcon/__init__.py
  2. 2
      rcon/battleye/__init__.py
  3. 32
      rcon/battleye/client.py
  4. 101
      rcon/battleye/proto.py
  5. 11
      rcon/client.py
  6. 40
      rcon/config.py
  7. 40
      rcon/console.py
  8. 12
      rcon/errorhandler.py
  9. 10
      rcon/exceptions.py
  10. 115
      rcon/gui.py
  11. 36
      rcon/rconclt.py
  12. 39
      rcon/rconshell.py
  13. 16
      rcon/readline.py
  14. 2
      rcon/source/__init__.py
  15. 32
      rcon/source/async_rcon.py
  16. 16
      rcon/source/client.py
  17. 82
      rcon/source/proto.py
  18. 36
      setup.py

16
rcon/__init__.py

@ -8,13 +8,7 @@ from rcon.source import rcon as _rcon
from rcon.source import Client as _Client
__all__ = [
'EmptyResponse',
'SessionTimeout',
'WrongPassword',
'Client',
'rcon'
]
__all__ = ["EmptyResponse", "SessionTimeout", "WrongPassword", "Client", "rcon"]
class Client(_Client):
@ -22,9 +16,9 @@ class Client(_Client):
def __init__(self, *args, **kwargs):
warn(
'rcon.Client() is deprecated. Use rcon.source.Client() instead.',
"rcon.Client() is deprecated. Use rcon.source.Client() instead.",
DeprecationWarning,
stacklevel=2
stacklevel=2,
)
super().__init__(*args, **kwargs)
@ -33,8 +27,8 @@ def rcon(*args, **kwargs) -> Coroutine[Any, Any, str]:
"""Wrapper for rcon.source.rcon() for backwards compatibility."""
warn(
'rcon.rcon() is deprecated. Use rcon.source.rcon() instead.',
"rcon.rcon() is deprecated. Use rcon.source.rcon() instead.",
DeprecationWarning,
stacklevel=2
stacklevel=2,
)
return _rcon(*args, **kwargs)

2
rcon/battleye/__init__.py

@ -4,4 +4,4 @@ from rcon.battleye.client import Client
from rcon.battleye.proto import ServerMessage
__all__ = ['Client', 'ServerMessage']
__all__ = ["Client", "ServerMessage"]

32
rcon/battleye/client.py

@ -18,7 +18,7 @@ from rcon.client import BaseClient
from rcon.exceptions import WrongPassword
__all__ = ['Client']
__all__ = ["Client"]
MessageHandler = Callable[[ServerMessage], None]
@ -27,18 +27,18 @@ MessageHandler = Callable[[ServerMessage], None]
def log_message(server_message: ServerMessage) -> None:
"""Default handler, logging the server message."""
getLogger('Server message').info(server_message.message)
getLogger("Server message").info(server_message.message)
class Client(BaseClient, socket_type=SOCK_DGRAM):
"""BattlEye RCon client."""
def __init__(
self,
*args,
max_length: int = 4096,
message_handler: MessageHandler = log_message,
**kwargs
self,
*args,
max_length: int = 4096,
message_handler: MessageHandler = log_message,
**kwargs,
):
super().__init__(*args, **kwargs)
self.max_length = max_length
@ -46,7 +46,7 @@ class Client(BaseClient, socket_type=SOCK_DGRAM):
def handle_server_message(self, message: ServerMessage) -> None:
"""Handle the respective server message."""
with self._socket.makefile('wb') as file:
with self._socket.makefile("wb") as file:
file.write(bytes(ServerMessageAck(message.seq)))
self.message_handler(message)
@ -54,9 +54,11 @@ class Client(BaseClient, socket_type=SOCK_DGRAM):
def receive(self) -> Response:
"""Receive a packet."""
return RESPONSE_TYPES[
(header := Header.from_bytes(
(data := self._socket.recv(self.max_length))[:8]
)).type
(
header := Header.from_bytes(
(data := self._socket.recv(self.max_length))[:8]
)
).type
].from_bytes(header, data[8:])
def communicate(self, request: Request) -> Response | str:
@ -64,7 +66,7 @@ class Client(BaseClient, socket_type=SOCK_DGRAM):
acknowledged = defaultdict(set)
command_responses = []
with self._socket.makefile('wb') as file:
with self._socket.makefile("wb") as file:
file.write(bytes(request))
while True:
@ -87,9 +89,9 @@ class Client(BaseClient, socket_type=SOCK_DGRAM):
if isinstance(response, ServerMessage):
self.handle_server_message(response)
return ''.join(
command_response.message for command_response in
sorted(command_responses, key=lambda cr: cr.seq)
return "".join(
command_response.message
for command_response in sorted(command_responses, key=lambda cr: cr.seq)
)
def login(self, passwd: str) -> bool:

101
rcon/battleye/proto.py

@ -6,21 +6,21 @@ from zlib import crc32
__all__ = [
'RESPONSE_TYPES',
'Header',
'LoginRequest',
'LoginResponse',
'CommandRequest',
'CommandResponse',
'ServerMessage',
'ServerMessageAck',
'Request',
'Response'
"RESPONSE_TYPES",
"Header",
"LoginRequest",
"LoginResponse",
"CommandRequest",
"CommandResponse",
"ServerMessage",
"ServerMessageAck",
"Request",
"Response",
]
PREFIX = 'BE'
INFIX = 0xff
PREFIX = "BE"
INFIX = 0xFF
class Header(NamedTuple):
@ -30,40 +30,42 @@ class Header(NamedTuple):
type: int
def __bytes__(self):
return b''.join((
PREFIX.encode('ascii'),
self.crc32.to_bytes(4, 'little'),
INFIX.to_bytes(1, 'little'),
self.type.to_bytes(1, 'little')
))
return b"".join(
(
PREFIX.encode("ascii"),
self.crc32.to_bytes(4, "little"),
INFIX.to_bytes(1, "little"),
self.type.to_bytes(1, "little"),
)
)
@classmethod
def create(cls, typ: int, payload: bytes) -> Header:
"""Create a header for the given payload."""
return cls(
crc32(b''.join((
INFIX.to_bytes(1, 'little'),
typ.to_bytes(1, 'little'),
payload
))),
typ
crc32(
b"".join(
(INFIX.to_bytes(1, "little"), typ.to_bytes(1, "little"), payload)
)
),
typ,
)
@classmethod
def from_bytes(cls, payload: bytes) -> Header:
"""Create a header from the given bytes."""
if (size := len(payload)) != 8:
raise ValueError('Invalid payload size', size)
raise ValueError("Invalid payload size", size)
if (prefix := payload[:2].decode('ascii')) != PREFIX:
raise ValueError('Invalid prefix', prefix)
if (prefix := payload[:2].decode("ascii")) != PREFIX:
raise ValueError("Invalid prefix", prefix)
if (infix := int.from_bytes(payload[6:7], 'little')) != INFIX:
raise ValueError('Invalid infix', infix)
if (infix := int.from_bytes(payload[6:7], "little")) != INFIX:
raise ValueError("Invalid infix", infix)
return cls(
int.from_bytes(payload[2:6], 'little'),
int.from_bytes(payload[7:8], 'little')
int.from_bytes(payload[2:6], "little"),
int.from_bytes(payload[7:8], "little"),
)
@ -76,7 +78,7 @@ class LoginRequest(str):
@property
def payload(self) -> bytes:
"""Return the payload."""
return self.encode('ascii')
return self.encode("ascii")
@property
def header(self) -> Header:
@ -93,7 +95,7 @@ class LoginResponse(NamedTuple):
@classmethod
def from_bytes(cls, header: Header, payload: bytes) -> LoginResponse:
"""Create a login response from the given bytes."""
return cls(header, bool(int.from_bytes(payload[:1], 'little')))
return cls(header, bool(int.from_bytes(payload[:1], "little")))
class CommandRequest(NamedTuple):
@ -108,10 +110,7 @@ class CommandRequest(NamedTuple):
@property
def payload(self) -> bytes:
"""Return the payload."""
return b''.join((
self.seq.to_bytes(1, 'little'),
self.command.encode('ascii')
))
return b"".join((self.seq.to_bytes(1, "little"), self.command.encode("ascii")))
@property
def header(self) -> Header:
@ -126,7 +125,7 @@ class CommandRequest(NamedTuple):
@classmethod
def from_command(cls, command: str, *args: str) -> CommandRequest:
"""Create a command packet from the command and arguments."""
return cls.from_string(' '.join([command, *args]))
return cls.from_string(" ".join([command, *args]))
class CommandResponse(NamedTuple):
@ -139,16 +138,12 @@ class CommandResponse(NamedTuple):
@classmethod
def from_bytes(cls, header: Header, payload: bytes) -> CommandResponse:
"""Create a command response from the given bytes."""
return cls(
header,
int.from_bytes(payload[:1], 'little'),
payload[1:]
)
return cls(header, int.from_bytes(payload[:1], "little"), payload[1:])
@property
def message(self) -> str:
"""Return the text message."""
return self.payload.decode('ascii')
return self.payload.decode("ascii")
class ServerMessage(NamedTuple):
@ -161,16 +156,12 @@ class ServerMessage(NamedTuple):
@classmethod
def from_bytes(cls, header: Header, payload: bytes) -> ServerMessage:
"""Create a server message from the given bytes."""
return cls(
header,
int.from_bytes(payload[:1], 'little'),
payload[1:]
)
return cls(header, int.from_bytes(payload[:1], "little"), payload[1:])
@property
def message(self) -> str:
"""Return the text message."""
return self.payload.decode('ascii')
return self.payload.decode("ascii")
class ServerMessageAck(NamedTuple):
@ -179,19 +170,15 @@ class ServerMessageAck(NamedTuple):
seq: int
def __bytes__(self):
return (0x02).to_bytes(1, 'little') + self.payload
return (0x02).to_bytes(1, "little") + self.payload
@property
def payload(self) -> bytes:
"""Return the payload."""
return self.seq.to_bytes(1, 'little')
return self.seq.to_bytes(1, "little")
Request = LoginRequest | CommandRequest | ServerMessageAck
Response = LoginResponse | CommandResponse | ServerMessage
RESPONSE_TYPES = {
0x00: LoginResponse,
0x01: CommandResponse,
0x02: ServerMessage
}
RESPONSE_TYPES = {0x00: LoginResponse, 0x01: CommandResponse, 0x02: ServerMessage}

11
rcon/client.py

@ -3,16 +3,19 @@
from socket import SocketKind, socket
__all__ = ['BaseClient']
__all__ = ["BaseClient"]
class BaseClient:
"""A common RCON client."""
def __init__(
self, host: str, port: int, *,
timeout: float | None = None,
passwd: str | None = None
self,
host: str,
port: int,
*,
timeout: float | None = None,
passwd: str | None = None
):
"""Initialize the base client."""
self._socket = socket(type=self._socket_type)

40
rcon/config.py

@ -12,27 +12,27 @@ from typing import Iterable, NamedTuple
from rcon.exceptions import ConfigReadError, UserAbort
__all__ = ['CONFIG_FILES', 'LOG_FORMAT', 'SERVERS', 'Config', 'from_args']
__all__ = ["CONFIG_FILES", "LOG_FORMAT", "SERVERS", "Config", "from_args"]
CONFIG = ConfigParser()
if name == 'posix':
if name == "posix":
CONFIG_FILES = (
Path('/etc/rcon.conf'),
Path('/usr/local/etc/rcon.conf'),
Path.home().joinpath('.rcon.conf')
Path("/etc/rcon.conf"),
Path("/usr/local/etc/rcon.conf"),
Path.home().joinpath(".rcon.conf"),
)
elif name == 'nt':
elif name == "nt":
CONFIG_FILES = (
Path(getenv('LOCALAPPDATA')).joinpath('rcon.conf'),
Path.home().joinpath('.rcon.conf')
Path(getenv("LOCALAPPDATA")).joinpath("rcon.conf"),
Path.home().joinpath(".rcon.conf"),
)
else:
raise NotImplementedError(f'Unsupported operating system: {name}')
raise NotImplementedError(f"Unsupported operating system: {name}")
LOG_FORMAT = '[%(levelname)s] %(name)s: %(message)s'
LOGGER = getLogger('RCON Config')
LOG_FORMAT = "[%(levelname)s] %(name)s: %(message)s"
LOGGER = getLogger("RCON Config")
SERVERS = {}
@ -47,14 +47,14 @@ class Config(NamedTuple):
def from_string(cls, string: str) -> Config:
"""Read the credentials from the given string."""
try:
host, port = string.rsplit(':', maxsplit=1)
host, port = string.rsplit(":", maxsplit=1)
except ValueError:
raise ValueError(f'Invalid socket: {string}.') from None
raise ValueError(f"Invalid socket: {string}.") from None
port = int(port)
try:
passwd, host = host.rsplit('@', maxsplit=1)
passwd, host = host.rsplit("@", maxsplit=1)
except ValueError:
passwd = None
@ -65,9 +65,9 @@ class Config(NamedTuple):
"""Create a credentials tuple from
the respective config section.
"""
host = section['host']
port = section.getint('port')
passwd = section.get('passwd')
host = section["host"]
port = section.getint("port")
passwd = section.get("passwd")
return cls(host, port, passwd)
@ -92,15 +92,15 @@ def from_args(args: Namespace) -> Config:
try:
host, port, passwd = SERVERS[args.server]
except KeyError:
LOGGER.error('No such server: %s.', args.server)
LOGGER.error("No such server: %s.", args.server)
raise ConfigReadError() from None
if passwd is None:
try:
passwd = getpass('Password: ')
passwd = getpass("Password: ")
except (KeyboardInterrupt, EOFError):
print()
LOGGER.error('Aborted by user.')
LOGGER.error("Aborted by user.")
raise UserAbort() from None
return Config(host, port, passwd)

40
rcon/console.py

@ -8,15 +8,15 @@ from rcon.config import Config
from rcon.exceptions import EmptyResponse, SessionTimeout, WrongPassword
__all__ = ['PROMPT', 'rconcmd']
__all__ = ["PROMPT", "rconcmd"]
EXIT_COMMANDS = {'exit', 'quit'}
MSG_LOGIN_ABORTED = '\nLogin aborted. Bye.'
MSG_EXIT = '\nBye.'
MSG_SERVER_GONE = 'Server has gone away.'
MSG_SESSION_TIMEOUT = 'Session timed out. Please login again.'
PROMPT = 'RCON {host}:{port}> '
EXIT_COMMANDS = {"exit", "quit"}
MSG_LOGIN_ABORTED = "\nLogin aborted. Bye."
MSG_EXIT = "\nBye."
MSG_SERVER_GONE = "Server has gone away."
MSG_SESSION_TIMEOUT = "Session timed out. Please login again."
PROMPT = "RCON {host}:{port}> "
VALID_PORTS = range(0, 65536)
@ -25,7 +25,7 @@ def read_host() -> str:
while True:
try:
return input('Host: ')
return input("Host: ")
except KeyboardInterrupt:
print()
continue
@ -36,7 +36,7 @@ def read_port() -> int:
while True:
try:
port = input('Port: ')
port = input("Port: ")
except KeyboardInterrupt:
print()
continue
@ -44,13 +44,13 @@ def read_port() -> int:
try:
port = int(port)
except ValueError:
print(f'Invalid integer: {port}')
print(f"Invalid integer: {port}")
continue
if port in VALID_PORTS:
return port
print(f'Invalid port: {port}')
print(f"Invalid port: {port}")
def read_passwd() -> str:
@ -58,7 +58,7 @@ def read_passwd() -> str:
while True:
try:
return getpass('Password: ')
return getpass("Password: ")
except KeyboardInterrupt:
print()
@ -85,7 +85,7 @@ def login(client: BaseClient, passwd: str) -> str:
try:
client.login(passwd)
except WrongPassword:
print('Wrong password.')
print("Wrong password.")
passwd = read_passwd()
continue
@ -135,13 +135,13 @@ def process_input(client: BaseClient, passwd: str, prompt: str) -> bool:
def rconcmd(
client_cls: Type[BaseClient],
host: str,
port: int,
passwd: str,
*,
timeout: float | None = None,
prompt: str = PROMPT
client_cls: Type[BaseClient],
host: str,
port: int,
passwd: str,
*,
timeout: float | None = None,
prompt: str = PROMPT,
):
"""Initialize the console."""

12
rcon/errorhandler.py

@ -9,23 +9,23 @@ from rcon.exceptions import UserAbort
from rcon.exceptions import WrongPassword
__all__ = ['ErrorHandler']
__all__ = ["ErrorHandler"]
ERRORS = {
UserAbort: (1, None),
ConfigReadError: (2, None),
ConnectionRefusedError: (3, 'Connection refused.'),
(TimeoutError, timeout): (4, 'Connection timed out.'),
WrongPassword: (5, 'Wrong password.'),
SessionTimeout: (6, 'Session timed out.')
ConnectionRefusedError: (3, "Connection refused."),
(TimeoutError, timeout): (4, "Connection timed out."),
WrongPassword: (5, "Wrong password."),
SessionTimeout: (6, "Session timed out."),
}
class ErrorHandler:
"""Handles common errors and exits."""
__slots__ = ('logger', 'exit_code')
__slots__ = ("logger", "exit_code")
def __init__(self, logger: Logger):
"""Set the logger."""

10
rcon/exceptions.py

@ -1,11 +1,11 @@
"""Common exceptions."""
__all__ = [
'ConfigReadError',
'EmptyResponse',
'SessionTimeout',
'UserAbort',
'WrongPassword'
"ConfigReadError",
"EmptyResponse",
"SessionTimeout",
"UserAbort",
"WrongPassword",
]

115
rcon/gui.py

@ -9,7 +9,8 @@ from socket import gaierror, timeout
from typing import Iterable, NamedTuple, Type
from gi import require_version
require_version('Gtk', '3.0')
require_version("Gtk", "3.0")
from gi.repository import Gtk
from rcon import battleye, source
@ -18,36 +19,40 @@ from rcon.config import LOG_FORMAT
from rcon.exceptions import SessionTimeout, WrongPassword
__all__ = ['main']
__all__ = ["main"]
if name == 'posix':
CACHE_DIR = Path.home().joinpath('.cache')
elif name == 'nt':
CACHE_DIR = Path(getenv('TEMP') or getenv('TMP'))
if name == "posix":
CACHE_DIR = Path.home().joinpath(".cache")
elif name == "nt":
CACHE_DIR = Path(getenv("TEMP") or getenv("TMP"))
else:
raise NotImplementedError('Unsupported operating system.')
raise NotImplementedError("Unsupported operating system.")
CACHE_FILE = CACHE_DIR.joinpath('rcongui.json')
LOGGER = getLogger('rcongui')
CACHE_FILE = CACHE_DIR.joinpath("rcongui.json")
LOGGER = getLogger("rcongui")
def get_args() -> Namespace:
"""Parse and return the command line arguments."""
parser = ArgumentParser(description='A minimalistic, GTK-based RCON GUI.')
parser = ArgumentParser(description="A minimalistic, GTK-based RCON GUI.")
parser.add_argument(
'-B', '--battleye', action='store_true',
help='use BattlEye RCon instead of Source RCON'
"-B",
"--battleye",
action="store_true",
help="use BattlEye RCon instead of Source RCON",
)
parser.add_argument(
'-d', '--debug', action='store_true',
help='print additional debug information'
"-d", "--debug", action="store_true", help="print additional debug information"
)
parser.add_argument(
'-t', '--timeout', type=float, metavar='seconds',
help='connection timeout in seconds'
"-t",
"--timeout",
type=float,
metavar="seconds",
help="connection timeout in seconds",
)
return parser.parse_args()
@ -66,7 +71,7 @@ class GUI(Gtk.Window):
def __init__(self, args: Namespace):
"""Initialize the GUI."""
super().__init__(title='RCON GUI')
super().__init__(title="RCON GUI")
self.args = args
self.set_position(Gtk.WindowPosition.CENTER)
@ -75,32 +80,32 @@ class GUI(Gtk.Window):
self.add(self.grid)
self.host = Gtk.Entry()
self.host.set_placeholder_text('Host')
self.host.set_placeholder_text("Host")
self.grid.attach(self.host, 0, 0, 1, 1)
self.port = Gtk.SpinButton.new_with_range(0, 65535, 1)
self.port.set_placeholder_text('Port')
self.port.set_placeholder_text("Port")
self.grid.attach(self.port, 1, 0, 1, 1)
self.passwd = Gtk.Entry()
self.passwd.set_placeholder_text('Password')
self.passwd.set_placeholder_text("Password")
self.passwd.set_visibility(False)
self.grid.attach(self.passwd, 2, 0, 1, 1)
self.command = Gtk.Entry()
self.command.set_placeholder_text('Command')
self.command.set_placeholder_text("Command")
self.grid.attach(self.command, 0, 1, 2, 1)
self.button = Gtk.Button(label='Run')
self.button.connect('clicked', self.on_button_clicked)
self.button = Gtk.Button(label="Run")
self.button.connect("clicked", self.on_button_clicked)
self.grid.attach(self.button, 2, 1, 1, 1)
self.result = Gtk.TextView()
self.result.set_wrap_mode(Gtk.WrapMode.WORD)
self.result.set_property('editable', False)
self.result.set_property("editable", False)
self.grid.attach(self.result, 0, 2, 2, 1)
self.savepw = Gtk.CheckButton(label='Save password')
self.savepw = Gtk.CheckButton(label="Save password")
self.grid.attach(self.savepw, 2, 2, 1, 1)
self.load_gui_settings()
@ -117,10 +122,10 @@ class GUI(Gtk.Window):
return buf.get_text(
buf.get_iter_at_line(0),
buf.get_iter_at_line(buf.get_line_count()),
True
True,
)
return ''
return ""
@result_text.setter
def result_text(self, text: str):
@ -132,47 +137,47 @@ class GUI(Gtk.Window):
def gui_settings(self) -> dict:
"""Return the GUI settings as a dict."""
json = {
'host': self.host.get_text(),
'port': self.port.get_value_as_int(),
'command': self.command.get_text(),
'result': self.result_text,
'savepw': (savepw := self.savepw.get_active())
"host": self.host.get_text(),
"port": self.port.get_value_as_int(),
"command": self.command.get_text(),
"result": self.result_text,
"savepw": (savepw := self.savepw.get_active()),
}
if savepw:
json['passwd'] = self.passwd.get_text()
json["passwd"] = self.passwd.get_text()
return json
@gui_settings.setter
def gui_settings(self, json: dict):
"""Set the GUI settings."""
self.host.set_text(json.get('host', ''))
self.port.set_value(json.get('port', 0))
self.passwd.set_text(json.get('passwd', ''))
self.command.set_text(json.get('command', ''))
self.result_text = json.get('result', '')
self.savepw.set_active(json.get('savepw', False))
self.host.set_text(json.get("host", ""))
self.port.set_value(json.get("port", 0))
self.passwd.set_text(json.get("passwd", ""))
self.command.set_text(json.get("command", ""))
self.result_text = json.get("result", "")
self.savepw.set_active(json.get("savepw", False))
def load_gui_settings(self) -> None:
"""Load the GUI settings from the cache file."""
try:
with CACHE_FILE.open('rb') as cache:
with CACHE_FILE.open("rb") as cache:
self.gui_settings = load(cache)
except FileNotFoundError:
LOGGER.warning('Cache file not found: %s', CACHE_FILE)
LOGGER.warning("Cache file not found: %s", CACHE_FILE)
except PermissionError:
LOGGER.error('Insufficient permissions to read: %s', CACHE_FILE)
LOGGER.error("Insufficient permissions to read: %s", CACHE_FILE)
except ValueError:
LOGGER.error('Cache file contains garbage: %s', CACHE_FILE)
LOGGER.error("Cache file contains garbage: %s", CACHE_FILE)
def save_gui_settings(self):
"""Save the GUI settings to the cache file."""
try:
with CACHE_FILE.open('w', encoding='utf-8') as cache:
with CACHE_FILE.open("w", encoding="utf-8") as cache:
dump(self.gui_settings, cache, indent=2)
except PermissionError:
LOGGER.error('Insufficient permissions to read: %s', CACHE_FILE)
LOGGER.error("Insufficient permissions to read: %s", CACHE_FILE)
def show_error(self, message: str):
"""Show an error message."""
@ -180,7 +185,7 @@ class GUI(Gtk.Window):
transient_for=self,
message_type=Gtk.MessageType.ERROR,
buttons=Gtk.ButtonsType.OK,
text=message
text=message,
)
message_dialog.run()
message_dialog.destroy()
@ -188,10 +193,10 @@ class GUI(Gtk.Window):
def run_rcon(self) -> str:
"""Return the current RCON settings."""
with self.client_cls(
self.host.get_text().strip(),
self.port.get_value_as_int(),
timeout=self.args.timeout,
passwd=self.passwd.get_text()
self.host.get_text().strip(),
self.port.get_value_as_int(),
timeout=self.args.timeout,
passwd=self.passwd.get_text(),
) as client:
return client.run(*self.command.get_text().strip().split())
@ -204,13 +209,13 @@ class GUI(Gtk.Window):
except gaierror as error:
self.show_error(error.strerror)
except ConnectionRefusedError:
self.show_error('Connection refused.')
self.show_error("Connection refused.")
except (TimeoutError, timeout):
self.show_error('Connection timed out.')
self.show_error("Connection timed out.")
except WrongPassword:
self.show_error('Wrong password.')
self.show_error("Wrong password.")
except SessionTimeout:
self.show_error('Session timed out.')
self.show_error("Session timed out.")
else:
self.result_text = result
@ -226,6 +231,6 @@ def main() -> None:
args = get_args()
basicConfig(format=LOG_FORMAT, level=DEBUG if args.debug else INFO)
win = GUI(args)
win.connect('destroy', win.terminate)
win.connect("destroy", win.terminate)
win.show_all()
Gtk.main()

36
rcon/rconclt.py

@ -9,36 +9,44 @@ from rcon.config import CONFIG_FILES, LOG_FORMAT, from_args
from rcon.errorhandler import ErrorHandler
__all__ = ['main']
__all__ = ["main"]
LOGGER = getLogger('rconclt')
LOGGER = getLogger("rconclt")
def get_args() -> Namespace:
"""Parse and return the command line arguments."""
parser = ArgumentParser(description='A Minecraft RCON client.')
parser.add_argument('server', help='the server to connect to')
parser = ArgumentParser(description="A Minecraft RCON client.")
parser.add_argument("server", help="the server to connect to")
parser.add_argument(
'-B', '--battleye', action='store_true',
help='use BattlEye RCon instead of Source RCON'
"-B",
"--battleye",
action="store_true",
help="use BattlEye RCon instead of Source RCON",
)
parser.add_argument(
'-c', '--config', type=Path, metavar='file', default=CONFIG_FILES,
help='the configuration file'
"-c",
"--config",
type=Path,
metavar="file",
default=CONFIG_FILES,
help="the configuration file",
)
parser.add_argument(
'-d', '--debug', action='store_true',
help='print additional debug information'
"-d", "--debug", action="store_true", help="print additional debug information"
)
parser.add_argument(
'-t', '--timeout', type=float, metavar='seconds',
help='connection timeout in seconds'
"-t",
"--timeout",
type=float,
metavar="seconds",
help="connection timeout in seconds",
)
parser.add_argument('command', help='command to execute on the server')
parser.add_argument("command", help="command to execute on the server")
parser.add_argument(
'argument', nargs='*', default=(), help='arguments for the command'
"argument", nargs="*", default=(), help="arguments for the command"
)
return parser.parse_args()

39
rcon/rconshell.py

@ -11,32 +11,40 @@ from rcon.console import PROMPT, rconcmd
from rcon.errorhandler import ErrorHandler
__all__ = ['get_args', 'main']
__all__ = ["get_args", "main"]
LOGGER = getLogger('rconshell')
LOGGER = getLogger("rconshell")
def get_args() -> Namespace:
"""Parse and returns the CLI arguments."""
parser = ArgumentParser(description='An interactive RCON shell.')
parser.add_argument('server', nargs='?', help='the server to connect to')
parser = ArgumentParser(description="An interactive RCON shell.")
parser.add_argument("server", nargs="?", help="the server to connect to")
parser.add_argument(
'-B', '--battleye', action='store_true',
help='use BattlEye RCon instead of Source RCON'
"-B",
"--battleye",
action="store_true",
help="use BattlEye RCon instead of Source RCON",
)
parser.add_argument(
'-c', '--config', type=Path, metavar='file', default=CONFIG_FILES,
help='the configuration file'
"-c",
"--config",
type=Path,
metavar="file",
default=CONFIG_FILES,
help="the configuration file",
)
parser.add_argument(
'-p', '--prompt', default=PROMPT, metavar='PS1',
help='the shell prompt'
"-p", "--prompt", default=PROMPT, metavar="PS1", help="the shell prompt"
)
parser.add_argument(
'-t', '--timeout', type=float, metavar='seconds',
help='connection timeout in seconds'
"-t",
"--timeout",
type=float,
metavar="seconds",
help="connection timeout in seconds",
)
return parser.parse_args()
@ -55,12 +63,7 @@ def run() -> None:
with CommandHistory(LOGGER):
rconcmd(
client_cls,
host,
port,
passwd,
timeout=args.timeout,
prompt=args.prompt
client_cls, host, port, passwd, timeout=args.timeout, prompt=args.prompt
)

16
rcon/readline.py

@ -9,10 +9,10 @@ except ModuleNotFoundError:
read_history_file = write_history_file = lambda _: None
__all__ = ['CommandHistory']
__all__ = ["CommandHistory"]
HIST_FILE = Path.home() / '.rconshell_history'
HIST_FILE = Path.home() / ".rconshell_history"
class CommandHistory:
@ -28,13 +28,9 @@ class CommandHistory:
try:
read_history_file(self.file)
except FileNotFoundError:
self.logger.warning(
'Could not find history file: %s', self.file
)
self.logger.warning("Could not find history file: %s", self.file)
except PermissionError:
self.logger.error(
'Insufficient permissions to read: %s', self.file
)
self.logger.error("Insufficient permissions to read: %s", self.file)
return self
@ -43,6 +39,4 @@ class CommandHistory:
try:
write_history_file(self.file)
except PermissionError:
self.logger.error(
'Insufficient permissions to write: %s', self.file
)
self.logger.error("Insufficient permissions to write: %s", self.file)

2
rcon/source/__init__.py

@ -4,4 +4,4 @@ from rcon.source.async_rcon import rcon
from rcon.source.client import Client
__all__ = ['Client', 'rcon']
__all__ = ["Client", "rcon"]

32
rcon/source/async_rcon.py

@ -6,7 +6,7 @@ from rcon.exceptions import SessionTimeout, WrongPassword
from rcon.source.proto import Packet, Type
__all__ = ['rcon']
__all__ = ["rcon"]
async def close(writer: StreamWriter) -> None:
@ -17,12 +17,12 @@ async def close(writer: StreamWriter) -> None:
async def communicate(
reader: StreamReader,
writer: StreamWriter,
packet: Packet,
*,
frag_threshold: int = 4096,
frag_detect_cmd: str = ''
reader: StreamReader,
writer: StreamWriter,
packet: Packet,
*,
frag_threshold: int = 4096,
frag_detect_cmd: str = "",
) -> Packet:
"""Make an asynchronous request."""
@ -43,14 +43,14 @@ async def communicate(
async def rcon(
command: str,
*arguments: str,
host: str,
port: int,
passwd: str,
encoding: str = 'utf-8',
frag_threshold: int = 4096,
frag_detect_cmd: str = ''
command: str,
*arguments: str,
host: str,
port: int,
passwd: str,
encoding: str = "utf-8",
frag_threshold: int = 4096,
frag_detect_cmd: str = "",
) -> str:
"""Run a command asynchronously."""
@ -60,7 +60,7 @@ async def rcon(
writer,
Packet.make_login(passwd, encoding=encoding),
frag_threshold=frag_threshold,
frag_detect_cmd=frag_detect_cmd
frag_detect_cmd=frag_detect_cmd,
)
# Wait for SERVERDATA_AUTH_RESPONSE according to:

16
rcon/source/client.py

@ -7,18 +7,14 @@ from rcon.exceptions import SessionTimeout, WrongPassword
from rcon.source.proto import Packet, Type
__all__ = ['Client']
__all__ = ["Client"]
class Client(BaseClient, socket_type=SOCK_STREAM):
"""An RCON client."""
def __init__(
self,
*args,
frag_threshold: int = 4096,
frag_detect_cmd: str = '',
**kwargs
self, *args, frag_threshold: int = 4096, frag_detect_cmd: str = "", **kwargs
):
"""Set an optional fragmentation threshold and
command in order to detect fragmented packets.
@ -36,12 +32,12 @@ class Client(BaseClient, socket_type=SOCK_STREAM):
def send(self, packet: Packet) -> None:
"""Send a packet to the server."""
with self._socket.makefile('wb') as file:
with self._socket.makefile("wb") as file:
file.write(bytes(packet))
def read(self) -> Packet:
"""Read a packet from the server."""
with self._socket.makefile('rb') as file:
with self._socket.makefile("rb") as file:
response = Packet.read(file)
if len(response.payload) < self.frag_threshold:
@ -54,7 +50,7 @@ class Client(BaseClient, socket_type=SOCK_STREAM):
return response
def login(self, passwd: str, *, encoding: str = 'utf-8') -> bool:
def login(self, passwd: str, *, encoding: str = "utf-8") -> bool:
"""Perform a login."""
self.send(Packet.make_login(passwd, encoding=encoding))
@ -68,7 +64,7 @@ class Client(BaseClient, socket_type=SOCK_STREAM):
return True
def run(self, command: str, *args: str, encoding: str = 'utf-8') -> str:
def run(self, command: str, *args: str, encoding: str = "utf-8") -> str:
"""Run a command."""
request = Packet.make_command(command, *args, encoding=encoding)
response = self.communicate(request)

82
rcon/source/proto.py

@ -11,11 +11,11 @@ from typing import IO, NamedTuple
from rcon.exceptions import EmptyResponse
__all__ = ['LittleEndianSignedInt32', 'Type', 'Packet', 'random_request_id']
__all__ = ["LittleEndianSignedInt32", "Type", "Packet", "random_request_id"]
LOGGER = getLogger(__file__)
TERMINATOR = b'\x00\x00'
TERMINATOR = b"\x00\x00"
class LittleEndianSignedInt32(int):
@ -29,21 +29,21 @@ class LittleEndianSignedInt32(int):
super().__init__()
if not self.MIN <= self <= self.MAX:
raise ValueError('Signed int32 out of bounds:', int(self))
raise ValueError("Signed int32 out of bounds:", int(self))
def __bytes__(self):
"""Return the integer as signed little endian."""
return self.to_bytes(4, 'little', signed=True)
return self.to_bytes(4, "little", signed=True)
@classmethod
async def aread(cls, reader: StreamReader) -> LittleEndianSignedInt32:
"""Read the integer from an asynchronous file-like object."""
return cls.from_bytes(await reader.read(4), 'little', signed=True)
return cls.from_bytes(await reader.read(4), "little", signed=True)
@classmethod
def read(cls, file: IO) -> LittleEndianSignedInt32:
"""Read the integer from a file-like object."""
return cls.from_bytes(file.read(4), 'little', signed=True)
return cls.from_bytes(file.read(4), "little", signed=True)
class Type(LittleEndianSignedInt32, Enum):
@ -63,19 +63,19 @@ class Type(LittleEndianSignedInt32, Enum):
return bytes(self.value)
@classmethod
async def aread(cls, reader: StreamReader, *, prefix: str = '') -> Type:
async def aread(cls, reader: StreamReader, *, prefix: str = "") -> Type:
"""Read the type from an asynchronous file-like object."""
LOGGER.debug('%sReading type asynchronously.', prefix)
LOGGER.debug("%sReading type asynchronously.", prefix)
value = await LittleEndianSignedInt32.aread(reader)
LOGGER.debug('%s => value: %i', prefix, value)
LOGGER.debug("%s => value: %i", prefix, value)
return cls(value)
@classmethod
def read(cls, file: IO, *, prefix: str = '') -> Type:
def read(cls, file: IO, *, prefix: str = "") -> Type:
"""Read the type from a file-like object."""
LOGGER.debug('%sReading type.', prefix)
LOGGER.debug("%sReading type.", prefix)
value = LittleEndianSignedInt32.read(file)
LOGGER.debug('%s => value: %i', prefix, value)
LOGGER.debug("%s => value: %i", prefix, value)
return cls(value)
@ -92,20 +92,15 @@ class Packet(NamedTuple):
return self
if other.id != self.id:
raise ValueError('Can only add packages with same id.')
raise ValueError("Can only add packages with same id.")
if other.type != self.type:
raise ValueError('Can only add packages of same type.')
raise ValueError("Can only add packages of same type.")
if other.terminator != self.terminator:
raise ValueError('Can only add packages with same terminator.')
raise ValueError("Can only add packages with same terminator.")
return Packet(
self.id,
self.type,
self.payload + other.payload,
self.terminator
)
return Packet(self.id, self.type, self.payload + other.payload, self.terminator)
def __radd__(self, other: Packet | None) -> Packet:
if other is None:
@ -125,65 +120,64 @@ class Packet(NamedTuple):
@classmethod
async def aread(cls, reader: StreamReader) -> Packet:
"""Read a packet from an asynchronous file-like object."""
LOGGER.debug('Reading packet asynchronously.')
LOGGER.debug("Reading packet asynchronously.")
size = await LittleEndianSignedInt32.aread(reader)
LOGGER.debug(' => size: %i', size)
LOGGER.debug(" => size: %i", size)
if not size:
raise EmptyResponse()
id_ = await LittleEndianSignedInt32.aread(reader)
LOGGER.debug(' => id: %i', id_)
type_ = await Type.aread(reader, prefix=' ')
LOGGER.debug(' => type: %i', type_)
LOGGER.debug(" => id: %i", id_)
type_ = await Type.aread(reader, prefix=" ")
LOGGER.debug(" => type: %i", type_)
payload = await reader.read(size - 10)
LOGGER.debug(' => payload: %s', payload)
LOGGER.debug(" => payload: %s", payload)
terminator = await reader.read(2)
LOGGER.debug(' => terminator: %s', terminator)
LOGGER.debug(" => terminator: %s", terminator)
if terminator != TERMINATOR:
LOGGER.warning('Unexpected terminator: %s', terminator)
LOGGER.warning("Unexpected terminator: %s", terminator)
return cls(id_, type_, payload, terminator)
@classmethod
def read(cls, file: IO) -> Packet:
"""Read a packet from a file-like object."""
LOGGER.debug('Reading packet.')
LOGGER.debug("Reading packet.")
size = LittleEndianSignedInt32.read(file)
LOGGER.debug(' => size: %i', size)
LOGGER.debug(" => size: %i", size)
if not size:
raise EmptyResponse()
id_ = LittleEndianSignedInt32.read(file)
LOGGER.debug(' => id: %i', id_)
type_ = Type.read(file, prefix=' ')
LOGGER.debug(' => type: %i', type_)
LOGGER.debug(" => id: %i", id_)
type_ = Type.read(file, prefix=" ")
LOGGER.debug(" => type: %i", type_)
payload = file.read(size - 10)
LOGGER.debug(' => payload: %s', payload)
LOGGER.debug(" => payload: %s", payload)
terminator = file.read(2)
LOGGER.debug(' => terminator: %s', terminator)
LOGGER.debug(" => terminator: %s", terminator)
if terminator != TERMINATOR:
LOGGER.warning('Unexpected terminator: %s', terminator)
LOGGER.warning("Unexpected terminator: %s", terminator)
return cls(id_, type_, payload, terminator)
@classmethod
def make_command(cls, *args: str, encoding: str = 'utf-8') -> Packet:
def make_command(cls, *args: str, encoding: str = "utf-8") -> Packet:
"""Create a command packet."""
return cls(
random_request_id(), Type.SERVERDATA_EXECCOMMAND,
b' '.join(map(partial(str.encode, encoding=encoding), args))
random_request_id(),
Type.SERVERDATA_EXECCOMMAND,
b" ".join(map(partial(str.encode, encoding=encoding), args)),
)
@classmethod
def make_login(cls, passwd: str, *, encoding: str = 'utf-8') -> Packet:
def make_login(cls, passwd: str, *, encoding: str = "utf-8") -> Packet:
"""Create a login packet."""
return cls(
random_request_id(), Type.SERVERDATA_AUTH, passwd.encode(encoding)
)
return cls(random_request_id(), Type.SERVERDATA_AUTH, passwd.encode(encoding))
def random_request_id() -> LittleEndianSignedInt32:

36
setup.py

@ -4,29 +4,25 @@
from setuptools import setup
setup(
name='rcon',
name="rcon",
use_scm_version=True,
setup_requires=['setuptools_scm'],
author='Richard Neumann',
author_email='[email protected]',
python_requires='>=3.10',
packages=[
'rcon',
'rcon.battleye',
'rcon.source'
],
extras_require={'GUI': ['pygobject', 'pygtk']},
setup_requires=["setuptools_scm"],
author="Richard Neumann",
author_email="[email protected]",
python_requires=">=3.10",
packages=["rcon", "rcon.battleye", "rcon.source"],
extras_require={"GUI": ["pygobject", "pygtk"]},
entry_points={
'console_scripts': [
'rcongui = rcon.gui:main',
'rconclt = rcon.rconclt:main',
'rconshell = rcon.rconshell:main',
"console_scripts": [
"rcongui = rcon.gui:main",
"rconclt = rcon.rconclt:main",
"rconshell = rcon.rconshell:main",
],
},
url='https://github.com/conqp/rcon',
license='GPLv3',
description='An RCON client library.',
long_description=open('README.md').read(),
url="https://github.com/conqp/rcon",
license="GPLv3",
description="An RCON client library.",
long_description=open("README.md").read(),
long_description_content_type="text/markdown",
keywords='python rcon client'
keywords="python rcon client",
)

Loading…
Cancel
Save