Run parameter cell as `%%local` for sparkmagic kernels
🚀 Feature
Allow setting the execution context for the parameter cell (through the use of %% magic commands)
Motivation
Currently, papermill injects the parameter cell in such a way that it is executed by the "default" kernel of the notebook, which, for a sparkmagic kernel notebook, is equivalent to using the %%spark magic.
However, we often need to perform some initial setup first, for example creating the cluster that the notebook will interact with, which is done by executing %%local cells; only then do we connect to the cluster and execute the %%spark cells.
Without the ability to parse the parameters in the local executor, in our use-case there is no Spark cluster to connect to when the parameter cell is executed, leading to a failed execution.
So we'd like a way to annotate the injected parameter cell to run in %%local context, so we can set up and connect to a Spark cluster using local (e.g. boto3) commands, and execute the %%spark cells later.
I came across the same problem. Taking inspiration from the engines in papermill, I would like to propose that we restructure parameterizing in the same manner:
"""Parameterizers to perform different parameterization"""
import copy
import entrypoints
import nbformat
from papermill.log import logger
from papermill.exceptions import PapermillException
from papermill.utils import find_first_tagged_cell_index, nb_kernel_name, nb_language
from papermill.parameterize import read_yaml_file
from papermill.translators import translate_parameters
class PapermillParameterizer(object):
    """
    Registry of named parameterizers.

    A single shared instance keeps a name -> parameterizer mapping so that
    parameterizers can be registered once and looked up by name elsewhere.
    """

    def __init__(self):
        self._parameterizers = {}

    def register(self, name, parameterizer):
        """Store ``parameterizer`` under ``name``, replacing any previous entry."""
        self._parameterizers[name] = parameterizer

    def register_entry_points(self):
        """Register every entry point in the ``papermill.parameterizer`` group.

        This loads parameterizers provided by other packages; each one is
        registered under its entry point name.
        """
        for ep in entrypoints.get_group_all("papermill.parameterizer"):
            self.register(ep.name, ep.load())

    def get_parameterizer(self, name=None):
        """Return the parameterizer registered under ``name``.

        Raises ``PapermillException`` when nothing (or a falsy value) is
        registered under that name.
        """
        found = self._parameterizers.get(name)
        if found:
            return found
        raise PapermillException("No parameterizer named '{}' found".format(name))

    def parameterize_notebook_with_parameterizer(
        self,
        parameterizer,
        nb,
        parameters,
        report_mode=False,
        comment='Parameters',
        kernel_name=None,
        language=None,
        **kwargs
    ):
        """Look up the parameterizer named ``parameterizer`` and apply it to ``nb``.

        All remaining arguments are forwarded by keyword to the selected
        parameterizer's ``parameterize_notebook``; its result is returned.
        """
        selected = self.get_parameterizer(parameterizer)
        return selected.parameterize_notebook(
            nb=nb,
            parameters=parameters,
            report_mode=report_mode,
            kernel_name=kernel_name,
            language=language,
            comment=comment,
            **kwargs
        )
class Parameterizer:
    """Base parameterizer: builds an ``injected-parameters`` cell and places it in a notebook.

    The work is split into overridable classmethod hooks
    (``_generate_param_content``, ``_create_new_cell``, ``_interleave_new_cell``)
    that ``parameterize_notebook`` calls in that order.
    """

    @classmethod
    def _generate_param_content(cls, nb, parameters, kernel_name, language, comment):
        """Return the source text of the injected cell for ``parameters``.

        ``parameters`` may be a string, in which case it is passed to
        ``read_yaml_file`` and the result is used instead.
        """
        # Load from a file if 'parameters' is a string.
        if isinstance(parameters, str):
            parameters = read_yaml_file(parameters)
        # Fetch out the name and language from the notebook document
        kernel_name = nb_kernel_name(nb, kernel_name)
        language = nb_language(nb, language)
        # Generate parameter content based on the kernel_name
        return translate_parameters(kernel_name, language, parameters, comment)

    @classmethod
    def _create_new_cell(cls, nb, param_content, report_mode):
        """Build a code cell tagged ``injected-parameters`` holding ``param_content``.

        When ``report_mode`` is truthy the cell's ``jupyter.source_hidden``
        metadata flag is set to ``True``. ``nb`` is unused here; it is part of
        the signature so that overriding classes can inspect the notebook.
        """
        newcell = nbformat.v4.new_code_cell(source=param_content)
        newcell.metadata['tags'] = ['injected-parameters']
        if report_mode:
            # Look up any existing 'jupyter' entry in the metadata (where it is
            # written back to), not on the cell object itself.
            newcell.metadata['jupyter'] = newcell.metadata.get('jupyter', {})
            newcell.metadata['jupyter']['source_hidden'] = True
        return newcell

    @classmethod
    def _interleave_new_cell(cls, nb, newcell):
        """Insert ``newcell`` into ``nb.cells`` and return ``nb``.

        Placement, in order of precedence:
        1. replace the first cell tagged ``injected-parameters``;
        2. otherwise insert directly after the first cell tagged ``parameters``;
        3. otherwise insert at the top of the notebook (a warning is logged).
        """
        param_cell_index = find_first_tagged_cell_index(nb, 'parameters')
        injected_cell_index = find_first_tagged_cell_index(nb, 'injected-parameters')
        if injected_cell_index >= 0:
            # Replace the injected cell with a new version
            before = nb.cells[:injected_cell_index]
            after = nb.cells[injected_cell_index + 1:]
        elif param_cell_index >= 0:
            # Add an injected cell after the parameter cell
            before = nb.cells[:param_cell_index + 1]
            after = nb.cells[param_cell_index + 1:]
        else:
            # Inject to the top of the notebook
            logger.warning("Input notebook does not contain a cell with tag 'parameters'")
            before = []
            after = nb.cells
        nb.cells = before + [newcell] + after
        return nb

    @classmethod
    def parameterize_notebook(
        cls,
        nb,
        parameters,
        report_mode=False,
        comment='Parameters',
        kernel_name=None,
        language=None,
        **kwargs
    ):
        """Return a deep copy of ``nb`` with an injected parameters cell.

        ``parameters`` is either the parameter values or a string that is
        loaded with ``read_yaml_file``. The values actually used are recorded
        in ``nb.metadata.papermill['parameters']`` of the returned notebook.
        Extra ``**kwargs`` are accepted and ignored by this base implementation.
        """
        # Resolve a parameters file up front so that the metadata written below
        # records the loaded values rather than the file path string.
        if isinstance(parameters, str):
            parameters = read_yaml_file(parameters)
        # Copy the nb object to avoid polluting the input
        nb = copy.deepcopy(nb)
        param_content = cls._generate_param_content(nb, parameters, kernel_name, language, comment)
        newcell = cls._create_new_cell(nb, param_content, report_mode)
        nb = cls._interleave_new_cell(nb, newcell)
        nb.metadata.papermill['parameters'] = parameters
        return nb
class DefaultParameterizer(Parameterizer):
    """Parameterizer that uses every ``Parameterizer`` hook unchanged."""
class SparkMagicParameterizer(DefaultParameterizer):
    """Parameterizer that carries a cell magic over to the injected cell.

    If the first line of the cell tagged ``parameters`` starts with ``%%``,
    that line is prepended to the injected cell's source, so the injected cell
    runs under the same cell magic as the parameters cell.
    """

    @classmethod
    def _create_new_cell(cls, nb, param_content, report_mode):
        """Build the injected cell, prefixed with the parameters cell's magic line if any."""
        newcell = super(SparkMagicParameterizer, cls)._create_new_cell(nb, param_content, report_mode)
        param_cell_index = find_first_tagged_cell_index(nb, 'parameters')
        if param_cell_index < 0:  # no param cell
            return newcell
        source_lines = nb.cells[param_cell_index]['source'].splitlines()
        # An empty parameters cell has no first line; indexing [0] would raise
        # IndexError, so check for emptiness before inspecting it.
        if source_lines and source_lines[0].startswith('%%'):
            # must be a cell magic annotated param cell. Prepend cell magic command to newcell
            newcell['source'] = source_lines[0] + '\n' + newcell['source']
        return newcell
# Module-level registry instance used to look up parameterizers by name.
papermill_parameterizers = PapermillParameterizer()
# The default is registered under the name None, which is what
# get_parameterizer() looks up when it is called without a name.
papermill_parameterizers.register(None, DefaultParameterizer)
papermill_parameterizers.register('sparkmagic', SparkMagicParameterizer)
# Entry points are registered last; register() overwrites existing names, so an
# entry point that reuses a name above replaces that registration.
papermill_parameterizers.register_entry_points()
Then the change needed in execute_notebook becomes
def execute_notebook(
    input_path,
    output_path,
    parameters=None,
    engine_name=None,
    request_save_on_cell_execute=True,
    prepare_only=False,
    kernel_name=None,
    language=None,
    progress_bar=True,
    log_output=False,
    stdout_file=None,
    stderr_file=None,
    start_timeout=60,
    report_mode=False,
    cwd=None,
    parameterizer_name=None,
    **engine_kwargs
):
    # Proposed variant: adds the `parameterizer_name` keyword (default None) and
    # routes parameterization through the `papermill_parameterizers` registry.
    # NOTE(review): each `...` presumably stands for code elided from this excerpt.
    ...
    if parameters:
        # Parameterization only happens when `parameters` is truthy.
        nb = papermill_parameterizers.parameterize_notebook_with_parameterizer(
            parameterizer=parameterizer_name,
            nb=nb,
            parameters=parameters,
            report_mode=report_mode,
            kernel_name=kernel_name,
            language=language,
        )
    ...
versus the current implementation:
def execute_notebook(
    input_path,
    output_path,
    parameters=None,
    engine_name=None,
    request_save_on_cell_execute=True,
    prepare_only=False,
    kernel_name=None,
    language=None,
    progress_bar=True,
    log_output=False,
    stdout_file=None,
    stderr_file=None,
    start_timeout=60,
    report_mode=False,
    cwd=None,
    **engine_kwargs
):
    # Variant without a parameterizer argument: calls `parameterize_notebook`
    # directly.
    # NOTE(review): each `...` presumably stands for code elided from this excerpt.
    ...
    if parameters:
        # Parameterization only happens when `parameters` is truthy.
        nb = parameterize_notebook(
            nb, parameters, report_mode, kernel_name=kernel_name, language=language
        )
    ...
The output notebook then looks like this (when invoked with parameters):