From fb2a19efb8752c622b937f97a0db11129dfce6e1 Mon Sep 17 00:00:00 2001 From: Andrei Date: Wed, 19 Oct 2016 19:43:36 -0500 Subject: [PATCH] Remove dill requirement, switch to marshal for serialization, etc --- README.md | 1 + disco/gateway/ipc.py | 6 +++--- disco/gateway/sharder.py | 44 +++------------------------------------- disco/util/serializer.py | 35 ++++++++++++++++++++++++++++++++ 4 files changed, 42 insertions(+), 44 deletions(-) diff --git a/README.md b/README.md index 027229d..6c52569 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,7 @@ Disco was built to run both as a generic-use library, and a standalone bot toolk |requests[security]|adds packages for a proper SSL implementation| |ujson|faster json parser, improves performance| |erlpack|ETF parser, only Python 2.x, run with the --encoder=etf flag| +|gipc|Gevent IPC, required for autosharding| ## Examples diff --git a/disco/gateway/ipc.py b/disco/gateway/ipc.py index e23206b..bcd3383 100644 --- a/disco/gateway/ipc.py +++ b/disco/gateway/ipc.py @@ -2,11 +2,11 @@ import random import gevent import string import weakref -import dill from holster.enum import Enum from disco.util.logging import LoggingClass +from disco.util.serializer import dump_function, load_function def get_random_str(size): @@ -49,7 +49,7 @@ class GIPCProxy(LoggingClass): self.send(IPCMessageType.RESPONSE, (nonce, self.resolve(path))) elif mtype == IPCMessageType.EXECUTE: nonce, raw = data - func = dill.loads(raw) + func = load_function(raw) try: result = func(self.obj) except Exception: @@ -73,7 +73,7 @@ class GIPCProxy(LoggingClass): def execute(self, func): nonce = get_random_str(32) - raw = dill.dumps(func) + raw = dump_function(func) self.results[nonce] = result = gevent.event.AsyncResult() self.pipe.put((IPCMessageType.EXECUTE.value, (nonce, raw))) return result diff --git a/disco/gateway/sharder.py b/disco/gateway/sharder.py index f718c3d..5d98ad6 100644 --- a/disco/gateway/sharder.py +++ b/disco/gateway/sharder.py @@ -1,11 +1,9 @@ from __future__ import absolute_import -import six import gipc import gevent import logging -import dill -import types +import marshal from holster.log import set_logging_levels @@ -14,42 +12,7 @@ from disco.bot import Bot, BotConfig from disco.api.client import APIClient from disco.gateway.ipc import GIPCProxy from disco.util.snowflake import calculate_shard - - -def dump_function(func): - if six.PY3: - return dill.dumps(( - func.__code__, - func.__name__, - func.__defaults__, - func.__closure__, - )) - else: - return dill.dumps(( - func.func_code, - func.func_name, - func.func_defaults, - func.func_closure - )) - - -def load_function(func): - code, name, defaults, closure = dill.loads(func) - return types.FunctionType(code, globals(), name, defaults, closure) - - -def run_on(id, proxy): - def f(func): - return proxy.call(('run_on', ), id, dump_function(func)) - return f - - -def run_self(bot): - def f(func): - result = gevent.event.AsyncResult() - result.set(func(bot)) - return result - return f +from disco.util.serializer import dump_function, load_function def run_shard(config, id, pipe): @@ -104,7 +67,6 @@ class AutoSharder(object): def run_on(self, id, raw): func = load_function(raw) - # func = dill.loads(raw) return self.shards[id].execute(func).wait(timeout=15) def run(self): @@ -121,6 +83,6 @@ class AutoSharder(object): ) def start_shard(self, id): - cpipe, ppipe = gipc.pipe(duplex=True, encoder=dill.dumps, decoder=dill.loads) + cpipe, ppipe = gipc.pipe(duplex=True, encoder=marshal.dumps, decoder=marshal.loads) gipc.start_process(run_shard, (self.config, id, cpipe)) self.shards[id] = GIPCProxy(self, ppipe) diff --git a/disco/util/serializer.py b/disco/util/serializer.py index 74fe766..e248490 100644 --- a/disco/util/serializer.py +++ b/disco/util/serializer.py @@ -1,3 +1,5 @@ +import six +import types class Serializer(object): @@ -36,3 +38,36 @@ class Serializer(object): def dumps(cls, fmt, raw): _, dumps = getattr(cls, fmt)() return dumps(raw) + + +def dump_cell(cell): + return cell.cell_contents + + +def load_cell(cell): + if six.PY3: + return (lambda y: cell).__closure__[0] + else: + return (lambda y: cell).func_closure[0] + + +def dump_function(func): + if six.PY3: + return ( + func.__code__, + func.__name__, + func.__defaults__, + list(map(dump_cell, func.__closure__)) if func.__closure__ else [], + ) + else: + return ( + func.func_code, + func.func_name, + func.func_defaults, + list(map(dump_cell, func.func_closure)) if func.func_closure else [], + ) + + +def load_function((code, name, defaults, closure)): + closure = tuple(map(load_cell, closure)) + return types.FunctionType(code, globals(), name, defaults, closure)