From e19c40a5a2476480fc34702033e14d7d4edbcc15 Mon Sep 17 00:00:00 2001
From: Min RK
Date: Fri, 25 Oct 2024 13:09:07 +0200
Subject: [PATCH] improve error handling in engine

- refactor redirect output, redirect it later and stop redirecting during
  shutdown so errors in ipengine/kernel are more likely to be seen in the
  terminal
- fix SIGTERM handler so it actually fires in the event loop
---
Review amendments (this notes section is ignored by `git am`):
- redirect_output: dropped the leftover debug `print(handler)` from the
  log-handler loop.
- redirect_output: restored the `== 2` comparison on
  `handler.stream.buffer.fileno()`, matching the code this patch removes
  from complete_registration; without it every StreamHandler on a non-zero
  file descriptor would be rerouted to the raw stderr copy.
- _start: dropped the unused `as e` binding.
- Hunk headers and the diffstat were adjusted for the removed line; the
  post-image blob id on the `index` line predates these amendments.

 ipyparallel/engine/app.py | 90 +++++++++++++++++++++++++--------------
 1 file changed, 58 insertions(+), 32 deletions(-)

diff --git a/ipyparallel/engine/app.py b/ipyparallel/engine/app.py
index 9d064d16..c0fa19b3 100755
--- a/ipyparallel/engine/app.py
+++ b/ipyparallel/engine/app.py
@@ -576,6 +576,36 @@ def _report_ping(self, msg):
         # self.log.debug("Received a ping: %s", msg)
         self._hb_last_pinged = time.time()
 
+    def redirect_output(self, iopub_socket):
+        """Redirect std streams and set a display hook."""
+        if self.out_stream_factory:
+            sys.stdout = self.out_stream_factory(self.session, iopub_socket, 'stdout')
+            sys.stdout.topic = f"engine.{self.id}.stdout".encode("ascii")
+            sys.stderr = self.out_stream_factory(self.session, iopub_socket, 'stderr')
+            sys.stderr.topic = f"engine.{self.id}.stderr".encode("ascii")
+
+            # copied from ipykernel 6, which captures sys.__stderr__ at the FD-level
+            if getattr(sys.stderr, "_original_stdstream_copy", None) is not None:
+                for handler in self.log.handlers:
+                    if isinstance(handler, StreamHandler) and (
+                        handler.stream.buffer.fileno() == 2
+                    ):
+                        self.log.debug(
+                            "Seeing logger to stderr, rerouting to raw filedescriptor."
+                        )
+
+                        handler.stream = TextIOWrapper(
+                            FileIO(sys.stderr._original_stdstream_copy, "w")
+                        )
+        if self.display_hook_factory:
+            sys.displayhook = self.display_hook_factory(self.session, iopub_socket)
+            sys.displayhook.topic = f"engine.{self.id}.execute_result".encode("ascii")
+
+    def restore_output(self):
+        """Restore output after redirect_output"""
+        sys.stdout = sys.__stdout__
+        sys.stderr = sys.__stderr__
+
     async def complete_registration(self, msg, connect, maybe_tunnel):
         ctx = self.context
         loop = self.loop
@@ -657,36 +687,6 @@ def urls(key):
             # disable history:
             self.config.HistoryManager.hist_file = ':memory:'
 
-            # Redirect input streams and set a display hook.
-            if self.out_stream_factory:
-                sys.stdout = self.out_stream_factory(
-                    self.session, iopub_socket, 'stdout'
-                )
-                sys.stdout.topic = f"engine.{self.id}.stdout".encode("ascii")
-                sys.stderr = self.out_stream_factory(
-                    self.session, iopub_socket, 'stderr'
-                )
-                sys.stderr.topic = f"engine.{self.id}.stderr".encode("ascii")
-
-                # copied from ipykernel 6, which captures sys.__stderr__ at the FD-level
-                if getattr(sys.stderr, "_original_stdstream_copy", None) is not None:
-                    for handler in self.log.handlers:
-                        if isinstance(handler, StreamHandler) and (
-                            handler.stream.buffer.fileno() == 2
-                        ):
-                            self.log.debug(
-                                "Seeing logger to stderr, rerouting to raw filedescriptor."
-                            )
-
-                            handler.stream = TextIOWrapper(
-                                FileIO(sys.stderr._original_stdstream_copy, "w")
-                            )
-            if self.display_hook_factory:
-                sys.displayhook = self.display_hook_factory(self.session, iopub_socket)
-                sys.displayhook.topic = f"engine.{self.id}.execute_result".encode(
-                    "ascii"
-                )
-
             # patch Session to always send engine uuid metadata
             original_send = self.session.send
 
@@ -757,6 +757,9 @@ def send_with_metadata(
             app.init_profile_dir()
             app.init_code()
 
+            # redirect output at the end, only after start is called
+            self.redirect_output(iopub_socket)
+
             # ipykernel 7, kernel.start is the long-running main loop
             start_future = self.kernel.start()
             if start_future is not None:
@@ -959,8 +962,29 @@ def _signal_sigint(self, sig, frame):
 
     def _signal_stop(self, sig, frame):
         self.log.critical(f"received signal {sig}, stopping")
-        self.loop.add_callback_from_signal(self.kernel.stop)
-        self.loop.add_callback_from_signal(self.loop.stop)
+        # we are shutting down, stop forwarding output
+        try:
+            self.restore_output()
+            # kernel.stop added in ipykernel 7
+            # claims to be threadsafe, but is not
+            kernel_stop = getattr(self.kernel, "stop", None)
+            if kernel_stop is not None:
+                # callback must be async for event loop to be
+                # detected by anyio
+                async def stop():
+                    # guard against kernel stop being made async
+                    # in the future. It is sync in 7.0
+                    f = kernel_stop()
+                    if f is not None:
+                        await f
+
+                self.loop.add_callback_from_signal(stop)
+            if self._kernel_start_future is None:
+                # not awaiting start_future, stop loop directly
+                self.loop.add_callback_from_signal(self.loop.stop)
+        except Exception:
+            self.log.critical("Failed to stop kernel", exc_info=True)
+            self.loop.add_callback_from_signal(self.loop.stop)
 
     def start(self):
         if self.id is not None:
@@ -982,6 +1006,8 @@ async def _start(self):
                 await self._kernel_start_future
             except asyncio.CancelledError:
                 pass
+            except Exception:
+                self.log.critical("Error awaiting start future", exc_info=True)
 
 
 main = launch_new_instance = IPEngine.launch_instance