KAFKA-10721: Rewrite topology to allow for overlapping unequal topic subscriptions
Background
This PR addresses the limitation where StreamsBuilder throws a TopologyException when multiple source nodes subscribe to overlapping but not identical sets of topics.
For example, the following code previously failed:

```java
builder.stream("topicA");
builder.stream(Arrays.asList("topicA", "topicB"));
```

Error: `Two source nodes are subscribed to overlapping but not equal input topics`
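With this change, the same pattern is intended to build successfully. Below is a minimal, self-contained sketch (the class name and print statements are illustrative only) showing the previously rejected subscriptions being built into a single topology that can be inspected with `Topology#describe()`:

```java
import java.util.Arrays;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;

public class OverlappingSubscriptionExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Single-topic subscription.
        builder.stream("topicA").foreach((k, v) -> System.out.println("single: " + k));

        // Overlapping multi-topic subscription (shares "topicA" with the one above).
        builder.stream(Arrays.asList("topicA", "topicB"))
               .foreach((k, v) -> System.out.println("multi: " + k));

        // With the change in this PR, build() should no longer throw a
        // TopologyException for the overlapping subscriptions; the resulting
        // topology can be inspected via describe().
        Topology topology = builder.build();
        System.out.println(topology.describe());
    }
}
```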
Changes
To support this, I have implemented a mechanism to flatten multi-topic source nodes into individual single-topic source nodes before the topology optimization phase.
- `InternalStreamsBuilder#flattenSourceNodesAndRearrange(...)`
  - Iterates through the root children to find `StreamSourceNode`s subscribed to multiple topics.
  - Splits these nodes into multiple single-topic source nodes.
  - Preserves the original `buildPriority` and child nodes (downstream processors) for each split node to ensure consistent topology construction.
- `InternalStreamsBuilder#mergeDuplicateSourceNodes(...)`
  - Removed the check that threw `TopologyException` for overlapping but unequal topics.
  - Since nodes are now flattened to single topics first, the existing merging logic simply coalesces nodes subscribed to the exact same topic (e.g., the "topicA" node from the first stream and the "topicA" node derived from the second stream). See the simplified sketch after this list.
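The internal graph classes are not reproduced here; the following is a simplified, self-contained Java sketch in which all types and methods (`FakeSourceNode`, `flatten`, `mergeDuplicates`) are hypothetical stand-ins for the internal graph nodes, meant only to illustrate the split-then-merge idea. In particular, how `buildPriority` is combined during the merge is an illustrative assumption, not the actual behavior:

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical stand-in for a source graph node; the real internal classes
// (StreamSourceNode, etc.) carry more state, such as ConsumedInternal.
final class FakeSourceNode {
    final Set<String> topics;
    final int buildPriority;
    final List<String> children; // names of downstream processors

    FakeSourceNode(Set<String> topics, int buildPriority, List<String> children) {
        this.topics = topics;
        this.buildPriority = buildPriority;
        this.children = children;
    }
}

public class FlattenAndMergeSketch {

    // Step 1: split every multi-topic source node into single-topic nodes,
    // preserving buildPriority and downstream children.
    static List<FakeSourceNode> flatten(List<FakeSourceNode> sources) {
        List<FakeSourceNode> flattened = new ArrayList<>();
        for (FakeSourceNode node : sources) {
            for (String topic : node.topics) {
                flattened.add(new FakeSourceNode(Set.of(topic), node.buildPriority, node.children));
            }
        }
        return flattened;
    }

    // Step 2: merge nodes that now subscribe to exactly the same single topic,
    // combining their downstream children. Taking the minimum buildPriority is
    // only an illustrative choice for this sketch.
    static Map<String, FakeSourceNode> mergeDuplicates(List<FakeSourceNode> flattened) {
        Map<String, FakeSourceNode> byTopic = new TreeMap<>();
        for (FakeSourceNode node : flattened) {
            String topic = node.topics.iterator().next();
            byTopic.merge(topic, node, (a, b) -> {
                List<String> children = new ArrayList<>(a.children);
                children.addAll(b.children);
                return new FakeSourceNode(a.topics, Math.min(a.buildPriority, b.buildPriority), children);
            });
        }
        return byTopic;
    }

    public static void main(String[] args) {
        // Mirrors the example above: one single-topic source and one overlapping
        // multi-topic source.
        List<FakeSourceNode> sources = List.of(
                new FakeSourceNode(Set.of("topicA"), 0, List.of("processor-1")),
                new FakeSourceNode(new LinkedHashSet<>(List.of("topicA", "topicB")), 1, List.of("processor-2")));

        Map<String, FakeSourceNode> merged = mergeDuplicates(flatten(sources));
        // Prints: topicA -> [processor-1, processor-2] and topicB -> [processor-2]
        merged.forEach((topic, node) -> System.out.println(topic + " -> " + node.children));
    }
}
```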
Note
While working on this, I found a bug that causes non-deterministic behavior. More details are available in KAFKA-19923 (https://issues.apache.org/jira/browse/KAFKA-19923).
That issue has existed independently of this PR and has been present for quite some time. In addition, once the topology actually starts running, it results in a ClassCastException, which immediately terminates the Kafka Streams application. Because of this fail-fast behavior, the bug is unlikely to affect any real-world, correctly configured Kafka Streams deployments.
While this PR does relax some of the previous constraints, I believe it remains highly unlikely for users to subscribe to the same topic with different ConsumedInternal configurations. Therefore, I do not expect this change to introduce any practical risk.
That said, it may still be helpful to document that all source nodes reading from the same topic should use semantically identical ConsumedInternal configurations. Here, “identical” means that they should not differ in their TimestampExtractor or their key and value Serdes.
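As a hypothetical illustration of that guidance (topic names and serdes are placeholders), the sketch below subscribes to the same topic twice with the same key/value Serdes and the default TimestampExtractor, and notes the kind of mismatch that would make the merged source node ambiguous:

```java
import java.util.Arrays;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;

public class ConsistentConsumedExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        Serde<String> keySerde = Serdes.String();
        Serde<String> valueSerde = Serdes.String();

        // Recommended: every subscription that includes "topicA" uses the same
        // key/value serdes and the same (here: default) TimestampExtractor.
        builder.stream("topicA", Consumed.with(keySerde, valueSerde));
        builder.stream(Arrays.asList("topicA", "topicB"), Consumed.with(keySerde, valueSerde));

        // Risky: reading the same topic with a different value serde, e.g.
        //   builder.stream("topicA", Consumed.with(keySerde, Serdes.Long()));
        // would give the merged source node ambiguous semantics.
        builder.build();
    }
}
```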
I hope this clarification is helpful.
Result
- Closes: https://issues.apache.org/jira/browse/KAFKA-10721