Skip to content

Commit

Permalink
added garbage collection task to sessions manager
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Jun 21, 2015
1 parent 7c566c2 commit 94d968d
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 122 deletions.
2 changes: 1 addition & 1 deletion CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
CHANGES
=======

0.1.0 (Unreleased)
0.1.0 (06/21/2015)
------------------

- Initial release
3 changes: 1 addition & 2 deletions sockjs/route.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,7 @@ def add_endpoint(app, handler, *, name='', prefix='/sockjs',
'%s/iframe{version}.html' % prefix, route.iframe, name=route_name)

# start session gc
# cfg.action('sockjs:gc:%s'%name,
# session_manager.start, order=999999+1)
manager.start()


class SockJSRoute:
Expand Down
55 changes: 31 additions & 24 deletions sockjs/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ def _remote_close(self, exc=None):
except:
log.exception('Exceptin in close handler.')

@asyncio.coroutine
def _remote_closed(self):
if self.state == STATE_CLOSED:
return
Expand Down Expand Up @@ -254,10 +255,11 @@ def close(self, code=3000, reason='Go away!'):
class SessionManager(dict):
"""A basic session manager."""

_hb_timer = None # heartbeat event loop timer
_hb_handle = None # heartbeat event loop timer
_hb_task = None # gc task

def __init__(self, name, app, handler, loop,
heartbeat=7.0, timeout=timedelta(seconds=10), debug=False):
heartbeat=15.0, timeout=timedelta(seconds=10), debug=False):
self.name = name
self.route_name = 'sockjs-url-%s' % name
self.app = app
Expand All @@ -267,6 +269,7 @@ def __init__(self, name, app, handler, loop,
self.sessions = []
self.heartbeat = heartbeat
self.timeout = timeout
self.heartbeat = heartbeat
self.loop = loop
self.debug = debug

Expand All @@ -275,21 +278,27 @@ def route_url(self, request):

@property
def started(self):
return self._hb_timer is not None
return self._hb_handle is not None

def start(self):
# if not self._hb_timer:
# loop = tulip.get_event_loop()
# self._hb_timer = loop.call_later(
# self.heartbeat, self._heartbeat, loop)
pass
if not self._hb_handle:
self._hb_handle = self.loop.call_later(
self.heartbeat, self._heartbeat)

def stop(self):
if self._hb_timer:
self._hb_timer.cancel()
self._hb_timer = None
if self._hb_handle is not None:
self._hb_handle.cancel()
self._hb_handle = None
if self._hb_task is not None:
self._hb_task.cancel()
self._hb_task = None

def _heartbeat(self):
if self._hb_task is None:
self._hb_task = asyncio.async(
self._heartbeat_task(), loop=self.loop)

def _heartbeat(self, loop):
def _heartbeat_task(self):
sessions = self.sessions

if sessions:
Expand All @@ -301,27 +310,25 @@ def _heartbeat(self, loop):

if session.expires < now:
# Session is to be GC'd immedietely
if not self.on_session_gc(session):
del self[session.id]
del self.sessions[idx]
if session.id in self.acquired:
del self.acquired[session.id]
yield from self.release(session)
if session.state == STATE_OPEN:
session.close()
yield from session._remote_close()
if session.state == STATE_CLOSING:
session.closed()
yield from session._remote_closed()

del self[session.id]
del self.sessions[idx]
continue

elif session.acquired:
session.heartbeat()
session._heartbeat()

idx += 1

self._hb_timer = loop.call_later(
self.heartbeat, self._heartbeat, loop)

def on_session_gc(self, session):
return session.on_remove()
self._hb_task = None
self._hb_handle = self.loop.call_later(
self.heartbeat, self._heartbeat)

def _add(self, session):
if session.expired:
Expand Down
169 changes: 74 additions & 95 deletions tests/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -433,36 +433,6 @@ def test_remote_messages_exc(self):
self.assertEqual(messages, [])


class _GcThreadTestCase: # (TestCase):

def setUp(self):
# super(GcThreadTestCase, self).setUp()

self.gc_executed = False

def gc(s):
self.gc_executed = True

from pyramid_sockjs.session import SessionManager

self.gc_origin = SessionManager._gc
SessionManager._gc = gc

def tearDown(self):
from pyramid_sockjs.session import SessionManager
SessionManager._gc = self.gc_origin

# super(GcThreadTestCase, self).tearDown()

def test_gc_thread(self):
from pyramid_sockjs.session import SessionManager

sm = SessionManager('sm', self.registry, gc_cycle=0.1)
sm.start()
sm.stop()
# self.assertTrue(self.gc_executed)


class SessionManagerTestCase(unittest.TestCase):

def setUp(self):
Expand Down Expand Up @@ -503,71 +473,6 @@ def test_fresh(self):
sm._add(s)
self.assertIn('test', sm)

def _test_gc_removed(self):
Session, sm = self.make_one()

sm._add(Session('id'))
del sm['id']

self.assertEqual(len(sm.pool), 1)
sm._gc()

self.assertEqual(len(sm.pool), 0)

def _test_gc_expire(self):
from pyramid_sockjs import STATE_CLOSED
Session, sm = self.make_one()

session = Session('id')
session.open()

sm._add(session)

self.now = session.expires + timedelta(seconds=10)

sm._gc()
self.assertNotIn('id', sm)
self.assertTrue(session.expired)
self.assertEqual(session.state, STATE_CLOSED)

def _test_gc_expire_acquired(self):
from pyramid_sockjs import STATE_CLOSED
Session, sm = self.make_one()

session = Session('id')
session.open()

sm._add(session)
sm.acquired['id'] = session

self.now = session.expires + timedelta(seconds=10)

sm._gc()
self.assertNotIn('id', sm)
self.assertNotIn('id', sm.acquired)
self.assertTrue(session.expired)
self.assertEqual(session.state, STATE_CLOSED)

def _test_gc_one_expire(self):
Session, sm = self.make_one()

session1 = Session('id1')
session1.open()

session2 = Session('id2')
session2.open()

sm._add(session1)
sm._add(session2)

self.now = session1.expires + timedelta(seconds=10)

session2.tick()

sm._gc()
self.assertNotIn('id1', sm)
self.assertIn('id2', sm)

def test_add(self):
s, sm = self.make_manager()

Expand Down Expand Up @@ -686,3 +591,77 @@ def test_clear(self):
self.assertTrue(s2.expired)
self.assertEqual(s1.state, protocol.STATE_CLOSED)
self.assertEqual(s2.state, protocol.STATE_CLOSED)

def test_heartbeat(self):
_, sm = self.make_manager()
self.assertFalse(sm.started)
self.assertIsNone(sm._hb_task)

sm.start()
self.assertTrue(sm.started)
self.assertIsNotNone(sm._hb_handle)

sm._heartbeat()
self.assertIsNotNone(sm._hb_task)

hb_task = sm._hb_task

sm.stop()
self.assertFalse(sm.started)
self.assertIsNone(sm._hb_handle)
self.assertIsNone(sm._hb_task)
self.assertTrue(hb_task._must_cancel)

def test_heartbeat_task(self):
_, sm = self.make_manager()
sm._hb_task = mock.Mock()

self.loop.run_until_complete(sm._heartbeat_task())
self.assertTrue(sm.started)
self.assertIsNone(sm._hb_task)

def test_gc_expire(self):
s, sm = self.make_manager()

sm._add(s)
self.loop.run_until_complete(sm.acquire(s))
self.loop.run_until_complete(sm.release(s))

s.expires = datetime.now() - timedelta(seconds=30)

self.loop.run_until_complete(sm._heartbeat_task())
self.assertNotIn(s.id, sm)
self.assertTrue(s.expired)
self.assertEqual(s.state, protocol.STATE_CLOSED)

def test_gc_expire_acquired(self):
s, sm = self.make_manager()

sm._add(s)
self.loop.run_until_complete(sm.acquire(s))

s.expires = datetime.now() - timedelta(seconds=30)

self.loop.run_until_complete(sm._heartbeat_task())
self.assertNotIn(s.id, sm)
self.assertNotIn(s.id, sm.acquired)
self.assertTrue(s.expired)
self.assertEqual(s.state, protocol.STATE_CLOSED)

def test_gc_one_expire(self):
_, sm = self.make_manager()
s1 = self.make_session('id1')
s2 = self.make_session('id2')

sm._add(s1)
sm._add(s2)
self.loop.run_until_complete(sm.acquire(s1))
self.loop.run_until_complete(sm.acquire(s2))
self.loop.run_until_complete(sm.release(s1))
self.loop.run_until_complete(sm.release(s2))

s1.expires = datetime.now() - timedelta(seconds=30)

self.loop.run_until_complete(sm._heartbeat_task())
self.assertNotIn(s1.id, sm)
self.assertIn(s2.id, sm)

0 comments on commit 94d968d

Please sign in to comment.