
Caching for async driver

nemilentsau opened this issue 11 months ago · 20 comments

I am getting a value error when trying to use caching with an async_driver

Current behavior

dr = await (
    async_driver.Builder().with_modules(pdf_processor_async).with_cache().build()
)

ValueError: Multiple adapters cannot (currently) implement the same lifecycle method. Sync methods: ['do_node_execute']. Async methods: []

Stack Traces


ValueError                                Traceback (most recent call last)
/home/sagemaker-user/ai-experiment/llm-frameworks/hamilton/test_hamilton_async.ipynb Cell 6 line 1
----> 1 dr = await (
      2     async_driver.Builder().with_modules(pdf_processor_async).with_cache().build()
      3 )

File /opt/conda/lib/python3.10/site-packages/hamilton/async_driver.py:512, in Builder.build(self)
    504 async def build(self):
    505     """Builds the async driver. This also initializes it, hence the async definition.
    506     If you don't want to use async, you can use build_without_init and call ainit later,
    507     but we recommend using this in an asynchronous lifespan management function (E.G. in fastAPI),
    (...)
    510     :return: The fully
    511     """
--> 512 dr = self.build_without_init()
    513 return await dr.ainit()

File /opt/conda/lib/python3.10/site-packages/hamilton/async_driver.py:496, in Builder.build_without_init(self)
    494 result_builders = [adapter for adapter in adapters if isinstance(adapter, base.ResultMixin)]
    495 specified_result_builder = base.DictResult() if len(result_builders) == 0 else None
--> 496 return AsyncDriver(
    497     self.config,
    498     *self.modules,
    499     adapters=self.adapters,
    500     result_builder=specified_result_builder,
    501     allow_module_overrides=self._allow_module_overrides,
    502 )

File /opt/conda/lib/python3.10/site-packages/hamilton/async_driver.py:237, in AsyncDriver.__init__(self, config, result_builder, adapters, allow_module_overrides, *modules)
    235 # it will be defaulted by the graph adapter
    236 result_builder = result_builders[0] if len(result_builders) == 1 else None
--> 237 super(AsyncDriver, self).__init__(
    238     config,
    239     *modules,
    240     adapter=[
    241         # We pass in the async adapters here as this can call node-level hooks
    242         # Otherwise we trust the driver/fn graph to call sync adapters
    243         AsyncGraphAdapter(
    244             result_builder=result_builder,
    245             async_lifecycle_adapters=lifecycle_base.LifecycleAdapterSet(*async_adapters),
    246         ),
    247         # We pass in the sync adapters here as this can call
    248         *sync_adapters,
    249         *async_adapters,  # note async adapters will not be called during synchronous execution -- this is for access later
    250     ],
    251     allow_module_overrides=allow_module_overrides,
    252 )
    253 self.initialized = False

File /opt/conda/lib/python3.10/site-packages/hamilton/driver.py:434, in Driver.__init__(self, config, adapter, allow_module_overrides, _materializers, _graph_executor, _use_legacy_adapter, *modules)
    413 """Constructor: creates a DAG given the configuration & modules to crawl.
    414
    415 :param config: This is a dictionary of initial data & configuration.
    (...)
    430
    431 """
    433 self.driver_run_id = uuid.uuid4()
--> 434 adapter = self.normalize_adapter_input(adapter, use_legacy_adapter=_use_legacy_adapter)
    435 if adapter.does_hook("pre_do_anything", is_async=False):
    436     adapter.call_all_lifecycle_hooks_sync("pre_do_anything")

File /opt/conda/lib/python3.10/site-packages/hamilton/driver.py:341, in Driver.normalize_adapter_input(adapter, use_legacy_adapter)
    339 if use_legacy_adapter:
    340     adapter.append(base.PandasDataFrameResult())
--> 341 return lifecycle_base.LifecycleAdapterSet(*adapter)

File /opt/conda/lib/python3.10/site-packages/hamilton/lifecycle/base.py:770, in LifecycleAdapterSet.__init__(self, *adapters)
    768 self._adapters = self._uniqify_adapters(adapters)
    769 self.sync_hooks, self.async_hooks = self._get_lifecycle_hooks()
--> 770 self.sync_methods, self.async_methods = self._get_lifecycle_methods()
    771 self.sync_validators = self._get_lifecycle_validators()

File /opt/conda/lib/python3.10/site-packages/hamilton/lifecycle/base.py:838, in LifecycleAdapterSet._get_lifecycle_methods(self)
    834 multiple_implementations_async = [
    835     method for method, adapters in async_methods.items() if len(adapters) > 1
    836 ]
    837 if len(multiple_implementations_sync) > 0 or len(multiple_implementations_async) > 0:
--> 838 raise ValueError(
    839     f"Multiple adapters cannot (currently) implement the same lifecycle method. "
    840     f"Sync methods: {multiple_implementations_sync}. "
    841     f"Async methods: {multiple_implementations_async}"
    842 )
    843 return (
    844     {method: list(adapters) for method, adapters in sync_methods.items()},
    845     {method: list(adapters) for method, adapters in async_methods.items()},
    846 )

ValueError: Multiple adapters cannot (currently) implement the same lifecycle method. Sync methods: ['do_node_execute']. Async methods: []

Steps to replicate behavior

  1. dr = await ( async_driver.Builder().with_modules(pdf_processor_async).with_cache().build() )

Library & System Information

python=3.10.14 hamilton=1.85.1

nemilentsau · Dec 30 '24

@nemilentsau thanks! Currently this is a known limitation.

What's your use case / context? With some more details, there could be something we can help you do in the meantime.

skrawcz · Dec 30 '24

@skrawcz I am building a pipeline to process documents with complex layouts; it involves multiple calls to various LLMs. Certain calls can run in parallel, as they extract complementary yet independent information from the documents. I'd like to cache those calls so as not to re-run them while I am developing. In principle overrides could do the trick, although that can get a bit cumbersome when you have a large graph. But it also seems that materialization doesn't work with the async driver either. Do you have any means to automate saving node outputs that would work for async?

dr = await (
    async_driver.Builder().with_modules(pdf_processor_async).with_materializers(*materializers).build()
)

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
/home/sagemaker-user/ai-experiment/llm-frameworks/hamilton/test_hamilton_async.ipynb Cell 12 line 2
      1 dr = await (
----> 2     async_driver.Builder().with_modules(pdf_processor_async).with_materializers(*materializers).build()
      3 )

File /opt/conda/lib/python3.10/site-packages/hamilton/async_driver.py:472, in Builder.with_materializers(self, *materializers)
    469 def with_materializers(
    470     self, *materializers: typing.Union[ExtractorFactory, MaterializerFactory]
    471 ) -> "Builder":
--> 472     self._not_supported("with_materializers")

File /opt/conda/lib/python3.10/site-packages/hamilton/async_driver.py:462, in Builder._not_supported(self, method_name, additional_message)
    461 def _not_supported(self, method_name: str, additional_message: str = ""):
--> 462     raise ValueError(
    463         f"Builder().{method_name}() is not supported for the async driver. {additional_message}"
    464     )

ValueError: Builder().with_materializers() is not supported for the async driver.

nemilentsau · Dec 30 '24

> @skrawcz I am building a pipeline to process documents with complex layouts; it involves multiple calls to various LLMs. Certain calls can run in parallel, as they extract complementary yet independent information from the documents. I'd like to cache those calls so as not to re-run them while I am developing. In principle overrides could do the trick, although that can get a bit cumbersome when you have a large graph.

Ah okay. So for development, since you've seen overrides: you can just request the outputs you want cached and then feed them back in as overrides on the next cycle... but yes, it can get a little messy when the graph gets large. Question: is this happening in a web service? Just curious about the async Python requirement, since Hamilton has a synchronous way to do parallelization that does work with caching.
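For concreteness, a minimal sketch of that cycle (the node names, file name, and pickle usage are all made up for illustration; dr is the built async driver):

import pickle

# First pass: request the expensive intermediate outputs and persist them.
# "pdf_text" and "llm_summary" are hypothetical node names.
results = await dr.execute(["pdf_text", "llm_summary"], inputs={"pdf_path": "doc.pdf"})
with open("dev_cache.pkl", "wb") as f:
    pickle.dump(results, f)

# Next development cycle: feed the saved values back in as overrides,
# so those nodes are not recomputed.
with open("dev_cache.pkl", "rb") as f:
    cached = pickle.load(f)
final = await dr.execute(["final_report"], inputs={"pdf_path": "doc.pdf"}, overrides=cached)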

> But it also seems that materialization doesn't work with the async driver either. Do you have any means to automate saving node outputs that would work for async?

> dr = await ( async_driver.Builder().with_modules(pdf_processor_async).with_materializers(*materializers).build() )
>
> ValueError: Builder().with_materializers() is not supported for the async driver.

Yes, unfortunately the async driver doesn't have full parity with the synchronous Hamilton driver (for materializers we just need to implement an async version). Short term, I think you could write a quick async decorator that checks the filesystem for a serialized result for that function? E.g. make a decorator that does something similar to this?
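For example, a minimal sketch of such a decorator, assuming a pickle-on-disk cache keyed on the function name plus a hash of its arguments (everything here is illustrative, not Hamilton API):

import functools
import hashlib
import pickle
from pathlib import Path

CACHE_DIR = Path(".node_cache")  # illustrative cache location

def async_file_cache(fn):
    """Caches an async function's result on disk, keyed on its name and arguments."""
    @functools.wraps(fn)
    async def wrapper(*args, **kwargs):
        CACHE_DIR.mkdir(exist_ok=True)
        # Hash the repr of the arguments -- crude, but fine for development use.
        key = hashlib.sha256(repr((args, sorted(kwargs.items()))).encode()).hexdigest()
        path = CACHE_DIR / f"{fn.__name__}_{key}.pkl"
        if path.exists():
            return pickle.loads(path.read_bytes())
        result = await fn(*args, **kwargs)
        path.write_bytes(pickle.dumps(result))
        return result
    return wrapper

@async_file_cache
async def extracted_tables(pdf_text: str) -> dict:  # hypothetical Hamilton node
    ...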

skrawcz · Dec 30 '24

Yeah, I've implemented that parallelization as well, and it works. But I feel it makes the code less clear. Conceptually, I am trying to run a document through text-only and multimodal LLMs and compare the results. If I parallelize execution of the nodes with async, I get a graph that is much easier to read and understand (and hence maintain in the future) than with the map-reduce type of parallelization. I actually implemented the same graph with Burr, but unfortunately Burr doesn't seem to allow running several branches in parallel, only conditional branching.

Async: [screenshot]

Map-reduce: [screenshot]

nemilentsau · Dec 31 '24

> Yeah, I've implemented that parallelization as well, and it works. But I feel it makes the code less clear. Conceptually, I am trying to run a document through text-only and multimodal LLMs and compare the results. If I parallelize execution of the nodes with async, I get a graph that is much easier to read and understand (and hence maintain in the future) than with the map-reduce type of parallelization. I actually implemented the same graph with Burr, but unfortunately Burr doesn't seem to allow running several branches in parallel, only conditional branching.
>
> Async: [screenshot]
>
> Map-reduce: [screenshot]

Makes sense -- looks like you have a few good approaches. FWIW, Burr does allow parallelism -- it's a new but very powerful feature, and it works with async too. Some reading:

That said, it's also the "map/reduce" style of parallelism and doesn't have multiple branches, but it's pretty good for this kind of stuff. IMO Hamilton is a bit more natural for the non-map-reduce case you have, but agreed that for async there's some work to do on caching/reusing results... @skrawcz's decorator idea might be a good approach.

elijahbenizzy · Dec 31 '24

Agreed. I think implementing the async decorator might be a good solution for my use case. I did play with parallelism in Burr; it's pretty nice. It's just that in my case I would prefer a non-map-reduce style implementation. Thank you for your help, guys.

nemilentsau · Dec 31 '24

> Agreed. I think implementing the async decorator might be a good solution for my use case. I did play with parallelism in Burr; it's pretty nice. It's just that in my case I would prefer a non-map-reduce style implementation. Thank you for your help, guys.

Yep, makes sense! Let us know how your decorator goes -- feel free to post code samples back that we can share out/provide feedback on if that's helpful!

elijahbenizzy · Dec 31 '24

> Agreed. I think implementing the async decorator might be a good solution for my use case. I did play with parallelism in Burr; it's pretty nice. It's just that in my case I would prefer a non-map-reduce style implementation. Thank you for your help, guys.

Thanks for raising the issue. Sorry we don't have anything first-class yet for async.

Just to summarize your situation:

  1. You want branch-level parallelism because your code isn't conducive to map-reduce style modeling (it's not a "parallelize this loop" situation where the number of inputs is dynamic).
  2. Hamilton async allows for running parts of a DAG in parallel, while regular Hamilton does not because it's sequential.
  3. Caching would help as you iterate on the DAG -- but it wouldn't be used in a production setting (?).

Your ideal solution would:

  1. Walk the DAG, parallelizing what could be parallelized.
  2. Support fine-grained fingerprint caching for fast development.

Is that right?

skrawcz · Dec 31 '24

Yes, when it comes to async.

It's a bit more complex than that when it comes to caching. I do want to materialize certain assets throughout the pipeline for debugging/analysis purposes: for example, the outputs of all the intermediary LLM calls, OCR engine outputs, etc.

Moving forward, I might want to re-run my pipeline in production from certain steps if I detect extraction/hallucination issues, or, more likely, use a different extraction scenario based on the type of issue. But I have other issues to handle before I get to implementing these functionalities. At the moment my pipe simply throws an error in such cases, so I don't need full caching capabilities in production.

nemilentsau · Dec 31 '24

> It's a bit more complex than that when it comes to caching. I do want to materialize certain assets throughout the pipeline for debugging/analysis purposes: for example, the outputs of all the intermediary LLM calls, OCR engine outputs, etc.

Have you tried using the Hamilton UI here? It supports async and logs outputs; we'd love more input on it. E.g. maybe we should expose an API to pull back out what was tracked for a run? Converging caching with what the Hamilton UI exposes is on the roadmap...

Another idea:

  1. Implement your own async logger via lifecycle hooks -- see the async tracker example. E.g. it stores results to disk, and you can use @tag() to mark specific nodes for it or not (a sketch follows after the snippet below)...
  2. Write a little utility around driver execution that, given a driver (i.e. some graph), determines what's stored via (1), loads it, and then injects it as overrides. Something like:
async def execute_wrapper(dr, cache_path, requested_outputs, ...):
    nodes = dr.list_available_variables(...)
    # find what's in the cache
    cached = _whats_in_the_cache(cache_path, nodes)
    # inject
    return await dr.execute(requested_outputs, inputs=..., overrides=cached)
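To make that concrete, here is one possible sketch of (1) plus the _whats_in_the_cache helper, assuming the synchronous NodeExecutionHook interface from hamilton.lifecycle (the async driver needs the async hook flavor shown in the linked tracker example, but the shape is the same); the cache="disk" tag name is made up for illustration:

import pickle
from pathlib import Path
from typing import Any, Dict, List

from hamilton import lifecycle

class DiskResultSaver(lifecycle.NodeExecutionHook):
    """Persists tagged node results to disk after execution (sketch only)."""

    def __init__(self, cache_path: str):
        self.cache_path = Path(cache_path)
        self.cache_path.mkdir(parents=True, exist_ok=True)

    def run_before_node_execution(self, **kwargs: Any):
        pass  # nothing to do before a node runs

    def run_after_node_execution(
        self, *, node_name: str, node_tags: Dict[str, Any], result: Any = None, **kwargs: Any
    ):
        # Only persist nodes marked with @tag(cache="disk") -- a made-up tag name.
        if node_tags.get("cache") == "disk":
            (self.cache_path / f"{node_name}.pkl").write_bytes(pickle.dumps(result))

def _whats_in_the_cache(cache_path: str, nodes: List[Any]) -> Dict[str, Any]:
    """One possible implementation of the helper in the snippet above:
    load whatever (1) previously stored, keyed by node name."""
    cached = {}
    for var in nodes:  # elements of dr.list_available_variables() expose .name
        path = Path(cache_path) / f"{var.name}.pkl"
        if path.exists():
            cached[var.name] = pickle.loads(path.read_bytes())
    return cached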

skrawcz · Dec 31 '24

@nemilentsau happy new year -- want to give this branch a try?

What I did was make regular Hamilton behave similarly to the async driver, but using multithreading. It seems to work as expected, with caching and the Hamilton UI working out of the box... See the example in the PR. I didn't try materializers, but they should also just work.
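For context, caching on the synchronous driver looks like the sketch below; the multithreaded-execution piece comes from the linked branch and its exact builder API isn't shown in this thread, so only the already-available caching call is sketched here (pdf_processor is a hypothetical synchronous version of the module):

from hamilton import driver
import pdf_processor  # hypothetical synchronous module

dr = (
    driver.Builder()
    .with_modules(pdf_processor)
    .with_cache()  # fingerprint-based caching works on the sync driver
    .build()
)
results = dr.execute(["final_report"], inputs={"pdf_path": "doc.pdf"})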

skrawcz · Jan 01 '25

@skrawcz Happy new year! I did run this branch and it works! Thank you so much! This is exactly what I've been looking for.

nemilentsau · Jan 01 '25

> Have you tried using the Hamilton UI here? It supports async and logs outputs; we'd love more input on it. E.g. maybe we should expose an API to pull back out what was tracked for a run? Converging caching with what the Hamilton UI exposes is on the roadmap...

I haven't tried the UI yet; I'll share my feedback after I play with it. But generally speaking, having API access to logged outputs would be beneficial. For example, I might want to run a script to do batch analysis on the logs to see what is happening at certain nodes in my pipe.

nemilentsau · Jan 01 '25

> @skrawcz Happy new year! I did run this branch and it works! Thank you so much! This is exactly what I've been looking for.

Awesome. I can try to get this released this week / early next week. If you wouldn't mind testing it, that would be great.

skrawcz · Jan 02 '25

> @skrawcz Happy new year! I did run this branch and it works! Thank you so much! This is exactly what I've been looking for.
>
> Awesome. I can try to get this released this week / early next week. If you wouldn't mind testing it, that would be great.

Perfect! This would be great. I will test it out then, and let you know if I find any issues.

nemilentsau · Jan 02 '25

> @skrawcz Happy new year! I did run this branch and it works! Thank you so much! This is exactly what I've been looking for.
>
> Awesome. I can try to get this released this week / early next week. If you wouldn't mind testing it, that would be great.
>
> Perfect! This would be great. I will test it out then, and let you know if I find any issues.

Would you mind taking `pip install sf-hamilton==1.86.0rc0` for a spin? I plan to publish tomorrow if I don't hear from you.

skrawcz · Jan 02 '25

> Would you mind taking `pip install sf-hamilton==1.86.0rc0` for a spin? I plan to publish tomorrow if I don't hear from you.

@skrawcz Sure thing. I will check it out.

nemilentsau · Jan 02 '25

Okay, merged #1264; will do a proper release tomorrow.

skrawcz · Jan 04 '25

Awesome!

nemilentsau · Jan 04 '25

Okay, it's out as 1.86.1.

skrawcz · Jan 04 '25