WIP: Feature: fork kernel

Open maartenbreddels opened this issue 6 years ago • 8 comments

Issue

For dashboarding, it is useful to have a kernel with its final state ready instantly. If, for instance, a notebook takes ~5 minutes to execute, making every user wait for that run is not a good user experience.

Solution

Execute a notebook, and after it is done, consider it a template, and give each user a fork() of this template process.

Implementation

The kernel gets a new message type, simply called fork. Calling fork while an asyncio/tornado ioloop is running is a no-go; it is basically not supported. To avoid this, we stop the current ioloop and use the ioloop object to communicate the fork request up the call stack. Once the ioloop has stopped, the kernelapp checks whether a fork was requested and, if so, forks:

  • The parent resumes the ioloop
  • The child should clean up the ioloop and start listening on new ports.
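
The stop-then-fork pattern described above can be sketched with the standard library alone. This is a minimal illustration, not the PR's code: the names serve_until_fork_requested and fork_outside_loop are hypothetical, a plain dict stands in for the flag carried by the ioloop object, and the zmq socket rebinding the child would need is left out. It assumes a POSIX system, since os.fork is not available on Windows.

```python
import asyncio
import os


def serve_until_fork_requested(loop, state):
    """Run the loop until a (simulated) fork message stops it."""

    def on_fork_message():
        # The handler never calls os.fork() while the loop is running;
        # it only records the request and stops the loop.
        state["fork_requested"] = True
        loop.stop()

    loop.call_soon(on_fork_message)  # stand-in for a real incoming message
    loop.run_forever()


def fork_outside_loop():
    """Fork only after the loop has stopped; return the child's exit code."""
    state = {"fork_requested": False}
    loop = asyncio.new_event_loop()
    serve_until_fork_requested(loop, state)

    if not state["fork_requested"]:
        loop.close()
        return None

    pid = os.fork()  # no event loop is running at this point
    if pid == 0:
        # Child: abandon the inherited loop and build a fresh one.
        exit_code = 1
        try:
            child_loop = asyncio.new_event_loop()
            child_loop.run_until_complete(asyncio.sleep(0))
            child_loop.close()
            exit_code = 0
        finally:
            os._exit(exit_code)  # skip the parent's cleanup handlers

    # Parent: resume the original loop, then collect the child.
    loop.run_until_complete(asyncio.sleep(0))
    loop.close()
    _, status = os.waitpid(pid, 0)
    return os.WEXITSTATUS(status)
```

Calling fork_outside_loop() returns 0 when the child managed to run its own fresh loop. In a real kernel the child would additionally bind new sockets and report the new ports back to the client.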

Usage (although it does not work yet)

It also needs this PR: https://github.com/jupyter/jupyter_client/pull/441

Shell1:

$ python -m ipykernel_launcher -f conn.json --debug

Shell2:

$ python fork_kernel.py ./conn.json # this will print the result of the fork
DEBUG:traitlets:Loading connection file ./conn.json
DEBUG:traitlets:connecting shell channel to tcp://127.0.0.1:56573
DEBUG:traitlets:Connecting to: tcp://127.0.0.1:56573
DEBUG:traitlets:connecting iopub channel to tcp://127.0.0.1:56574
DEBUG:traitlets:Connecting to: tcp://127.0.0.1:56574
DEBUG:traitlets:connecting stdin channel to tcp://127.0.0.1:56575
DEBUG:traitlets:Connecting to: tcp://127.0.0.1:56575
DEBUG:traitlets:connecting heartbeat channel to tcp://127.0.0.1:56577
{'header': {'msg_id': '81e4a9b2-155b535e6541ce736c126d47', 'msg_type': 'execute_reply', 'username': 'maartenbreddels', 'session': 'b5b03194-ba9276d2bbc5a711d5e5d44c', 'date': datetime.datetime(2019, 5, 28, 21, 39, 2, 604627, tzinfo=tzutc()), 'version': '5.3'}, 'msg_id': '81e4a9b2-155b535e6541ce736c126d47', 'msg_type': 'execute_reply', 'parent_header': {'msg_id': '73914843-0da368d574d8c114b8dfd531', 'msg_type': 'fork', 'username': 'maartenbreddels', 'session': '9c61f20c-d8a4e43f945eaf915031a3d4', 'date': datetime.datetime(2019, 5, 28, 21, 39, 2, 596944, tzinfo=tzutc()), 'version': '5.3'}, 'metadata': {'status': 'ok'}, 'content': {'status': 'ok', 'fork_id': 1042}, 'buffers': []}
$ python fork_kernel.py  /Users/maartenbreddels/Library/Jupyter/runtime/conn_fork.json
DEBUG:traitlets:Loading connection file /Users/maartenbreddels/Library/Jupyter/runtime/conn_fork.json
DEBUG:traitlets:connecting shell channel to tcp://127.0.0.1:54452
DEBUG:traitlets:Connecting to: tcp://127.0.0.1:54452
DEBUG:traitlets:connecting iopub channel to tcp://127.0.0.1:54455
DEBUG:traitlets:Connecting to: tcp://127.0.0.1:54455
DEBUG:traitlets:connecting stdin channel to tcp://127.0.0.1:54453
DEBUG:traitlets:Connecting to: tcp://127.0.0.1:54453
DEBUG:traitlets:connecting heartbeat channel to tcp://127.0.0.1:54457
^CTraceback (most recent call last):
  File "fork_kernel.py", line 12, in <module>
    client.wait_for_ready(100)
  File "/Users/maartenbreddels/src/jupyter/jupyter_client/jupyter_client/blocking/client.py", line 111, in wait_for_ready
    msg = self.shell_channel.get_msg(block=True, timeout=1)
  File "/Users/maartenbreddels/src/jupyter/jupyter_client/jupyter_client/blocking/channels.py", line 50, in get_msg
    ready = self.socket.poll(timeout)
  File "/Users/maartenbreddels/miniconda3/envs/kernel_fork/lib/python3.7/site-packages/zmq/sugar/socket.py", line 697, in poll
    evts = dict(p.poll(timeout))
  File "/Users/maartenbreddels/miniconda3/envs/kernel_fork/lib/python3.7/site-packages/zmq/sugar/poll.py", line 99, in poll
    return zmq_poll(self.sockets, timeout=timeout)
  File "zmq/backend/cython/_poll.pyx", line 123, in zmq.backend.cython._poll.zmq_poll
  File "zmq/backend/cython/checkrc.pxd", line 12, in zmq.backend.cython.checkrc._check_rc
KeyboardInterrupt

The last command never does anything, so I interrupted it with Ctrl-C, which is why the stack trace is included.

Problem

It doesn't work 😄. The forked child process doesn't seem to be reachable by zmq, and no debug info is printed, so I'm giving up here, and hope that someone else has good ideas or wants to pick this up.

Side uses

Forking the kernel could also be used for 'undoing' the execution of a cell/line, by forking after each execute(..). Note that since fork uses copy-on-write, this should be fairly cheap in resource usage.
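
The isolation that makes this kind of undo possible can be shown with a small stand-alone sketch. It is not how the PR does it: run_in_fork is a hypothetical helper that runs a snippet in a forked child and reports the child's value of x through a pipe, so the parent's namespace plays the role of the untouched snapshot. It assumes a POSIX system.

```python
import os


def run_in_fork(namespace, code):
    """Run `code` in a forked child and return the child's repr of `x`."""
    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child: works on a copy-on-write view of the parent's memory.
        os.close(read_fd)
        try:
            exec(code, namespace)
            os.write(write_fd, repr(namespace.get("x")).encode())
        finally:
            os._exit(0)

    # Parent: read what the child saw; its own namespace is unchanged.
    os.close(write_fd)
    with os.fdopen(read_fd, "rb") as reader:
        child_view = reader.read().decode()
    os.waitpid(pid, 0)
    return child_view


namespace = {"x": 1}
child_view = run_in_fork(namespace, "x = x + 41")
print(child_view, namespace["x"])
```

The child sees x as 42 while the parent still holds x as 1, which is the property an undo feature would rely on: the process that did not execute the cell is the one to go back to.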

maartenbreddels avatar May 28 '19 21:05 maartenbreddels

Hey @maartenbreddels ! I'm interested in helping you with this.

I don't quite get what the strategy is here... Run time can indeed be decreased by using more than one process, but you did mention this:

Execute a notebook, and after it is done, consider it a template, and give each user a fork() of this template process.

If you are waiting for it to be done, then we are waiting for the whole run time, right? Could you please be more specific about this idea?

It does sound interesting, though! :)

LucianaMarques avatar Jun 02 '19 22:06 LucianaMarques

Hello @maartenbreddels, I'm very interested in this feature.

Is there a way I can help, so that we can implement this together?

edisongustavo avatar Aug 22 '19 11:08 edisongustavo

I currently don't have much time to work on this, but I'm happy for you to take it over a bit, or test it out.

maartenbreddels avatar Aug 23 '19 14:08 maartenbreddels

Hey @maartenbreddels, I believe I found out why the child kernel does not receive the messages.

That's due to the check_pid flag of Session in the send() method:

class Session:
    def send(...):
        ...
        if self.check_pid and not os.getpid() == self.pid:
            get_logger().warning("WARNING: attempted to send message from fork\n%s",
                msg
            )
            return        

See here
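
To see why this guard silences a forked child, here is a toy reproduction. ToySession and send_from_child are hypothetical names that only imitate the check quoted above; they are not the jupyter_client implementation. The session remembers the pid it was created in, so after a fork the child's os.getpid() no longer matches and send() returns without sending. It assumes a POSIX system.

```python
import os


class ToySession:
    """Hypothetical stand-in that imitates the pid guard quoted above."""

    def __init__(self, check_pid=True):
        self.check_pid = check_pid
        self.pid = os.getpid()  # pid of the process that created the session

    def send(self, msg):
        if self.check_pid and os.getpid() != self.pid:
            return None  # dropped: we are running in a fork
        return msg


def send_from_child(session):
    """Fork, call send() in the child, and report what happened."""
    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:
        os.close(read_fd)
        try:
            sent = session.send("hello")
            os.write(write_fd, b"dropped" if sent is None else b"sent")
        finally:
            os._exit(0)

    os.close(write_fd)
    with os.fdopen(read_fd, "rb") as reader:
        outcome = reader.read().decode()
    os.waitpid(pid, 0)
    return outcome
```

With check_pid=True the child reports "dropped", and with check_pid=False it reports "sent". An alternative to disabling the check would be to refresh the stored pid in the child after forking, but that is only a suggestion and not something tested in this thread.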

If I set check_pid = False, then the following works (this is a modification of your fork_kernel.py script):

import logging
import sys

from jupyter_client import BlockingKernelClient

logging.basicConfig()

connection_file = sys.argv[1]

# Connect to the original (parent) kernel.
client = BlockingKernelClient()
client.log.setLevel('DEBUG')
client.load_connection_file(connection_file)
client.start_channels()
client.wait_for_ready(100)

# fork() only exists with the jupyter_client PR linked above.
# The reply carries the connection info of the forked kernel.
client.fork()
msg = client.shell_channel.get_msg(timeout=100)

print("=========================================")
print(" Client after fork")
print("=========================================")

# Connect a second client to the forked (child) kernel.
client_after_fork = BlockingKernelClient()
client_after_fork.log.setLevel('DEBUG')
client_after_fork.load_connection_info(msg["content"]["conn"])
client_after_fork.start_channels()
client_after_fork.wait_for_ready(100)

# Execute code in the child and read the result back through a user expression.
client_after_fork.execute(code="foo = 5+5", user_expressions={"my-user-expression": "foo"})
msg = client_after_fork.get_shell_msg(timeout=100)
print("Value of last execution is {}".format(msg["content"]["user_expressions"]["my-user-expression"]["data"]["text/plain"]))

client_after_fork.execute(code="bar = 1+2", user_expressions={"my-user-expression": "foo,bar"})
msg = client_after_fork.get_shell_msg(timeout=100)
print("Value of last execution is {}".format(msg["content"]["user_expressions"]["my-user-expression"]["data"]["text/plain"]))

Will print to the console:

DEBUG:traitlets:Loading connection file conn.json
DEBUG:traitlets:connecting shell channel to tcp://127.0.0.1:56573
DEBUG:traitlets:Connecting to: tcp://127.0.0.1:56573
DEBUG:traitlets:connecting iopub channel to tcp://127.0.0.1:56574
DEBUG:traitlets:Connecting to: tcp://127.0.0.1:56574
DEBUG:traitlets:connecting stdin channel to tcp://127.0.0.1:56575
DEBUG:traitlets:Connecting to: tcp://127.0.0.1:56575
DEBUG:traitlets:connecting heartbeat channel to tcp://127.0.0.1:56577
DEBUG:traitlets:connecting shell channel to tcp://127.0.0.1:48419
DEBUG:traitlets:Connecting to: tcp://127.0.0.1:48419
DEBUG:traitlets:connecting iopub channel to tcp://127.0.0.1:52073
DEBUG:traitlets:Connecting to: tcp://127.0.0.1:52073
DEBUG:traitlets:connecting stdin channel to tcp://127.0.0.1:50093
DEBUG:traitlets:Connecting to: tcp://127.0.0.1:50093
DEBUG:traitlets:connecting heartbeat channel to tcp://127.0.0.1:60527
=========================================
 Client after fork
=========================================
Value of last execution is 10
Value of last execution is (10, 3)

edisongustavo avatar Aug 25 '19 18:08 edisongustavo

I've also tried connecting to a "forked" kernel and it does work!

edisongustavo avatar Aug 25 '19 18:08 edisongustavo

Awesome work, I'm really glad I put out this half working PR now. Hooray to open source :)

maartenbreddels avatar Aug 25 '19 18:08 maartenbreddels

Testing shutdown()

So I was testing if the shutdown() message would work and it doesn't out of the box.

I found out that the shell variable must also be updated, so I modified the block where you reset the kernel to this:

# NOTE: we actually start a new kernel, but once this works
# we can actually think about reusing the kernel object
self.kernel_class.clear_instance()
self.kernel.shell_class.clear_instance()

Then shutdown() does work correctly.
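
Why clearing both instances matters can be illustrated with a toy singleton. ToySingleton below is a hypothetical, stdlib-only imitation of the instance() / clear_instance() pattern used in the snippet above, not the traitlets implementation. Until the cached instance is cleared, every call to instance() keeps handing back the object that was created before the fork.

```python
class ToySingleton:
    """Hypothetical imitation of the instance()/clear_instance() pattern."""

    _instance = None

    def __init__(self, label):
        self.label = label

    @classmethod
    def instance(cls, label="default"):
        # Create the object on first use, then keep returning the same one.
        if cls._instance is None:
            cls._instance = cls(label)
        return cls._instance

    @classmethod
    def clear_instance(cls):
        # Forget the cached object so the next instance() call builds a new one.
        cls._instance = None


before_fork = ToySingleton.instance("parent")
stale = ToySingleton.instance("child")      # still the parent's object

ToySingleton.clear_instance()
fresh = ToySingleton.instance("child")      # a new object for the child
print(stale.label, fresh.label)
```

Here stale is the same object as before_fork and still carries the label "parent", while fresh is a new object labelled "child". That matches the observation above: clearing only the kernel instance leaves the old shell instance in place.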

edisongustavo avatar Aug 25 '19 19:08 edisongustavo

I'm now working on reusing the Kernel object (my fork of the repo is at https://github.com/edisongustavo/ipykernel/). I believe this will take some time, since I can only work on it in my free time.

But I'm getting there!

edisongustavo avatar Sep 02 '19 13:09 edisongustavo