21 changed files with 176 additions and 1493 deletions
@ -0,0 +1,64 @@ |
|||
import logging |
|||
from sdjango import namespace |
|||
|
|||
|
|||
online_user_num = 0 |
|||
|
|||
|
|||
@namespace('/test') |
|||
class TestNamespace: |
|||
|
|||
def __init__(self, name): |
|||
self.name = name |
|||
self.request = None # django request object |
|||
|
|||
def _get_socket(self, sid): |
|||
socket = namespace.server.eio._get_socket(sid) |
|||
return socket |
|||
|
|||
def _get_request(self, sid): |
|||
socket = self._get_socket(sid) |
|||
return socket._request |
|||
|
|||
def emit(self, *args, **kwargs): |
|||
if 'namespace' not in kwargs: |
|||
kwargs['namespace'] = self.name |
|||
|
|||
namespace.server.emit(*args, **kwargs) |
|||
|
|||
def on_my_event(self, sid, message): |
|||
self.emit('my response', {'data': message['data']}, room=sid) |
|||
|
|||
def on_my_broadcast_event(self, sid, message): |
|||
self.emit('my response', {'data': message['data']}) |
|||
|
|||
def on_join(self, sid, message): |
|||
namespace.server.enter_room(sid, message['room'], namespace='/test') |
|||
self.emit('my response', {'data': 'Entered room: '+message['room']}, room=sid) |
|||
|
|||
def on_leave(self, sid, message): |
|||
namespace.server.leave_room(sid, message['room'], namespace='/test') |
|||
self.emit('my response', {'data': 'Left room:'+message['room']}, room=sid) |
|||
|
|||
def on_close_room(self, sid, message): |
|||
self.emit('my response', {'data': 'Room '+message['room']+ ' is closing'}, |
|||
room=message['room']) |
|||
namespace.server.close_room(message['room'], namespace='/test') |
|||
|
|||
def on_my_room_event(self, sid, message): |
|||
self.emit('my response', {'data': message['data']}, room=message['room']) |
|||
|
|||
def on_disconnect_request(self, sid): |
|||
namespace.server.disconnect(sid, namespace='/test') |
|||
|
|||
# two method must have |
|||
def on_connect(self, sid, environ): |
|||
if 'django_request' in environ: |
|||
request = environ['django_request'] |
|||
socket = self._get_socket(sid) |
|||
socket._request = request |
|||
|
|||
self.emit('my response', {'data': "{} Connected".format(request.user), "count": 0}, room=sid) |
|||
|
|||
def on_disconnect(self, sid): |
|||
print('Client disconnected') |
@ -0,0 +1,91 @@ |
|||
<!DOCTYPE HTML> |
|||
<html> |
|||
<head> |
|||
<title>Django-SocketIO Test</title> |
|||
<script type="text/javascript" src="//code.jquery.com/jquery-2.1.4.min.js"></script> |
|||
<script type="text/javascript" src="//cdnjs.cloudflare.com/ajax/libs/socket.io/1.3.5/socket.io.min.js"></script> |
|||
<script type="text/javascript" charset="utf-8"> |
|||
$(document).ready(function(){ |
|||
namespace = '/test'; |
|||
var socket = io.connect('http://' + document.domain + ':' + location.port + namespace); |
|||
|
|||
socket.on('connect', function() { |
|||
socket.emit('my event', {data: 'I\'m connected!'}); |
|||
}); |
|||
socket.on('disconnect', function() { |
|||
$('#log').append('<br>Disconnected'); |
|||
}); |
|||
socket.on('my response', function(msg) { |
|||
$('#log').append('<br>Received: ' + msg.data); |
|||
}); |
|||
|
|||
// event handler for server sent data |
|||
// the data is displayed in the "Received" section of the page |
|||
// handlers for the different forms in the page |
|||
// these send data to the server in a variety of ways |
|||
$('form#emit').submit(function(event) { |
|||
socket.emit('my event', {data: $('#emit_data').val()}); |
|||
return false; |
|||
}); |
|||
$('form#broadcast').submit(function(event) { |
|||
socket.emit('my broadcast event', {data: $('#broadcast_data').val()}); |
|||
return false; |
|||
}); |
|||
$('form#join').submit(function(event) { |
|||
socket.emit('join', {room: $('#join_room').val()}); |
|||
return false; |
|||
}); |
|||
$('form#leave').submit(function(event) { |
|||
socket.emit('leave', {room: $('#leave_room').val()}); |
|||
return false; |
|||
}); |
|||
$('form#send_room').submit(function(event) { |
|||
socket.emit('my room event', {room: $('#room_name').val(), data: $('#room_data').val()}); |
|||
return false; |
|||
}); |
|||
$('form#close').submit(function(event) { |
|||
socket.emit('close room', {room: $('#close_room').val()}); |
|||
return false; |
|||
}); |
|||
$('form#disconnect').submit(function(event) { |
|||
socket.emit('disconnect request'); |
|||
return false; |
|||
}); |
|||
}); |
|||
</script> |
|||
</head> |
|||
<body> |
|||
<h1>Flask-SocketIO Test</h1> |
|||
<h2>Send:</h2> |
|||
<form id="emit" method="POST" action='#'> |
|||
<input type="text" name="emit_data" id="emit_data" placeholder="Message"> |
|||
<input type="submit" value="Echo"> |
|||
</form> |
|||
<form id="broadcast" method="POST" action='#'> |
|||
<input type="text" name="broadcast_data" id="broadcast_data" placeholder="Message"> |
|||
<input type="submit" value="Broadcast"> |
|||
</form> |
|||
<form id="join" method="POST" action='#'> |
|||
<input type="text" name="join_room" id="join_room" placeholder="Room Name"> |
|||
<input type="submit" value="Join Room"> |
|||
</form> |
|||
<form id="leave" method="POST" action='#'> |
|||
<input type="text" name="leave_room" id="leave_room" placeholder="Room Name"> |
|||
<input type="submit" value="Leave Room"> |
|||
</form> |
|||
<form id="send_room" method="POST" action='#'> |
|||
<input type="text" name="room_name" id="room_name" placeholder="Room Name"> |
|||
<input type="text" name="room_data" id="room_data" placeholder="Message"> |
|||
<input type="submit" value="Send to Room"> |
|||
</form> |
|||
<form id="close" method="POST" action="#"> |
|||
<input type="text" name="close_room" id="close_room" placeholder="Room Name"> |
|||
<input type="submit" value="Close Room"> |
|||
</form> |
|||
<form id="disconnect" method="POST" action="#"> |
|||
<input type="submit" value="Disconnect"> |
|||
</form> |
|||
<h2>Receive:</h2> |
|||
<div><p id="log"></p></div> |
|||
</body> |
|||
</html> |
@ -0,0 +1,13 @@ |
|||
from django.conf.urls import ( |
|||
url, patterns, include |
|||
) |
|||
|
|||
import sdjango |
|||
|
|||
from .views import socket_base |
|||
|
|||
|
|||
urlpatterns = [ |
|||
url(r'^socket\.io', include(sdjango.urls)), |
|||
url(r'^$', socket_base, name='socket_base'), |
|||
] |
@ -1,3 +1,6 @@ |
|||
from django.shortcuts import render |
|||
|
|||
# Create your views here. |
|||
|
|||
def socket_base(request, template="base.html"): |
|||
context={} |
|||
return render(request, template, context) |
|||
|
@ -1,21 +0,0 @@ |
|||
VERSION = (0, 9, 5, 'final', 0) |
|||
|
|||
__all__ = [ |
|||
'WebSocketApplication', |
|||
'Resource', |
|||
'WebSocketServer', |
|||
'WebSocketError', |
|||
'get_version' |
|||
] |
|||
|
|||
|
|||
def get_version(*args, **kwargs): |
|||
from .utils import get_version |
|||
return get_version(*args, **kwargs) |
|||
|
|||
try: |
|||
from .resource import WebSocketApplication, Resource |
|||
from .server import WebSocketServer |
|||
from .exceptions import WebSocketError |
|||
except ImportError: |
|||
pass |
@ -1,19 +0,0 @@ |
|||
from socket import error as socket_error |
|||
|
|||
|
|||
class WebSocketError(socket_error): |
|||
""" |
|||
Base class for all websocket errors. |
|||
""" |
|||
|
|||
|
|||
class ProtocolError(WebSocketError): |
|||
""" |
|||
Raised if an error occurs when de/encoding the websocket protocol. |
|||
""" |
|||
|
|||
|
|||
class FrameTooLargeException(ProtocolError): |
|||
""" |
|||
Raised if a frame is received that is too large. |
|||
""" |
@ -1,6 +0,0 @@ |
|||
from geventwebsocket.handler import WebSocketHandler |
|||
from gunicorn.workers.ggevent import GeventPyWSGIWorker |
|||
|
|||
|
|||
class GeventWebSocketWorker(GeventPyWSGIWorker): |
|||
wsgi_handler = WebSocketHandler |
@ -1,284 +0,0 @@ |
|||
import base64 |
|||
import hashlib |
|||
import warnings |
|||
|
|||
from gevent.pywsgi import WSGIHandler |
|||
from .websocket import WebSocket, Stream |
|||
from .logging import create_logger |
|||
|
|||
|
|||
class Client(object): |
|||
def __init__(self, address, ws): |
|||
self.address = address |
|||
self.ws = ws |
|||
|
|||
|
|||
class WebSocketHandler(WSGIHandler): |
|||
""" |
|||
Automatically upgrades the connection to a websocket. |
|||
|
|||
To prevent the WebSocketHandler to call the underlying WSGI application, |
|||
but only setup the WebSocket negotiations, do: |
|||
|
|||
mywebsockethandler.prevent_wsgi_call = True |
|||
|
|||
before calling run_application(). This is useful if you want to do more |
|||
things before calling the app, and want to off-load the WebSocket |
|||
negotiations to this library. Socket.IO needs this for example, to send |
|||
the 'ack' before yielding the control to your WSGI app. |
|||
""" |
|||
|
|||
SUPPORTED_VERSIONS = ('13', '8', '7') |
|||
GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" |
|||
|
|||
def run_websocket(self): |
|||
""" |
|||
Called when a websocket has been created successfully. |
|||
""" |
|||
|
|||
if getattr(self, 'prevent_wsgi_call', False): |
|||
return |
|||
|
|||
# In case WebSocketServer is not used |
|||
if not hasattr(self.server, 'clients'): |
|||
self.server.clients = {} |
|||
|
|||
# Since we're now a websocket connection, we don't care what the |
|||
# application actually responds with for the http response |
|||
|
|||
try: |
|||
|
|||
self.server.clients[self.client_address] = Client( |
|||
self.client_address, self.websocket) |
|||
# list(self.application(self.environ, lambda s, h, e=None: [])) |
|||
# have no idea why do this here |
|||
finally: |
|||
del self.server.clients[self.client_address] |
|||
if not self.websocket.closed: |
|||
self.websocket.close() |
|||
self.environ.update({ |
|||
'wsgi.websocket': None |
|||
}) |
|||
self.websocket = None |
|||
|
|||
def run_application(self): |
|||
if (hasattr(self.server, 'pre_start_hook') |
|||
and self.server.pre_start_hook): |
|||
self.logger.debug("Calling pre-start hook") |
|||
if self.server.pre_start_hook(self): |
|||
return super(WebSocketHandler, self).run_application() |
|||
|
|||
self.logger.debug("Initializing WebSocket") |
|||
self.result = self.upgrade_websocket() |
|||
|
|||
if hasattr(self, 'websocket'): |
|||
if self.status and not self.headers_sent: |
|||
self.write('') |
|||
|
|||
self.run_websocket() |
|||
else: |
|||
if self.status: |
|||
# A status was set, likely an error so just send the response |
|||
if not self.result: |
|||
self.result = [] |
|||
|
|||
self.process_result() |
|||
return |
|||
|
|||
# This handler did not handle the request, so defer it to the |
|||
# underlying application object |
|||
return super(WebSocketHandler, self).run_application() |
|||
|
|||
def upgrade_websocket(self): |
|||
""" |
|||
Attempt to upgrade the current environ into a websocket enabled |
|||
connection. If successful, the environ dict with be updated with two |
|||
new entries, `wsgi.websocket` and `wsgi.websocket_version`. |
|||
|
|||
:returns: Whether the upgrade was successful. |
|||
""" |
|||
|
|||
# Some basic sanity checks first |
|||
|
|||
self.logger.debug("Validating WebSocket request") |
|||
|
|||
if self.environ.get('REQUEST_METHOD', '') != 'GET': |
|||
# This is not a websocket request, so we must not handle it |
|||
self.logger.debug('Can only upgrade connection if using GET method.') |
|||
return |
|||
|
|||
upgrade = self.environ.get('HTTP_UPGRADE', '').lower() |
|||
|
|||
if upgrade == 'websocket': |
|||
connection = self.environ.get('HTTP_CONNECTION', '').lower() |
|||
|
|||
if 'upgrade' not in connection: |
|||
# This is not a websocket request, so we must not handle it |
|||
self.logger.warning("Client didn't ask for a connection " |
|||
"upgrade") |
|||
return |
|||
else: |
|||
# This is not a websocket request, so we must not handle it |
|||
return |
|||
|
|||
if self.request_version != 'HTTP/1.1': |
|||
self.start_response('402 Bad Request', []) |
|||
self.logger.warning("Bad server protocol in headers") |
|||
|
|||
return ['Bad protocol version'] |
|||
|
|||
if self.environ.get('HTTP_SEC_WEBSOCKET_VERSION'): |
|||
return self.upgrade_connection() |
|||
else: |
|||
self.logger.warning("No protocol defined") |
|||
self.start_response('426 Upgrade Required', [ |
|||
('Sec-WebSocket-Version', ', '.join(self.SUPPORTED_VERSIONS))]) |
|||
|
|||
return ['No Websocket protocol version defined'] |
|||
|
|||
def upgrade_connection(self): |
|||
""" |
|||
Validate and 'upgrade' the HTTP request to a WebSocket request. |
|||
|
|||
If an upgrade succeeded then then handler will have `start_response` |
|||
with a status of `101`, the environ will also be updated with |
|||
`wsgi.websocket` and `wsgi.websocket_version` keys. |
|||
|
|||
:param environ: The WSGI environ dict. |
|||
:param start_response: The callable used to start the response. |
|||
:param stream: File like object that will be read from/written to by |
|||
the underlying WebSocket object, if created. |
|||
:return: The WSGI response iterator is something went awry. |
|||
""" |
|||
|
|||
self.logger.debug("Attempting to upgrade connection") |
|||
|
|||
version = self.environ.get("HTTP_SEC_WEBSOCKET_VERSION") |
|||
|
|||
if version not in self.SUPPORTED_VERSIONS: |
|||
msg = "Unsupported WebSocket Version: {0}".format(version) |
|||
|
|||
self.logger.warning(msg) |
|||
self.start_response('400 Bad Request', [ |
|||
('Sec-WebSocket-Version', ', '.join(self.SUPPORTED_VERSIONS)) |
|||
]) |
|||
|
|||
return [msg] |
|||
|
|||
key = self.environ.get("HTTP_SEC_WEBSOCKET_KEY", '').strip() |
|||
|
|||
if not key: |
|||
# 5.2.1 (3) |
|||
msg = "Sec-WebSocket-Key header is missing/empty" |
|||
|
|||
self.logger.warning(msg) |
|||
self.start_response('400 Bad Request', []) |
|||
|
|||
return [msg] |
|||
|
|||
try: |
|||
key_len = len(base64.b64decode(key)) |
|||
except TypeError: |
|||
msg = "Invalid key: {0}".format(key) |
|||
|
|||
self.logger.warning(msg) |
|||
self.start_response('400 Bad Request', []) |
|||
|
|||
return [msg] |
|||
|
|||
if key_len != 16: |
|||
# 5.2.1 (3) |
|||
msg = "Invalid key: {0}".format(key) |
|||
|
|||
self.logger.warning(msg) |
|||
self.start_response('400 Bad Request', []) |
|||
|
|||
return [msg] |
|||
|
|||
# Check for WebSocket Protocols |
|||
requested_protocols = self.environ.get( |
|||
'HTTP_SEC_WEBSOCKET_PROTOCOL', '') |
|||
protocol = None |
|||
|
|||
if hasattr(self.application, 'app_protocol'): |
|||
allowed_protocol = self.application.app_protocol( |
|||
self.environ['PATH_INFO']) |
|||
|
|||
if allowed_protocol and allowed_protocol in requested_protocols: |
|||
protocol = allowed_protocol |
|||
self.logger.debug("Protocol allowed: {0}".format(protocol)) |
|||
|
|||
self.websocket = WebSocket(self.environ, Stream(self), self) |
|||
self.environ.update({ |
|||
'wsgi.websocket_version': version, |
|||
'wsgi.websocket': self.websocket |
|||
}) |
|||
|
|||
headers = [ |
|||
("Upgrade", "websocket"), |
|||
("Connection", "Upgrade"), |
|||
("Sec-WebSocket-Accept", base64.b64encode( |
|||
hashlib.sha1((key + self.GUID).encode('utf-8')).digest())), |
|||
] |
|||
|
|||
if protocol: |
|||
headers.append(("Sec-WebSocket-Protocol", protocol)) |
|||
|
|||
self.logger.debug("WebSocket request accepted, switching protocols") |
|||
self.start_response("101 Switching Protocols", headers) |
|||
|
|||
@property |
|||
def logger(self): |
|||
if not hasattr(self.server, 'logger'): |
|||
self.server.logger = create_logger(__name__) |
|||
|
|||
return self.server.logger |
|||
|
|||
# def log_request(self): |
|||
# if b'101' not in self.status: |
|||
# self.logger.info(self.format_request()) |
|||
# super().log_request() |
|||
|
|||
@property |
|||
def active_client(self): |
|||
return self.server.clients[self.client_address] |
|||
|
|||
def start_response(self, status, headers, exc_info=None): |
|||
""" |
|||
Called when the handler is ready to send a response back to the remote |
|||
endpoint. A websocket connection may have not been created. |
|||
""" |
|||
# everything in headers must be str |
|||
headers = [(header[0], header[1].decode('utf-8') if not isinstance(header[1], str) else header[1]) \ |
|||
for header in headers] |
|||
writer = super(WebSocketHandler, self).start_response( |
|||
status, headers, exc_info=exc_info) |
|||
|
|||
self._prepare_response() |
|||
|
|||
return writer |
|||
|
|||
def _prepare_response(self): |
|||
""" |
|||
Sets up the ``pywsgi.Handler`` to work with a websocket response. |
|||
|
|||
This is used by other projects that need to support WebSocket |
|||
connections as part of a larger effort. |
|||
""" |
|||
assert not self.headers_sent |
|||
|
|||
if not self.environ.get('wsgi.websocket'): |
|||
# a WebSocket connection is not established, do nothing |
|||
return |
|||
|
|||
# So that `finalize_headers` doesn't write a Content-Length header |
|||
self.provided_content_length = False |
|||
|
|||
# The websocket is now controlling the response |
|||
self.response_use_chunked = False |
|||
|
|||
# Once the request is over, the connection must be closed |
|||
self.close_connection = True |
|||
|
|||
# Prevents the Date header from being written |
|||
self.provided_date = True |
@ -1,31 +0,0 @@ |
|||
|
|||
|
|||
from logging import getLogger, StreamHandler, getLoggerClass, Formatter, DEBUG |
|||
|
|||
|
|||
def create_logger(name, debug=False, format=None): |
|||
Logger = getLoggerClass() |
|||
|
|||
class DebugLogger(Logger): |
|||
def getEffectiveLevel(x): |
|||
if x.level == 0 and debug: |
|||
return DEBUG |
|||
else: |
|||
return Logger.getEffectiveLevel(x) |
|||
|
|||
class DebugHandler(StreamHandler): |
|||
def emit(x, record): |
|||
StreamHandler.emit(x, record) if debug else None |
|||
|
|||
handler = DebugHandler() |
|||
handler.setLevel(DEBUG) |
|||
|
|||
if format: |
|||
handler.setFormatter(Formatter(format)) |
|||
|
|||
logger = getLogger(name) |
|||
del logger.handlers[:] |
|||
logger.__class__ = DebugLogger |
|||
logger.addHandler(handler) |
|||
|
|||
return logger |
@ -1,35 +0,0 @@ |
|||
class BaseProtocol(object): |
|||
PROTOCOL_NAME = '' |
|||
|
|||
def __init__(self, app): |
|||
self._app = app |
|||
|
|||
def on_open(self): |
|||
self.app.on_open() |
|||
|
|||
def on_message(self, message): |
|||
self.app.on_message(message) |
|||
|
|||
def on_close(self, reason=None): |
|||
self.app.on_close(reason) |
|||
|
|||
@property |
|||
def app(self): |
|||
if self._app: |
|||
return self._app |
|||
else: |
|||
raise Exception("No application coupled") |
|||
|
|||
@property |
|||
def server(self): |
|||
if not hasattr(self.app, 'ws'): |
|||
return None |
|||
|
|||
return self.app.ws.handler.server |
|||
|
|||
@property |
|||
def handler(self): |
|||
if not hasattr(self.app, 'ws'): |
|||
return None |
|||
|
|||
return self.app.ws.handler |
@ -1,234 +0,0 @@ |
|||
import inspect |
|||
import random |
|||
import string |
|||
import types |
|||
|
|||
try: |
|||
import ujson as json |
|||
except ImportError: |
|||
try: |
|||
import simplejson as json |
|||
except ImportError: |
|||
import json |
|||
|
|||
from ..exceptions import WebSocketError |
|||
from .base import BaseProtocol |
|||
|
|||
|
|||
def export_rpc(arg=None): |
|||
if isinstance(arg, types.FunctionType): |
|||
arg._rpc = arg.__name__ |
|||
return arg |
|||
|
|||
|
|||
def serialize(data): |
|||
return json.dumps(data) |
|||
|
|||
|
|||
class Prefixes(object): |
|||
def __init__(self): |
|||
self.prefixes = {} |
|||
|
|||
def add(self, prefix, uri): |
|||
self.prefixes[prefix] = uri |
|||
|
|||
def resolve(self, curie_or_uri): |
|||
if "http://" in curie_or_uri: |
|||
return curie_or_uri |
|||
elif ':' in curie_or_uri: |
|||
prefix, proc = curie_or_uri.split(':', 1) |
|||
return self.prefixes[prefix] + proc |
|||
else: |
|||
raise Exception(curie_or_uri) |
|||
|
|||
|
|||
class RemoteProcedures(object): |
|||
def __init__(self): |
|||
self.calls = {} |
|||
|
|||
def register_procedure(self, uri, proc): |
|||
self.calls[uri] = proc |
|||
|
|||
def register_object(self, uri, obj): |
|||
for k in inspect.getmembers(obj, inspect.ismethod): |
|||
if '_rpc' in k[1].__dict__: |
|||
proc_uri = uri + k[1]._rpc |
|||
self.calls[proc_uri] = (obj, k[1]) |
|||
|
|||
def call(self, uri, args): |
|||
if uri in self.calls: |
|||
proc = self.calls[uri] |
|||
|
|||
# Do the correct call whether it's a function or instance method. |
|||
if isinstance(proc, tuple): |
|||
if proc[1].__self__ is None: |
|||
# Create instance of object and call method |
|||
return proc[1](proc[0](), *args) |
|||
else: |
|||
# Call bound method on instance |
|||
return proc[1](*args) |
|||
else: |
|||
return self.calls[uri](*args) |
|||
else: |
|||
raise Exception("no such uri '{}'".format(uri)) |
|||
|
|||
|
|||
class Channels(object): |
|||
def __init__(self): |
|||
self.channels = {} |
|||
|
|||
def create(self, uri, prefix_matching=False): |
|||
if uri not in self.channels: |
|||
self.channels[uri] = [] |
|||
|
|||
# TODO: implement prefix matching |
|||
|
|||
def subscribe(self, uri, client): |
|||
if uri in self.channels: |
|||
self.channels[uri].append(client) |
|||
|
|||
def unsubscribe(self, uri, client): |
|||
if uri not in self.channels: |
|||
return |
|||
|
|||
client_index = self.channels[uri].index(client) |
|||
self.channels[uri].pop(client_index) |
|||
|
|||
if len(self.channels[uri]) == 0: |
|||
del self.channels[uri] |
|||
|
|||
def publish(self, uri, event, exclude=None, eligible=None): |
|||
if uri not in self.channels: |
|||
return |
|||
|
|||
# TODO: exclude & eligible |
|||
|
|||
msg = [WampProtocol.MSG_EVENT, uri, event] |
|||
|
|||
for client in self.channels[uri]: |
|||
try: |
|||
client.ws.send(serialize(msg)) |
|||
except WebSocketError: |
|||
# Seems someone didn't unsubscribe before disconnecting |
|||
self.channels[uri].remove(client) |
|||
|
|||
|
|||
class WampProtocol(BaseProtocol): |
|||
MSG_WELCOME = 0 |
|||
MSG_PREFIX = 1 |
|||
MSG_CALL = 2 |
|||
MSG_CALL_RESULT = 3 |
|||
MSG_CALL_ERROR = 4 |
|||
MSG_SUBSCRIBE = 5 |
|||
MSG_UNSUBSCRIBE = 6 |
|||
MSG_PUBLISH = 7 |
|||
MSG_EVENT = 8 |
|||
|
|||
PROTOCOL_NAME = "wamp" |
|||
|
|||
def __init__(self, *args, **kwargs): |
|||
self.procedures = RemoteProcedures() |
|||
self.prefixes = Prefixes() |
|||
self.session_id = ''.join( |
|||
[random.choice(string.digits + string.letters) |
|||
for i in range(16)]) |
|||
|
|||
super(WampProtocol, self).__init__(*args, **kwargs) |
|||
|
|||
def register_procedure(self, *args, **kwargs): |
|||
self.procedures.register_procedure(*args, **kwargs) |
|||
|
|||
def register_object(self, *args, **kwargs): |
|||
self.procedures.register_object(*args, **kwargs) |
|||
|
|||
def register_pubsub(self, *args, **kwargs): |
|||
if not hasattr(self.server, 'channels'): |
|||
self.server.channels = Channels() |
|||
|
|||
self.server.channels.create(*args, **kwargs) |
|||
|
|||
def do_handshake(self): |
|||
from geventwebsocket import get_version |
|||
|
|||
welcome = [ |
|||
self.MSG_WELCOME, |
|||
self.session_id, |
|||
1, |
|||
'gevent-websocket/' + get_version() |
|||
] |
|||
self.app.ws.send(serialize(welcome)) |
|||
|
|||
def _get_exception_info(self, e): |
|||
uri = 'http://TODO#generic' |
|||
desc = str(type(e)) |
|||
details = str(e) |
|||
return [uri, desc, details] |
|||
|
|||
def rpc_call(self, data): |
|||
call_id, curie_or_uri = data[1:3] |
|||
args = data[3:] |
|||
|
|||
if not isinstance(call_id, str): |
|||
raise Exception() |
|||
if not isinstance(curie_or_uri, str): |
|||
raise Exception() |
|||
|
|||
uri = self.prefixes.resolve(curie_or_uri) |
|||
|
|||
try: |
|||
result = self.procedures.call(uri, args) |
|||
result_msg = [self.MSG_CALL_RESULT, call_id, result] |
|||
except Exception as e: |
|||
result_msg = [self.MSG_CALL_ERROR, |
|||
call_id] + self._get_exception_info(e) |
|||
|
|||
self.app.on_message(serialize(result_msg)) |
|||
|
|||
def pubsub_action(self, data): |
|||
action = data[0] |
|||
curie_or_uri = data[1] |
|||
|
|||
if not isinstance(action, int): |
|||
raise Exception() |
|||
if not isinstance(curie_or_uri, str): |
|||
raise Exception() |
|||
|
|||
uri = self.prefixes.resolve(curie_or_uri) |
|||
|
|||
if action == self.MSG_SUBSCRIBE and len(data) == 2: |
|||
self.server.channels.subscribe(data[1], self.handler.active_client) |
|||
|
|||
elif action == self.MSG_UNSUBSCRIBE and len(data) == 2: |
|||
self.server.channels.unsubscribe( |
|||
data[1], self.handler.active_client) |
|||
|
|||
elif action == self.MSG_PUBLISH and len(data) >= 3: |
|||
payload = data[2] if len(data) >= 3 else None |
|||
exclude = data[3] if len(data) >= 4 else None |
|||
eligible = data[4] if len(data) >= 5 else None |
|||
|
|||
self.server.channels.publish(uri, payload, exclude, eligible) |
|||
|
|||
def on_open(self): |
|||
self.app.on_open() |
|||
self.do_handshake() |
|||
|
|||
def on_message(self, message): |
|||
data = json.loads(message) |
|||
|
|||
if not isinstance(data, list): |
|||
raise Exception('incoming data is no list') |
|||
|
|||
if data[0] == self.MSG_PREFIX and len(data) == 3: |
|||
prefix, uri = data[1:3] |
|||
self.prefixes.add(prefix, uri) |
|||
|
|||
elif data[0] == self.MSG_CALL and len(data) >= 3: |
|||
return self.rpc_call(data) |
|||
|
|||
elif data[0] in (self.MSG_SUBSCRIBE, self.MSG_UNSUBSCRIBE, |
|||
self.MSG_PUBLISH): |
|||
return self.pubsub_action(data) |
|||
else: |
|||
raise Exception("Unknown call") |
|||
|
@ -1,100 +0,0 @@ |
|||
import re |
|||
import warnings |
|||
|
|||
from .protocols.base import BaseProtocol |
|||
from .exceptions import WebSocketError |
|||
|
|||
try: |
|||
from collections import OrderedDict |
|||
except ImportError: |
|||
class OrderedDict: |
|||
pass |
|||
|
|||
|
|||
class WebSocketApplication(object): |
|||
protocol_class = BaseProtocol |
|||
|
|||
def __init__(self, ws): |
|||
self.protocol = self.protocol_class(self) |
|||
self.ws = ws |
|||
|
|||
def handle(self): |
|||
self.protocol.on_open() |
|||
|
|||
while True: |
|||
try: |
|||
message = self.ws.receive() |
|||
except WebSocketError: |
|||
self.protocol.on_close() |
|||
break |
|||
|
|||
self.protocol.on_message(message) |
|||
|
|||
def on_open(self, *args, **kwargs): |
|||
pass |
|||
|
|||
def on_close(self, *args, **kwargs): |
|||
pass |
|||
|
|||
def on_message(self, message, *args, **kwargs): |
|||
self.ws.send(message, **kwargs) |
|||
|
|||
@classmethod |
|||
def protocol_name(cls): |
|||
return cls.protocol_class.PROTOCOL_NAME |
|||
|
|||
|
|||
class Resource(object): |
|||
def __init__(self, apps=None): |
|||
self.apps = apps if apps else [] |
|||
|
|||
if isinstance(apps, dict): |
|||
if not isinstance(apps, OrderedDict): |
|||
warnings.warn("Using an unordered dictionary for the " |
|||
"app list is discouraged and may lead to " |
|||
"undefined behavior.", UserWarning) |
|||
|
|||
self.apps = list(apps.items()) |
|||
|
|||
# An app can either be a standard WSGI application (an object we call with |
|||
# __call__(self, environ, start_response)) or a class we instantiate |
|||
# (and which can handle websockets). This function tells them apart. |
|||
# Override this if you have apps that can handle websockets but don't |
|||
# fulfill these criteria. |
|||
def _is_websocket_app(self, app): |
|||
return isinstance(app, type) and issubclass(app, WebSocketApplication) |
|||
|
|||
def _app_by_path(self, environ_path, is_websocket_request): |
|||
# Which app matched the current path? |
|||
for path, app in self.apps: |
|||
if re.match(path, environ_path): |
|||
if is_websocket_request == self._is_websocket_app(app): |
|||
return app |
|||
return None |
|||
|
|||
def app_protocol(self, path): |
|||
# app_protocol will only be called for websocket apps |
|||
app = self._app_by_path(path, True) |
|||
|
|||
if hasattr(app, 'protocol_name'): |
|||
return app.protocol_name() |
|||
else: |
|||
return '' |
|||
|
|||
def __call__(self, environ, start_response): |
|||
environ = environ |
|||
is_websocket_call = 'wsgi.websocket' in environ |
|||
current_app = self._app_by_path(environ['PATH_INFO'], is_websocket_call) |
|||
|
|||
if current_app is None: |
|||
raise Exception("No apps defined") |
|||
|
|||
if is_websocket_call: |
|||
ws = environ['wsgi.websocket'] |
|||
current_app = current_app(ws) |
|||
current_app.ws = ws # TODO: needed? |
|||
current_app.handle() |
|||
# Always return something, calling WSGI middleware may rely on it |
|||
return [] |
|||
else: |
|||
return current_app(environ, start_response) |
@ -1,34 +0,0 @@ |
|||
from gevent.pywsgi import WSGIServer |
|||
|
|||
from .handler import WebSocketHandler |
|||
from .logging import create_logger |
|||
|
|||
|
|||
class WebSocketServer(WSGIServer): |
|||
handler_class = WebSocketHandler |
|||
debug_log_format = ( |
|||
'-' * 80 + '\n' + |
|||
'%(levelname)s in %(module)s [%(pathname)s:%(lineno)d]:\n' + |
|||
'%(message)s\n' + |
|||
'-' * 80 |
|||
) |
|||
|
|||
def __init__(self, *args, **kwargs): |
|||
self.debug = kwargs.pop('debug', False) |
|||
self.pre_start_hook = kwargs.pop('pre_start_hook', None) |
|||
self._logger = None |
|||
self.clients = {} |
|||
|
|||
super(WebSocketServer, self).__init__(*args, **kwargs) |
|||
|
|||
def handle(self, socket, address): |
|||
handler = self.handler_class(socket, address, self) |
|||
handler.handle() |
|||
|
|||
@property |
|||
def logger(self): |
|||
if not self._logger: |
|||
self._logger = create_logger( |
|||
__name__, self.debug, self.debug_log_format) |
|||
|
|||
return self._logger |
@ -1,128 +0,0 @@ |
|||
############################################################################### |
|||
## |
|||
## Copyright 2011-2013 Tavendo GmbH |
|||
## |
|||
## Note: |
|||
## |
|||
## This code is a Python implementation of the algorithm |
|||
## |
|||
## "Flexible and Economical UTF-8 Decoder" |
|||
## |
|||
## by Bjoern Hoehrmann |
|||
## |
|||
## [email protected] |
|||
## http://bjoern.hoehrmann.de/utf-8/decoder/dfa/ |
|||
## |
|||
## Licensed under the Apache License, Version 2.0 (the "License"); |
|||
## you may not use this file except in compliance with the License. |
|||
## You may obtain a copy of the License at |
|||
## |
|||
## http://www.apache.org/licenses/LICENSE-2.0 |
|||
## |
|||
## Unless required by applicable law or agreed to in writing, software |
|||
## distributed under the License is distributed on an "AS IS" BASIS, |
|||
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
## See the License for the specific language governing permissions and |
|||
## limitations under the License. |
|||
## |
|||
############################################################################### |
|||
|
|||
|
|||
## use Cython implementation of UTF8 validator if available |
|||
## |
|||
try: |
|||
from wsaccel.utf8validator import Utf8Validator |
|||
except: |
|||
## fallback to pure Python implementation |
|||
|
|||
class Utf8Validator: |
|||
""" |
|||
Incremental UTF-8 validator with constant memory consumption (minimal |
|||
state). |
|||
|
|||
Implements the algorithm "Flexible and Economical UTF-8 Decoder" by |
|||
Bjoern Hoehrmann (http://bjoern.hoehrmann.de/utf-8/decoder/dfa/). |
|||
""" |
|||
|
|||
## DFA transitions |
|||
UTF8VALIDATOR_DFA = [ |
|||
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, # 00..1f |
|||
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, # 20..3f |
|||
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, # 40..5f |
|||
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, # 60..7f |
|||
1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9, # 80..9f |
|||
7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7, # a0..bf |
|||
8,8,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2, # c0..df |
|||
0xa,0x3,0x3,0x3,0x3,0x3,0x3,0x3,0x3,0x3,0x3,0x3,0x3,0x4,0x3,0x3, # e0..ef |
|||
0xb,0x6,0x6,0x6,0x5,0x8,0x8,0x8,0x8,0x8,0x8,0x8,0x8,0x8,0x8,0x8, # f0..ff |
|||
0x0,0x1,0x2,0x3,0x5,0x8,0x7,0x1,0x1,0x1,0x4,0x6,0x1,0x1,0x1,0x1, # s0..s0 |
|||
1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,0,1,1,1,1,1,0,1,0,1,1,1,1,1,1, # s1..s2 |
|||
1,2,1,1,1,1,1,2,1,2,1,1,1,1,1,1,1,1,1,1,1,1,1,2,1,1,1,1,1,1,1,1, # s3..s4 |
|||
1,2,1,1,1,1,1,1,1,2,1,1,1,1,1,1,1,1,1,1,1,1,1,3,1,3,1,1,1,1,1,1, # s5..s6 |
|||
1,3,1,1,1,1,1,3,1,3,1,1,1,1,1,1,1,3,1,1,1,1,1,1,1,1,1,1,1,1,1,1, # s7..s8 |
|||
] |
|||
|
|||
UTF8_ACCEPT = 0 |
|||
UTF8_REJECT = 1 |
|||
|
|||
def __init__(self): |
|||
self.reset() |
|||
|
|||
def decode(self, b): |
|||
""" |
|||
Eat one UTF-8 octet, and validate on the fly. |
|||
|
|||
Returns UTF8_ACCEPT when enough octets have been consumed, in which case |
|||
self.codepoint contains the decoded Unicode code point. |
|||
|
|||
Returns UTF8_REJECT when invalid UTF-8 was encountered. |
|||
|
|||
Returns some other positive integer when more octets need to be eaten. |
|||
""" |
|||
type = Utf8Validator.UTF8VALIDATOR_DFA[b] |
|||
|
|||
if self.state != Utf8Validator.UTF8_ACCEPT: |
|||
self.codepoint = (b & 0x3f) | (self.codepoint << 6) |
|||
else: |
|||
self.codepoint = (0xff >> type) & b |
|||
|
|||
self.state = Utf8Validator.UTF8VALIDATOR_DFA[256 + self.state * 16 + type] |
|||
|
|||
return self.state |
|||
|
|||
def reset(self): |
|||
""" |
|||
Reset validator to start new incremental UTF-8 decode/validation. |
|||
""" |
|||
self.state = Utf8Validator.UTF8_ACCEPT |
|||
self.codepoint = 0 |
|||
self.i = 0 |
|||
|
|||
def validate(self, ba): |
|||
""" |
|||
Incrementally validate a chunk of bytes provided as string. |
|||
|
|||
Will return a quad (valid?, endsOnCodePoint?, currentIndex, totalIndex). |
|||
|
|||
As soon as an octet is encountered which renders the octet sequence |
|||
invalid, a quad with valid? == False is returned. currentIndex returns |
|||
the index within the currently consumed chunk, and totalIndex the |
|||
index within the total consumed sequence that was the point of bail out. |
|||
When valid? == True, currentIndex will be len(ba) and totalIndex the |
|||
total amount of consumed bytes. |
|||
""" |
|||
|
|||
l = len(ba) |
|||
|
|||
for i in range(l): |
|||
## optimized version of decode(), since we are not interested in actual code points |
|||
|
|||
self.state = Utf8Validator.UTF8VALIDATOR_DFA[256 + (self.state << 4) + Utf8Validator.UTF8VALIDATOR_DFA[ord(ba[i])]] |
|||
|
|||
if self.state == Utf8Validator.UTF8_REJECT: |
|||
self.i += i |
|||
return False, False, i, self.i |
|||
|
|||
self.i += l |
|||
|
|||
return True, self.state == Utf8Validator.UTF8_ACCEPT, l, self.i |
@ -1,45 +0,0 @@ |
|||
import subprocess |
|||
|
|||
|
|||
def get_version(version=None): |
|||
"Returns a PEP 386-compliant version number from VERSION." |
|||
|
|||
if version is None: |
|||
from geventwebsocket import VERSION as version |
|||
else: |
|||
assert len(version) == 5 |
|||
assert version[3] in ('alpha', 'beta', 'rc', 'final') |
|||
|
|||
# Now build the two parts of the version number: |
|||
# main = X.Y[.Z] |
|||
# sub = .devN - for pre-alpha releases |
|||
# | {a|b|c}N - for alpha, beta and rc releases |
|||
|
|||
parts = 2 if version[2] == 0 else 3 |
|||
main = '.'.join(str(x) for x in version[:parts]) |
|||
|
|||
sub = '' |
|||
if version[3] == 'alpha' and version[4] == 0: |
|||
hg_changeset = get_hg_changeset() |
|||
if hg_changeset: |
|||
sub = '.dev{0}'.format(hg_changeset) |
|||
|
|||
elif version[3] != 'final': |
|||
mapping = {'alpha': 'a', 'beta': 'b', 'rc': 'c'} |
|||
sub = mapping[version[3]] + str(version[4]) |
|||
|
|||
return str(main + sub) |
|||
|
|||
|
|||
def get_hg_changeset(): |
|||
rev, err = subprocess.Popen( |
|||
'hg id -i', |
|||
shell=True, |
|||
stdout=subprocess.PIPE, |
|||
stderr=subprocess.PIPE |
|||
).communicate() |
|||
|
|||
if err: |
|||
return None |
|||
else: |
|||
return rev.strip().replace('+', '') |
@ -1,554 +0,0 @@ |
|||
import struct |
|||
|
|||
from socket import error |
|||
|
|||
from .exceptions import ProtocolError |
|||
from .exceptions import WebSocketError |
|||
from .exceptions import FrameTooLargeException |
|||
|
|||
from .utf8validator import Utf8Validator |
|||
|
|||
|
|||
MSG_SOCKET_DEAD = "Socket is dead" |
|||
MSG_ALREADY_CLOSED = "Connection is already closed" |
|||
MSG_CLOSED = "Connection closed" |
|||
|
|||
|
|||
class WebSocket(object): |
|||
""" |
|||
Base class for supporting websocket operations. |
|||
|
|||
:ivar environ: The http environment referenced by this connection. |
|||
:ivar closed: Whether this connection is closed/closing. |
|||
:ivar stream: The underlying file like object that will be read from / |
|||
written to by this WebSocket object. |
|||
""" |
|||
|
|||
__slots__ = ('utf8validator', 'utf8validate_last', 'environ', 'closed', |
|||
'stream', 'raw_write', 'raw_read', 'handler') |
|||
|
|||
OPCODE_CONTINUATION = 0x00 |
|||
OPCODE_TEXT = 0x01 |
|||
OPCODE_BINARY = 0x02 |
|||
OPCODE_CLOSE = 0x08 |
|||
OPCODE_PING = 0x09 |
|||
OPCODE_PONG = 0x0a |
|||
|
|||
def __init__(self, environ, stream, handler): |
|||
self.environ = environ |
|||
self.closed = False |
|||
|
|||
self.stream = stream |
|||
|
|||
self.raw_write = stream.write |
|||
self.raw_read = stream.read |
|||
|
|||
self.utf8validator = Utf8Validator() |
|||
self.handler = handler |
|||
|
|||
def __del__(self): |
|||
try: |
|||
self.close() |
|||
except: |
|||
# close() may fail if __init__ didn't complete |
|||
pass |
|||
|
|||
def _decode_bytes(self, bytestring): |
|||
""" |
|||
Internal method used to convert the utf-8 encoded bytestring into |
|||
unicode. |
|||
|
|||
If the conversion fails, the socket will be closed. |
|||
""" |
|||
|
|||
if not bytestring: |
|||
return '' |
|||
|
|||
try: |
|||
return bytestring.decode('utf-8') |
|||
except UnicodeDecodeError: |
|||
self.close(1007) |
|||
|
|||
raise |
|||
|
|||
def _encode_bytes(self, text): |
|||
""" |
|||
:returns: The utf-8 byte string equivalent of `text`. |
|||
""" |
|||
if isinstance(text, str): |
|||
return text |
|||
|
|||
if not isinstance(text, str): |
|||
text = str(text or '') |
|||
|
|||
return text |
|||
|
|||
def _is_valid_close_code(self, code): |
|||
""" |
|||
:returns: Whether the returned close code is a valid hybi return code. |
|||
""" |
|||
if code < 1000: |
|||
return False |
|||
|
|||
if 1004 <= code <= 1006: |
|||
return False |
|||
|
|||
if 1012 <= code <= 1016: |
|||
return False |
|||
|
|||
if code == 1100: |
|||
# not sure about this one but the autobahn fuzzer requires it. |
|||
return False |
|||
|
|||
if 2000 <= code <= 2999: |
|||
return False |
|||
|
|||
return True |
|||
|
|||
@property |
|||
def current_app(self): |
|||
if hasattr(self.handler.server.application, 'current_app'): |
|||
return self.handler.server.application.current_app |
|||
else: |
|||
# For backwards compatibility reasons |
|||
class MockApp(): |
|||
def on_close(self, *args): |
|||
pass |
|||
|
|||
return MockApp() |
|||
|
|||
@property |
|||
def origin(self): |
|||
if not self.environ: |
|||
return |
|||
|
|||
return self.environ.get('HTTP_ORIGIN') |
|||
|
|||
@property |
|||
def protocol(self): |
|||
if not self.environ: |
|||
return |
|||
|
|||
return self.environ.get('HTTP_SEC_WEBSOCKET_PROTOCOL') |
|||
|
|||
@property |
|||
def version(self): |
|||
if not self.environ: |
|||
return |
|||
|
|||
return self.environ.get('HTTP_SEC_WEBSOCKET_VERSION') |
|||
|
|||
@property |
|||
def path(self): |
|||
if not self.environ: |
|||
return |
|||
|
|||
return self.environ.get('PATH_INFO') |
|||
|
|||
@property |
|||
def logger(self): |
|||
return self.handler.logger |
|||
|
|||
def handle_close(self, header, payload): |
|||
""" |
|||
Called when a close frame has been decoded from the stream. |
|||
|
|||
:param header: The decoded `Header`. |
|||
:param payload: The bytestring payload associated with the close frame. |
|||
""" |
|||
if not payload: |
|||
self.close(1000, None) |
|||
|
|||
return |
|||
|
|||
if len(payload) < 2: |
|||
raise ProtocolError('Invalid close frame: {0} {1}'.format( |
|||
header, payload)) |
|||
#TODO: fix payload |
|||
if 'bytearray' in payload: |
|||
payload = eval(payload) |
|||
|
|||
code = struct.unpack('!H', payload[:2])[0] |
|||
payload = payload[2:] |
|||
|
|||
if payload: |
|||
validator = Utf8Validator() |
|||
val = validator.validate(payload) |
|||
|
|||
if not val[0]: |
|||
raise UnicodeError |
|||
|
|||
if not self._is_valid_close_code(code): |
|||
raise ProtocolError('Invalid close code {0}'.format(code)) |
|||
|
|||
self.close(code, payload) |
|||
|
|||
def handle_ping(self, header, payload): |
|||
self.send_frame(payload, self.OPCODE_PONG) |
|||
|
|||
def handle_pong(self, header, payload): |
|||
pass |
|||
|
|||
def read_frame(self): |
|||
""" |
|||
Block until a full frame has been read from the socket. |
|||
|
|||
This is an internal method as calling this will not cleanup correctly |
|||
if an exception is called. Use `receive` instead. |
|||
|
|||
:return: The header and payload as a tuple. |
|||
""" |
|||
|
|||
header = Header.decode_header(self.stream) |
|||
|
|||
if header.flags: |
|||
raise ProtocolError |
|||
|
|||
if not header.length: |
|||
return header, '' |
|||
|
|||
try: |
|||
payload = self.raw_read(header.length) |
|||
except error: |
|||
payload = '' |
|||
except Exception: |
|||
# TODO log out this exception |
|||
payload = '' |
|||
|
|||
if len(payload) != header.length: |
|||
raise WebSocketError('Unexpected EOF reading frame payload') |
|||
|
|||
if header.mask: |
|||
payload = header.unmask_payload(payload) |
|||
|
|||
return header, payload |
|||
|
|||
def validate_utf8(self, payload): |
|||
# Make sure the frames are decodable independently |
|||
self.utf8validate_last = self.utf8validator.validate(payload) |
|||
|
|||
if not self.utf8validate_last[0]: |
|||
raise UnicodeError("Encountered invalid UTF-8 while processing " |
|||
"text message at payload octet index " |
|||
"{0:d}".format(self.utf8validate_last[3])) |
|||
|
|||
def read_message(self): |
|||
""" |
|||
Return the next text or binary message from the socket. |
|||
|
|||
This is an internal method as calling this will not cleanup correctly |
|||
if an exception is called. Use `receive` instead. |
|||
""" |
|||
opcode = None |
|||
message = "" |
|||
|
|||
while True: |
|||
header, payload = self.read_frame() |
|||
f_opcode = header.opcode |
|||
|
|||
if f_opcode in (self.OPCODE_TEXT, self.OPCODE_BINARY): |
|||
# a new frame |
|||
if opcode: |
|||
raise ProtocolError("The opcode in non-fin frame is " |
|||
"expected to be zero, got " |
|||
"{0!r}".format(f_opcode)) |
|||
|
|||
# Start reading a new message, reset the validator |
|||
self.utf8validator.reset() |
|||
self.utf8validate_last = (True, True, 0, 0) |
|||
|
|||
opcode = f_opcode |
|||
|
|||
elif f_opcode == self.OPCODE_CONTINUATION: |
|||
if not opcode: |
|||
raise ProtocolError("Unexpected frame with opcode=0") |
|||
|
|||
elif f_opcode == self.OPCODE_PING: |
|||
self.handle_ping(header, payload) |
|||
continue |
|||
|
|||
elif f_opcode == self.OPCODE_PONG: |
|||
self.handle_pong(header, payload) |
|||
continue |
|||
|
|||
elif f_opcode == self.OPCODE_CLOSE: |
|||
self.handle_close(header, payload) |
|||
return |
|||
|
|||
else: |
|||
raise ProtocolError("Unexpected opcode={0!r}".format(f_opcode)) |
|||
|
|||
if opcode == self.OPCODE_TEXT: |
|||
self.validate_utf8(payload) |
|||
|
|||
message += payload |
|||
|
|||
if header.fin: |
|||
break |
|||
|
|||
if opcode == self.OPCODE_TEXT: |
|||
self.validate_utf8(message) |
|||
return message |
|||
else: |
|||
return bytearray(message) |
|||
|
|||
def receive(self): |
|||
""" |
|||
Read and return a message from the stream. If `None` is returned, then |
|||
the socket is considered closed/errored. |
|||
""" |
|||
|
|||
if self.closed: |
|||
self.current_app.on_close(MSG_ALREADY_CLOSED) |
|||
raise WebSocketError(MSG_ALREADY_CLOSED) |
|||
|
|||
try: |
|||
return self.read_message() |
|||
except UnicodeError: |
|||
self.close(1007) |
|||
except ProtocolError: |
|||
self.close(1002) |
|||
except error: |
|||
self.close() |
|||
self.current_app.on_close(MSG_CLOSED) |
|||
|
|||
return None |
|||
|
|||
def send_frame(self, message, opcode): |
|||
""" |
|||
Send a frame over the websocket with message as its payload |
|||
""" |
|||
if self.closed: |
|||
self.current_app.on_close(MSG_ALREADY_CLOSED) |
|||
raise WebSocketError(MSG_ALREADY_CLOSED) |
|||
|
|||
if opcode == self.OPCODE_TEXT: |
|||
message = self._encode_bytes(message) |
|||
elif opcode == self.OPCODE_BINARY: |
|||
message = str(message) |
|||
|
|||
if not isinstance(message, str): |
|||
message = message.decode('latin-1') |
|||
|
|||
header = Header.encode_header(True, opcode, '', len(message), 0) |
|||
try: |
|||
self.raw_write((header + message).encode('latin-1')) |
|||
except error: |
|||
raise WebSocketError(MSG_SOCKET_DEAD) |
|||
|
|||
def send(self, message, binary=None): |
|||
""" |
|||
Send a frame over the websocket with message as its payload |
|||
""" |
|||
if binary is None: |
|||
binary = not isinstance(message, str) |
|||
|
|||
opcode = self.OPCODE_BINARY if binary else self.OPCODE_TEXT |
|||
|
|||
try: |
|||
self.send_frame(message, opcode) |
|||
except WebSocketError: |
|||
self.current_app.on_close(MSG_SOCKET_DEAD) |
|||
raise WebSocketError(MSG_SOCKET_DEAD) |
|||
|
|||
def close(self, code=1000, message=''): |
|||
""" |
|||
Close the websocket and connection, sending the specified code and |
|||
message. The underlying socket object is _not_ closed, that is the |
|||
responsibility of the initiator. |
|||
""" |
|||
|
|||
if self.closed: |
|||
self.current_app.on_close(MSG_ALREADY_CLOSED) |
|||
|
|||
try: |
|||
message = self._encode_bytes(message) |
|||
if isinstance(message, str): |
|||
message = message.encode('latin-1') |
|||
self.send_frame( |
|||
struct.pack('!H%ds' % len(message), code, message), |
|||
opcode=self.OPCODE_CLOSE) |
|||
except WebSocketError: |
|||
# Failed to write the closing frame but it's ok because we're |
|||
# closing the socket anyway. |
|||
self.logger.debug("Failed to write closing frame -> closing socket") |
|||
finally: |
|||
self.logger.debug("Closed WebSocket") |
|||
self.closed = True |
|||
|
|||
self.stream = None |
|||
self.raw_write = None |
|||
self.raw_read = None |
|||
|
|||
self.environ = None |
|||
|
|||
#self.current_app.on_close(MSG_ALREADY_CLOSED) |
|||
|
|||
|
|||
class Stream(object): |
|||
""" |
|||
Wraps the handler's socket/rfile attributes and makes it in to a file like |
|||
object that can be read from/written to by the lower level websocket api. |
|||
""" |
|||
|
|||
__slots__ = ('handler', 'read', 'write') |
|||
|
|||
def __init__(self, handler): |
|||
self.handler = handler |
|||
self.read = handler.rfile.read |
|||
self.write = handler.socket.sendall |
|||
|
|||
|
|||
class Header(object): |
|||
__slots__ = ('fin', 'mask', 'opcode', 'flags', 'length') |
|||
|
|||
FIN_MASK = 0x80 |
|||
OPCODE_MASK = 0x0f |
|||
MASK_MASK = 0x80 |
|||
LENGTH_MASK = 0x7f |
|||
|
|||
RSV0_MASK = 0x40 |
|||
RSV1_MASK = 0x20 |
|||
RSV2_MASK = 0x10 |
|||
|
|||
# bitwise mask that will determine the reserved bits for a frame header |
|||
HEADER_FLAG_MASK = RSV0_MASK | RSV1_MASK | RSV2_MASK |
|||
|
|||
def __init__(self, fin=0, opcode=0, flags=0, length=0): |
|||
self.mask = '' |
|||
self.fin = fin |
|||
self.opcode = opcode |
|||
self.flags = flags |
|||
self.length = length |
|||
|
|||
def mask_payload(self, payload): |
|||
payload = bytearray(payload) |
|||
mask = bytearray(self.mask) |
|||
|
|||
for i in range(self.length): |
|||
payload[i] ^= mask[i % 4] |
|||
|
|||
return str(payload) |
|||
|
|||
# it's the same operation |
|||
unmask_payload = mask_payload |
|||
|
|||
def __repr__(self): |
|||
return ("<Header fin={0} opcode={1} length={2} flags={3} at " |
|||
"0x{4:x}>").format(self.fin, self.opcode, self.length, |
|||
self.flags, id(self)) |
|||
|
|||
@classmethod |
|||
def decode_header(cls, stream): |
|||
""" |
|||
Decode a WebSocket header. |
|||
|
|||
:param stream: A file like object that can be 'read' from. |
|||
:returns: A `Header` instance. |
|||
""" |
|||
read = stream.read |
|||
data = read(2) |
|||
|
|||
if len(data) != 2: |
|||
raise WebSocketError("Unexpected EOF while decoding header") |
|||
|
|||
first_byte, second_byte = struct.unpack('!BB', data) |
|||
|
|||
header = cls( |
|||
fin=first_byte & cls.FIN_MASK == cls.FIN_MASK, |
|||
opcode=first_byte & cls.OPCODE_MASK, |
|||
flags=first_byte & cls.HEADER_FLAG_MASK, |
|||
length=second_byte & cls.LENGTH_MASK) |
|||
|
|||
has_mask = second_byte & cls.MASK_MASK == cls.MASK_MASK |
|||
|
|||
if header.opcode > 0x07: |
|||
if not header.fin: |
|||
raise ProtocolError( |
|||
"Received fragmented control frame: {0!r}".format(data)) |
|||
|
|||
# Control frames MUST have a payload length of 125 bytes or less |
|||
if header.length > 125: |
|||
raise FrameTooLargeException( |
|||
"Control frame cannot be larger than 125 bytes: " |
|||
"{0!r}".format(data)) |
|||
|
|||
if header.length == 126: |
|||
# 16 bit length |
|||
data = read(2) |
|||
|
|||
if len(data) != 2: |
|||
raise WebSocketError('Unexpected EOF while decoding header') |
|||
|
|||
header.length = struct.unpack('!H', data)[0] |
|||
elif header.length == 127: |
|||
# 64 bit length |
|||
data = read(8) |
|||
|
|||
if len(data) != 8: |
|||
raise WebSocketError('Unexpected EOF while decoding header') |
|||
|
|||
header.length = struct.unpack('!Q', data)[0] |
|||
|
|||
if has_mask: |
|||
mask = read(4) |
|||
|
|||
if len(mask) != 4: |
|||
raise WebSocketError('Unexpected EOF while decoding header') |
|||
|
|||
header.mask = mask |
|||
|
|||
return header |
|||
|
|||
@classmethod |
|||
def encode_header(cls, fin, opcode, mask, length, flags): |
|||
""" |
|||
Encodes a WebSocket header. |
|||
|
|||
:param fin: Whether this is the final frame for this opcode. |
|||
:param opcode: The opcode of the payload, see `OPCODE_*` |
|||
:param mask: Whether the payload is masked. |
|||
:param length: The length of the frame. |
|||
:param flags: The RSV* flags. |
|||
:return: A bytestring encoded header. |
|||
""" |
|||
first_byte = opcode |
|||
second_byte = 0 |
|||
extra = '' |
|||
|
|||
if fin: |
|||
first_byte |= cls.FIN_MASK |
|||
|
|||
if flags & cls.RSV0_MASK: |
|||
first_byte |= cls.RSV0_MASK |
|||
|
|||
if flags & cls.RSV1_MASK: |
|||
first_byte |= cls.RSV1_MASK |
|||
|
|||
if flags & cls.RSV2_MASK: |
|||
first_byte |= cls.RSV2_MASK |
|||
|
|||
# now deal with length complexities |
|||
if length < 126: |
|||
second_byte += length |
|||
elif length <= 0xffff: |
|||
second_byte += 126 |
|||
extra = struct.pack('!H', length) |
|||
elif length <= 0xffffffffffffffff: |
|||
second_byte += 127 |
|||
extra = struct.pack('!Q', length) |
|||
else: |
|||
raise FrameTooLargeException |
|||
|
|||
if mask: |
|||
second_byte |= cls.MASK_MASK |
|||
|
|||
extra += mask |
|||
|
|||
# FIX: in some situation extra will be |
|||
# an bytearray. here we need to decode it |
|||
# with latin-1 |
|||
if not isinstance(extra, str): |
|||
extra = extra.decode('latin-1') |
|||
|
|||
return chr(first_byte) + chr(second_byte) + extra |
Loading…
Reference in new issue