Browse Source

Update docstrings

pull/14/head
Richard Neumann 3 years ago
parent
commit
376759b8f7
  1. 4
      rcon/__init__.py
  2. 10
      rcon/battleye/client.py
  3. 26
      rcon/battleye/proto.py
  4. 18
      rcon/client.py
  5. 6
      rcon/config.py
  6. 14
      rcon/console.py
  7. 4
      rcon/errorhandler.py
  8. 30
      rcon/gui.py
  9. 6
      rcon/rconclt.py
  10. 6
      rcon/rconshell.py
  11. 6
      rcon/readline.py
  12. 4
      rcon/source/async_rcon.py
  13. 8
      rcon/source/client.py
  14. 28
      rcon/source/proto.py

4
rcon/__init__.py

@ -8,7 +8,7 @@ from rcon.source import Client as _Client
class Client(_Client):
"""Backwards compatibility."""
"""Wrapper for the rcon.source.Client for backwards compatibility."""
def __init__(self, *args, **kwargs):
warn(
@ -20,7 +20,7 @@ class Client(_Client):
def rcon(*args, **kwargs) -> Coroutine[Any, Any, str]:
"""Backwards compatibility."""
"""Wrapper for rcon.source.rcon() for backwards compatibility."""
warn(
'rcon.rcon() is deprecated. Use rcon.source.rcon() instead.',

10
rcon/battleye/client.py

@ -39,7 +39,7 @@ class Client(BaseClient, socket_type=SOCK_DGRAM):
self._handle_server_message = message_handler
def _receive(self, max_length: int) -> Response:
"""Receives a packet."""
"""Receive a packet."""
return RESPONSE_TYPES[
(header := Header.from_bytes(
(data := self._socket.recv(max_length))[:8]
@ -47,28 +47,28 @@ class Client(BaseClient, socket_type=SOCK_DGRAM):
].from_bytes(header, data[8:])
def receive(self, max_length: int = 4096) -> Response:
"""Receives a message."""
"""Receive a message."""
while isinstance(response := self._receive(max_length), ServerMessage):
self._handle_server_message(response)
return response
def communicate(self, request: Request) -> Response:
"""Logs the user in."""
"""Send a request and receive a response."""
with self._socket.makefile('wb') as file:
file.write(bytes(request))
return self.receive()
def login(self, passwd: str) -> bool:
"""Logs the user in."""
"""Log-in the user."""
if not self.communicate(LoginRequest(passwd)).success:
raise WrongPassword()
return True
def run(self, command: str, *args: str) -> str:
"""Executes a command."""
"""Execute a command and return the text message."""
return self.communicate(
CommandRequest.from_command(command, *args)
).message

26
rcon/battleye/proto.py

@ -38,7 +38,7 @@ class Header(NamedTuple):
@classmethod
def create(cls, typ: int, payload: bytes) -> Header:
"""Creates a header for the given payload."""
"""Create a header for the given payload."""
return cls(
crc32(b''.join((
INFIX.to_bytes(1, 'little'),
@ -50,7 +50,7 @@ class Header(NamedTuple):
@classmethod
def from_bytes(cls, payload: bytes) -> Header:
"""Creates a header from the given bytes."""
"""Create a header from the given bytes."""
if (size := len(payload)) != 8:
raise ValueError('Invalid payload size', size)
@ -74,12 +74,12 @@ class LoginRequest(str):
@property
def payload(self) -> bytes:
"""Returns the payload."""
"""Return the payload."""
return self.encode('ascii')
@property
def header(self) -> Header:
"""Returns the appropriate header."""
"""Return the appropriate header."""
return Header.create(0x00, self.payload)
@ -91,7 +91,7 @@ class LoginResponse(NamedTuple):
@classmethod
def from_bytes(cls, header: Header, payload: bytes) -> LoginResponse:
"""Creates a login response from the given bytes."""
"""Create a login response from the given bytes."""
return cls(header, bool(int.from_bytes(payload[:1], 'little')))
@ -106,7 +106,7 @@ class CommandRequest(NamedTuple):
@property
def payload(self) -> bytes:
"""Returns the payload."""
"""Return the payload."""
return b''.join((
self.seq.to_bytes(1, 'little'),
self.command.encode('ascii')
@ -114,17 +114,17 @@ class CommandRequest(NamedTuple):
@property
def header(self) -> Header:
"""Returns the appropriate header."""
"""Return the appropriate header."""
return Header.create(0x01, self.payload)
@classmethod
def from_string(cls, command: str) -> CommandRequest:
"""Creates a command packet from the given string."""
"""Create a command packet from the given string."""
return cls(0x00, command)
@classmethod
def from_command(cls, command: str, *args: str) -> CommandRequest:
"""Creates a command packet from the command and arguments."""
"""Create a command packet from the command and arguments."""
return cls.from_string(' '.join([command, *args]))
@ -137,7 +137,7 @@ class CommandResponse(NamedTuple):
@classmethod
def from_bytes(cls, header: Header, payload: bytes) -> CommandResponse:
"""Creates a command response from the given bytes."""
"""Create a command response from the given bytes."""
return cls(
header,
int.from_bytes(payload[:1], 'little'),
@ -146,7 +146,7 @@ class CommandResponse(NamedTuple):
@property
def message(self) -> str:
"""Returns the text message."""
"""Return the text message."""
return self.payload.decode('ascii')
@ -159,7 +159,7 @@ class ServerMessage(NamedTuple):
@classmethod
def from_bytes(cls, header: Header, payload: bytes) -> ServerMessage:
"""Creates a server message from the given bytes."""
"""Create a server message from the given bytes."""
return cls(
header,
int.from_bytes(payload[:1], 'little'),
@ -168,7 +168,7 @@ class ServerMessage(NamedTuple):
@property
def message(self) -> str:
"""Returns the text message."""
"""Return the text message."""
return self.payload.decode('ascii')

18
rcon/client.py

@ -14,7 +14,7 @@ class BaseClient:
timeout: float | None = None,
passwd: str | None = None
):
"""Initializes the base client with the SOCK_STREAM socket type."""
"""Initialize the base client with the SOCK_STREAM socket type."""
self._socket = socket(type=self._socket_type)
self.host = host
self.port = port
@ -26,27 +26,27 @@ class BaseClient:
cls._socket_type = socket_type
def __enter__(self):
"""Attempts an auto-login if a password is set."""
"""Attempt an auto-login if a password is set."""
self._socket.__enter__()
self.connect(login=True)
return self
def __exit__(self, typ, value, traceback):
"""Delegates to the underlying socket's exit method."""
"""Delegate to the underlying socket's exit method."""
return self._socket.__exit__(typ, value, traceback)
@property
def timeout(self) -> float:
"""Returns the socket timeout."""
"""Return the socket timeout."""
return self._socket.gettimeout()
@timeout.setter
def timeout(self, timeout: float):
"""Sets the socket timeout."""
"""Set the socket timeout."""
self._socket.settimeout(timeout)
def connect(self, login: bool = False) -> None:
"""Connects the socket and attempts a
"""Connect the socket and attempt a
login if wanted and a password is set.
"""
self._socket.connect((self.host, self.port))
@ -55,13 +55,13 @@ class BaseClient:
self.login(self.passwd)
def close(self) -> None:
"""Closes the socket connection."""
"""Close the socket connection."""
self._socket.close()
def login(self, passwd: str) -> bool:
"""Performs a login."""
"""Perform a login."""
raise NotImplementedError()
def run(self, command: str, *args: str) -> str:
"""Runs a command."""
"""Run a command."""
raise NotImplementedError()

6
rcon/config.py

@ -45,7 +45,7 @@ class Config(NamedTuple):
@classmethod
def from_string(cls, string: str) -> Config:
"""Reads the credentials from the given string."""
"""Read the credentials from the given string."""
try:
host, port = string.rsplit(':', maxsplit=1)
except ValueError:
@ -62,7 +62,7 @@ class Config(NamedTuple):
@classmethod
def from_config_section(cls, section: SectionProxy) -> Config:
"""Creates a credentials tuple from
"""Create a credentials tuple from
the respective config section.
"""
host = section['host']
@ -72,7 +72,7 @@ class Config(NamedTuple):
def load(config_files: Path | Iterable[Path] = CONFIG_FILES) -> None:
"""Reads the configuration files and populates SERVERS."""
"""Read the configuration files and populates SERVERS."""
SERVERS.clear()
CONFIG.read(config_files)

14
rcon/console.py

@ -19,7 +19,7 @@ PROMPT = 'RCON {host}:{port}> '
def read_host() -> str:
"""Reads the host."""
"""Read the host."""
while True:
try:
@ -30,7 +30,7 @@ def read_host() -> str:
def read_port() -> int:
"""Reads the port."""
"""Read the port."""
while True:
try:
@ -52,7 +52,7 @@ def read_port() -> int:
def read_passwd() -> str:
"""Reads the password."""
"""Read the password."""
while True:
try:
@ -62,7 +62,7 @@ def read_passwd() -> str:
def get_config(host: str, port: int, passwd: str) -> Config:
"""Reads the necessary arguments."""
"""Read the necessary arguments."""
if host is None:
host = read_host()
@ -77,7 +77,7 @@ def get_config(host: str, port: int, passwd: str) -> Config:
def login(client: BaseClient, passwd: str) -> str:
"""Performs a login."""
"""Perform a login."""
while True:
try:
@ -91,7 +91,7 @@ def login(client: BaseClient, passwd: str) -> str:
def process_input(client: BaseClient, passwd: str, prompt: str) -> bool:
"""Processes the CLI input."""
"""Processe the CLI input."""
try:
command = input(prompt)
@ -133,7 +133,7 @@ def rconcmd(
client_cls: Type[BaseClient], host: str, port: int, passwd: str, *,
prompt: str = PROMPT
):
"""Initializes the console."""
"""Initialize the console."""
try:
host, port, passwd = get_config(host, port, passwd)

4
rcon/errorhandler.py

@ -28,7 +28,7 @@ class ErrorHandler:
__slots__ = ('logger', 'exit_code')
def __init__(self, logger: Logger):
"""Sets the logger."""
"""Set the logger."""
self.logger = logger
self.exit_code = 0
@ -36,7 +36,7 @@ class ErrorHandler:
return self
def __exit__(self, _, value: Exception, __):
"""Checks for connection errors and exits respectively."""
"""Check for common errors and exit respectively."""
if value is None:
return True

30
rcon/gui.py

@ -34,7 +34,7 @@ LOGGER = getLogger('rcongui')
def get_args() -> Namespace:
"""Parses the command line arguments."""
"""Parse and return the command line arguments."""
parser = ArgumentParser(description='A minimalistic, GTK-based RCON GUI.')
parser.add_argument(
@ -53,7 +53,7 @@ def get_args() -> Namespace:
class RCONParams(NamedTuple):
"""Represents the RCON parameters."""
"""Represent the RCON parameters."""
host: str
port: int
@ -65,7 +65,7 @@ class GUI(Gtk.Window): # pylint: disable=R0902
"""A GTK based GUI for RCON."""
def __init__(self, args: Namespace):
"""Initializes the GUI."""
"""Initialize the GUI."""
super().__init__(title='RCON GUI')
self.args = args
@ -107,12 +107,12 @@ class GUI(Gtk.Window): # pylint: disable=R0902
@property
def client_cls(self) -> Type[BaseClient]:
"""Returns the client class."""
"""Return the client class."""
return battleye.Client if self.args.battleye else source.Client
@property
def result_text(self) -> str:
"""Returns the result text."""
"""Return the result text."""
if (buf := self.result.get_buffer()) is not None:
start = buf.get_iter_at_line(0)
end = buf.get_iter_at_line(buf.get_line_count())
@ -122,13 +122,13 @@ class GUI(Gtk.Window): # pylint: disable=R0902
@result_text.setter
def result_text(self, text: str):
"""Sets the result text."""
"""Set the result text."""
if (buf := self.result.get_buffer()) is not None:
buf.set_text(text)
@property
def gui_settings(self) -> dict:
"""Returns the GUI settings as a dict."""
"""Return the GUI settings as a dict."""
json = {
'host': self.host.get_text(),
'port': self.port.get_value_as_int(),
@ -144,7 +144,7 @@ class GUI(Gtk.Window): # pylint: disable=R0902
@gui_settings.setter
def gui_settings(self, json: dict):
"""Sets the GUI settings."""
"""Set the GUI settings."""
self.host.set_text(json.get('host', ''))
self.port.set_value(json.get('port', 0))
self.passwd.set_text(json.get('passwd', ''))
@ -153,7 +153,7 @@ class GUI(Gtk.Window): # pylint: disable=R0902
self.savepw.set_active(json.get('savepw', False))
def load_gui_settings(self) -> None:
"""Loads the GUI settings from the cache file."""
"""Load the GUI settings from the cache file."""
try:
with CACHE_FILE.open('rb') as cache:
self.gui_settings = load(cache)
@ -165,7 +165,7 @@ class GUI(Gtk.Window): # pylint: disable=R0902
LOGGER.error('Cache file contains garbage: %s', CACHE_FILE)
def save_gui_settings(self):
"""Saves the GUI settings to the cache file."""
"""Save the GUI settings to the cache file."""
try:
with CACHE_FILE.open('w', encoding='utf-8') as cache:
dump(self.gui_settings, cache, indent=2)
@ -173,7 +173,7 @@ class GUI(Gtk.Window): # pylint: disable=R0902
LOGGER.error('Insufficient permissions to read: %s', CACHE_FILE)
def show_error(self, message: str):
"""Shows an error message."""
"""Show an error message."""
message_dialog = Gtk.MessageDialog(
transient_for=self,
message_type=Gtk.MessageType.ERROR,
@ -184,7 +184,7 @@ class GUI(Gtk.Window): # pylint: disable=R0902
message_dialog.destroy()
def run_rcon(self) -> str:
"""Returns the current RCON settings."""
"""Return the current RCON settings."""
with self.client_cls(
self.host.get_text().strip(),
self.port.get_value_as_int(),
@ -194,7 +194,7 @@ class GUI(Gtk.Window): # pylint: disable=R0902
return client.run(*self.command.get_text().strip().split())
def on_button_clicked(self, _):
"""Runs the client."""
"""Run the client."""
try:
result = self.run_rcon()
except ValueError as error:
@ -213,13 +213,13 @@ class GUI(Gtk.Window): # pylint: disable=R0902
self.result_text = result
def terminate(self, *args, **kwargs):
"""Saves the settings and terminates the application."""
"""Save the settings and terminates the application."""
self.save_gui_settings()
Gtk.main_quit(*args, **kwargs)
def main() -> None:
"""Starts the GUI."""
"""Start the GUI."""
args = get_args()
basicConfig(format=LOG_FORMAT, level=DEBUG if args.debug else INFO)

6
rcon/rconclt.py

@ -16,7 +16,7 @@ LOGGER = getLogger('rconclt')
def get_args() -> Namespace:
"""Parses and returns the CLI arguments."""
"""Parse and return the command line arguments."""
parser = ArgumentParser(description='A Minecraft RCON client.')
parser.add_argument('server', help='the server to connect to')
@ -44,7 +44,7 @@ def get_args() -> Namespace:
def run() -> None:
"""Runs the RCON client."""
"""Run the RCON client."""
args = get_args()
basicConfig(format=LOG_FORMAT, level=DEBUG if args.debug else INFO)
@ -59,7 +59,7 @@ def run() -> None:
def main() -> int:
"""Runs the main script with exceptions handled."""
"""Run the main script with exceptions handled."""
with ErrorHandler(LOGGER) as handler:
run()

6
rcon/rconshell.py

@ -18,7 +18,7 @@ LOGGER = getLogger('rconshell')
def get_args() -> Namespace:
"""Parses and returns the CLI arguments."""
"""Parse and returns the CLI arguments."""
parser = ArgumentParser(description='An interactive RCON shell.')
parser.add_argument('server', nargs='?', help='the server to connect to')
@ -38,7 +38,7 @@ def get_args() -> Namespace:
def run() -> None:
"""Runs the RCON shell."""
"""Run the RCON shell."""
args = get_args()
basicConfig(level=INFO, format=LOG_FORMAT)
@ -54,7 +54,7 @@ def run() -> None:
def main() -> int:
"""Runs the main script with exceptions handled."""
"""Run the main script with exceptions handled."""
with ErrorHandler(LOGGER) as handler:
run()

6
rcon/readline.py

@ -21,11 +21,11 @@ class CommandHistory:
__slots__ = ('logger',)
def __init__(self, logger: Logger):
"""Sets the logger to use."""
"""Set the logger to use."""
self.logger = logger
def __enter__(self):
"""Loads the history file."""
"""Load the history file."""
try:
read_history_file(HIST_FILE)
except FileNotFoundError:
@ -40,7 +40,7 @@ class CommandHistory:
return self
def __exit__(self, *_):
"""Writes to the history file."""
"""Write to the history file."""
try:
write_history_file(HIST_FILE)
except PermissionError:

4
rcon/source/async_rcon.py

@ -21,7 +21,7 @@ async def communicate(
writer: StreamWriter,
packet: Packet
) -> Packet:
"""Asynchronous requests."""
"""Make an asynchronous request."""
writer.write(bytes(packet))
await writer.drain()
@ -36,7 +36,7 @@ async def rcon(
passwd: str,
encoding: str = 'utf-8'
) -> str:
"""Runs a command asynchronously."""
"""Run a command asynchronously."""
reader, writer = await open_connection(host, port)
login = Packet.make_login(passwd, encoding=encoding)

8
rcon/source/client.py

@ -14,19 +14,19 @@ class Client(BaseClient, socket_type=SOCK_STREAM):
"""An RCON client."""
def communicate(self, packet: Packet) -> Packet:
"""Sends and receives a packet."""
"""Send and receive a packet."""
with self._socket.makefile('wb') as file:
file.write(bytes(packet))
return self.read()
def read(self) -> Packet:
"""Reads a packet."""
"""Read a packet."""
with self._socket.makefile('rb') as file:
return Packet.read(file)
def login(self, passwd: str, *, encoding: str = 'utf-8') -> bool:
"""Performs a login."""
"""Perform a login."""
request = Packet.make_login(passwd, encoding=encoding)
response = self.communicate(request)
@ -41,7 +41,7 @@ class Client(BaseClient, socket_type=SOCK_STREAM):
return True
def run(self, command: str, *args: str, encoding: str = 'utf-8') -> str:
"""Runs a command."""
"""Run a command."""
request = Packet.make_command(command, *args, encoding=encoding)
response = self.communicate(request)

28
rcon/source/proto.py

@ -23,24 +23,24 @@ class LittleEndianSignedInt32(int):
MAX = 2_147_483_647
def __init__(self, *_):
"""Checks the boundaries."""
"""Check the boundaries."""
super().__init__()
if not self.MIN <= self <= self.MAX:
raise ValueError('Signed int32 out of bounds:', int(self))
def __bytes__(self):
"""Returns the integer as signed little endian."""
"""Return the integer as signed little endian."""
return self.to_bytes(4, 'little', signed=True)
@classmethod
async def aread(cls, reader: StreamReader) -> LittleEndianSignedInt32:
"""Reads the integer from an asynchronous file-like object."""
"""Read the integer from an asynchronous file-like object."""
return cls.from_bytes(await reader.read(4), 'little', signed=True)
@classmethod
def read(cls, file: IO) -> LittleEndianSignedInt32:
"""Reads the integer from a file-like object."""
"""Read the integer from a file-like object."""
return cls.from_bytes(file.read(4), 'little', signed=True)
@ -53,21 +53,21 @@ class Type(Enum):
SERVERDATA_RESPONSE_VALUE = LittleEndianSignedInt32(0)
def __int__(self):
"""Returns the actual integer value."""
"""Return the actual integer value."""
return int(self.value)
def __bytes__(self):
"""Returns the integer value as little endian."""
"""Return the integer value as little endian."""
return bytes(self.value)
@classmethod
async def aread(cls, reader: StreamReader) -> Type:
"""Reads the type from an asynchronous file-like object."""
"""Read the type from an asynchronous file-like object."""
return cls(await LittleEndianSignedInt32.aread(reader))
@classmethod
def read(cls, file: IO) -> Type:
"""Reads the type from a file-like object."""
"""Read the type from a file-like object."""
return cls(LittleEndianSignedInt32.read(file))
@ -80,7 +80,7 @@ class Packet(NamedTuple):
terminator: bytes = TERMINATOR
def __bytes__(self):
"""Returns the packet as bytes with prepended length."""
"""Return the packet as bytes with prepended length."""
payload = bytes(self.id)
payload += bytes(self.type)
payload += self.payload
@ -90,7 +90,7 @@ class Packet(NamedTuple):
@classmethod
async def aread(cls, reader: StreamReader) -> Packet:
"""Reads a packet from an asynchronous file-like object."""
"""Read a packet from an asynchronous file-like object."""
size = await LittleEndianSignedInt32.aread(reader)
id_ = await LittleEndianSignedInt32.aread(reader)
type_ = await Type.aread(reader)
@ -104,7 +104,7 @@ class Packet(NamedTuple):
@classmethod
def read(cls, file: IO) -> Packet:
"""Reads a packet from a file-like object."""
"""Read a packet from a file-like object."""
size = LittleEndianSignedInt32.read(file)
id_ = LittleEndianSignedInt32.read(file)
type_ = Type.read(file)
@ -118,7 +118,7 @@ class Packet(NamedTuple):
@classmethod
def make_command(cls, *args: str, encoding: str = 'utf-8') -> Packet:
"""Creates a command packet."""
"""Create a command packet."""
return cls(
random_request_id(), Type.SERVERDATA_EXECCOMMAND,
b' '.join(map(partial(str.encode, encoding=encoding), args))
@ -126,13 +126,13 @@ class Packet(NamedTuple):
@classmethod
def make_login(cls, passwd: str, *, encoding: str = 'utf-8') -> Packet:
"""Creates a login packet."""
"""Create a login packet."""
return cls(
random_request_id(), Type.SERVERDATA_AUTH, passwd.encode(encoding)
)
def random_request_id() -> LittleEndianSignedInt32:
"""Generates a random request ID."""
"""Generate a random request ID."""
return LittleEndianSignedInt32(randint(0, LittleEndianSignedInt32.MAX))

Loading…
Cancel
Save