diff --git a/CHANGES.txt b/CHANGES.txt index f8dc2aa..0295595 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -2,7 +2,7 @@ CHANGES ======= -0.1.0 (Unreleased) +0.1.0 (06/21/2015) ------------------ - Initial release diff --git a/sockjs/route.py b/sockjs/route.py index 26c455f..bfe1689 100644 --- a/sockjs/route.py +++ b/sockjs/route.py @@ -93,8 +93,7 @@ def add_endpoint(app, handler, *, name='', prefix='/sockjs', '%s/iframe{version}.html' % prefix, route.iframe, name=route_name) # start session gc - # cfg.action('sockjs:gc:%s'%name, - # session_manager.start, order=999999+1) + manager.start() class SockJSRoute: diff --git a/sockjs/session.py b/sockjs/session.py index 2f4976f..8ec3dad 100644 --- a/sockjs/session.py +++ b/sockjs/session.py @@ -168,6 +168,7 @@ def _remote_close(self, exc=None): except: log.exception('Exceptin in close handler.') + @asyncio.coroutine def _remote_closed(self): if self.state == STATE_CLOSED: return @@ -254,10 +255,11 @@ def close(self, code=3000, reason='Go away!'): class SessionManager(dict): """A basic session manager.""" - _hb_timer = None # heartbeat event loop timer + _hb_handle = None # heartbeat event loop timer + _hb_task = None # gc task def __init__(self, name, app, handler, loop, - heartbeat=7.0, timeout=timedelta(seconds=10), debug=False): + heartbeat=15.0, timeout=timedelta(seconds=10), debug=False): self.name = name self.route_name = 'sockjs-url-%s' % name self.app = app @@ -267,6 +269,7 @@ def __init__(self, name, app, handler, loop, self.sessions = [] self.heartbeat = heartbeat self.timeout = timeout + self.heartbeat = heartbeat self.loop = loop self.debug = debug @@ -275,21 +278,27 @@ def route_url(self, request): @property def started(self): - return self._hb_timer is not None + return self._hb_handle is not None def start(self): - # if not self._hb_timer: - # loop = tulip.get_event_loop() - # self._hb_timer = loop.call_later( - # self.heartbeat, self._heartbeat, loop) - pass + if not self._hb_handle: + self._hb_handle = self.loop.call_later( + self.heartbeat, self._heartbeat) def stop(self): - if self._hb_timer: - self._hb_timer.cancel() - self._hb_timer = None + if self._hb_handle is not None: + self._hb_handle.cancel() + self._hb_handle = None + if self._hb_task is not None: + self._hb_task.cancel() + self._hb_task = None + + def _heartbeat(self): + if self._hb_task is None: + self._hb_task = asyncio.async( + self._heartbeat_task(), loop=self.loop) - def _heartbeat(self, loop): + def _heartbeat_task(self): sessions = self.sessions if sessions: @@ -301,27 +310,25 @@ def _heartbeat(self, loop): if session.expires < now: # Session is to be GC'd immedietely - if not self.on_session_gc(session): - del self[session.id] - del self.sessions[idx] if session.id in self.acquired: - del self.acquired[session.id] + yield from self.release(session) if session.state == STATE_OPEN: - session.close() + yield from session._remote_close() if session.state == STATE_CLOSING: - session.closed() + yield from session._remote_closed() + + del self[session.id] + del self.sessions[idx] continue elif session.acquired: - session.heartbeat() + session._heartbeat() idx += 1 - self._hb_timer = loop.call_later( - self.heartbeat, self._heartbeat, loop) - - def on_session_gc(self, session): - return session.on_remove() + self._hb_task = None + self._hb_handle = self.loop.call_later( + self.heartbeat, self._heartbeat) def _add(self, session): if session.expired: diff --git a/tests/test_session.py b/tests/test_session.py index 73d0ec1..42912de 100644 --- a/tests/test_session.py +++ b/tests/test_session.py @@ -433,36 +433,6 @@ def test_remote_messages_exc(self): self.assertEqual(messages, []) -class _GcThreadTestCase: # (TestCase): - - def setUp(self): - # super(GcThreadTestCase, self).setUp() - - self.gc_executed = False - - def gc(s): - self.gc_executed = True - - from pyramid_sockjs.session import SessionManager - - self.gc_origin = SessionManager._gc - SessionManager._gc = gc - - def tearDown(self): - from pyramid_sockjs.session import SessionManager - SessionManager._gc = self.gc_origin - - # super(GcThreadTestCase, self).tearDown() - - def test_gc_thread(self): - from pyramid_sockjs.session import SessionManager - - sm = SessionManager('sm', self.registry, gc_cycle=0.1) - sm.start() - sm.stop() - # self.assertTrue(self.gc_executed) - - class SessionManagerTestCase(unittest.TestCase): def setUp(self): @@ -503,71 +473,6 @@ def test_fresh(self): sm._add(s) self.assertIn('test', sm) - def _test_gc_removed(self): - Session, sm = self.make_one() - - sm._add(Session('id')) - del sm['id'] - - self.assertEqual(len(sm.pool), 1) - sm._gc() - - self.assertEqual(len(sm.pool), 0) - - def _test_gc_expire(self): - from pyramid_sockjs import STATE_CLOSED - Session, sm = self.make_one() - - session = Session('id') - session.open() - - sm._add(session) - - self.now = session.expires + timedelta(seconds=10) - - sm._gc() - self.assertNotIn('id', sm) - self.assertTrue(session.expired) - self.assertEqual(session.state, STATE_CLOSED) - - def _test_gc_expire_acquired(self): - from pyramid_sockjs import STATE_CLOSED - Session, sm = self.make_one() - - session = Session('id') - session.open() - - sm._add(session) - sm.acquired['id'] = session - - self.now = session.expires + timedelta(seconds=10) - - sm._gc() - self.assertNotIn('id', sm) - self.assertNotIn('id', sm.acquired) - self.assertTrue(session.expired) - self.assertEqual(session.state, STATE_CLOSED) - - def _test_gc_one_expire(self): - Session, sm = self.make_one() - - session1 = Session('id1') - session1.open() - - session2 = Session('id2') - session2.open() - - sm._add(session1) - sm._add(session2) - - self.now = session1.expires + timedelta(seconds=10) - - session2.tick() - - sm._gc() - self.assertNotIn('id1', sm) - self.assertIn('id2', sm) - def test_add(self): s, sm = self.make_manager() @@ -686,3 +591,77 @@ def test_clear(self): self.assertTrue(s2.expired) self.assertEqual(s1.state, protocol.STATE_CLOSED) self.assertEqual(s2.state, protocol.STATE_CLOSED) + + def test_heartbeat(self): + _, sm = self.make_manager() + self.assertFalse(sm.started) + self.assertIsNone(sm._hb_task) + + sm.start() + self.assertTrue(sm.started) + self.assertIsNotNone(sm._hb_handle) + + sm._heartbeat() + self.assertIsNotNone(sm._hb_task) + + hb_task = sm._hb_task + + sm.stop() + self.assertFalse(sm.started) + self.assertIsNone(sm._hb_handle) + self.assertIsNone(sm._hb_task) + self.assertTrue(hb_task._must_cancel) + + def test_heartbeat_task(self): + _, sm = self.make_manager() + sm._hb_task = mock.Mock() + + self.loop.run_until_complete(sm._heartbeat_task()) + self.assertTrue(sm.started) + self.assertIsNone(sm._hb_task) + + def test_gc_expire(self): + s, sm = self.make_manager() + + sm._add(s) + self.loop.run_until_complete(sm.acquire(s)) + self.loop.run_until_complete(sm.release(s)) + + s.expires = datetime.now() - timedelta(seconds=30) + + self.loop.run_until_complete(sm._heartbeat_task()) + self.assertNotIn(s.id, sm) + self.assertTrue(s.expired) + self.assertEqual(s.state, protocol.STATE_CLOSED) + + def test_gc_expire_acquired(self): + s, sm = self.make_manager() + + sm._add(s) + self.loop.run_until_complete(sm.acquire(s)) + + s.expires = datetime.now() - timedelta(seconds=30) + + self.loop.run_until_complete(sm._heartbeat_task()) + self.assertNotIn(s.id, sm) + self.assertNotIn(s.id, sm.acquired) + self.assertTrue(s.expired) + self.assertEqual(s.state, protocol.STATE_CLOSED) + + def test_gc_one_expire(self): + _, sm = self.make_manager() + s1 = self.make_session('id1') + s2 = self.make_session('id2') + + sm._add(s1) + sm._add(s2) + self.loop.run_until_complete(sm.acquire(s1)) + self.loop.run_until_complete(sm.acquire(s2)) + self.loop.run_until_complete(sm.release(s1)) + self.loop.run_until_complete(sm.release(s2)) + + s1.expires = datetime.now() - timedelta(seconds=30) + + self.loop.run_until_complete(sm._heartbeat_task()) + self.assertNotIn(s1.id, sm) + self.assertIn(s2.id, sm)