Do watermarks work with events from the past?
I'm trying to do some streaming analytics on historical data pushed to Kafka. The Kafka message timestamp is recent, but the actual event date is older (over a year).
CREATE TABLE input_table (
ts TIMESTAMP NOT NULL,
sourceIPv6Address VARCHAR,
destinationIPv6Address VARCHAR,
octetDeltaCount BIGINT,
watermark TIMESTAMP GENERATED ALWAYS AS (ts - INTERVAL '30' SECOND) STORED
) WITH (
connector = 'kafka',
format = 'json',
bootstrap_servers = 'kafka-1:19091',
topic = 'input-topic',
type = 'source',
'source.offset' = 'earliest',
'source.read_mode' = 'read_uncommitted',
event_time_field = 'ts',
watermark_field = 'watermark'
);
CREATE VIEW AGG AS SELECT window.start as window_start, "sourceIPv6Address", "destinationIPv6Address", octetDeltaCount
FROM (
SELECT TUMBLE(interval '1 minute') as window, "sourceIPv6Address", "destinationIPv6Address", SUM("octetDeltaCount") AS octetDeltaCount
FROM input_table
GROUP BY window, "sourceIPv6Address", "destinationIPv6Address"
);
CREATE TABLE agg_sink (
window_start TIMESTAMP,
sourceIPv6Address VARCHAR,
destinationIPv6Address VARCHAR,
octetDeltaCount BIGINT
) WITH (
'connector' = 'kafka',
'bootstrap_servers' = 'kafka-1:19091',
'type' = 'sink',
'topic' = 'arroyo-agg',
'format' = 'json'
);
INSERT INTO agg_sink SELECT window_start, "sourceIPv6Address", "destinationIPv6Address", octetDeltaCount FROM AGG;
However, nothing is written to the output Kafka topic. Why?
Hey there! I understand you're trying to process historical data with watermarks, but nothing is showing up in the output topic. This is a common issue when processing historical data.
The problem is likely related to how watermarks work with event timestamps. Looking at your SQL, I see you've set a 30-second watermark:
watermark TIMESTAMP GENERATED ALWAYS AS (ts - INTERVAL '30' SECOND) STORED
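For historical data, this matters because:
- Watermarks track the completeness of your data stream and are computed from the event timestamp (ts), not from wall-clock time or the Kafka message timestamp, so events are not dropped merely for being old
- With a 30-second delay, the watermark trails the latest ts seen by only 30 seconds, so any event that arrives more than 30 seconds behind in event time (for example, if the historical data was pushed to Kafka out of event-time order) is considered "late" and dropped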
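To make the timing concrete, here is the arithmetic for your query under a simplifying assumption: a single partition, the watermark W is the largest ts read so far minus the delay D (30 seconds in your DDL), and each 1-minute tumbling window starting at s is emitted once the watermark reaches its end. This is a sketch of the semantics, not Arroyo's exact implementation:

```latex
\begin{aligned}
W &= \max_{\text{events read}} ts \;-\; D, \qquad D = 30\ \text{s} \\
\text{event } e \text{ is late (dropped)} &\iff ts_e < W \\
\text{window } [s,\ s + 60\ \text{s}) \text{ is emitted} &\iff W \ge s + 60\ \text{s}
  \iff \max ts \ge s + 60\ \text{s} + D = s + 90\ \text{s}
\end{aligned}
```

So with D = 30 s, a window closes as soon as an event 90 seconds past its start has been read, while any event arriving more than 30 seconds behind the latest ts is dropped.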
Based on our documentation (from https://doc.arroyo.dev/concepts):
A watermark represents a claim that no more events with a timestamp earlier than the watermark will arrive in the future. Late events (which arrive after the watermark has passed) are dropped.
To fix this, you have a couple of options:
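- Increase the watermark delay so that it covers how far out of order your historical events can arrive (the interval below is only an example):
watermark TIMESTAMP GENERATED ALWAYS AS (ts - INTERVAL '1' HOUR) STORED
- Or process the historical data without watermarks first, then switch to watermarking for real-time data
For your use case, I'd recommend option 1, but size the delay to how far out of order the events are rather than to how old they are. Because the watermark is computed from ts, a delay larger than the whole time span covered by your data would keep the watermark from ever passing the end of any window, so nothing would be emitted at all.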
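Putting it together, a sketch of the source table with only the watermark expression changed might look like this. The INTERVAL '1' HOUR value is a hypothetical placeholder, not a tested recommendation; substitute the delay you settle on for your data. All other columns and options are copied unchanged from your original DDL.

```sql
CREATE TABLE input_table (
    ts TIMESTAMP NOT NULL,
    sourceIPv6Address VARCHAR,
    destinationIPv6Address VARCHAR,
    octetDeltaCount BIGINT,
    -- Watermark = event time (ts) minus a fixed delay; a window is only
    -- emitted after the watermark passes the window's end.
    -- '1' HOUR is a hypothetical placeholder delay: replace it with a value that fits your data.
    watermark TIMESTAMP GENERATED ALWAYS AS (ts - INTERVAL '1' HOUR) STORED
) WITH (
    connector = 'kafka',
    format = 'json',
    bootstrap_servers = 'kafka-1:19091',
    topic = 'input-topic',
    type = 'source',
    'source.offset' = 'earliest',
    'source.read_mode' = 'read_uncommitted',
    event_time_field = 'ts',
    watermark_field = 'watermark'
);
```

The view and sink definitions can stay as they are; only the delay in the generated watermark column changes.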
Let me know if you need help adjusting the watermark settings or if you have any questions about how watermarks work with historical data processing.
References:
- Watermark behavior documentation: https://doc.arroyo.dev/concepts (Time-oriented processing section)
- Similar watermark timing issues have been reported in previous tickets
Be careful with:
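- A large watermark delay keeps windows open longer, which increases state (memory) usage and delays results
- Make sure the delay covers the out-of-orderness of your historical data but stays well below the total time span that data covers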
- Consider whether you'll need to process both historical and real-time data in the same pipeline
Let me know if you need any clarification or run into other issues!