KAFKA-14209 : Rewrite self joins to use single state store 2/3
The KIP can be found here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-862%3A+Self-join
The optimization only applies to stream-stream joins; n-way self-joins are not covered.
This is the inner-join topology without the optimization:
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [topic2])
      --> KSTREAM-WINDOWED-0000000001, KSTREAM-WINDOWED-0000000002
    Processor: KSTREAM-WINDOWED-0000000001 (stores: [KSTREAM-JOINTHIS-0000000003-store])
      --> KSTREAM-JOINTHIS-0000000003
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-WINDOWED-0000000002 (stores: [KSTREAM-JOINOTHER-0000000004-store])
      --> KSTREAM-JOINOTHER-0000000004
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-JOINOTHER-0000000004 (stores: [KSTREAM-JOINTHIS-0000000003-store])
      --> KSTREAM-MERGE-0000000005
      <-- KSTREAM-WINDOWED-0000000002
    Processor: KSTREAM-JOINTHIS-0000000003 (stores: [KSTREAM-JOINOTHER-0000000004-store])
      --> KSTREAM-MERGE-0000000005
      <-- KSTREAM-WINDOWED-0000000001
    Processor: KSTREAM-MERGE-0000000005 (stores: [])
      --> KSTREAM-PROCESSOR-0000000006
      <-- KSTREAM-JOINTHIS-0000000003, KSTREAM-JOINOTHER-0000000004
    Processor: KSTREAM-PROCESSOR-0000000006 (stores: [])
      --> none
      <-- KSTREAM-MERGE-0000000005
And this is the optimized self-join topology, which keeps a single windowed store:
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [topic1])
      --> KSTREAM-WINDOWED-0000000001
    Processor: KSTREAM-WINDOWED-0000000001 (stores: [KSTREAM-JOINTHIS-0000000003-store])
      --> KSTREAM-MERGE-0000000005
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-MERGE-0000000005 (stores: [KSTREAM-JOINTHIS-0000000003-store])
      --> KSTREAM-PROCESSOR-0000000006
      <-- KSTREAM-WINDOWED-0000000001
    Processor: KSTREAM-PROCESSOR-0000000006 (stores: [])
      --> none
      <-- KSTREAM-MERGE-0000000005
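
To illustrate why one store is enough when both sides of the join read the same stream, here is a minimal plain-Java sketch. It is not the code in this PR and uses no Kafka classes: `Rec`, `twoStoreJoin`, `singleStoreJoin`, and the in-memory lists standing in for window stores are all hypothetical names made up for this example. Retention, grace periods, and serdes are ignored. The sketch assumes each record is processed depth-first through the "this" branch and then the "other" branch.

```java
import java.util.ArrayList;
import java.util.List;

class SelfJoinSketch {
    /** Hypothetical stand-in for a keyed, timestamped stream record. */
    static final class Rec {
        final String key;
        final String value;
        final long ts;

        Rec(String key, String value, long ts) {
            this.key = key;
            this.value = value;
            this.ts = ts;
        }
    }

    /** Two records join if they share a key and lie within the window of each other. */
    static boolean inWindow(Rec a, Rec b, long windowMs) {
        return a.key.equals(b.key) && Math.abs(a.ts - b.ts) <= windowMs;
    }

    /** Models the unoptimized topology: every record is written to two stores. */
    static List<String> twoStoreJoin(List<Rec> input, long windowMs) {
        List<Rec> thisStore = new ArrayList<>();
        List<Rec> otherStore = new ArrayList<>();
        List<String> out = new ArrayList<>();
        for (Rec r : input) {
            // "this" branch: store the record, then probe the other store.
            thisStore.add(r);
            for (Rec o : otherStore) {
                if (inWindow(r, o, windowMs)) out.add(r.value + "+" + o.value);
            }
            // "other" branch: store the same record again, then probe this store.
            otherStore.add(r);
            for (Rec t : thisStore) {
                if (inWindow(r, t, windowMs)) out.add(t.value + "+" + r.value);
            }
        }
        return out;
    }

    /** Models the optimized topology: one store, probed twice per record. */
    static List<String> singleStoreJoin(List<Rec> input, long windowMs) {
        List<Rec> store = new ArrayList<>();
        List<String> out = new ArrayList<>();
        for (Rec r : input) {
            // First pass: new record on the left, earlier records on the right.
            for (Rec o : store) {
                if (inWindow(r, o, windowMs)) out.add(r.value + "+" + o.value);
            }
            store.add(r);
            // Second pass: stored records (now including r) on the left.
            for (Rec o : store) {
                if (inWindow(r, o, windowMs)) out.add(o.value + "+" + r.value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> input = new ArrayList<>();
        input.add(new Rec("A", "a1", 0L));
        input.add(new Rec("A", "a2", 5L));
        System.out.println(twoStoreJoin(input, 10L));    // [a1+a1, a2+a1, a1+a2, a2+a2]
        System.out.println(singleStoreJoin(input, 10L)); // [a1+a1, a2+a1, a1+a2, a2+a2]
    }
}
```

In this model the two stores always hold the same records, so the second store is redundant. The single-store variant still scans the store twice per record to keep the same output order as the two-store variant.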
Testing: unit tests (integration and upgrade tests will come in a follow-up PR)
Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
I opened a follow-up ticket for improving the runtime by using a single loop: https://issues.apache.org/jira/browse/KAFKA-14251.
These test failures are unrelated:
- [Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest.testReplication()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12644/7/testReport/junit/org.apache.kafka.connect.mirror.integration/IdentityReplicationIntegrationTest/Build___JDK_11_and_Scala_2_13___testReplication__/)
- [Build / JDK 11 and Scala 2.13 / kafka.api.PlaintextAdminIntegrationTest.testCreatePartitions(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12644/7/testReport/junit/kafka.api/PlaintextAdminIntegrationTest/Build___JDK_11_and_Scala_2_13___testCreatePartitions_String__quorum_kraft/)
- [Build / JDK 8 and Scala 2.12 / kafka.api.TransactionsTest.testAbortTransactionTimeout(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12644/7/testReport/junit/kafka.api/TransactionsTest/Build___JDK_8_and_Scala_2_12___testAbortTransactionTimeout_String__quorum_kraft_2/)
- [Build / JDK 17 and Scala 2.13 / org.apache.kafka.common.network.SslTransportLayerTest.[3] tlsProtocol=TLSv1.3, useInlinePem=false](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12644/7/testReport/junit/org.apache.kafka.common.network/SslTransportLayerTest/Build___JDK_17_and_Scala_2_13____3__tlsProtocol_TLSv1_3__useInlinePem_false/)