Browse Source

complete typing

pull/43/head
Alex Nørgaard 2 years ago
parent
commit
43150c1336
No known key found for this signature in database GPG Key ID: 94D54F6A3604E97
  1. 61
      .github/workflows/coverage_and_lint.yml
  2. 2
      .gitignore
  3. 3
      MANIFEST.in
  4. 32
      a2s/__init__.py
  5. 132
      a2s/a2s_async.py
  6. 48
      a2s/a2s_async.pyi
  7. 37
      a2s/a2s_fragment.py
  8. 65
      a2s/a2s_sync.py
  9. 48
      a2s/a2s_sync.pyi
  10. 109
      a2s/byteio.py
  11. 24
      a2s/datacls.py
  12. 1
      a2s/exceptions.py
  13. 137
      a2s/info.py
  14. 26
      a2s/players.py
  15. 0
      a2s/py.typed
  16. 27
      a2s/rules.py
  17. 19
      pyproject.toml
  18. 4
      setup.py

61
.github/workflows/coverage_and_lint.yml

@ -0,0 +1,61 @@
name: Type Coverage and Linting
on:
push:
branches:
- master
pull_request:
branches:
- master
types:
- opened
- synchronize
jobs:
job:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
python-version: [ '3.7', '3.8', '3.9', '3.10', '3.11' ]
name: "Type Coverage and Linting @ ${{ matrix.python-version }}"
steps:
- name: "Checkout Repository"
uses: actions/checkout@v3
with:
fetch-depth: 0
- name: "Setup Python @ ${{ matrix.python-version }}"
uses: actions/setup-python@v3
with:
python-version: "${{ matrix.python-version }}"
- name: "Install Python deps @ ${{ matrix.python-version }}"
env:
PY_VER: "${{ matrix.python-version }}"
run: |
pip install -U .
- uses: actions/setup-node@v3
with:
node-version: "17"
- run: npm install --location=global pyright@latest
- name: "Type Coverage @ ${{ matrix.python-version }}"
run: |
pyright
pyright --ignoreexternal --lib --verifytypes a2s
- name: Lint
if: ${{ github.event_name != 'pull_request' }}
uses: github/super-linter/slim@v4
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
DEFAULT_BRANCH: master
VALIDATE_ALL_CODEBASE: false
VALIDATE_PYTHON_BLACK: true
VALIDATE_PYTHON_ISORT: true
LINTER_RULES_PATH: /
PYTHON_ISORT_CONFIG_FILE: pyproject.toml
PYTHON_BLACK_CONFIG_FILE: pyproject.toml

2
.gitignore

@ -2,4 +2,4 @@ __pycache__
build build
dist dist
*.egg-info *.egg-info
.venv/

3
MANIFEST.in

@ -0,0 +1,3 @@
include README.md
include LICENSE
include a2s/py.typed

32
a2s/__init__.py

@ -1,5 +1,29 @@
from a2s.exceptions import BrokenMessageError, BufferExhaustedError """
MIT License
from a2s.info import info, ainfo, SourceInfo, GoldSrcInfo Copyright (c) 2020 Gabriel Huber
from a2s.players import players, aplayers, Player
from a2s.rules import rules, arules Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
from a2s.exceptions import BrokenMessageError as BrokenMessageError, BufferExhaustedError as BufferExhaustedError
from a2s.info import info as info, ainfo as ainfo, SourceInfo as SourceInfo, GoldSrcInfo as GoldSrcInfo
from a2s.players import players as players, aplayers as aplayers, Player as Player
from a2s.rules import rules as rules, arules as arules

132
a2s/a2s_async.py

@ -1,29 +1,73 @@
"""
MIT License
Copyright (c) 2020 Gabriel Huber
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
from __future__ import annotations
import asyncio import asyncio
import io
import logging import logging
import time import time
import io from typing import TYPE_CHECKING, Dict, List, NoReturn, Optional, Tuple, Type, TypeVar, Union
from a2s.exceptions import BrokenMessageError from a2s.a2s_fragment import A2SFragment, decode_fragment
from a2s.a2s_fragment import decode_fragment
from a2s.defaults import DEFAULT_RETRIES
from a2s.byteio import ByteReader from a2s.byteio import ByteReader
from a2s.defaults import DEFAULT_RETRIES
from a2s.exceptions import BrokenMessageError
from .info import GoldSrcInfo, InfoProtocol, SourceInfo
from .players import Player, PlayersProtocol
from .rules import RulesProtocol
if TYPE_CHECKING:
from typing_extensions import Self
HEADER_SIMPLE = b"\xFF\xFF\xFF\xFF" HEADER_SIMPLE = b"\xFF\xFF\xFF\xFF"
HEADER_MULTI = b"\xFE\xFF\xFF\xFF" HEADER_MULTI = b"\xFE\xFF\xFF\xFF"
A2S_CHALLENGE_RESPONSE = 0x41 A2S_CHALLENGE_RESPONSE = 0x41
PROTOCOLS = Union[InfoProtocol, PlayersProtocol, RulesProtocol]
logger: logging.Logger = logging.getLogger("a2s")
logger = logging.getLogger("a2s") T = TypeVar("T", bound=PROTOCOLS)
async def request_async(address, timeout, encoding, a2s_proto): async def request_async(
address: Tuple[str, int], timeout: float, encoding: str, a2s_proto: Type[T]
) -> Union[SourceInfo, GoldSrcInfo, List[Player], Dict[str, str]]:
conn = await A2SStreamAsync.create(address, timeout) conn = await A2SStreamAsync.create(address, timeout)
response = await request_async_impl(conn, encoding, a2s_proto) response = await request_async_impl(conn, encoding, a2s_proto)
conn.close() conn.close()
return response return response
async def request_async_impl(conn, encoding, a2s_proto, challenge=0, retries=0, ping=None):
async def request_async_impl(
conn: A2SStreamAsync,
encoding: str,
a2s_proto: Type[T],
challenge: int = 0,
retries: int = 0,
ping: Optional[float] = None,
) -> Union[SourceInfo, GoldSrcInfo, Dict[str, str], List[Player]]:
send_time = time.monotonic() send_time = time.monotonic()
resp_data = await conn.request(a2s_proto.serialize_request(challenge)) resp_data = await conn.request(a2s_proto.serialize_request(challenge))
recv_time = time.monotonic() recv_time = time.monotonic()
@ -31,108 +75,104 @@ async def request_async_impl(conn, encoding, a2s_proto, challenge=0, retries=0,
if retries == 0: if retries == 0:
ping = recv_time - send_time ping = recv_time - send_time
reader = ByteReader( reader = ByteReader(io.BytesIO(resp_data), endian="<", encoding=encoding)
io.BytesIO(resp_data), endian="<", encoding=encoding)
response_type = reader.read_uint8() response_type = reader.read_uint8()
if response_type == A2S_CHALLENGE_RESPONSE: if response_type == A2S_CHALLENGE_RESPONSE:
if retries >= DEFAULT_RETRIES: if retries >= DEFAULT_RETRIES:
raise BrokenMessageError( raise BrokenMessageError("Server keeps sending challenge responses")
"Server keeps sending challenge responses")
challenge = reader.read_uint32() challenge = reader.read_uint32()
return await request_async_impl( return await request_async_impl(conn, encoding, a2s_proto, challenge, retries + 1, ping)
conn, encoding, a2s_proto, challenge, retries + 1, ping)
if not a2s_proto.validate_response_type(response_type): if not a2s_proto.validate_response_type(response_type):
raise BrokenMessageError( raise BrokenMessageError("Invalid response type: " + hex(response_type))
"Invalid response type: " + hex(response_type))
return a2s_proto.deserialize_response(reader, response_type, ping) return a2s_proto.deserialize_response(reader, response_type, ping)
class A2SProtocol(asyncio.DatagramProtocol): class A2SProtocol(asyncio.DatagramProtocol):
def __init__(self): def __init__(self):
self.recv_queue = asyncio.Queue() self.recv_queue: asyncio.Queue[bytes] = asyncio.Queue()
self.error_event = asyncio.Event() self.error_event: asyncio.Event = asyncio.Event()
self.error = None self.error: Optional[Exception] = None
self.fragment_buf = [] self.fragment_buf: List[A2SFragment] = []
def connection_made(self, transport): def connection_made(self, transport: asyncio.DatagramTransport) -> None:
self.transport = transport self.transport = transport
def datagram_received(self, packet, addr): def datagram_received(self, data: bytes, addr: Tuple[str, int]) -> None:
header = packet[:4] header = data[:4]
payload = packet[4:] payload = data[4:]
if header == HEADER_SIMPLE: if header == HEADER_SIMPLE:
logger.debug("Received single packet: %r", payload) logger.debug("Received single packet: %r", payload)
self.recv_queue.put_nowait(payload) self.recv_queue.put_nowait(payload)
elif header == HEADER_MULTI: elif header == HEADER_MULTI:
self.fragment_buf.append(decode_fragment(payload)) self.fragment_buf.append(decode_fragment(payload))
if len(self.fragment_buf) < self.fragment_buf[0].fragment_count: if len(self.fragment_buf) < self.fragment_buf[0].fragment_count:
return # Wait for more packets to arrive return # Wait for more packets to arrive
self.fragment_buf.sort(key=lambda f: f.fragment_id) self.fragment_buf.sort(key=lambda f: f.fragment_id)
reassembled = b"".join( reassembled = b"".join(fragment.payload for fragment in self.fragment_buf)
fragment.payload for fragment in self.fragment_buf)
# Sometimes there's an additional header present # Sometimes there's an additional header present
if reassembled.startswith(b"\xFF\xFF\xFF\xFF"): if reassembled.startswith(b"\xFF\xFF\xFF\xFF"):
reassembled = reassembled[4:] reassembled = reassembled[4:]
logger.debug("Received %s part packet with content: %r", logger.debug("Received %s part packet with content: %r", len(self.fragment_buf), reassembled)
len(self.fragment_buf), reassembled)
self.recv_queue.put_nowait(reassembled) self.recv_queue.put_nowait(reassembled)
self.fragment_buf = [] self.fragment_buf = []
else: else:
self.error = BrokenMessageError( self.error = BrokenMessageError("Invalid packet header: " + repr(header))
"Invalid packet header: " + repr(header))
self.error_event.set() self.error_event.set()
def error_received(self, exc): def error_received(self, exc: Exception) -> None:
self.error = exc self.error = exc
self.error_event.set() self.error_event.set()
def raise_on_error(self): def raise_on_error(self) -> NoReturn:
error = self.error assert self.error
error: Exception = self.error
self.error = None self.error = None
self.error_event.clear() self.error_event.clear()
raise error raise error
class A2SStreamAsync: class A2SStreamAsync:
def __init__(self, transport, protocol, timeout): def __init__(self, transport: asyncio.DatagramTransport, protocol: A2SProtocol, timeout: float) -> None:
self.transport = transport self.transport = transport
self.protocol = protocol self.protocol = protocol
self.timeout = timeout self.timeout = timeout
def __del__(self): def __del__(self) -> None:
self.close() self.close()
@classmethod @classmethod
async def create(cls, address, timeout): async def create(cls, address: Tuple[str, int], timeout: float) -> Self:
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
transport, protocol = await loop.create_datagram_endpoint( transport, protocol = await loop.create_datagram_endpoint(lambda: A2SProtocol(), remote_addr=address)
lambda: A2SProtocol(), remote_addr=address)
return cls(transport, protocol, timeout) return cls(transport, protocol, timeout)
def send(self, payload): def send(self, payload: bytes) -> None:
logger.debug("Sending packet: %r", payload) logger.debug("Sending packet: %r", payload)
packet = HEADER_SIMPLE + payload packet = HEADER_SIMPLE + payload
self.transport.sendto(packet) self.transport.sendto(packet)
async def recv(self): async def recv(self) -> bytes:
queue_task = asyncio.create_task(self.protocol.recv_queue.get()) queue_task = asyncio.create_task(self.protocol.recv_queue.get())
error_task = asyncio.create_task(self.protocol.error_event.wait()) error_task = asyncio.create_task(self.protocol.error_event.wait())
done, pending = await asyncio.wait({queue_task, error_task}, done, pending = await asyncio.wait(
timeout=self.timeout, return_when=asyncio.FIRST_COMPLETED) {queue_task, error_task}, timeout=self.timeout, return_when=asyncio.FIRST_COMPLETED
)
for task in pending: task.cancel() for task in pending:
task.cancel()
if error_task in done: if error_task in done:
self.protocol.raise_on_error() self.protocol.raise_on_error()
if not done: if not done:
raise asyncio.TimeoutError() raise asyncio.TimeoutError()
return queue_task.result() return queue_task.result()
async def request(self, payload): async def request(self, payload: bytes) -> bytes:
self.send(payload) self.send(payload)
return await self.recv() return await self.recv()
def close(self): def close(self) -> None:
self.transport.close() self.transport.close()

48
a2s/a2s_async.pyi

@ -0,0 +1,48 @@
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Type, Union, overload
from .a2s_async import A2SStreamAsync
if TYPE_CHECKING:
from .info import GoldSrcInfo, InfoProtocol, SourceInfo
from .players import Player, PlayersProtocol
from .rules import RulesProtocol
@overload
async def request_async(
address: Tuple[str, int], timeout: float, encoding: str, a2s_proto: Type[InfoProtocol]
) -> Union[SourceInfo, GoldSrcInfo]: ...
@overload
async def request_async(
address: Tuple[str, int], timeout: float, encoding: str, a2s_proto: Type[PlayersProtocol]
) -> List[Player]: ...
@overload
async def request_async(
address: Tuple[str, int], timeout: float, encoding: str, a2s_proto: Type[RulesProtocol]
) -> Dict[str, str]: ...
@overload
async def request_async_impl(
conn: A2SStreamAsync,
encoding: str,
a2s_proto: Type[InfoProtocol],
challenge: int = ...,
retries: int = ...,
ping: Optional[float] = ...,
) -> Union[SourceInfo, GoldSrcInfo]: ...
@overload
async def request_async_impl(
conn: A2SStreamAsync,
encoding: str,
a2s_proto: Type[PlayersProtocol],
challenge: int = ...,
retries: int = ...,
ping: Optional[float] = ...,
) -> List[Player]: ...
@overload
async def request_async_impl(
conn: A2SStreamAsync,
encoding: str,
a2s_proto: Type[RulesProtocol],
challenge: int = ...,
retries: int = ...,
ping: Optional[float] = ...,
) -> Dict[str, str]: ...

37
a2s/a2s_fragment.py

@ -4,30 +4,37 @@ import io
from a2s.byteio import ByteReader from a2s.byteio import ByteReader
class A2SFragment: class A2SFragment:
def __init__(self, message_id, fragment_count, fragment_id, mtu, def __init__(
decompressed_size=0, crc=0, payload=b""): self,
self.message_id = message_id message_id: int,
self.fragment_count = fragment_count fragment_count: int,
self.fragment_id = fragment_id fragment_id: int,
self.mtu = mtu mtu: int,
self.decompressed_size = decompressed_size decompressed_size: int = 0,
self.crc = crc crc: int = 0,
self.payload = payload payload: bytes = b"",
) -> None:
self.message_id: int = message_id
self.fragment_count: int = fragment_count
self.fragment_id: int = fragment_id
self.mtu: int = mtu
self.decompressed_size: int = decompressed_size
self.crc: int = crc
self.payload: bytes = payload
@property @property
def is_compressed(self): def is_compressed(self) -> bool:
return bool(self.message_id & (1 << 15)) return bool(self.message_id & (1 << 15))
def decode_fragment(data):
reader = ByteReader( def decode_fragment(data: bytes) -> A2SFragment:
io.BytesIO(data), endian="<", encoding="utf-8") reader = ByteReader(io.BytesIO(data), endian="<", encoding="utf-8")
frag = A2SFragment( frag = A2SFragment(
message_id=reader.read_uint32(), message_id=reader.read_uint32(),
fragment_count=reader.read_uint8(), fragment_count=reader.read_uint8(),
fragment_id=reader.read_uint8(), fragment_id=reader.read_uint8(),
mtu=reader.read_uint16() mtu=reader.read_uint16(),
) )
if frag.is_compressed: if frag.is_compressed:
frag.decompressed_size = reader.read_uint32() frag.decompressed_size = reader.read_uint32()

65
a2s/a2s_sync.py

@ -1,29 +1,42 @@
import socket from __future__ import annotations
import io
import logging import logging
import socket
import time import time
import io from typing import Dict, List, Optional, Tuple, Type, TypeVar, Union
from a2s.exceptions import BrokenMessageError
from a2s.a2s_fragment import decode_fragment from a2s.a2s_fragment import decode_fragment
from a2s.defaults import DEFAULT_RETRIES
from a2s.byteio import ByteReader from a2s.byteio import ByteReader
from a2s.defaults import DEFAULT_RETRIES
from a2s.exceptions import BrokenMessageError
from .info import GoldSrcInfo, InfoProtocol, SourceInfo
from .players import Player, PlayersProtocol
from .rules import RulesProtocol
HEADER_SIMPLE = b"\xFF\xFF\xFF\xFF" HEADER_SIMPLE = b"\xFF\xFF\xFF\xFF"
HEADER_MULTI = b"\xFE\xFF\xFF\xFF" HEADER_MULTI = b"\xFE\xFF\xFF\xFF"
A2S_CHALLENGE_RESPONSE = 0x41 A2S_CHALLENGE_RESPONSE = 0x41
PROTOCOLS = Union[InfoProtocol, RulesProtocol, PlayersProtocol]
logger: logging.Logger = logging.getLogger("a2s")
logger = logging.getLogger("a2s") T = TypeVar("T", InfoProtocol, RulesProtocol, PlayersProtocol)
def request_sync(address, timeout, encoding, a2s_proto): def request_sync(
address: Tuple[str, int], timeout: float, encoding: str, a2s_proto: Type[T]
) -> Union[List[Player], GoldSrcInfo, SourceInfo, Dict[str, str]]:
conn = A2SStream(address, timeout) conn = A2SStream(address, timeout)
response = request_sync_impl(conn, encoding, a2s_proto) response = request_sync_impl(conn, encoding, a2s_proto) # type: ignore
conn.close() conn.close()
return response return response
def request_sync_impl(conn, encoding, a2s_proto, challenge=0, retries=0, ping=None):
def request_sync_impl(
conn: A2SStream, encoding: str, a2s_proto: Type[T], challenge: int = 0, retries: int = 0, ping: Optional[float] = None
) -> Union[SourceInfo, GoldSrcInfo, Dict[str, str], List[Player]]:
send_time = time.monotonic() send_time = time.monotonic()
resp_data = conn.request(a2s_proto.serialize_request(challenge)) resp_data = conn.request(a2s_proto.serialize_request(challenge))
recv_time = time.monotonic() recv_time = time.monotonic()
@ -31,40 +44,36 @@ def request_sync_impl(conn, encoding, a2s_proto, challenge=0, retries=0, ping=No
if retries == 0: if retries == 0:
ping = recv_time - send_time ping = recv_time - send_time
reader = ByteReader( reader = ByteReader(io.BytesIO(resp_data), endian="<", encoding=encoding)
io.BytesIO(resp_data), endian="<", encoding=encoding)
response_type = reader.read_uint8() response_type = reader.read_uint8()
if response_type == A2S_CHALLENGE_RESPONSE: if response_type == A2S_CHALLENGE_RESPONSE:
if retries >= DEFAULT_RETRIES: if retries >= DEFAULT_RETRIES:
raise BrokenMessageError( raise BrokenMessageError("Server keeps sending challenge responses")
"Server keeps sending challenge responses")
challenge = reader.read_uint32() challenge = reader.read_uint32()
return request_sync_impl( return request_sync_impl(conn, encoding, a2s_proto, challenge, retries + 1, ping)
conn, encoding, a2s_proto, challenge, retries + 1, ping)
if not a2s_proto.validate_response_type(response_type): if not a2s_proto.validate_response_type(response_type):
raise BrokenMessageError( raise BrokenMessageError("Invalid response type: " + hex(response_type))
"Invalid response type: " + hex(response_type))
return a2s_proto.deserialize_response(reader, response_type, ping) return a2s_proto.deserialize_response(reader, response_type, ping)
class A2SStream: class A2SStream:
def __init__(self, address, timeout): def __init__(self, address: Tuple[str, int], timeout: float) -> None:
self.address = address self.address: Tuple[str, int] = address
self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self._socket: socket.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._socket.settimeout(timeout) self._socket.settimeout(timeout)
def __del__(self): def __del__(self) -> None:
self.close() self.close()
def send(self, data): def send(self, data: bytes) -> None:
logger.debug("Sending packet: %r", data) logger.debug("Sending packet: %r", data)
packet = HEADER_SIMPLE + data packet = HEADER_SIMPLE + data
self._socket.sendto(packet, self.address) self._socket.sendto(packet, self.address)
def recv(self): def recv(self) -> bytes:
packet = self._socket.recv(65535) packet = self._socket.recv(65535)
header = packet[:4] header = packet[:4]
data = packet[4:] data = packet[4:]
@ -81,16 +90,14 @@ class A2SStream:
# Sometimes there's an additional header present # Sometimes there's an additional header present
if reassembled.startswith(b"\xFF\xFF\xFF\xFF"): if reassembled.startswith(b"\xFF\xFF\xFF\xFF"):
reassembled = reassembled[4:] reassembled = reassembled[4:]
logger.debug("Received %s part packet with content: %r", logger.debug("Received %s part packet with content: %r", len(fragments), reassembled)
len(fragments), reassembled)
return reassembled return reassembled
else: else:
raise BrokenMessageError( raise BrokenMessageError("Invalid packet header: " + repr(header))
"Invalid packet header: " + repr(header))
def request(self, payload): def request(self, payload: bytes) -> bytes:
self.send(payload) self.send(payload)
return self.recv() return self.recv()
def close(self): def close(self) -> None:
self._socket.close() self._socket.close()

48
a2s/a2s_sync.pyi

@ -0,0 +1,48 @@
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Type, Union, overload
from .a2s_sync import A2SStream
if TYPE_CHECKING:
from .info import GoldSrcInfo, InfoProtocol, SourceInfo
from .players import Player, PlayersProtocol
from .rules import RulesProtocol
@overload
def request_sync(
address: Tuple[str, int], timeout: float, encoding: str, a2s_proto: Type[InfoProtocol]
) -> Union[SourceInfo, GoldSrcInfo]: ...
@overload
def request_sync(
address: Tuple[str, int], timeout: float, encoding: str, a2s_proto: Type[PlayersProtocol]
) -> List[Player]: ...
@overload
def request_sync(
address: Tuple[str, int], timeout: float, encoding: str, a2s_proto: Type[RulesProtocol]
) -> Dict[str, str]: ...
@overload
def request_sync_impl(
conn: A2SStream,
encoding: str,
a2s_proto: Type[InfoProtocol],
challenge: int = ...,
retries: int = ...,
ping: Optional[float] = ...,
) -> Union[SourceInfo, GoldSrcInfo]: ...
@overload
def request_sync_impl(
conn: A2SStream,
encoding: str,
a2s_proto: Type[PlayersProtocol],
challenge: int = ...,
retries: int = ...,
ping: Optional[float] = ...,
) -> List[Player]: ...
@overload
def request_sync_impl(
conn: A2SStream,
encoding: str,
a2s_proto: Type[RulesProtocol],
challenge: int = ...,
retries: int = ...,
ping: Optional[float] = ...,
) -> Dict[str, str]: ...

109
a2s/byteio.py

@ -1,80 +1,92 @@
import struct from __future__ import annotations
import io import io
import struct
from typing import TYPE_CHECKING, Any, Optional, Tuple, Union
from a2s.exceptions import BufferExhaustedError from a2s.exceptions import BufferExhaustedError
from .defaults import DEFAULT_ENCODING
if TYPE_CHECKING:
from typing_extensions import Literal
class ByteReader(): STRUCT_OPTIONS = Literal[
def __init__(self, stream, endian="=", encoding=None): "x", "c", "b", "B", "?", "h", "H", "i", "I", "l", "L", "q", "Q", "n", "N", "e", "f", "d", "s", "p", "P"
self.stream = stream ]
self.endian = endian
self.encoding = encoding
def read(self, size=-1):
class ByteReader:
def __init__(self, stream: io.BytesIO, endian: str = "=", encoding: Optional[str] = None) -> None:
self.stream: io.BytesIO = stream
self.endian: str = endian
self.encoding: Optional[str] = encoding
def read(self, size: int = -1) -> bytes:
data = self.stream.read(size) data = self.stream.read(size)
if size > -1 and len(data) != size: if size > -1 and len(data) != size:
raise BufferExhaustedError() raise BufferExhaustedError()
return data return data
def peek(self, size=-1): def peek(self, size: int = -1) -> bytes:
cur_pos = self.stream.tell() cur_pos = self.stream.tell()
data = self.stream.read(size) data = self.stream.read(size)
self.stream.seek(cur_pos, io.SEEK_SET) self.stream.seek(cur_pos, io.SEEK_SET)
return data return data
def unpack(self, fmt): def unpack(self, fmt: STRUCT_OPTIONS) -> Tuple[Any, ...]:
fmt = self.endian + fmt new_fmt = self.endian + fmt
fmt_size = struct.calcsize(fmt) fmt_size = struct.calcsize(fmt)
return struct.unpack(fmt, self.read(fmt_size)) return struct.unpack(new_fmt, self.read(fmt_size))
def unpack_one(self, fmt): def unpack_one(self, fmt: STRUCT_OPTIONS) -> Any:
values = self.unpack(fmt) values = self.unpack(fmt)
assert len(values) == 1 assert len(values) == 1
return values[0] return values[0]
def read_int8(self): def read_int8(self) -> int:
return self.unpack_one("b") return self.unpack_one("b")
def read_uint8(self): def read_uint8(self) -> int:
return self.unpack_one("B") return self.unpack_one("B")
def read_int16(self): def read_int16(self) -> int:
return self.unpack_one("h") return self.unpack_one("h")
def read_uint16(self): def read_uint16(self) -> int:
return self.unpack_one("H") return self.unpack_one("H")
def read_int32(self): def read_int32(self) -> int:
return self.unpack_one("l") return self.unpack_one("l")
def read_uint32(self): def read_uint32(self) -> int:
return self.unpack_one("L") return self.unpack_one("L")
def read_int64(self): def read_int64(self) -> int:
return self.unpack_one("q") return self.unpack_one("q")
def read_uint64(self): def read_uint64(self) -> int:
return self.unpack_one("Q") return self.unpack_one("Q")
def read_float(self): def read_float(self) -> float:
return self.unpack_one("f") return self.unpack_one("f")
def read_double(self): def read_double(self) -> float:
return self.unpack_one("d") return self.unpack_one("d")
def read_bool(self): def read_bool(self) -> bool:
return bool(self.unpack_one("b")) return bool(self.unpack_one("b"))
def read_char(self): def read_char(self) -> str:
char = self.unpack_one("c") char = self.unpack_one("c")
if self.encoding is not None: if self.encoding is not None:
return char.decode(self.encoding, errors="replace") return char.decode(self.encoding, errors="replace")
else: else:
return char return char.decode(DEFAULT_ENCODING, errors="replace")
def read_cstring(self, charsize=1): def read_cstring(self, charsize: int = 1) -> str:
string = b"" string = b""
while True: while True:
c = self.read(charsize) c = self.read(charsize)
@ -86,64 +98,65 @@ class ByteReader():
if self.encoding is not None: if self.encoding is not None:
return string.decode(self.encoding, errors="replace") return string.decode(self.encoding, errors="replace")
else: else:
return string return string.decode(DEFAULT_ENCODING, errors="replace")
class ByteWriter(): class ByteWriter:
def __init__(self, stream, endian="=", encoding=None): def __init__(self, stream: io.BytesIO, endian: str = "=", encoding: Optional[str] = None) -> None:
self.stream = stream self.stream: io.BytesIO = stream
self.endian = endian self.endian: str = endian
self.encoding = encoding self.encoding: Optional[str] = encoding
def write(self, *args): def write(self, *args: bytes) -> int:
return self.stream.write(*args) return self.stream.write(*args)
def pack(self, fmt, *values): def pack(self, fmt: str, *values: Any) -> int:
fmt = self.endian + fmt fmt = self.endian + fmt
fmt_size = struct.calcsize(fmt)
return self.stream.write(struct.pack(fmt, *values)) return self.stream.write(struct.pack(fmt, *values))
def write_int8(self, val): def write_int8(self, val: int) -> None:
self.pack("b", val) self.pack("b", val)
def write_uint8(self, val): def write_uint8(self, val: int) -> None:
self.pack("B", val) self.pack("B", val)
def write_int16(self, val): def write_int16(self, val: int) -> None:
self.pack("h", val) self.pack("h", val)
def write_uint16(self, val): def write_uint16(self, val: int) -> None:
self.pack("H", val) self.pack("H", val)
def write_int32(self, val): def write_int32(self, val: int) -> None:
self.pack("l", val) self.pack("l", val)
def write_uint32(self, val): def write_uint32(self, val: int) -> None:
self.pack("L", val) self.pack("L", val)
def write_int64(self, val): def write_int64(self, val: int) -> None:
self.pack("q", val) self.pack("q", val)
def write_uint64(self, val): def write_uint64(self, val: int) -> None:
self.pack("Q", val) self.pack("Q", val)
def write_float(self, val): def write_float(self, val: float) -> None:
self.pack("f", val) self.pack("f", val)
def write_double(self, val): def write_double(self, val: float) -> None:
self.pack("d", val) self.pack("d", val)
def write_bool(self, val): def write_bool(self, val: bool) -> None:
self.pack("b", val) self.pack("b", val)
def write_char(self, val): def write_char(self, val: str) -> None:
if self.encoding is not None: if self.encoding is not None:
self.pack("c", val.encode(self.encoding)) self.pack("c", val.encode(self.encoding))
else: else:
self.pack("c", val) self.pack("c", val)
def write_cstring(self, val): def write_cstring(self, val: Union[str, bytes]) -> None:
if self.encoding is not None: if self.encoding is not None:
assert isinstance(val, str)
self.write(val.encode(self.encoding) + b"\x00") self.write(val.encode(self.encoding) + b"\x00")
else: else:
assert isinstance(val, bytes)
self.write(val + b"\x00") self.write(val + b"\x00")

24
a2s/datacls.py

@ -5,29 +5,37 @@ Check out the official documentation to see what this is trying to
achieve: achieve:
https://docs.python.org/3/library/dataclasses.html https://docs.python.org/3/library/dataclasses.html
""" """
from __future__ import annotations
import collections from collections import OrderedDict
import copy import copy
from typing import Any, Generator, Tuple, TYPE_CHECKING, Dict
if TYPE_CHECKING:
from typing_extensions import Self
class DataclsBase: class DataclsBase:
def __init__(self, **kwargs): _defaults: "OrderedDict[str, Any]"
def __init__(self, **kwargs: Any) -> None:
for name, value in self._defaults.items(): for name, value in self._defaults.items():
if name in kwargs: if name in kwargs:
value = kwargs[name] value = kwargs[name]
setattr(self, name, copy.copy(value)) setattr(self, name, copy.copy(value))
def __iter__(self): def __iter__(self) -> Generator[Tuple[str, Any], None, None]:
for name in self.__annotations__: for name in self.__annotations__:
yield (name, getattr(self, name)) yield (name, getattr(self, name))
def __repr__(self): def __repr__(self) -> str:
return "{}({})".format( return "{}({})".format(
self.__class__.__name__, self.__class__.__name__,
", ".join(name + "=" + repr(value) for name, value in self)) ", ".join(name + "=" + repr(value) for name, value in self))
class DataclsMeta(type): class DataclsMeta(type):
def __new__(cls, name, bases, prop): def __new__(cls, name: str, bases: Tuple[type, ...], prop: Dict[str, Any]) -> Self:
values = collections.OrderedDict() values: OrderedDict[str, Any] = OrderedDict()
for member_name in prop["__annotations__"].keys(): for member_name in prop["__annotations__"].keys():
# Check if member has a default value set as class variable # Check if member has a default value set as class variable
if member_name in prop: if member_name in prop:
@ -43,5 +51,5 @@ class DataclsMeta(type):
bases = (DataclsBase, *bases) bases = (DataclsBase, *bases)
return super().__new__(cls, name, bases, prop) return super().__new__(cls, name, bases, prop)
def __prepare__(self, *args, **kwargs): def __prepare__(self, *args: Any, **kwargs: Any) -> OrderedDict[str, Any]: # type: ignore # this is custom overriden
return collections.OrderedDict() return OrderedDict()

1
a2s/exceptions.py

@ -1,5 +1,6 @@
class BrokenMessageError(Exception): class BrokenMessageError(Exception):
pass pass
class BufferExhaustedError(BrokenMessageError): class BufferExhaustedError(BrokenMessageError):
pass pass

137
a2s/info.py

@ -1,204 +1,214 @@
import io from __future__ import annotations
from typing import Optional, Tuple, Union
from a2s.exceptions import BrokenMessageError, BufferExhaustedError
from a2s.defaults import DEFAULT_TIMEOUT, DEFAULT_ENCODING
from a2s.a2s_sync import request_sync
from a2s.a2s_async import request_async from a2s.a2s_async import request_async
from a2s.byteio import ByteReader from a2s.a2s_sync import request_sync
from a2s.datacls import DataclsMeta from a2s.datacls import DataclsMeta
from a2s.defaults import DEFAULT_ENCODING, DEFAULT_TIMEOUT
from a2s.exceptions import BufferExhaustedError
from .byteio import ByteReader
A2S_INFO_RESPONSE = 0x49 A2S_INFO_RESPONSE = 0x49
A2S_INFO_RESPONSE_LEGACY = 0x6D A2S_INFO_RESPONSE_LEGACY = 0x6D
class SourceInfo(metaclass=DataclsMeta): class SourceInfo(metaclass=DataclsMeta):
"""Protocol version used by the server"""
protocol: int protocol: int
"""Protocol version used by the server"""
"""Display name of the server"""
server_name: str server_name: str
"""Display name of the server"""
"""The currently loaded map"""
map_name: str map_name: str
"""The currently loaded map"""
"""Name of the game directory"""
folder: str folder: str
"""Name of the game directory"""
"""Name of the game"""
game: str game: str
"""Name of the game"""
"""App ID of the game required to connect"""
app_id: int app_id: int
"""App ID of the game required to connect"""
"""Number of players currently connected"""
player_count: int player_count: int
"""Number of players currently connected"""
"""Number of player slots available"""
max_players: int max_players: int
"""Number of player slots available"""
"""Number of bots on the server"""
bot_count: int bot_count: int
"""Number of bots on the server"""
server_type: str
"""Type of the server: """Type of the server:
'd': Dedicated server 'd': Dedicated server
'l': Non-dedicated server 'l': Non-dedicated server
'p': SourceTV relay (proxy)""" 'p': SourceTV relay (proxy)"""
server_type: str
platform: str
"""Operating system of the server """Operating system of the server
'l', 'w', 'm' for Linux, Windows, macOS""" 'l', 'w', 'm' for Linux, Windows, macOS"""
platform: str
"""Server requires a password to connect"""
password_protected: bool password_protected: bool
"""Server requires a password to connect"""
"""Server has VAC enabled"""
vac_enabled: bool vac_enabled: bool
"""Server has VAC enabled"""
"""Version of the server software"""
version: str version: str
"""Version of the server software"""
# Optional: # Optional:
edf: int = 0
"""Extra data field, used to indicate if extra values are """Extra data field, used to indicate if extra values are
included in the response""" included in the response"""
edf: int = 0
"""Port of the game server."""
port: int port: int
"""Port of the game server."""
"""Steam ID of the server"""
steam_id: int steam_id: int
"""Steam ID of the server"""
"""Port of the SourceTV server"""
stv_port: int stv_port: int
"""Port of the SourceTV server"""
"""Name of the SourceTV server"""
stv_name: str stv_name: str
"""Name of the SourceTV server"""
"""Tags that describe the gamemode being played"""
keywords: str keywords: str
"""Tags that describe the gamemode being played"""
"""Game ID for games that have an app ID too high for 16bit."""
game_id: int game_id: int
"""Game ID for games that have an app ID too high for 16bit."""
# Client determined values: # Client determined values:
"""Round-trip delay time for the request in seconds"""
ping: float ping: float
"""Round-trip delay time for the request in seconds"""
@property @property
def has_port(self): def has_port(self) -> bool:
return bool(self.edf & 0x80) return bool(self.edf & 0x80)
@property @property
def has_steam_id(self): def has_steam_id(self) -> bool:
return bool(self.edf & 0x10) return bool(self.edf & 0x10)
@property @property
def has_stv(self): def has_stv(self) -> bool:
return bool(self.edf & 0x40) return bool(self.edf & 0x40)
@property @property
def has_keywords(self): def has_keywords(self) -> bool:
return bool(self.edf & 0x20) return bool(self.edf & 0x20)
@property @property
def has_game_id(self): def has_game_id(self) -> bool:
return bool(self.edf & 0x01) return bool(self.edf & 0x01)
class GoldSrcInfo(metaclass=DataclsMeta): class GoldSrcInfo(metaclass=DataclsMeta):
"""IP Address and port of the server"""
address: str address: str
"""IP Address and port of the server"""
"""Display name of the server"""
server_name: str server_name: str
"""Display name of the server"""
"""The currently loaded map"""
map_name: str map_name: str
"""The currently loaded map"""
"""Name of the game directory"""
folder: str folder: str
"""Name of the game directory"""
"""Name of the game"""
game: str game: str
"""Name of the game"""
"""Number of players currently connected"""
player_count: int player_count: int
"""Number of players currently connected"""
"""Number of player slots available"""
max_players: int max_players: int
"""Number of player slots available"""
"""Protocol version used by the server"""
protocol: int protocol: int
"""Protocol version used by the server"""
server_type: str
"""Type of the server: """Type of the server:
'd': Dedicated server 'd': Dedicated server
'l': Non-dedicated server 'l': Non-dedicated server
'p': SourceTV relay (proxy)""" 'p': SourceTV relay (proxy)"""
server_type: str
platform: str
"""Operating system of the server """Operating system of the server
'l', 'w' for Linux and Windows""" 'l', 'w' for Linux and Windows"""
platform: str
"""Server requires a password to connect"""
password_protected: bool password_protected: bool
"""Server requires a password to connect"""
"""Server is running a Half-Life mod instead of the base game""" """Server is running a Half-Life mod instead of the base game"""
is_mod: bool is_mod: bool
"""Server has VAC enabled"""
vac_enabled: bool vac_enabled: bool
"""Server has VAC enabled"""
"""Number of bots on the server"""
bot_count: int bot_count: int
"""Number of bots on the server"""
# Optional: # Optional:
"""URL to the mod website"""
mod_website: str mod_website: str
"""URL to the mod website"""
"""URL to download the mod"""
mod_download: str mod_download: str
"""URL to download the mod"""
"""Version of the mod installed on the server"""
mod_version: int mod_version: int
"""Version of the mod installed on the server"""
"""Size in bytes of the mod"""
mod_size: int mod_size: int
"""Size in bytes of the mod"""
"""Mod supports multiplayer only"""
multiplayer_only: bool = False multiplayer_only: bool = False
"""Mod supports multiplayer only"""
uses_custom_dll: bool = True
"""Mod uses a custom DLL""" """Mod uses a custom DLL"""
uses_hl_dll: bool = True
# Client determined values: # Client determined values:
"""Round-trip delay time for the request in seconds"""
ping: float ping: float
"""Round-trip delay time for the request in seconds"""
def info(address, timeout=DEFAULT_TIMEOUT, encoding=DEFAULT_ENCODING): def info(
address: Tuple[str, int], timeout: float = DEFAULT_TIMEOUT, encoding: str = DEFAULT_ENCODING
) -> Union[SourceInfo, GoldSrcInfo]:
return request_sync(address, timeout, encoding, InfoProtocol) return request_sync(address, timeout, encoding, InfoProtocol)
async def ainfo(address, timeout=DEFAULT_TIMEOUT, encoding=DEFAULT_ENCODING):
async def ainfo(
address: Tuple[str, int], timeout: float = DEFAULT_TIMEOUT, encoding: str = DEFAULT_ENCODING
) -> Union[SourceInfo, GoldSrcInfo]:
return await request_async(address, timeout, encoding, InfoProtocol) return await request_async(address, timeout, encoding, InfoProtocol)
class InfoProtocol: class InfoProtocol:
@staticmethod @staticmethod
def validate_response_type(response_type): def validate_response_type(response_type: int) -> bool:
return response_type in (A2S_INFO_RESPONSE, A2S_INFO_RESPONSE_LEGACY) return response_type in (A2S_INFO_RESPONSE, A2S_INFO_RESPONSE_LEGACY)
@staticmethod @staticmethod
def serialize_request(challenge): def serialize_request(challenge: int) -> bytes:
if challenge: if challenge:
return b"\x54Source Engine Query\0" + challenge.to_bytes(4, "little") return b"\x54Source Engine Query\0" + challenge.to_bytes(4, "little")
else: else:
return b"\x54Source Engine Query\0" return b"\x54Source Engine Query\0"
@staticmethod @staticmethod
def deserialize_response(reader, response_type, ping): def deserialize_response(
reader: ByteReader, response_type: int, ping: Optional[float]
) -> Union[SourceInfo, GoldSrcInfo]:
if response_type == A2S_INFO_RESPONSE: if response_type == A2S_INFO_RESPONSE:
resp = parse_source(reader) resp = parse_source(reader)
elif response_type == A2S_INFO_RESPONSE_LEGACY: elif response_type == A2S_INFO_RESPONSE_LEGACY:
@ -206,10 +216,12 @@ class InfoProtocol:
else: else:
raise Exception(str(response_type)) raise Exception(str(response_type))
assert ping
resp.ping = ping resp.ping = ping
return resp return resp
def parse_source(reader):
def parse_source(reader: ByteReader) -> SourceInfo:
resp = SourceInfo() resp = SourceInfo()
resp.protocol = reader.read_uint8() resp.protocol = reader.read_uint8()
resp.server_name = reader.read_cstring() resp.server_name = reader.read_cstring()
@ -222,7 +234,7 @@ def parse_source(reader):
resp.bot_count = reader.read_uint8() resp.bot_count = reader.read_uint8()
resp.server_type = reader.read_char().lower() resp.server_type = reader.read_char().lower()
resp.platform = reader.read_char().lower() resp.platform = reader.read_char().lower()
if resp.platform == "o": # Deprecated mac value if resp.platform == "o": # Deprecated mac value
resp.platform = "m" resp.platform = "m"
resp.password_protected = reader.read_bool() resp.password_protected = reader.read_bool()
resp.vac_enabled = reader.read_bool() resp.vac_enabled = reader.read_bool()
@ -247,7 +259,8 @@ def parse_source(reader):
return resp return resp
def parse_goldsrc(reader):
def parse_goldsrc(reader: ByteReader) -> GoldSrcInfo:
resp = GoldSrcInfo() resp = GoldSrcInfo()
resp.address = reader.read_cstring() resp.address = reader.read_cstring()
resp.server_name = reader.read_cstring() resp.server_name = reader.read_cstring()
@ -266,7 +279,7 @@ def parse_goldsrc(reader):
if resp.is_mod and len(reader.peek()) > 2: if resp.is_mod and len(reader.peek()) > 2:
resp.mod_website = reader.read_cstring() resp.mod_website = reader.read_cstring()
resp.mod_download = reader.read_cstring() resp.mod_download = reader.read_cstring()
reader.read(1) # Skip a NULL byte reader.read(1) # Skip a NULL byte
resp.mod_version = reader.read_uint32() resp.mod_version = reader.read_uint32()
resp.mod_size = reader.read_uint32() resp.mod_size = reader.read_uint32()
resp.multiplayer_only = reader.read_bool() resp.multiplayer_only = reader.read_bool()

26
a2s/players.py

@ -1,18 +1,17 @@
import io from typing import List, Optional, Tuple
from a2s.defaults import DEFAULT_TIMEOUT, DEFAULT_ENCODING
from a2s.a2s_sync import request_sync
from a2s.a2s_async import request_async from a2s.a2s_async import request_async
from a2s.a2s_sync import request_sync
from a2s.byteio import ByteReader from a2s.byteio import ByteReader
from a2s.datacls import DataclsMeta from a2s.datacls import DataclsMeta
from a2s.defaults import DEFAULT_ENCODING, DEFAULT_TIMEOUT
A2S_PLAYER_RESPONSE = 0x44 A2S_PLAYER_RESPONSE = 0x44
class Player(metaclass=DataclsMeta): class Player(metaclass=DataclsMeta):
"""Apparently an entry index, but seems to be always 0""" """Apparently an entry index, but seems to be always 0"""
index: int index: int
"""Name of the player""" """Name of the player"""
@ -25,32 +24,35 @@ class Player(metaclass=DataclsMeta):
duration: float duration: float
def players(address, timeout=DEFAULT_TIMEOUT, encoding=DEFAULT_ENCODING): def players(address: Tuple[str, int], timeout: float = DEFAULT_TIMEOUT, encoding: str = DEFAULT_ENCODING) -> List[Player]:
return request_sync(address, timeout, encoding, PlayersProtocol) return request_sync(address, timeout, encoding, PlayersProtocol)
async def aplayers(address, timeout=DEFAULT_TIMEOUT, encoding=DEFAULT_ENCODING):
async def aplayers(
address: Tuple[str, int], timeout: float = DEFAULT_TIMEOUT, encoding: str = DEFAULT_ENCODING
) -> List[Player]:
return await request_async(address, timeout, encoding, PlayersProtocol) return await request_async(address, timeout, encoding, PlayersProtocol)
class PlayersProtocol: class PlayersProtocol:
@staticmethod @staticmethod
def validate_response_type(response_type): def validate_response_type(response_type: int) -> bool:
return response_type == A2S_PLAYER_RESPONSE return response_type == A2S_PLAYER_RESPONSE
@staticmethod @staticmethod
def serialize_request(challenge): def serialize_request(challenge: int) -> bytes:
return b"\x55" + challenge.to_bytes(4, "little") return b"\x55" + challenge.to_bytes(4, "little")
@staticmethod @staticmethod
def deserialize_response(reader, response_type, ping): def deserialize_response(reader: ByteReader, response_type: int, ping: Optional[float]) -> List[Player]:
player_count = reader.read_uint8() player_count = reader.read_uint8()
resp = [ resp = [
Player( Player(
index=reader.read_uint8(), index=reader.read_uint8(),
name=reader.read_cstring(), name=reader.read_cstring(),
score=reader.read_int32(), score=reader.read_int32(),
duration=reader.read_float() duration=reader.read_float(),
) )
for player_num in range(player_count) for _ in range(player_count)
] ]
return resp return resp

0
a2s/py.typed

27
a2s/rules.py

@ -1,38 +1,35 @@
import io from typing import Dict, Optional, Tuple
from a2s.defaults import DEFAULT_TIMEOUT, DEFAULT_ENCODING
from a2s.a2s_sync import request_sync
from a2s.a2s_async import request_async from a2s.a2s_async import request_async
from a2s.a2s_sync import request_sync
from a2s.byteio import ByteReader from a2s.byteio import ByteReader
from a2s.datacls import DataclsMeta from a2s.defaults import DEFAULT_ENCODING, DEFAULT_TIMEOUT
A2S_RULES_RESPONSE = 0x45 A2S_RULES_RESPONSE = 0x45
def rules(address, timeout=DEFAULT_TIMEOUT, encoding=DEFAULT_ENCODING): def rules(address: Tuple[str, int], timeout: float = DEFAULT_TIMEOUT, encoding: str = DEFAULT_ENCODING) -> Dict[str, str]:
return request_sync(address, timeout, encoding, RulesProtocol) return request_sync(address, timeout, encoding, RulesProtocol)
async def arules(address, timeout=DEFAULT_TIMEOUT, encoding=DEFAULT_ENCODING):
async def arules(
address: Tuple[str, int], timeout: float = DEFAULT_TIMEOUT, encoding: str = DEFAULT_ENCODING
) -> Dict[str, str]:
return await request_async(address, timeout, encoding, RulesProtocol) return await request_async(address, timeout, encoding, RulesProtocol)
class RulesProtocol: class RulesProtocol:
@staticmethod @staticmethod
def validate_response_type(response_type): def validate_response_type(response_type: int) -> bool:
return response_type == A2S_RULES_RESPONSE return response_type == A2S_RULES_RESPONSE
@staticmethod @staticmethod
def serialize_request(challenge): def serialize_request(challenge: int) -> bytes:
return b"\x56" + challenge.to_bytes(4, "little") return b"\x56" + challenge.to_bytes(4, "little")
@staticmethod @staticmethod
def deserialize_response(reader, response_type, ping): def deserialize_response(reader: ByteReader, response_type: int, ping: Optional[float]) -> Dict[str, str]:
rule_count = reader.read_int16() rule_count = reader.read_int16()
# Have to use tuples to preserve evaluation order # Have to use tuples to preserve evaluation order
resp = dict( resp = dict((reader.read_cstring(), reader.read_cstring()) for _ in range(rule_count))
(reader.read_cstring(), reader.read_cstring())
for rule_num in range(rule_count)
)
return resp return resp

19
pyproject.toml

@ -0,0 +1,19 @@
[tool.black]
line-length = 125
target-version = ["py37"]
[tool.isort]
profile = "black"
combine_as_imports = true
combine_star = true
line_length = 125
[tool.pyright]
include = ["a2s/**/*.py"]
useLibraryCodeForTypes = true
typeCheckingMode = "strict"
pythonVersion = "3.7"
[build-system]
requires = ["setuptools", "wheel"]
build-backend = "setuptools.build_meta"

4
setup.py

@ -21,7 +21,7 @@ setuptools.setup(
"License :: OSI Approved :: MIT License", "License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3", "Programming Language :: Python :: 3",
"Operating System :: OS Independent", "Operating System :: OS Independent",
"Topic :: Games/Entertainment" "Topic :: Games/Entertainment",
], ],
python_requires=">=3.7" python_requires=">=3.7",
) )

Loading…
Cancel
Save