diff --git a/disco/bot/bot.py b/disco/bot/bot.py index 93fa967..7a70ceb 100644 --- a/disco/bot/bot.py +++ b/disco/bot/bot.py @@ -79,6 +79,9 @@ class Bot(object): self.client = client self.config = config or BotConfig() + if self.client.config.manhole_enable: + self.client.manhole_locals['bot'] = self + self.plugins = {} # Only bind event listeners if we're going to parse commands @@ -203,6 +206,9 @@ class Bot(object): return False def on_message_create(self, event): + if event.message.author.id == self.client.state.me.id: + return + if self.config.commands_allow_edit: self.last_message_cache[event.message.channel_id] = (event.message, False) diff --git a/disco/client.py b/disco/client.py index 6c1c1c8..24bc9a3 100644 --- a/disco/client.py +++ b/disco/client.py @@ -1,12 +1,12 @@ import gevent -from gevent.backdoor import BackdoorServer from holster.emitter import Emitter from disco.state import State from disco.api.client import APIClient from disco.gateway.client import GatewayClient from disco.util.logging import LoggingClass +from disco.util.backdoor import DiscoBackdoorServer class ClientConfig(LoggingClass): @@ -67,6 +67,9 @@ class Client(object): The API client. gw : :class:`GatewayClient` The gateway client. + manhole_locals : dict + Dictionary of local variables for each manhole connection. This can be + modified to add/modify local variables. manhole : Optional[:class:`BackdoorServer`] Gevent backdoor server (if the manhole is enabled). """ @@ -82,14 +85,10 @@ class Client(object): self.gw = GatewayClient(self, self.config.encoding_cls) if self.config.manhole_enable: - self.manhole = BackdoorServer(self.config.manhole_bind, + self.manhole_locals = {} + self.manhole = DiscoBackdoorServer(self.config.manhole_bind, banner='Disco Manhole', - locals={ - 'client': self, - 'state': self.state, - 'api': self.api, - 'gw': self.gw, - }) + localf=lambda: self.manhole_locals) self.manhole.start() def run(self): diff --git a/disco/gateway/client.py b/disco/gateway/client.py index b12aafe..3b764c6 100644 --- a/disco/gateway/client.py +++ b/disco/gateway/client.py @@ -5,7 +5,7 @@ import six from disco.gateway.packets import OPCode from disco.gateway.events import GatewayEvent from disco.gateway.encoding.json import JSONEncoder -from disco.util.websocket import WebsocketProcessProxy +from disco.util.websocket import Websocket from disco.util.logging import LoggingClass TEN_MEGABYTES = 10490000 @@ -29,7 +29,6 @@ class GatewayClient(LoggingClass): self.packets.on(OPCode.RECONNECT, self.handle_reconnect) self.packets.on(OPCode.INVALID_SESSION, self.handle_invalid_session) self.packets.on(OPCode.HELLO, self.handle_hello) - self.packets.on(OPCode.HEARTBEAT_ACK, self.handle_heartbeat_ack) # Bind to ready payload self.events.on('Ready', self.on_ready) @@ -74,7 +73,7 @@ class GatewayClient(LoggingClass): self.ws.close() def handle_invalid_session(self, packet): - self.log.warning('Recieved INVALID_SESSIOIN, forcing a fresh reconnect') + self.log.warning('Recieved INVALID_SESSION, forcing a fresh reconnect') self.session_id = None self.ws.close() @@ -82,9 +81,6 @@ class GatewayClient(LoggingClass): self.log.info('Recieved HELLO, starting heartbeater...') self._heartbeat_task = gevent.spawn(self.heartbeat_task, packet['d']['heartbeat_interval']) - def handle_heartbeat_ack(self, packet): - pass - def on_ready(self, ready): self.log.info('Recieved READY') self.session_id = ready.session_id @@ -97,7 +93,7 @@ class GatewayClient(LoggingClass): encoding=self.encoder.TYPE) self.log.info('Opening websocket connection to URL `%s`', self._cached_gateway_url) - self.ws = WebsocketProcessProxy(self._cached_gateway_url) + self.ws = Websocket(self._cached_gateway_url) self.ws.emitter.on('on_open', self.on_open) self.ws.emitter.on('on_error', self.on_error) self.ws.emitter.on('on_close', self.on_close) diff --git a/disco/util/backdoor.py b/disco/util/backdoor.py new file mode 100644 index 0000000..46a4c6b --- /dev/null +++ b/disco/util/backdoor.py @@ -0,0 +1,12 @@ +from gevent.backdoor import BackdoorServer + + +class DiscoBackdoorServer(BackdoorServer): + def __init__(self, listener, localf=None, banner=None, **server_args): + super(DiscoBackdoorServer, self).__init__(listener, {}, banner, **server_args) + self.localf = localf + + def _create_interactive_locals(self): + obj = super(DiscoBackdoorServer, self)._create_interactive_locals() + obj.update(self.localf()) + return obj diff --git a/disco/util/functional.py b/disco/util/functional.py index caae5a3..951a1e4 100644 --- a/disco/util/functional.py +++ b/disco/util/functional.py @@ -3,8 +3,8 @@ from gevent.lock import RLock def cached_property(f): """ - Creates a cached property out of ``f``. When the property is resolved for - the first time, the function will be called and its result will be cached. + Creates a cached class property out of ``f``. When the property is resolved + for the first time, the function will be called and its result will be cached. Subsequent calls will return the cached value. If this property is set, the cached value will be replaced (or set initially) with the value provided. If this property is deleted, the cache will be cleared and the next call will @@ -25,25 +25,22 @@ def cached_property(f): The cached property created. """ lock = RLock() - f._value = None - f._has_value = False + value_name = '_' + f.__name__ - def getf(*args, **kwargs): - if not f._has_value: + def getf(self, *args, **kwargs): + if not hasattr(self, value_name): with lock: - if f._has_value: - return f._value + if hasattr(self, value_name): + return getattr(self, value_name) - f._value = f(*args, **kwargs) - f._has_value = True + setattr(self, value_name, f(self, *args, **kwargs)) - return f._value + return getattr(self, value_name) def setf(self, value): - f._value = value + setattr(self, value_name, value) def delf(self): - f._value = None - f._has_value = False + delattr(self, value_name) return property(getf, setf, delf) diff --git a/disco/util/websocket.py b/disco/util/websocket.py index e488a44..c2bcf31 100644 --- a/disco/util/websocket.py +++ b/disco/util/websocket.py @@ -1,12 +1,8 @@ from __future__ import absolute_import -import sys -import ssl import websocket import gevent import six -import gipc -import signal from holster.emitter import Emitter @@ -15,35 +11,17 @@ from disco.util.logging import LoggingClass class Websocket(LoggingClass, websocket.WebSocketApp): """ - Subclass of websocket.WebSocketApp that adds some important improvements: - - Passes exit code to on_error callback in all cases - - Spawns callbacks in a gevent greenlet, and catches/logs exceptions + A utility class which wraps the functionality of :class:`websocket.WebSocketApp` + changing its behavior to better conform with standard style across disco. + + The major difference comes with the move from callback functions, to all + events being piped into a single emitter. """ def __init__(self, *args, **kwargs): LoggingClass.__init__(self) websocket.WebSocketApp.__init__(self, *args, **kwargs) - def _get_close_args(self, data): - if data and len(data) >= 2: - code = 256 * six.byte2int(data[0:1]) + six.byte2int(data[1:2]) - reason = data[2:].decode('utf-8') - return [code, reason] - return [None, None] - - def _callback(self, callback, *args): - if not callback: - return - - try: - gevent.spawn(callback, self, *args) - except Exception: - self.log.exception('Error in Websocket callback for {}: '.format(callback)) - - -class WebsocketProcess(Websocket): - def __init__(self, pipe, *args, **kwargs): - Websocket.__init__(self, *args, **kwargs) - self.pipe = pipe + self.emitter = Emitter(gevent.spawn) # Hack to get events to emit for var in self.__dict__.keys(): @@ -52,54 +30,15 @@ class WebsocketProcess(Websocket): setattr(self, var, var) + def _get_close_args(self, data): + if data and len(data) >= 2: + code = 256 * six.byte2int(data[0:1]) + six.byte2int(data[1:2]) + reason = data[2:].decode('utf-8') + return [code, reason] + return [None, None] + def _callback(self, callback, *args): if not callback: return - self.pipe.put((callback, args)) - - -class WebsocketProcessProxy(object): - def __init__(self, *args, **kwargs): - self.args = args - self.kwargs = kwargs - self.emitter = Emitter(gevent.spawn) - - gevent.signal(signal.SIGINT, self.handle_signal) - gevent.signal(signal.SIGTERM, self.handle_signal) - - def handle_signal(self, *args): - self.close() - gevent.sleep(1) - self.process.terminate() - sys.exit() - - @classmethod - def process(cls, pipe, *args, **kwargs): - proc = WebsocketProcess(pipe, *args, **kwargs) - - # TODO: ssl? - gevent.spawn(proc.run_forever, sslopt={'cert_reqs': ssl.CERT_NONE}) - - while True: - op = pipe.get() - getattr(proc, op['method'])(*op['args'], **op['kwargs']) - - def read_task(self): - while True: - try: - name, args = self.pipe.get() - except EOFError: - return - self.emitter.emit(name, *args) - - def run_forever(self): - self.pipe, pipe = gipc.pipe(True) - self.process = gipc.start_process(self.process, args=tuple([pipe] + list(self.args)), kwargs=self.kwargs) - self.read_task() - - def __getattr__(self, attr): - def _wrapped(*args, **kwargs): - self.pipe.put({'method': attr, 'args': args, 'kwargs': kwargs}) - - return _wrapped + self.emitter.emit(callback, *args)