Skip to content

Commit

Permalink
Fix ask#54: exits correctly on exception/ctrl-c
Browse files Browse the repository at this point in the history
  • Loading branch information
jbooth-mastery committed Jan 6, 2020
1 parent 5fac900 commit 120280d
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 14 deletions.
15 changes: 11 additions & 4 deletions mode/threads.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ def __init__(self,
super().__init__(loop=self.thread_loop, **kwargs)
assert self._shutdown.loop is self.parent_loop

def __del__(self):
# Since services can restart, the thread loop must not be closed
# until the service is being fully removed, not just stopped.
self.thread_loop.close()

async def on_thread_started(self) -> None:
...

Expand Down Expand Up @@ -175,8 +180,8 @@ async def start(self) -> None:
self._thread_running = None

async def crash(self, exc: BaseException) -> None:
if self._thread_running and not self._thread_running.done():
self._thread_running.set_exception(exc) # <- .start() will raise
self.parent_loop.call_soon_threadsafe(
maybe_set_exception, self._thread_running, exc)
await super().crash(exc)

def _start_thread(self) -> None:
Expand All @@ -189,10 +194,14 @@ def _start_thread(self) -> None:
# shutdown here, since _shutdown_thread will not execute.
self.set_shutdown()
raise
finally:
self.loop.stop()

async def stop(self) -> None:
if self._started.is_set():
await super().stop()
if self._thread is not None:
self._thread.stop()

async def _stop_children(self) -> None:
... # called by thread instead of .stop()
Expand All @@ -208,8 +217,6 @@ async def _shutdown_thread(self) -> None:
await self.on_thread_stop()
self.set_shutdown()
await self._default_stop_futures()
if self._thread is not None:
self._thread.stop()
await self._default_stop_exit_stacks()

async def _serve(self) -> None:
Expand Down
5 changes: 3 additions & 2 deletions mode/utils/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def maybe_set_result(fut: Optional[asyncio.Future],
result: Any) -> bool:
"""Set future result if not already done."""
if fut is not None and not fut.done():
fut.set_result(result)
notify(fut, result)
return True
return False

Expand All @@ -165,4 +165,5 @@ def notify(fut: Optional[asyncio.Future], result: Any = None) -> None:
# can be used to turn a Future into a lockless, single-consumer condition,
# for multi-consumer use asyncio.Condition
if fut is not None and not fut.done():
fut.set_result(result)
loop = fut.get_loop()
loop.call_soon_threadsafe(fut.set_result, result)
7 changes: 4 additions & 3 deletions mode/utils/locks.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,13 @@ def set(self) -> None:
All coroutines waiting for it to become true are awakened.
Coroutine that call wait() once the flag is true will not block at all.
"""
from .futures import notify

if not self._value:
self._value = True

for fut in self._waiters:
if not fut.done():
fut.set_result(True)
for fut in self._waiters.copy():
notify(fut, True)

def clear(self) -> None:
"""Reset the internal flag to false.
Expand Down
2 changes: 2 additions & 0 deletions t/functional/utils/test_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,9 @@ async def test_maybe_set_result():
loop = asyncio.get_event_loop()
future = loop.create_future()
maybe_set_result(future, 42)
await asyncio.sleep(0.001)
assert future.result() == 42
maybe_set_result(future, 53)
maybe_set_result(None, 57)
await asyncio.sleep(0.001)
assert future.result() == 42
4 changes: 3 additions & 1 deletion t/unit/test_supervisors.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ def test_wakeup(self, *, sup):
wakeup = sup._please_wakeup = Mock()
wakeup.done.return_value = False
wakeup.cancelled.return_value = False
loop = wakeup.get_loop.return_value = Mock()
sup.wakeup()
assert sup._stopped.is_set()
wakeup.set_result.assert_called_with(None)
wakeup.get_loop.assert_called_once()
loop.call_soon_threadsafe.assert_called_with(wakeup.set_result, None)
7 changes: 3 additions & 4 deletions t/unit/test_threads.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,9 @@ async def test_stop(self, *, thread):
await thread.stop()
thread._started.set()
thread._shutdown.set()
thread._thread = Mock()
await thread.stop()
thread._thread.stop.assert_called_once_with()

@pytest.mark.asyncio
async def test_stop_children(self, *, thread):
Expand All @@ -183,10 +185,6 @@ async def test_shutdown_thread(self, *, thread):
thread._default_stop_futures.assert_called_once_with()
thread._shutdown.is_set()

thread._thread = Mock()
await thread._shutdown_thread()
thread._thread.stop.assert_called_once_with()

@pytest.mark.asyncio
async def test__thread_keepalive(self, *, thread):

Expand Down Expand Up @@ -262,6 +260,7 @@ async def test_crash(self, *, thread):
thread._thread_running = Mock()
thread._thread_running.done.return_value = False
await thread.crash(exc)
await asyncio.sleep(0.001)
thread._thread_running.set_exception.assert_called_with(exc)


Expand Down

0 comments on commit 120280d

Please sign in to comment.