Multiple upstream source ingestion support on Pinot
Pinot currently supports realtime tables that ingest from only a single source stream, e.g. one Kafka topic from one Kafka cluster. Inside the table manager, the internal segment partition concept is tightly coupled with the stream's partitions: if a Kafka topic has 8 partitions, the Pinot table's segments are also partitioned into 8, and each segment consumes from the Kafka topic partition with the exact same partition id. This is a simple and workable design that fits most straightforward use cases, but it limits ingestion flexibility. In practice, users may produce data on the same subject to different Kafka topics and want to ingest them into a single Pinot table (with the same schema) for centralized analysis. There is an open Pinot issue asking for this feature: https://github.com/apache/pinot/issues/5647. Other OLAP technologies, e.g. ClickHouse and Druid, are developing or have developed similar features: https://clickhouse.com/docs/en/engines/table-engines/integrations/kafka and https://github.com/apache/druid/pull/14424. Based on the current Pinot architecture, it is possible to add this feature with the following capabilities and constraints:
- Ingest from multiple stream topics to form a single Pinot table.
- Different stream topics may have different numbers of partitions and even different data formats (JSON, Avro, Protobuf, etc.), meaning the Pinot table should be able to use a different decoder for data from each topic.
- The same transformation and indexing strategy is applied to the decoded data from all topics. This limitation comes from the current TableConfig structure and could be lifted by a major TableConfig refactor. Even with this limitation, per-topic transformation can be achieved with existing dynamic transformation features such as the SchemaConformingTransformer introduced in https://github.com/apache/pinot/pull/12788.
- Start with LLC (low-level consumer) tables.
- Table schema evolution, stream partition-count expansion with auto catch-up, and instance assignment strategies need to keep working without regressions.
- In the short term, adding or removing topics from the stream topic list is out of scope.
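The multi-format requirement in the list above can be sketched as a per-topic decoder registry feeding one shared transformation pipeline. This is a hypothetical illustration, not Pinot's actual API; the topic names, registry, and transformation are all made up for the sketch:

```python
import json

def decode_json(payload: bytes) -> dict:
    """Decode a JSON-encoded stream record into a row (illustrative)."""
    return json.loads(payload)

# Hypothetical registry: each topic declares its own decoder. An Avro or
# Protobuf topic would register a different decode function here.
DECODERS = {
    "orders_json": decode_json,
}

def ingest(topic: str, payload: bytes) -> dict:
    """Decode with the topic's own decoder, then apply the single shared
    transformation (here: lowercase all column names) regardless of topic."""
    row = DECODERS[topic](payload)
    return {name.lower(): value for name, value in row.items()}

print(ingest("orders_json", b'{"ID": 1, "Amount": 9.5}'))
```

The key point of the sketch is the asymmetry: decoding is chosen per topic, while the transformation step after it is common to all topics, matching the constraint described above.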
The implementation strategy should decouple the partition concepts of the stream and Pinot. Theoretically, the stream and the OLAP database are two independent pieces of infrastructure and storage; each should have its own partitioning strategy rather than a hard dependency on the other. A Pinot segment partition is used directly only for segment management, and the data consumption of each segment partition should not be tightly coupled with the stream's partitions. An abstraction layer could be built in between to manage the mapping. This would also improve ingestion performance and help solve issues like https://github.com/apache/pinot/issues/13319 by letting multiple segment partitions consume from the same topic partition.
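The mapping layer described above could be sketched as an explicit assignment of (topic, stream partition) pairs to Pinot partitions. This is a toy sketch under assumed names, not a proposed Pinot interface; a round-robin policy stands in for whatever assignment strategy the real abstraction would use:

```python
from itertools import cycle

def build_partition_map(topics, num_pinot_partitions):
    """Assign every (topic, stream_partition) pair to a Pinot partition,
    round-robin. `topics` maps topic name -> stream partition count.
    Decouples the Pinot partition count from any one topic's count."""
    mapping = {p: [] for p in range(num_pinot_partitions)}
    targets = cycle(range(num_pinot_partitions))
    for topic, partition_count in topics.items():
        for stream_partition in range(partition_count):
            mapping[next(targets)].append((topic, stream_partition))
    return mapping

# topicA has 2 partitions, topicB has 3; spread across 2 Pinot partitions.
print(build_partition_map({"topicA": 2, "topicB": 3}, 2))
```

Because the mapping is explicit, the same structure could also express the reverse direction mentioned above, i.e. several Pinot partitions draining one hot topic partition.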
Duplicate of https://github.com/apache/pinot/issues/8806 ? Duplicate of https://github.com/apache/pinot/issues/5647 (as you mention in description)
+100 on this.. it's coming up multiple times.
Do you have any initial thoughts?
I already have a working prototype and will contribute it soon.
At a high level, we could treat each topic-partition pair as a single consuming partition in Pinot.
We will need some more abstractions to support different encodings or other differences among the topics.
I will assume that the table has only one schema, so we need some set of intersecting columns in each topic that map into the table's columns.
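The flattening idea above (each topic-partition pair becomes one consuming partition) can be illustrated by numbering the pairs with a global partition id. The topic names and counts below are made up; the sketch also shows why this scheme relies on the fixed-topic-list constraint, since the ids shift if a topic is added or removed:

```python
# Ordered list of (topic, partition count); the order must stay fixed,
# otherwise previously assigned global ids would change meaning.
TOPICS = [("topicA", 3), ("topicB", 5)]

def to_global_id(topic, partition):
    """Flatten a (topic, partition) pair into a single Pinot partition id."""
    offset = 0
    for name, count in TOPICS:
        if name == topic:
            return offset + partition
        offset += count
    raise ValueError("unknown topic: " + topic)

def from_global_id(global_id):
    """Recover the (topic, partition) pair from a global partition id."""
    for name, count in TOPICS:
        if global_id < count:
            return name, global_id
        global_id -= count
    raise ValueError("global id out of range")

print(to_global_id("topicB", 2))  # topicA contributes 3 ids, so this is 3 + 2 = 5
print(from_global_id(5))
```

With this numbering, the existing per-partition segment management can stay unchanged; only the consumption layer needs to translate a global id back to a concrete topic and partition.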
Add a design doc
A few years ago, I had a working POC for multi-topic consumption. I also wrote a design doc for it, but we ended up not using it at LinkedIn, so I did not publish it. The idea is similar to what's proposed here, but IMO it's a bit simpler. Please take a look, and let me know what you think: https://docs.google.com/document/d/1gz3oQLdIfL_Iniu0XvIjKw5We_mo4LlroEDysoBhpHw/edit#heading=h.kvi1qo69zoqe
+1 on this!
as described:
The implementation strategy should consider decoupling the partition concept between stream and Pinot. Theoretically, stream and OLAP db are two independent infra and storages. They should have their own partition strategies instead of having hard dependencies on the other.
This decoupling is especially important because stream ingestion is not limited to Kafka; it also comes from other systems like Pulsar, Kinesis, etc.
@sajjad-moradi which version of Pinot are you using? I'm using Docker to pull the latest image from apachepinot/pinot, but I get this error every time I run the AddTable command:
2024/10/16 07:04:33.529 INFO [AddTableCommand] [main] {"code":400,"error":"Invalid TableConfigs: transportSchedule. Only 1 stream is supported in REALTIME table"}