Skip to content

Commit

Permalink
improve error handling in engine
Browse files Browse the repository at this point in the history
- refactor redirect output, redirect it later and stop redirecting during shutdown
  so errors in ipengine/kernel are more likely to be seen in the terminal
- fix SIGTERM handler so it actually fires in the event loop
  • Loading branch information
minrk committed Oct 25, 2024
1 parent 9118015 commit e19c40a
Showing 1 changed file with 59 additions and 32 deletions.
91 changes: 59 additions & 32 deletions ipyparallel/engine/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,37 @@ def _report_ping(self, msg):
# self.log.debug("Received a ping: %s", msg)
self._hb_last_pinged = time.time()

def redirect_output(self, iopub_socket):
    """Redirect std streams and set a display hook.

    When ``self.out_stream_factory`` is set, ``sys.stdout`` and
    ``sys.stderr`` are replaced with streams built by that factory, each
    tagged with an engine-specific topic. When ``self.display_hook_factory``
    is set, ``sys.displayhook`` is replaced the same way.

    Parameters
    ----------
    iopub_socket
        Socket handed to the stream and display-hook factories.
    """
    if self.out_stream_factory:
        sys.stdout = self.out_stream_factory(self.session, iopub_socket, 'stdout')
        sys.stdout.topic = f"engine.{self.id}.stdout".encode("ascii")
        sys.stderr = self.out_stream_factory(self.session, iopub_socket, 'stderr')
        sys.stderr.topic = f"engine.{self.id}.stderr".encode("ascii")

        # copied from ipykernel 6, which captures sys.__stderr__ at the FD-level
        if getattr(sys.stderr, "_original_stdstream_copy", None) is not None:
            for handler in self.log.handlers:
                # Only reroute handlers that write to the real stderr (fd 2).
                # A bare truthiness check on fileno() would also match any
                # handler writing to another non-zero file descriptor.
                if isinstance(handler, StreamHandler) and (
                    handler.stream.buffer.fileno() == 2
                ):
                    self.log.debug(
                        "Seeing logger to stderr, rerouting to raw filedescriptor."
                    )

                    # write log records to the saved copy of the original
                    # stderr file descriptor instead of the redirected stream
                    handler.stream = TextIOWrapper(
                        FileIO(sys.stderr._original_stdstream_copy, "w")
                    )
    if self.display_hook_factory:
        sys.displayhook = self.display_hook_factory(self.session, iopub_socket)
        sys.displayhook.topic = f"engine.{self.id}.execute_result".encode("ascii")

def restore_output(self):
    """Put the interpreter's original stdout and stderr back in place.

    Reverses the stream replacement done by ``redirect_output`` by
    reassigning ``sys.stdout`` and ``sys.stderr`` from their
    ``sys.__stdout__`` / ``sys.__stderr__`` counterparts.
    """
    for stream_name in ("stdout", "stderr"):
        setattr(sys, stream_name, getattr(sys, f"__{stream_name}__"))

async def complete_registration(self, msg, connect, maybe_tunnel):
ctx = self.context
loop = self.loop
Expand Down Expand Up @@ -657,36 +688,6 @@ def urls(key):
# disable history:
self.config.HistoryManager.hist_file = ':memory:'

# Redirect output streams and set a display hook.
if self.out_stream_factory:
sys.stdout = self.out_stream_factory(
self.session, iopub_socket, 'stdout'
)
sys.stdout.topic = f"engine.{self.id}.stdout".encode("ascii")
sys.stderr = self.out_stream_factory(
self.session, iopub_socket, 'stderr'
)
sys.stderr.topic = f"engine.{self.id}.stderr".encode("ascii")

# copied from ipykernel 6, which captures sys.__stderr__ at the FD-level
if getattr(sys.stderr, "_original_stdstream_copy", None) is not None:
for handler in self.log.handlers:
if isinstance(handler, StreamHandler) and (
handler.stream.buffer.fileno() == 2
):
self.log.debug(
"Seeing logger to stderr, rerouting to raw filedescriptor."
)

handler.stream = TextIOWrapper(
FileIO(sys.stderr._original_stdstream_copy, "w")
)
if self.display_hook_factory:
sys.displayhook = self.display_hook_factory(self.session, iopub_socket)
sys.displayhook.topic = f"engine.{self.id}.execute_result".encode(
"ascii"
)

# patch Session to always send engine uuid metadata
original_send = self.session.send

Expand Down Expand Up @@ -757,6 +758,9 @@ def send_with_metadata(
app.init_profile_dir()
app.init_code()

# redirect output at the end, only after start is called
self.redirect_output(iopub_socket)

# ipykernel 7, kernel.start is the long-running main loop
start_future = self.kernel.start()
if start_future is not None:
Expand Down Expand Up @@ -959,8 +963,29 @@ def _signal_sigint(self, sig, frame):

def _signal_stop(self, sig, frame):
self.log.critical(f"received signal {sig}, stopping")
self.loop.add_callback_from_signal(self.kernel.stop)
self.loop.add_callback_from_signal(self.loop.stop)
# we are shutting down, stop forwarding output
try:
self.restore_output()
# kernel.stop added in ipykernel 7
# claims to be threadsafe, but is not
kernel_stop = getattr(self.kernel, "stop", None)
if kernel_stop is not None:
# callback must be async for event loop to be
# detected by anyio
async def stop():
# guard against kernel stop being made async
# in the future. It is sync in 7.0
f = kernel_stop()
if f is not None:
await f

self.loop.add_callback_from_signal(stop)
if self._kernel_start_future is None:
# not awaiting start_future, stop loop directly
self.loop.add_callback_from_signal(self.loop.stop)
except Exception:
self.log.critical("Failed to stop kernel", exc_info=True)
self.loop.add_callback_from_signal(self.loop.stop)

def start(self):
if self.id is not None:
Expand All @@ -982,6 +1007,8 @@ async def _start(self):
await self._kernel_start_future
except asyncio.CancelledError:
pass
except Exception as e:
self.log.critical("Error awaiting start future", exc_info=True)


# Module-level entry points: both names are aliases for IPEngine.launch_instance.
main = launch_new_instance = IPEngine.launch_instance
Expand Down

0 comments on commit e19c40a

Please sign in to comment.