Browse Source

reworked job handling

pull/18/merge
Rossen Georgiev 9 years ago
parent
commit
d7face3089
  1. 40
      steam/client/__init__.py
  2. 6
      steam/client/features/__init__.py
  3. 76
      steam/client/jobs.py

40
steam/client/__init__.py

@ -6,25 +6,26 @@ from steam.enums import EResult
from steam.core.msg import MsgProto from steam.core.msg import MsgProto
from steam.core.cm import CMClient from steam.core.cm import CMClient
from steam import SteamID from steam import SteamID
from steam.client.jobs import JobManager from steam.client.features import FeatureBase
logger = logging.getLogger("SteamClient") logger = logging.getLogger("SteamClient")
class SteamClient(EventEmitter): class SteamClient(EventEmitter, FeatureBase):
current_jobid = 0
def __init__(self): def __init__(self):
self.cm = CMClient() self.cm = CMClient()
self.job = JobManager(self)
# re-emit all events from CMClient
self.cm.on(None, self.emit)
# register listners # register listners
self.cm.on(None, self._handle_cm_events)
self.on(EMsg.ClientLogOnResponse, self._handle_logon) self.on(EMsg.ClientLogOnResponse, self._handle_logon)
self.on("disconnected", self._handle_disconnect) self.on("disconnected", self._handle_disconnect)
self.logged_on = False self.logged_on = False
super(SteamClient, self).__init__()
def __repr__(self): def __repr__(self):
return "<%s() %s>" % (self.__class__.__name__, return "<%s() %s>" % (self.__class__.__name__,
'online' if self.connected else 'offline', 'online' if self.connected else 'offline',
@ -49,8 +50,23 @@ class SteamClient(EventEmitter):
def disconnect(self): def disconnect(self):
self.cm.disconnect() self.cm.disconnect()
def _handle_cm_events(self, event, *args):
self.emit(event, *args)
if isinstance(event, EMsg):
message = args[0]
if message.proto:
jobid = message.header.jobid_target
else:
jobid = message.header.targetJobID
if jobid not in (-1, 18446744073709551615):
self.emit("job_%d" % jobid, *args)
def _handle_disconnect(self): def _handle_disconnect(self):
self.logged_on = False self.logged_on = False
self.current_jobid = 0
def _handle_logon(self, msg): def _handle_logon(self, msg):
result = EResult(msg.body.eresult) result = EResult(msg.body.eresult)
@ -77,6 +93,18 @@ class SteamClient(EventEmitter):
self.cm.send_message(message) self.cm.send_message(message)
def send_job(self, message):
jobid = self.current_jobid = (self.current_jobid + 1) % 4294967295
if message.proto:
message.header.jobid_source = jobid
else:
message.header.sourceJobID = jobid
self.send(message)
return "job_%d" % jobid
def _pre_login(self): def _pre_login(self):
if self.logged_on: if self.logged_on:
logger.debug("Trying to login while logged on???") logger.debug("Trying to login while logged on???")

6
steam/client/features/__init__.py

@ -0,0 +1,6 @@
class FeatureBase(object):
"""
This object is used as base to implement all high level functionality.
The features are seperated into submodules.
"""
pass

76
steam/client/jobs.py

@ -1,76 +0,0 @@
import logging
from gevent import event
from eventemitter import EventEmitter
from steam.enums.emsg import EMsg
logger = logging.getLogger("JobManager")
class JobManager(EventEmitter):
"""
Takes care of managing job messages
"""
_max_ulong = (2**64)-2
def __init__(self, client):
"""
Takes an instance of SteamClient
"""
self._client = client
self._client.on(None, self._handle_event)
self._jobid = 0
self._jobs = {}
def _handle_event(self, event, *args):
if len(args) != 1:
return
message = args[0]
if not isinstance(event, EMsg) and not self._jobs:
return
if message.proto:
jobid = message.header.jobid_target
else:
jobid = message.header.targetJobID
if jobid in self._jobs:
logger.debug("Response for job: %d, %s" % (jobid, repr(message)))
self._jobs.pop(jobid).set(message)
def get_jobid(self):
"""
Returns the next job id
"""
self._jobid = (self._jobid + 1) % self._max_ulong
return self._jobid
def send(self, message):
"""
Sends a message as job, and returns the job id
"""
jobid = self.get_jobid()
logger.debug("Sending job: %d, %s" % (jobid, repr(message)))
if message.proto:
message.header.jobid_source = jobid
else:
message.header.sourceJobID = jobid
self._jobs[jobid] = event.AsyncResult()
self._client.send(message)
return jobid
def wait_for(self, jobid, timeout=None):
"""
Blocks waiting for specified job id
"""
if jobid not in self._jobs:
raise ValueError("Specified jobid doen't exist. Did you call send() to get one?")
return self._jobs[jobid].get(True, timeout)
Loading…
Cancel
Save