|
|
@ -11,6 +11,7 @@ else: |
|
|
|
from socketio import namespace |
|
|
|
from socketio import packet |
|
|
|
from socketio import server |
|
|
|
from socketio import util |
|
|
|
|
|
|
|
|
|
|
|
@mock.patch('engineio.Server') |
|
|
@ -62,6 +63,97 @@ class TestServer(unittest.TestCase): |
|
|
|
self.assertIsNotNone(s.handlers['/ns']._get_event_handler('foo bar')) |
|
|
|
self.assertIsNone(s.handlers['/ns']._get_event_handler('abc')) |
|
|
|
|
|
|
|
def test_middleware(self, eio): |
|
|
|
class MW: |
|
|
|
def __init__(self): |
|
|
|
self.ignore_event = mock.MagicMock(side_effect=100*[False]) |
|
|
|
self.before_event = mock.MagicMock(side_effect=100*[None]) |
|
|
|
self.after_event = mock.MagicMock(side_effect=100*[None]) |
|
|
|
|
|
|
|
mw1 = MW() |
|
|
|
mw2 = MW() |
|
|
|
mw3 = MW() |
|
|
|
mw4 = MW() |
|
|
|
mw4.ignore_event = mock.MagicMock(side_effect=[True]+100*[False]) |
|
|
|
mw4.before_event = mock.MagicMock(side_effect=['x']+100*[None]) |
|
|
|
mw4.after_event = mock.MagicMock(side_effect=['x']+100*[None]) |
|
|
|
|
|
|
|
class NS(namespace.Namespace): |
|
|
|
def on_foo(self, sid): |
|
|
|
pass |
|
|
|
|
|
|
|
@namespace.Namespace.event_name('foo bar') |
|
|
|
@util.apply_middleware(mw4) |
|
|
|
def some_name(self, sid): |
|
|
|
pass |
|
|
|
|
|
|
|
s = server.Server() |
|
|
|
s.middlewares.append(mw1) |
|
|
|
|
|
|
|
@s.on('abc') |
|
|
|
@util.apply_middleware(mw2) |
|
|
|
def abc(sid): |
|
|
|
pass |
|
|
|
|
|
|
|
ns = s.register_namespace('/ns', NS) |
|
|
|
ns.middlewares.append(mw3) |
|
|
|
|
|
|
|
# only mw1 and mw3 should run completely |
|
|
|
s._trigger_event('foo', '/ns', '123') |
|
|
|
self.assertEqual(mw1.before_event.call_count, 1) |
|
|
|
self.assertEqual(mw1.after_event.call_count, 1) |
|
|
|
self.assertEqual(mw2.before_event.call_count, 0) |
|
|
|
self.assertEqual(mw2.after_event.call_count, 0) |
|
|
|
self.assertEqual(mw3.before_event.call_count, 1) |
|
|
|
self.assertEqual(mw3.after_event.call_count, 1) |
|
|
|
self.assertEqual(mw4.before_event.call_count, 0) |
|
|
|
self.assertEqual(mw4.after_event.call_count, 0) |
|
|
|
|
|
|
|
# only mw1 and mw3 should run completely, mw4 is enabled but ignored |
|
|
|
s._trigger_event('foo bar', '/ns', '123') |
|
|
|
self.assertEqual(mw1.before_event.call_count, 2) |
|
|
|
self.assertEqual(mw1.after_event.call_count, 2) |
|
|
|
self.assertEqual(mw2.before_event.call_count, 0) |
|
|
|
self.assertEqual(mw2.after_event.call_count, 0) |
|
|
|
self.assertEqual(mw3.before_event.call_count, 2) |
|
|
|
self.assertEqual(mw3.after_event.call_count, 2) |
|
|
|
self.assertEqual(mw4.before_event.call_count, 0) |
|
|
|
self.assertEqual(mw4.after_event.call_count, 0) |
|
|
|
|
|
|
|
# again, this time mw4 before_event should be triggered |
|
|
|
s._trigger_event('foo bar', '/ns', '123') |
|
|
|
self.assertEqual(mw1.before_event.call_count, 3) |
|
|
|
self.assertEqual(mw1.after_event.call_count, 2) |
|
|
|
self.assertEqual(mw2.before_event.call_count, 0) |
|
|
|
self.assertEqual(mw2.after_event.call_count, 0) |
|
|
|
self.assertEqual(mw3.before_event.call_count, 3) |
|
|
|
self.assertEqual(mw3.after_event.call_count, 2) |
|
|
|
self.assertEqual(mw4.before_event.call_count, 1) |
|
|
|
self.assertEqual(mw4.after_event.call_count, 0) |
|
|
|
|
|
|
|
# again, this time mw4 before + after_event should be triggered |
|
|
|
# but after_event should abort execution |
|
|
|
s._trigger_event('foo bar', '/ns', '123') |
|
|
|
self.assertEqual(mw1.before_event.call_count, 4) |
|
|
|
self.assertEqual(mw1.after_event.call_count, 2) |
|
|
|
self.assertEqual(mw2.before_event.call_count, 0) |
|
|
|
self.assertEqual(mw2.after_event.call_count, 0) |
|
|
|
self.assertEqual(mw3.before_event.call_count, 4) |
|
|
|
self.assertEqual(mw3.after_event.call_count, 2) |
|
|
|
self.assertEqual(mw4.before_event.call_count, 2) |
|
|
|
self.assertEqual(mw4.after_event.call_count, 1) |
|
|
|
|
|
|
|
# only mw1 and mw2 should run completely |
|
|
|
s._trigger_event('abc', '/', '123') |
|
|
|
self.assertEqual(mw1.before_event.call_count, 5) |
|
|
|
self.assertEqual(mw1.after_event.call_count, 3) |
|
|
|
self.assertEqual(mw2.before_event.call_count, 1) |
|
|
|
self.assertEqual(mw2.after_event.call_count, 1) |
|
|
|
self.assertEqual(mw3.before_event.call_count, 4) |
|
|
|
self.assertEqual(mw3.after_event.call_count, 2) |
|
|
|
self.assertEqual(mw4.before_event.call_count, 2) |
|
|
|
self.assertEqual(mw4.after_event.call_count, 1) |
|
|
|
|
|
|
|
def test_on_bad_event_name(self, eio): |
|
|
|
s = server.Server() |
|
|
|
self.assertRaises(ValueError, s.on, 'two-words') |
|
|
|