Remove read timer from PubsubIntegrationTest

Open relud opened this issue 6 years ago • 4 comments

The test currently has a 15-second run timer, because it reads from an unbounded input that has no natural end.

When we have time, we should investigate ways to terminate the pipeline after the expected number of messages are processed.

relud, Nov 20 '18 20:11
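One way to replace the fixed timer, sketched in plain Java rather than Beam: keep pulling until the expected number of messages has arrived, and fall back to a deadline only if they never do. `BoundedDrain` and `takeUpTo` are hypothetical names, and the sketch assumes the test receives messages through a `BlockingQueue`; it is not part of PubsubIO or the project's test code. On the Beam side, `PipelineResult.waitUntilFinish(Duration)` followed by `cancel()` can also bound how long a streaming test pipeline runs.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: drain a never-ending source until `expected` items arrive or a deadline passes. */
final class BoundedDrain {
  private BoundedDrain() {}

  static <T> List<T> takeUpTo(BlockingQueue<T> source, int expected, Duration timeout) {
    List<T> received = new ArrayList<>();
    long deadline = System.nanoTime() + timeout.toNanos();
    while (received.size() < expected) {
      long remaining = deadline - System.nanoTime();
      if (remaining <= 0) {
        break; // deadline reached: return what we have so the caller can report a short count
      }
      T item;
      try {
        item = source.poll(remaining, TimeUnit.NANOSECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
        break;
      }
      if (item == null) {
        break; // nothing arrived before the deadline
      }
      received.add(item);
    }
    return received;
  }
}
```

In a test, the assertion would compare the returned list's size with the expected count, so a passing run finishes as soon as the last message arrives instead of always sleeping for the full timeout.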

I'm thinking more about this, and I bet it's our windowing. The default trigger only fires, and closes the window, once the watermark passes the end of the window, which in practice means once a message arrives with an event time beyond the end of the window. In our test we send several messages close together in time, so I expect they all fall in the same window, and the window never closes.

We may also be in danger of dropping records, because the default windowing strategy has an allowedLateness of 0. For Pubsub input, I believe the event time is taken from the records pulled from Pubsub. It's not clear to me exactly what that timestamp will be or whether those timestamps can arrive out of order; I assume they can.

jklukas, Nov 28 '18 19:11
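To make the windowing hypothesis concrete, here is a plain-Java model (not Beam code) of fixed-window assignment, the default watermark trigger, and late-data dropping. It assumes fixed windows with zero offset and a hypothetical window size; the thread does not say which windowing the pipeline uses, and Beam's exact boundary handling may differ slightly from this approximation. `WindowModel` and its methods are illustrative names.

```java
/**
 * Hypothetical plain-Java model of fixed-window assignment, the default
 * (watermark) trigger, and late-data dropping. Not Beam code; all times in milliseconds.
 */
final class WindowModel {
  private WindowModel() {}

  /** Inclusive start of the fixed window containing tsMillis (zero offset assumed). */
  static long windowStart(long tsMillis, long sizeMillis) {
    return tsMillis - Math.floorMod(tsMillis, sizeMillis);
  }

  /** Last millisecond that still belongs to the window (the window end itself is exclusive). */
  static long windowMaxTimestamp(long tsMillis, long sizeMillis) {
    return windowStart(tsMillis, sizeMillis) + sizeMillis - 1;
  }

  /** Approximate default trigger: the on-time pane fires once the watermark passes the window's max timestamp. */
  static boolean defaultTriggerFired(long tsMillis, long sizeMillis, long watermarkMillis) {
    return watermarkMillis > windowMaxTimestamp(tsMillis, sizeMillis);
  }

  /** Approximate drop rule: the element is discarded once maxTimestamp + allowedLateness is before the watermark. */
  static boolean droppedAsLate(
      long tsMillis, long sizeMillis, long allowedLatenessMillis, long watermarkMillis) {
    return windowMaxTimestamp(tsMillis, sizeMillis) + allowedLatenessMillis < watermarkMillis;
  }
}
```

With a 60-second window, messages stamped at 1.0 s and 1.5 s share the window [0, 60 s), so nothing fires until the watermark moves past 60 s. With an allowedLateness of 0, a record for that window that arrives after the watermark has passed the window's end is dropped; `Window.withAllowedLateness(Duration)` is the Beam setting that would relax this.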

From the PubsubIO docs:

If timestampAttribute is not provided, the system will generate record timestamps the first time it sees each record. All windowing will be done relative to these timestamps.

So we are essentially windowing on processing time. The timestamps aren't assigned until the messages actually arrive. So we may be fine with the default triggering.

jklukas, Nov 28 '18 19:11

Cool. It looks like AfterPane.elementCountAtLeast() can be used to fire once a given number of records has been collected, and it can be combined with an AfterProcessingTime trigger using composite triggers to add a timeout as well.

I am not sure how this is affected by the Pubsub input creating an unbounded PCollection, but I have a feeling that's going to be relevant too.

relud, Nov 28 '18 19:11
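A plain-Java model of the "count or timeout" behaviour. In Beam, the composite trigger would be written as `AfterFirst.of(AfterPane.elementCountAtLeast(n), AfterProcessingTime.pastFirstElementInPane().plusDelayOf(timeout))`; the class below only imitates that firing rule with caller-supplied processing times so it can run without Beam. `CountOrTimeoutTrigger` is a hypothetical name.

```java
/**
 * Hypothetical plain-Java model of a "count OR timeout" composite trigger,
 * imitating AfterFirst.of(elementCountAtLeast(n), pastFirstElementInPane().plusDelayOf(d)).
 * Not Beam code; times are processing-time milliseconds supplied by the caller.
 */
final class CountOrTimeoutTrigger {
  private final int elementCount;
  private final long delayMillis;
  private int seen = 0;
  private long firstElementAtMillis = 0;

  CountOrTimeoutTrigger(int elementCount, long delayMillis) {
    this.elementCount = elementCount;
    this.delayMillis = delayMillis;
  }

  /** Records one element arriving at nowMillis and reports whether the pane should fire. */
  boolean onElement(long nowMillis) {
    if (seen == 0) {
      firstElementAtMillis = nowMillis; // the delay is measured from the first element in the pane
    }
    seen++;
    return shouldFire(nowMillis);
  }

  /** Checks the trigger without a new element, as a processing-time timer would. */
  boolean shouldFire(long nowMillis) {
    if (seen == 0) {
      return false; // no element yet, so no timeout is running
    }
    boolean countReached = seen >= elementCount;
    boolean timedOut = nowMillis - firstElementAtMillis >= delayMillis;
    return countReached || timedOut; // AfterFirst semantics: fire when either condition holds
  }
}
```

Because `pastFirstElementInPane()` starts its delay at the first element, this kind of timeout does nothing while no message has arrived, so on its own it would not cap the time spent waiting for PubsubIO to deliver the first message.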

I think I have this completely wrong. I just added some debug statements that run directly after a PubsubMessage is read, and there are 11 seconds between starting the pipeline and seeing the 4 messages arrive. So the delay may actually come from polling behavior in PubsubIO.

jklukas, Nov 28 '18 20:11
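The measurement in the last comment can be reproduced with a small timing helper: take a start time when the pipeline is launched and record the offset each time a message is read. `ArrivalLog` is a hypothetical plain-Java class, and the injectable `LongSupplier` clock is an assumption added so the offsets can be tested deterministically; in real use you would pass `System::nanoTime`. Wiring it into a Beam `DoFn` is not shown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

/** Hypothetical debug helper: records how long after construction each message is first seen. */
final class ArrivalLog {
  private final LongSupplier clockNanos;
  private final long startNanos;
  private final List<Long> arrivalOffsetsMillis = new ArrayList<>();

  ArrivalLog(LongSupplier clockNanos) {
    this.clockNanos = clockNanos;
    this.startNanos = clockNanos.getAsLong(); // "pipeline started" reference point
  }

  /** Call right after a message is read; returns milliseconds elapsed since construction. */
  synchronized long recordArrival() {
    long offsetMillis = (clockNanos.getAsLong() - startNanos) / 1_000_000;
    arrivalOffsetsMillis.add(offsetMillis);
    return offsetMillis;
  }

  /** Snapshot of all recorded offsets, in arrival order. */
  synchronized List<Long> offsets() {
    return new ArrayList<>(arrivalOffsetsMillis);
  }
}
```

If the first recorded offset is already around 11 seconds, the time is being spent before any message reaches the pipeline, which points at the source's polling rather than at windowing or triggering.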