diff --git a/README.md b/README.md index da17e30..c84ccd5 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ Official demo application: [Sourcequery](https://sourcequery.yepoleb.at) ## Requirements -Python >=3.6, no external dependencies +Python >=3.7, no external dependencies ## Install @@ -21,6 +21,9 @@ Python >=3.6, no external dependencies * `a2s.players(address, timeout=DEFAULT_TIMEOUT, encoding=DEFAULT_ENCODING)` * `a2s.rules(address, timeout=DEFAULT_TIMEOUT, encoding=DEFAULT_ENCODING)` +All functions also have an async version as of package 1.2.0 that adds an `a` prefix, e.g. +`ainfo`, `aplayers`, `arules`. + ### Parameters * address: `Tuple[str, int]` - Address of the server. diff --git a/a2s/__init__.py b/a2s/__init__.py index 332331e..0251452 100644 --- a/a2s/__init__.py +++ b/a2s/__init__.py @@ -1,5 +1,5 @@ from a2s.exceptions import BrokenMessageError, BufferExhaustedError -from a2s.info import info, SourceInfo, GoldSrcInfo -from a2s.players import players, Player -from a2s.rules import rules +from a2s.info import info, ainfo, SourceInfo, GoldSrcInfo +from a2s.players import players, aplayers, Player +from a2s.rules import rules, arules diff --git a/a2s/a2sasync.py b/a2s/a2sasync.py new file mode 100644 index 0000000..7c6c8bc --- /dev/null +++ b/a2s/a2sasync.py @@ -0,0 +1,95 @@ +import asyncio +import logging + +from a2s.exceptions import BrokenMessageError +from a2s.a2sfragment import decode_fragment + + + +HEADER_SIMPLE = b"\xFF\xFF\xFF\xFF" +HEADER_MULTI = b"\xFE\xFF\xFF\xFF" + +logger = logging.getLogger("a2s") + +class A2SProtocol(asyncio.DatagramProtocol): + def __init__(self): + self.recv_queue = asyncio.Queue() + self.error_event = asyncio.Event() + self.error = None + self.fragment_buf = [] + + def connection_made(self, transport): + self.transport = transport + + def datagram_received(self, packet, addr): + header = packet[:4] + payload = packet[4:] + if header == HEADER_SIMPLE: + logger.debug("Received single packet: %r", payload) + self.recv_queue.put_nowait(payload) + elif header == HEADER_MULTI: + self.fragment_buf.append(decode_fragment(payload)) + if len(self.fragment_buf) < self.fragment_buf[0].fragment_count: + return # Wait for more packets to arrive + self.fragment_buf.sort(key=lambda f: f.fragment_id) + reassembled = b"".join( + fragment.payload for fragment in self.fragment_buf) + logger.debug("Received %s part packet with content: %r", + len(self.fragment_buf), reassembled) + self.recv_queue.put_nowait(reassembled) + self.fragment_buf = [] + else: + self.error = BrokenMessageError( + "Invalid packet header: " + repr(header)) + self.error_event.set() + + def error_received(self, exc): + self.error = exc + self.error_event.set() + + def raise_on_error(): + error = self.error + self.error = None + self.error_event.clear() + raise error + +class A2SStreamAsync: + def __init__(self, transport, protocol, timeout): + self.transport = transport + self.protocol = protocol + self.timeout = timeout + + def __del__(self): + self.close() + + @classmethod + async def create(cls, address, timeout): + loop = asyncio.get_running_loop() + transport, protocol = await loop.create_datagram_endpoint( + lambda: A2SProtocol(), remote_addr=address) + return cls(transport, protocol, timeout) + + def send(self, payload): + packet = HEADER_SIMPLE + payload + self.transport.sendto(packet) + + async def recv(self): + queue_task = asyncio.create_task(self.protocol.recv_queue.get()) + error_task = asyncio.create_task(self.protocol.error_event.wait()) + done, pending = await asyncio.wait({queue_task, error_task}, + timeout=self.timeout, return_when=asyncio.FIRST_COMPLETED) + + for task in pending: task.cancel() + if error_task in done: + self.protocol.raise_on_error() + if not done: + raise asyncio.TimeoutError() + + return queue_task.result() + + async def request(self, payload): + self.send(payload) + return await self.recv() + + def close(self): + self.transport.close() diff --git a/a2s/a2sfragment.py b/a2s/a2sfragment.py new file mode 100644 index 0000000..38a40ba --- /dev/null +++ b/a2s/a2sfragment.py @@ -0,0 +1,39 @@ +import bz2 +import io + +from a2s.byteio import ByteReader + + + +class A2SFragment: + def __init__(self, message_id, fragment_count, fragment_id, mtu, + decompressed_size=0, crc=0, payload=b""): + self.message_id = message_id + self.fragment_count = fragment_count + self.fragment_id = fragment_id + self.mtu = mtu + self.decompressed_size = decompressed_size + self.crc = crc + self.payload = payload + + @property + def is_compressed(self): + return bool(self.message_id & (1 << 15)) + +def decode_fragment(data): + reader = ByteReader( + io.BytesIO(data), endian="<", encoding="utf-8") + frag = A2SFragment( + message_id=reader.read_uint32(), + fragment_count=reader.read_uint8(), + fragment_id=reader.read_uint8(), + mtu=reader.read_uint16() + ) + if frag.is_compressed: + frag.decompressed_size = reader.read_uint32() + frag.crc = reader.read_uint32() + frag.payload = bz2.decompress(reader.read()) + else: + frag.payload = reader.read() + + return frag diff --git a/a2s/a2sstream.py b/a2s/a2sstream.py index 83ef472..5d8c593 100644 --- a/a2s/a2sstream.py +++ b/a2s/a2sstream.py @@ -1,10 +1,8 @@ import socket -import bz2 -import io import logging from a2s.exceptions import BrokenMessageError -from a2s.byteio import ByteReader +from a2s.a2sfragment import decode_fragment @@ -13,39 +11,6 @@ HEADER_MULTI = b"\xFE\xFF\xFF\xFF" logger = logging.getLogger("a2s") -class A2SFragment: - def __init__(self, message_id, fragment_count, fragment_id, mtu, - decompressed_size=0, crc=0, payload=b""): - self.message_id = message_id - self.fragment_count = fragment_count - self.fragment_id = fragment_id - self.mtu = mtu - self.decompressed_size = decompressed_size - self.crc = crc - self.payload = payload - - @property - def is_compressed(self): - return bool(self.message_id & (1 << 15)) - -def decode_fragment(data): - reader = ByteReader( - io.BytesIO(data), endian="<", encoding="utf-8") - frag = A2SFragment( - message_id=reader.read_uint32(), - fragment_count=reader.read_uint8(), - fragment_id=reader.read_uint8(), - mtu=reader.read_uint16() - ) - if frag.is_compressed: - frag.decompressed_size = reader.read_uint32() - frag.crc = reader.read_uint32() - frag.payload = bz2.decompress(reader.read()) - else: - frag.payload = reader.read() - - return frag - class A2SStream: def __init__(self, address, timeout): self.address = address @@ -80,12 +45,9 @@ class A2SStream: raise BrokenMessageError( "Invalid packet header: " + repr(header)) + def request(self, payload): + self.send(payload) + return self.recv() + def close(self): self._socket.close() - -def request(address, data, timeout): - stream = A2SStream(address, timeout) - stream.send(data) - resp = stream.recv() - stream.close() - return resp diff --git a/a2s/info.py b/a2s/info.py index 50c2e88..bc94ef1 100644 --- a/a2s/info.py +++ b/a2s/info.py @@ -3,7 +3,8 @@ import io from a2s.exceptions import BrokenMessageError, BufferExhaustedError from a2s.defaults import DEFAULT_TIMEOUT, DEFAULT_ENCODING -from a2s.a2sstream import request +from a2s.a2sstream import A2SStream +from a2s.a2sasync import A2SStreamAsync from a2s.byteio import ByteReader from a2s.datacls import DataclsMeta @@ -244,10 +245,7 @@ def parse_goldsrc(reader): return resp -def info(address, timeout=DEFAULT_TIMEOUT, encoding=DEFAULT_ENCODING): - send_time = time.monotonic() - resp_data = request(address, b"\x54Source Engine Query\0", timeout) - recv_time = time.monotonic() +def info_response(resp_data, ping, encoding): reader = ByteReader( io.BytesIO(resp_data), endian="<", encoding=encoding) @@ -260,5 +258,25 @@ def info(address, timeout=DEFAULT_TIMEOUT, encoding=DEFAULT_ENCODING): raise BrokenMessageError( "Invalid response type: " + str(response_type)) - resp.ping = recv_time - send_time + resp.ping = ping return resp + +def info(address, timeout=DEFAULT_TIMEOUT, encoding=DEFAULT_ENCODING): + conn = A2SStream(address, timeout) + send_time = time.monotonic() + resp_data = conn.request(b"\x54Source Engine Query\0") + recv_time = time.monotonic() + conn.close() + ping = recv_time - send_time + + return info_response(resp_data, ping, encoding) + +async def ainfo(address, timeout=DEFAULT_TIMEOUT, encoding=DEFAULT_ENCODING): + conn = await A2SStreamAsync.create(address, timeout) + send_time = time.monotonic() + resp_data = await conn.request(b"\x54Source Engine Query\0") + recv_time = time.monotonic() + conn.close() + ping = recv_time - send_time + + return info_response(resp_data, ping, encoding) diff --git a/a2s/players.py b/a2s/players.py index 71e3460..06ba0d6 100644 --- a/a2s/players.py +++ b/a2s/players.py @@ -4,7 +4,8 @@ from typing import List from a2s.exceptions import BrokenMessageError from a2s.defaults import DEFAULT_TIMEOUT, DEFAULT_ENCODING, \ DEFAULT_RETRIES -from a2s.a2sstream import request +from a2s.a2sstream import A2SStream +from a2s.a2sasync import A2SStreamAsync from a2s.byteio import ByteReader from a2s.datacls import DataclsMeta @@ -26,13 +27,28 @@ class Player(metaclass=DataclsMeta): """Time the player has been connected to the server""" duration: float -def players(address, timeout=DEFAULT_TIMEOUT, - encoding=DEFAULT_ENCODING): - return players_impl(address, timeout, encoding) +def players_response(reader): + player_count = reader.read_uint8() + resp = [ + Player( + index=reader.read_uint8(), + name=reader.read_cstring(), + score=reader.read_int32(), + duration=reader.read_float() + ) + for player_num in range(player_count) + ] -def players_impl(address, timeout, encoding, challenge=0, retries=0): - resp_data = request( - address, b"\x55" + challenge.to_bytes(4, "little"), timeout) + return resp + +def players(address, timeout=DEFAULT_TIMEOUT, encoding=DEFAULT_ENCODING): + conn = A2SStream(address, timeout) + reader = players_request(conn, encoding) + conn.close() + return players_response(reader) + +def players_request(conn, encoding, challenge=0, retries=0): + resp_data = conn.request(b"\x55" + challenge.to_bytes(4, "little")) reader = ByteReader( io.BytesIO(resp_data), endian="<", encoding=encoding) @@ -42,22 +58,37 @@ def players_impl(address, timeout, encoding, challenge=0, retries=0): raise BrokenMessageError( "Server keeps sending challenge responses") challenge = reader.read_uint32() - return players_impl( - address, timeout, encoding, challenge, retries + 1) + return players_request( + conn, encoding, challenge, retries + 1) if response_type != A2S_PLAYER_RESPONSE: raise BrokenMessageError( "Invalid response type: " + str(response_type)) - player_count = reader.read_uint8() - resp = [ - Player( - index=reader.read_uint8(), - name=reader.read_cstring(), - score=reader.read_int32(), - duration=reader.read_float() - ) - for player_num in range(player_count) - ] + return reader - return resp +async def aplayers(address, timeout=DEFAULT_TIMEOUT, encoding=DEFAULT_ENCODING): + conn = await A2SStreamAsync.create(address, timeout) + reader = await players_request_async(conn, encoding) + conn.close() + return players_response(reader) + +async def players_request_async(conn, encoding, challenge=0, retries=0): + resp_data = await conn.request(b"\x55" + challenge.to_bytes(4, "little")) + reader = ByteReader( + io.BytesIO(resp_data), endian="<", encoding=encoding) + + response_type = reader.read_uint8() + if response_type == A2S_CHALLENGE_RESPONSE: + if retries >= DEFAULT_RETRIES: + raise BrokenMessageError( + "Server keeps sending challenge responses") + challenge = reader.read_uint32() + return await players_request_async( + conn, encoding, challenge, retries + 1) + + if response_type != A2S_PLAYER_RESPONSE: + raise BrokenMessageError( + "Invalid response type: " + str(response_type)) + + return reader diff --git a/a2s/rules.py b/a2s/rules.py index 8812f1f..bda2011 100644 --- a/a2s/rules.py +++ b/a2s/rules.py @@ -3,7 +3,8 @@ import io from a2s.exceptions import BrokenMessageError from a2s.defaults import DEFAULT_TIMEOUT, DEFAULT_ENCODING, \ DEFAULT_RETRIES -from a2s.a2sstream import request +from a2s.a2sstream import A2SStream +from a2s.a2sasync import A2SStreamAsync from a2s.byteio import ByteReader @@ -11,12 +12,24 @@ from a2s.byteio import ByteReader A2S_RULES_RESPONSE = 0x45 A2S_CHALLENGE_RESPONSE = 0x41 +def rules_response(reader): + rule_count = reader.read_int16() + # Have to use tuples to preserve evaluation order + resp = dict( + (reader.read_cstring(), reader.read_cstring()) + for rule_num in range(rule_count) + ) + + return resp + def rules(address, timeout=DEFAULT_TIMEOUT, encoding=DEFAULT_ENCODING): - return rules_impl(address, timeout, encoding) + conn = A2SStream(address, timeout) + reader = rules_request(conn, encoding) + conn.close() + return rules_response(reader) -def rules_impl(address, timeout, encoding, challenge=0, retries=0): - resp_data = request( - address, b"\x56" + challenge.to_bytes(4, "little"), timeout) +def rules_request(conn, encoding, challenge=0, retries=0): + resp_data = conn.request(b"\x56" + challenge.to_bytes(4, "little")) reader = ByteReader( io.BytesIO(resp_data), endian="<", encoding=encoding) @@ -36,18 +49,40 @@ def rules_impl(address, timeout, encoding, challenge=0, retries=0): raise BrokenMessageError( "Server keeps sending challenge responses") challenge = reader.read_uint32() - return rules_impl( - address, timeout, encoding, challenge, retries + 1) + return rules_request( + conn, encoding, challenge, retries + 1) if response_type != A2S_RULES_RESPONSE: raise BrokenMessageError( "Invalid response type: " + str(response_type)) - rule_count = reader.read_int16() - # Have to use tuples to preserve evaluation order - resp = dict( - (reader.read_cstring(), reader.read_cstring()) - for rule_num in range(rule_count) - ) + return reader - return resp +async def arules(address, timeout=DEFAULT_TIMEOUT, encoding=DEFAULT_ENCODING): + conn = await A2SStreamAsync.create(address, timeout) + reader = await rules_request_async(conn, encoding) + conn.close() + return rules_response(reader) + +async def rules_request_async(conn, encoding, challenge=0, retries=0): + resp_data = await conn.request(b"\x56" + challenge.to_bytes(4, "little")) + reader = ByteReader( + io.BytesIO(resp_data), endian="<", encoding=encoding) + + if reader.peek(4) == b"\xFF\xFF\xFF\xFF": + reader.read(4) + + response_type = reader.read_uint8() + if response_type == A2S_CHALLENGE_RESPONSE: + if retries >= DEFAULT_RETRIES: + raise BrokenMessageError( + "Server keeps sending challenge responses") + challenge = reader.read_uint32() + return await rules_request_async( + conn, encoding, challenge, retries + 1) + + if response_type != A2S_RULES_RESPONSE: + raise BrokenMessageError( + "Invalid response type: " + str(response_type)) + + return reader diff --git a/setup.py b/setup.py index 273dc00..5f576f7 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ with open("README.md", "r") as readme: setuptools.setup( name="python-a2s", - version="1.1.5", + version="1.2.0", author="Gabriel Huber", author_email="mail@gabrielhuber.at", description="Query Source and GoldSource servers for name, map, players and more.", @@ -23,5 +23,5 @@ setuptools.setup( "Operating System :: OS Independent", "Topic :: Games/Entertainment" ], - python_requires=">=3.6" + python_requires=">=3.7" )