Stream-Table Inner Join Producing Duplicate Records
Hi Team,
We are using Confluent Kafka version 6.1.1.
We have the following setup:
- Source topic (st): This topic consists of data published by a third-party Kafka producer. Retention for this topic is 20 minutes.
- Metadata topic (mt): This topic consists of data published from an Oracle database. Retention for this topic is 7 days.
- Raw export topic (et): This topic consists of the data produced by joining st and mt in ksqlDB. Retention for this topic is 6 hours.
Data in the source topic is in Avro format, while the metadata topic is in JSON format.
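(As a sketch, a few raw records and the detected key/value formats of both input topics can be inspected from the ksqlDB CLI with PRINT, assuming the topic names st and mt above:)
-- Inspect a handful of records from each input topic, including the formats ksqlDB detects.
PRINT 'st' FROM BEGINNING LIMIT 10;
PRINT 'mt' FROM BEGINNING LIMIT 10;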
Process followed in ksqlDB:
- First, we created a stream from the source topic with the following query: CREATE STREAM sdb_data7 WITH (KAFKA_TOPIC = 'st', VALUE_FORMAT='AVRO');
- Second, we created a table in ksqlDB on top of our metadata topic with the following query: CREATE TABLE device_data (attribute1 VARCHAR, hostName VARCHAR PRIMARY KEY, attribute3 VARCHAR, attribute4 BIGINT, attribute5 VARCHAR) WITH (KAFKA_TOPIC = 'mt', VALUE_FORMAT='JSON');
- After creating both, we applied an inner join between them with the following query:
CREATE STREAM enriched7 WITH (KAFKA_TOPIC='rawexport-topic7') AS
SELECT s.att1 AS attribute1, s.deviceName AS DEVICENAME, s.att3 AS attribute3, s.att4 AS attribute4, s.att5 AS attribute5, s.att6 AS attribute6, s.att7 AS attribute7, s.att8 AS attribute8, s.att9 AS attribute9, s.att10 AS attribute10, n.att1 AS n_attribute1, n.att2 AS attribute2, n.hostname AS HOSTNAME
FROM sdb_data7 s
INNER JOIN device_data n ON s.deviceName = n.hostname
WHERE n.att1 IS NOT NULL;
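(As a sketch of how the duplication could be quantified, a windowed count over the enriched stream will flag keys that appear more than once; this assumes DEVICENAME within a short window is enough to identify a single source record:)
-- Hypothetical duplicate check on the join output.
SELECT DEVICENAME, COUNT(*) AS copies
FROM enriched7 WINDOW TUMBLING (SIZE 20 MINUTES)
GROUP BY DEVICENAME
HAVING COUNT(*) > 1
EMIT CHANGES;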
Problem statement: As far as we can see, the source stream and the metadata table do not contain any duplicate messages, but after the join we are seeing duplicate messages in the output Kafka topic.
Workarounds so far:
- We have made sure that the source topic and the metadata topic do not contain any duplicate messages.
- We have also set the ksqlDB config "processing.guarantee": "exactly_once_beta" for deduplication.
But we are still seeing duplicate messages in the raw export topic.
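(For reference, one way to confirm whether the processing.guarantee setting was actually picked up, since it is a Kafka Streams config that may need to be supplied at the server level, e.g. as ksql.streams.processing.guarantee, is to list the properties currently in effect from the ksqlDB CLI:)
-- List the server/session properties currently in effect.
SHOW PROPERTIES;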
Can anyone please suggest further workarounds or any other help with this scenario? It would be much appreciated.
Thanks in Advance!!
Ronak Kabra
Did you verify the result topic using a consumer with isolation level "read_committed"?
From what you describe, the result topic should contain at most one (enriched) record per sdb_data7 record.
Hi @mjsax,
I have set the consumer isolation level to "read_committed" in the ksqlDB configs. After the data lands in the result topic, I pass this topic to MirrorMaker to replicate it to the downstream layer.
The above configuration also doesn't help remove the duplicates in the result topic.
Still, if you can provide any inputs on how to verify the result topic with the "read_committed" isolation level, I will certainly check.
Thanks, Ronak Kabra