diff --git a/steam/client/__init__.py b/steam/client/__init__.py index 19a93cc..fd34e75 100644 --- a/steam/client/__init__.py +++ b/steam/client/__init__.py @@ -48,7 +48,6 @@ class SteamClient(CMClient, BuiltinBase): """After a new login key is accepted """ - _cm_servers_timestamp = None # used to decide when to update CM list on disk _reconnect_backoff_c = 0 current_jobid = 0 credential_location = None #: location for sentry @@ -88,7 +87,7 @@ class SteamClient(CMClient, BuiltinBase): def connect(self, *args, **kwargs): """Attempt to establish connection, see :meth:`.CMClient.connect`""" self._bootstrap_cm_list_from_file() - CMClient.connect(self, *args, **kwargs) + return CMClient.connect(self, *args, **kwargs) def disconnect(self, *args, **kwargs): """Close connection, see :meth:`.CMClient.disconnect`""" @@ -96,10 +95,13 @@ class SteamClient(CMClient, BuiltinBase): CMClient.disconnect(self, *args, **kwargs) def _bootstrap_cm_list_from_file(self): - if not self.credential_location or self._cm_servers_timestamp is not None: return + if not self.credential_location or self.cm_servers.last_updated > 0: + return filepath = os.path.join(self.credential_location, 'cm_servers.json') - if not os.path.isfile(filepath): return + + if not os.path.isfile(filepath): + return self._LOG.debug("Reading CM servers from %s" % repr(filepath)) @@ -113,29 +115,44 @@ class SteamClient(CMClient, BuiltinBase): else: self.cm_servers.clear() self.cm_servers.merge_list(data['servers']) - self._cm_servers_timestamp = int(data['timestamp']) + self.cm_servers.last_updated = data.get('last_updated', 0) + self.cm_servers.cell_id = data.get('cell_id', 0) def _handle_cm_list(self, msg): - if self._cm_servers_timestamp is None: - self._cm_servers_timestamp = int(time()) + if (self.cm_servers.last_updated + 3600*24 > time() + and self.cm_servers.cell_id != 0): + return - self.cm_servers.clear() - CMClient._handle_cm_list(self, msg) # just merges the list + CMClient._handle_cm_list(self, msg) # clear and merge if self.credential_location: filepath = os.path.join(self.credential_location, 'cm_servers.json') - if not os.path.exists(filepath) or time() - (3600*24) > self._cm_servers_timestamp: - data = { - 'timestamp': self._cm_servers_timestamp, - 'servers': list(zip(map(ip_from_int, msg.body.cm_addresses), msg.body.cm_ports)), - } + if os.path.exists(filepath): try: - with open(filepath, 'wb') as f: - f.write(json.dumps(data, indent=True).encode('ascii')) - self._LOG.debug("Saved CM servers to %s" % repr(filepath)) + with open(filepath, 'r') as f: + data = json.load(f) + except ValueError: + self._LOG.error("Failed parsing %s", repr(filepath)) except IOError as e: - self._LOG.error("saving %s: %s" % (filepath, str(e))) + self._LOG.error("Failed reading %s (%s)", repr(filepath), str(e)) + else: + if data.get('last_updated', 0) + 3600*24 > time(): + return + + self._LOG.debug("Persisted CM server list is stale") + + data = { + 'cell_id': self.cm_servers.cell_id, + 'last_updated': self.cm_servers.last_updated, + 'servers': list(zip(map(ip_from_int, msg.body.cm_addresses), msg.body.cm_ports)), + } + try: + with open(filepath, 'wb') as f: + f.write(json.dumps(data, indent=True).encode('ascii')) + self._LOG.debug("Saved CM servers to %s" % repr(filepath)) + except IOError as e: + self._LOG.error("saving %s: %s" % (filepath, str(e))) def _handle_jobs(self, event, *args): if isinstance(event, EMsg): @@ -409,7 +426,8 @@ class SteamClient(CMClient, BuiltinBase): raise RuntimeError("Already logged on") if not self.connected and not self._connecting: - self.connect() + if not self.connect(): + return EResult.Fail if not self.channel_secured: resp = self.wait_event(self.EVENT_CHANNEL_SECURED, timeout=10) @@ -418,9 +436,9 @@ class SteamClient(CMClient, BuiltinBase): if resp is None: if self.connected: self.wait_event(self.EVENT_DISCONNECTED) - return False + return EResult.TryAnotherCM - return True + return EResult.OK @property def relogin_available(self): @@ -485,8 +503,10 @@ class SteamClient(CMClient, BuiltinBase): """ self._LOG.debug("Attempting login") - if not self._pre_login(): - return EResult.TryAnotherCM + eresult = self._pre_login() + + if eresult != EResult.OK: + return eresult self.username = username @@ -540,7 +560,10 @@ class SteamClient(CMClient, BuiltinBase): """ self._LOG.debug("Attempting Anonymous login") - self._pre_login() + eresult = self._pre_login() + + if eresult != EResult.OK: + return eresult self.username = None self.login_key = None diff --git a/steam/core/cm.py b/steam/core/cm.py index 7353203..02a7a2d 100644 --- a/steam/core/cm.py +++ b/steam/core/cm.py @@ -4,10 +4,12 @@ import logging from gzip import GzipFile from time import time from collections import defaultdict +from itertools import cycle, count from io import BytesIO import gevent +import gevent.socket as socket from random import shuffle from steam.steamid import SteamID @@ -58,6 +60,7 @@ class CMClient(EventEmitter): PROTOCOL_UDP = 1 #: UDP protocol enum verbose_debug = False #: print message connects in debug + auto_discovery = True #: enables automatic CM discovery cm_servers = None #: a instance of :class:`.CMServerList` current_server_addr = None #: (ip, port) tuple _seen_logon = False @@ -70,6 +73,7 @@ class CMClient(EventEmitter): steam_id = SteamID() #: :class:`.SteamID` of the current user session_id = None #: session id when logged in + cell_id = 0 #: cell id provided by CM _recv_loop = None _heartbeat_loop = None @@ -99,7 +103,7 @@ class CMClient(EventEmitter): :param retry: number of retries before returning. Unlimited when set to ``None`` :type retry: :class:`int` - :param delay: delay in secnds before connection attempt + :param delay: delay in seconds before connection attempt :type delay: :class:`int` :return: successful connection :rtype: :class:`bool` @@ -119,19 +123,31 @@ class CMClient(EventEmitter): self._LOG.debug("Connect initiated.") - for i, server_addr in enumerate(self.cm_servers): - if retry and i > retry: + i = count(0) + + while len(self.cm_servers) == 0: + if not self.auto_discovery or (retry and next(i) >= retry): + if not self.auto_discovery: + self._LOG.error("CM server list is empty. Auto discovery is off.") + self._connecting = False + return False + + if not self.cm_servers.bootstrap_from_webapi(): + self.cm_servers.bootstrap_from_dns() + + for i, server_addr in enumerate(cycle(self.cm_servers), start=next(i)-1): + if retry and i >= retry: + self._connecting = False return False start = time() if self.connection.connect(server_addr): break + self._LOG.debug("Failed to connect. Retrying...") diff = time() - start - self._LOG.debug("Failed to connect. Retrying...") - if diff < 5: self.sleep(5 - diff) @@ -352,6 +368,7 @@ class CMClient(EventEmitter): self.steam_id = SteamID(msg.header.steamid) self.session_id = msg.header.client_sessionid + self.cell_id = msg.body.cell_id if self._heartbeat_loop: self._heartbeat_loop.kill() @@ -368,7 +385,9 @@ class CMClient(EventEmitter): self._LOG.debug("Updating CM list") new_servers = zip(map(ip_from_int, msg.body.cm_addresses), msg.body.cm_ports) + self.cm_servers.clear() self.cm_servers.merge_list(new_servers) + self.cm_servers.cell_id = self.cell_id def sleep(self, seconds): """Yeild and sleep N seconds. Allows other greenlets to run""" @@ -400,14 +419,19 @@ class CMServerList(object): Good = 1 Bad = 2 + last_updated = 0 #: timestamp of when the list was last updated + cell_id = 0 #: cell id of the server list + bad_timestamp = 300 #: how long bad mark lasts in seconds - def __init__(self, bad_timespan=300): + def __init__(self): self._LOG = logging.getLogger("CMServerList") - - self.bad_timespan = bad_timespan self.list = defaultdict(dict) - self.bootstrap_from_builtin_list() + def __len__(self): + return len(self.list) + + def __repr__(self): + return "<%s: %d servers>" % (self.__class__.__name__, len(self)) def clear(self): """Clears the server list""" @@ -415,40 +439,32 @@ class CMServerList(object): self._LOG.debug("List cleared.") self.list.clear() - def bootstrap_from_builtin_list(self): + def bootstrap_from_dns(self): """ - Resets the server list to the built in one. - This method is called during initialization. + Fetches CM server list from WebAPI and replaces the current one """ - self._LOG.debug("Bootstraping from builtin list") - self.clear() + self._LOG.debug("Attempting bootstrap via DNS") - # build-in list - self.merge_list([ - ('162.254.193.7', 27018), - ('208.78.164.9', 27018), - ('208.78.164.11', 27017), - ('162.254.193.7', 27019), - ('162.254.193.47', 27017), - ('155.133.242.9', 27019), - ('208.78.164.14', 27018), - ('155.133.242.8', 27018), - ('162.254.195.45', 27017), - ('208.78.164.10', 27018), - ('208.78.164.12', 27017), - ('208.64.201.176', 27018), - ('146.66.152.10', 27017), - ('162.254.193.46', 27019), - ('185.25.180.14', 27017), - ('162.254.193.46', 27018), - ('155.133.242.9', 27017), - ('162.254.195.44', 27018), - ('162.254.195.45', 27018), - ('208.78.164.9', 27017), - ('208.78.164.11', 27019) - ]) - - def bootstrap_from_webapi(self, cellid=0): + try: + answer = socket.getaddrinfo("cm0.steampowered.com", + 27017, + socket.AF_INET, + proto=socket.IPPROTO_TCP) + except Exception as exp: + self._LOG.error("DNS boostrap failed: %s" % str(exp)) + return False + + servers = list(map(lambda addr: addr[4], answer)) + + if servers: + self.clear() + self.merge_list(servers) + return True + else: + self._LOG.error("DNS boostrap: cm0.steampowered.com resolved no A records") + return False + + def bootstrap_from_webapi(self, cell_id=0): """ Fetches CM server list from WebAPI and replaces the current one @@ -461,7 +477,8 @@ class CMServerList(object): from steam import webapi try: - resp = webapi.get('ISteamDirectory', 'GetCMList', 1, params={'cellid': cellid}) + resp = webapi.get('ISteamDirectory', 'GetCMList', 1, params={'cellid': cell_id, + 'http_timeout': 3}) except Exception as exp: self._LOG.error("WebAPI boostrap failed: %s" % str(exp)) return False @@ -480,25 +497,32 @@ class CMServerList(object): return str(ip), int(port) self.clear() + self.cell_id = cell_id self.merge_list(map(str_to_tuple, serverlist)) return True def __iter__(self): - def genfunc(): - while True: - good_servers = list(filter(lambda x: x[1]['quality'] == CMServerList.Good, self.list.items())) + def cm_server_iter(): + if not self.list: + self._LOG.error("Server list is empty.") + return - if len(good_servers) == 0: - self.reset_all() - continue + good_servers = list(filter(lambda x: x[1]['quality'] == CMServerList.Good, + self.list.items() + )) - shuffle(good_servers) + if len(good_servers) == 0: + self._LOG.debug("No good servers left. Reseting...") + self.reset_all() + return - for server_addr, meta in good_servers: - yield server_addr + shuffle(good_servers) - return genfunc() + for server_addr, meta in good_servers: + yield server_addr + + return cm_server_iter() def reset_all(self): """Reset status for all servers in the list""" @@ -539,3 +563,5 @@ class CMServerList(object): if len(self.list) > total: self._LOG.debug("Added %d new CM addresses." % (len(self.list) - total)) + + self.last_updated = int(time()) diff --git a/tests/test_core_cm.py b/tests/test_core_cm.py index ad374a8..b2c8413 100644 --- a/tests/test_core_cm.py +++ b/tests/test_core_cm.py @@ -45,28 +45,77 @@ class CMClient_Scenarios(unittest.TestCase): patcher = patch('steam.core.cm.CMServerList', autospec=True) self.addCleanup(patcher.stop) self.server_list = patcher.start().return_value - - self.server_list.__iter__.return_value = [(127001, i+1) for i in range(10)] + self.server_list.__iter__.return_value = [(127001, 20000+i) for i in range(10)] + self.server_list.bootstrap_from_webapi.return_value = False + self.server_list.bootstrap_from_dns.return_value = False @patch.object(CMClient, 'emit') @patch.object(CMClient, '_recv_messages') def test_connect(self, mock_recv, mock_emit): # setup self.conn.connect.return_value = True + self.server_list.__len__.return_value = 10 # run cm = CMClient() with gevent.Timeout(2, False): - cm.connect() + cm.connect(retry=1) gevent.idle() # verify - self.conn.connect.assert_called_once_with((127001, 1)) + self.conn.connect.assert_called_once_with((127001, 20000)) mock_emit.assert_called_once_with('connected') mock_recv.assert_called_once_with() + @patch.object(CMClient, 'emit') + @patch.object(CMClient, '_recv_messages') + def test_connect_auto_discovery_failing(self, mock_recv, mock_emit): + # setup + self.conn.connect.return_value = True + self.server_list.__len__.return_value = 0 + + # run + cm = CMClient() + + with gevent.Timeout(3, False): + cm.connect(retry=1) + + gevent.idle() + + # verify + self.server_list.bootstrap_from_webapi.assert_called_once_with() + self.server_list.bootstrap_from_dns.assert_called_once_with() + self.conn.connect.assert_not_called() + + @patch.object(CMClient, 'emit') + @patch.object(CMClient, '_recv_messages') + def test_connect_auto_discovery_success(self, mock_recv, mock_emit): + # setup + self.conn.connect.return_value = True + self.server_list.__len__.return_value = 0 + + def fake_servers(*args, **kwargs): + self.server_list.__len__.return_value = 10 + return True + + self.server_list.bootstrap_from_webapi.side_effect = fake_servers + + # run + cm = CMClient() + + with gevent.Timeout(3, False): + cm.connect(retry=1) + + gevent.idle() + + # verify + self.server_list.bootstrap_from_webapi.assert_called_once_with() + self.server_list.bootstrap_from_dns.assert_not_called() + self.conn.connect.assert_called_once_with((127001, 20000)) + mock_emit.assert_called_once_with('connected') + mock_recv.assert_called_once_with() def test_channel_encrypt_sequence(self): # setup