Stream-Stream self-join requires duplicate topic
Source events in topic atm_txns_gess
ksql> DESCRIBE EXTENDED ATM_TXNS_GESS;
Name : ATM_TXNS_GESS
Type : STREAM
Key field :
Key format : STRING
Timestamp field : TIMESTAMP
Value format : JSON
Kafka topic : atm_txns_gess (partitions: 1, replication: 1)
Field | Type
-------------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
ACCOUNT_ID | VARCHAR(STRING)
ATM | VARCHAR(STRING)
LOCATION | STRUCT<LON DOUBLE, LAT DOUBLE>
AMOUNT | INTEGER
TIMESTAMP | VARCHAR(STRING)
TRANSACTION_ID | VARCHAR(STRING)
-------------------------------------------------
Local runtime statistics
------------------------
consumer-messages-per-sec: 2.12 consumer-total-message-bytes: 2961608 consumer-total-messages: 13708 last-message: 10/9/18 2:56:30 PM UTC
consumer-failed-messages: 0 failed-messages-per-sec: 0 last-failed: n/a
(Statistics of the local KSQL server interaction with the Kafka topic atm_txns_gess)
Self-join:
ksql> SELECT A.ATM, B.ATM FROM ATM_TXNS_GESS A INNER JOIN ATM_TXNS_GESS B WITHIN (0 MINUTES, 10 MINUTES) ON A.ACCOUNT_ID=B.ACCOUNT_ID;
Invalid topology: Topic atm_txns_gess has already been registered by another source.
ksql>
This error forces a workaround: a second topic is built from the first and then used as the other side of the join:
ksql> CREATE STREAM ATM_TXNS_GESS_02 WITH (PARTITIONS=1) AS SELECT * FROM ATM_TXNS_GESS;
ksql> SELECT A.ATM, B.ATM FROM ATM_TXNS_GESS A INNER JOIN ATM_TXNS_GESS_02 B WITHIN (0 MINUTES, 10 MINUTES) ON A.ACCOUNT_ID=B.ACCOUNT_ID;
ATM : 3409740389 | ATM : 3409740389
ATM : 3663514950 | ATM : 3663514950
Barclays Bank PLC | Barclays Bank PLC
Lloyds | Lloyds
Morrisons | Morrisons
Santander | Santander
ATM : 51401467 | ATM : 51401467
Can KSQL permit self-joins on streams sourced from the same topic?
Seems to be a Kafka Streams limitation? Having said this, I think KSQL could build a workaround by duplicating the data into a second topic under the hood. However, this would be wasteful. The better fix would be to add a self-join to Kafka Streams IMHO. Do you mind creating an AK Jira for this?
+1 for fixing in KStreams and avoiding duplicating the topic data.
I've raised https://issues.apache.org/jira/browse/KAFKA-7497
@mjsax KSQL could just add another child to the source node. I highly recommend not fixing this in Streams, but having KSQL reuse the Kstreamsource instead of re-registering it. How would that fix look anyway? What if two sources register with overlapping wildcards?
@rmoff Thanks for creating the AK ticket.
@Kaiserchen I left a comment on the AK ticket (partly copied below) that explains why I think KSQL cannot solve this issue itself:
One could express a self-join like this in the current Streams API:
KStream stream = builder.stream(...);
stream.join(stream, ...);
However, the execution of the join would not be efficient, as two state stores with two changelog topics would be created (both containing the exact same data). Also, and this seems to be the most severe issue, each record would join with itself, which is actually not desired...
https://issues.apache.org/jira/browse/KAFKA-6687 might be useful for this case? We should still address https://issues.apache.org/jira/browse/KAFKA-7497 at some point, but maybe KAFKA-6687 is good enough to unlock this feature, while KAFKA-7497 would be an internal optimization to compute the self-join more efficiently?
Given my above comment:
Also, and this seems to be the most severe issue, each record would join with itself, which is actually not desired...
I think this is actually not correct... At least if we consider self-joins in standard SQL, a record would join with itself. We should follow the same semantics, and thus it's possible (even if not efficient) to do a self-join with Kafka Streams today.
Thus, if we don't worry too much about efficiency, ksqlDB could implement it.
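To make the semantics discussed above concrete, here is a minimal plain-Java sketch. It does not use Kafka Streams or KSQL; it only models an inner self-join over an in-memory list. The class SelfJoinSketch, the Txn type, the sample events, and the excludeSelf flag are all hypothetical names introduced for illustration. It also assumes that WITHIN (0 MINUTES, 10 MINUTES) means the right-hand record's timestamp lies between the left-hand record's timestamp and 10 minutes after it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SelfJoinSketch {

    // Hypothetical stand-in for an event in atm_txns_gess (only the fields used here).
    static final class Txn {
        final String accountId;
        final String atm;
        final String transactionId;
        final long timestampMs;

        Txn(String accountId, String atm, String transactionId, long timestampMs) {
            this.accountId = accountId;
            this.atm = atm;
            this.transactionId = transactionId;
            this.timestampMs = timestampMs;
        }
    }

    // Inner self-join on accountId. A pair (a, b) matches when b's timestamp is
    // at most beforeMs earlier and at most afterMs later than a's timestamp.
    // With excludeSelf = false a record pairs with itself, as in standard SQL.
    static List<String> selfJoin(List<Txn> events, long beforeMs, long afterMs,
                                 boolean excludeSelf) {
        List<String> out = new ArrayList<>();
        for (Txn a : events) {
            for (Txn b : events) {
                long diff = b.timestampMs - a.timestampMs;
                boolean sameAccount = a.accountId.equals(b.accountId);
                boolean inWindow = diff >= -beforeMs && diff <= afterMs;
                boolean isSelf = a.transactionId.equals(b.transactionId);
                if (sameAccount && inWindow && !(excludeSelf && isSelf)) {
                    out.add(a.atm + " | " + b.atm);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Made-up sample data, not taken from the topic in this issue.
        List<Txn> events = Arrays.asList(
                new Txn("a1", "Lloyds", "t1", 0L),
                new Txn("a1", "Santander", "t2", 5 * 60_000L),
                new Txn("a2", "Morrisons", "t3", 60_000L));

        long tenMinutes = 10 * 60_000L;
        // SQL-style semantics: every record also matches itself.
        System.out.println(selfJoin(events, 0L, tenMinutes, false));
        // Same join, with self-matches filtered out by transaction id.
        System.out.println(selfJoin(events, 0L, tenMinutes, true));
    }
}
```

Under these assumptions, rows with the same ATM on both sides (like those in the workaround output earlier in this issue) are what you would expect when a record matches itself. A query that only wants pairs of distinct transactions would need an extra predicate comparing transaction IDs.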
Hello.
https://issues.apache.org/jira/browse/KAFKA-7497 (https://issues.apache.org/jira/browse/KAFKA-14209) has been fixed. Will self-joins be supported in ksql now?