Browse Source

add a2s_info, a2s_player, a2s_ping

0.9
Rossen Georgiev 7 years ago
parent
commit
d23fbcd619
  1. 184
      steam/master_server.py

184
steam/master_server.py

@ -40,8 +40,8 @@ Filter code What it does
=========================== =========================================================================================================================
"""
import socket
import struct
from time import sleep
from struct import pack as _pack, unpack_from as _unpack_from
from time import time as _time
from enum import IntEnum, Enum
from steam.util.binary import StructReader
@ -96,7 +96,7 @@ def query(filter_text=r'\napp\500', region=MSRegion.World, master=MSServer.Sourc
ms.settimeout(8)
next_ip = b'0.0.0.0:0'
req_prefix = b'1' + struct.pack('>B', region)
req_prefix = b'1' + _pack('>B', region)
req_suffix = b'\x00' + filter_text.encode('utf-8') + b'\x00'
while True:
@ -121,3 +121,181 @@ def query(filter_text=r'\napp\500', region=MSRegion.World, master=MSServer.Sourc
yield ip, port
next_ip = '{}:{}'.format(ip, port).encode('utf-8')
def _handle_a2s_response(sock):
packet = sock.recv(2048)
header, = _unpack_from('<l', packet)
if header == -1: # single packet response
return packet[4:]
elif header == -2: # multi packet response
raise RuntimeError("Multi packet response not implemented yet")
else:
raise RuntimeError("Invalid reponse header")
def a2s_info(server_addr, goldsrc=False, timeout=6):
"""Get information from a server
:param server_addr: (ip, port) for the server
:type server_addr: tuple
:param goldsrc: (optional) Weather to expect GoldSrc or Source response format
:type goldsrc: :class:`bool`
:param timeout: (optional) timeout in seconds
:type timeout: int
:returns: a dict with information or `None` on timeout
:rtype: :class:`dict`, :class:`None`
"""
ss = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
ss.connect(server_addr)
ss.settimeout(timeout)
# request server info
ss.send(_pack('<lc', -1, b'T') + b'Source Engine Query\x00')
resp_header = b'm' if goldsrc else b'I'
while True:
try:
data = _handle_a2s_response(ss)
except socket.timeout:
return None
else:
if data[0:1] == resp_header:
break
data = StructReader(data)
data.skip(1) # header
# GoldSrc response
if goldsrc:
info = {
'address': data.read_cstring(),
'name': data.read_cstring(),
'map': data.read_cstring(),
'folder': data.read_cstring(),
'game': data.read_cstring(),
}
(info['players'],
info['max_players'],
info['protocol'],
info['server_type'],
info['environment'],
info['visibility'],
info['mod'],
) = data.unpack('<BBBccBB')
if info['mod'] == 1:
info['link'] = data.read_cstring(),
info['download_link'] = data.read_cstring(),
(info['version'],
info['size'],
info['type'],
info['ddl'],
) = data.unpack('<xLLBB')
info['vac'], info['bots'] = data.unpack('<BB')
# Source response
else:
info = {
'protocol': data.unpack('<b')[0],
'name': data.read_cstring(),
'map': data.read_cstring(),
'folder': data.read_cstring(),
'game': data.read_cstring(),
}
(info['app_id'],
info['players'],
info['max_players'],
info['bots'],
info['server_type'],
info['environment'],
info['visibility'],
info['vac'],
) = data.unpack('<HBBBccBB')
return info
def a2s_player(server_addr, challenge=0, timeout=8):
"""Get list of players and their info
:param server_addr: (ip, port) for the server
:type server_addr: tuple
:param challenge: (optional) challenge number
:type challenge: int
:param timeout: (optional) timeout in seconds
:type timeout: int
:returns: a list of players
:rtype: :class:`list`, :class:`None`
"""
ss = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
ss.connect(server_addr)
ss.settimeout(timeout)
# request challenge number
if challenge in (-1, 0):
ss.send(_pack('<lci', -1, b'U', challenge))
_, header, challange = _unpack_from('<lcl', ss.recv(512))
if header != b'A':
raise RuntimeError("Unexpected challange response")
# request player info
ss.send(_pack('<lci', -1, b'U', challange))
try:
data = StructReader(_handle_a2s_response(ss))
except socket.timeout:
return None
header, num_players = data.unpack('<BB')
players = []
while len(players) != num_players:
player = dict()
player['index'] = data.unpack('<B')[0]
player['name'] = data.read_cstring()
player['score'], player['duration'] = data.unpack('<lf')
players.append(player)
if data.rlen() / 8 == num_players: # assume the ship server
for player in players:
player['deaths'], player['money'] = data.unpack('<ll')
return players
def a2s_ping(server_addr, timeout=8):
"""Ping a server
.. warning::
This method for pinging is considered deprecated and will not work on newer sources games
:param server_addr: (ip, port) for the server
:type server_addr: tuple
:param timeout: (optional) timeout in seconds
:type timeout: int
:returns: ping response in seconds or `None` for timeout
:rtype: :class:`float`, :class:`None`
"""
ss = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
ss.connect(server_addr)
ss.settimeout(timeout)
ss.send(_pack('<lc', -1, b'i'))
start = _time()
try:
data = _handle_a2s_response(ss)
except socket.timeout:
return None
diff = _time() - start
if data[0:1] == b'j':
return diff

Loading…
Cancel
Save