From ce29836e84f711288008e766d63e426f7ea3ba84 Mon Sep 17 00:00:00 2001 From: Andrei Date: Mon, 17 Oct 2016 19:36:03 -0500 Subject: [PATCH] First stab at IPC based auto sharding --- disco/api/client.py | 11 ++++++-- disco/api/http.py | 2 ++ disco/bot/bot.py | 4 +-- disco/cli.py | 6 +++++ disco/client.py | 2 +- disco/gateway/client.py | 24 +++++++++++------ disco/gateway/ipc/__init__.py | 0 disco/gateway/ipc/gipc.py | 50 +++++++++++++++++++++++++++++++++++ disco/gateway/sharder.py | 31 ++++++++++++++++++++++ 9 files changed, 117 insertions(+), 13 deletions(-) create mode 100644 disco/gateway/ipc/__init__.py create mode 100644 disco/gateway/ipc/gipc.py create mode 100644 disco/gateway/sharder.py diff --git a/disco/api/client.py b/disco/api/client.py index 3945e76..b9cc813 100644 --- a/disco/api/client.py +++ b/disco/api/client.py @@ -32,9 +32,13 @@ class APIClient(LoggingClass): self.client = client self.http = HTTPClient(self.client.config.token) - def gateway(self, version, encoding): + def gateway_get(self): data = self.http(Routes.GATEWAY_GET).json() - return data['url'] + '?v={}&encoding={}'.format(version, encoding) + return data + + def gateway_bot_get(self): + data = self.http(Routes.GATEWAY_BOT_GET).json() + return data def channels_get(self, channel): r = self.http(Routes.CHANNELS_GET, dict(channel=channel)) @@ -48,6 +52,9 @@ class APIClient(LoggingClass): r = self.http(Routes.CHANNELS_DELETE, dict(channel=channel)) return Channel.create(self.client, r.json()) + def channels_typing(self, channel): + self.http(Routes.CHANNELS_TYPING, dict(channel=channel)) + def channels_messages_list(self, channel, around=None, before=None, after=None, limit=50): r = self.http(Routes.CHANNELS_MESSAGES_LIST, dict(channel=channel), params=optional( around=around, diff --git a/disco/api/http.py b/disco/api/http.py index c1930bd..4757da4 100644 --- a/disco/api/http.py +++ b/disco/api/http.py @@ -25,12 +25,14 @@ class Routes(object): """ # Gateway GATEWAY_GET = (HTTPMethod.GET, '/gateway') + GATEWAY_BOT_GET = (HTTPMethod.GET, '/gateway/bot') # Channels CHANNELS = '/channels/{channel}' CHANNELS_GET = (HTTPMethod.GET, CHANNELS) CHANNELS_MODIFY = (HTTPMethod.PATCH, CHANNELS) CHANNELS_DELETE = (HTTPMethod.DELETE, CHANNELS) + CHANNELS_TYPING = (HTTPMethod.POST, CHANNELS + '/typing') CHANNELS_MESSAGES_LIST = (HTTPMethod.GET, CHANNELS + '/messages') CHANNELS_MESSAGES_GET = (HTTPMethod.GET, CHANNELS + '/messages/{message}') CHANNELS_MESSAGES_CREATE = (HTTPMethod.POST, CHANNELS + '/messages') diff --git a/disco/bot/bot.py b/disco/bot/bot.py index 6d5f250..99ac0b5 100644 --- a/disco/bot/bot.py +++ b/disco/bot/bot.py @@ -430,8 +430,8 @@ class Bot(object): def load_plugin_config(self, cls): name = cls.__name__.lower() - if name.startswith('plugin'): - name = name[6:] + if name.endswith('plugin'): + name = name[:-6] path = os.path.join( self.config.plugin_config_dir, name) + '.' + self.config.plugin_config_format diff --git a/disco/cli.py b/disco/cli.py index 951fd96..27cc220 100644 --- a/disco/cli.py +++ b/disco/cli.py @@ -18,6 +18,7 @@ parser.add_argument('--config', help='Configuration file', default='config.yaml' parser.add_argument('--token', help='Bot Authentication Token', default=None) parser.add_argument('--shard-count', help='Total number of shards', default=None) parser.add_argument('--shard-id', help='Current shard number/id', default=None) +parser.add_argument('--shard-auto', help='Automatically run all shards', action='store_true', default=False) parser.add_argument('--manhole', action='store_true', help='Enable the manhole', default=None) parser.add_argument('--manhole-bind', help='host:port for the manhole to bind too', default=None) parser.add_argument('--encoder', help='encoder for gateway data', default=None) @@ -41,6 +42,7 @@ def disco_main(run=False): from disco.client import Client, ClientConfig from disco.bot import Bot, BotConfig + from disco.gateway.sharder import AutoSharder from disco.util.token import is_valid_token if os.path.exists(args.config): @@ -56,6 +58,10 @@ def disco_main(run=False): print('Invalid token passed') return + if args.shard_auto: + AutoSharder(config).run() + return + client = Client(config) bot = None diff --git a/disco/client.py b/disco/client.py index 54dacd5..496aa7a 100644 --- a/disco/client.py +++ b/disco/client.py @@ -37,7 +37,7 @@ class ClientConfig(LoggingClass, Config): shard_id = 0 shard_count = 1 - manhole_enable = True + manhole_enable = False manhole_bind = ('127.0.0.1', 8484) encoder = 'json' diff --git a/disco/gateway/client.py b/disco/gateway/client.py index b3f8012..61f7dfd 100644 --- a/disco/gateway/client.py +++ b/disco/gateway/client.py @@ -17,7 +17,7 @@ class GatewayClient(LoggingClass): GATEWAY_VERSION = 6 MAX_RECONNECTS = 5 - def __init__(self, client, encoder='json'): + def __init__(self, client, encoder='json', ipc=None): super(GatewayClient, self).__init__() self.client = client self.encoder = ENCODERS[encoder] @@ -25,6 +25,11 @@ class GatewayClient(LoggingClass): self.events = client.events self.packets = client.packets + # IPC for shards + if ipc: + self.shards = ipc.get_shards() + self.ipc = ipc + # Its actually 60, 120 but lets give ourselves a buffer self.limiter = SimpleLimiter(60, 130) @@ -98,14 +103,17 @@ class GatewayClient(LoggingClass): self.session_id = ready.session_id self.reconnects = 0 - def connect_and_run(self): - if not self._cached_gateway_url: - self._cached_gateway_url = self.client.api.gateway( - version=self.GATEWAY_VERSION, - encoding=self.encoder.TYPE) + def connect_and_run(self, gateway_url=None): + if not gateway_url: + if not self._cached_gateway_url: + self._cached_gateway_url = self.client.api.gateway_get()['url'] + + gateway_url = self._cached_gateway_url + + gateway_url += '?v={}&encoding={}'.format(self.GATEWAY_VERSION, self.encoder.TYPE) - self.log.info('Opening websocket connection to URL `%s`', self._cached_gateway_url) - self.ws = Websocket(self._cached_gateway_url) + self.log.info('Opening websocket connection to URL `%s`', gateway_url) + self.ws = Websocket(gateway_url) self.ws.emitter.on('on_open', self.on_open) self.ws.emitter.on('on_error', self.on_error) self.ws.emitter.on('on_close', self.on_close) diff --git a/disco/gateway/ipc/__init__.py b/disco/gateway/ipc/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/disco/gateway/ipc/gipc.py b/disco/gateway/ipc/gipc.py new file mode 100644 index 0000000..b9468d0 --- /dev/null +++ b/disco/gateway/ipc/gipc.py @@ -0,0 +1,50 @@ +import random +import gipc +import gevent +import string +import weakref + + +def get_random_str(size): + return ''.join([random.choice(string.ascii_printable) for _ in range(size)]) + + +class GIPCProxy(object): + def __init__(self, pipe): + self.pipe = pipe + self.results = weakref.WeakValueDictionary() + gevent.spawn(self.read_loop) + + def read_loop(self): + while True: + nonce, data = self.pipe.get() + if nonce in self.results: + self.results[nonce].set(data) + + def __getattr__(self, name): + def wrapper(*args, **kwargs): + nonce = get_random_str() + self.results[nonce] = gevent.event.AsyncResult() + self.pipe.put(nonce, name, args, kwargs) + return self.results[nonce] + return wrapper + + +class GIPCObject(object): + def __init__(self, inst, pipe): + self.inst = inst + self.pipe = pipe + gevent.spawn(self.read_loop) + + def read_loop(self): + while True: + nonce, func, args, kwargs = self.pipe.get() + func = getattr(self.inst, func) + self.pipe.put((nonce, func(*args, **kwargs))) + +class IPC(object): + def __init__(self, sharder): + self.sharder = sharder + + def get_shards(self): + return {} diff --git a/disco/gateway/sharder.py b/disco/gateway/sharder.py new file mode 100644 index 0000000..fb89875 --- /dev/null +++ b/disco/gateway/sharder.py @@ -0,0 +1,31 @@ +import gipc + +from disco.client import Client +from disco.bot import Bot, BotConfig +from disco.api.client import APIClient +from disco.gateway.ipc.gipc import GIPCObject, GIPCProxy + + +def run_shard(config, id, pipe): + config.shard_id = id + client = Client(config) + bot = Bot(client, BotConfig(config.bot)) + GIPCObject(bot, pipe) + bot.run_forever() + + +class AutoSharder(object): + def __init__(self, config): + self.config = config + self.client = APIClient(config.token) + self.shards = {} + self.config.shard_count = self.client.gateway_bot_get()['shards'] + + def run(self): + for shard in range(self.shard_count): + self.start_shard(shard) + + def start_shard(self, id): + cpipe, ppipe = gipc.pipe(duplex=True) + gipc.start_process(run_shard, (self.config, id, cpipe)) + self.shards[id] = GIPCProxy(ppipe)