mirror of https://github.com/conqp/rcon
11 changed files with 93 additions and 76 deletions
@ -0,0 +1,63 @@ |
|||||
|
"""Common base client.""" |
||||
|
|
||||
|
from socket import SOCK_STREAM, SocketKind, socket |
||||
|
from typing import Optional |
||||
|
|
||||
|
|
||||
|
__all__ = ['BaseClient'] |
||||
|
|
||||
|
|
||||
|
class BaseClient: |
||||
|
"""A common RCON client.""" |
||||
|
|
||||
|
__slots__ = ('_socket', 'host', 'port', 'passwd') |
||||
|
|
||||
|
def __init__( |
||||
|
self, host: str, port: int, *, |
||||
|
type: SocketKind = SOCK_STREAM, |
||||
|
timeout: Optional[float] = None, |
||||
|
passwd: Optional[str] = None |
||||
|
): |
||||
|
"""Initializes the base client with the SOCK_STREAM socket type.""" |
||||
|
self._socket = socket(type=type) |
||||
|
self.host = host |
||||
|
self.port = port |
||||
|
self.timeout = timeout |
||||
|
self.passwd = passwd |
||||
|
|
||||
|
def __enter__(self): |
||||
|
"""Attempts an auto-login if a password is set.""" |
||||
|
self._socket.__enter__() |
||||
|
self.connect(login=True) |
||||
|
return self |
||||
|
|
||||
|
def __exit__(self, typ, value, traceback): |
||||
|
"""Delegates to the underlying socket's exit method.""" |
||||
|
return self._socket.__exit__(typ, value, traceback) |
||||
|
|
||||
|
@property |
||||
|
def timeout(self) -> float: |
||||
|
"""Returns the socket timeout.""" |
||||
|
return self._socket.gettimeout() |
||||
|
|
||||
|
@timeout.setter |
||||
|
def timeout(self, timeout: float): |
||||
|
"""Sets the socket timeout.""" |
||||
|
self._socket.settimeout(timeout) |
||||
|
|
||||
|
def connect(self, login: bool = False) -> None: |
||||
|
"""Connects the socket and attempts a |
||||
|
login if wanted and a password is set. |
||||
|
""" |
||||
|
self._socket.connect((self.host, self.port)) |
||||
|
|
||||
|
if login and self.passwd is not None: |
||||
|
self.login(self.passwd) |
||||
|
|
||||
|
def close(self) -> None: |
||||
|
"""Closes the socket connection.""" |
||||
|
self._socket.close() |
||||
|
|
||||
|
def login(self, passwd: str) -> bool: |
||||
|
"""Performs a login.""" |
||||
|
raise NotImplementedError() |
Loading…
Reference in new issue