diff --git a/trio/_core/_run.py b/trio/_core/_run.py index a10d6cb57f..382a9584dc 100644 --- a/trio/_core/_run.py +++ b/trio/_core/_run.py @@ -747,6 +747,23 @@ def __exit__(self): # pragma: no cover assert False, """Never called, but should be defined""" +@attr.s(eq=False, repr=False) +class NurseryExiter: + """This runs only the __aexit__ of the NurseryManager that it wraps. + It is used only for the system nursery, which gets entered outside + of async context. + """ + + _manager = attr.ib() + + async def __aenter__(self): + pass + + # async/await are stripped to remove a traceback frame + def __aexit__(self, *exc): + return self._manager.__aexit__(*exc) + + def open_nursery(): """Returns an async context manager which must be used to create a new `Nursery`. @@ -1490,14 +1507,18 @@ def spawn_system_task(self, async_fn, *args, name=None): async_fn, args, self.system_nursery, name, system_task=True ) - async def init(self, async_fn, args): - async with open_nursery() as system_nursery: - self.system_nursery = system_nursery + async def init(self, system_nursery_manager, async_fn, args): + # We did the __aenter__ part of 'async with open_nursery() as + # self.system_nursery:' in unrolled_run(), so only the __aexit__ part + # needs to be done in this function. + async with NurseryExiter(system_nursery_manager): try: - self.main_task = self.spawn_impl(async_fn, args, system_nursery, None) + self.main_task = self.spawn_impl( + async_fn, args, self.system_nursery, None + ) except BaseException as exc: self.main_task_outcome = Error(exc) - system_nursery.cancel_scope.cancel() + self.system_nursery.cancel_scope.cancel() self.entry_queue.spawn() ################ @@ -1982,10 +2003,30 @@ def unrolled_run(runner, async_fn, args, host_uses_signal_set_wakeup_fd=False): if runner.instruments: runner.instrument("before_run") - runner.clock.start_clock() + + # This strange construct allows start_clock() to spawn system tasks + # before the first step of init(). Basically we're doing the __aenter__ + # of open_nursery() here, leaving only the __aexit__ for init() to do. + # This works because __aenter__ is secretly synchronous (doesn't await). + system_nursery_manager = open_nursery() runner.init_task = runner.spawn_impl( - runner.init, (async_fn, args), None, "", system_task=True, + runner.init, + (system_nursery_manager, async_fn, args), + None, + "", + system_task=True, ) + GLOBAL_RUN_CONTEXT.task = runner.init_task + try: + system_nursery_manager.__aenter__().send(None) + except StopIteration as exc: + runner.system_nursery = exc.value + else: # pragma: no cover + raise TrioInternalError("NurseryManager.__aenter__() yielded") + finally: + del GLOBAL_RUN_CONTEXT.task # pragma: no branch + + runner.clock.start_clock() # You know how people talk about "event loops"? This 'while' loop right # here is our event loop: diff --git a/trio/_core/tests/test_run.py b/trio/_core/tests/test_run.py index 78b46b7adc..1f2926527d 100644 --- a/trio/_core/tests/test_run.py +++ b/trio/_core/tests/test_run.py @@ -295,8 +295,8 @@ async def child(): stats = _core.current_statistics() print(stats) - # 2 system tasks + us - assert stats.tasks_living == 3 + # init + autojumper + run_sync_soon task + us + assert stats.tasks_living == 4 assert stats.run_sync_soon_queue_size == 0 async with _core.open_nursery() as nursery: @@ -307,8 +307,8 @@ async def child(): token.run_sync_soon(lambda: None, idempotent=True) stats = _core.current_statistics() print(stats) - # 2 system tasks + us + child - assert stats.tasks_living == 4 + # as above + child + assert stats.tasks_living == 5 # the exact value here might shift if we change how we do accounting # (currently it only counts tasks that we already know will be # runnable on the next pass), but still useful to at least test the diff --git a/trio/testing/_mock_clock.py b/trio/testing/_mock_clock.py index 843f51197f..1d6912ad3e 100644 --- a/trio/testing/_mock_clock.py +++ b/trio/testing/_mock_clock.py @@ -88,8 +88,11 @@ def __init__(self, rate=0.0, autojump_threshold=inf): self._virtual_base = 0.0 self._rate = 0.0 self._autojump_threshold = 0.0 - self._autojump_task = None - self._autojump_cancel_scope = None + + # Whenever autojump_threshold changes, this must be cancelled + # in order to wake up the autojump task. + self._autojump_cancel_scope = _core.CancelScope() + # kept as an attribute so that our tests can monkeypatch it self._real_clock = time.perf_counter @@ -124,47 +127,37 @@ def autojump_threshold(self): @autojump_threshold.setter def autojump_threshold(self, new_autojump_threshold): self._autojump_threshold = float(new_autojump_threshold) - self._maybe_spawn_autojump_task() - if self._autojump_cancel_scope is not None: - # Task is running and currently blocked on the old setting, wake - # it up so it picks up the new setting - self._autojump_cancel_scope.cancel() + self._autojump_cancel_scope.cancel() async def _autojumper(self): while True: - with _core.CancelScope() as cancel_scope: - self._autojump_cancel_scope = cancel_scope - try: - # If the autojump_threshold changes, then the setter does - # cancel_scope.cancel(), which causes the next line here - # to raise Cancelled, which is absorbed by the cancel - # scope above, and effectively just causes us to skip back - # to the start the loop, like a 'continue'. - await _core.wait_all_tasks_blocked(self._autojump_threshold, inf) - statistics = _core.current_statistics() - jump = statistics.seconds_to_next_deadline - if 0 < jump < inf: - self.jump(jump) - else: - # There are no deadlines, nothing is going to happen - # until some actual I/O arrives (or maybe another - # wait_all_tasks_blocked task wakes up). That's fine, - # but if our threshold is zero then this will become a - # busy-wait -- so insert a small-but-non-zero _sleep to - # avoid that. - if self._autojump_threshold == 0: - await _core.wait_all_tasks_blocked(0.01) - finally: - self._autojump_cancel_scope = None - - def _maybe_spawn_autojump_task(self): - if self._autojump_threshold < inf and self._autojump_task is None: - try: - clock = _core.current_clock() - except RuntimeError: - return - if clock is self: - self._autojump_task = _core.spawn_system_task(self._autojumper) + # If the autojump_threshold changes, then the setter does + # self._autojump_cancel_scope.cancel(), which causes the + # wait_task_rescheduled or wait_all_tasks_blocked to raise + # Cancelled, which is absorbed by the cancel scope above, + # and effectively just causes us to skip back to the start + # the loop, like a 'continue'. + with _core.CancelScope() as self._autojump_cancel_scope: + if self._autojump_threshold == inf: + # Autojump is disabled; just sleep until it's enabled + await _core.wait_task_rescheduled(lambda _: _core.Abort.SUCCEEDED) + assert False # pragma: no cover + + # Otherwise autojump is enabled + await _core.wait_all_tasks_blocked(self._autojump_threshold, inf) + statistics = _core.current_statistics() + jump = statistics.seconds_to_next_deadline + if 0 < jump < inf: + self.jump(jump) + else: + # There are no deadlines, nothing is going to happen + # until some actual I/O arrives (or maybe another + # wait_all_tasks_blocked task wakes up). That's fine, + # but if our threshold is zero then this will become a + # busy-wait -- so insert a small-but-non-zero _sleep to + # avoid that. + if self._autojump_threshold == 0: + await _core.wait_all_tasks_blocked(0.01) def _real_to_virtual(self, real): real_offset = real - self._real_base @@ -172,8 +165,7 @@ def _real_to_virtual(self, real): return self._virtual_base + virtual_offset def start_clock(self): - token = _core.current_trio_token() - token.run_sync_soon(self._maybe_spawn_autojump_task) + _core.spawn_system_task(self._autojumper) def current_time(self): return self._real_to_virtual(self._real_clock())