|
|
@ -2,6 +2,7 @@ from __future__ import absolute_import |
|
|
|
|
|
|
|
import gipc |
|
|
|
import gevent |
|
|
|
import pickle |
|
|
|
import logging |
|
|
|
import marshal |
|
|
|
|
|
|
@ -82,7 +83,24 @@ class AutoSharder(object): |
|
|
|
format='{} [%(levelname)s] %(asctime)s - %(name)s:%(lineno)d - %(message)s'.format(id) |
|
|
|
) |
|
|
|
|
|
|
|
@staticmethod |
|
|
|
def dumps(data): |
|
|
|
if isinstance(data, (basestring, int, long, bool, list, set, dict)): |
|
|
|
return '\x01' + marshal.dumps(data) |
|
|
|
elif isinstance(data, object) and data.__class__.__name__ == 'code': |
|
|
|
return '\x01' + marshal.dumps(data) |
|
|
|
else: |
|
|
|
return '\x02' + pickle.dumps(data) |
|
|
|
|
|
|
|
@staticmethod |
|
|
|
def loads(data): |
|
|
|
enc_type = data[0] |
|
|
|
if enc_type == '\x01': |
|
|
|
return marshal.loads(data[1:]) |
|
|
|
elif enc_type == '\x02': |
|
|
|
return pickle.loads(data[1:]) |
|
|
|
|
|
|
|
def start_shard(self, sid): |
|
|
|
cpipe, ppipe = gipc.pipe(duplex=True, encoder=marshal.dumps, decoder=marshal.loads) |
|
|
|
cpipe, ppipe = gipc.pipe(duplex=True, encoder=self.dumps, decoder=self.loads) |
|
|
|
gipc.start_process(run_shard, (self.config, sid, cpipe)) |
|
|
|
self.shards[sid] = GIPCProxy(self, ppipe) |
|
|
|