
KAFKA-14209: Rewrite self joins to use single state store 2/3

Open vpapavas opened this issue 2 years ago • 1 comments

KIP can be found here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-862%3A+Self-join

The optimization applies only to Stream-Stream joins; n-way self-joins are not covered.
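For orientation, a minimal sketch of the configuration that turns the rewrite on, assuming the `topology.optimization` value `single.store.self.join` described in KIP-862. The class name `SelfJoinConfigSketch`, the application id, and the broker address are hypothetical placeholders, and only `java.util.Properties` is used so the fragment runs without Kafka on the classpath.

```java
import java.util.Properties;

/** Hypothetical helper: builds the properties a Streams app would pass to the topology builder. */
public class SelfJoinConfigSketch {

    static Properties selfJoinProps() {
        Properties props = new Properties();
        props.put("application.id", "self-join-demo");      // placeholder value
        props.put("bootstrap.servers", "localhost:9092");   // placeholder value
        // Assumption: config value name taken from KIP-862.
        props.put("topology.optimization", "single.store.self.join");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(selfJoinProps().getProperty("topology.optimization"));
    }
}
```

In a real application these properties would also have to be handed to `StreamsBuilder#build(Properties)`, since topology optimizations are applied when the topology is built.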

This is the inner-join topology without the optimization:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [topic2])
      --> KSTREAM-WINDOWED-0000000001, KSTREAM-WINDOWED-0000000002
    Processor: KSTREAM-WINDOWED-0000000001 (stores: [KSTREAM-JOINTHIS-0000000003-store])
      --> KSTREAM-JOINTHIS-0000000003
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-WINDOWED-0000000002 (stores: [KSTREAM-JOINOTHER-0000000004-store])
      --> KSTREAM-JOINOTHER-0000000004
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-JOINOTHER-0000000004 (stores: [KSTREAM-JOINTHIS-0000000003-store])
      --> KSTREAM-MERGE-0000000005
      <-- KSTREAM-WINDOWED-0000000002
    Processor: KSTREAM-JOINTHIS-0000000003 (stores: [KSTREAM-JOINOTHER-0000000004-store])
      --> KSTREAM-MERGE-0000000005
      <-- KSTREAM-WINDOWED-0000000001
    Processor: KSTREAM-MERGE-0000000005 (stores: [])
      --> KSTREAM-PROCESSOR-0000000006
      <-- KSTREAM-JOINTHIS-0000000003, KSTREAM-JOINOTHER-0000000004
    Processor: KSTREAM-PROCESSOR-0000000006 (stores: [])
      --> none
      <-- KSTREAM-MERGE-0000000005


And this is the optimized self-join topology:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [topic1])
      --> KSTREAM-WINDOWED-0000000001
    Processor: KSTREAM-WINDOWED-0000000001 (stores: [KSTREAM-JOINTHIS-0000000003-store])
      --> KSTREAM-MERGE-0000000005
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-MERGE-0000000005 (stores: [KSTREAM-JOINTHIS-0000000003-store])
      --> KSTREAM-PROCESSOR-0000000006
      <-- KSTREAM-WINDOWED-0000000001
    Processor: KSTREAM-PROCESSOR-0000000006 (stores: [])
      --> none
      <-- KSTREAM-MERGE-0000000005

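To make the store reduction concrete, here is a toy model in plain Java. It is not Kafka Streams code and not the implementation in this PR: `SelfJoinSketch`, `Rec`, `fetch`, and both join methods are hypothetical names, and in-memory lists stand in for the window stores. It assumes each record is joined with same-key records whose timestamps lie within `windowMs`, and sketches why one store can yield the same pairs as the two stores in the unoptimized topology.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of a windowed inner self-join; illustration only. */
public class SelfJoinSketch {

    /** One input record: key, value, and event timestamp in milliseconds. */
    static final class Rec {
        final String key;
        final String value;
        final long ts;
        Rec(String key, String value, long ts) { this.key = key; this.value = value; this.ts = ts; }
    }

    /** Returns stored records that share r's key and lie within the join window. */
    static List<Rec> fetch(List<Rec> store, Rec r, long windowMs) {
        List<Rec> hits = new ArrayList<>();
        for (Rec s : store) {
            if (s.key.equals(r.key) && Math.abs(s.ts - r.ts) <= windowMs) hits.add(s);
        }
        return hits;
    }

    /** Unoptimized shape: each record is written to two stores and each side probes the other. */
    static List<String> joinTwoStores(List<Rec> input, long windowMs) {
        List<Rec> thisStore = new ArrayList<>();
        List<Rec> otherStore = new ArrayList<>();
        List<String> out = new ArrayList<>();
        for (Rec r : input) {
            thisStore.add(r);   // "this" side stores the record
            for (Rec o : fetch(otherStore, r, windowMs)) out.add(r.value + "+" + o.value);
            otherStore.add(r);  // "other" side stores the same record a second time
            for (Rec t : fetch(thisStore, r, windowMs)) out.add(t.value + "+" + r.value);
        }
        return out;
    }

    /** Optimized shape: one store, with the new record used first as left, then as right operand. */
    static List<String> joinSingleStore(List<Rec> input, long windowMs) {
        List<Rec> store = new ArrayList<>();
        List<String> out = new ArrayList<>();
        for (Rec r : input) {
            List<Rec> earlier = fetch(store, r, windowMs); // matches that arrived before r
            store.add(r);                                  // single write instead of two
            for (Rec o : earlier) out.add(r.value + "+" + o.value);
            for (Rec o : earlier) out.add(o.value + "+" + r.value);
            out.add(r.value + "+" + r.value);              // a record always matches itself
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> input = Arrays.asList(
            new Rec("k", "A", 0L), new Rec("k", "B", 5L), new Rec("k", "C", 100L));
        // Both lines print [A+A, B+A, A+B, B+B, C+C]
        System.out.println(joinTwoStores(input, 10L));
        System.out.println(joinSingleStore(input, 10L));
    }
}
```

The two passes over `earlier` are kept separate in this sketch only so that the output order matches the two-store version.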

Testing: Unit tests (integration and upgrade tests will come in a follow-up PR)

Committer Checklist (excluded from commit message)

  • [ ] Verify design and implementation
  • [ ] Verify test coverage and CI build status
  • [ ] Verify documentation (including upgrade notes)

vpapavas, Sep 15 '22 08:09

I opened a follow-up ticket for improving the runtime by doing the join in a single loop: https://issues.apache.org/jira/browse/KAFKA-14251

vpapavas, Sep 22 '22 14:09

These test failures are unrelated:

  • [Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest.testReplication()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12644/7/testReport/junit/org.apache.kafka.connect.mirror.integration/IdentityReplicationIntegrationTest/Build___JDK_11_and_Scala_2_13___testReplication__/)
  • [Build / JDK 11 and Scala 2.13 / kafka.api.PlaintextAdminIntegrationTest.testCreatePartitions(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12644/7/testReport/junit/kafka.api/PlaintextAdminIntegrationTest/Build___JDK_11_and_Scala_2_13___testCreatePartitions_String__quorum_kraft/)
  • [Build / JDK 8 and Scala 2.12 / kafka.api.TransactionsTest.testAbortTransactionTimeout(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12644/7/testReport/junit/kafka.api/TransactionsTest/Build___JDK_8_and_Scala_2_12___testAbortTransactionTimeout_String__quorum_kraft_2/)
  • [Build / JDK 17 and Scala 2.13 / org.apache.kafka.common.network.SslTransportLayerTest.[3] tlsProtocol=TLSv1.3, useInlinePem=false](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12644/7/testReport/junit/org.apache.kafka.common.network/SslTransportLayerTest/Build___JDK_17_and_Scala_2_13____3__tlsProtocol_TLSv1_3__useInlinePem_false/)

vvcephei, Oct 05 '22 12:10