
Add Kafka Data Source Support for Streaming Data Processing

Open huleilei opened this issue 6 months ago • 4 comments

Is your feature request related to a problem?

Currently, Daft lacks native support for consuming data directly from Apache Kafka, which creates significant limitations for real-time data processing scenarios. Users working with streaming data pipelines are forced to implement workaround solutions like:

  1. Writing Kafka data to intermediate storage (e.g., Parquet files) before loading to Daft
  2. Creating custom Python consumers with kafka-python/confluent-kafka followed by manual DataFrame conversions
  3. Relying on external stream processing engines before feeding data to Daft

These approaches introduce unnecessary latency, complexity and potential data consistency issues in streaming workflows.

Describe the solution you'd like

We propose implementing first-class Kafka support in Daft with:

  • Native Kafka DataSource integration supporting both batch and streaming modes
  • Structured Streaming capabilities including:
    • Offset management (automatic checkpointing)
    • Consumer group support
    • Exactly-once processing semantics
  • Schema inference from:
    • Kafka message headers
    • Embedded schemas (Avro/Protobuf via Schema Registry)
  • Integration with existing DataFrame API:
    df = daft.read_kafka(
      bootstrap_servers="kafka:9092",
      topics=["iot-sensors"],
      consumer_group="daft-processor",
      starting_offsets="earliest"
    ).where(col("sensor_type") == "temperature")
    
    

Describe alternatives you've considered

No response

Additional Context

No response

Would you like to implement a fix?

Yes

huleilei avatar Jun 24 '25 13:06 huleilei

This would be really interesting for sure, but it's not something we were looking into at this time. If you (or anyone!) is interested in building this, though, I'd recommend looking at how we implement our Lance reader (https://github.com/Eventual-Inc/Daft/blob/main/daft/io/_lance.py).

In particular, our Python scan operator API (see class LanceDBScanOperator(ScanOperator):) is useful for converting any arbitrary source into Daft-compatible ScanTasks.
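To make the suggestion above concrete, here is a rough, self-contained sketch of how a Kafka source might map onto a scan-operator-style interface. The `ScanTask` and `KafkaScanOperator` classes below are stand-ins invented for illustration, not Daft's actual classes, and the partition end offsets are passed in directly rather than fetched from a real broker:

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class ScanTask:
    """Stand-in for Daft's ScanTask: one independent unit of work."""
    topic: str
    partition: int
    start_offset: int
    end_offset: int


class KafkaScanOperator:
    """Hypothetical scan operator producing one ScanTask per topic partition.

    In a real implementation, partition_end_offsets would come from the
    Kafka admin/consumer API (e.g. high watermark offsets); here they are
    supplied directly to keep the sketch self-contained.
    """

    def __init__(self, topic: str, partition_end_offsets: Dict[int, int],
                 start_offset: int = 0):
        self.topic = topic
        self.partition_end_offsets = partition_end_offsets
        self.start_offset = start_offset

    def to_scan_tasks(self) -> List[ScanTask]:
        # Bounded snapshot: each partition becomes a task reading from
        # start_offset up to that partition's current end offset.
        return [
            ScanTask(self.topic, p, self.start_offset, end)
            for p, end in sorted(self.partition_end_offsets.items())
        ]


op = KafkaScanOperator("iot-sensors", {0: 1000, 1: 500})
tasks = op.to_scan_tasks()
```

Splitting by partition is the natural parallelism boundary for Kafka, since ordering is only guaranteed within a partition.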

srilman avatar Jun 24 '25 19:06 srilman

Okay, thank you. I'll study it.

huleilei avatar Jun 25 '25 13:06 huleilei

Hey @huleilei we have a simple data source interface. Please feel free to reach out directly with any questions. Here are some docs and examples.

  • https://docs.daft.ai/en/stable/api/io/#daft.io.source.DataSource
  • https://github.com/Eventual-Inc/Daft/blob/main/daft/io/_range.py#L143
  • https://gist.github.com/rchowell/d463b6bd81ce7f34cc8f4ab895e74530

rchowell avatar Aug 05 '25 23:08 rchowell

I've tagged this as "Help Wanted" and feel free to give it a go. You can tag me on the review 👍

rchowell avatar Aug 05 '25 23:08 rchowell

Hi @rchowell , I'd like to open a broader discussion on this topic.

I think there are some challenges in implementing a new Kafka source on the current architecture/design.

Unbounded data source

Kafka and other streaming data sources are unbounded, but the current Daft data source is bounded, which means we have to cut the source stream off at a target fetch size or number of rows.

Actually, the current Daft scheduling strategy is streaming at morsel granularity, which means we could launch a 24/7 long-running application where the Kafka data source generates unbounded RecordBatches at a provided batch size, similar to read_generator. But if the data source is unbounded, we cannot get metadata while building the logical plan (e.g. total num rows, total size bytes), which might impact downstream optimizations that depend on that metadata; maybe we can add a new source type (unbounded/bounded) for the optimizer. Also, some APIs might not work, e.g. count_rows() cannot produce a final result.
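The batching idea above can be sketched with plain Python generators: an endless message stream chopped into fixed-size morsels, similar in spirit to read_generator. The `unbounded_messages` stub below simulates a Kafka poll loop and is an assumption for illustration, not a real consumer:

```python
import itertools
from typing import Dict, Iterator, List


def unbounded_messages() -> Iterator[Dict]:
    """Stand-in for an endless Kafka consumer poll loop."""
    for offset in itertools.count():
        yield {"offset": offset, "value": f"msg-{offset}"}


def record_batches(messages: Iterator[Dict], batch_size: int) -> Iterator[List[Dict]]:
    """Chop an unbounded stream into fixed-size morsels.

    The underlying stream never terminates, so neither does this generator;
    downstream operators must be able to consume batches incrementally
    rather than waiting for the source to finish.
    """
    while True:
        batch = list(itertools.islice(messages, batch_size))
        if not batch:
            return
        yield batch


stream = record_batches(unbounded_messages(), batch_size=100)
first = next(stream)   # offsets 0..99
second = next(stream)  # offsets 100..199
```

Note that nothing here can report a total row count or total size up front, which is exactly why plan-time metadata (and APIs like count_rows()) become problematic.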

Job failover

I'm not familiar with the current failover strategy. For example, assume a pipeline csv source -> transform -> xxx sink: if the transform node or sink node fails and triggers the retry strategy, is there any case that requires reading the source again? If so, a bounded data source can simply re-read the whole dataset, but an unbounded data source has to move the consumer offset back to some position and fetch the data again, which means we would need mechanisms to store state/offsets and to commit offsets.

Another thing is the semantics of stream processing, e.g. exactly-once, at-least-once, at-most-once. IMO, it's hard to build a powerful stream processing engine like Flink, and that's not the goal of Daft, so at-least-once is a better fit.
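The at-least-once idea sketched here boils down to one rule: commit the consumer offset only after the sink has succeeded, so a crash in between causes re-reads rather than data loss. A minimal toy model (the consumer and job-runner below are invented for illustration, not any real Kafka client API):

```python
class ManualCommitConsumer:
    """Toy consumer: offsets advance only on explicit commit (no auto-commit)."""

    def __init__(self, records):
        self.records = records
        self.committed = 0  # durable offset that survives "job restarts"

    def poll(self, n):
        # Always resume from the last committed offset.
        return self.records[self.committed : self.committed + n]

    def commit(self, n):
        self.committed += n


def run_job(consumer, sink, fail_before_commit=False):
    """Fetch -> process -> sink -> commit.

    Committing last gives at-least-once semantics: a crash between the sink
    write and the commit means the batch is re-read on retry, so records may
    be duplicated but never silently dropped.
    """
    batch = consumer.poll(3)
    if fail_before_commit:
        raise RuntimeError("transform/sink failed before commit")
    sink.extend(batch)
    consumer.commit(len(batch))


consumer = ManualCommitConsumer(list(range(10)))
sink = []
try:
    run_job(consumer, sink, fail_before_commit=True)  # first attempt crashes
except RuntimeError:
    pass
run_job(consumer, sink)  # retry re-reads the same uncommitted records
```

Swapping the order (commit before sink) would flip the guarantee to at-most-once.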

So, to briefly summarize:

  • If we treat the Kafka source as a simple bounded data source, it requires the user to give a target num_rows, e.g. daft.read_kafka(target_num_rows=5000), and it needs a hook to commit the consumer offset instead of auto-committing; otherwise, if the job fails, the re-run job won't re-consume the data that wasn't processed successfully. And even when the job succeeds, the user has to start jobs regularly to keep consuming the source data.
  • If we launch a 24/7 job to read an unbounded Kafka data source, we also need a mechanism to manage the consumer offset; and are there APIs that cannot be used at all?
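The first bullet's bounded mode can be sketched as a read function that fetches at most target_num_rows and hands back an explicit commit hook, so each scheduled run resumes from the previous run's committed offset. All names here (`OffsetTrackingConsumer`, `read_kafka_bounded`) are hypothetical stand-ins, not a proposed Daft API surface:

```python
class OffsetTrackingConsumer:
    """Toy consumer with a durable committed offset and no auto-commit."""

    def __init__(self, records):
        self.records = records
        self.committed = 0

    def poll(self, max_rows):
        return self.records[self.committed : self.committed + max_rows]

    def commit(self, num_rows):
        self.committed += num_rows


def read_kafka_bounded(consumer, target_num_rows):
    """Hypothetical bounded mode (cf. daft.read_kafka(target_num_rows=...)):
    fetch at most target_num_rows, and return a commit hook that the caller
    invokes only after downstream processing has succeeded."""
    rows = consumer.poll(target_num_rows)

    def commit():
        consumer.commit(len(rows))

    return rows, commit


consumer = OffsetTrackingConsumer(list(range(12)))

# Scheduled run 1: process 5 rows, then commit.
rows1, commit1 = read_kafka_bounded(consumer, target_num_rows=5)
commit1()

# Scheduled run 2 resumes where run 1 committed.
rows2, commit2 = read_kafka_bounded(consumer, target_num_rows=5)
```

If run 2 crashes before calling its commit hook, a re-run sees the same five rows again, which is the at-least-once behavior the bullet asks for.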

stayrascal avatar Sep 15 '25 03:09 stayrascal

> I'm not familiar with the current failover strategy. For example, assume a pipeline csv source -> transform -> xxx sink: if the transform node or sink node fails and triggers the retry strategy, is there any case that requires reading the source again? If so, a bounded data source can simply re-read the whole dataset, but an unbounded data source has to move the consumer offset back to some position and fetch the data again, which means we would need mechanisms to store state/offsets and to commit offsets.
>
> Another thing is the semantics of stream processing, e.g. exactly-once, at-least-once, at-most-once. IMO, it's hard to build a powerful stream processing engine like Flink, and that's not the goal of Daft, so at-least-once is a better fit.

Good question! You're right that Daft doesn't aim to be a Flink-style streaming engine. It's still in the design process, but our current thinking on error handling is row-level: Daft tracks which rows or partitions hit errors, and users control whether to retry or drop those. That way, bad data, transient transformation or UDF errors, and sink failures don't force a full source re-read unless the user chooses to restart the entire pipeline. We plan to return the rows that had errors as a separate "Error" dataframe that users can use for retries or for logging purposes.
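The row-level error handling described above can be illustrated with a small sketch: apply a fallible function per row, and collect failures into a separate "errors" collection instead of failing the whole batch. This is a toy model of the design direction, not Daft's actual implementation:

```python
def apply_with_error_capture(rows, fn):
    """Apply fn to each row; failed rows go into a separate 'errors' list
    (row plus error message) instead of aborting the whole pipeline, so the
    source never needs to be re-read just because some rows were bad."""
    ok, errors = [], []
    for row in rows:
        try:
            ok.append(fn(row))
        except Exception as e:
            errors.append({"row": row, "error": str(e)})
    return ok, errors


rows = ["10", "20", "oops", "40"]
ok, errors = apply_with_error_capture(rows, int)
# ok holds the successfully parsed rows; errors holds the one bad row,
# available for logging or a targeted retry.
```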

On Kafka:

I like your bounded-mode suggestion (read_kafka(target_num_rows=...) with an explicit offset commit hook). I think this is possible to implement with our current architecture, again with the important distinction that we treat Kafka as a bounded source.

For unbounded sources, you’re right that some APIs just won’t make sense... Anything that needs global knowledge of the dataset (count_rows(), collect(), sorts, aggs) can’t return a final result.

But more fundamentally, our current architecture can't support unbounded sources. The reason being that our logical and physical planners currently require all scan tasks to be created prior to execution. See:

pub trait ScanOperator: Send + Sync + Debug {
    fn to_scan_tasks(&self, pushdowns: Pushdowns) -> DaftResult<Vec<ScanTaskLikeRef>>;
}
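One possible shape for that "step one" is a boundedness marker on the source plus an iterator-based task stream, so an unbounded source can hand the scheduler tasks lazily instead of materializing them all up front. The sketch below is purely hypothetical; none of these names are Daft's real types:

```rust
// Hypothetical extension of the scan API for unbounded sources.

#[derive(Debug, PartialEq)]
enum Boundedness {
    Bounded,
    Unbounded,
}

#[derive(Debug)]
struct ScanTask {
    partition: i32,
    start_offset: i64,
    end_offset: i64,
}

trait StreamingScanOperator {
    fn boundedness(&self) -> Boundedness;
    // An unbounded source yields tasks on demand; a bounded source's
    // iterator simply terminates after the last task.
    fn scan_task_stream(&self) -> Box<dyn Iterator<Item = ScanTask>>;
}

struct KafkaScan {
    batch_rows: i64,
}

impl StreamingScanOperator for KafkaScan {
    fn boundedness(&self) -> Boundedness {
        Boundedness::Unbounded
    }

    fn scan_task_stream(&self) -> Box<dyn Iterator<Item = ScanTask>> {
        let batch = self.batch_rows;
        // Endless stream of fixed-size offset ranges on partition 0.
        Box::new((0i64..).map(move |i| ScanTask {
            partition: 0,
            start_offset: i * batch,
            end_offset: (i + 1) * batch,
        }))
    }
}

fn main() {
    let scan = KafkaScan { batch_rows: 1000 };
    assert_eq!(scan.boundedness(), Boundedness::Unbounded);
    let tasks: Vec<ScanTask> = scan.scan_task_stream().take(2).collect();
    assert_eq!(tasks[0].end_offset, 1000);
    assert_eq!(tasks[1].start_offset, 1000);
}
```

Planner rules could then consult `boundedness()` to disable blocking operators (global sorts, final aggregations, count_rows()) when the source is unbounded.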

This isn't actually too hard to solve, and you're right, I think it will involve creating the concept of an unbounded source in our plans. That's probably step one.

These are still big unknowns, and we need to think more about unbounded sources, offset management, retries, and which APIs to allow or disable in streaming contexts. It would be great to involve you in that discussion as we shape what these look like in Daft!

desmondcheongzx avatar Sep 22 '25 06:09 desmondcheongzx