Browse Source

implemented EventEmitter; used in CMClient

pull/18/merge
Rossen Georgiev 9 years ago
parent
commit
584e9df9c3
  1. 85
      steam/core/cm.py
  2. 0
      steam/util/__init__.py
  3. 67
      steam/util/events.py

85
steam/core/cm.py

@ -20,6 +20,7 @@ from steam.core import crypto
from steam.core.connection import TCPConnection from steam.core.connection import TCPConnection
from steam.core.msg import is_proto, clear_proto_bit from steam.core.msg import is_proto, clear_proto_bit
from steam.core.msg import Msg, MsgProto from steam.core.msg import Msg, MsgProto
from steam.util.events import EventEmitter
server_list = [ server_list = [
('162.254.196.41', '27020'), ('162.254.196.40', '27021'), ('162.254.196.41', '27020'), ('162.254.196.40', '27021'),
@ -39,7 +40,7 @@ server_list = [
logger = logging.getLogger("CMClient") logger = logging.getLogger("CMClient")
class CMClient: class CMClient(EventEmitter):
TCP = 0 TCP = 0
UDP = 1 UDP = 1
@ -52,18 +53,16 @@ class CMClient:
if protocol == CMClient.TCP: if protocol == CMClient.TCP:
self.connection = TCPConnection() self.connection = TCPConnection()
# elif protocol == CMClient.UDP:
# self.connection = UDPConnection()
else: else:
raise ValueError("Only TCP is supported") raise ValueError("Only TCP is supported")
self.event_connected = event.Event() self.on(EMsg.ChannelEncryptRequest, self._handle_encrypt_request),
self.event_ready = event.Event() self.on(EMsg.Multi, self._handle_multi),
self.event_disconnected = event.Event() self.on(EMsg.ClientLogOnResponse, self._handle_logon),
self.register_callback(EMsg.ChannelEncryptRequest, self._handle_encrypt_request), def emit(self, event, *args):
self.register_callback(EMsg.Multi, self._handle_multi), logger.debug("Emit event: %s" % str(event))
self.register_callback(EMsg.ClientLogOnResponse, self._handle_logon), super(CMClient, self).emit(event, *args)
def connect(self): def connect(self):
logger.debug("Connect initiated.") logger.debug("Connect initiated.")
@ -79,8 +78,7 @@ class CMClient:
break break
logger.debug("Event: Connected") self.emit("connected")
self.event_connected.set()
self._recv_loop = gevent.spawn(self._recv_messages) self._recv_loop = gevent.spawn(self._recv_messages)
def disconnect(self): def disconnect(self):
@ -92,12 +90,7 @@ class CMClient:
self._heartbeat_loop.kill() self._heartbeat_loop.kill()
self._init_attributes() self._init_attributes()
self.emit('disconnected')
self.event_connected.clear()
self.event_ready.clear()
self.event_disconnected.set()
logger.debug("Event: Disconnected")
def _init_attributes(self): def _init_attributes(self):
self.key = None self.key = None
@ -168,50 +161,14 @@ class CMClient:
) )
raise raise
gevent.spawn(self.dispatch_message, emsg, msg) if self.verbose_debug:
logger.debug("Incoming: %s\n%s" % (repr(msg), str(msg)))
def dispatch_message(self, emsg, msg): else:
if self.verbose_debug: logger.debug("Incoming: %s", repr(msg))
logger.debug("Incoming: %s\n%s" % (repr(msg), str(msg)))
else:
logger.debug("Incoming: %s", repr(msg))
if emsg in self.registered_callbacks:
for callback in list(self.registered_callbacks[emsg]):
if isinstance(callback, event.AsyncResult):
self.unregister_callback(emsg, callback)
callback.set((emsg, msg))
else:
callback(emsg, msg)
def register_callback(self, emsg, callback):
if emsg not in self.registered_callbacks:
self.registered_callbacks[emsg] = [callback]
else:
allbacks = self.registered_callbacks[emsg]
if callback not in callbacks:
callbacks.append(callback)
def unregister_callback(self, emsg, callback):
if (emsg not in self.registered_callbacks
or callback not in self.registered_callbacks[emsg]):
return ValueError("Callback is not registered")
callbacks = self.registered_callbacks[emsg]
if len(callbacks) == 1:
self.registered_callbacks.pop(emsg)
else:
callbacks.pop(callbacks.index(callback))
def wait_for_message(self, emsg, block=True, timeout=None): self.emit(emsg, msg)
result = event.AsyncResult()
self.register_callback(emsg, result)
return result.get(block, timeout)[1]
def _handle_encrypt_request(self, emsg, msg): def _handle_encrypt_request(self, msg):
logger.debug("Securing channel") logger.debug("Securing channel")
if msg.body.protocolVersion != 1: if msg.body.protocolVersion != 1:
@ -226,7 +183,7 @@ class CMClient:
self.send_message(resp) self.send_message(resp)
msg = self.wait_for_message(EMsg.ChannelEncryptResult) msg = self.wait_event(EMsg.ChannelEncryptResult)
if msg.body.result != EResult.OK: if msg.body.result != EResult.OK:
logger.debug("Failed to secure channel: %s" % msg.body.result) logger.debug("Failed to secure channel: %s" % msg.body.result)
@ -236,11 +193,9 @@ class CMClient:
logger.debug("Channel secured") logger.debug("Channel secured")
self.key = key self.key = key
self.event_ready.set() self.emit('ready')
logger.debug("Event: Ready")
def _handle_multi(self, emsg, msg): def _handle_multi(self, msg):
logger.debug("Unpacking CMsgMulti") logger.debug("Unpacking CMsgMulti")
if msg.body.size_unzipped: if msg.body.size_unzipped:
@ -267,7 +222,7 @@ class CMClient:
gevent.sleep(interval) gevent.sleep(interval)
self.send_message(message) self.send_message(message)
def _handle_logon(self, emsg, msg): def _handle_logon(self, msg):
result = msg.body.eresult result = msg.body.eresult
if result != EResult.OK: if result != EResult.OK:
self.disconnect() self.disconnect()

0
steam/util/__init__.py

67
steam/util/events.py

@ -0,0 +1,67 @@
from collections import defaultdict
import gevent
from gevent.event import AsyncResult
class EventEmitter(object):
"""
Implements event emitter using gevent library
"""
def emit(self, event, *args):
"""
Emit event with some arguments
"""
if not hasattr(self, '_event_callbacks'):
return
for callback in list(self._event_callbacks[event]):
if isinstance(callback, AsyncResult):
self.remove_listener(event, callback)
if len(args) == 1:
args = args[0]
callback.set(args)
else:
gevent.spawn(callback, *args)
def remove_listener(self, event, callback):
"""
Removes a callback for the specified event
"""
if not hasattr(self, '_event_callbacks'):
return
self._event_callbacks[event].pop(callback, None)
def wait_event(self, event, timeout=None):
"""
Blocks until an event and returns the results
"""
result = AsyncResult()
self.on(event, result)
return result.get(True, timeout)
def on(self, event, callback=None):
"""
Registers a callback for the specified event
Can be as function decorator if only event is specified.
"""
if not hasattr(self, '_event_callbacks'):
self._event_callbacks = defaultdict(dict)
# when used function
if callback:
self._event_callbacks[event][callback] = None
return
# as decorator
def wrapper(callback):
self._event_callbacks[event][callback] = None
return callback
return wrapper
Loading…
Cancel
Save