gcp-ingestion
Remove read timer from PubsubIntegrationTest
The test currently has a 15-second run timer because it reads from an unbounded input, which has no natural end.
When we have time, we should investigate ways to terminate the pipeline once the expected number of messages has been processed.
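One possible shape for "stop once the expected messages have arrived, with a timeout as a fallback" is a latch that the test's sink counts down. This is only a plain-Java sketch of the idea: `ExpectedMessageGate`, `onMessage`, and `awaitAll` are hypothetical names, not part of gcp-ingestion or Beam, and how the sink would call into it from a running pipeline is left open.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical test helper (assumption, not existing code in this repo). */
final class ExpectedMessageGate {
  private final CountDownLatch remaining;

  ExpectedMessageGate(int expectedMessages) {
    this.remaining = new CountDownLatch(expectedMessages);
  }

  /** Call once for every message the test observes. */
  void onMessage() {
    remaining.countDown();
  }

  /**
   * Blocks until all expected messages were seen or the timeout elapsed.
   * Returns true only if every expected message arrived in time.
   */
  boolean awaitAll(long timeout, TimeUnit unit) {
    try {
      return remaining.await(timeout, unit);
    } catch (InterruptedException e) {
      // Preserve the interrupt for the caller and report "not complete".
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

With something like this, the 15-second timer would become an upper bound rather than a fixed wait: the test could return as soon as `awaitAll` reports true and then stop the pipeline, for example via `PipelineResult.cancel()` if the runner supports it.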
I'm thinking more about this and I bet it's our windowing. The default trigger only closes the window once a message arrives with an event time beyond the end of the window. In our test, we're sending several messages close together in time, so I expect they all fall in the same window, and the window never closes.
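To make that hypothesis concrete, here is a small plain-Java model of fixed-window assignment. It is a sketch under assumptions: the one-minute window size is made up for illustration (the actual windowing in the test is not stated here), `FixedWindowModel` is a hypothetical class, and "latest event time seen" stands in for the runner's watermark.

```java
/** Hypothetical model of epoch-aligned fixed windows; not Beam code. */
final class FixedWindowModel {
  /** Start (inclusive) of the window that contains the given timestamp. */
  static long windowStart(long timestampMillis, long windowSizeMillis) {
    return timestampMillis - Math.floorMod(timestampMillis, windowSizeMillis);
  }

  /**
   * Simplified closing rule: the window closes once some event time at or
   * beyond the window's end has been observed.
   */
  static boolean windowCloses(long windowStartMillis, long windowSizeMillis,
                              long latestEventTimeMillis) {
    return latestEventTimeMillis >= windowStartMillis + windowSizeMillis;
  }
}
```

With an assumed 60,000 ms window, four messages stamped 10,000, 10,200, 10,400 and 10,600 ms all map to the window starting at 0, and nothing closes that window until a timestamp of at least 60,000 ms shows up.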
We also may be in danger of dropping records because the default trigger has 0 allowedLateness. For Pubsub input, I believe the event time is taken from the records pulled from pubsub. It's not clear to me what exactly that timestamp will be and whether it's possible for those timestamps to arrive out of order; I assume they can.
From the PubsubIO docs:
If timestampAttribute is not provided, the system will generate record timestamps the first time it sees each record. All windowing will be done relative to these timestamps.
So we are essentially windowing on processing time. The timestamps aren't assigned until the messages actually arrive. So we may be fine with the default triggering.
Cool. It looks like AfterPane.elementCountAtLeast() can be used to fire once at least a given number of records has been collected, and it can be combined with an AfterProcessingTime trigger, using composite triggers, to additionally apply a timeout.
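The intended "count or timeout, whichever comes first" behaviour can be modelled in plain Java as below. This is not Beam code: `CountOrTimeoutTrigger` is a hypothetical class that only mirrors the firing decision, and the comment naming the Beam composite is my reading of how the pieces would fit together, not something tested against the pipeline.

```java
/**
 * Hypothetical model of a composite trigger. In Beam this would roughly
 * correspond to:
 *   AfterFirst.of(
 *       AfterPane.elementCountAtLeast(n),
 *       AfterProcessingTime.pastFirstElementInPane().plusDelayOf(d))
 */
final class CountOrTimeoutTrigger {
  private final int minElements;
  private final long timeoutMillis;
  private int seen = 0;
  private long firstElementAtMillis = -1L;

  CountOrTimeoutTrigger(int minElements, long timeoutMillis) {
    this.minElements = minElements;
    this.timeoutMillis = timeoutMillis;
  }

  /** Records one element observed at the given processing time. */
  void onElement(long nowMillis) {
    if (seen == 0) {
      firstElementAtMillis = nowMillis; // the timeout starts at the first element
    }
    seen++;
  }

  /** True once enough elements arrived or the timeout since the first one passed. */
  boolean shouldFire(long nowMillis) {
    boolean countReached = seen >= minElements;
    boolean timedOut = seen > 0 && nowMillis - firstElementAtMillis >= timeoutMillis;
    return countReached || timedOut;
  }
}
```

Note that the count condition is "at least", so a pane may contain more than the requested number of elements if several arrive before the trigger is evaluated.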
I am not sure how this is affected by the Pubsub input creating an unbounded PCollection, but I have a feeling that's going to be relevant too.
I think I have this completely wrong. I just added some debug statements that run directly after a PubsubMessage is read, and there are 11 seconds between starting the pipeline and seeing the 4 messages arrive. So the delay may actually come from polling behavior in PubsubIO.