papermill icon indicating copy to clipboard operation
papermill copied to clipboard

Run parameter cell as `%%local` for sparkmagic kernels

Open thvasilo opened this issue 3 years ago • 1 comments

🚀 Feature

Allow setting the execution context for the parameter cell (through the use of %% magic commands)

Motivation

Currently, papermill injects the parameter cell in such a way that it is executed in the notebook's "default" kernel context, which for a sparkmagic kernel notebook is equivalent to using the %%spark magic.

However, we often need to perform some initial setup first, for example creating and connecting to the cluster that the notebook will interact with. This is done by executing %%local cells; only after connecting to the cluster can the %%spark cells be executed.

Without the ability to parse the parameters in the local executor, in our use-case there is no Spark cluster to connect to when the parameter cell is executed, leading to a failed execution.

So we'd like a way to annotate the injected parameter cell to run in %%local context, so we can set up and connect to a Spark cluster using local (e.g. boto3) commands, and execute the %%spark cells later.

thvasilo avatar Mar 02 '22 18:03 thvasilo

I came across the same problem. Taking inspiration from how engines are registered in papermill, I would like to propose that we make parameterization pluggable in the same manner:

"""Parameterizers to perform different parameterization"""
import copy

import entrypoints
import nbformat

from papermill.log import logger
from papermill.exceptions import PapermillException
from papermill.utils import find_first_tagged_cell_index, nb_kernel_name, nb_language
from papermill.parameterize import read_yaml_file
from papermill.translators import translate_parameters


class PapermillParameterizer(object):
    """
    Registry of the parameterizers known to the system.

    One shared instance maps names to Parameterizer objects so that they
    can be looked up, and invoked, by name from elsewhere.
    """

    def __init__(self):
        self._parameterizers = {}

    def register(self, name, parameterizer):
        """Store ``parameterizer`` under ``name``, replacing any previous entry."""
        self._parameterizers[name] = parameterizer

    def register_entry_points(self):
        """Register parameterizers provided by other packages.

        Every entry point in the ``papermill.parameterizer`` group is loaded
        and registered under the entry point's own name.
        """
        for ep in entrypoints.get_group_all("papermill.parameterizer"):
            self.register(ep.name, ep.load())

    def get_parameterizer(self, name=None):
        """Return the parameterizer registered under ``name``.

        Raises ``PapermillException`` when nothing (or a falsy value) is
        registered under that name.
        """
        found = self._parameterizers.get(name)
        if found:
            return found
        raise PapermillException("No parameterizer named '{}' found".format(name))

    def parameterize_notebook_with_parameterizer(
        self,
        parameterizer,
        nb,
        parameters,
        report_mode=False,
        comment='Parameters',
        kernel_name=None,
        language=None,
        **kwargs
    ):
        """Look up the named parameterizer and let it parameterize ``nb``.

        All arguments other than ``parameterizer`` are forwarded by keyword
        to the selected object's ``parameterize_notebook``, whose result is
        returned unchanged.
        """
        chosen = self.get_parameterizer(parameterizer)
        return chosen.parameterize_notebook(
            nb=nb,
            parameters=parameters,
            report_mode=report_mode,
            kernel_name=kernel_name,
            language=language,
            comment=comment,
            **kwargs
        )

class Parameterizer:
    """Base parameterizer: injects a cell of translated parameters into a notebook.

    Every step is a classmethod so that a subclass can override a single
    step (content generation, cell creation, or cell placement) while
    reusing the rest through ``parameterize_notebook``.
    """

    @classmethod
    def _generate_param_content(cls, nb, parameters, kernel_name, language, comment):
        """Return the source text for the injected parameters cell.

        ``parameters`` may be a string, in which case it is loaded with
        ``read_yaml_file``; otherwise it is passed to ``translate_parameters``
        as given.
        """
        # Load from a file if 'parameters' is a string.
        if isinstance(parameters, str):
            parameters = read_yaml_file(parameters)

        # Fetch out the name and language from the notebook document
        kernel_name = nb_kernel_name(nb, kernel_name)
        language = nb_language(nb, language)

        # Generate parameter content based on the kernel_name
        return translate_parameters(kernel_name, language, parameters, comment)

    @classmethod
    def _create_new_cell(cls, nb, param_content, report_mode):
        """Build the code cell tagged ``injected-parameters``.

        ``nb`` is not used here; it is part of the signature so that
        overriding implementations can inspect the notebook.
        When ``report_mode`` is true the cell's source is marked hidden.
        """
        newcell = nbformat.v4.new_code_cell(source=param_content)
        newcell.metadata['tags'] = ['injected-parameters']

        if report_mode:
            # The 'jupyter' settings live in the cell *metadata*, so that is
            # where any existing value has to be read from.
            newcell.metadata['jupyter'] = newcell.metadata.get('jupyter', {})
            newcell.metadata['jupyter']['source_hidden'] = True

        return newcell

    @classmethod
    def _interleave_new_cell(cls, nb, newcell):
        """Place ``newcell`` into ``nb.cells`` and return ``nb``.

        Placement priority:
        1. replace the first cell already tagged ``injected-parameters``;
        2. otherwise insert right after the first cell tagged ``parameters``;
        3. otherwise insert at the top of the notebook (with a warning).
        """
        param_cell_index = find_first_tagged_cell_index(nb, 'parameters')
        injected_cell_index = find_first_tagged_cell_index(nb, 'injected-parameters')
        if injected_cell_index >= 0:
            # Replace the injected cell with a new version
            before = nb.cells[:injected_cell_index]
            after = nb.cells[injected_cell_index + 1 :]
        elif param_cell_index >= 0:
            # Add an injected cell after the parameter cell
            before = nb.cells[: param_cell_index + 1]
            after = nb.cells[param_cell_index + 1 :]
        else:
            # Inject to the top of the notebook
            logger.warning("Input notebook does not contain a cell with tag 'parameters'")
            before = []
            after = nb.cells

        nb.cells = before + [newcell] + after
        return nb

    @classmethod
    def parameterize_notebook(cls, nb, parameters, report_mode=False, comment='Parameters', kernel_name=None, language=None, **kwargs):
        """Return a copy of ``nb`` with an injected parameters cell.

        ``parameters`` is either the parameter values themselves or a string
        that is loaded with ``read_yaml_file``. The input notebook is left
        untouched; the returned copy also records the parameter values under
        ``nb.metadata.papermill['parameters']``. Extra keyword arguments are
        accepted and ignored.
        """
        # Resolve a parameters file up front so that the metadata written
        # below records the loaded values rather than the path string.
        if isinstance(parameters, str):
            parameters = read_yaml_file(parameters)

        # Copy the nb object to avoid polluting the input
        nb = copy.deepcopy(nb)

        param_content = cls._generate_param_content(nb, parameters, kernel_name, language, comment)
        newcell = cls._create_new_cell(nb, param_content, report_mode)
        nb = cls._interleave_new_cell(nb, newcell)

        nb.metadata.papermill['parameters'] = parameters
        return nb


class DefaultParameterizer(Parameterizer):
    """Parameterizer that inherits every step from ``Parameterizer`` unchanged."""

class SparkMagicParameterizer(DefaultParameterizer):
    """Parameterizer that carries a cell magic over to the injected cell.

    If the notebook's first ``parameters``-tagged cell starts with a cell
    magic line (one beginning with ``%%``), that same line is prepended to
    the injected parameters cell.
    """

    @classmethod
    def _create_new_cell(cls, nb, param_content, report_mode):
        """Create the injected cell, prefixed with the parameter cell's magic line if any."""
        newcell = super()._create_new_cell(nb, param_content, report_mode)

        param_cell_index = find_first_tagged_cell_index(nb, 'parameters')
        if param_cell_index < 0:  # no param cell
            return newcell

        source_lines = nb.cells[param_cell_index]['source'].splitlines()
        # An empty parameters cell has no first line to inspect; indexing it
        # unconditionally would raise IndexError.
        if source_lines and source_lines[0].startswith('%%'):
            # must be a cell magic annotated param cell. Prepend cell magic command to newcell
            newcell['source'] = source_lines[0] + '\n' + newcell['source']

        return newcell


# Module-level registry instance used to look parameterizers up by name.
papermill_parameterizers = PapermillParameterizer()
# ``None`` is the key looked up when no parameterizer name is given.
papermill_parameterizers.register(None, DefaultParameterizer)
papermill_parameterizers.register('sparkmagic', SparkMagicParameterizer)
# Entry points are registered last, so one that reuses a name registered
# above replaces that built-in entry.
papermill_parameterizers.register_entry_points()

Then the change needed in execute_notebook becomes

# Excerpt of the proposed ``execute_notebook``: only the signature and the
# parameterization step are shown; ``...`` marks elided code.
def execute_notebook(
        input_path,
        output_path,
        parameters=None,
        engine_name=None,
        request_save_on_cell_execute=True,
        prepare_only=False,
        kernel_name=None,
        language=None,
        progress_bar=True,
        log_output=False,
        stdout_file=None,
        stderr_file=None,
        start_timeout=60,
        report_mode=False,
        cwd=None,
        # Name under which the desired parameterizer is registered.
        parameterizer_name=None,
        **engine_kwargs
):
      ...
      # Parameterization only happens when parameters were supplied; it is
      # routed through the registry so the named parameterizer is used.
      if parameters:
            nb = papermill_parameterizers.parameterize_notebook_with_parameterizer(
                parameterizer=parameterizer_name,
                nb=nb,
                parameters=parameters,
                report_mode=report_mode,
                kernel_name=kernel_name,
                language=language,
            )
      ...

Vs


# Excerpt of ``execute_notebook`` shown for comparison: only the signature
# and the parameterization step are shown; ``...`` marks elided code.
def execute_notebook(
    input_path,
    output_path,
    parameters=None,
    engine_name=None,
    request_save_on_cell_execute=True,
    prepare_only=False,
    kernel_name=None,
    language=None,
    progress_bar=True,
    log_output=False,
    stdout_file=None,
    stderr_file=None,
    start_timeout=60,
    report_mode=False,
    cwd=None,
    **engine_kwargs
):
        ...
        # Parameterization only happens when parameters were supplied; here
        # ``parameterize_notebook`` is called directly.
        if parameters:
            nb = parameterize_notebook(
                nb, parameters, report_mode, kernel_name=kernel_name, language=language
            )
        ...

The output notebook then looks like (when invoked with parameters)

image

vdksoda avatar Dec 05 '24 09:12 vdksoda