|
|
@ -1,9 +1,11 @@ |
|
|
|
from __future__ import absolute_import |
|
|
|
|
|
|
|
import six |
|
|
|
import gipc |
|
|
|
import gevent |
|
|
|
import logging |
|
|
|
import dill |
|
|
|
import types |
|
|
|
|
|
|
|
from holster.log import set_logging_levels |
|
|
|
|
|
|
@ -11,11 +13,34 @@ from disco.client import Client |
|
|
|
from disco.bot import Bot, BotConfig |
|
|
|
from disco.api.client import APIClient |
|
|
|
from disco.gateway.ipc.gipc import GIPCProxy |
|
|
|
from disco.util.snowflake import calculate_shard |
|
|
|
|
|
|
|
|
|
|
|
def dump_function(func): |
|
|
|
if six.PY3: |
|
|
|
return dill.dumps(( |
|
|
|
func.__code__, |
|
|
|
func.__name__, |
|
|
|
func.__defaults__, |
|
|
|
func.__closure__, |
|
|
|
)) |
|
|
|
else: |
|
|
|
return dill.dumps(( |
|
|
|
func.func_code, |
|
|
|
func.func_name, |
|
|
|
func.func_defaults, |
|
|
|
func.func_closure |
|
|
|
)) |
|
|
|
|
|
|
|
|
|
|
|
def load_function(func): |
|
|
|
code, name, defaults, closure = dill.loads(func) |
|
|
|
return types.FunctionType(code, globals(), name, defaults, closure) |
|
|
|
|
|
|
|
|
|
|
|
def run_on(id, proxy): |
|
|
|
def f(func): |
|
|
|
return proxy.call(('run_on', ), id, dill.dumps(func)) |
|
|
|
return proxy.call(('run_on', ), id, dump_function(func)) |
|
|
|
return f |
|
|
|
|
|
|
|
|
|
|
@ -38,14 +63,36 @@ def run_shard(config, id, pipe): |
|
|
|
client = Client(config) |
|
|
|
bot = Bot(client, BotConfig(config.bot)) |
|
|
|
bot.sharder = GIPCProxy(bot, pipe) |
|
|
|
bot.shards = { |
|
|
|
i: run_on(i, bot.sharder) for i in range(config.shard_count) |
|
|
|
if i != id |
|
|
|
} |
|
|
|
bot.shards[id] = run_self(bot) |
|
|
|
bot.shards = ShardHelper(config.shard_count, bot) |
|
|
|
bot.run_forever() |
|
|
|
|
|
|
|
|
|
|
|
class ShardHelper(object): |
|
|
|
def __init__(self, count, bot): |
|
|
|
self.count = count |
|
|
|
self.bot = bot |
|
|
|
|
|
|
|
def keys(self): |
|
|
|
for id in xrange(self.count): |
|
|
|
yield id |
|
|
|
|
|
|
|
def on(self, id, func): |
|
|
|
if id == self.bot.client.config.shard_id: |
|
|
|
result = gevent.event.AsyncResult() |
|
|
|
result.set(func(self.bot)) |
|
|
|
return result |
|
|
|
|
|
|
|
return self.bot.sharder.call(('run_on', ), id, dump_function(func)) |
|
|
|
|
|
|
|
def all(self, func, timeout=None): |
|
|
|
pool = gevent.pool.Pool(self.count) |
|
|
|
return dict(zip(range(self.count), pool.imap(lambda i: self.on(i, func).wait(timeout=timeout), range(self.count)))) |
|
|
|
|
|
|
|
def for_id(self, id, func): |
|
|
|
shard = calculate_shard(self.count, id) |
|
|
|
return self.on(shard, func) |
|
|
|
|
|
|
|
|
|
|
|
class AutoSharder(object): |
|
|
|
def __init__(self, config): |
|
|
|
self.config = config |
|
|
@ -56,7 +103,8 @@ class AutoSharder(object): |
|
|
|
self.config.shard_count = 10 |
|
|
|
|
|
|
|
def run_on(self, id, raw): |
|
|
|
func = dill.loads(raw) |
|
|
|
func = load_function(raw) |
|
|
|
# func = dill.loads(raw) |
|
|
|
return self.shards[id].execute(func).wait(timeout=15) |
|
|
|
|
|
|
|
def run(self): |
|
|
|