Browse Source

Remove dill requirement, switch to marshal for serialization, etc

pull/9/head
Andrei 9 years ago
parent
commit
fb2a19efb8
  1. 1
      README.md
  2. 6
      disco/gateway/ipc.py
  3. 44
      disco/gateway/sharder.py
  4. 35
      disco/util/serializer.py

1
README.md

@ -20,6 +20,7 @@ Disco was built to run both as a generic-use library, and a standalone bot toolk
|requests[security]|adds packages for a proper SSL implementation| |requests[security]|adds packages for a proper SSL implementation|
|ujson|faster json parser, improves performance| |ujson|faster json parser, improves performance|
|erlpack|ETF parser, only Python 2.x, run with the --encoder=etf flag| |erlpack|ETF parser, only Python 2.x, run with the --encoder=etf flag|
|gipc|Gevent IPC, required for autosharding|
## Examples ## Examples

6
disco/gateway/ipc.py

@ -2,11 +2,11 @@ import random
import gevent import gevent
import string import string
import weakref import weakref
import dill
from holster.enum import Enum from holster.enum import Enum
from disco.util.logging import LoggingClass from disco.util.logging import LoggingClass
from disco.util.serializer import dump_function, load_function
def get_random_str(size): def get_random_str(size):
@ -49,7 +49,7 @@ class GIPCProxy(LoggingClass):
self.send(IPCMessageType.RESPONSE, (nonce, self.resolve(path))) self.send(IPCMessageType.RESPONSE, (nonce, self.resolve(path)))
elif mtype == IPCMessageType.EXECUTE: elif mtype == IPCMessageType.EXECUTE:
nonce, raw = data nonce, raw = data
func = dill.loads(raw) func = load_function(raw)
try: try:
result = func(self.obj) result = func(self.obj)
except Exception: except Exception:
@ -73,7 +73,7 @@ class GIPCProxy(LoggingClass):
def execute(self, func): def execute(self, func):
nonce = get_random_str(32) nonce = get_random_str(32)
raw = dill.dumps(func) raw = dump_function(func)
self.results[nonce] = result = gevent.event.AsyncResult() self.results[nonce] = result = gevent.event.AsyncResult()
self.pipe.put((IPCMessageType.EXECUTE.value, (nonce, raw))) self.pipe.put((IPCMessageType.EXECUTE.value, (nonce, raw)))
return result return result

44
disco/gateway/sharder.py

@ -1,11 +1,9 @@
from __future__ import absolute_import from __future__ import absolute_import
import six
import gipc import gipc
import gevent import gevent
import logging import logging
import dill import marshal
import types
from holster.log import set_logging_levels from holster.log import set_logging_levels
@ -14,42 +12,7 @@ from disco.bot import Bot, BotConfig
from disco.api.client import APIClient from disco.api.client import APIClient
from disco.gateway.ipc import GIPCProxy from disco.gateway.ipc import GIPCProxy
from disco.util.snowflake import calculate_shard from disco.util.snowflake import calculate_shard
from disco.util.serializer import dump_function, load_function
def dump_function(func):
if six.PY3:
return dill.dumps((
func.__code__,
func.__name__,
func.__defaults__,
func.__closure__,
))
else:
return dill.dumps((
func.func_code,
func.func_name,
func.func_defaults,
func.func_closure
))
def load_function(func):
code, name, defaults, closure = dill.loads(func)
return types.FunctionType(code, globals(), name, defaults, closure)
def run_on(id, proxy):
def f(func):
return proxy.call(('run_on', ), id, dump_function(func))
return f
def run_self(bot):
def f(func):
result = gevent.event.AsyncResult()
result.set(func(bot))
return result
return f
def run_shard(config, id, pipe): def run_shard(config, id, pipe):
@ -104,7 +67,6 @@ class AutoSharder(object):
def run_on(self, id, raw): def run_on(self, id, raw):
func = load_function(raw) func = load_function(raw)
# func = dill.loads(raw)
return self.shards[id].execute(func).wait(timeout=15) return self.shards[id].execute(func).wait(timeout=15)
def run(self): def run(self):
@ -121,6 +83,6 @@ class AutoSharder(object):
) )
def start_shard(self, id): def start_shard(self, id):
cpipe, ppipe = gipc.pipe(duplex=True, encoder=dill.dumps, decoder=dill.loads) cpipe, ppipe = gipc.pipe(duplex=True, encoder=marshal.dumps, decoder=marshal.loads)
gipc.start_process(run_shard, (self.config, id, cpipe)) gipc.start_process(run_shard, (self.config, id, cpipe))
self.shards[id] = GIPCProxy(self, ppipe) self.shards[id] = GIPCProxy(self, ppipe)

35
disco/util/serializer.py

@ -1,3 +1,5 @@
import six
import types
class Serializer(object): class Serializer(object):
@ -36,3 +38,36 @@ class Serializer(object):
def dumps(cls, fmt, raw): def dumps(cls, fmt, raw):
_, dumps = getattr(cls, fmt)() _, dumps = getattr(cls, fmt)()
return dumps(raw) return dumps(raw)
def dump_cell(cell):
return cell.cell_contents
def load_cell(cell):
if six.PY3:
return (lambda y: cell).__closure__[0]
else:
return (lambda y: cell).func_closure[0]
def dump_function(func):
if six.PY3:
return (
func.__code__,
func.__name__,
func.__defaults__,
list(map(dump_cell, func.__closure__)) if func.__closure__ else [],
)
else:
return (
func.func_code,
func.func_name,
func.func_defaults,
list(map(dump_cell, func.func_closure)) if func.func_closure else [],
)
def load_function((code, name, defaults, closure)):
closure = tuple(map(load_cell, closure))
return types.FunctionType(code, globals(), name, defaults, closure)

Loading…
Cancel
Save