2 changed files with 147 additions and 0 deletions
@ -0,0 +1,103 @@ |
|||
import asyncio |
|||
|
|||
from sanic import Sanic |
|||
from sanic.response import html |
|||
|
|||
import socketio |
|||
|
|||
# --- NATS PubSub Manager --- |
|||
|
|||
import nats |
|||
from nats.aio.client import Client as NATS |
|||
from socketio.asyncio_pubsub_manager import AsyncPubSubManager |
|||
|
|||
import json |
|||
|
|||
class AsyncNatsManager(AsyncPubSubManager): |
|||
name = 'asyncionats' |
|||
|
|||
def __init__(self, |
|||
servers=None, |
|||
channel='socketio', |
|||
write_only=False, |
|||
loop=asyncio.get_event_loop(), |
|||
): |
|||
|
|||
if servers is None: |
|||
servers = ["nats://127.0.0.1:4222"] |
|||
self.servers = servers |
|||
self.queue = asyncio.Queue() |
|||
|
|||
# Establish single connection to NATS for the client. |
|||
self.nc = None |
|||
super().__init__(channel=channel, write_only=write_only) |
|||
|
|||
async def _publish(self, data): |
|||
print("Socket.io <<- NATS :", data) |
|||
|
|||
# Send the client events through NATS |
|||
if self.nc is None: |
|||
self.nc = NATS() |
|||
await self.nc.connect(servers=self.servers) |
|||
|
|||
# Skip broadcasted messages that were received from NATS. |
|||
if data['event'] != 'event': |
|||
payload = json.dumps(data['data']).encode() |
|||
await self.nc.publish("socketio.{}".format(data['event']), payload) |
|||
|
|||
async def _listen(self): |
|||
if self.nc is None: |
|||
self.nc = NATS() |
|||
await self.nc.connect(servers=self.servers) |
|||
|
|||
# Close over the socketio to be able to emit within |
|||
# the NATS callback. |
|||
sio = self |
|||
async def message_handler(msg): |
|||
nonlocal sio |
|||
|
|||
print("NATS ->> Socket.io:", msg.data.decode()) |
|||
|
|||
data = json.loads(msg.data.decode()) |
|||
|
|||
# Broadcast the bare message received via NATS as a Socket.io event |
|||
await sio.emit('nats', data, namespace='/test') |
|||
|
|||
await self.queue.put(data) |
|||
await self.nc.subscribe(self.channel, cb=message_handler) |
|||
return await self.queue.get() |
|||
|
|||
# --- Sanic + Socket.io based Application with attached PubSub Manager --- |
|||
|
|||
app = Sanic() |
|||
mgr = AsyncNatsManager() |
|||
sio = socketio.AsyncServer(client_manager=mgr, async_mode='sanic') |
|||
sio.attach(app) |
|||
|
|||
@app.route('/') |
|||
async def index(request): |
|||
with open('nats.html') as f: |
|||
return html(f.read()) |
|||
|
|||
@sio.on('event', namespace='/test') |
|||
async def test_message(sid, message): |
|||
await sio.emit('response', {'data': message['data']}, room=sid, |
|||
namespace='/test') |
|||
|
|||
@sio.on('nats', namespace='/test') |
|||
async def test_nats_message(sid, message): |
|||
await sio.emit('response', {'data': message['data']}, room=sid, |
|||
namespace='/test') |
|||
|
|||
@sio.on('connect', namespace='/test') |
|||
async def test_connect(sid, environ): |
|||
print("Client connected", sid) |
|||
await sio.emit('response', {'data': 'Connected', 'count': 0}, room=sid, |
|||
namespace='/test') |
|||
|
|||
@sio.on('disconnect', namespace='/test') |
|||
def test_disconnect(sid): |
|||
print('Client disconnected') |
|||
|
|||
if __name__ == '__main__': |
|||
app.run() |
@ -0,0 +1,44 @@ |
|||
<!DOCTYPE HTML> |
|||
<html> |
|||
<head> |
|||
<title>NATS + SocketIO </title> |
|||
<script type="text/javascript" src="//code.jquery.com/jquery-2.1.4.min.js"></script> |
|||
<script type="text/javascript" src="//cdnjs.cloudflare.com/ajax/libs/socket.io/1.3.5/socket.io.min.js"></script> |
|||
<script type="text/javascript" charset="utf-8"> |
|||
$(document).ready(function(){ |
|||
namespace = '/test'; |
|||
var socket = io.connect('http://' + document.domain + ':' + location.port + namespace); |
|||
socket.on('connect', function() { |
|||
$('#log').append('<br />Connected!'); |
|||
socket.emit('event', {data: 'Connection started...'}); |
|||
}); |
|||
socket.on('disconnect', function() { |
|||
socket.emit('event', {data: 'Disconnecting...'}); |
|||
$('#log').append('<br />Disconnected!'); |
|||
}); |
|||
socket.on('response', function(msg) { |
|||
$('#log').append('<br />Received: ' + msg.data); |
|||
}); |
|||
socket.on('nats', function(msg) { |
|||
console.log("NATS???"); |
|||
$('#log').append('<br />NATS: ' + msg.data); |
|||
}); |
|||
// event handler for server sent data |
|||
$('form#emit').submit(function(event) { |
|||
socket.emit('event', {data: $('#emit_data').val()}); |
|||
return false; |
|||
}); |
|||
}); |
|||
</script> |
|||
</head> |
|||
<body> |
|||
<h1>NATS + SocketIO</h1> |
|||
<h2>Publish to NATS</h2> |
|||
<form id="emit" method="POST" action='#'> |
|||
<input type="text" name="emit_data" id="emit_data" placeholder="Message"> |
|||
<input type="submit" value="Publish"> |
|||
</form> |
|||
<h2>Receive:</h2> |
|||
<div><p id="log"></p></div> |
|||
</body> |
|||
</html> |
Loading…
Reference in new issue