Skip to content

Commit

Permalink
Issue #12786: Create hook for dispatching messages out of order
Browse files Browse the repository at this point in the history
  • Loading branch information
Rollo Konig Brock committed Feb 1, 2021
1 parent 02a25a3 commit a4cda09
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 27 deletions.
5 changes: 1 addition & 4 deletions ipykernel/inprocess/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,7 @@ def _dispatch_to_kernel(self, msg):
raise RuntimeError('Cannot send request. No kernel exists.')

stream = DummySocket()
self.session.send(stream, msg)
msg_parts = stream.recv_multipart()
kernel.dispatch_shell(stream, msg_parts)

kernel.dispatch_shell(msg, stream=stream)
idents, reply_msg = self.session.recv(stream, copy=False)
self.shell_channel.call_handlers_later(reply_msg)

Expand Down
65 changes: 42 additions & 23 deletions ipykernel/kernelbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@

from ._version import kernel_protocol_version

CONTROL_PRIORITY = 1
SHELL_PRIORITY = 10
CONTROL_PRIORITY = (1, 'control')
SHELL_PRIORITY = (10, 'shell')


class Kernel(SingletonConfigurable):
Expand Down Expand Up @@ -167,14 +167,10 @@ def __init__(self, **kwargs):
self.control_handlers[msg_type] = getattr(self, msg_type)

@gen.coroutine
def dispatch_control(self, msg):
def dispatch_control(self, msg, idents=None, stream=None):
"""dispatch control requests"""
idents, msg = self.session.feed_identities(msg, copy=False)
try:
msg = self.session.deserialize(msg, content=True, copy=False)
except:
self.log.error("Invalid Control Message", exc_info=True)
return
if idents is None:
idents = []

self.log.debug("Control received: %s", msg)

Expand Down Expand Up @@ -215,14 +211,10 @@ def should_handle(self, stream, msg, idents):
return True

@gen.coroutine
def dispatch_shell(self, stream, msg):
def dispatch_shell(self, msg, idents=None, stream=None):
"""dispatch shell requests"""
idents, msg = self.session.feed_identities(msg, copy=False)
try:
msg = self.session.deserialize(msg, content=True, copy=False)
except:
self.log.error("Invalid Message", exc_info=True)
return
if idents is None:
idents = []

# Set the parent message for side effects.
self.set_parent(idents, msg)
Expand Down Expand Up @@ -385,16 +377,43 @@ def dispatch_queue(self):
def _message_counter_default(self):
return itertools.count()

def schedule_dispatch(self, priority, dispatch, *args):
def should_dispatch_immediately(
self, msg, idents, stream, priority, dispatch
):
"""
This provides a hook for dispatching incoming messages
from the frontend immediately, and out of order.
It could be used to allow asynchronous messages from
GUIs to be processed.
"""
return False

def schedule_dispatch(self, msg, priority, dispatch, stream=None):
"""schedule a message for dispatch"""

idents, msg = self.session.feed_identities(msg, copy=False)
try:
msg = self.session.deserialize(msg, content=True, copy=False)
except:
self.log.error("Invalid %s Message", priority[1], exc_info=True)
return

new_args = (msg, idents, stream)

if self.should_dispatch_immediately(
msg, idents, stream, priority, dispatch, stream
):
return self.io_loop.add_callback(dispatch, *new_args)

idx = next(self._message_counter)

self.msg_queue.put_nowait(
(
priority,
idx,
dispatch,
args,
new_args,
)
)
# ensure the eventloop wakes up
Expand All @@ -411,8 +430,8 @@ def start(self):
self.control_stream.on_recv(
partial(
self.schedule_dispatch,
CONTROL_PRIORITY,
self.dispatch_control,
priority=CONTROL_PRIORITY,
dispatch=self.dispatch_control,
),
copy=False,
)
Expand All @@ -423,9 +442,9 @@ def start(self):
s.on_recv(
partial(
self.schedule_dispatch,
SHELL_PRIORITY,
self.dispatch_shell,
s,
priority=SHELL_PRIORITY,
dispatch=self.dispatch_shell,
stream=s,
),
copy=False,
)
Expand Down

0 comments on commit a4cda09

Please sign in to comment.