Browse Source

final CMServerList refactor

* rework SteamClient CM server persistance logic
* move discovery logic from CMServerList to CMClient
* propagate connect() return through all login methods
pull/168/head
Rossen Georgiev 6 years ago
parent
commit
d7e819f02a
  1. 44
      steam/client/__init__.py
  2. 82
      steam/core/cm.py

44
steam/client/__init__.py

@ -48,7 +48,6 @@ class SteamClient(CMClient, BuiltinBase):
"""After a new login key is accepted """After a new login key is accepted
""" """
_cm_servers_timestamp = None # used to decide when to update CM list on disk
_reconnect_backoff_c = 0 _reconnect_backoff_c = 0
current_jobid = 0 current_jobid = 0
credential_location = None #: location for sentry credential_location = None #: location for sentry
@ -88,7 +87,7 @@ class SteamClient(CMClient, BuiltinBase):
def connect(self, *args, **kwargs): def connect(self, *args, **kwargs):
"""Attempt to establish connection, see :meth:`.CMClient.connect`""" """Attempt to establish connection, see :meth:`.CMClient.connect`"""
self._bootstrap_cm_list_from_file() self._bootstrap_cm_list_from_file()
CMClient.connect(self, *args, **kwargs) return CMClient.connect(self, *args, **kwargs)
def disconnect(self, *args, **kwargs): def disconnect(self, *args, **kwargs):
"""Close connection, see :meth:`.CMClient.disconnect`""" """Close connection, see :meth:`.CMClient.disconnect`"""
@ -96,10 +95,13 @@ class SteamClient(CMClient, BuiltinBase):
CMClient.disconnect(self, *args, **kwargs) CMClient.disconnect(self, *args, **kwargs)
def _bootstrap_cm_list_from_file(self): def _bootstrap_cm_list_from_file(self):
if not self.credential_location or self._cm_servers_timestamp is not None: return if not self.credential_location or self.cm_servers.last_updated > 0:
return
filepath = os.path.join(self.credential_location, 'cm_servers.json') filepath = os.path.join(self.credential_location, 'cm_servers.json')
if not os.path.isfile(filepath): return
if not os.path.isfile(filepath):
return
self._LOG.debug("Reading CM servers from %s" % repr(filepath)) self._LOG.debug("Reading CM servers from %s" % repr(filepath))
@ -113,21 +115,23 @@ class SteamClient(CMClient, BuiltinBase):
else: else:
self.cm_servers.clear() self.cm_servers.clear()
self.cm_servers.merge_list(data['servers']) self.cm_servers.merge_list(data['servers'])
self._cm_servers_timestamp = int(data['timestamp']) self.cm_servers.last_updated = data.get('last_updated', 0)
self.cm_servers.cell_id = data.get('cell_id', 0)
def _handle_cm_list(self, msg): def _handle_cm_list(self, msg):
if self._cm_servers_timestamp is None: if (self.cm_servers.last_updated + 3600*24 > time()
self._cm_servers_timestamp = int(time()) and self.cm_servers.cell_id != 0):
return
self.cm_servers.clear() CMClient._handle_cm_list(self, msg) # clear and merge
CMClient._handle_cm_list(self, msg) # just merges the list
if self.credential_location: if self.credential_location:
filepath = os.path.join(self.credential_location, 'cm_servers.json') filepath = os.path.join(self.credential_location, 'cm_servers.json')
if not os.path.exists(filepath) or time() - (3600*24) > self._cm_servers_timestamp: if not os.path.exists(filepath):
data = { data = {
'timestamp': self._cm_servers_timestamp, 'cell_id': self.cm_servers.cell_id,
'last_updated': self.cm_servers.last_updated,
'servers': list(zip(map(ip_from_int, msg.body.cm_addresses), msg.body.cm_ports)), 'servers': list(zip(map(ip_from_int, msg.body.cm_addresses), msg.body.cm_ports)),
} }
try: try:
@ -409,7 +413,8 @@ class SteamClient(CMClient, BuiltinBase):
raise RuntimeError("Already logged on") raise RuntimeError("Already logged on")
if not self.connected and not self._connecting: if not self.connected and not self._connecting:
self.connect() if not self.connect():
return EResult.Fail
if not self.channel_secured: if not self.channel_secured:
resp = self.wait_event(self.EVENT_CHANNEL_SECURED, timeout=10) resp = self.wait_event(self.EVENT_CHANNEL_SECURED, timeout=10)
@ -418,9 +423,9 @@ class SteamClient(CMClient, BuiltinBase):
if resp is None: if resp is None:
if self.connected: if self.connected:
self.wait_event(self.EVENT_DISCONNECTED) self.wait_event(self.EVENT_DISCONNECTED)
return False return EResult.TryAnotherCM
return True return EResult.OK
@property @property
def relogin_available(self): def relogin_available(self):
@ -485,8 +490,10 @@ class SteamClient(CMClient, BuiltinBase):
""" """
self._LOG.debug("Attempting login") self._LOG.debug("Attempting login")
if not self._pre_login(): eresult = self._pre_login()
return EResult.TryAnotherCM
if eresult != EResult.OK:
return eresult
self.username = username self.username = username
@ -540,7 +547,10 @@ class SteamClient(CMClient, BuiltinBase):
""" """
self._LOG.debug("Attempting Anonymous login") self._LOG.debug("Attempting Anonymous login")
self._pre_login() eresult = self._pre_login()
if eresult != EResult.OK:
return eresult
self.username = None self.username = None
self.login_key = None self.login_key = None

82
steam/core/cm.py

@ -4,11 +4,12 @@ import logging
from gzip import GzipFile from gzip import GzipFile
from time import time from time import time
from collections import defaultdict from collections import defaultdict
from itertools import cycle, count
from io import BytesIO from io import BytesIO
import socket
import gevent import gevent
import gevent.socket as socket
from random import shuffle from random import shuffle
from steam.steamid import SteamID from steam.steamid import SteamID
@ -59,6 +60,7 @@ class CMClient(EventEmitter):
PROTOCOL_UDP = 1 #: UDP protocol enum PROTOCOL_UDP = 1 #: UDP protocol enum
verbose_debug = False #: print message connects in debug verbose_debug = False #: print message connects in debug
auto_discovery = True #: enables automatic CM discovery
cm_servers = None #: a instance of :class:`.CMServerList` cm_servers = None #: a instance of :class:`.CMServerList`
current_server_addr = None #: (ip, port) tuple current_server_addr = None #: (ip, port) tuple
_seen_logon = False _seen_logon = False
@ -71,6 +73,7 @@ class CMClient(EventEmitter):
steam_id = SteamID() #: :class:`.SteamID` of the current user steam_id = SteamID() #: :class:`.SteamID` of the current user
session_id = None #: session id when logged in session_id = None #: session id when logged in
cell_id = 0 #: cell id provided by CM
_recv_loop = None _recv_loop = None
_heartbeat_loop = None _heartbeat_loop = None
@ -120,12 +123,20 @@ class CMClient(EventEmitter):
self._LOG.debug("Connect initiated.") self._LOG.debug("Connect initiated.")
if len(self.cm_servers) == 0 and not self.cm_servers.auto_discovery: i = count()
self._LOG.error("CM server list is empty.")
self._connecting = False while len(self.cm_servers) == 0:
return False if self.auto_discovery:
if not self.cm_servers.bootstrap_from_webapi():
self.cm_servers.bootstrap_from_dns()
else:
self._LOG.error("CM server list is empty. Auto discovery is off.")
for i, server_addr in enumerate(self.cm_servers): if not self.auto_discovery or (retry and next(i) > retry):
self._connecting = False
return False
for i, server_addr in enumerate(cycle(self.cm_servers)):
if retry and i > retry: if retry and i > retry:
self._connecting = False self._connecting = False
return False return False
@ -361,6 +372,7 @@ class CMClient(EventEmitter):
self.steam_id = SteamID(msg.header.steamid) self.steam_id = SteamID(msg.header.steamid)
self.session_id = msg.header.client_sessionid self.session_id = msg.header.client_sessionid
self.cell_id = msg.body.cell_id
if self._heartbeat_loop: if self._heartbeat_loop:
self._heartbeat_loop.kill() self._heartbeat_loop.kill()
@ -377,7 +389,9 @@ class CMClient(EventEmitter):
self._LOG.debug("Updating CM list") self._LOG.debug("Updating CM list")
new_servers = zip(map(ip_from_int, msg.body.cm_addresses), msg.body.cm_ports) new_servers = zip(map(ip_from_int, msg.body.cm_addresses), msg.body.cm_ports)
self.cm_servers.clear()
self.cm_servers.merge_list(new_servers) self.cm_servers.merge_list(new_servers)
self.cm_servers.cell_id = self.cell_id
def sleep(self, seconds): def sleep(self, seconds):
"""Yeild and sleep N seconds. Allows other greenlets to run""" """Yeild and sleep N seconds. Allows other greenlets to run"""
@ -409,12 +423,12 @@ class CMServerList(object):
Good = 1 Good = 1
Bad = 2 Bad = 2
auto_discovery = True #: whether to automatically try to bootstrap CM server list last_updated = 0 #: timestamp of when the list was last updated
cell_id = 0 #: cell id of the server list
bad_timestamp = 300 #: how long bad mark lasts in seconds
def __init__(self, bad_timespan=300): def __init__(self):
self._LOG = logging.getLogger("CMServerList") self._LOG = logging.getLogger("CMServerList")
self.bad_timespan = bad_timespan
self.list = defaultdict(dict) self.list = defaultdict(dict)
def __len__(self): def __len__(self):
@ -454,7 +468,7 @@ class CMServerList(object):
self._LOG.error("DNS boostrap: cm0.steampowered.com resolved no A records") self._LOG.error("DNS boostrap: cm0.steampowered.com resolved no A records")
return False return False
def bootstrap_from_webapi(self, cellid=0): def bootstrap_from_webapi(self, cell_id=0):
""" """
Fetches CM server list from WebAPI and replaces the current one Fetches CM server list from WebAPI and replaces the current one
@ -467,7 +481,7 @@ class CMServerList(object):
from steam import webapi from steam import webapi
try: try:
resp = webapi.get('ISteamDirectory', 'GetCMList', 1, params={'cellid': cellid, resp = webapi.get('ISteamDirectory', 'GetCMList', 1, params={'cellid': cell_id,
'http_timeout': 3}) 'http_timeout': 3})
except Exception as exp: except Exception as exp:
self._LOG.error("WebAPI boostrap failed: %s" % str(exp)) self._LOG.error("WebAPI boostrap failed: %s" % str(exp))
@ -487,36 +501,30 @@ class CMServerList(object):
return str(ip), int(port) return str(ip), int(port)
self.clear() self.clear()
self.cell_id = cell_id
self.merge_list(map(str_to_tuple, serverlist)) self.merge_list(map(str_to_tuple, serverlist))
return True return True
def __iter__(self): def __iter__(self):
def cm_server_iter(): def cm_server_iter():
while True: if not self.list:
if self.auto_discovery: self._LOG.error("Server list is empty.")
if not self.list: return
self.bootstrap_from_webapi()
if not self.list: good_servers = list(filter(lambda x: x[1]['quality'] == CMServerList.Good,
self.bootstrap_from_dns() self.list.items()
if not self.list: ))
yield None
elif not self.list: if len(good_servers) == 0:
self._LOG.error("Server list is empty.") self._LOG.debug("No good servers left. Reseting...")
return self.reset_all()
return
good_servers = list(filter(lambda x: x[1]['quality'] == CMServerList.Good,
self.list.items() shuffle(good_servers)
))
for server_addr, meta in good_servers:
if len(good_servers) == 0: yield server_addr
self.reset_all()
continue
shuffle(good_servers)
for server_addr, meta in good_servers:
yield server_addr
return cm_server_iter() return cm_server_iter()
@ -559,3 +567,5 @@ class CMServerList(object):
if len(self.list) > total: if len(self.list) > total:
self._LOG.debug("Added %d new CM addresses." % (len(self.list) - total)) self._LOG.debug("Added %d new CM addresses." % (len(self.list) - total))
self.last_updated = int(time())

Loading…
Cancel
Save