Browse Source

Refactoring, manhole, ETF support

- s/DiscoClient/Client (was redundant and looked ugly)
- move cached_property to functional utils
- abstract client configuration out to a ClientConfig
- command line utility is now completely isolated from the client
- add ETF (using erlpack) support, optionally enabled via command line
flags (and only works on 2.x because of erlpack)
- Refactor the way gateway events are built a bit
- Add documentation on utilities
pull/3/head
Andrei 9 years ago
parent
commit
1505275a98
  1. 3
      README.md
  2. 2
      disco/api/client.py
  3. 15
      disco/bot/bot.py
  4. 2
      disco/bot/command.py
  5. 2
      disco/bot/plugin.py
  6. 23
      disco/cli.py
  7. 101
      disco/client.py
  8. 28
      disco/gateway/client.py
  9. 11
      disco/gateway/encoding/__init__.py
  10. 14
      disco/gateway/encoding/base.py
  11. 28
      disco/gateway/encoding/etf.py
  12. 22
      disco/gateway/encoding/json.py
  13. 65
      disco/gateway/events.py
  14. 5
      disco/state.py
  15. 4
      disco/types/channel.py
  16. 2
      disco/types/message.py
  17. 15
      disco/util/cache.py
  18. 49
      disco/util/functional.py
  19. 11
      disco/util/json.py
  20. 11
      disco/voice/client.py
  21. 4
      docs/api.rst
  22. 1
      docs/index.rst
  23. 27
      docs/utilities.rst

3
README.md

@ -14,7 +14,8 @@ Disco was built to run both as a generic-use library, and a standalone bot toolk
|Name|Reason|
|----|------|
|requests[security]|adds packages for a proper SSL implementation|
|rapidjson|provides a Python implementation of the C rapidjson library, improves performance|
|ujson|faster json parser, improves performance|
|erlpack|ETF parser, only Python 2.x, run with the --encoder=etf flag|
## Examples

2
disco/api/client.py

@ -27,7 +27,7 @@ class APIClient(LoggingClass):
super(APIClient, self).__init__()
self.client = client
self.http = HTTPClient(self.client.token)
self.http = HTTPClient(self.client.config.token)
def gateway(self, version, encoding):
data = self.http(Routes.GATEWAY_GET).json()

15
disco/bot/bot.py

@ -1,6 +1,5 @@
import re
from disco.client import DiscoClient
from disco.bot.command import CommandEvent
@ -13,7 +12,7 @@ class BotConfig(object):
----------
token : str
The authentication token for this bot. This is passed on to the
:class:`disco.client.DiscoClient` without any validation.
:class:`disco.client.Client` without any validation.
commands_enabled : bool
Whether this bot instance should utilize command parsing. Generally this
should be true, unless your bot is only handling events and has no user
@ -61,25 +60,23 @@ class Bot(object):
Parameters
----------
client : Optional[:class:`disco.client.DiscoClient`]
The client this bot should utilize for its connection. If not provided,
will create a new :class:`disco.client.DiscoClient` with the token inside
the bot config (:class:`BotConfig`)
client : :class:`disco.client.Client`
The client this bot should utilize for its connection.
config : Optional[:class:`BotConfig`]
The configuration to use for this bot. If not provided will use the defaults
inside of :class:`BotConfig`.
Attributes
----------
client : `disco.client.DiscoClient`
client : `disco.client.Client`
The client instance for this bot.
config : `BotConfig`
The bot configuration instance for this bot.
plugins : dict(str, :class:`disco.bot.plugin.Plugin`)
Any plugins this bot has loaded
"""
def __init__(self, client=None, config=None):
self.client = client or DiscoClient(config.token)
def __init__(self, client, config=None):
self.client = client
self.config = config or BotConfig()
self.plugins = {}

2
disco/bot/command.py

@ -1,7 +1,7 @@
import re
from disco.bot.parser import ArgumentSet, ArgumentError
from disco.util.cache import cached_property
from disco.util.functional import cached_property
REGEX_FMT = '({})'
ARGS_REGEX = '( (.*)$|$)'

2
disco/bot/plugin.py

@ -92,7 +92,7 @@ class Plugin(LoggingClass, PluginDeco):
Attributes
----------
client : :class:`disco.client.DiscoClient`
client : :class:`disco.client.Client`
An alias to the client the bot is running with.
state : :class:`disco.state.State`
An alias to the state object for the client.

23
disco/cli.py

@ -10,10 +10,14 @@ import argparse
from gevent import monkey
monkey.patch_all()
parser = argparse.ArgumentParser()
parser.add_argument('--token', help='Bot Authentication Token', required=True)
parser.add_argument('--shard-count', help='Total number of shards', default=1)
parser.add_argument('--shard-id', help='Current shard number/id', default=0)
parser.add_argument('--manhole', action='store_true', help='Enable the manhole', default=False)
parser.add_argument('--manhole-bind', help='host:port for the manhole to bind too', default='localhost:8484')
parser.add_argument('--encoder', help='encoder for gateway data', default='json')
logging.basicConfig(level=logging.INFO)
@ -21,23 +25,32 @@ logging.basicConfig(level=logging.INFO)
def disco_main():
"""
Creates an argument parser and parses a standard set of command line arguments,
creating a new :class:`DiscoClient`.
creating a new :class:`Client`.
Returns
-------
:class:`DiscoClient`
A new DiscoClient from the provided command line arguments
:class:`Client`
A new Client from the provided command line arguments
"""
args = parser.parse_args()
from disco.client import Client, ClientConfig
from disco.gateway.encoding import ENCODERS
from disco.util.token import is_valid_token
if not is_valid_token(args.token):
print('Invalid token passed')
return
from disco.client import DiscoClient
return DiscoClient.from_cli(args)
cfg = ClientConfig()
cfg.token = args.token
cfg.shard_id = args.shard_id
cfg.shard_count = args.shard_count
cfg.manhole_enable = args.manhole
cfg.manhole_bind = args.manhole_bind
cfg.encoding_cls = ENCODERS[args.encoder]
return Client(cfg)
if __name__ == '__main__':
disco_main().run_forever()

101
disco/client.py

@ -1,71 +1,96 @@
import logging
import gevent
from gevent.backdoor import BackdoorServer
from holster.emitter import Emitter
from disco.state import State
from disco.api.client import APIClient
from disco.gateway.client import GatewayClient
from disco.util.logging import LoggingClass
log = logging.getLogger(__name__)
class ClientConfig(LoggingClass):
"""
Configuration for the :class:`Client`.
class DiscoClient(object):
Attributes
----------
token : str
Discord authentication token, ca be validated using the
:func:`disco.util.token.is_valid_token` function.
shard_id : int
The shard ID for the current client instance.
shard_count : int
The total count of shards running.
manhole_enable : bool
Whether to enable the manhole (e.g. console backdoor server) utility.
manhole_bind : tuple(str, int)
A (host, port) combination which the manhole server will bind to (if its
enabled using :attr:`manhole_enable`).
encoding_cls : class
The class to use for encoding/decoding data from websockets.
"""
The DiscoClient represents the base entry point to utilizing the Discord API
through disco. It wraps the functionality of both the REST API, and the realtime
secure websocket gateway.
token = ""
shard_id = 0
shard_count = 1
manhole_enable = True
manhole_bind = ('127.0.0.1', 8484)
encoding_cls = None
class Client(object):
"""
Class representing the base entry point that should be used in almost all
implementation cases. This class wraps the functionality of both the REST API
(:class:`disco.api.client.APIClient`) and the realtime gateway API
(:class:`disco.gateway.client.GatewayClient`).
Parameters
----------
token : str
The Discord authentication token which is used for both the :class:`APIClient`
and the :class:`GatewayClient`. This token can be validated before being
passed in, by using the :func:`disco.util.token.is_valid_token` function.
sharding : Optional[dict(str, int)]
A dictionary containing two pairs with information that is used to control
the sharding behavior of the :class:`GatewayClient`. By setting the `number`
key, the current shard ID can be controlled. While when setting the `total`
key, the total number of running shards can be set.
config : :class:`ClientConfig`
Configuration for this client instance.
Attributes
----------
config : :class:`ClientConfig`
The runtime configuration for this client.
events : :class:`Emitter`
An emitter which emits Gateway events
An emitter which emits Gateway events.
packets : :class:`Emitter`
An emitter which emits Gateway packets
An emitter which emits Gateway packets.
state : :class:`State`
The state tracking object
The state tracking object.
api : :class:`APIClient`
The API client
The API client.
gw : :class:`GatewayClient`
The gateway client
The gateway client.
manhole : Optional[:class:`BackdoorServer`]
Gevent backdoor server (if the manhole is enabled).
"""
def __init__(self, token, sharding=None):
self.log = log
self.token = token
self.sharding = sharding or {'number': 0, 'total': 1}
def __init__(self, config):
super(Client, self).__init__()
self.config = config
self.events = Emitter(gevent.spawn)
self.packets = Emitter(gevent.spawn)
self.state = State(self)
self.api = APIClient(self)
self.gw = GatewayClient(self)
self.gw = GatewayClient(self, self.config.encoding_cls)
@classmethod
def from_cli(cls, args):
"""
Create a new client from a argparse command line argument object, usually
generated from the :func:`disco_main` function.
"""
inst = cls(
token=args.token,
sharding={
'number': args.shard_id,
'total': args.shard_count,
})
return inst
if self.config.manhole_enable:
self.manhole = BackdoorServer(self.config.manhole_bind,
banner='Disco Manhole',
locals={
'client': self,
'state': self.state,
'api': self.api,
'gw': self.gw,
})
self.manhole.start()
def run(self):
"""

28
disco/gateway/client.py

@ -1,9 +1,10 @@
import gevent
import zlib
import six
from disco.gateway.packets import OPCode
from disco.gateway.events import GatewayEvent
from disco.util.json import loads, dumps
from disco.gateway.encoding.json import JSONEncoder
from disco.util.websocket import WebsocketProcessProxy
from disco.util.logging import LoggingClass
@ -14,9 +15,10 @@ class GatewayClient(LoggingClass):
GATEWAY_VERSION = 6
MAX_RECONNECTS = 5
def __init__(self, client):
def __init__(self, client, encoder=None):
super(GatewayClient, self).__init__()
self.client = client
self.encoder = encoder or JSONEncoder
self.events = client.events
self.packets = client.packets
@ -48,10 +50,10 @@ class GatewayClient(LoggingClass):
self._heartbeat_task = None
def send(self, op, data):
self.ws.send(dumps({
self.ws.send(self.encoder.encode({
'op': op.value,
'd': data,
}))
}), self.encoder.OPCODE)
def heartbeat_task(self, interval):
while True:
@ -90,7 +92,9 @@ class GatewayClient(LoggingClass):
def connect_and_run(self):
if not self._cached_gateway_url:
self._cached_gateway_url = self.client.api.gateway(version=self.GATEWAY_VERSION, encoding='json')
self._cached_gateway_url = self.client.api.gateway(
version=self.GATEWAY_VERSION,
encoding=self.encoder.TYPE)
self.log.info('Opening websocket connection to URL `%s`', self._cached_gateway_url)
self.ws = WebsocketProcessProxy(self._cached_gateway_url)
@ -103,11 +107,12 @@ class GatewayClient(LoggingClass):
def on_message(self, msg):
# Detect zlib and decompress
if msg[0] != '{':
is_erlpack = ((six.PY2 and ord(msg[0]) == 131) or (six.PY3 and msg[0] == 131))
if msg[0] != '{' and not is_erlpack:
msg = zlib.decompress(msg, 15, TEN_MEGABYTES).decode("utf-8")
try:
data = loads(msg)
data = self.encoder.decode(msg)
except:
self.log.exception('Failed to parse gateway message: ')
return
@ -128,17 +133,20 @@ class GatewayClient(LoggingClass):
if self.seq and self.session_id:
self.log.info('WS Opened: attempting resume w/ SID: %s SEQ: %s', self.session_id, self.seq)
self.send(OPCode.RESUME, {
'token': self.client.token,
'token': self.client.config.token,
'session_id': self.session_id,
'seq': self.seq
})
else:
self.log.info('WS Opened: sending identify payload')
self.send(OPCode.IDENTIFY, {
'token': self.client.token,
'token': self.client.config.token,
'compress': True,
'large_threshold': 250,
'shard': [self.client.sharding['number'], self.client.sharding['total']],
'shard': [
self.client.config.shard_id,
self.client.config.shard_count,
],
'properties': {
'$os': 'linux',
'$browser': 'disco',

11
disco/gateway/encoding/__init__.py

@ -0,0 +1,11 @@
from .json import JSONEncoder
ENCODERS = {
'json': JSONEncoder,
}
try:
from .etf import ETFEncoder
ENCODERS['etf'] = ETFEncoder
except ImportError:
pass

14
disco/gateway/encoding/base.py

@ -0,0 +1,14 @@
from websocket import ABNF
class BaseEncoder(object):
TYPE = None
OPCODE = ABNF.OPCODE_TEXT
@staticmethod
def encode(obj):
pass
@staticmethod
def decode(obj):
pass

28
disco/gateway/encoding/etf.py

@ -0,0 +1,28 @@
import six
from websocket import ABNF
from erlpack import Atom, unpack, pack
from disco.gateway.encoding.base import BaseEncoder
def make_keys_atom(obj):
res = {}
for k, v in six.iteritems(obj):
if isinstance(v, dict):
v = make_keys_atom(v)
res[Atom(k)] = v
return res
class ETFEncoder(BaseEncoder):
TYPE = 'etf'
OPCODE = ABNF.OPCODE_BINARY
@staticmethod
def encode(obj):
return pack(obj)
@staticmethod
def decode(obj):
return unpack(obj)

22
disco/gateway/encoding/json.py

@ -0,0 +1,22 @@
from __future__ import absolute_import, print_function
import six
try:
import ujson as json
except ImportError:
import json
from disco.gateway.encoding.base import BaseEncoder
class JSONEncoder(BaseEncoder):
TYPE = 'json'
@staticmethod
def encode(obj):
return json.dumps(obj)
@staticmethod
def decode(obj):
return json.loads(obj)

65
disco/gateway/events.py

@ -1,5 +1,6 @@
import inflection
import skema
import six
from disco.util import skema_find_recursive_by_type
from disco.types import Guild, Channel, User, GuildMember, Role, Message, VoiceState
@ -22,21 +23,29 @@ class GatewayEvent(skema.Model):
@classmethod
def create(cls, obj):
# If this event is wrapping a model, pull its fields
if hasattr(cls, '_wraps_model'):
alias, model = cls._wraps_model
data = {
k: obj.pop(k) for k in six.iterkeys(model._fields_by_stored_name) if k in obj
}
obj[alias] = data
self = cls(obj)
self.validate()
return self
def Sub(field):
class _T(GatewayEvent):
@classmethod
def create(cls, obj):
obj[field] = obj
self = cls(obj)
self.validate()
return self
def wraps_model(model, alias=None):
alias = alias or model.__name__.lower()
return _T
def deco(cls):
cls.add_field(alias, skema.ModelType(model))
cls._wraps_model = (alias, model)
return cls
return deco
class Ready(GatewayEvent):
@ -50,12 +59,13 @@ class Resumed(GatewayEvent):
pass
class GuildCreate(Sub('guild')):
guild = skema.ModelType(Guild)
@wraps_model(Guild)
class GuildCreate(GatewayEvent):
unavailable = skema.BooleanType(default=None)
class GuildUpdate(Sub('guild')):
@wraps_model(Guild)
class GuildUpdate(GatewayEvent):
guild = skema.ModelType(Guild)
@ -64,9 +74,8 @@ class GuildDelete(GatewayEvent):
unavailable = skema.BooleanType(default=None)
class ChannelCreate(Sub('channel')):
channel = skema.ModelType(Channel)
@wraps_model(Channel)
class ChannelCreate(GatewayEvent):
@property
def guild(self):
return self.channel.guild
@ -85,12 +94,13 @@ class ChannelPinsUpdate(GatewayEvent):
last_pin_timestamp = skema.IntType()
class GuildBanAdd(Sub('user')):
user = skema.ModelType(User)
@wraps_model(User)
class GuildBanAdd(GatewayEvent):
pass
class GuildBanRemove(Sub('user')):
user = skema.ModelType(User)
class GuildBanRemove(GuildBanAdd):
pass
class GuildEmojisUpdate(GatewayEvent):
@ -106,8 +116,9 @@ class GuildMembersChunk(GatewayEvent):
members = skema.ListType(skema.ModelType(GuildMember))
class GuildMemberAdd(Sub('member')):
member = skema.ModelType(GuildMember)
@wraps_model(GuildMember, alias='member')
class GuildMemberAdd(GatewayEvent):
pass
class GuildMemberRemove(GatewayEvent):
@ -136,16 +147,15 @@ class GuildRoleDelete(GatewayEvent):
role = skema.ModelType(Role)
class MessageCreate(Sub('message')):
message = skema.ModelType(Message)
@wraps_model(Message)
class MessageCreate(GatewayEvent):
@property
def channel(self):
return self.message.channel
class MessageUpdate(MessageCreate):
message = skema.ModelType(Message)
pass
class MessageDelete(GatewayEvent):
@ -177,8 +187,9 @@ class TypingStart(GatewayEvent):
timestamp = skema.IntType()
class VoiceStateUpdate(Sub('state')):
state = skema.ModelType(VoiceState)
@wraps_model(VoiceState, alias='state')
class VoiceStateUpdate(GatewayEvent):
pass
class VoiceServerUpdate(GatewayEvent):

5
disco/state.py

@ -57,8 +57,8 @@ class State(object):
----------
EVENTS : list(str)
A list of all events the State object binds too.
client : :class:`disco.client.DiscoClient`
The DiscoClient instance this state is attached too
client : :class:`disco.client.Client`
The Client instance this state is attached too
config : :class:`StateConfig`
The configuration for this state instance
me : :class:`disco.types.user.User`
@ -239,6 +239,7 @@ class State(object):
member.guild = guild
member.guild_id = guild.id
guild.members[member.id] = member
self.users[member.id] = member.user
def on_guild_role_create(self, event):
if event.guild_id not in self.guilds:

4
disco/types/channel.py

@ -2,7 +2,7 @@ import skema
from holster.enum import Enum
from disco.util.cache import cached_property
from disco.util.functional import cached_property
from disco.util.types import ListToDictType
from disco.types.base import BaseType
from disco.types.user import User
@ -214,7 +214,7 @@ class MessageIterator(object):
Parameters
----------
client : :class:`disco.client.DiscoClient`
client : :class:`disco.client.Client`
The disco client instance to use when making requests.
channel : `Channel`
The channel to iterate within.

2
disco/types/message.py

@ -2,7 +2,7 @@ import re
import skema
from disco.util import to_snowflake
from disco.util.cache import cached_property
from disco.util.functional import cached_property
from disco.util.types import PreHookType, ListToDictType
from disco.types.base import BaseType
from disco.types.user import User

15
disco/util/cache.py

@ -1,15 +0,0 @@
def cached_property(f):
def getf(self, *args, **kwargs):
if not hasattr(self, '__' + f.__name__):
setattr(self, '__' + f.__name__, f(self, *args, **kwargs))
return getattr(self, '__' + f.__name__)
def setf(self, value):
setattr(self, '__' + f.__name__, value)
def delf(self):
setattr(self, '__' + f.__name__, None)
return property(getf, setf, delf)

49
disco/util/functional.py

@ -0,0 +1,49 @@
from gevent.lock import RLock
def cached_property(f):
"""
Creates a cached property out of ``f``. When the property is resolved for
the first time, the function will be called and its result will be cached.
Subsequent calls will return the cached value. If this property is set, the
cached value will be replaced (or set initially) with the value provided. If
this property is deleted, the cache will be cleared and the next call will
refill it with a new value.
Notes
-----
This function is greenlet safe.
Args
----
f : function
The function to wrap.
Returns
-------
property
The cached property created.
"""
lock = RLock()
f._value = None
f._has_value = False
def getf(*args, **kwargs):
if not f._has_value:
with lock:
if f._has_value:
return f._value
f._value = f(*args, **kwargs)
f._has_value = True
return f._value
def setf(self, value):
f._value = value
def delf(self):
f._value = None
f._has_value = False
return property(getf, setf, delf)

11
disco/util/json.py

@ -1,11 +0,0 @@
from __future__ import absolute_import, print_function
from json import dumps
try:
from rapidjson import loads
except ImportError:
print('[WARNING] rapidjson not installed, falling back to default Python JSON parser')
from json import loads
__all__ = ['dumps', 'loads']

11
disco/voice/client.py

@ -6,9 +6,9 @@ import time
from holster.enum import Enum
from holster.emitter import Emitter
from disco.gateway.encoding.json import JSONEncoder
from disco.util.websocket import Websocket
from disco.util.logging import LoggingClass
from disco.util.json import loads, dumps
from disco.voice.packets import VoiceOPCode
from disco.gateway.packets import OPCode
@ -83,12 +83,13 @@ class UDPVoiceClient(LoggingClass):
class VoiceClient(LoggingClass):
def __init__(self, channel):
def __init__(self, channel, encoder=None):
super(VoiceClient, self).__init__()
assert channel.is_voice, 'Cannot spawn a VoiceClient for a non-voice channel'
self.channel = channel
self.client = self.channel.client
self.encoder = encoder or JSONEncoder
self.packets = Emitter(gevent.spawn)
self.packets.on(VoiceOPCode.READY, self.on_voice_ready)
@ -120,10 +121,10 @@ class VoiceClient(LoggingClass):
})
def send(self, op, data):
self.ws.send(dumps({
self.ws.send(self.encoder.encode({
'op': op.value,
'd': data,
}))
}), self.encoder.OPCODE)
def on_voice_ready(self, data):
self.state = VoiceState.CONNECTING
@ -179,7 +180,7 @@ class VoiceClient(LoggingClass):
def on_message(self, ws, msg):
try:
data = loads(msg)
data = self.encoder.decode(msg)
except:
self.log.exception('Failed to parse voice gateway message: ')

4
docs/api.rst

@ -14,10 +14,10 @@ version information for the installed package.
versioning format. E.g. ``'5.4.3-rc.2'``
DiscoClient
Client
------------
.. autoclass:: disco.client.DiscoClient
.. autoclass:: disco.client.Client
:members:

1
docs/index.rst

@ -17,6 +17,7 @@ Contents:
tutorial
api
utilities
Indices and tables
==================

27
docs/utilities.rst

@ -0,0 +1,27 @@
.. currentmodule:: disco
Utilities
=========
This section details information about various utilities provided in the disco
package, which aid the development and runtime management of disco clients/bots.
Generally these utilties are situational, and can be enabled depending on
various scenarious developers and users may find themselves in.
Manhole
-------
The manhole utilty is a backdoor server that allows opening a interactive REPL
while the client is running. This can be very useful for attaching and
inspecting runtime state, without distribing the normal client operations. To
enable the backdoor, simply set the
:attr:`disco.client.ClientConfig.manhole_enable` setting, and tweak
:attr:`disco.client.ClientConfig.manhole_bind` settings based on the connection
parameters you'd like.
It's recommended you connect to the manhole via ``rlwrap`` and ``netcat``, which
will give a proper TTY-like readline experience. For example:
.. sourcecode:: bash
rlwrap netcat localhost 8484
Loading…
Cancel
Save