diff --git a/ipykernel/inprocess/client.py b/ipykernel/inprocess/client.py index 5de4f774d..ce0f2bf2b 100644 --- a/ipykernel/inprocess/client.py +++ b/ipykernel/inprocess/client.py @@ -172,10 +172,7 @@ def _dispatch_to_kernel(self, msg): raise RuntimeError('Cannot send request. No kernel exists.') stream = DummySocket() - self.session.send(stream, msg) - msg_parts = stream.recv_multipart() - kernel.dispatch_shell(stream, msg_parts) - + kernel.dispatch_shell(msg, stream=stream) idents, reply_msg = self.session.recv(stream, copy=False) self.shell_channel.call_handlers_later(reply_msg) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index aba3223a9..0dfdf1774 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -38,8 +38,8 @@ from ._version import kernel_protocol_version -CONTROL_PRIORITY = 1 -SHELL_PRIORITY = 10 +CONTROL_PRIORITY = (1, 'control') +SHELL_PRIORITY = (10, 'shell') class Kernel(SingletonConfigurable): @@ -167,14 +167,10 @@ def __init__(self, **kwargs): self.control_handlers[msg_type] = getattr(self, msg_type) @gen.coroutine - def dispatch_control(self, msg): + def dispatch_control(self, msg, idents=None, stream=None): """dispatch control requests""" - idents, msg = self.session.feed_identities(msg, copy=False) - try: - msg = self.session.deserialize(msg, content=True, copy=False) - except: - self.log.error("Invalid Control Message", exc_info=True) - return + if idents is None: + idents = [] self.log.debug("Control received: %s", msg) @@ -215,14 +211,10 @@ def should_handle(self, stream, msg, idents): return True @gen.coroutine - def dispatch_shell(self, stream, msg): + def dispatch_shell(self, msg, idents=None, stream=None): """dispatch shell requests""" - idents, msg = self.session.feed_identities(msg, copy=False) - try: - msg = self.session.deserialize(msg, content=True, copy=False) - except: - self.log.error("Invalid Message", exc_info=True) - return + if idents is None: + idents = [] # Set the parent message for side effects. self.set_parent(idents, msg) @@ -385,8 +377,35 @@ def dispatch_queue(self): def _message_counter_default(self): return itertools.count() - def schedule_dispatch(self, priority, dispatch, *args): + def should_dispatch_immediately( + self, msg, idents, stream, priority, dispatch + ): + """ + This provides a hook for dispatching incoming messages + from the frontend immediately, and out of order. + + It could be used to allow asynchronous messages from + GUIs to be processed. + """ + return False + + def schedule_dispatch(self, msg, priority, dispatch, stream=None): """schedule a message for dispatch""" + + idents, msg = self.session.feed_identities(msg, copy=False) + try: + msg = self.session.deserialize(msg, content=True, copy=False) + except: + self.log.error("Invalid %s Message", priority[1], exc_info=True) + return + + new_args = (msg, idents, stream) + + if self.should_dispatch_immediately( + msg, idents, stream, priority, dispatch, stream + ): + return self.io_loop.add_callback(dispatch, *new_args) + idx = next(self._message_counter) self.msg_queue.put_nowait( @@ -394,7 +413,7 @@ def schedule_dispatch(self, priority, dispatch, *args): priority, idx, dispatch, - args, + new_args, ) ) # ensure the eventloop wakes up @@ -411,8 +430,8 @@ def start(self): self.control_stream.on_recv( partial( self.schedule_dispatch, - CONTROL_PRIORITY, - self.dispatch_control, + priority=CONTROL_PRIORITY, + dispatch=self.dispatch_control, ), copy=False, ) @@ -423,9 +442,9 @@ def start(self): s.on_recv( partial( self.schedule_dispatch, - SHELL_PRIORITY, - self.dispatch_shell, - s, + priority=SHELL_PRIORITY, + dispatch=self.dispatch_shell, + stream=s, ), copy=False, )