diff --git a/a2s/a2sasync.py b/a2s/a2s_async.py similarity index 65% rename from a2s/a2sasync.py rename to a2s/a2s_async.py index 8eb3180..9ac77e7 100644 --- a/a2s/a2sasync.py +++ b/a2s/a2s_async.py @@ -1,16 +1,55 @@ import asyncio import logging +import time +import io from a2s.exceptions import BrokenMessageError -from a2s.a2sfragment import decode_fragment +from a2s.a2s_fragment import decode_fragment +from a2s.defaults import DEFAULT_RETRIES +from a2s.byteio import ByteReader HEADER_SIMPLE = b"\xFF\xFF\xFF\xFF" HEADER_MULTI = b"\xFE\xFF\xFF\xFF" +A2S_CHALLENGE_RESPONSE = 0x41 logger = logging.getLogger("a2s") + +async def request_async(address, timeout, encoding, a2s_proto): + conn = await A2SStreamAsync.create(address, timeout) + response = await request_async_impl(conn, encoding, a2s_proto) + conn.close() + return response + +async def request_async_impl(conn, encoding, a2s_proto, challenge=0, retries=0, ping=None): + send_time = time.monotonic() + resp_data = await conn.request(a2s_proto.serialize_request(challenge)) + recv_time = time.monotonic() + # Only set ping on first packet received + if retries == 0: + ping = recv_time - send_time + + reader = ByteReader( + io.BytesIO(resp_data), endian="<", encoding=encoding) + + response_type = reader.read_uint8() + if response_type == A2S_CHALLENGE_RESPONSE: + if retries >= DEFAULT_RETRIES: + raise BrokenMessageError( + "Server keeps sending challenge responses") + challenge = reader.read_uint32() + return await request_async_impl( + conn, encoding, a2s_proto, challenge, retries + 1, ping) + + if not a2s_proto.validate_response_type(response_type): + raise BrokenMessageError( + "Invalid response type: " + hex(response_type)) + + return a2s_proto.deserialize_response(reader, response_type, ping) + + class A2SProtocol(asyncio.DatagramProtocol): def __init__(self): self.recv_queue = asyncio.Queue() @@ -34,6 +73,9 @@ class A2SProtocol(asyncio.DatagramProtocol): self.fragment_buf.sort(key=lambda f: f.fragment_id) reassembled = b"".join( fragment.payload for fragment in self.fragment_buf) + # Sometimes there's an additional header present + if reassembled.startswith(b"\xFF\xFF\xFF\xFF"): + reassembled = reassembled[4:] logger.debug("Received %s part packet with content: %r", len(self.fragment_buf), reassembled) self.recv_queue.put_nowait(reassembled) diff --git a/a2s/a2sfragment.py b/a2s/a2s_fragment.py similarity index 100% rename from a2s/a2sfragment.py rename to a2s/a2s_fragment.py diff --git a/a2s/a2sstream.py b/a2s/a2s_sync.py similarity index 50% rename from a2s/a2sstream.py rename to a2s/a2s_sync.py index 5d8c593..937853d 100644 --- a/a2s/a2sstream.py +++ b/a2s/a2s_sync.py @@ -1,16 +1,55 @@ import socket import logging +import time +import io from a2s.exceptions import BrokenMessageError -from a2s.a2sfragment import decode_fragment +from a2s.a2s_fragment import decode_fragment +from a2s.defaults import DEFAULT_RETRIES +from a2s.byteio import ByteReader HEADER_SIMPLE = b"\xFF\xFF\xFF\xFF" HEADER_MULTI = b"\xFE\xFF\xFF\xFF" +A2S_CHALLENGE_RESPONSE = 0x41 logger = logging.getLogger("a2s") + +def request_sync(address, timeout, encoding, a2s_proto): + conn = A2SStream(address, timeout) + response = request_sync_impl(conn, encoding, a2s_proto) + conn.close() + return response + +def request_sync_impl(conn, encoding, a2s_proto, challenge=0, retries=0, ping=None): + send_time = time.monotonic() + resp_data = conn.request(a2s_proto.serialize_request(challenge)) + recv_time = time.monotonic() + # Only set ping on first packet received + if retries == 0: + ping = recv_time - send_time + + reader = ByteReader( + io.BytesIO(resp_data), endian="<", encoding=encoding) + + response_type = reader.read_uint8() + if response_type == A2S_CHALLENGE_RESPONSE: + if retries >= DEFAULT_RETRIES: + raise BrokenMessageError( + "Server keeps sending challenge responses") + challenge = reader.read_uint32() + return request_sync_impl( + conn, encoding, a2s_proto, challenge, retries + 1, ping) + + if not a2s_proto.validate_response_type(response_type): + raise BrokenMessageError( + "Invalid response type: " + hex(response_type)) + + return a2s_proto.deserialize_response(reader, response_type, ping) + + class A2SStream: def __init__(self, address, timeout): self.address = address @@ -38,6 +77,9 @@ class A2SStream: fragments.append(decode_fragment(packet[4:])) fragments.sort(key=lambda f: f.fragment_id) reassembled = b"".join(fragment.payload for fragment in fragments) + # Sometimes there's an additional header present + if reassembled.startswith(b"\xFF\xFF\xFF\xFF"): + reassembled = reassembled[4:] logger.debug("Received %s part packet with content: %r", len(fragments), reassembled) return reassembled diff --git a/a2s/info.py b/a2s/info.py index bc94ef1..dd1dd02 100644 --- a/a2s/info.py +++ b/a2s/info.py @@ -1,10 +1,9 @@ -import time import io from a2s.exceptions import BrokenMessageError, BufferExhaustedError from a2s.defaults import DEFAULT_TIMEOUT, DEFAULT_ENCODING -from a2s.a2sstream import A2SStream -from a2s.a2sasync import A2SStreamAsync +from a2s.a2s_sync import request_sync +from a2s.a2s_async import request_async from a2s.byteio import ByteReader from a2s.datacls import DataclsMeta @@ -13,6 +12,7 @@ from a2s.datacls import DataclsMeta A2S_INFO_RESPONSE = 0x49 A2S_INFO_RESPONSE_LEGACY = 0x6D + class SourceInfo(metaclass=DataclsMeta): """Protocol version used by the server""" protocol: int @@ -177,6 +177,38 @@ class GoldSrcInfo(metaclass=DataclsMeta): """Round-trip delay time for the request in seconds""" ping: float + +def info(address, timeout=DEFAULT_TIMEOUT, encoding=DEFAULT_ENCODING): + return request_sync(address, timeout, encoding, InfoProtocol) + +async def ainfo(address, timeout=DEFAULT_TIMEOUT, encoding=DEFAULT_ENCODING): + return await request_async(address, timeout, encoding, InfoProtocol) + + +class InfoProtocol: + @staticmethod + def validate_response_type(response_type): + return response_type in (A2S_INFO_RESPONSE, A2S_INFO_RESPONSE_LEGACY) + + @staticmethod + def serialize_request(challenge): + if challenge: + return b"\x54Source Engine Query\0" + challenge.to_bytes(4, "little") + else: + return b"\x54Source Engine Query\0" + + @staticmethod + def deserialize_response(reader, response_type, ping): + if response_type == A2S_INFO_RESPONSE: + resp = parse_source(reader) + elif response_type == A2S_INFO_RESPONSE_LEGACY: + resp = parse_goldsrc(reader) + else: + raise Exception(str(response_type)) + + resp.ping = ping + return resp + def parse_source(reader): resp = SourceInfo() resp.protocol = reader.read_uint8() @@ -244,39 +276,3 @@ def parse_goldsrc(reader): resp.bot_count = reader.read_uint8() return resp - -def info_response(resp_data, ping, encoding): - reader = ByteReader( - io.BytesIO(resp_data), endian="<", encoding=encoding) - - response_type = reader.read_uint8() - if response_type == A2S_INFO_RESPONSE: - resp = parse_source(reader) - elif response_type == A2S_INFO_RESPONSE_LEGACY: - resp = parse_goldsrc(reader) - else: - raise BrokenMessageError( - "Invalid response type: " + str(response_type)) - - resp.ping = ping - return resp - -def info(address, timeout=DEFAULT_TIMEOUT, encoding=DEFAULT_ENCODING): - conn = A2SStream(address, timeout) - send_time = time.monotonic() - resp_data = conn.request(b"\x54Source Engine Query\0") - recv_time = time.monotonic() - conn.close() - ping = recv_time - send_time - - return info_response(resp_data, ping, encoding) - -async def ainfo(address, timeout=DEFAULT_TIMEOUT, encoding=DEFAULT_ENCODING): - conn = await A2SStreamAsync.create(address, timeout) - send_time = time.monotonic() - resp_data = await conn.request(b"\x54Source Engine Query\0") - recv_time = time.monotonic() - conn.close() - ping = recv_time - send_time - - return info_response(resp_data, ping, encoding) diff --git a/a2s/players.py b/a2s/players.py index 06ba0d6..1100cf1 100644 --- a/a2s/players.py +++ b/a2s/players.py @@ -1,18 +1,15 @@ import io -from typing import List -from a2s.exceptions import BrokenMessageError -from a2s.defaults import DEFAULT_TIMEOUT, DEFAULT_ENCODING, \ - DEFAULT_RETRIES -from a2s.a2sstream import A2SStream -from a2s.a2sasync import A2SStreamAsync +from a2s.defaults import DEFAULT_TIMEOUT, DEFAULT_ENCODING +from a2s.a2s_sync import request_sync +from a2s.a2s_async import request_async from a2s.byteio import ByteReader from a2s.datacls import DataclsMeta A2S_PLAYER_RESPONSE = 0x44 -A2S_CHALLENGE_RESPONSE = 0x41 + class Player(metaclass=DataclsMeta): """Apparently an entry index, but seems to be always 0""" @@ -27,68 +24,33 @@ class Player(metaclass=DataclsMeta): """Time the player has been connected to the server""" duration: float -def players_response(reader): - player_count = reader.read_uint8() - resp = [ - Player( - index=reader.read_uint8(), - name=reader.read_cstring(), - score=reader.read_int32(), - duration=reader.read_float() - ) - for player_num in range(player_count) - ] - - return resp def players(address, timeout=DEFAULT_TIMEOUT, encoding=DEFAULT_ENCODING): - conn = A2SStream(address, timeout) - reader = players_request(conn, encoding) - conn.close() - return players_response(reader) - -def players_request(conn, encoding, challenge=0, retries=0): - resp_data = conn.request(b"\x55" + challenge.to_bytes(4, "little")) - reader = ByteReader( - io.BytesIO(resp_data), endian="<", encoding=encoding) - - response_type = reader.read_uint8() - if response_type == A2S_CHALLENGE_RESPONSE: - if retries >= DEFAULT_RETRIES: - raise BrokenMessageError( - "Server keeps sending challenge responses") - challenge = reader.read_uint32() - return players_request( - conn, encoding, challenge, retries + 1) - - if response_type != A2S_PLAYER_RESPONSE: - raise BrokenMessageError( - "Invalid response type: " + str(response_type)) - - return reader + return request_sync(address, timeout, encoding, PlayersProtocol) async def aplayers(address, timeout=DEFAULT_TIMEOUT, encoding=DEFAULT_ENCODING): - conn = await A2SStreamAsync.create(address, timeout) - reader = await players_request_async(conn, encoding) - conn.close() - return players_response(reader) - -async def players_request_async(conn, encoding, challenge=0, retries=0): - resp_data = await conn.request(b"\x55" + challenge.to_bytes(4, "little")) - reader = ByteReader( - io.BytesIO(resp_data), endian="<", encoding=encoding) - - response_type = reader.read_uint8() - if response_type == A2S_CHALLENGE_RESPONSE: - if retries >= DEFAULT_RETRIES: - raise BrokenMessageError( - "Server keeps sending challenge responses") - challenge = reader.read_uint32() - return await players_request_async( - conn, encoding, challenge, retries + 1) - - if response_type != A2S_PLAYER_RESPONSE: - raise BrokenMessageError( - "Invalid response type: " + str(response_type)) - - return reader + return await request_async(address, timeout, encoding, PlayersProtocol) + + +class PlayersProtocol: + @staticmethod + def validate_response_type(response_type): + return response_type == A2S_PLAYER_RESPONSE + + @staticmethod + def serialize_request(challenge): + return b"\x55" + challenge.to_bytes(4, "little") + + @staticmethod + def deserialize_response(reader, response_type, ping): + player_count = reader.read_uint8() + resp = [ + Player( + index=reader.read_uint8(), + name=reader.read_cstring(), + score=reader.read_int32(), + duration=reader.read_float() + ) + for player_num in range(player_count) + ] + return resp diff --git a/a2s/rules.py b/a2s/rules.py index bda2011..1224f72 100644 --- a/a2s/rules.py +++ b/a2s/rules.py @@ -1,88 +1,38 @@ import io -from a2s.exceptions import BrokenMessageError -from a2s.defaults import DEFAULT_TIMEOUT, DEFAULT_ENCODING, \ - DEFAULT_RETRIES -from a2s.a2sstream import A2SStream -from a2s.a2sasync import A2SStreamAsync +from a2s.defaults import DEFAULT_TIMEOUT, DEFAULT_ENCODING +from a2s.a2s_sync import request_sync +from a2s.a2s_async import request_async from a2s.byteio import ByteReader +from a2s.datacls import DataclsMeta A2S_RULES_RESPONSE = 0x45 -A2S_CHALLENGE_RESPONSE = 0x41 -def rules_response(reader): - rule_count = reader.read_int16() - # Have to use tuples to preserve evaluation order - resp = dict( - (reader.read_cstring(), reader.read_cstring()) - for rule_num in range(rule_count) - ) - - return resp def rules(address, timeout=DEFAULT_TIMEOUT, encoding=DEFAULT_ENCODING): - conn = A2SStream(address, timeout) - reader = rules_request(conn, encoding) - conn.close() - return rules_response(reader) - -def rules_request(conn, encoding, challenge=0, retries=0): - resp_data = conn.request(b"\x56" + challenge.to_bytes(4, "little")) - reader = ByteReader( - io.BytesIO(resp_data), endian="<", encoding=encoding) - - # A2S_RESPONSE misteriously seems to add a FF FF FF FF - # long to the beginning of the response which isn't - # mentioned on the wiki. - # - # Behaviour witnessed with TF2 server 94.23.226.200:2045 - # As of 2015-11-22, Quake Live servers on steam do not - # Source: valve-python messages.py - if reader.peek(4) == b"\xFF\xFF\xFF\xFF": - reader.read(4) - - response_type = reader.read_uint8() - if response_type == A2S_CHALLENGE_RESPONSE: - if retries >= DEFAULT_RETRIES: - raise BrokenMessageError( - "Server keeps sending challenge responses") - challenge = reader.read_uint32() - return rules_request( - conn, encoding, challenge, retries + 1) - - if response_type != A2S_RULES_RESPONSE: - raise BrokenMessageError( - "Invalid response type: " + str(response_type)) - - return reader + return request_sync(address, timeout, encoding, RulesProtocol) async def arules(address, timeout=DEFAULT_TIMEOUT, encoding=DEFAULT_ENCODING): - conn = await A2SStreamAsync.create(address, timeout) - reader = await rules_request_async(conn, encoding) - conn.close() - return rules_response(reader) - -async def rules_request_async(conn, encoding, challenge=0, retries=0): - resp_data = await conn.request(b"\x56" + challenge.to_bytes(4, "little")) - reader = ByteReader( - io.BytesIO(resp_data), endian="<", encoding=encoding) - - if reader.peek(4) == b"\xFF\xFF\xFF\xFF": - reader.read(4) - - response_type = reader.read_uint8() - if response_type == A2S_CHALLENGE_RESPONSE: - if retries >= DEFAULT_RETRIES: - raise BrokenMessageError( - "Server keeps sending challenge responses") - challenge = reader.read_uint32() - return await rules_request_async( - conn, encoding, challenge, retries + 1) - - if response_type != A2S_RULES_RESPONSE: - raise BrokenMessageError( - "Invalid response type: " + str(response_type)) - - return reader + return await request_async(address, timeout, encoding, RulesProtocol) + + +class RulesProtocol: + @staticmethod + def validate_response_type(response_type): + return response_type == A2S_RULES_RESPONSE + + @staticmethod + def serialize_request(challenge): + return b"\x56" + challenge.to_bytes(4, "little") + + @staticmethod + def deserialize_response(reader, response_type, ping): + rule_count = reader.read_int16() + # Have to use tuples to preserve evaluation order + resp = dict( + (reader.read_cstring(), reader.read_cstring()) + for rule_num in range(rule_count) + ) + return resp diff --git a/setup.py b/setup.py index d5380fc..7fb2fb0 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ with open("README.md", "r") as readme: setuptools.setup( name="python-a2s", - version="1.2.1", + version="1.3.0", author="Gabriel Huber", author_email="mail@gabrielhuber.at", description="Query Source and GoldSource servers for name, map, players and more.",