Browse Source

First stab at IPC based auto sharding

pull/9/head
Andrei 9 years ago
parent
commit
ce29836e84
  1. 11
      disco/api/client.py
  2. 2
      disco/api/http.py
  3. 4
      disco/bot/bot.py
  4. 6
      disco/cli.py
  5. 2
      disco/client.py
  6. 24
      disco/gateway/client.py
  7. 0
      disco/gateway/ipc/__init__.py
  8. 50
      disco/gateway/ipc/gipc.py
  9. 31
      disco/gateway/sharder.py

11
disco/api/client.py

@ -32,9 +32,13 @@ class APIClient(LoggingClass):
self.client = client
self.http = HTTPClient(self.client.config.token)
def gateway(self, version, encoding):
def gateway_get(self):
data = self.http(Routes.GATEWAY_GET).json()
return data['url'] + '?v={}&encoding={}'.format(version, encoding)
return data
def gateway_bot_get(self):
data = self.http(Routes.GATEWAY_BOT_GET).json()
return data
def channels_get(self, channel):
r = self.http(Routes.CHANNELS_GET, dict(channel=channel))
@ -48,6 +52,9 @@ class APIClient(LoggingClass):
r = self.http(Routes.CHANNELS_DELETE, dict(channel=channel))
return Channel.create(self.client, r.json())
def channels_typing(self, channel):
self.http(Routes.CHANNELS_TYPING, dict(channel=channel))
def channels_messages_list(self, channel, around=None, before=None, after=None, limit=50):
r = self.http(Routes.CHANNELS_MESSAGES_LIST, dict(channel=channel), params=optional(
around=around,

2
disco/api/http.py

@ -25,12 +25,14 @@ class Routes(object):
"""
# Gateway
GATEWAY_GET = (HTTPMethod.GET, '/gateway')
GATEWAY_BOT_GET = (HTTPMethod.GET, '/gateway/bot')
# Channels
CHANNELS = '/channels/{channel}'
CHANNELS_GET = (HTTPMethod.GET, CHANNELS)
CHANNELS_MODIFY = (HTTPMethod.PATCH, CHANNELS)
CHANNELS_DELETE = (HTTPMethod.DELETE, CHANNELS)
CHANNELS_TYPING = (HTTPMethod.POST, CHANNELS + '/typing')
CHANNELS_MESSAGES_LIST = (HTTPMethod.GET, CHANNELS + '/messages')
CHANNELS_MESSAGES_GET = (HTTPMethod.GET, CHANNELS + '/messages/{message}')
CHANNELS_MESSAGES_CREATE = (HTTPMethod.POST, CHANNELS + '/messages')

4
disco/bot/bot.py

@ -430,8 +430,8 @@ class Bot(object):
def load_plugin_config(self, cls):
name = cls.__name__.lower()
if name.startswith('plugin'):
name = name[6:]
if name.endswith('plugin'):
name = name[:-6]
path = os.path.join(
self.config.plugin_config_dir, name) + '.' + self.config.plugin_config_format

6
disco/cli.py

@ -18,6 +18,7 @@ parser.add_argument('--config', help='Configuration file', default='config.yaml'
parser.add_argument('--token', help='Bot Authentication Token', default=None)
parser.add_argument('--shard-count', help='Total number of shards', default=None)
parser.add_argument('--shard-id', help='Current shard number/id', default=None)
parser.add_argument('--shard-auto', help='Automatically run all shards', action='store_true', default=False)
parser.add_argument('--manhole', action='store_true', help='Enable the manhole', default=None)
parser.add_argument('--manhole-bind', help='host:port for the manhole to bind too', default=None)
parser.add_argument('--encoder', help='encoder for gateway data', default=None)
@ -41,6 +42,7 @@ def disco_main(run=False):
from disco.client import Client, ClientConfig
from disco.bot import Bot, BotConfig
from disco.gateway.sharder import AutoSharder
from disco.util.token import is_valid_token
if os.path.exists(args.config):
@ -56,6 +58,10 @@ def disco_main(run=False):
print('Invalid token passed')
return
if args.shard_auto:
AutoSharder(config).run()
return
client = Client(config)
bot = None

2
disco/client.py

@ -37,7 +37,7 @@ class ClientConfig(LoggingClass, Config):
shard_id = 0
shard_count = 1
manhole_enable = True
manhole_enable = False
manhole_bind = ('127.0.0.1', 8484)
encoder = 'json'

24
disco/gateway/client.py

@ -17,7 +17,7 @@ class GatewayClient(LoggingClass):
GATEWAY_VERSION = 6
MAX_RECONNECTS = 5
def __init__(self, client, encoder='json'):
def __init__(self, client, encoder='json', ipc=None):
super(GatewayClient, self).__init__()
self.client = client
self.encoder = ENCODERS[encoder]
@ -25,6 +25,11 @@ class GatewayClient(LoggingClass):
self.events = client.events
self.packets = client.packets
# IPC for shards
if ipc:
self.shards = ipc.get_shards()
self.ipc = ipc
# Its actually 60, 120 but lets give ourselves a buffer
self.limiter = SimpleLimiter(60, 130)
@ -98,14 +103,17 @@ class GatewayClient(LoggingClass):
self.session_id = ready.session_id
self.reconnects = 0
def connect_and_run(self):
if not self._cached_gateway_url:
self._cached_gateway_url = self.client.api.gateway(
version=self.GATEWAY_VERSION,
encoding=self.encoder.TYPE)
def connect_and_run(self, gateway_url=None):
if not gateway_url:
if not self._cached_gateway_url:
self._cached_gateway_url = self.client.api.gateway_get()['url']
gateway_url = self._cached_gateway_url
gateway_url += '?v={}&encoding={}'.format(self.GATEWAY_VERSION, self.encoder.TYPE)
self.log.info('Opening websocket connection to URL `%s`', self._cached_gateway_url)
self.ws = Websocket(self._cached_gateway_url)
self.log.info('Opening websocket connection to URL `%s`', gateway_url)
self.ws = Websocket(gateway_url)
self.ws.emitter.on('on_open', self.on_open)
self.ws.emitter.on('on_error', self.on_error)
self.ws.emitter.on('on_close', self.on_close)

0
disco/gateway/ipc/__init__.py

50
disco/gateway/ipc/gipc.py

@ -0,0 +1,50 @@
import random
import gipc
import gevent
import string
import weakref
def get_random_str(size):
return ''.join([random.choice(string.ascii_printable) for _ in range(size)])
class GIPCProxy(object):
def __init__(self, pipe):
self.pipe = pipe
self.results = weakref.WeakValueDictionary()
gevent.spawn(self.read_loop)
def read_loop(self):
while True:
nonce, data = self.pipe.get()
if nonce in self.results:
self.results[nonce].set(data)
def __getattr__(self, name):
def wrapper(*args, **kwargs):
nonce = get_random_str()
self.results[nonce] = gevent.event.AsyncResult()
self.pipe.put(nonce, name, args, kwargs)
return self.results[nonce]
return wrapper
class GIPCObject(object):
def __init__(self, inst, pipe):
self.inst = inst
self.pipe = pipe
gevent.spawn(self.read_loop)
def read_loop(self):
while True:
nonce, func, args, kwargs = self.pipe.get()
func = getattr(self.inst, func)
self.pipe.put((nonce, func(*args, **kwargs)))
class IPC(object):
def __init__(self, sharder):
self.sharder = sharder
def get_shards(self):
return {}

31
disco/gateway/sharder.py

@ -0,0 +1,31 @@
import gipc
from disco.client import Client
from disco.bot import Bot, BotConfig
from disco.api.client import APIClient
from disco.gateway.ipc.gipc import GIPCObject, GIPCProxy
def run_shard(config, id, pipe):
config.shard_id = id
client = Client(config)
bot = Bot(client, BotConfig(config.bot))
GIPCObject(bot, pipe)
bot.run_forever()
class AutoSharder(object):
def __init__(self, config):
self.config = config
self.client = APIClient(config.token)
self.shards = {}
self.config.shard_count = self.client.gateway_bot_get()['shards']
def run(self):
for shard in range(self.shard_count):
self.start_shard(shard)
def start_shard(self, id):
cpipe, ppipe = gipc.pipe(duplex=True)
gipc.start_process(run_shard, (self.config, id, cpipe))
self.shards[id] = GIPCProxy(ppipe)
Loading…
Cancel
Save