|
|
@ -5,6 +5,8 @@ import gevent |
|
|
|
import logging |
|
|
|
import marshal |
|
|
|
|
|
|
|
from six.moves import range |
|
|
|
|
|
|
|
from disco.client import Client |
|
|
|
from disco.bot import Bot, BotConfig |
|
|
|
from disco.api.client import APIClient |
|
|
@ -14,13 +16,13 @@ from disco.util.snowflake import calculate_shard |
|
|
|
from disco.util.serializer import dump_function, load_function |
|
|
|
|
|
|
|
|
|
|
|
def run_shard(config, id, pipe): |
|
|
|
def run_shard(config, shard_id, pipe): |
|
|
|
setup_logging( |
|
|
|
level=logging.INFO, |
|
|
|
format='{} [%(levelname)s] %(asctime)s - %(name)s:%(lineno)d - %(message)s'.format(id) |
|
|
|
format='{} [%(levelname)s] %(asctime)s - %(name)s:%(lineno)d - %(message)s'.format(shard_id) |
|
|
|
) |
|
|
|
|
|
|
|
config.shard_id = id |
|
|
|
config.shard_id = shard_id |
|
|
|
client = Client(config) |
|
|
|
bot = Bot(client, BotConfig(config.bot)) |
|
|
|
bot.sharder = GIPCProxy(bot, pipe) |
|
|
@ -34,8 +36,8 @@ class ShardHelper(object): |
|
|
|
self.bot = bot |
|
|
|
|
|
|
|
def keys(self): |
|
|
|
for id in xrange(self.count): |
|
|
|
yield id |
|
|
|
for sid in range(self.count): |
|
|
|
yield sid |
|
|
|
|
|
|
|
def on(self, id, func): |
|
|
|
if id == self.bot.client.config.shard_id: |
|
|
@ -49,8 +51,8 @@ class ShardHelper(object): |
|
|
|
pool = gevent.pool.Pool(self.count) |
|
|
|
return dict(zip(range(self.count), pool.imap(lambda i: self.on(i, func).wait(timeout=timeout), range(self.count)))) |
|
|
|
|
|
|
|
def for_id(self, id, func): |
|
|
|
shard = calculate_shard(self.count, id) |
|
|
|
def for_id(self, sid, func): |
|
|
|
shard = calculate_shard(self.count, sid) |
|
|
|
return self.on(shard, func) |
|
|
|
|
|
|
|
|
|
|
@ -63,9 +65,9 @@ class AutoSharder(object): |
|
|
|
if self.config.shard_count > 1: |
|
|
|
self.config.shard_count = 10 |
|
|
|
|
|
|
|
def run_on(self, id, raw): |
|
|
|
def run_on(self, sid, raw): |
|
|
|
func = load_function(raw) |
|
|
|
return self.shards[id].execute(func).wait(timeout=15) |
|
|
|
return self.shards[sid].execute(func).wait(timeout=15) |
|
|
|
|
|
|
|
def run(self): |
|
|
|
for shard in range(self.config.shard_count): |
|
|
@ -80,7 +82,7 @@ class AutoSharder(object): |
|
|
|
format='{} [%(levelname)s] %(asctime)s - %(name)s:%(lineno)d - %(message)s'.format(id) |
|
|
|
) |
|
|
|
|
|
|
|
def start_shard(self, id): |
|
|
|
def start_shard(self, sid): |
|
|
|
cpipe, ppipe = gipc.pipe(duplex=True, encoder=marshal.dumps, decoder=marshal.loads) |
|
|
|
gipc.start_process(run_shard, (self.config, id, cpipe)) |
|
|
|
self.shards[id] = GIPCProxy(self, ppipe) |
|
|
|
gipc.start_process(run_shard, (self.config, sid, cpipe)) |
|
|
|
self.shards[sid] = GIPCProxy(self, ppipe) |
|
|
|