Browse Source

General code cleanup

pull/8/head
Richard Neumann 3 years ago
parent
commit
be4bd223c9
  1. 23
      rcon/async_rcon.py
  2. 8
      rcon/client.py
  3. 7
      rcon/console.py
  4. 2
      rcon/gui.py
  5. 36
      rcon/proto.py
  6. 23
      rcon/rconclt.py
  7. 12
      rcon/rconshell.py
  8. 15
      rcon/readline.py

23
rcon/async_rcon.py

@ -1,7 +1,6 @@
"""Asynchronous RCON.""" """Asynchronous RCON."""
from asyncio import open_connection from asyncio import StreamReader, StreamWriter, open_connection
from typing import IO
from rcon.exceptions import RequestIdMismatch, WrongPassword from rcon.exceptions import RequestIdMismatch, WrongPassword
from rcon.proto import Packet, Type from rcon.proto import Packet, Type
@ -10,14 +9,16 @@ from rcon.proto import Packet, Type
__all__ = ['rcon'] __all__ = ['rcon']
async def close(socket: IO) -> None: async def close(writer: StreamWriter) -> None:
"""Close socket asynchronously.""" """Close socket asynchronously."""
socket.close() writer.close()
await socket.wait_closed() await writer.wait_closed()
async def communicate(reader: IO, writer: IO, packet: Packet) -> Packet: async def communicate(
reader: StreamReader, writer: StreamWriter, packet: Packet
) -> Packet:
"""Asynchronous requests.""" """Asynchronous requests."""
writer.write(bytes(packet)) writer.write(bytes(packet))
@ -25,8 +26,14 @@ async def communicate(reader: IO, writer: IO, packet: Packet) -> Packet:
return await Packet.aread(reader) return await Packet.aread(reader)
async def rcon(command: str, *arguments: str, host: str, port: int, async def rcon(
passwd: str, encoding: str = 'utf-8') -> str: command: str,
*arguments: str,
host: str,
port: int,
passwd: str,
encoding: str = 'utf-8'
) -> str:
"""Runs a command asynchronously.""" """Runs a command asynchronously."""
reader, writer = await open_connection(host, port) reader, writer = await open_connection(host, port)

8
rcon/client.py

@ -15,9 +15,11 @@ class Client:
__slots__ = ('_socket', 'host', 'port', 'passwd') __slots__ = ('_socket', 'host', 'port', 'passwd')
def __init__(self, host: str, port: int, *, def __init__(
timeout: Optional[float] = None, self, host: str, port: int, *,
passwd: Optional[str] = None): timeout: Optional[float] = None,
passwd: Optional[str] = None
):
"""Initializes the base client with the SOCK_STREAM socket type.""" """Initializes the base client with the SOCK_STREAM socket type."""
self._socket = socket() self._socket = socket()
self.host = host self.host = host

7
rcon/console.py

@ -7,7 +7,7 @@ from rcon.config import Config
from rcon.exceptions import RequestIdMismatch, WrongPassword from rcon.exceptions import RequestIdMismatch, WrongPassword
__all__ = ['rconcmd'] __all__ = ['PROMPT', 'rconcmd']
EXIT_COMMANDS = {'exit', 'quit'} EXIT_COMMANDS = {'exit', 'quit'}
@ -115,12 +115,13 @@ def process_input(client: Client, passwd: str, prompt: str) -> bool:
print(MSG_SESSION_TIMEOUT) print(MSG_SESSION_TIMEOUT)
try: try:
passwd = login(client, passwd) login(client, passwd)
except EOFError: except EOFError:
print(MSG_LOGIN_ABORTED) print(MSG_LOGIN_ABORTED)
return False return False
else:
print(result)
print(result)
return True return True

2
rcon/gui.py

@ -138,7 +138,7 @@ class GUI(Gtk.Window): # pylint: disable=R0902
self.result_text = json.get('result', '') self.result_text = json.get('result', '')
self.savepw.set_active(json.get('savepw', False)) self.savepw.set_active(json.get('savepw', False))
def load_gui_settings(self) -> dict: def load_gui_settings(self) -> None:
"""Loads the GUI settings from the cache file.""" """Loads the GUI settings from the cache file."""
try: try:
with CACHE_FILE.open('r') as cache: with CACHE_FILE.open('r') as cache:

36
rcon/proto.py

@ -1,6 +1,7 @@
"""Low-level protocol stuff.""" """Low-level protocol stuff."""
from __future__ import annotations from __future__ import annotations
from asyncio import StreamReader
from enum import Enum from enum import Enum
from functools import partial from functools import partial
from logging import getLogger from logging import getLogger
@ -39,9 +40,9 @@ class LittleEndianSignedInt32(int):
return self.to_bytes(4, 'little', signed=True) return self.to_bytes(4, 'little', signed=True)
@classmethod @classmethod
async def aread(cls, file: IO) -> LittleEndianSignedInt32: async def aread(cls, reader: StreamReader) -> LittleEndianSignedInt32:
"""Reads the integer from an ansynchronous file-like object.""" """Reads the integer from an asynchronous file-like object."""
return cls.from_bytes(await file.read(4), 'little', signed=True) return cls.from_bytes(await reader.read(4), 'little', signed=True)
@classmethod @classmethod
def read(cls, file: IO) -> LittleEndianSignedInt32: def read(cls, file: IO) -> LittleEndianSignedInt32:
@ -66,9 +67,9 @@ class Type(Enum):
return bytes(self.value) return bytes(self.value)
@classmethod @classmethod
async def aread(cls, file: IO) -> Type: async def aread(cls, reader: StreamReader) -> Type:
"""Reads the type from an asynchronous file-like object.""" """Reads the type from an asynchronous file-like object."""
return cls(await LittleEndianSignedInt32.aread(file)) return cls(await LittleEndianSignedInt32.aread(reader))
@classmethod @classmethod
def read(cls, file: IO) -> Type: def read(cls, file: IO) -> Type:
@ -94,13 +95,13 @@ class Packet(NamedTuple):
return size + payload return size + payload
@classmethod @classmethod
async def aread(cls, file: IO) -> Packet: async def aread(cls, reader: StreamReader) -> Packet:
"""Reads a packet from an asynchronous file-like object.""" """Reads a packet from an asynchronous file-like object."""
size = await LittleEndianSignedInt32.aread(file) size = await LittleEndianSignedInt32.aread(reader)
id_ = await LittleEndianSignedInt32.aread(file) id_ = await LittleEndianSignedInt32.aread(reader)
type_ = await Type.aread(file) type_ = await Type.aread(reader)
payload = await file.read(size - 10) payload = await reader.read(size - 10)
terminator = await file.read(2) terminator = await reader.read(2)
if terminator != TERMINATOR: if terminator != TERMINATOR:
LOGGER.warning('Unexpected terminator: %s', terminator) LOGGER.warning('Unexpected terminator: %s', terminator)
@ -124,11 +125,14 @@ class Packet(NamedTuple):
@classmethod @classmethod
def make_command(cls, *args: str, encoding: str = 'utf-8') -> Packet: def make_command(cls, *args: str, encoding: str = 'utf-8') -> Packet:
"""Creates a command packet.""" """Creates a command packet."""
return cls(random_request_id(), Type.SERVERDATA_EXECCOMMAND, return cls(
b' '.join(map(partial(str.encode, encoding=encoding), args))) random_request_id(), Type.SERVERDATA_EXECCOMMAND,
b' '.join(map(partial(str.encode, encoding=encoding), args))
)
@classmethod @classmethod
def make_login(cls, passwd: bytes, *, encoding: str = 'utf-8') -> Packet: def make_login(cls, passwd: str, *, encoding: str = 'utf-8') -> Packet:
"""Creates a login packet.""" """Creates a login packet."""
return cls(random_request_id(), Type.SERVERDATA_AUTH, return cls(
passwd.encode(encoding)) random_request_id(), Type.SERVERDATA_AUTH, passwd.encode(encoding)
)

23
rcon/rconclt.py

@ -20,15 +20,22 @@ def get_args() -> Namespace:
parser = ArgumentParser(description='A Minecraft RCON client.') parser = ArgumentParser(description='A Minecraft RCON client.')
parser.add_argument('server', help='the server to connect to') parser.add_argument('server', help='the server to connect to')
parser.add_argument('-c', '--config', type=Path, metavar='file', parser.add_argument(
default=CONFIG_FILES, help='the configuration file') '-c', '--config', type=Path, metavar='file', default=CONFIG_FILES,
parser.add_argument('-d', '--debug', action='store_true', help='the configuration file'
help='print additional debug information') )
parser.add_argument('-t', '--timeout', type=float, metavar='seconds', parser.add_argument(
help='connection timeout in seconds') '-d', '--debug', action='store_true',
help='print additional debug information'
)
parser.add_argument(
'-t', '--timeout', type=float, metavar='seconds',
help='connection timeout in seconds'
)
parser.add_argument('command', help='command to execute on the server') parser.add_argument('command', help='command to execute on the server')
parser.add_argument('argument', nargs='*', default=(), parser.add_argument(
help='arguments for the command') 'argument', nargs='*', default=(), help='arguments for the command'
)
return parser.parse_args() return parser.parse_args()

12
rcon/rconshell.py

@ -21,10 +21,14 @@ def get_args() -> Namespace:
parser = ArgumentParser(description='An interactive RCON shell.') parser = ArgumentParser(description='An interactive RCON shell.')
parser.add_argument('server', nargs='?', help='the server to connect to') parser.add_argument('server', nargs='?', help='the server to connect to')
parser.add_argument('-c', '--config', type=Path, metavar='file', parser.add_argument(
default=CONFIG_FILES, help='the configuration file') '-c', '--config', type=Path, metavar='file', default=CONFIG_FILES,
parser.add_argument('-p', '--prompt', default=PROMPT, metavar='PS1', help='the configuration file'
help='the shell prompt') )
parser.add_argument(
'-p', '--prompt', default=PROMPT, metavar='PS1',
help='the shell prompt'
)
return parser.parse_args() return parser.parse_args()

15
rcon/readline.py

@ -30,11 +30,13 @@ class CommandHistory:
try: try:
read_history_file(HIST_FILE) read_history_file(HIST_FILE)
except FileNotFoundError: except FileNotFoundError:
self.logger.warning('Could not find history file: %s', self.logger.warning(
HIST_FILE) 'Could not find history file: %s', HIST_FILE
)
except PermissionError: except PermissionError:
self.logger.error('Insufficient permissions to read: %s', self.logger.error(
HIST_FILE) 'Insufficient permissions to read: %s', HIST_FILE
)
return self return self
@ -43,5 +45,6 @@ class CommandHistory:
try: try:
write_history_file(HIST_FILE) write_history_file(HIST_FILE)
except PermissionError: except PermissionError:
self.logger.error('Insufficient permissions to write: %s', self.logger.error(
HIST_FILE) 'Insufficient permissions to write: %s', HIST_FILE
)

Loading…
Cancel
Save