Browse Source

Merge pull request #10 from Yepoleb/async

Async support
pull/15/head v1.2.0
Gabriel Huber 5 years ago
committed by GitHub
parent
commit
dd8212b71f
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      README.md
  2. 6
      a2s/__init__.py
  3. 95
      a2s/a2sasync.py
  4. 39
      a2s/a2sfragment.py
  5. 48
      a2s/a2sstream.py
  6. 30
      a2s/info.py
  7. 71
      a2s/players.py
  8. 63
      a2s/rules.py
  9. 4
      setup.py

5
README.md

@ -7,7 +7,7 @@ Official demo application: [Sourcequery](https://sourcequery.yepoleb.at)
## Requirements
Python >=3.6, no external dependencies
Python >=3.7, no external dependencies
## Install
@ -21,6 +21,9 @@ Python >=3.6, no external dependencies
* `a2s.players(address, timeout=DEFAULT_TIMEOUT, encoding=DEFAULT_ENCODING)`
* `a2s.rules(address, timeout=DEFAULT_TIMEOUT, encoding=DEFAULT_ENCODING)`
All functions also have an async version as of package 1.2.0 that adds an `a` prefix, e.g.
`ainfo`, `aplayers`, `arules`.
### Parameters
* address: `Tuple[str, int]` - Address of the server.

6
a2s/__init__.py

@ -1,5 +1,5 @@
from a2s.exceptions import BrokenMessageError, BufferExhaustedError
from a2s.info import info, SourceInfo, GoldSrcInfo
from a2s.players import players, Player
from a2s.rules import rules
from a2s.info import info, ainfo, SourceInfo, GoldSrcInfo
from a2s.players import players, aplayers, Player
from a2s.rules import rules, arules

95
a2s/a2sasync.py

@ -0,0 +1,95 @@
import asyncio
import logging
from a2s.exceptions import BrokenMessageError
from a2s.a2sfragment import decode_fragment
HEADER_SIMPLE = b"\xFF\xFF\xFF\xFF"
HEADER_MULTI = b"\xFE\xFF\xFF\xFF"
logger = logging.getLogger("a2s")
class A2SProtocol(asyncio.DatagramProtocol):
def __init__(self):
self.recv_queue = asyncio.Queue()
self.error_event = asyncio.Event()
self.error = None
self.fragment_buf = []
def connection_made(self, transport):
self.transport = transport
def datagram_received(self, packet, addr):
header = packet[:4]
payload = packet[4:]
if header == HEADER_SIMPLE:
logger.debug("Received single packet: %r", payload)
self.recv_queue.put_nowait(payload)
elif header == HEADER_MULTI:
self.fragment_buf.append(decode_fragment(payload))
if len(self.fragment_buf) < self.fragment_buf[0].fragment_count:
return # Wait for more packets to arrive
self.fragment_buf.sort(key=lambda f: f.fragment_id)
reassembled = b"".join(
fragment.payload for fragment in self.fragment_buf)
logger.debug("Received %s part packet with content: %r",
len(self.fragment_buf), reassembled)
self.recv_queue.put_nowait(reassembled)
self.fragment_buf = []
else:
self.error = BrokenMessageError(
"Invalid packet header: " + repr(header))
self.error_event.set()
def error_received(self, exc):
self.error = exc
self.error_event.set()
def raise_on_error():
error = self.error
self.error = None
self.error_event.clear()
raise error
class A2SStreamAsync:
def __init__(self, transport, protocol, timeout):
self.transport = transport
self.protocol = protocol
self.timeout = timeout
def __del__(self):
self.close()
@classmethod
async def create(cls, address, timeout):
loop = asyncio.get_running_loop()
transport, protocol = await loop.create_datagram_endpoint(
lambda: A2SProtocol(), remote_addr=address)
return cls(transport, protocol, timeout)
def send(self, payload):
packet = HEADER_SIMPLE + payload
self.transport.sendto(packet)
async def recv(self):
queue_task = asyncio.create_task(self.protocol.recv_queue.get())
error_task = asyncio.create_task(self.protocol.error_event.wait())
done, pending = await asyncio.wait({queue_task, error_task},
timeout=self.timeout, return_when=asyncio.FIRST_COMPLETED)
for task in pending: task.cancel()
if error_task in done:
self.protocol.raise_on_error()
if not done:
raise asyncio.TimeoutError()
return queue_task.result()
async def request(self, payload):
self.send(payload)
return await self.recv()
def close(self):
self.transport.close()

39
a2s/a2sfragment.py

@ -0,0 +1,39 @@
import bz2
import io
from a2s.byteio import ByteReader
class A2SFragment:
def __init__(self, message_id, fragment_count, fragment_id, mtu,
decompressed_size=0, crc=0, payload=b""):
self.message_id = message_id
self.fragment_count = fragment_count
self.fragment_id = fragment_id
self.mtu = mtu
self.decompressed_size = decompressed_size
self.crc = crc
self.payload = payload
@property
def is_compressed(self):
return bool(self.message_id & (1 << 15))
def decode_fragment(data):
reader = ByteReader(
io.BytesIO(data), endian="<", encoding="utf-8")
frag = A2SFragment(
message_id=reader.read_uint32(),
fragment_count=reader.read_uint8(),
fragment_id=reader.read_uint8(),
mtu=reader.read_uint16()
)
if frag.is_compressed:
frag.decompressed_size = reader.read_uint32()
frag.crc = reader.read_uint32()
frag.payload = bz2.decompress(reader.read())
else:
frag.payload = reader.read()
return frag

48
a2s/a2sstream.py

@ -1,10 +1,8 @@
import socket
import bz2
import io
import logging
from a2s.exceptions import BrokenMessageError
from a2s.byteio import ByteReader
from a2s.a2sfragment import decode_fragment
@ -13,39 +11,6 @@ HEADER_MULTI = b"\xFE\xFF\xFF\xFF"
logger = logging.getLogger("a2s")
class A2SFragment:
def __init__(self, message_id, fragment_count, fragment_id, mtu,
decompressed_size=0, crc=0, payload=b""):
self.message_id = message_id
self.fragment_count = fragment_count
self.fragment_id = fragment_id
self.mtu = mtu
self.decompressed_size = decompressed_size
self.crc = crc
self.payload = payload
@property
def is_compressed(self):
return bool(self.message_id & (1 << 15))
def decode_fragment(data):
reader = ByteReader(
io.BytesIO(data), endian="<", encoding="utf-8")
frag = A2SFragment(
message_id=reader.read_uint32(),
fragment_count=reader.read_uint8(),
fragment_id=reader.read_uint8(),
mtu=reader.read_uint16()
)
if frag.is_compressed:
frag.decompressed_size = reader.read_uint32()
frag.crc = reader.read_uint32()
frag.payload = bz2.decompress(reader.read())
else:
frag.payload = reader.read()
return frag
class A2SStream:
def __init__(self, address, timeout):
self.address = address
@ -80,12 +45,9 @@ class A2SStream:
raise BrokenMessageError(
"Invalid packet header: " + repr(header))
def request(self, payload):
self.send(payload)
return self.recv()
def close(self):
self._socket.close()
def request(address, data, timeout):
stream = A2SStream(address, timeout)
stream.send(data)
resp = stream.recv()
stream.close()
return resp

30
a2s/info.py

@ -3,7 +3,8 @@ import io
from a2s.exceptions import BrokenMessageError, BufferExhaustedError
from a2s.defaults import DEFAULT_TIMEOUT, DEFAULT_ENCODING
from a2s.a2sstream import request
from a2s.a2sstream import A2SStream
from a2s.a2sasync import A2SStreamAsync
from a2s.byteio import ByteReader
from a2s.datacls import DataclsMeta
@ -244,10 +245,7 @@ def parse_goldsrc(reader):
return resp
def info(address, timeout=DEFAULT_TIMEOUT, encoding=DEFAULT_ENCODING):
send_time = time.monotonic()
resp_data = request(address, b"\x54Source Engine Query\0", timeout)
recv_time = time.monotonic()
def info_response(resp_data, ping, encoding):
reader = ByteReader(
io.BytesIO(resp_data), endian="<", encoding=encoding)
@ -260,5 +258,25 @@ def info(address, timeout=DEFAULT_TIMEOUT, encoding=DEFAULT_ENCODING):
raise BrokenMessageError(
"Invalid response type: " + str(response_type))
resp.ping = recv_time - send_time
resp.ping = ping
return resp
def info(address, timeout=DEFAULT_TIMEOUT, encoding=DEFAULT_ENCODING):
conn = A2SStream(address, timeout)
send_time = time.monotonic()
resp_data = conn.request(b"\x54Source Engine Query\0")
recv_time = time.monotonic()
conn.close()
ping = recv_time - send_time
return info_response(resp_data, ping, encoding)
async def ainfo(address, timeout=DEFAULT_TIMEOUT, encoding=DEFAULT_ENCODING):
conn = await A2SStreamAsync.create(address, timeout)
send_time = time.monotonic()
resp_data = await conn.request(b"\x54Source Engine Query\0")
recv_time = time.monotonic()
conn.close()
ping = recv_time - send_time
return info_response(resp_data, ping, encoding)

71
a2s/players.py

@ -4,7 +4,8 @@ from typing import List
from a2s.exceptions import BrokenMessageError
from a2s.defaults import DEFAULT_TIMEOUT, DEFAULT_ENCODING, \
DEFAULT_RETRIES
from a2s.a2sstream import request
from a2s.a2sstream import A2SStream
from a2s.a2sasync import A2SStreamAsync
from a2s.byteio import ByteReader
from a2s.datacls import DataclsMeta
@ -26,13 +27,28 @@ class Player(metaclass=DataclsMeta):
"""Time the player has been connected to the server"""
duration: float
def players(address, timeout=DEFAULT_TIMEOUT,
encoding=DEFAULT_ENCODING):
return players_impl(address, timeout, encoding)
def players_response(reader):
player_count = reader.read_uint8()
resp = [
Player(
index=reader.read_uint8(),
name=reader.read_cstring(),
score=reader.read_int32(),
duration=reader.read_float()
)
for player_num in range(player_count)
]
def players_impl(address, timeout, encoding, challenge=0, retries=0):
resp_data = request(
address, b"\x55" + challenge.to_bytes(4, "little"), timeout)
return resp
def players(address, timeout=DEFAULT_TIMEOUT, encoding=DEFAULT_ENCODING):
conn = A2SStream(address, timeout)
reader = players_request(conn, encoding)
conn.close()
return players_response(reader)
def players_request(conn, encoding, challenge=0, retries=0):
resp_data = conn.request(b"\x55" + challenge.to_bytes(4, "little"))
reader = ByteReader(
io.BytesIO(resp_data), endian="<", encoding=encoding)
@ -42,22 +58,37 @@ def players_impl(address, timeout, encoding, challenge=0, retries=0):
raise BrokenMessageError(
"Server keeps sending challenge responses")
challenge = reader.read_uint32()
return players_impl(
address, timeout, encoding, challenge, retries + 1)
return players_request(
conn, encoding, challenge, retries + 1)
if response_type != A2S_PLAYER_RESPONSE:
raise BrokenMessageError(
"Invalid response type: " + str(response_type))
player_count = reader.read_uint8()
resp = [
Player(
index=reader.read_uint8(),
name=reader.read_cstring(),
score=reader.read_int32(),
duration=reader.read_float()
)
for player_num in range(player_count)
]
return reader
return resp
async def aplayers(address, timeout=DEFAULT_TIMEOUT, encoding=DEFAULT_ENCODING):
conn = await A2SStreamAsync.create(address, timeout)
reader = await players_request_async(conn, encoding)
conn.close()
return players_response(reader)
async def players_request_async(conn, encoding, challenge=0, retries=0):
resp_data = await conn.request(b"\x55" + challenge.to_bytes(4, "little"))
reader = ByteReader(
io.BytesIO(resp_data), endian="<", encoding=encoding)
response_type = reader.read_uint8()
if response_type == A2S_CHALLENGE_RESPONSE:
if retries >= DEFAULT_RETRIES:
raise BrokenMessageError(
"Server keeps sending challenge responses")
challenge = reader.read_uint32()
return await players_request_async(
conn, encoding, challenge, retries + 1)
if response_type != A2S_PLAYER_RESPONSE:
raise BrokenMessageError(
"Invalid response type: " + str(response_type))
return reader

63
a2s/rules.py

@ -3,7 +3,8 @@ import io
from a2s.exceptions import BrokenMessageError
from a2s.defaults import DEFAULT_TIMEOUT, DEFAULT_ENCODING, \
DEFAULT_RETRIES
from a2s.a2sstream import request
from a2s.a2sstream import A2SStream
from a2s.a2sasync import A2SStreamAsync
from a2s.byteio import ByteReader
@ -11,12 +12,24 @@ from a2s.byteio import ByteReader
A2S_RULES_RESPONSE = 0x45
A2S_CHALLENGE_RESPONSE = 0x41
def rules_response(reader):
rule_count = reader.read_int16()
# Have to use tuples to preserve evaluation order
resp = dict(
(reader.read_cstring(), reader.read_cstring())
for rule_num in range(rule_count)
)
return resp
def rules(address, timeout=DEFAULT_TIMEOUT, encoding=DEFAULT_ENCODING):
return rules_impl(address, timeout, encoding)
conn = A2SStream(address, timeout)
reader = rules_request(conn, encoding)
conn.close()
return rules_response(reader)
def rules_impl(address, timeout, encoding, challenge=0, retries=0):
resp_data = request(
address, b"\x56" + challenge.to_bytes(4, "little"), timeout)
def rules_request(conn, encoding, challenge=0, retries=0):
resp_data = conn.request(b"\x56" + challenge.to_bytes(4, "little"))
reader = ByteReader(
io.BytesIO(resp_data), endian="<", encoding=encoding)
@ -36,18 +49,40 @@ def rules_impl(address, timeout, encoding, challenge=0, retries=0):
raise BrokenMessageError(
"Server keeps sending challenge responses")
challenge = reader.read_uint32()
return rules_impl(
address, timeout, encoding, challenge, retries + 1)
return rules_request(
conn, encoding, challenge, retries + 1)
if response_type != A2S_RULES_RESPONSE:
raise BrokenMessageError(
"Invalid response type: " + str(response_type))
rule_count = reader.read_int16()
# Have to use tuples to preserve evaluation order
resp = dict(
(reader.read_cstring(), reader.read_cstring())
for rule_num in range(rule_count)
)
return reader
return resp
async def arules(address, timeout=DEFAULT_TIMEOUT, encoding=DEFAULT_ENCODING):
conn = await A2SStreamAsync.create(address, timeout)
reader = await rules_request_async(conn, encoding)
conn.close()
return rules_response(reader)
async def rules_request_async(conn, encoding, challenge=0, retries=0):
resp_data = await conn.request(b"\x56" + challenge.to_bytes(4, "little"))
reader = ByteReader(
io.BytesIO(resp_data), endian="<", encoding=encoding)
if reader.peek(4) == b"\xFF\xFF\xFF\xFF":
reader.read(4)
response_type = reader.read_uint8()
if response_type == A2S_CHALLENGE_RESPONSE:
if retries >= DEFAULT_RETRIES:
raise BrokenMessageError(
"Server keeps sending challenge responses")
challenge = reader.read_uint32()
return await rules_request_async(
conn, encoding, challenge, retries + 1)
if response_type != A2S_RULES_RESPONSE:
raise BrokenMessageError(
"Invalid response type: " + str(response_type))
return reader

4
setup.py

@ -7,7 +7,7 @@ with open("README.md", "r") as readme:
setuptools.setup(
name="python-a2s",
version="1.1.5",
version="1.2.0",
author="Gabriel Huber",
author_email="[email protected]",
description="Query Source and GoldSource servers for name, map, players and more.",
@ -23,5 +23,5 @@ setuptools.setup(
"Operating System :: OS Independent",
"Topic :: Games/Entertainment"
],
python_requires=">=3.6"
python_requires=">=3.7"
)

Loading…
Cancel
Save