From 584e9df9c321db750c4cba2f504be02906b06b15 Mon Sep 17 00:00:00 2001 From: Rossen Georgiev Date: Wed, 30 Dec 2015 18:05:14 +0000 Subject: [PATCH] implemented EventEmitter; used in CMClient --- steam/core/cm.py | 85 ++++++++++-------------------------------- steam/util/__init__.py | 0 steam/util/events.py | 67 +++++++++++++++++++++++++++++++++ 3 files changed, 87 insertions(+), 65 deletions(-) create mode 100644 steam/util/__init__.py create mode 100644 steam/util/events.py diff --git a/steam/core/cm.py b/steam/core/cm.py index 37e395e..5050383 100644 --- a/steam/core/cm.py +++ b/steam/core/cm.py @@ -20,6 +20,7 @@ from steam.core import crypto from steam.core.connection import TCPConnection from steam.core.msg import is_proto, clear_proto_bit from steam.core.msg import Msg, MsgProto +from steam.util.events import EventEmitter server_list = [ ('162.254.196.41', '27020'), ('162.254.196.40', '27021'), @@ -39,7 +40,7 @@ server_list = [ logger = logging.getLogger("CMClient") -class CMClient: +class CMClient(EventEmitter): TCP = 0 UDP = 1 @@ -52,18 +53,16 @@ class CMClient: if protocol == CMClient.TCP: self.connection = TCPConnection() - # elif protocol == CMClient.UDP: - # self.connection = UDPConnection() else: raise ValueError("Only TCP is supported") - self.event_connected = event.Event() - self.event_ready = event.Event() - self.event_disconnected = event.Event() + self.on(EMsg.ChannelEncryptRequest, self._handle_encrypt_request), + self.on(EMsg.Multi, self._handle_multi), + self.on(EMsg.ClientLogOnResponse, self._handle_logon), - self.register_callback(EMsg.ChannelEncryptRequest, self._handle_encrypt_request), - self.register_callback(EMsg.Multi, self._handle_multi), - self.register_callback(EMsg.ClientLogOnResponse, self._handle_logon), + def emit(self, event, *args): + logger.debug("Emit event: %s" % str(event)) + super(CMClient, self).emit(event, *args) def connect(self): logger.debug("Connect initiated.") @@ -79,8 +78,7 @@ class CMClient: break - logger.debug("Event: Connected") - self.event_connected.set() + self.emit("connected") self._recv_loop = gevent.spawn(self._recv_messages) def disconnect(self): @@ -92,12 +90,7 @@ class CMClient: self._heartbeat_loop.kill() self._init_attributes() - - self.event_connected.clear() - self.event_ready.clear() - self.event_disconnected.set() - - logger.debug("Event: Disconnected") + self.emit('disconnected') def _init_attributes(self): self.key = None @@ -168,50 +161,14 @@ class CMClient: ) raise - gevent.spawn(self.dispatch_message, emsg, msg) - - def dispatch_message(self, emsg, msg): - if self.verbose_debug: - logger.debug("Incoming: %s\n%s" % (repr(msg), str(msg))) - else: - logger.debug("Incoming: %s", repr(msg)) - - - if emsg in self.registered_callbacks: - for callback in list(self.registered_callbacks[emsg]): - if isinstance(callback, event.AsyncResult): - self.unregister_callback(emsg, callback) - callback.set((emsg, msg)) - else: - callback(emsg, msg) - - def register_callback(self, emsg, callback): - if emsg not in self.registered_callbacks: - self.registered_callbacks[emsg] = [callback] - else: - allbacks = self.registered_callbacks[emsg] - - if callback not in callbacks: - callbacks.append(callback) - - def unregister_callback(self, emsg, callback): - if (emsg not in self.registered_callbacks - or callback not in self.registered_callbacks[emsg]): - return ValueError("Callback is not registered") - - callbacks = self.registered_callbacks[emsg] - - if len(callbacks) == 1: - self.registered_callbacks.pop(emsg) - else: - callbacks.pop(callbacks.index(callback)) + if self.verbose_debug: + logger.debug("Incoming: %s\n%s" % (repr(msg), str(msg))) + else: + logger.debug("Incoming: %s", repr(msg)) - def wait_for_message(self, emsg, block=True, timeout=None): - result = event.AsyncResult() - self.register_callback(emsg, result) - return result.get(block, timeout)[1] + self.emit(emsg, msg) - def _handle_encrypt_request(self, emsg, msg): + def _handle_encrypt_request(self, msg): logger.debug("Securing channel") if msg.body.protocolVersion != 1: @@ -226,7 +183,7 @@ class CMClient: self.send_message(resp) - msg = self.wait_for_message(EMsg.ChannelEncryptResult) + msg = self.wait_event(EMsg.ChannelEncryptResult) if msg.body.result != EResult.OK: logger.debug("Failed to secure channel: %s" % msg.body.result) @@ -236,11 +193,9 @@ class CMClient: logger.debug("Channel secured") self.key = key - self.event_ready.set() - - logger.debug("Event: Ready") + self.emit('ready') - def _handle_multi(self, emsg, msg): + def _handle_multi(self, msg): logger.debug("Unpacking CMsgMulti") if msg.body.size_unzipped: @@ -267,7 +222,7 @@ class CMClient: gevent.sleep(interval) self.send_message(message) - def _handle_logon(self, emsg, msg): + def _handle_logon(self, msg): result = msg.body.eresult if result != EResult.OK: self.disconnect() diff --git a/steam/util/__init__.py b/steam/util/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/steam/util/events.py b/steam/util/events.py new file mode 100644 index 0000000..c0faacf --- /dev/null +++ b/steam/util/events.py @@ -0,0 +1,67 @@ +from collections import defaultdict +import gevent +from gevent.event import AsyncResult + + +class EventEmitter(object): + """ + Implements event emitter using gevent library + """ + + def emit(self, event, *args): + """ + Emit event with some arguments + """ + + if not hasattr(self, '_event_callbacks'): + return + + for callback in list(self._event_callbacks[event]): + if isinstance(callback, AsyncResult): + self.remove_listener(event, callback) + + if len(args) == 1: + args = args[0] + + callback.set(args) + else: + gevent.spawn(callback, *args) + + def remove_listener(self, event, callback): + """ + Removes a callback for the specified event + """ + + if not hasattr(self, '_event_callbacks'): + return + + self._event_callbacks[event].pop(callback, None) + + def wait_event(self, event, timeout=None): + """ + Blocks until an event and returns the results + """ + result = AsyncResult() + self.on(event, result) + return result.get(True, timeout) + + def on(self, event, callback=None): + """ + Registers a callback for the specified event + + Can be as function decorator if only event is specified. + """ + + if not hasattr(self, '_event_callbacks'): + self._event_callbacks = defaultdict(dict) + + # when used function + if callback: + self._event_callbacks[event][callback] = None + return + + # as decorator + def wrapper(callback): + self._event_callbacks[event][callback] = None + return callback + return wrapper