[Bug]: PaneInfo not populated in Go SDK
What happened?
I'm attempting to use early triggering and PaneInfo to limit bundle sizes, to avoid running into the Dataflow limit of 80MB, and have found that PaneInfo does not appear to be populated correctly.
Runner: Dataflow
Beam Version: 2.55.1
Here's a test that I believe demonstrates the problem:
func init() {
	register.Function2x0(produceFn)
	register.Function4x0(getPanes)
	register.Emitter1[int]()
}

func produceFn(_ []byte, emit func(beam.EventTime, int)) {
	baseT := mtime.Now()
	for i := 0; i < 10; i++ {
		emit(baseT.Add(time.Minute), i)
	}
}

func Produce(s beam.Scope) beam.PCollection {
	return beam.ParDo(s, produceFn, beam.Impulse(s))
}

func getPanes(ctx context.Context, pi typex.PaneInfo, _ int, emit func(int)) {
	log.Output(ctx, log.SevWarn, 0, fmt.Sprintf("got pane %+v", pi))
	emit(int(pi.Index))
}

func TestPanes(t *testing.T) {
	p, scp := beam.NewPipelineWithRoot()
	c := Produce(scp)
	windowed := beam.WindowInto(
		scp,
		window.NewFixedWindows(5*time.Minute),
		c,
		beam.Trigger(trigger.AfterEndOfWindow().
			EarlyFiring(
				trigger.Repeat(
					trigger.AfterCount(2),
				),
			),
		),
		beam.PanesDiscard(),
	)
	panes := beam.ParDo(scp, getPanes, windowed)
	paneIdxs := beam.WindowInto(scp, window.NewGlobalWindows(), panes)
	passert.Count(scp, paneIdxs, "pane idxs", 10)
	passert.EqualsList(scp, paneIdxs, []int{0, 0, 1, 1, 2, 0, 0, 1, 1, 2})
	ptest.RunAndValidate(t, p)
}
The logs are all:
got pane {Timing:0 IsFirst:false IsLast:false Index:0 NonSpeculativeIndex:0}
Even if I don't have the indexes correct in the test (the test is failing on the EqualsList), I would expect these to be internally consistent. That is, I would expect there to be at least one IsFirst:true and at least one IsLast:true.
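The consistency expectation above can be written down as a small check. This is only a sketch: `paneInfo` is a local stand-in that mirrors the fields printed in the log, not the SDK's `typex.PaneInfo`, and `checkPanes` is a hypothetical helper, not part of Beam.

```go
package main

import (
	"errors"
	"fmt"
)

// paneInfo is a local stand-in mirroring the fields printed in the log.
// It is NOT the SDK's typex.PaneInfo type.
type paneInfo struct {
	Timing              int
	IsFirst, IsLast     bool
	Index               int64
	NonSpeculativeIndex int64
}

// checkPanes (hypothetical helper) returns an error if the panes observed
// for one key and window never include a first pane or never include a
// last pane.
func checkPanes(panes []paneInfo) error {
	var sawFirst, sawLast bool
	for _, p := range panes {
		sawFirst = sawFirst || p.IsFirst
		sawLast = sawLast || p.IsLast
	}
	switch {
	case len(panes) == 0:
		return errors.New("no panes observed")
	case !sawFirst:
		return errors.New("no pane has IsFirst:true")
	case !sawLast:
		return errors.New("no pane has IsLast:true")
	}
	return nil
}

func main() {
	// Ten zero-valued panes, matching the ten identical log lines in this report.
	logged := make([]paneInfo, 10)
	fmt.Println(checkPanes(logged))
}
```

Run against the logged values, the check fails on the missing IsFirst:true, which is the inconsistency described here.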
Issue Priority
Priority: 2 (default)
Issue Components
- [ ] Component: Python SDK
- [ ] Component: Java SDK
- [X] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [x] Component: Google Cloud Dataflow Runner
I messed up the bug report (submitted too early) and it got classified as P3, whereas I think it should be P2.
I've tossed up a draft PR that essentially tries to pipe pane info through all the FullValue writes. I am now getting the default NoFiringPane() in the log. Still working on figuring out why the triggering info is not being populated as expected.
got paneinfo: {Timing:3 IsFirst:true IsLast:true Index:0 NonSpeculativeIndex:0}
I think it was actually coming in correctly, but I was misapplying triggering: I was attempting to use PaneInfo before bundles were being committed to the backend, so things were not happening as I expected.
First, thanks for finding and reporting this!
Agreed that P2 is more appropriate for this issue generally, updated labels.
But probably not higher than that. In principle, using State and Timers should enable the same semantics, as they are lower-level primitives. However, that won't work very well for executions on Batch Dataflow, since timers behave differently when all data is available a priori.
This would be a blocker for getting triggers working properly on the Go SDK's local runner, Prism, as it's not doing anything with Panes or Triggers at present, though that work is coming up. (See #29650 for the Prism implementation list.)
And proper Pane propagation would allow sophisticated streaming-enabled file sinks to be implemented natively in the Go SDK. These rely on correct pane information to output and update files written in an unbounded pipeline.
The example code is demonstrating that the default pane isn't being set to the NoFiringPane. That is a bug, probably caused by a lack of propagation, and it should be fixed.
What it's not demonstrating is that the pane should be different due to a trigger.
IIRC Triggers only resolve at the downstream GBK/Aggregation, so that's when there would be multiple firings, and different Panes.
Panes are only updated after a trigger is enacted ("fired") from a runner source, like after a GBK. More precisely, the "No Firing Pane" is the expected default until a trigger actually resolves. The "No Firing Pane" means the given pane was not due to a trigger firing.
So, having the following pipeline should show different firings:
- DoFn: produce some data
- WindowInto with some triggers set to fire early (e.g., by number of elements), and have the default firing.
- GBK
- DoFn: Detect panes here.
- Other DoFn: Do more stuff.
- DoFn: Detect the same panes here.
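To picture why panes only differ after the GBK, here is a toy model of a grouping step for a single key and window. It is not Beam code and does not reflect any particular runner's implementation: `pane`, `firePanes`, and the timing labels are hypothetical names. It assumes discarding mode, that all elements arrive before the end of the window, that the early trigger fires at exactly `count` elements (a real count trigger only guarantees at least that many), and that there is no allowed lateness, so the on-time pane is the last one.

```go
package main

import "fmt"

// Timing labels for the toy model; illustrative names, not SDK constants.
const (
	early  = "EARLY"
	onTime = "ON_TIME"
)

// pane is a simplified stand-in for the pane metadata attached to one firing.
type pane struct {
	Timing  string
	IsFirst bool
	IsLast  bool
	Index   int
	Values  []int
}

// firePanes models one key and window under "after end of window, with a
// repeated early firing every `count` elements" in discarding mode.
func firePanes(elems []int, count int) []pane {
	var out []pane
	var buf []int
	for _, e := range elems {
		buf = append(buf, e)
		if len(buf) == count {
			out = append(out, pane{Timing: early, IsFirst: len(out) == 0, Index: len(out), Values: buf})
			buf = nil // discarding mode: fired elements leave the buffer
		}
	}
	// End of window: the toy always emits an on-time pane with whatever
	// remains, even if nothing does. Real runners may behave differently.
	out = append(out, pane{Timing: onTime, IsFirst: len(out) == 0, IsLast: true, Index: len(out), Values: buf})
	return out
}

func main() {
	for _, p := range firePanes([]int{0, 1, 2, 3, 4}, 2) {
		fmt.Printf("%+v\n", p)
	}
}
```

With five elements and a count of 2, the model produces two early panes followed by one on-time pane. Any DoFn downstream of the grouping step would see the same pane on each grouped output, which is what the two "Detect panes" steps in the list are meant to confirm.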