You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

111 lines
3.3 KiB

from __future__ import absolute_import
import gipc
import gevent
import pickle
import logging
import marshal
from six import integer_types, string_types
from six.moves import range
from disco.client import Client
from disco.bot import Bot, BotConfig
from disco.api.client import APIClient
from disco.gateway.ipc import GIPCProxy
from disco.util.logging import setup_logging
from disco.util.snowflake import calculate_shard
from disco.util.serializer import dump_function, load_function
def run_shard(config, shard_id, pipe):
    """Configure and run the bot for a single shard.

    This is the function handed to ``gipc.start_process`` by
    ``AutoSharder.start_shard``.

    :param config: client configuration; ``shard_id`` is written onto it, and
        ``bot`` and ``shard_count`` are read from it.
    :param shard_id: id of the shard this call is responsible for; also used
        as the prefix of every log line.
    :param pipe: pipe end wrapped in a ``GIPCProxy`` and stored as
        ``bot.sharder``.
    """
    log_format = '{} [%(levelname)s] %(asctime)s - %(name)s:%(lineno)d - %(message)s'.format(shard_id)
    setup_logging(level=logging.INFO, format=log_format)

    # The client must see its own shard id before it is constructed.
    config.shard_id = shard_id

    shard_bot = Bot(Client(config), BotConfig(config.bot))
    shard_bot.sharder = GIPCProxy(shard_bot, pipe)
    shard_bot.shards = ShardHelper(config.shard_count, shard_bot)
    shard_bot.run_forever()
class ShardHelper(object):
    """Helper attached to a bot for running functions on one or all shards.

    :param count: total number of shards.
    :param bot: the bot of the current shard; its ``client.config.shard_id``
        identifies the local shard and its ``sharder`` is used to reach the
        other ones.
    """

    def __init__(self, count, bot):
        self.count = count
        self.bot = bot

    def keys(self):
        """Yield every shard id, from ``0`` up to ``count - 1``."""
        for sid in range(self.count):
            yield sid

    def on(self, sid, func):
        """Run ``func`` on shard ``sid``.

        When ``sid`` is the local shard, ``func(self.bot)`` is called
        immediately and its return value is wrapped in an already-set
        ``AsyncResult``; an exception raised by ``func`` propagates to the
        caller. Otherwise the function is serialized with ``dump_function``
        and forwarded through ``self.bot.sharder`` as a ``run_on`` call.

        :returns: an object with a ``wait(timeout=...)`` method (see ``all``).
        """
        if sid == self.bot.client.config.shard_id:
            # Import the submodule explicitly: a bare ``import gevent`` does
            # not guarantee that ``gevent.event`` has been loaded.
            from gevent.event import AsyncResult

            result = AsyncResult()
            result.set(func(self.bot))
            return result

        return self.bot.sharder.call(('run_on', ), sid, dump_function(func))

    def all(self, func, timeout=None):
        """Run ``func`` on every shard and collect the results.

        :param timeout: passed to each result's ``wait`` call.
        :returns: dict mapping shard id to whatever ``wait`` returned for it.
        """
        # Import the submodule explicitly: a bare ``import gevent`` does not
        # guarantee that ``gevent.pool`` has been loaded.
        from gevent.pool import Pool

        pool = Pool(self.count)
        # Pairing ids with results relies on ``imap`` yielding results in
        # input order.
        return dict(zip(
            range(self.count),
            pool.imap(
                lambda i: self.on(i, func).wait(timeout=timeout),
                range(self.count),
            ),
        ))

    def for_id(self, sid, func):
        """Run ``func`` on the shard that ``calculate_shard`` maps ``sid`` to.

        Note that here ``sid`` is the value handed to ``calculate_shard``
        (the module name suggests a snowflake — confirm), not a shard id.
        """
        shard = calculate_shard(self.count, sid)
        return self.on(shard, func)
class AutoSharder(object):
    """Starts one child process per shard and relays calls between them.

    On construction the shard count is read from
    ``APIClient.gateway_bot_get()['shards']`` and written to
    ``config.shard_count``.

    :param config: client configuration; ``token`` and ``manhole_enable`` are
        read from it, ``shard_count`` and ``manhole_enable`` are written.
    """

    def __init__(self, config):
        self.config = config
        self.client = APIClient(config.token)
        # shard id -> GIPCProxy wrapping the parent end of that shard's pipe
        self.shards = {}
        self.config.shard_count = self.client.gateway_bot_get()['shards']

    def run_on(self, sid, raw):
        """Execute a serialized function on shard ``sid``.

        :param sid: id of the target shard (a key of ``self.shards``).
        :param raw: function serialized in the form ``load_function`` accepts.
        :returns: the value of waiting (up to 15 seconds) on the shard's reply.
        :raises KeyError: if shard ``sid`` has not been started.
        """
        func = load_function(raw)
        return self.shards[sid].execute(func).wait(timeout=15)

    def run(self):
        """Start every shard in order, then configure logging for this process."""
        for shard_id in range(self.config.shard_count):
            # Only shard 0 is started with the manhole enabled; the flag is
            # cleared on the shared config before any later shard starts.
            if self.config.manhole_enable and shard_id != 0:
                self.config.manhole_enable = False

            self.start_shard(shard_id)
            # NOTE(review): presumably spaces out gateway connections to stay
            # under a rate limit — confirm before changing the delay.
            gevent.sleep(6)

        # NOTE(review): logging is configured only after all shards have been
        # spawned; this ordering is kept from the original — confirm whether
        # the children depend on it.
        # The prefix used to be the builtin ``id`` function, which rendered as
        # "<built-in function id>"; use a fixed label for the parent instead.
        logging.basicConfig(
            level=logging.INFO,
            format='{} [%(levelname)s] %(asctime)s - %(name)s:%(lineno)d - %(message)s'.format('sharder'),
        )

    @staticmethod
    def dumps(data):
        """Encode ``data`` to bytes for transport over a shard pipe.

        The first byte tags the encoding: ``b'\\x01'`` for marshal and
        ``b'\\x02'`` for pickle. Strings, integers, bools, lists, sets, dicts
        and code objects are marshalled when possible; everything else, and
        any of those that marshal rejects (e.g. subclasses or containers of
        arbitrary objects), is pickled.

        :returns: tagged ``bytes`` payload understood by :meth:`loads`.
        """
        # Code objects are detected by class name, as the file does not
        # import ``types``.
        marshal_candidate = (
            isinstance(data, (string_types, integer_types, bool, list, set, dict)) or
            data.__class__.__name__ == 'code'
        )
        if marshal_candidate:
            try:
                # Bytes tag: ``marshal.dumps`` returns bytes, and on Python 3
                # a text tag cannot be concatenated with it.
                return b'\x01' + marshal.dumps(data)
            except ValueError:
                # marshal raises ValueError for objects it cannot encode;
                # fall through to pickle instead of failing the send.
                pass

        return b'\x02' + pickle.dumps(data)

    @staticmethod
    def loads(data):
        """Decode a payload produced by :meth:`dumps`.

        :param data: tagged ``bytes`` payload.
        :returns: the decoded object.
        :raises ValueError: if the payload is empty or carries an unknown tag.
        """
        # Slice rather than index: on Python 3 ``data[0]`` is an int and would
        # never compare equal to a tag.
        enc_type = data[:1]
        if enc_type == b'\x01':
            return marshal.loads(data[1:])
        if enc_type == b'\x02':
            # SECURITY: pickle must only ever see trusted data. This decoder
            # is installed on the pipes created in ``start_shard``; do not
            # reuse it for input from anywhere else.
            return pickle.loads(data[1:])
        raise ValueError('unknown payload encoding tag: {!r}'.format(enc_type))

    def start_shard(self, sid):
        """Spawn the process for shard ``sid`` and register a proxy to it.

        A duplex ``gipc`` pipe is created using :meth:`dumps` and
        :meth:`loads` as its codec. One end is handed to ``run_shard`` in the
        new process; the other is wrapped in a ``GIPCProxy`` stored in
        ``self.shards[sid]``.
        """
        cpipe, ppipe = gipc.pipe(duplex=True, encoder=self.dumps, decoder=self.loads)
        gipc.start_process(run_shard, (self.config, sid, cpipe))
        self.shards[sid] = GIPCProxy(self, ppipe)