Browse Source

Use a dataclasses-like module for responses and some other fixes

async
Gabriel Huber 5 years ago
parent
commit
ac53699495
  1. 9
      a2s/__init__.py
  2. 4
      a2s/a2sstream.py
  3. 7
      a2s/byteio.py
  4. 47
      a2s/datacls.py
  5. 18
      a2s/defaults.py
  6. 267
      a2s/info.py
  7. 59
      a2s/players.py
  8. 28
      a2s/rules.py

9
a2s/__init__.py

@ -1,8 +1,5 @@
from a2s.exceptions import BrokenMessageError, BufferExhaustedError
from a2s.defaults import \
set_default_timeout, get_default_timeout, \
set_default_encoding, get_default_encoding
from a2s.info import info, InfoResponse
from a2s.players import players, PlayersResponse, PlayerEntry
from a2s.rules import rules, RulesResponse
from a2s.info import info, SourceInfo, GoldSrcInfo
from a2s.players import players, Player
from a2s.rules import rules

4
a2s/a2sstream.py

@ -2,11 +2,11 @@ import socket
import bz2
import io
from a2s.defaults import default_encoding, default_timeout
from a2s.exceptions import BrokenMessageError
from a2s.byteio import ByteReader
HEADER_SIMPLE = b"\xFF\xFF\xFF\xFF"
HEADER_MULTI = b"\xFE\xFF\xFF\xFF"
@ -27,7 +27,7 @@ class A2SFragment:
def decode_fragment(data):
reader = ByteReader(
io.BytesIO(data), endian="<", encoding=default_encoding)
io.BytesIO(data), endian="<", encoding="utf-8")
frag = A2SFragment(
message_id=reader.read_uint32(),
fragment_count=reader.read_uint8(),

7
a2s/byteio.py

@ -70,7 +70,7 @@ class ByteReader():
def read_char(self):
char = self.unpack_one("c")
if self.encoding is not None:
return char.decode(self.encoding)
return char.decode(self.encoding, errors="replace")
else:
return char
@ -84,13 +84,10 @@ class ByteReader():
string += c
if self.encoding is not None:
return string.decode(self.encoding)
return string.decode(self.encoding, errors="replace")
else:
return string
def read_ip(self):
return ".".join(str(o) for o in self.unpack("BBBB"))
class ByteWriter():
def __init__(self, stream, endian="=", encoding=None):

47
a2s/datacls.py

@ -0,0 +1,47 @@
"""
Cheap dataclasses module backport
Check out the official documentation to see what this is trying to
achieve:
https://docs.python.org/3/library/dataclasses.html
"""
import collections
import copy
class DataclsBase:
def __init__(self, **kwargs):
for name, value in self._defaults.items():
if name in kwargs:
value = kwargs[name]
setattr(self, name, copy.copy(value))
def __iter__(self):
for name in self.__annotations__:
yield (name, getattr(self, name))
def __repr__(self):
return "{}({})".format(
self.__class__.__name__,
", ".join(name + "=" + repr(value) for name, value in self))
class DataclsMeta(type):
def __new__(cls, name, bases, prop):
values = collections.OrderedDict()
for member_name in prop["__annotations__"].keys():
# Check if member has a default value set as class variable
if member_name in prop:
# Store default value and remove the class variable
values[member_name] = prop[member_name]
del prop[member_name]
else:
# Set None as the default value
values[member_name] = None
prop["__slots__"] = list(values.keys())
prop["_defaults"] = values
bases = (DataclsBase, *bases)
return super().__new__(cls, name, bases, prop)
def __prepare__(self, *args, **kwargs):
return collections.OrderedDict()

18
a2s/defaults.py

@ -1,16 +1,2 @@
default_timeout = 3.0
default_encoding = "utf-8"
def set_default_timeout(timeout):
"""Set module-wide default timeout in seconds"""
default_timeout = timeout
def get_default_timeout():
"""Get module-wide default timeout in seconds"""
return default_timeout
def set_default_encoding(enc):
default_encoding = enc
def get_default_encoding():
return default_encoding
DEFAULT_TIMEOUT = 3.0
DEFAULT_ENCODING = "utf-8"

267
a2s/info.py

@ -2,44 +2,89 @@ import time
import io
from a2s.exceptions import BrokenMessageError
from a2s.defaults import default_encoding, default_timeout
from a2s.defaults import DEFAULT_TIMEOUT, DEFAULT_ENCODING
from a2s.a2sstream import request
from a2s.byteio import ByteReader
from a2s.datacls import DataclsMeta
A2S_INFO_RESPONSE = 0x49
A2S_INFO_RESPONSE_LEGACY = 0x6D
class InfoResponse:
def __init__(self, protocol, server_name, map_name, folder, game,
app_id, player_count, max_players, bot_count,
server_type, platform, password_protected, vac_enabled,
version, edf=0, port=0, steam_id=0, stv_port=0,
stv_name="", keywords="", game_id=0):
self.protocol = protocol
self.server_name = server_name
self.map_name = map_name
self.folder = folder
self.game = game
self.app_id = app_id
self.player_count = player_count
self.max_players = max_players
self.bot_count = bot_count
self.server_type = server_type.lower()
self.platform = platform.lower()
if self.platform == "o":
self.platform = "m"
self.password_protected = password_protected
self.vac_enabled = vac_enabled
self.version = version
self.edf = edf
self.port = port
self.steam_id = steam_id
self.stv_port = stv_port
self.stv_name = stv_name
self.keywords = keywords
self.game_id = game_id
class SourceInfo(metaclass=DataclsMeta):
"""Protocol version used by the server"""
protocol: int
"""Display name of the server"""
server_name: str
"""The currently loaded map"""
map_name: str
"""Name of the game directory"""
folder: str
"""Name of the game"""
game: str
"""App ID of the game required to connect"""
app_id: int
"""Number of players currently connected"""
player_count: int
"""Number of player slots available"""
max_players: int
"""Number of bots on the server"""
bot_count: int
"""Type of the server:
'd': Dedicated server
'l': Non-dedicated server
'p': SourceTV relay (proxy)"""
server_type: str
"""Operating system of the server
'l', 'w', 'm' for Linux, Windows, macOS"""
platform: str
"""Server requires a password to connect"""
password_protected: bool
"""Server has VAC enabled"""
vac_enabled: bool
"""Version of the server software"""
version: str
# Optional:
"""Extra data field, used to indicate if extra values are
included in the response"""
edf: int
"""Port of the game server."""
port: int
"""Steam ID of the server"""
steam_id: int
"""Port of the SourceTV server"""
stv_port: int
"""Name of the SourceTV server"""
stv_name: str
"""Tags that describe the gamemode being played"""
keywords: str
"""Game ID for games that have an app ID too high for 16bit."""
game_id: int
# Client determined values:
"""Round-trip delay time for the request in seconds"""
ping: float
@property
def has_port(self):
@ -61,36 +106,94 @@ class InfoResponse:
def has_game_id(self):
return bool(self.edf & 0x01)
class GoldSrcInfo(metaclass=DataclsMeta):
"""IP Address and port of the server"""
address: str
def info(address, timeout=default_timeout):
send_time = time.monotonic()
resp_data = request(address, b"\x54Source Engine Query\0", timeout)
recv_time = time.monotonic()
reader = ByteReader(
io.BytesIO(resp_data), endian="<", encoding=default_encoding)
"""Display name of the server"""
server_name: str
response_type = reader.read_uint8()
if response_type != A2S_INFO_RESPONSE:
raise BrokenMessageError(
"Invalid response type: " + str(response_type))
"""The currently loaded map"""
map_name: str
resp = InfoResponse(
protocol=reader.read_uint8(),
server_name=reader.read_cstring(),
map_name=reader.read_cstring(),
folder=reader.read_cstring(),
game=reader.read_cstring(),
app_id=reader.read_uint16(),
player_count=reader.read_uint8(),
max_players=reader.read_uint8(),
bot_count=reader.read_uint8(),
server_type=reader.read_char(),
platform=reader.read_char(),
password_protected=reader.read_bool(),
vac_enabled=reader.read_bool(),
version=reader.read_cstring()
)
resp.ping = recv_time - send_time
"""Name of the game directory"""
folder: str
"""Name of the game"""
game: str
"""Number of players currently connected"""
player_count: int
"""Number of player slots available"""
max_players: int
"""Protocol version used by the server"""
protocol: int
"""Type of the server:
'd': Dedicated server
'l': Non-dedicated server
'p': SourceTV relay (proxy)"""
server_type: str
"""Operating system of the server
'l', 'w' for Linux and Windows"""
platform: str
"""Server requires a password to connect"""
password_protected: bool
"""Server is running a Half-Life mod instead of the base game"""
is_mod: bool
"""Server has VAC enabled"""
vac_enabled: bool
"""Number of bots on the server"""
bots_count: int
# Optional:
"""URL to the mod website"""
mod_website: str
"""URL to download the mod"""
mod_download: str
"""Version of the mod installed on the server"""
mod_version: int
"""Size in bytes of the mod"""
mod_size: int
"""Mod supports multiplayer only"""
multiplayer_only: bool = False
"""Mod uses a custom DLL"""
uses_hl_dll: bool = True
# Client determined values:
"""Round-trip delay time for the request in seconds"""
ping: float
def parse_source(reader):
resp = SourceInfo()
resp.protocol = reader.read_uint8()
resp.server_name = reader.read_cstring()
resp.map_name = reader.read_cstring()
resp.folder = reader.read_cstring()
resp.game = reader.read_cstring()
resp.app_id = reader.read_uint16()
resp.player_count = reader.read_uint8()
resp.max_players = reader.read_uint8()
resp.bot_count = reader.read_uint8()
resp.server_type = reader.read_char().lower()
resp.platform = reader.read_char().lower()
if resp.platform == "o": # Deprecated mac value
resp.platform = "m"
resp.password_protected = reader.read_bool()
resp.vac_enabled = reader.read_bool()
resp.version = reader.read_cstring()
try:
resp.edf = reader.read_uint8()
@ -110,3 +213,51 @@ def info(address, timeout=default_timeout):
resp.game_id = reader.read_uint64()
return resp
def parse_goldsrc(reader):
resp = GoldSrcInfo()
resp.address = reader.read_cstring()
resp.server_name = reader.read_cstring()
resp.map_name = reader.read_cstring()
resp.folder = reader.read_cstring()
resp.game = reader.read_cstring()
resp.player_count = reader.read_uint8()
resp.max_players = reader.read_uint8()
resp.protocol = reader.read_uint8()
resp.server_type = reader.read_char()
resp.platform = reader.read_char()
resp.password_protected = reader.read_bool()
resp.is_mod = reader.read_bool()
if resp.is_mod:
resp.mod_website = reader.read_cstring()
resp.mod_download = reader.read_cstring()
reader.read(1) # Skip a NULL byte
resp.mod_version = reader.read_uint()
resp.mod_size = reader.read_uint()
resp.multiplayer_only = reader.read_bool()
resp.uses_custom_dll = reader.read_bool()
resp.vac_enabled = reader.read_bool()
resp.bot_count = reader.read_uint8()
return resp
def info(address, timeout=DEFAULT_TIMEOUT, encoding=DEFAULT_ENCODING):
send_time = time.monotonic()
resp_data = request(address, b"\x54Source Engine Query\0", timeout)
recv_time = time.monotonic()
reader = ByteReader(
io.BytesIO(resp_data), endian="<", encoding=encoding)
response_type = reader.read_uint8()
if response_type == A2S_INFO_RESPONSE:
resp = parse_source(reader)
elif response_type == A2S_INFO_RESPONSE_LEGACY:
resp = parse_goldsrc(reader)
else:
raise BrokenMessageError(
"Invalid response type: " + str(response_type))
resp.ping = recv_time - send_time
return resp

59
a2s/players.py

@ -1,54 +1,61 @@
import io
from typing import List
from a2s.exceptions import BrokenMessageError
from a2s.defaults import default_encoding, default_timeout
from a2s.defaults import DEFAULT_TIMEOUT, DEFAULT_ENCODING
from a2s.a2sstream import request
from a2s.byteio import ByteReader
from a2s.datacls import DataclsMeta
A2S_PLAYER_RESPONSE = 0x44
A2S_CHALLENGE_RESPONSE = 0x41
class PlayerEntry:
def __init__(self, index, name, score, duration):
self.index = index
self.name = name
self.score = score
self.duration = duration
class Player(metaclass=DataclsMeta):
"""Apparently an entry index, but seems to be always 0"""
index: int
class PlayersResponse:
def __init__(self, player_count, players):
self.player_count = player_count
self.players = players
"""Name of the player"""
name: str
def players(address, challenge=0, timeout=default_timeout):
"""Score of the player"""
score: int
"""Time the player has been connected to the server"""
duration: float
def players(address, timeout=DEFAULT_TIMEOUT,
encoding=DEFAULT_ENCODING):
return players_impl(address, timeout, encoding)
def players_impl(address, timeout, encoding, challenge=0):
resp_data = request(
address, b"\x55" + challenge.to_bytes(4, "little"), timeout)
reader = ByteReader(
io.BytesIO(resp_data), endian="<", encoding=default_encoding)
io.BytesIO(resp_data), endian="<", encoding=encoding)
response_type = reader.read_uint8()
if response_type == A2S_CHALLENGE_RESPONSE:
if challenge != 0:
raise BrokenMessageError(
"Server keeps sending challenge responses")
challenge = reader.read_int32()
return players(address, challenge, timeout)
return players_impl(address, timeout, encoding, challenge)
if response_type != A2S_PLAYER_RESPONSE:
raise BrokenMessageError(
"Invalid response type: " + str(response_type))
player_count = reader.read_uint8()
resp = PlayersResponse(
player_count=player_count,
players=[
PlayerEntry(
index=reader.read_uint8(),
name=reader.read_cstring(),
score=reader.read_int32(),
duration=reader.read_float()
)
for player_num in range(player_count)
]
)
resp = [
Player(
index=reader.read_uint8(),
name=reader.read_cstring(),
score=reader.read_int32(),
duration=reader.read_float()
)
for player_num in range(player_count)
]
return resp

28
a2s/rules.py

@ -1,7 +1,7 @@
import io
from a2s.exceptions import BrokenMessageError
from a2s.defaults import default_encoding, default_timeout
from a2s.defaults import DEFAULT_TIMEOUT, DEFAULT_ENCODING
from a2s.a2sstream import request
from a2s.byteio import ByteReader
@ -10,16 +10,14 @@ from a2s.byteio import ByteReader
A2S_RULES_RESPONSE = 0x45
A2S_CHALLENGE_RESPONSE = 0x41
class RulesResponse:
def __init__(self, rule_count, rules):
self.rule_count = rule_count
self.rules = rules
def rules(address, timeout=DEFAULT_TIMEOUT, encoding=DEFAULT_ENCODING):
return rules_impl(address, timeout, encoding)
def rules(address, challenge=0, timeout=default_timeout):
def rules_impl(address, timeout, encoding, challenge=0):
resp_data = request(
address, b"\x56" + challenge.to_bytes(4, "little"), timeout)
reader = ByteReader(
io.BytesIO(resp_data), endian="<", encoding=default_encoding)
io.BytesIO(resp_data), endian="<", encoding=encoding)
# A2S_RESPONSE misteriously seems to add a FF FF FF FF
# long to the beginning of the response which isn't
@ -33,21 +31,21 @@ def rules(address, challenge=0, timeout=default_timeout):
response_type = reader.read_uint8()
if response_type == A2S_CHALLENGE_RESPONSE:
if challenge != 0:
raise BrokenMessageError(
"Server keeps sending challenge responses")
challenge = reader.read_int32()
return rules(address, challenge, timeout)
return rules_impl(address, timeout, encoding, challenge)
if response_type != A2S_RULES_RESPONSE:
raise BrokenMessageError(
"Invalid response type: " + str(response_type))
rule_count = reader.read_int16()
resp = RulesResponse(
rule_count=rule_count,
rules={
reader.read_cstring(): reader.read_cstring()
for rule_num in range(rule_count)
}
# Have to use tuples to preserve evaluation order
resp = dict(
(reader.read_cstring(), reader.read_cstring())
for rule_num in range(rule_count)
)
return resp

Loading…
Cancel
Save