Multiple Entries for same primary key in CTAS query
Hello. We have a question about windowed aggregations on tables, specifically with `CREATE TABLE AS SELECT` queries. We use the following pipeline to process our data:
```mermaid
graph TD;
Raw_Stream_From_Kafka_1-->Joined_Stream_with_Tumbling_Window;
Raw_Stream_From_Kafka_2-->Joined_Stream_with_Tumbling_Window;
Joined_Stream_with_Tumbling_Window-->Statistics_Table_with_Non-Windowed_Aggregation;
Joined_Stream_with_Tumbling_Window-->Enrichment_Table_with_Windowed_Aggregation;
Statistics_Table_with_Non-Windowed_Aggregation-->Enrichment_Table_with_Windowed_Aggregation;
```
The enrichment table will be used to check several conditions with `CASE` clauses. We would like to combine non-windowed aggregations with windowed aggregations, but apparently that is not possible in the same SQL statement. Hence, we create a separate table for the non-windowed aggregations as an extra step.
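A minimal sketch of that extra, non-windowed statistics table. All names (`joined_stream`, `sensor_id`, `sensor_value`, `sensor_stats`) are hypothetical, and `sensor_value` is assumed to be numeric. Without a `WINDOW` clause the table keeps one continuously updated row per sensor; a dispersion measure such as a standard deviation would be added the same way if a suitable aggregate function is available in your ksqlDB version.

```sql
-- Hypothetical names: joined_stream, sensor_id, sensor_value, sensor_stats.
-- No WINDOW clause: one row per sensor, updated with every new event,
-- aggregated over all events the query has processed so far.
CREATE TABLE sensor_stats AS
  SELECT sensor_id,
         AVG(sensor_value) AS hist_avg,
         MIN(sensor_value) AS hist_min,
         MAX(sensor_value) AS hist_max,
         COUNT(*)          AS hist_count
  FROM joined_stream
  GROUP BY sensor_id
  EMIT CHANGES;
```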
We would like to use these persistent, non-windowed statistics together with windowed statistics (for example, comparing the average of the last 5 minutes with the standard deviation of the sensor's entire value history), check some conditions, and enrich our data accordingly. If there is a status change, our Kafka consumers (or a service that watches ksqlDB with push queries) should be notified.
As a solution, we thought that creating an enrichment table with windowed aggregations would solve our issue, since values in a table are updated in place rather than appended to the end of a stream. In the end, whenever we check `Enrichment_Table_with_Windowed_Aggregation`, we would have the current, up-to-date status for our conditions.
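One way to bring the persistent statistics into the windowed aggregation, sketched under assumptions: a non-windowed table `sensor_stats` keyed by `sensor_id` with `hist_avg` and `hist_stddev` columns, and a stream `joined_stream` with `sensor_id` and `sensor_value` columns (all hypothetical). The stream-table join attaches the current statistics to every event as plain columns, so the windowed `CREATE TABLE AS SELECT` can reference them through `LATEST_BY_OFFSET` instead of nesting one aggregation inside another. The 3-standard-deviation threshold is illustrative only.

```sql
-- Hypothetical names: joined_stream, sensor_stats, sensor_id, sensor_value,
-- hist_avg, hist_stddev, events_with_stats, sensor_enrichment.
-- Step 1: stream-table join; each event picks up the sensor's current
-- historical statistics as ordinary (non-aggregated) columns.
CREATE STREAM events_with_stats AS
  SELECT e.sensor_id    AS sensor_id,
         e.sensor_value AS sensor_value,
         s.hist_avg     AS hist_avg,
         s.hist_stddev  AS hist_stddev
  FROM joined_stream e
  JOIN sensor_stats s ON e.sensor_id = s.sensor_id
  EMIT CHANGES;

-- Step 2: 5-minute tumbling windows per sensor. The joined statistics are
-- not in GROUP BY, so they are carried through with LATEST_BY_OFFSET.
CREATE TABLE sensor_enrichment AS
  SELECT sensor_id,
         AVG(sensor_value)             AS window_avg,
         LATEST_BY_OFFSET(hist_avg)    AS hist_avg,
         LATEST_BY_OFFSET(hist_stddev) AS hist_stddev,
         CASE
           WHEN ABS(AVG(sensor_value) - LATEST_BY_OFFSET(hist_avg))
                > 3 * LATEST_BY_OFFSET(hist_stddev) THEN 'ALERT'
           ELSE 'OK'
         END AS status
  FROM events_with_stats
  WINDOW TUMBLING (SIZE 5 MINUTES)
  GROUP BY sensor_id
  EMIT CHANGES;
```

This only changes how the statistics reach the windowed aggregation; the resulting table is still windowed, so it holds one row per sensor and window.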
But when we applied this to our systems, we came across an interesting behavior of ksqlDB. I should also mention that we saw this behavior in "Mastering Kafka Streams and ksqlDB" as well, but we did not find such examples in the documentation. The question is given below.
Multiple Entries for same primary key in CTAS query
When we create a table with windowed aggregations, there are multiple entries for the same `PRIMARY KEY`. They differ from each other in their window start and end times, but if we need only the latest value, we cannot access it with a simple `SELECT` query. The main structure of our table is given below:
```
ksql> describe test_table;
Name : TEST_TABLE
Field | Type
-----------------------------------------------------------------
field_1 | VARCHAR(STRING) (primary key) (Window type: TUMBLING)
field_2 | VARCHAR(STRING)
-----------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;
```
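For reference, a `CREATE TABLE AS SELECT` of roughly this shape would produce a table like `TEST_TABLE`. This is a hypothetical reconstruction: `source_stream` and the `LATEST_BY_OFFSET` aggregate are assumptions, the 10-second window size is inferred from the `WINDOWSTART`/`WINDOWEND` values in the `SELECT` output in this post (10000 ms apart), and the backticks preserve the lowercase column names shown by `DESCRIBE`.

```sql
-- Hypothetical: source_stream and LATEST_BY_OFFSET are assumptions;
-- the 10-second size matches the WINDOWSTART/WINDOWEND spacing.
CREATE TABLE TEST_TABLE AS
  SELECT `field_1`,
         LATEST_BY_OFFSET(`field_2`) AS `field_2`
  FROM source_stream
  WINDOW TUMBLING (SIZE 10 SECONDS)
  GROUP BY `field_1`
  EMIT CHANGES;
```

Because the table is windowed, its effective key is the pair (`field_1`, window): each new 10-second window opens a new row for the same `field_1` rather than overwriting the previous one.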
When we run a simple `SELECT` query, multiple rows are returned for the same `PRIMARY KEY`:
```
SELECT * FROM TEST_TABLE;
+---------+---------------+---------------+------------+
|field_1  |WINDOWSTART    |WINDOWEND      |field_2     |
+---------+---------------+---------------+------------+
|1        |1660135860000  |1660135870000  |SOME_VALUE  |
|5        |1660135860000  |1660135870000  |SOME_VALUE  |
|3        |1660135870000  |1660135880000  |SOME_VALUE  |
|8        |1660135870000  |1660135880000  |SOME_VALUE  |
|1        |1660135870000  |1660135880000  |SOME_VALUE  |
```
In the example, you can see that there are multiple entries with the same primary key: key `1` has more than one row. This duplication happens for all primary keys. How can we prevent it and obtain only the latest value?
We also know that the `WINDOWSTART` and `WINDOWEND` columns indicate that these rows belong to different time windows. But instead of appending rows to the table, shouldn't all of the values, including `WINDOWSTART` and `WINDOWEND`, be updated so that only the latest value is displayed?
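A pull query on a windowed table can be narrowed to one key and to windows whose start falls within a bound supplied by the caller. A sketch, using the newest `WINDOWSTART` from the output above as the bound (in practice the client would have to compute it, e.g. from the current time and the window size); it assumes the key column really is the lowercase `field_1` shown by `DESCRIBE`. If it was created unquoted, it is `FIELD_1` and the backticks can be dropped.

```sql
-- Pull query (no EMIT CHANGES): current rows for key '1' whose window
-- starts at or after the given epoch-millisecond bound.
SELECT * FROM TEST_TABLE
  WHERE `field_1` = '1'
    AND WINDOWSTART >= 1660135870000;
```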
Thank you for your patience; any help would be great!
I found something in #8643, but is this the correct answer? I could not be certain.
@emrekuecuek, #8643 is one way to go about it.
Hi @emrekuecuek!
The behavior you are experiencing is expected. Pull queries return the rows in all windows, and there is currently no way to get only the rows in the latest window. I'm not sure exactly what you are trying to do, but `EMIT FINAL` would be one option to explore and `LATEST_BY_OFFSET` another.
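A hedged sketch of the `EMIT FINAL` option: instead of writing an update for every incoming event, the windowed aggregation emits one result per key and window once the window, plus its grace period, has closed. Whether `EMIT FINAL` is available, and exactly how it behaves, depends on the ksqlDB version; `sensor_readings`, `sensor_id`, `sensor_value`, `sensor_5min_final` and the 30-second grace period are hypothetical.

```sql
-- Hypothetical names: sensor_readings, sensor_id, sensor_value, sensor_5min_final.
-- EMIT FINAL: one output record per sensor and window, written after the
-- window end plus the grace period, instead of one per incoming event.
CREATE TABLE sensor_5min_final AS
  SELECT sensor_id,
         AVG(sensor_value) AS window_avg
  FROM sensor_readings
  WINDOW TUMBLING (SIZE 5 MINUTES, GRACE PERIOD 30 SECONDS)
  GROUP BY sensor_id
  EMIT FINAL;
```

This changes what is written to the output topic; the table is still keyed by sensor and window.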
Thank you for your insights. My problem requires a windowed aggregation per sensor, and I would like to enrich the data based on some conditions. For that, I am using `CASE` clauses. In the application, I need to base this enrichment on the aggregation of the last 5 minutes, so the aggregations of previous windows are not needed in my case.
If a condition is met, I would like to set the status in the table to `ALERT` and notify the system. Moreover, our application needs the most recent aggregation (to report the current status). That's why I need the latest aggregated results.
I tried `EMIT FINAL` and `LATEST_BY_OFFSET`. Since `EMIT FINAL` is an undocumented feature that should be used with caution, I decided not to use it (there were also other issues, e.g. I could not set up the time windows correctly, and the lack of documentation unfortunately did not help). `LATEST_BY_OFFSET` did not work in my `CTAS` query because I could not apply a nested aggregation over the sensors. Maybe there is a workaround for that?
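A sketch of a `LATEST_BY_OFFSET` route that avoids nesting aggregations by splitting them across two statements: the windowed table's output topic is re-declared as a windowed stream, and a second, non-windowed table keeps only the most recent update per sensor. Untested; the topic name `SENSOR_ENRICHMENT`, the JSON value format, the `VARCHAR` key and the column names are assumptions and must match what the windowed table actually writes.

```sql
-- Hypothetical names and formats: topic SENSOR_ENRICHMENT, JSON values,
-- VARCHAR key sensor_id, value columns window_avg and status.
-- Step 1: read the windowed table's changelog topic as a windowed stream.
CREATE STREAM sensor_enrichment_changes (
    sensor_id  VARCHAR KEY,
    window_avg DOUBLE,
    status     VARCHAR
  ) WITH (
    KAFKA_TOPIC  = 'SENSOR_ENRICHMENT',
    VALUE_FORMAT = 'JSON',
    WINDOW_TYPE  = 'TUMBLING',
    WINDOW_SIZE  = '5 MINUTES'
  );

-- Step 2: non-windowed aggregation, one row per sensor holding the
-- values of the last record written to the topic for that sensor.
CREATE TABLE sensor_latest_status AS
  SELECT sensor_id,
         LATEST_BY_OFFSET(window_avg) AS window_avg,
         LATEST_BY_OFFSET(status)     AS status
  FROM sensor_enrichment_changes
  GROUP BY sensor_id
  EMIT CHANGES;
```

The trade-off: `LATEST_BY_OFFSET` keeps the last record written to the topic, not the record of the newest window, so a late event that updates an older window would replace the current status until the next update arrives.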
Thank you so much for your input; I really appreciate it. Regards,