Browse Source

Improve CM server bootstrap discovery (#168)

* remove builtin cm list
* add bootstrap_from_dns to resolve cm0.steampowered.com
* add error and logging when cm server list is empty
* add auto_discovery flag; true by default
* when CMServerList is empty it will try to bootstrap from webapi, with dns fallback
* rework SteamClient CM server persistance logic
* move discovery logic from CMServerList to CMClient
* propagate connect() return through all login methods
* detect stale persisted cm list and update
pull/202/head
Rossen Georgiev 6 years ago
committed by GitHub
parent
commit
6cd718e2f9
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 71
      steam/client/__init__.py
  2. 128
      steam/core/cm.py
  3. 57
      tests/test_core_cm.py

71
steam/client/__init__.py

@ -48,7 +48,6 @@ class SteamClient(CMClient, BuiltinBase):
"""After a new login key is accepted """After a new login key is accepted
""" """
_cm_servers_timestamp = None # used to decide when to update CM list on disk
_reconnect_backoff_c = 0 _reconnect_backoff_c = 0
current_jobid = 0 current_jobid = 0
credential_location = None #: location for sentry credential_location = None #: location for sentry
@ -88,7 +87,7 @@ class SteamClient(CMClient, BuiltinBase):
def connect(self, *args, **kwargs): def connect(self, *args, **kwargs):
"""Attempt to establish connection, see :meth:`.CMClient.connect`""" """Attempt to establish connection, see :meth:`.CMClient.connect`"""
self._bootstrap_cm_list_from_file() self._bootstrap_cm_list_from_file()
CMClient.connect(self, *args, **kwargs) return CMClient.connect(self, *args, **kwargs)
def disconnect(self, *args, **kwargs): def disconnect(self, *args, **kwargs):
"""Close connection, see :meth:`.CMClient.disconnect`""" """Close connection, see :meth:`.CMClient.disconnect`"""
@ -96,10 +95,13 @@ class SteamClient(CMClient, BuiltinBase):
CMClient.disconnect(self, *args, **kwargs) CMClient.disconnect(self, *args, **kwargs)
def _bootstrap_cm_list_from_file(self): def _bootstrap_cm_list_from_file(self):
if not self.credential_location or self._cm_servers_timestamp is not None: return if not self.credential_location or self.cm_servers.last_updated > 0:
return
filepath = os.path.join(self.credential_location, 'cm_servers.json') filepath = os.path.join(self.credential_location, 'cm_servers.json')
if not os.path.isfile(filepath): return
if not os.path.isfile(filepath):
return
self._LOG.debug("Reading CM servers from %s" % repr(filepath)) self._LOG.debug("Reading CM servers from %s" % repr(filepath))
@ -113,29 +115,44 @@ class SteamClient(CMClient, BuiltinBase):
else: else:
self.cm_servers.clear() self.cm_servers.clear()
self.cm_servers.merge_list(data['servers']) self.cm_servers.merge_list(data['servers'])
self._cm_servers_timestamp = int(data['timestamp']) self.cm_servers.last_updated = data.get('last_updated', 0)
self.cm_servers.cell_id = data.get('cell_id', 0)
def _handle_cm_list(self, msg): def _handle_cm_list(self, msg):
if self._cm_servers_timestamp is None: if (self.cm_servers.last_updated + 3600*24 > time()
self._cm_servers_timestamp = int(time()) and self.cm_servers.cell_id != 0):
return
self.cm_servers.clear() CMClient._handle_cm_list(self, msg) # clear and merge
CMClient._handle_cm_list(self, msg) # just merges the list
if self.credential_location: if self.credential_location:
filepath = os.path.join(self.credential_location, 'cm_servers.json') filepath = os.path.join(self.credential_location, 'cm_servers.json')
if not os.path.exists(filepath) or time() - (3600*24) > self._cm_servers_timestamp: if os.path.exists(filepath):
data = {
'timestamp': self._cm_servers_timestamp,
'servers': list(zip(map(ip_from_int, msg.body.cm_addresses), msg.body.cm_ports)),
}
try: try:
with open(filepath, 'wb') as f: with open(filepath, 'r') as f:
f.write(json.dumps(data, indent=True).encode('ascii')) data = json.load(f)
self._LOG.debug("Saved CM servers to %s" % repr(filepath)) except ValueError:
self._LOG.error("Failed parsing %s", repr(filepath))
except IOError as e: except IOError as e:
self._LOG.error("saving %s: %s" % (filepath, str(e))) self._LOG.error("Failed reading %s (%s)", repr(filepath), str(e))
else:
if data.get('last_updated', 0) + 3600*24 > time():
return
self._LOG.debug("Persisted CM server list is stale")
data = {
'cell_id': self.cm_servers.cell_id,
'last_updated': self.cm_servers.last_updated,
'servers': list(zip(map(ip_from_int, msg.body.cm_addresses), msg.body.cm_ports)),
}
try:
with open(filepath, 'wb') as f:
f.write(json.dumps(data, indent=True).encode('ascii'))
self._LOG.debug("Saved CM servers to %s" % repr(filepath))
except IOError as e:
self._LOG.error("saving %s: %s" % (filepath, str(e)))
def _handle_jobs(self, event, *args): def _handle_jobs(self, event, *args):
if isinstance(event, EMsg): if isinstance(event, EMsg):
@ -409,7 +426,8 @@ class SteamClient(CMClient, BuiltinBase):
raise RuntimeError("Already logged on") raise RuntimeError("Already logged on")
if not self.connected and not self._connecting: if not self.connected and not self._connecting:
self.connect() if not self.connect():
return EResult.Fail
if not self.channel_secured: if not self.channel_secured:
resp = self.wait_event(self.EVENT_CHANNEL_SECURED, timeout=10) resp = self.wait_event(self.EVENT_CHANNEL_SECURED, timeout=10)
@ -418,9 +436,9 @@ class SteamClient(CMClient, BuiltinBase):
if resp is None: if resp is None:
if self.connected: if self.connected:
self.wait_event(self.EVENT_DISCONNECTED) self.wait_event(self.EVENT_DISCONNECTED)
return False return EResult.TryAnotherCM
return True return EResult.OK
@property @property
def relogin_available(self): def relogin_available(self):
@ -485,8 +503,10 @@ class SteamClient(CMClient, BuiltinBase):
""" """
self._LOG.debug("Attempting login") self._LOG.debug("Attempting login")
if not self._pre_login(): eresult = self._pre_login()
return EResult.TryAnotherCM
if eresult != EResult.OK:
return eresult
self.username = username self.username = username
@ -540,7 +560,10 @@ class SteamClient(CMClient, BuiltinBase):
""" """
self._LOG.debug("Attempting Anonymous login") self._LOG.debug("Attempting Anonymous login")
self._pre_login() eresult = self._pre_login()
if eresult != EResult.OK:
return eresult
self.username = None self.username = None
self.login_key = None self.login_key = None

128
steam/core/cm.py

@ -4,10 +4,12 @@ import logging
from gzip import GzipFile from gzip import GzipFile
from time import time from time import time
from collections import defaultdict from collections import defaultdict
from itertools import cycle, count
from io import BytesIO from io import BytesIO
import gevent import gevent
import gevent.socket as socket
from random import shuffle from random import shuffle
from steam.steamid import SteamID from steam.steamid import SteamID
@ -58,6 +60,7 @@ class CMClient(EventEmitter):
PROTOCOL_UDP = 1 #: UDP protocol enum PROTOCOL_UDP = 1 #: UDP protocol enum
verbose_debug = False #: print message connects in debug verbose_debug = False #: print message connects in debug
auto_discovery = True #: enables automatic CM discovery
cm_servers = None #: a instance of :class:`.CMServerList` cm_servers = None #: a instance of :class:`.CMServerList`
current_server_addr = None #: (ip, port) tuple current_server_addr = None #: (ip, port) tuple
_seen_logon = False _seen_logon = False
@ -70,6 +73,7 @@ class CMClient(EventEmitter):
steam_id = SteamID() #: :class:`.SteamID` of the current user steam_id = SteamID() #: :class:`.SteamID` of the current user
session_id = None #: session id when logged in session_id = None #: session id when logged in
cell_id = 0 #: cell id provided by CM
_recv_loop = None _recv_loop = None
_heartbeat_loop = None _heartbeat_loop = None
@ -99,7 +103,7 @@ class CMClient(EventEmitter):
:param retry: number of retries before returning. Unlimited when set to ``None`` :param retry: number of retries before returning. Unlimited when set to ``None``
:type retry: :class:`int` :type retry: :class:`int`
:param delay: delay in secnds before connection attempt :param delay: delay in seconds before connection attempt
:type delay: :class:`int` :type delay: :class:`int`
:return: successful connection :return: successful connection
:rtype: :class:`bool` :rtype: :class:`bool`
@ -119,19 +123,31 @@ class CMClient(EventEmitter):
self._LOG.debug("Connect initiated.") self._LOG.debug("Connect initiated.")
for i, server_addr in enumerate(self.cm_servers): i = count(0)
if retry and i > retry:
while len(self.cm_servers) == 0:
if not self.auto_discovery or (retry and next(i) >= retry):
if not self.auto_discovery:
self._LOG.error("CM server list is empty. Auto discovery is off.")
self._connecting = False
return False
if not self.cm_servers.bootstrap_from_webapi():
self.cm_servers.bootstrap_from_dns()
for i, server_addr in enumerate(cycle(self.cm_servers), start=next(i)-1):
if retry and i >= retry:
self._connecting = False
return False return False
start = time() start = time()
if self.connection.connect(server_addr): if self.connection.connect(server_addr):
break break
self._LOG.debug("Failed to connect. Retrying...")
diff = time() - start diff = time() - start
self._LOG.debug("Failed to connect. Retrying...")
if diff < 5: if diff < 5:
self.sleep(5 - diff) self.sleep(5 - diff)
@ -352,6 +368,7 @@ class CMClient(EventEmitter):
self.steam_id = SteamID(msg.header.steamid) self.steam_id = SteamID(msg.header.steamid)
self.session_id = msg.header.client_sessionid self.session_id = msg.header.client_sessionid
self.cell_id = msg.body.cell_id
if self._heartbeat_loop: if self._heartbeat_loop:
self._heartbeat_loop.kill() self._heartbeat_loop.kill()
@ -368,7 +385,9 @@ class CMClient(EventEmitter):
self._LOG.debug("Updating CM list") self._LOG.debug("Updating CM list")
new_servers = zip(map(ip_from_int, msg.body.cm_addresses), msg.body.cm_ports) new_servers = zip(map(ip_from_int, msg.body.cm_addresses), msg.body.cm_ports)
self.cm_servers.clear()
self.cm_servers.merge_list(new_servers) self.cm_servers.merge_list(new_servers)
self.cm_servers.cell_id = self.cell_id
def sleep(self, seconds): def sleep(self, seconds):
"""Yeild and sleep N seconds. Allows other greenlets to run""" """Yeild and sleep N seconds. Allows other greenlets to run"""
@ -400,14 +419,19 @@ class CMServerList(object):
Good = 1 Good = 1
Bad = 2 Bad = 2
last_updated = 0 #: timestamp of when the list was last updated
cell_id = 0 #: cell id of the server list
bad_timestamp = 300 #: how long bad mark lasts in seconds
def __init__(self, bad_timespan=300): def __init__(self):
self._LOG = logging.getLogger("CMServerList") self._LOG = logging.getLogger("CMServerList")
self.bad_timespan = bad_timespan
self.list = defaultdict(dict) self.list = defaultdict(dict)
self.bootstrap_from_builtin_list() def __len__(self):
return len(self.list)
def __repr__(self):
return "<%s: %d servers>" % (self.__class__.__name__, len(self))
def clear(self): def clear(self):
"""Clears the server list""" """Clears the server list"""
@ -415,40 +439,32 @@ class CMServerList(object):
self._LOG.debug("List cleared.") self._LOG.debug("List cleared.")
self.list.clear() self.list.clear()
def bootstrap_from_builtin_list(self): def bootstrap_from_dns(self):
""" """
Resets the server list to the built in one. Fetches CM server list from WebAPI and replaces the current one
This method is called during initialization.
""" """
self._LOG.debug("Bootstraping from builtin list") self._LOG.debug("Attempting bootstrap via DNS")
self.clear()
# build-in list try:
self.merge_list([ answer = socket.getaddrinfo("cm0.steampowered.com",
('162.254.193.7', 27018), 27017,
('208.78.164.9', 27018), socket.AF_INET,
('208.78.164.11', 27017), proto=socket.IPPROTO_TCP)
('162.254.193.7', 27019), except Exception as exp:
('162.254.193.47', 27017), self._LOG.error("DNS boostrap failed: %s" % str(exp))
('155.133.242.9', 27019), return False
('208.78.164.14', 27018),
('155.133.242.8', 27018), servers = list(map(lambda addr: addr[4], answer))
('162.254.195.45', 27017),
('208.78.164.10', 27018), if servers:
('208.78.164.12', 27017), self.clear()
('208.64.201.176', 27018), self.merge_list(servers)
('146.66.152.10', 27017), return True
('162.254.193.46', 27019), else:
('185.25.180.14', 27017), self._LOG.error("DNS boostrap: cm0.steampowered.com resolved no A records")
('162.254.193.46', 27018), return False
('155.133.242.9', 27017),
('162.254.195.44', 27018), def bootstrap_from_webapi(self, cell_id=0):
('162.254.195.45', 27018),
('208.78.164.9', 27017),
('208.78.164.11', 27019)
])
def bootstrap_from_webapi(self, cellid=0):
""" """
Fetches CM server list from WebAPI and replaces the current one Fetches CM server list from WebAPI and replaces the current one
@ -461,7 +477,8 @@ class CMServerList(object):
from steam import webapi from steam import webapi
try: try:
resp = webapi.get('ISteamDirectory', 'GetCMList', 1, params={'cellid': cellid}) resp = webapi.get('ISteamDirectory', 'GetCMList', 1, params={'cellid': cell_id,
'http_timeout': 3})
except Exception as exp: except Exception as exp:
self._LOG.error("WebAPI boostrap failed: %s" % str(exp)) self._LOG.error("WebAPI boostrap failed: %s" % str(exp))
return False return False
@ -480,25 +497,32 @@ class CMServerList(object):
return str(ip), int(port) return str(ip), int(port)
self.clear() self.clear()
self.cell_id = cell_id
self.merge_list(map(str_to_tuple, serverlist)) self.merge_list(map(str_to_tuple, serverlist))
return True return True
def __iter__(self): def __iter__(self):
def genfunc(): def cm_server_iter():
while True: if not self.list:
good_servers = list(filter(lambda x: x[1]['quality'] == CMServerList.Good, self.list.items())) self._LOG.error("Server list is empty.")
return
if len(good_servers) == 0: good_servers = list(filter(lambda x: x[1]['quality'] == CMServerList.Good,
self.reset_all() self.list.items()
continue ))
shuffle(good_servers) if len(good_servers) == 0:
self._LOG.debug("No good servers left. Reseting...")
self.reset_all()
return
for server_addr, meta in good_servers: shuffle(good_servers)
yield server_addr
return genfunc() for server_addr, meta in good_servers:
yield server_addr
return cm_server_iter()
def reset_all(self): def reset_all(self):
"""Reset status for all servers in the list""" """Reset status for all servers in the list"""
@ -539,3 +563,5 @@ class CMServerList(object):
if len(self.list) > total: if len(self.list) > total:
self._LOG.debug("Added %d new CM addresses." % (len(self.list) - total)) self._LOG.debug("Added %d new CM addresses." % (len(self.list) - total))
self.last_updated = int(time())

57
tests/test_core_cm.py

@ -45,28 +45,77 @@ class CMClient_Scenarios(unittest.TestCase):
patcher = patch('steam.core.cm.CMServerList', autospec=True) patcher = patch('steam.core.cm.CMServerList', autospec=True)
self.addCleanup(patcher.stop) self.addCleanup(patcher.stop)
self.server_list = patcher.start().return_value self.server_list = patcher.start().return_value
self.server_list.__iter__.return_value = [(127001, 20000+i) for i in range(10)]
self.server_list.__iter__.return_value = [(127001, i+1) for i in range(10)] self.server_list.bootstrap_from_webapi.return_value = False
self.server_list.bootstrap_from_dns.return_value = False
@patch.object(CMClient, 'emit') @patch.object(CMClient, 'emit')
@patch.object(CMClient, '_recv_messages') @patch.object(CMClient, '_recv_messages')
def test_connect(self, mock_recv, mock_emit): def test_connect(self, mock_recv, mock_emit):
# setup # setup
self.conn.connect.return_value = True self.conn.connect.return_value = True
self.server_list.__len__.return_value = 10
# run # run
cm = CMClient() cm = CMClient()
with gevent.Timeout(2, False): with gevent.Timeout(2, False):
cm.connect() cm.connect(retry=1)
gevent.idle() gevent.idle()
# verify # verify
self.conn.connect.assert_called_once_with((127001, 1)) self.conn.connect.assert_called_once_with((127001, 20000))
mock_emit.assert_called_once_with('connected') mock_emit.assert_called_once_with('connected')
mock_recv.assert_called_once_with() mock_recv.assert_called_once_with()
@patch.object(CMClient, 'emit')
@patch.object(CMClient, '_recv_messages')
def test_connect_auto_discovery_failing(self, mock_recv, mock_emit):
# setup
self.conn.connect.return_value = True
self.server_list.__len__.return_value = 0
# run
cm = CMClient()
with gevent.Timeout(3, False):
cm.connect(retry=1)
gevent.idle()
# verify
self.server_list.bootstrap_from_webapi.assert_called_once_with()
self.server_list.bootstrap_from_dns.assert_called_once_with()
self.conn.connect.assert_not_called()
@patch.object(CMClient, 'emit')
@patch.object(CMClient, '_recv_messages')
def test_connect_auto_discovery_success(self, mock_recv, mock_emit):
# setup
self.conn.connect.return_value = True
self.server_list.__len__.return_value = 0
def fake_servers(*args, **kwargs):
self.server_list.__len__.return_value = 10
return True
self.server_list.bootstrap_from_webapi.side_effect = fake_servers
# run
cm = CMClient()
with gevent.Timeout(3, False):
cm.connect(retry=1)
gevent.idle()
# verify
self.server_list.bootstrap_from_webapi.assert_called_once_with()
self.server_list.bootstrap_from_dns.assert_not_called()
self.conn.connect.assert_called_once_with((127001, 20000))
mock_emit.assert_called_once_with('connected')
mock_recv.assert_called_once_with()
def test_channel_encrypt_sequence(self): def test_channel_encrypt_sequence(self):
# setup # setup

Loading…
Cancel
Save