[WIP] An initial stab at a flow decorator
Summary
Include a summary of major changes in bullet points:
I was talking to @Andrew-S-Rosen about implementing a @flow decorator for jobflow, and a previous discussion on this topic. Here's an initial stab at it. It's using a "flow context" (an idea also used in the Prefect library) so that a job can figure out if it's running inside a decorated flow function, and if so, add itself to the flow's job list. I believe this should help us get around the "updating global variables" issue as mentioned in a related PR in the linked discussion.
- Added a `@flow` decorator.
- Added some tests, one for a dynamic run with a replacement.
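The "flow context" mentioned above can be sketched roughly like this (a minimal illustration using `contextvars`; the names `_current_flow_context` and `flow_build_context` match those discussed later in this thread, but the actual implementation may differ):

```python
# Minimal sketch of the "flow context" idea, assuming a contextvars-based
# implementation; names follow the discussion in this thread.
from contextlib import contextmanager
from contextvars import ContextVar

_current_flow_context: ContextVar = ContextVar("flow_context", default=None)

@contextmanager
def flow_build_context(flow):
    # While the job graph is being built, `flow` is the active flow that
    # newly created jobs can discover and register themselves with.
    token = _current_flow_context.set(flow)
    try:
        yield flow
    finally:
        _current_flow_context.reset(token)
```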
Additional dependencies introduced (if any)
None
TODO (if any)
I have almost no history with using jobflow, so certainly I'm not thinking about all use cases, so any pointers (initially failing tests that I can add to cover different scenarios before tackling them) will be welcome.
Checklist
Work-in-progress pull requests are encouraged, but please put [WIP] in the pull request title.
Before a pull request can be merged, the following items must be checked:
- [x] Code is in the standard Python style. The easiest way to handle this is to run the following in the correct sequence on your local machine. Start by running `black` on your new code. This will automatically reformat your code to PEP8 conventions and remove most issues. Then run `pycodestyle`, followed by `flake8`.
- [ ] Docstrings have been added in the Numpy docstring format. Run `pydocstyle` on your code.
- [ ] Type annotations are highly encouraged. Run `mypy` to type check your code.
- [x] Tests have been added for any new functionality or bug fixes.
- [x] All linting and tests pass.
Note that the CI system will run all the above checks. But it will be much more efficient if you already fix most errors prior to submitting the PR. It is highly recommended that you use the pre-commit hook provided in the repository. Simply `pip install pre-commit` and then `pre-commit install`, and a check will be run prior to allowing commits.
@Andrew-S-Rosen, @davidwaroquiers, @utf
Also pinging @gpetretto in case he has any thoughts about what considerations might come into play in getting this or a similar approach to work nicely with Jobflow-Remote.
At a high level, to break down what's happening:
- The `flow` decorator sets a context variable and returns a `DecoratedFlow`, a subclass of `Flow`, which saves the callable, args, and kwargs.
- A `Job`, when run, checks to see if it's running in the context of a `flow` decorator; if so, it adds itself to the `DecoratedFlow` (much like a user composing a traditional `Flow` would have done explicitly). See the sketch after this list.
- `run_locally` runs the `DecoratedFlow` by calling it with its saved args/kwargs and, at the end, resolves any references in the output.
- When resolving output references, some extra care is taken to resolve lists/dicts, etc.
- If the job has been replaced by another, the output of the replacement job is returned instead.
- The latest run (as determined by `job_index`) of a job is used to determine what output will be returned for that job. This seems reasonable to me, but I'm making some assumptions here (job indices always increase, and only the latest one is of interest to the caller, etc.), so perhaps this could use a thorough review and more test cases.
- The legacy behavior (returning the entire response dictionary) is retained for traditional `Flow` classes (i.e., not `DecoratedFlow`).
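As a rough sketch (hypothetical helper name; the real logic lives in the `Job` run path), the registration step amounts to:

```python
# Hypothetical sketch: a Job created inside a @flow function registers
# itself with the active DecoratedFlow, much as Flow([job, ...]) would.
def _maybe_register(job):
    active_flow = _current_flow_context.get()
    if active_flow is not None:
        active_flow.add_jobs(job)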
Parallel changes would have to be made in jobflow-remote if this is on the right path, of course.
I've added some more tests to cover what I think would be typical scenarios in using the `@flow` decorator:

```python
@flow
def my_flow(a, b):
    # typical case
    return add(a, b).output  # or the inadvertent shorthand `return add(a, b)`, which is handled by existing code correctly
```
```python
@flow
def my_flow():
    # some jobs are run, but only for their side effects
    return 42
```

```python
@flow
def my_flow(a, b):
    # some complex structure with hashable keys but jobs/flows as values
    return [add(a, b), add(b, a).output], {
        "foo": add(a, b).output,
        "bar": [add(a, b), subtract(a, b)],
    }
```
```python
# replaced job in a flow
@job
def replace_job(x, y):
    another_job = subtract(x, y)
    return Response(replace=another_job)

@flow
def my_flow(a, b):
    return replace_job(a, b).output  # the flow's job gets dynamically replaced
```
Hi @vineetbansal and @Andrew-S-Rosen, thanks a lot for implementing this. I have made some tests and here are my initial comments.
- It is not clear why for a `DecoratedFlow` the output should be different from the standard case of a normal `Flow`. In the example below I would expect that the output of the two executions would be equivalent.

```python
@flow
def my_flow(a, b):
    j = add(a, b)
    return j

j = add(3, 4)
f = Flow(j, output=j.output)
result = run_locally(f, ensure_success=True)

result = run_locally(my_flow(3, 4), ensure_success=True)
```

This, however, is just concerning how `run_locally` generates the outputs.

- Just to clarify, I suppose there is no aim at executing any piece of code inside a `flow` decorated function, aside from combining job actions. For example, the following code raises an error:

```python
@flow
def my_flow(a, b):
    j1 = add(a, b)
    x = j1.output + 5
    j2 = add(x, b)
    return j2.output
```

I would not see any reasonable way of implementing this, especially for jobflow-remote or fireworks executions. Just for my understanding, is something similar possible in prefect?

- It seems that it is not possible to nest multiple `flow` decorated functions. The following code does not execute any Job and returns None:

```python
@flow
def add_single(a, b):
    j1 = add(a, b)
    return add(j1.output, 2)

@flow
def add_combine(a, b):
    f1 = add_single(a, b)
    return add_single(f1.output, 3)

f = add_combine(1, 2)
result = run_locally(f, ensure_success=True)
```

Maybe here it is possible to change the `flow` decorator so that, when the function is called, it can also check if it is inside the `flow_build_context` and trigger a recursive unravel of the flow functions? In general I would expect this to work more or less like the following code based on Makers:

```python
class AddSingleMaker(Maker):
    name = "single"

    def make(self, a, b):
        j1 = add(a, b)
        j2 = add(j1.output, 2)
        return Flow([j1, j2], output=j2.output)

class AddCombineMaker(Maker):
    name = "combine"

    def make(self, a, b):
        f1 = AddSingleMaker().make(1, 2)
        f2 = AddSingleMaker().make(f1.output, 3)
        return Flow([f1, f2], output=f2.output)

f = AddCombineMaker().make(1, 2)
result = run_locally(f, ensure_success=True)
```

- Concerning jobflow-remote, as things are now, I think it would be relatively easy to integrate this. My understanding is that the `flow` decorated function should just be used to first generate the "complete" Flow object. I have tried the code below in jobflow-remote and it seems to be working as expected:

```python
@flow
def my_flow(a, b):
    j = add(a, b)
    return j

print("create flow")
f = my_flow(1, 2)

with flow_build_context(f):
    output = f.fn(*f.args, **f.kwargs)
f.name = f.fn.__name__
f.output = output.output

submit_flow(f, worker="local_shell")
```

So I can basically include the check below in the `submit_flow` function and I would expect this to work:

```python
if isinstance(flow, DecoratedFlow):
    with flow_build_context(flow):
        output = flow.fn(*flow.args, **flow.kwargs)
    flow.name = flow.fn.__name__
    flow.output = output.output
```

Maybe the setting of `name` and `output` can even be done directly by the `flow` decorator? If instead you expect that the code will evolve to include code execution as in the example above, or other functionalities, things may get more complicated.

- If this solution works, I would expect that it may be applied in the same way for `flow_to_workflow` for Fireworks.
Thanks @gpetretto - your comments were most insightful and after a few iterations of overly complicating and then simplifying things on my end, I think I've ended up at more or less the same place as you had intuited.
It is not clear why for a DecoratedFlow the output should be different from the standard case of a normal Flow..
Indeed - my original intuition was that the @flow decorator could be used on arbitrary functions, not necessarily ones for accumulating Jobs and Flows, so I was trying to resolve the output (to an admittedly incomplete extent) before returning it. I see that all the resolving is happening in the responses dict anyway, so that simplifies things on my end (and gets rid of a few test cases related to this return behavior).
Just to clarify, I suppose there is no aim at executing any piece of code inside a flow decorated function, aside from combining jobs actions. .. I would not see any reasonable way of implementing this, especially for jobflow-remote or fireworks executions.
Yes - I talked to @Andrew-S-Rosen and we're not using @flow decorators (across workflow libraries) to execute code other than as a glue to create a DAG. So my initial (incomplete) attempt at trying to do this is not needed.
is something similar possible in prefect?
Yes - it seems to be. Prefect seems to be passing in the "manager" to the context to execute arbitrary code in the flow:
```python
with engine.start():
    while engine.is_running():
        with engine.run_context():
            engine.call_flow_fn()
```
It seems that is not possible to nest multiple flow decorated functions. .. Maybe here it is possible to change the flow decorator so that when the function is called can also check if it is inside the flow_build_context and trigger a recursive unravel of the flow functions?
This should now be fixed. If I understand correctly, the `DecoratedFlow` should add itself to the context before "executing" (building the DAG for) the decorated function, and it should also check whether it is itself nested inside another flow (by checking the flow context) so that it can add itself to the parent flow, which is what I'm doing now.
I've added a test case for this.
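Roughly, the nesting logic amounts to something like this (a sketch, not the exact code):

```python
# Sketch: a DecoratedFlow built inside another @flow first adds itself
# to the parent flow found in the context, then becomes the active
# context while its own decorated function is "executed".
parent = _current_flow_context.get()
if parent is not None:
    parent.add_jobs(self)  # a Flow can be nested inside a Flow like a Job

with flow_build_context(self):
    output = self.fn(*self.args, **self.kwargs)
```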
Maybe the setting of name and output can even be done directly by the flow decorator?
Good idea. Done!
As it stands now, `Response(replace=..)` and dynamic flows are not currently working with this approach, and are represented by two xfail tests.
This seems to have to do with the creation of brand-new Flow objects, independent of existing flows, when replace is called:
https://github.com/materialsproject/jobflow/blob/d208e460d7bad3533a6323703276cf75304c0891/src/jobflow/core/job.py#L1383
and
https://github.com/materialsproject/jobflow/blob/d208e460d7bad3533a6323703276cf75304c0891/src/jobflow/core/job.py#L1409
These two are the cases that we'd like to support in jobflow to be able to use it in our project. I'll dig into this some more, unless you have some thoughts already. I didn't want to delay this reply any longer!
Hi @vineetbansal, thanks a lot for the updates! I will have a look at them. I just wanted to quickly clarify one point: I tried to fetch your latest changes and run the tests in test_flow_decorator.py, but they all seem to pass on my laptop. Even those marked with xfail. Could there be some differences depending on the environment? Or am I missing something?
@gpetretto - no - you're right. Somewhere along the line during my refactoring, the `Response(replace=..)` case seems to have been fixed - I should have used `strict=True` for those xfails. I'm adding another commit that removes those xfails. Fingers crossed..
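For reference, a strict xfail makes pytest fail the suite when the test unexpectedly passes (illustrative test, not the exact one in this PR):

```python
import pytest

# With strict=True an unexpected pass (XPASS) is reported as a failure,
# which would have caught that Response(replace=...) already worked.
@pytest.mark.xfail(strict=True, reason="dynamic replace not yet supported")
def test_replace_in_decorated_flow():
    ...
```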
@gpetretto - when you're looking at this PR, I wanted to bring something else to your attention as well, as I was going through its possible integration with quacc. This is again going back to your question:
It is not clear why for a DecoratedFlow the output should be different from the standard case of a normal Flow..
In the case of classic flows, the caller who creates the Flow is responsible for reaching into the results depending on what they "know" the eventual output of interest is (presumably by how they put the jobs together):
```python
job1 = add(1, 2)
job2 = mult(job1.output, 3)
flow = jf.Flow([job1, job2])

responses = run_locally(flow, ensure_success=True)
assert responses[job2.uuid][1].output == 9
```
or, the same thing but during Flow construction time:
```python
flow = jf.Flow([job1, job2], output=job2.output)
responses = run_locally(flow, ensure_success=True)
assert responses[flow.output.uuid][1].output == 9
```
However, in the case of a @flow decorated function (which should be opaque to the caller except for the input and output), it's not clear what uuid in the results the caller should be reaching in for (since the jobs/flows are all internal details of the @flow). For it to "just work", the use case would look like:
```python
@flow
def workflow():
    job1 = add(1, 2)
    job2 = mult(job1.output, 3)
    return job2.output  # or `return job2`

response = run_locally(workflow(), ensure_success=True)
assert response == 9
```
The latter is the use case where we use jobflow in quacc recipes.
We have a small tweak to this PR here that allows us to do this. Perhaps we can incorporate something like that into this PR (either now or as a separate PR?)
The way I envision it working would be (in case you do want to preserve compatibility with Flow):
```python
@flow
def workflow(...):
    # return a dict of responses, keyed by uuid, similar to Flow()

@flow(return_dict=False)
def workflow(...):
    # return the resolved output
```
This also allows us to support cases where the output is not a single OutputReference or Flow, but could easily be resolved anyway:
```python
@flow(return_dict=False)
def my_flow(a, b):
    return [add(a, a), add(b, b), add(a, b)]
```
Hi @vineetbansal,
thanks for your additional comments.
I think that your comparison between the Flow and @flow is not entirely fair though. It seems to me that in the @flow example you are making it more complicated to access the results by passing the workflow() directly to run_locally. In fact you can again achieve the same behaviour as for the Flow example by simply assigning the workflow() to a variable:
```python
@flow
def workflow():
    job1 = add(1, 2)
    job2 = mult(job1.output, 3)
    return job2.output  # or `return job2`

f = workflow()
response = run_locally(f, ensure_success=True)
assert response[f.output.uuid][1].output == 9
assert response[f.jobs[1].uuid][1].output == 9
```
If you instead consider it necessary to have `run_locally` return just the output of the whole Flow, instead of the dictionary of responses, I think it would make more sense to have an option directly in `run_locally` and apply it independently of whether the input is a `Flow` or a `DecoratedFlow`.
```diff
--- a/src/jobflow/managers/local.py
+++ b/src/jobflow/managers/local.py
@@ -22,6 +22,7 @@ def run_locally(
     ensure_success: bool = False,
     allow_external_references: bool = False,
     raise_immediately: bool = False,
+    return_flow_result: bool = False,
 ) -> dict[str, dict[int, jobflow.Response]]:
     """
     Run a :obj:`Job` or :obj:`Flow` locally.
@@ -84,6 +85,9 @@
     flow = get_flow(flow, allow_external_references=allow_external_references)

+    if return_flow_result and flow.output is None:
+        raise ValueError("The Flow should have an output if return_flow_result is True")
+
     stopped_parents: set[str] = set()
     errored: set[str] = set()
     responses: dict[str, dict[int, jobflow.Response]] = defaultdict(dict)
@@ -183,4 +187,7 @@
     if ensure_success and not finished_successfully:
         raise RuntimeError("Flow did not finish running successfully")

+    if return_flow_result:
+        max_out_ind = max(responses[flow.output.uuid].keys())
+        return responses[flow.output.uuid][max_out_ind].output
     return dict(responses)
```
With these changes you can just run the same code for both `Flow` and `@flow`:

```python
job1 = add(1, 2)
job2 = mult(job1.output, 3)
f = Flow([job1, job2], output=job2.output)
responses = run_locally(f, ensure_success=True, return_flow_result=True)
assert responses == 9

@flow
def workflow():
    job1 = add(1, 2)
    job2 = mult(job1.output, 3)
    return job2.output  # or `return job2`

f = workflow()
response = run_locally(f, ensure_success=True, return_flow_result=True)
assert response == 9
```
I did not test this with more complicated workflows or indexes larger than 1, so maybe some adaptation might be needed.
Does this solve your issue? If not I think it would be better if you can link an example from quacc so that it is clearer where this is actually used.
Also, note that all these solutions will only work when executing the workflows with run_locally. In fireworks and jobflow-remote there is no "Flow output" in the DB, so the only option to get the final results is to use the job uuids. It might be interesting to add these, but the changes will be much bigger in that case.
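For example, given a job's uuid, its result can be pulled from the `JobStore` regardless of the execution backend (a sketch; `get_output` returning the latest `job_index` by default is the standard jobflow behavior, to the best of my knowledge):

```python
# Sketch: fetch a job's final output by uuid from the configured JobStore.
from jobflow import SETTINGS

store = SETTINGS.JOB_STORE
store.connect()
result = store.get_output(job2.uuid)  # uses the latest job_index by default
```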
Comments on the implementation
I also went through your last changes. I think it is a very good solution, as it may not require any change at all for jobflow-remote. I tried to submit a simple @flow and it worked directly. Some further investigation is needed to verify that everything works as expected, but it is definitely promising.
I have also found one more way to break the current implementation, but I may also have a solution.
Consider that, especially in atomate2, Flow Makers are used very often. So it may be important to combine a `@flow` function with a `Maker`. However, the following example currently does not work:
```python
@job
def add(a, b):
    return a + b

@dataclass
class AddFlowMaker(Maker):
    def make(self, a, b):
        j1 = add(a, b)
        j2 = add(j1.output, 2)
        return Flow([j1, j2])

@flow
def add_combine(a, b):
    j = add(a, b)
    f = AddFlowMaker().make(1, j.output)
    return f

f = add_combine(1, 2)
result = run_locally(f, ensure_success=True)
```
The reason is that in this case the code tries to assign the same Job to multiple flows as if it was its first flow addition. I have made a change that makes the code above run and seems to keep consistency with the expected behaviour, but more investigation might be needed here as well:
```diff
--- a/src/jobflow/core/flow.py
+++ b/src/jobflow/core/flow.py
@@ -827,10 +828,13 @@ class Flow(MSONable):
         job_ids = set(self.all_uuids)
         hosts = [self.uuid, *self.hosts]
         for job in jobs:
-            if job.host is not None and job.host != self.uuid:
+            flow_context = _current_flow_context.get()
+
+            if job.host is not None and job.host != self.uuid and (flow_context is not None and flow_context.uuid != job.host):
+                # if job.host is not None and job.host != self.uuid:
                 raise ValueError(
                     f"{type(job).__name__} {job.name} ({job.uuid}) already belongs "
                     f"to another flow."
                 )
             if job.uuid in job_ids:
                 raise ValueError(
@@ -845,7 +849,8 @@
                 )
             job_ids.add(job.uuid)
             if job.host != self.uuid:
-                job.add_hosts_uuids(hosts)
+                prepend = flow_context is not None and flow_context.uuid == job.host
+                job.add_hosts_uuids(hosts, prepend=prepend)
         self._jobs += tuple(jobs)

     def remove_jobs(self, indices: int | list[int]):
```
Do you think this will be compatible in general with your solution?
Thanks for all the efforts!
Just adding a note for @vineetbansal that the `Maker` class can be found in `jobflow.core.maker.Maker`. Additional context can be found here.
Just adding a note for @vineetbansal that the `Maker` class can be found in `jobflow.core.maker.Maker`. Additional context can be found here.
Just one additional clarification here: the examples in the link are all Job Makers. I did not test them, but I suppose they will work, since basically it is just a convenient way of defining the Job, and it still uses the `@job` decorator. In the case of Flow Makers, they build and return a `Flow` object, which is the critical point that generates the error in the code above. The same error happens if you explicitly define a `Flow` inside a `@flow`:
```python
@flow
def add_combine(a, b):
    j = add(a, b)
    f1 = Flow(j, j.output)
    return add_single(f1.output, 3)

f = add_combine(1, 2)
result = run_locally(f, ensure_success=True)
```
This is basically equivalent to the code with the Maker. The Maker in the example of the previous post is just including the creation of the Flow object in its make method.
Small examples of Flow Makers are here or, more involved, in atomate.
Thanks @gpetretto - this all makes sense.
I think that your comparison between the Flow and @flow is not entirely fair though..
Fair point - this was a brain-freeze on my end! The f.output reach-in will of course still work, and should be fine for our use case.
Thanks also for the changes you suggest to run_locally to return results. I think they need not go in for right now - eventually quacc integrates with jobflow through jobflow-remote, and if jobflow-remote won't handle those, then we can just document how the final results can be obtained from the response dictionary (as we document it now for the Flow object).
I've added one last minor tweak in the latest commit - I've removed the insistence on the output of the @flow decorated function to be a Job/Flow/OutputReference:
```diff
-        elif not isinstance(output, OutputReference):
-            raise RuntimeError(
-                "A @flow decorated function must return a Job or an OutputReference"
```
Even if the return values of those functions are not returned directly but in a dict (which is fine), these `@flow`s in quacc are often constructed such that they return lists of flows. The fact that the list is returned is immaterial; it's only the proper instantiation of the `Job`/`Flow` objects that we eventually care about. A toy example of this in the tests is:
```python
@flow
def my_flow(a, b):
    return [add(a, a), add(b, b)]

f = my_flow(1, 2)
_ = run_locally(f, ensure_success=True)
```
In these cases, DecoratedFlow will still construct the correct DAG, and jobflow-remote need not return the flow output, so hopefully this will be okay.
For the other failure mode that you found, I've added an xfail test: `test_flow_returns_flow`. I didn't want to make the suggested changes to the code directly, because I don't totally understand the order of these uuids and why they have to be prepended, but also because, with this change, some of the current tests that are expected to raise exceptions no longer do so. For example:
```python
add_job = get_test_job()
Flow([add_job])

with pytest.raises(ValueError, match="already belongs to another flow"):
    Flow([add_job])
```
https://github.com/materialsproject/jobflow/blob/d208e460d7bad3533a6323703276cf75304c0891/tests/core/test_flow.py#L130
Handling this failure mode is not a blocker for us at this stage, but let me know if you'd rather get it in with this PR, and I can investigate and incorporate it as well. However, with this latest commit, I think we have all we need to use this flow decorator in quacc.
Hi @vineetbansal,
thanks for the updates and sorry for the delay. I spent some time on the point of using the flow decorator with Makers, because I think this is a potentially important use case. Let me address the different points separately.
Return objects
In the example that you proposed:
```python
@flow
def my_flow(a, b):
    return [add(a, a), add(b, b)]

f = my_flow(1, 2)
_ = run_locally(f, ensure_success=True)
```
Is this the correct approach? Shouldn't the return be `return [add(a, a).output, add(b, b).output]`? Otherwise, combining this with other jobs/flows will lead to failures:
```python
@flow
def add_single_list(a, b):
    return [add(a, 2), add(b, 3)]

@flow
def add_combine(a, b):
    f1 = add_single_list(a, b)
    return add_single_list(f1.output[0], 3)

f = add_combine(1, 2)
result = run_locally(f, ensure_success=True)
```
Instead it works if the return of the first @flow is return [add(a, 2).output, add(b, 3).output].
This part thus behaves a bit inconsistently:
https://github.com/vineetbansal/jobflow/blob/907944bad66c73d08d721a9072285be21429d935/src/jobflow/core/flow.py#L951-L960
It warns of one potential issue, but then "fixes" it for the user without specifying it. But then for the case above one still gets the warning from here: https://github.com/vineetbansal/jobflow/blob/907944bad66c73d08d721a9072285be21429d935/src/jobflow/core/flow.py#L283
but in this case the problem is not fixed automatically.
Since handling all the possible cases, replacing each Job/Flow with their outputs, would be difficult, I think maybe it would be possible to always leave the output as is and rely on the already existing warning if any Job/Flow is present in any form in the output. However, I do not have a strong opinion on this matter. If the replacing of the output is left as is, I would at least suggest to explicitly mention in the warning message that the output has been replaced.
Combining with makers
Sorry, my previous condition was partially incorrect. I found a better one, but this still left inconsistencies in the order of the hosts. The reason for the initial prepend that I proposed was related to how the hosts attribute is defined:
https://github.com/vineetbansal/jobflow/blob/907944bad66c73d08d721a9072285be21429d935/src/jobflow/core/job.py#L273
But I tested with more involved cases and I have verified that it did not handle all of them correctly.
So, I took the liberty of making a somewhat larger change to the procedure in which the jobs are added to the decorated Flow. It seems that this could handle all the previous cases, plus all possible usage of nested Flow and Job objects inside them. I made some tests with some involved combinations and it seems that the execution and hosts order was correct in that case. However, it is difficult to make tests that involve all possible combinations, so a further check of the logic might be needed.
I have pushed it in this branch: https://github.com/gpetretto/jobflow/tree/test_flow_dec
The main change here is that, instead of calling `add_jobs` when the `Job` is created, calling the function decorated with `@flow` first collects all the jobs, and then adds to the `DecoratedFlow` only those that have not already been assigned to another host. This should both prevent the "already belongs to another flow" error and provide the correct order in the hosts attribute.
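In pseudocode, the revised collection step looks roughly like this (a sketch with hypothetical local names, not the exact code in the branch):

```python
# Sketch: build the decorated function's graph with a list-based context,
# then add to the DecoratedFlow only objects not yet claimed by a host.
collected: list = []
with flow_build_context(collected):
    output = decorated_fn(*args, **kwargs)
for obj in collected:
    if obj.host is None:  # not already nested inside another Flow/Maker
        decorated_flow.add_jobs(obj)
```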
In addition, I handled the case of the Flow Makers, so that the name attribute is taken into account in an example like this:
```python
@dataclass
class TestMaker(Maker):
    a: int
    name: str = "test_maker"

    @flow
    def make(self, b):
        j = add(self.a, b)
        return j.output
```
As mentioned above, I think these functionalities will be interesting to have, but I don't know if it is strictly necessary. At this point I think it is up to @utf to decide if it is important to cover all the cases (with the suggestion in my branch or any other solution) or to go on with the current implementation in this PR.
@gpetretto - thanks for the detailed feedback and the fixes for the more involved tests. Yes - your fix regarding having the context as a list instead of a single Flow and only adding jobs that have their host as None makes sense, and still allows all our tests in our quacc repo to pass (I tested against this branch).
Thanks also for the fix for the Makers.
For cases like these:
```python
@flow
def my_flow(a, b):
    return [add(a, a), add(b, b)]
```
The intention is simply to construct the `Job`s that go inside the `@flow`, and the `[add(a, a), add(b, b)]` part (or that anything is returned, really) is just an excuse to construct the appropriate `Job`s. We're able to handle this (and more complicated cases of `@flow` functions returning lists/dicts, etc.) in our code, so it doesn't have to go in jobflow.
For the one minor thing that you're flexible on:
However, I do not have a strong opinion on this matter. If the replacing of the output is left as is, I would at least suggest to explicitly mention in the warning message that the output has been replaced.
I've included the following snippet in my latest commit to this branch (after merging your changes), making the automatic replacement explicit:
```python
if isinstance(output, (jobflow.Job, jobflow.Flow)):
    warnings.warn(
        f"@flow decorated function '{name}' contains a Flow or "
        f"Job as an output. Usually the output should be the output of "
        f"a Job or another Flow (e.g. job.output). Replacing the "
        f"output of the @flow with the output of the Flow/Job. "
        f"If this message is unexpected then double check the outputs "
        f"of your @flow decorated function.",
        stacklevel=2,
    )
    output = output.output
```
This will allow our "recipes" to work seamlessly with jobflow in addition to prefect/parsl etc, (because the latter workflow engines make no distinction between a job and its output), without a lot of surgery, so hopefully this is still okay.
If you're satisfied with these changes, then perhaps we prod @utf to merge this? I'd be happy to add any additional docstrings/testcases of course, but otherwise I think we're there now..
Hi @vineetbansal, thanks for checking the changes and including them in the PR.
The intention is simply to construct the `Job`s that go inside the `@flow`, and the `[add(a, a), add(b, b)]` part (or that anything is returned, really) is just an excuse to construct the appropriate `Job`s. We're able to handle this (and more complicated cases of `@flow` functions returning lists/dicts, etc.) in our code, so it doesn't have to go in jobflow.
I just have a doubt about this point. Since the resolution of the references is done inside jobflow, how do you deal with its inconsistencies inside quacc, if that is sitting on top of jobflow? Do you actually change the Flow output there?
If you would rather keep the replacement of the output, I wonder if it may be worth recursively exploring the output object (similarly to what is done here) and replacing all the instances of Job and Flow with their outputs.
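Something like the following recursive walk could do that (a sketch with a hypothetical helper name):

```python
import jobflow

def _resolve_outputs(obj):
    # Recursively swap any Job/Flow in the output structure for its
    # .output reference, leaving everything else untouched.
    if isinstance(obj, (jobflow.Job, jobflow.Flow)):
        return obj.output
    if isinstance(obj, dict):
        return {k: _resolve_outputs(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple)):
        return type(obj)(_resolve_outputs(v) for v in obj)
    return obj
```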
If you're satisfied with these changes, then perhaps we prod @utf to merge this? I'd be happy to add any additional docstrings/testcases of course, but otherwise I think we're there now..
I don't have further comments and I think the code is at a good stage. I agree that this can probably be merged, so @utf can review the changes. If everything is fine with him, maybe a small section in the documentation can be added, so that other users can discover the new functionality?
Thanks @gpetretto. I've tweaked certain docstrings now that what goes inside the flow context is a list of Job/Flow objects, and not the decorated job.
For your final doubt:
I just have a doubt about this point. Since the resolution of the references is done inside jobflow, how do you deal with its inconsistencies inside quacc, if that is sitting on top of jobflow? Do you actually change the Flow output there?
In response to my comment:
The intention is simply to construct the Jobs that go inside the @flow, and the [add(a, a), add(b, b)] part (or that anything is returned, really) is just an excuse to construct the appropriate Jobs. We're able to handle this (and more complicated cases of @flow functions returning lists/dicts, etc.) in our code, so it doesn't have to go in jobflow.
It turns out that I was wrong, and though this was a patch initially, we're no longer manipulating any output from @flows in quacc anymore, so I should have just said:
The intention is simply to construct the Jobs that go inside the @flow, and the [add(a, a), add(b, b)] part (or that anything is returned really) is just an excuse to construct the appropriate Jobs.
Returning a single Job or Flow is indeed a special case we were hoping to support, and the minor accommodation of resolving it to its `.output`, as we've done here, is all we need.
@utf - This is ready for your review. Despite the detailed back and forth between me and @gpetretto , you may find that the actual code changes are surprisingly minimal. The tests should cover almost all cases that we're hoping to support with a @flow decorator.