From cc0c01aa06aa141a70331c44bb86a3f77a55db81 Mon Sep 17 00:00:00 2001 From: Rossen Georgiev Date: Sat, 16 Jan 2016 07:28:20 +0000 Subject: [PATCH] implemented JobManager --- steam/client/__init__.py | 3 ++ steam/client/jobs.py | 76 ++++++++++++++++++++++++++++++++++++++++ steam/core/msg.py | 2 ++ 3 files changed, 81 insertions(+) create mode 100644 steam/client/jobs.py diff --git a/steam/client/__init__.py b/steam/client/__init__.py index b37cac6..d744fb7 100644 --- a/steam/client/__init__.py +++ b/steam/client/__init__.py @@ -6,12 +6,15 @@ from steam.enums import EResult from steam.core.msg import MsgProto from steam.core.cm import CMClient from steam import SteamID +from steam.client.jobs import JobManager logger = logging.getLogger("SteamClient") + class SteamClient(EventEmitter): def __init__(self): self.cm = CMClient() + self.job = JobManager(self) # re-emit all events from CMClient self.cm.on(None, self.emit) diff --git a/steam/client/jobs.py b/steam/client/jobs.py new file mode 100644 index 0000000..1dc91a6 --- /dev/null +++ b/steam/client/jobs.py @@ -0,0 +1,76 @@ +import logging +from gevent import event +from steam.util.events import EventEmitter +from steam.enums.emsg import EMsg + +logger = logging.getLogger("JobManager") + + +class JobManager(EventEmitter): + """ + Takes care of managing job messages + """ + + _max_ulong = (2**64)-2 + + def __init__(self, client): + """ + Takes an instance of SteamClient + """ + + self._client = client + self._client.on(None, self._handle_event) + self._jobid = 0 + self._jobs = {} + + def _handle_event(self, event, *args): + if len(args) != 1: + return + + message = args[0] + + if not isinstance(event, EMsg) and not self._jobs: + return + + if message.proto: + jobid = message.header.jobid_target + else: + jobid = message.header.targetJobID + + if jobid in self._jobs: + logger.debug("Response for job: %d, %s" % (jobid, repr(message))) + self._jobs.pop(jobid).set(message) + + def get_jobid(self): + """ + Returns the next job id + """ + self._jobid = (self._jobid + 1) % self._max_ulong + return self._jobid + + def send(self, message): + """ + Sends a message as job, and returns the job id + """ + jobid = self.get_jobid() + + logger.debug("Sending job: %d, %s" % (jobid, repr(message))) + + if message.proto: + message.header.jobid_source = jobid + else: + message.header.sourceJobID = jobid + + self._jobs[jobid] = event.AsyncResult() + self._client.send(message) + return jobid + + def wait_for(self, jobid, timeout=None): + """ + Blocks waiting for specified job id + """ + + if jobid not in self._jobs: + raise ValueError("Specified jobid doen't exist. Did you call send() to get one?") + + return self._jobs[jobid].get(True, timeout) diff --git a/steam/core/msg.py b/steam/core/msg.py index 0531770..88eb562 100644 --- a/steam/core/msg.py +++ b/steam/core/msg.py @@ -124,6 +124,7 @@ class MsgHdrProtoBuf: class Msg(object): def __init__(self, msg, data=None, extended=False): + self.proto = False self.extended = extended self.header = ExtendedMsgHdr(data) if extended else MsgHdr(data) self.header.msg = msg @@ -229,6 +230,7 @@ def get_cmsg(emsg): class MsgProto(object): def __init__(self, msg, data=None): + self.proto = True self._header = MsgHdrProtoBuf(data) self._header.msg = msg