MyST-NB
Fix incorrect output from prints originating from different processes
In the PipeFunc documentation I have the following problem when executing code with a `ProcessPoolExecutor`: the prints coming from the different worker processes are rendered as a jumble of fragmented stdout blocks.
With this fix, the same cell renders as a single, clean output block.
The root of the issue is that `nbconvert --execute` produces this output:
```json
{
  "cell_type": "code",
  "execution_count": 47,
  "id": "92",
  "metadata": {
    "execution": {
      "iopub.execute_input": "2024-05-31T05:42:39.297713Z",
      "iopub.status.busy": "2024-05-31T05:42:39.297474Z",
      "iopub.status.idle": "2024-05-31T05:42:40.477462Z",
      "shell.execute_reply": "2024-05-31T05:42:40.475729Z"
    }
  },
  "outputs": [
    {
      "name": "stdout",
      "output_type": "stream",
      "text": [
        "2024-05-30 22:42:39.410279 - Running double_it for x=3"
      ]
    },
    {
      "name": "stdout",
      "output_type": "stream",
      "text": [
        "2024-05-30 22:42:39.408318 - Running double_it for x=0"
      ]
    },
    {
      "name": "stdout",
      "output_type": "stream",
      "text": [
        "2024-05-30 22:42:39.410888 - Running double_it for x=1"
      ]
    },
    {
      "name": "stdout",
      "output_type": "stream",
      "text": [
        "2024-05-30 22:42:39.416024 - Running double_it for x=2"
      ]
    },
    {
      "name": "stdout",
      "output_type": "stream",
      "text": [
        "\n"
      ]
    },
    {
      "name": "stdout",
      "output_type": "stream",
      "text": [
        "\n"
      ]
    },
    {
      "name": "stdout",
      "output_type": "stream",
      "text": [
        "\n"
      ]
    },
    {
      "name": "stdout",
      "output_type": "stream",
      "text": [
        "2024-05-30 22:42:39.431485 - Running half_it for x=0"
      ]
    },
    {
      "name": "stdout",
      "output_type": "stream",
      "text": [
        "\n"
      ]
    },
    {
      "name": "stdout",
      "output_type": "stream",
      "text": [
        "2024-05-30 22:42:39.434285 - Running half_it for x=1"
      ]
    },
    {
      "name": "stdout",
      "output_type": "stream",
      "text": [
        "2024-05-30 22:42:39.433559 - Running half_it for x=2"
      ]
    },
    {
      "name": "stdout",
      "output_type": "stream",
      "text": [
        "2024-05-30 22:42:39.439223 - Running half_it for x=3"
      ]
    },
    {
      "name": "stdout",
      "output_type": "stream",
      "text": [
        "\n"
      ]
    },
    {
      "name": "stdout",
      "output_type": "stream",
      "text": [
        "\n"
      ]
    },
    {
      "name": "stdout",
      "output_type": "stream",
      "text": [
        "\n"
      ]
    },
    {
      "name": "stdout",
      "output_type": "stream",
      "text": [
        "\n"
      ]
    },
    {
      "name": "stdout",
      "output_type": "stream",
      "text": [
        "2024-05-30 22:42:40.459668 - Running take_sum"
      ]
    },
    {
      "name": "stdout",
      "output_type": "stream",
      "text": [
        "\n"
      ]
    },
    {
      "name": "stdout",
      "output_type": "stream",
      "text": [
        "14\n"
      ]
    }
  ],
  "source": [
    "from concurrent.futures import ProcessPoolExecutor\n",
    "import datetime\n",
    "import numpy as np\n",
    "import time\n",
    "from pipefunc import Pipeline, pipefunc\n",
    "\n",
    "\n",
    "@pipefunc(output_name=\"double\", mapspec=\"x[i] -> double[i]\")\n",
    "def double_it(x: int) -> int:\n",
    "    print(f\"{datetime.datetime.now()} - Running double_it for x={x}\")\n",
    "    time.sleep(1)\n",
    "    return 2 * x\n",
    "\n",
    "\n",
    "@pipefunc(output_name=\"half\", mapspec=\"x[i] -> half[i]\")\n",
    "def half_it(x: int) -> int:\n",
    "    print(f\"{datetime.datetime.now()} - Running half_it for x={x}\")\n",
    "    time.sleep(1)\n",
    "    return x // 2\n",
    "\n",
    "\n",
    "@pipefunc(output_name=\"sum\")\n",
    "def take_sum(half: np.ndarray, double: np.ndarray) -> int:\n",
    "    print(f\"{datetime.datetime.now()} - Running take_sum\")\n",
    "    return sum(half + double)\n",
    "\n",
    "\n",
    "pipeline = Pipeline([double_it, half_it, take_sum])\n",
    "inputs = {\"x\": [0, 1, 2, 3]}\n",
    "run_folder = \"my_run_folder\"\n",
    "executor = ProcessPoolExecutor(max_workers=8) # use 8 processes\n",
    "results = pipeline.map(\n",
    "    inputs,\n",
    "    run_folder=run_folder,\n",
    "    parallel=True,\n",
    "    executor=executor,\n",
    "    storage=\"shared_memory_dict\",\n",
    ")\n",
    "print(results[\"sum\"].output)"
  ]
},
```
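For readability, here is the same cell's source from the JSON above as plain Python. Note in the outputs how each print's message and its trailing newline arrive as separate stream chunks that interleave across the worker processes:

```python
from concurrent.futures import ProcessPoolExecutor
import datetime
import numpy as np
import time
from pipefunc import Pipeline, pipefunc


@pipefunc(output_name="double", mapspec="x[i] -> double[i]")
def double_it(x: int) -> int:
    print(f"{datetime.datetime.now()} - Running double_it for x={x}")
    time.sleep(1)
    return 2 * x


@pipefunc(output_name="half", mapspec="x[i] -> half[i]")
def half_it(x: int) -> int:
    print(f"{datetime.datetime.now()} - Running half_it for x={x}")
    time.sleep(1)
    return x // 2


@pipefunc(output_name="sum")
def take_sum(half: np.ndarray, double: np.ndarray) -> int:
    print(f"{datetime.datetime.now()} - Running take_sum")
    return sum(half + double)


pipeline = Pipeline([double_it, half_it, take_sum])
inputs = {"x": [0, 1, 2, 3]}
run_folder = "my_run_folder"
executor = ProcessPoolExecutor(max_workers=8) # use 8 processes
results = pipeline.map(
    inputs,
    run_folder=run_folder,
    parallel=True,
    executor=executor,
    storage="shared_memory_dict",
)
print(results["sum"].output)
```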
Thanks for submitting your first pull request! You are awesome! :hugs:
If you haven't done so already, check out EBP's Code of Conduct and our Contributing Guide, as this will greatly help the review process.
Welcome to the EBP community! :tada:
@agoose77 the failing CI check is unrelated to these changes
@agoose77, friendly ping. Could you take a look at this?
I have a very similar problem in one of my repos, but this fix doesn't seem to work; I still get the fragmented output in the rendered HTML.
@bsipocz, did you set `nb_merge_streams = True`?
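For reference, enabling it is a one-line addition to your Sphinx `conf.py` (this assumes MyST-NB is already in your `extensions`):

```python
# conf.py
extensions = ["myst_nb"]

# Merge consecutive stdout/stderr stream outputs of a cell into a single block
nb_merge_streams = True
```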
Yep, I didn't have that, but I discovered the option by following the link to your https://github.com/pipefunc/pipefunc/pull/125 PR. So thank you.
So now my issue is fixed even without this PR.
While this PR passes all the tests and doesn't seem to break anything, it would be nice to add your failing case to the tests, too.
I added a test. Without the changes in this PR, the test fails:
```
________________________________________________ test_merge_streams_parallel ________________________________________________

sphinx_run = <conftest.SphinxFixture object at 0x7e810491f9b0>
file_regression = <conftest.FileRegression object at 0x7e80b65fe3f0>

    @pytest.mark.sphinx_params(
        "merge_streams_parallel.ipynb",
        conf={"nb_execution_mode": "off", "nb_merge_streams": True},
    )
    def test_merge_streams_parallel(sphinx_run, file_regression):
        """Test configuring multiple concurrent stdout/stderr outputs to be merged."""
        sphinx_run.build()
        assert sphinx_run.warnings() == ""
        doctree = sphinx_run.get_resolved_doctree("merge_streams_parallel")
>       file_regression.check(doctree.pformat(), extension=".xml", encoding="utf-8")

/home/bas.nijholt/repos/MyST-NB/tests/test_render_outputs.py:116:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <conftest.FileRegression object at 0x7e80b65fe3f0>
data = '<document source="merge_streams_parallel" translation_progress="{\'total\': 0, \'translated\': 0}">\n <container c... \n \n \n \n \n \n \n'
kwargs = {'encoding': 'utf-8', 'extension': '.xml'}

    def check(self, data, **kwargs):
>       return self.file_regression.check(self._strip_ignores(data), **kwargs)
E       AssertionError: FILES DIFFER:
E       /home/bas.nijholt/.tmp/pytest-of-bas.nijholt/pytest-49/test_merge_streams_parallel0/test_render_outputs/test_merge_streams_parallel.xml
E       /home/bas.nijholt/.tmp/pytest-of-bas.nijholt/pytest-49/test_merge_streams_parallel0/test_render_outputs/test_merge_streams_parallel.obtained.xml
E       HTML DIFF: /home/bas.nijholt/.tmp/pytest-of-bas.nijholt/pytest-49/test_merge_streams_parallel0/test_render_outputs/test_merge_streams_parallel.obtained.diff.html
E       ---
E       +++
E       @@ -9,13 +9,13 @@
E                        pass
E            <container classes="cell_output" nb_element="cell_code_output">
E                <literal_block classes="output stream" language="myst-ansi" linenos="False" xml:space="preserve">
E       +            000000000
E                    0
E       -            0
E       -            0
E       -            0
E       -            0
E       -            0
E       -            0
E       -            0
E       -            0
E       -            0
E       +
E       +
E       +
E       +
E       +
E       +
E       +
E       +
```
The notebook was executed via `jupyter nbconvert --execute merge_streams_parallel.ipynb --to ipynb`, and I committed the executed `.ipynb`.
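If you want to see the fragmentation in the committed fixture directly, a quick sketch along these lines lists every raw stream chunk (adjust the path to wherever the fixture notebook lives in the test suite):

```python
# Print every raw stream chunk in the executed fixture notebook, making the
# fragmented stdout outputs visible.
import nbformat

nb = nbformat.read("merge_streams_parallel.ipynb", as_version=4)
for cell in nb.cells:
    for output in cell.get("outputs", []):
        if output["output_type"] == "stream":
            print(repr(output["text"]))
```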