Setting task class default init parameters
(note that this is related to #513)
In some cases, we want to customize how a task is initialized. For example, to hide the code in an output HTML report, we can create a task like this:
```yaml
tasks:
  - source: fit.py
    product:
      nb: output/nb.html
      model: output/model.pickle
    # hide code
    nbconvert_export_kwargs:
      exclude_input: True
```
However, if we have multiple tasks and want to hide the code in all the outputs, we need to pass the initialization parameters to every one of them, which is too verbose. Alternatively, we can provide a way to pass default initialization parameters:
```yaml
init_defaults:
  # passes to all tasks
  Task:
    params: {a: 1}
  # passes to all notebook tasks
  NotebookRunner:
    nbconvert_export_kwargs:
      exclude_input: True
```
Notes
- this conflicts with `clients`; perhaps throw an error if `clients` appears here and tell the user to pass them in the `clients` section (a sketch of this validation follows these notes)
- do not allow certain arguments. For example, all init methods take a `dag` argument, but that should not be allowed here
- this requires knowledge of the names of the underlying classes. Is there any way to make this simpler?
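Roughly, the validation could look like this (just a sketch; `validate_init_defaults` and `KNOWN_TASK_CLASSES` are illustrative names, not existing ploomber APIs):

```python
# illustrative sketch only: the names below do not exist in ploomber,
# they only show the intent of the notes above
KNOWN_TASK_CLASSES = {'Task', 'NotebookRunner', 'SQLScript', 'SQLUpload', 'PythonCallable'}


def validate_init_defaults(init_defaults):
    for class_name, kwargs in init_defaults.items():
        # keys must be names of task classes
        if class_name not in KNOWN_TASK_CLASSES:
            raise KeyError(f'{class_name!r} is not a valid task class name')

        # clients must go in the top-level clients section, not here
        if 'client' in kwargs:
            raise ValueError('Do not set "client" in the defaults section; '
                             'use the "clients" section instead')

        # dag is injected internally, so it cannot be a default
        if 'dag' in kwargs:
            raise ValueError('"dag" cannot be set as a default')
```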
@jramirez857 want to take a look?
Yes, I'll try this out
Hi @edublancas, @idomic. I have a couple of questions:
- Am I correct in understanding that the YAML file would look something like this?
```yaml
init_defaults:
  # passes to all tasks
  Task:
    params: {a: 1}
  # passes to all notebook tasks
  NotebookRunner:
    nbconvert_export_kwargs:
      exclude_input: True
    static_analysis: regular
  SQLUpload:
    chunksize: 20000

tasks:
  - source: fit.py
    product:
      nb: output/nb.html
      model: output/model.pickle
```
- If the init_defaults section is new to the YAML, which other parts of the codebase would this affect (e.g. DAG or loader)?
- Would this only encompass parameters of the classes in tasks.py, sql.py, and aws.py?
I appreciate your help as I am familiarizing myself with the codebase!
Sure, the changes need to go in dagspec.py and taskspec.py; the former deals with the pipeline.yaml spec, while the latter handles each entry in the tasks section.
Now that I think about it, task_defaults sounds like a better name (instead of init_defaults). What do you think?
So, yes, we need to add a new top-level section, as a pointer, here's where we validate the top keys in the pipeline.yaml:
https://github.com/ploomber/ploomber/blob/beb625cc977bcd34481608a91daddc5493e0983c/src/ploomber/spec/dagspec.py#L387
Then, this is the section where we process each task entry:
https://github.com/ploomber/ploomber/blob/beb625cc977bcd34481608a91daddc5493e0983c/src/ploomber/spec/dagspec.py#L785
You'll see that we are calling task_dict.to_task(); task_dict is an instance of TaskSpec. Here's the definition of to_task:
https://github.com/ploomber/ploomber/blob/beb625cc977bcd34481608a91daddc5493e0983c/src/ploomber/spec/taskspec.py#L253
This is where you want to take the newly added section into account. Note that to_task won't have access to the new section, so you'll need to modify TaskSpec.__init__ and pass it there (e.g., TaskSpec(task_defaults=...)).
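A rough sketch of that plumbing (signatures simplified; the real TaskSpec constructor takes more arguments, so treat this as an outline rather than the actual code):

```python
# simplified outline, not the actual TaskSpec signature
class TaskSpec:
    def __init__(self, data, meta, project_root, task_defaults=None):
        self.data = data
        self.meta = meta
        self.project_root = project_root
        # store the defaults so to_task / _init_task can read them later
        self.task_defaults = task_defaults or {}

    def to_task(self, dag):
        # the existing logic eventually calls _init_task, which can now
        # consult self.task_defaults when building the task
        ...
```

In dagspec.py, the idea would be to pass the new top-level section down when each TaskSpec is built (e.g., something like TaskSpec(task_dict, ..., task_defaults=...)).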
You'll see that there is one conditional; let's focus on the first scenario for now:
https://github.com/ploomber/ploomber/blob/beb625cc977bcd34481608a91daddc5493e0983c/src/ploomber/spec/taskspec.py#L321
Inside _init_task, you'll see a class_ variable; it contains the task's class. To make this generic for all tasks, you'll need to match the class name with the sub-sections in task_defaults, then use that information to modify the call to the constructor here:
https://github.com/ploomber/ploomber/blob/beb625cc977bcd34481608a91daddc5493e0983c/src/ploomber/spec/taskspec.py#L392
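For example, the merge could look roughly like this (a sketch: merge_task_defaults and task_kwargs are illustrative names standing in for whatever _init_task already builds; explicit task-level values should win over the defaults):

```python
def merge_task_defaults(class_, task_defaults, task_kwargs):
    """Pick the defaults matching this task class and merge them into the
    keyword arguments; values set explicitly on the task entry win."""
    defaults = (task_defaults or {}).get(class_.__name__, {})
    return {**defaults, **task_kwargs}


# inside _init_task (sketch):
#     kwargs = merge_task_defaults(class_, task_defaults, task_kwargs)
#     task = class_(source=source, product=product, dag=dag, name=name, **kwargs)
```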
I think this should help you get started, but feel free to ask any questions!
I'd recommend going step-by-step:
- [ ] allow `task_defaults` as a top-level section in `pipeline.yaml`
- [ ] ensure `to_task` has access to the `task_defaults` dictionary
- [ ] take the values in `task_defaults` and pass them to initialize the Task object
- [ ] test that one can set `task_defaults` and the task is initialized with them
- [ ] test that `task_defaults` also works with `grid`
- [ ] test that `task_defaults` validates the keys in the dictionary (they must be task classes)
Once this works, we can work on the second scenario (which happens when the user passes the grid argument to a task entry).
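For reference, a task entry that uses grid looks something like this:

```yaml
tasks:
  - source: upstream.py
    name: upstream-
    product: output/param=[[param]].ipynb
    grid:
      param: [1, 2]
```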
@edublancas I am working on the grid case, but I don't fully understand what the class means in this test case, i.e., the NotebookRunner. What is it?
Also, some weird behavior happens in this test case:
```python
@pytest.fixture
def sample_dagspec_with_default_params(tmp_directory, write_tasks):
    spec = {
        'task_defaults': {
            'NotebookRunner': {
                'check_if_kernel_installed': False
            }
        },
        'tasks': [{
            'source': 'upstream.py',
            'name': 'upstream-',
            'product': 'output/param=[[param]].ipynb',
            'grid': {
                'param': [1, 2]
            },
            'params': {
                'another': 100,
                'one-more': 200
            }
        }]
    }

    Path('pipeline.yaml').write_text(yaml.dump(spec))
```
```python
def test_default_parameter_grid(sample_dagspec_with_default_params):
    dag = DAGSpec('pipeline.yaml').to_dag()

    # this assertion passes
    assert dag['upstream-0'].params == {
        'param': 1,
        'check_if_kernel_installed': False,
        'another': 100,
        'one-more': 200
    }

    # these two assertions fail
    assert dag['upstream-0'].check_if_kernel_installed is False
    assert dag['upstream-1'].check_if_kernel_installed is False
```
I don't understand some of the logic. In the test for the no-grid case (refer here):

```python
assert dag['script'].check_if_kernel_installed is False
```

This assertion passes.
What I did is the same as in the non-grid case: merge the default parameters into the task parameters:
```python
if task_defaults is not None:
    params = {**task_defaults[task_class.__name__], **params}

return TaskGroup.from_grid(task_class=task_class,
                           product_class=product_class,
                           product_primitive=product,
                           task_kwargs=data,
                           dag=dag,
                           grid=grid,
                           resolve_relative_to=self.project_root,
                           on_render=on_render,
                           on_finish=on_finish,
                           on_failure=on_failure,
                           params=params,
                           **name_arg), upstream
```
Now I am a bit confused and not sure what the desired behavior is.
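One thing I have not tried yet (just a guess on my part): since check_if_kernel_installed looks like a constructor argument rather than a runtime parameter, maybe the defaults need to be merged into the kwargs that reach the constructor (the data dict passed as task_kwargs) instead of into params, for example:

```python
# unverified sketch: merge the defaults into the constructor kwargs instead of
# into params; whether from_grid forwards extra task_kwargs to the constructor
# is something I still need to check
if task_defaults is not None and task_class.__name__ in task_defaults:
    data = {**task_defaults[task_class.__name__], **data}
```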
> I am working on the grid case, but I don't fully understand what the class means in this test case, i.e., the NotebookRunner. What is it?
We represent each task in pipeline.yaml as an object internally. For example, when you're running a notebook, we create an instance of NotebookRunner. You can see all the classes here: https://docs.ploomber.io/en/latest/api/python_api.html#tasks
That's why we have the NotebookRunner there; we're essentially saying: I want these default parameters for all instances of NotebookRunner.
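To make that concrete, a rough sketch of the object a notebook entry maps to (constructor arguments simplified; treat the exact signature as an approximation):

```python
from pathlib import Path

from ploomber import DAG
from ploomber.products import File
from ploomber.tasks import NotebookRunner

# roughly what a pipeline.yaml entry with `source: fit.py` becomes internally;
# a task_defaults entry under NotebookRunner would add keyword arguments here
dag = DAG()
NotebookRunner(Path('fit.py'),
               {'nb': File('output/nb.html'), 'model': File('output/model.pickle')},
               dag=dag,
               name='fit',
               check_if_kernel_installed=False)
```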
For the CI issues: please open a PR and let it fail, then ping me so I can look at the logs.
@edublancas here is the log
I am a bit confused about the difference between these two definitions: this and that.
One generates a single task and the other generates a TaskGroup. I am trying to follow the single-task case for the grid one, but it looks like these two cases work differently.
For example, in the single-task test case, here we merge the task params and default params together to build the task.
When I do the same for the grid case, it doesn't work. You can tell from the log above that the generated task's params contain 'check_if_kernel_installed': False, but the assertion assert dag['upstream-0'].check_if_kernel_installed is False fails.
Am I missing some steps somewhere?
Closing due to inactivity; feel free to reopen.