
Implement an S3 sensor

Open Atharva-Phatak opened this issue 6 months ago • 4 comments

Problem Statement

Metaflow currently lacks native support for S3-based event sensors. In many real-world ML and data engineering workflows, triggering a flow in response to an S3 event (such as a new file upload) is a fundamental need.

Today, users must stitch together external systems such as:

AWS Lambda + EventBridge

Argo Events with custom S3 event sources

Manual cron jobs or polling loops

This adds infrastructure complexity, external dependencies, and friction in building reactive pipelines.

Proposed Solution


Introduce a lightweight pattern for "sensor-like" polling flows that:

Can run on a cadence via @schedule (e.g., every 10 minutes).

Check a data source (e.g., S3, a database, etc.).

Emit an event or trigger another flow when conditions are met (e.g., a new file appears).

This doesn't require a full sensor abstraction, but can be encouraged via:

A documented pattern or utility base class like PollingSensorFlowSpec.

Best practices for storing state (last_seen_key) using artifacts or S3.

Built-in support for launching flows programmatically from within Metaflow itself, e.g., via something like metaflow.client.run() (a proposed name, not an existing function).
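The state-tracking idea above (compare the current listing against a stored last_seen_key) can be sketched in plain Python. This is only an illustration under stated assumptions: `poll_once` and `list_keys` are hypothetical names, not Metaflow APIs, and the comparison assumes keys sort lexicographically in arrival order (for example, date-prefixed keys). In a real flow, `list_keys` might wrap an S3 listing call, and the returned `last_seen_key` would be stored as an artifact or in S3 for the next scheduled run.

```python
from typing import Callable, Iterable, List, Optional, Tuple


def poll_once(
    list_keys: Callable[[], Iterable[str]],
    last_seen_key: Optional[str],
) -> Tuple[List[str], Optional[str]]:
    """Return (new_keys, updated_last_seen_key) for one polling pass.

    Assumption: keys sort lexicographically in arrival order,
    e.g. "data/2025-06-01.csv" < "data/2025-06-02.csv".
    """
    # Keep only keys that sort strictly after the last one we processed.
    new_keys = sorted(
        key
        for key in list_keys()
        if last_seen_key is None or key > last_seen_key
    )
    # If nothing new arrived, the stored state stays unchanged.
    updated = new_keys[-1] if new_keys else last_seen_key
    return new_keys, updated
```

A second pass with the updated state returns an empty list, so the sensor fires at most once per object as long as the ordering assumption holds. If keys do not sort by arrival time, comparing last-modified timestamps would be a safer basis for the state.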

Example

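from metaflow import FlowSpec, schedule, step


# @schedule has no `minutes` argument; a cron expression gives the 10-minute cadence.
# The five-field form below assumes an Argo Workflows deployment; AWS Step Functions
# expects the six-field EventBridge form instead, e.g. "0/10 * * * ? *".
@schedule(cron="*/10 * * * *")
class S3PollingFlow(FlowSpec):
    @step
    def start(self):
        # Check for new S3 files, compare with last seen state
        # If found, trigger downstream flow
        self.next(self.end)

    @step
    def end(self):
        pass


if __name__ == "__main__":
    S3PollingFlow()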

This approach is conceptually similar to how sensors are implemented in other systems like Airflow (via polling).
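The "if found, trigger downstream flow" part could look roughly like the sketch below. It is a hedged illustration, not the proposed implementation: `emit_new_file_events` and the payload shape are hypothetical, and the publisher is injected as a plain callable so the logic stays independent of any deployment. In an Argo Workflows deployment, `publish` might wrap Metaflow's `ArgoEvent(name=...).publish(payload=...)`, with the downstream flow listening via `@trigger`; that wiring is an assumption and is not shown here.

```python
from typing import Callable, Dict, Iterable, List


def emit_new_file_events(
    new_keys: Iterable[str],
    publish: Callable[[Dict[str, str]], None],
    bucket: str,
) -> List[Dict[str, str]]:
    """Publish one event payload per new key and return the payloads sent."""
    sent: List[Dict[str, str]] = []
    for key in new_keys:
        # Hypothetical payload shape: just enough for a downstream flow
        # to locate the object that triggered it.
        payload = {"bucket": bucket, "key": key}
        publish(payload)
        sent.append(payload)
    return sent
```

Passing `outbox.append` for a local list `outbox` is a simple way to exercise this without any event infrastructure. Emitting one event per key keeps downstream runs independent; batching all keys into a single payload would be an equally reasonable design if downstream work is cheap per file.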

I would love to contribute this feature 🎧

Atharva-Phatak, Jun 27 '25 11:06

Hi, we implemented this at Outerbounds (see here), which may offer some inspiration. Everything should work on open-source Metaflow with little or no change, assuming a Metaflow deployment with Argo Workflows enabled.

Happy to review or help you implement any modifications needed.

emattia, Jun 27 '25 14:06

@emattia That sounds awesome, I will try to go through the code and create a PR. Just one last question:

Is it okay to have this functionality in the OSS version of Metaflow? Just clarifying, since I know Outerbounds is closed source 😄

Atharva-Phatak, Jun 27 '25 21:06

@Atharva-Phatak yes, it should work seamlessly with an open-source deployment. The Outerbounds-specific pieces are that 1) we reference "integrations" in that repo, a closed-source feature, but from this repo's perspective AWS Secrets Manager will do the job, and 2) we assume Argo Workflows for event triggering, so AWS Step Functions wouldn't really work for deploying and triggering flows like this.

emattia, Jun 27 '25 21:06

@emattia I have started working on an initial PR here: https://github.com/Netflix/metaflow/pull/2475. Will update you once it is implemented.

Atharva-Phatak, Jul 03 '25 21:07