Do we want to support iterators for data loading?
Issue by skrawcz
Monday Feb 07, 2022 at 17:25 GMT
Originally opened as https://github.com/stitchfix/hamilton/issues/68
What?
If we want to chunk over data, a natural way to do that is via an iterator.
Example: Enable "input" functions to be iterators
```python
def my_loading_funct(...) -> Iterator[pd.Series]:
    ...
    yield some_chunk
```
This is fraught with some edge cases, but it could be a more natural way to chunk over large data sets. This perhaps requires a new driver -- as we'd want some next()-type semantic logic on the output of execute...
Originally posted by @skrawcz in https://github.com/stitchfix/hamilton/issues/43#issuecomment-1030961335
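For illustration only, here's a rough sketch of what that "next() type semantic" could look like from the caller's side -- `StreamingDriver`, `execute_iter`, and `process` are made-up names, not Hamilton APIs:

```python
# Purely hypothetical -- StreamingDriver / execute_iter do not exist in Hamilton.
dr = StreamingDriver(config, my_module)
for chunk_result in dr.execute_iter(final_vars=["some_output"]):
    # one result per chunk yielded by the upstream loading function
    process(chunk_result)
```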
Things to think through to determine whether this is something useful to have:
- Where would we allow this? Only on "input" nodes?
- How would we exercise them in a deterministic fashion? I.e. does `execute()` care, and we iterate over them until they're exhausted? Or does `execute()` only do one iteration?
- How do we coordinate multiple inputs that are iterators? What if they're of different lengths?
- How would we ensure people don't create a mess that's hard to debug?
- Would this work for the distributed graph adapters?
Comment by elijahbenizzy
Saturday Oct 29, 2022 at 17:31 GMT
Yeah I'd need to see more of a workflow/motivating example for this TBH. E.G. a graphadapter that chunks up data and iterates in a streaming sense could be high-value as well...
OK, inspired by @jdonaldson's hamilton-like framework, I'm curious what could happen if we use chunking. Going to walk through a few different UIs I've been mulling over that would fit into the way we currently do things.
some use-cases:
- Run the whole DAG on a stream of data -- E.G. run it within a kafka stream
- Mini-batch training (everything streaming until the end, when we combine it all together to form a model)
- Training until resolution (E.G. train until a loss hits a certain value or a number of iterations has been hit)
- Running over multi-configurations (another way of doing subdags, actually, but this could be dynamic/runtime generated)
- Generating the same data from multiple files and joining them together
requirements/nice-to-have
- Superset of Hamilton/what can be run now
- Ability to run a node only once, and run a node every time a batch is executed (see #90)
- Does not preclude intelligent caching -- e.g. if it gets halfway through and dies, we should be able to store the state. Hamilton should not store everything (as we currently do now) -- ideally we'd just store the outputs in memory
Ideas
Chunking with type annotations
The idea here is that we use `typing.Annotated` to specify a function's "shape".
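As a rough sketch (these names are illustrative, not an existing Hamilton API), `Chunked` could be a generic alias that wraps a generator type with a marker the framework can detect at graph-build time:

```python
from typing import Annotated, Generator, TypeVar

T = TypeVar("T")

class ChunkedMarker:
    """Sentinel attached to the annotation so the framework can detect it."""

# Chunked[pd.DataFrame] then reads as "a generator of DataFrames, marked as chunked".
Chunked = Annotated[Generator[T, None, None], ChunkedMarker]
```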
```python
def training_files() -> List[str]:
    return [file_1, file_2, file_3]

def training_batch(training_files: List[str]) -> Chunked[pd.DataFrame]:
    """Releases the dataframe in batches for training."""
    for mini_training_file in training_files:
        yield pd.read_csv(mini_training_file)

def bad_data_filtered(training_batch: pd.DataFrame) -> pd.DataFrame:
    """Map operation that takes in a dataframe and outputs a dataframe.
    This is run each time the training_batch function yields. We know that
    to be the case, as it is a pure map function."""
    return training_batch.filter(_filter_fn(training_batch))

def X(training_batch: pd.DataFrame) -> pd.DataFrame:
    return training_batch[FEATURE_COLUMNS]

def Y(training_batch: pd.DataFrame) -> pd.Series:
    return training_batch[TARGET_COLUMNS]

def model(X: Chunked[pd.DataFrame], Y: Chunked[pd.Series]) -> Model:
    """Aggregate operation. This is run once per yield from above,
    but the final model is returned."""
    model = Model()
    for x, y in zip(X, Y):
        model.minibatch_train(x, y)
    return model
```
Say we have three batches -- what's the order of operations?
- `training_files` is executed once
- the subdag consisting of `bad_data_filtered -> X, Y` is executed three times (once for each yielded batch)
- the subdag consisting of `model` is executed once -- looping over the inputs
So, the rules are (a classification sketch follows the list):
1. **Generator fns** -- anything with `Chunked[...]` as the return type is treated as a generator.
2. **Map fns** -- anything without `Chunked` as the return type or the input type is:
   a. called once per yield if a `Chunked` is upstream
   b. called once if no `Chunked` is upstream
3. **Aggregator fns** -- anything with `Chunked` as the input type but not as the output type is run once, consuming the upstream generator.
4. **Custom maps** -- anything with `Chunked` as both the input and output type is a little weird -- it's equivalent to (2.a), but has the potential to compress/extend the iteration, e.g. batching over batches. TBD if this is supported.
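Here's a minimal sketch of how a framework might classify functions under these rules, assuming `Chunked` is detectable from the type annotations -- `_is_chunked` is a hypothetical helper whose real implementation depends on how `Chunked` is defined:

```python
import typing

def _is_chunked(hint) -> bool:
    # Hypothetical check -- the real test depends on how Chunked is defined.
    return hint is not None and "Chunked" in str(hint)

def classify(fn) -> str:
    """Classify a function as generator / map / aggregator / custom-map."""
    hints = typing.get_type_hints(fn, include_extras=True)
    returns_chunked = _is_chunked(hints.pop("return", None))
    takes_chunked = any(_is_chunked(h) for h in hints.values())
    if returns_chunked and takes_chunked:
        return "custom-map"   # rule 4 -- TBD whether this is supported
    if returns_chunked:
        return "generator"    # rule 1
    if takes_chunked:
        return "aggregator"   # rule 3
    return "map"              # rule 2
```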
There are some interesting implications/extensions.
Hydrating from cache
In this case, `model` isn't saved (or rather, it is, but it can't be used). You could imagine something like this:
```python
def model(X: Chunked[pd.DataFrame], Y: Chunked[pd.Series], model: Model = None) -> Model:
    """Aggregate operation. This is run using the generator from before,
    but the final model is returned."""
    model = Model() if model is None else model
    for x, y in zip(X, Y):
        model.minibatch_train(x, y)
    return model
```
In this case, it accepts a previous model from the DAG run, or you can seed it from a certain result. Note, however, that this would have to come with a truncation of the upstream data. So, what this implies is that you could rerun from where you left off, so long as you add the ability to self-hydrate.
Parallelization
Any _map_ function could easily be parallelized, which could provide natural node fusion (fuse all the nodes in the map operation). This is just plain-old map-reduce.
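As a sketch of that (plain `concurrent.futures`, reusing the illustrative functions from the example above -- nothing here is Hamilton's actual execution code), the map-stage nodes could be fused and run once per chunk in parallel:

```python
from concurrent.futures import ThreadPoolExecutor

def fused_map(chunk):
    # Fuse the map-stage nodes from the example above into one callable.
    return bad_data_filtered(chunk), X(chunk), Y(chunk)

def run_map_stage_in_parallel(training_batch):
    # training_batch is the chunk generator; each chunk is independent,
    # so the fused map can run in parallel before the aggregator consumes it.
    with ThreadPoolExecutor() as pool:
        return list(pool.map(fused_map, training_batch))
```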
Logging functions
We could potentially break the aggregator into two pieces and have one function consume the other. There's likely a much cleaner way to write this, but...
```python
def model_training(X: Chunked[pd.DataFrame], Y: Chunked[pd.Series]) -> Chunked[Tuple[Model, TrainingMetrics]]:
    """Aggregate operation broken up. Note that the model is just returned on the last yield."""
    model = Model()
    for x, y in zip(X, Y):
        metrics = model.minibatch_train(x, y)
        yield model, metrics

# Just returns the ID of where it was logged.
# This is a side-effecty function and should likely be a materialized result.
def logger(model_training: Chunked[Tuple[Model, TrainingMetrics]]) -> str:
    run_instance = generate_run_instance_from_logger()
    for _, metrics in model_training:
        run_instance.log(metrics)
    return run_instance.id

def model(model_training: Chunked[Tuple[Model, TrainingMetrics]]) -> Model:
    *_, (final_model, __) = model_training
    return final_model
```
Note that this implementation likely involves intelligent caching, as two outputs depend on it.
Nested Chunking
Yeah, this is too messy to allow for now, so I'm going to go ahead and say it's banned. Could be implemented if we allow the `Chunked -> Chunked` shape of a function.
OK, latest API proposal -- feel free to react/comment. Planning to scope/implement shortly. Note these are just adaptations of the above.
Two primary use-cases we want to handle:
- Map over an arbitrary number of files, process, then join (either in parallel or in series)
- Minibatch training -- loading a bunch of data, doing feature engineering, updating the model's state (likely in series, possibly in parallel)
API
We are using type-hints to handle this -- they will be loose wrappers over generators. Three generic types, inspired (in part) by classic map-reduce:
Sequential, E.G.
```python
def files_to_process(dir: str) -> Sequential[str]:
    """Anything downstream of this that depends on `files_to_process` as an `str`
    will run in sequence, until a Collect function."""
    for f in list_dir(dir):
        if f.endswith('.csv'):
            yield f
```
Parallel (likely not in v0), E.G.
```python
def files_to_process(dir: str) -> Parallel[str]:
    """Anything downstream of this that depends on `files_to_process` as an `str`
    can run in parallel, until a Collect function. Note that this means that this
    entire generator might be run greedily, and the results are dished out."""
    for f in list_dir(dir):
        if f.endswith('.csv'):
            yield f
```
Collect, E.G.
```python
# setting up basic map reduce example
def file_loaded(files_to_process: str) -> str:
    with open(files_to_process, 'r') as f:
        return f.read()

def file_counted(file_loaded: str) -> Counter:
    return Counter(tokenize(file_loaded))

def word_counts(file_counted: Collect[Counter]) -> dict:
    """Joins all the word counts"""
    full_counter = Counter()
    for counter in file_counted:
        for word, count in counter.items():
            full_counter[word] += count
    return full_counter
```
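For intuition, the dataflow above is roughly equivalent to chaining the generators by hand -- a plain-Python sketch of the intended execution semantics (no framework involved, reusing the illustrative functions from the example):

```python
from collections import Counter

def run_word_counts(dir: str) -> Counter:
    # files_to_process is the Sequential generator; the map stage runs once
    # per yielded file; the Collect stage consumes everything at the end.
    per_file_counts = (file_counted(file_loaded(f)) for f in files_to_process(dir))
    full_counter = Counter()
    for counter in per_file_counts:
        full_counter.update(counter)
    return full_counter
```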
The rules are as follows:
- These are all valid edges:
  - `Sequential[T_1] -> T_1`
  - `Parallel[T_1] -> T_1`
  - `Parallel[T_1] -> Collect[T_1]` (degenerate case)
  - `Sequential[T_1] -> Collect[T_1]` (degenerate case)
  - `Sequential[T_1] -> [T_1] -> ... [T_n] -> Collect[T_n]` (rerun a whole group of nodes with different inputs)
  - `Parallel[T_1] -> [T_1] -> ... [T_n] -> Collect[T_n]` (rerun a whole group of nodes with different inputs)
- Nothing "mapped" by `Parallel` or `Sequential` can be left dangling (it must be "reduced" with `Collect`).
- Multiple nodes can subscribe to a generator (e.g. two functions can depend on the same yields) -- they will get the same results, and it will be treated like a spliterator (see the sketch after this list).
- Diamond patterns will follow from the multiple-subscriber rule above -- effectively a join within a subdag.
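For the multiple-subscriber rule, one way the "spliterator" behavior could be implemented is with `itertools.tee` -- a sketch under that assumption, not how Hamilton actually does it:

```python
import itertools

def some_sequential():  # stands in for any Sequential[...] generator
    yield from ["a.csv", "b.csv", "c.csv"]

# Each subscriber gets its own iterator over the same yielded values,
# so two downstream nodes see identical results in the same order.
iter_for_node_1, iter_for_node_2 = itertools.tee(some_sequential(), 2)
```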
Edge cases
- See above for multiple listeners/diamond patterns
- No commitments on nested generators yet -- TBD on complexity
- Joining two generators (multiple `Collect` types on different ones) should probably not be allowed, for simplicity's sake. Rather, the operation over the input types should probably be pushed upstream to the generator.
Implementation
TBD for now -- will scope out and report. First implementation will likely break some of the edge cases above. Some requirements though:
- We should have a path for caching mid-sequential (e.g. idempotent DAGs; see the checkpointing sketch after this list)
- We should have a path for debugging "subdags"
- We should reduce the memory used so we don't store anything we don't need -- otherwise it'll be messy (this requires changing core Hamilton)
- We should know/track which nodes belong to which subdag -- both in the viz (dotted lines/groups) and in some metadata that's reported afterwards.
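A minimal sketch of the "caching mid-sequential" requirement, assuming per-chunk pickling to disk -- all names here are illustrative, not part of Hamilton:

```python
import os
import pickle

def run_with_checkpoints(chunks, map_fn, cache_dir="chunk_cache"):
    """Persist each chunk's map-stage output as it completes so an
    idempotent rerun can skip chunks that already finished."""
    os.makedirs(cache_dir, exist_ok=True)
    for i, chunk in enumerate(chunks):
        path = os.path.join(cache_dir, f"chunk_{i}.pkl")
        if os.path.exists(path):
            with open(path, "rb") as f:  # computed on a prior run
                yield pickle.load(f)
            continue
        result = map_fn(chunk)
        with open(path, "wb") as f:
            pickle.dump(result, f)
        yield result
```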
More edge cases:
- Something outside a `Sequential`/`Collect` group depends on something inside it (this is actually not an edge case -- this is a dangling group)
- Two collects on the same `Sequential` -- this should probably work, but we can ban it for now
We have some of this with `Parallelizable` and `Collect`, so closing for now as partially implemented.