
Refreshing SideInput for Streaming Pipelines

Open syodage opened this issue 3 years ago • 3 comments

When we use side inputs in streaming pipelines, most use cases require those side inputs to be refreshed (recalculated) over time. Scio doesn't have a nice way to do this. Apache Beam defines refreshable side input patterns that work with both global and non-global windowing to address this easily.

However, the exact example code snippets work with neither DirectRunner nor DataflowRunner. With DirectRunner, the side input doesn't output any data to the main pipeline code, and with DataflowRunner it throws this error [4].

This issue was raised on the Apache Beam user mailing list [1][2][3] a few years ago, and the discussion concluded by suggesting that the use case be addressed with a Guava LoadingCache that periodically updates a local cache, which of course is not the Beam way of doing it.
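To make the cache-based workaround concrete, here is a minimal sketch of the idea using only the JDK. It is a stand-in for the suggested Guava LoadingCache, not Guava itself and not a Beam or Scio API. The class name `RefreshingValue`, its constructor parameters, and the injectable clock are all hypothetical names chosen for illustration.

```java
import java.time.Duration;
import java.util.Objects;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical helper: caches one value and reloads it once it is older than maxAge. */
final class RefreshingValue<T> {
  private final Supplier<T> loader;      // assumed to fetch the side data, e.g. from a database
  private final long maxAgeNanos;
  private final LongSupplier nanoClock;  // injectable so the refresh logic can be tested

  private T value;
  private long loadedAtNanos;
  private boolean loaded;

  RefreshingValue(Supplier<T> loader, Duration maxAge) {
    this(loader, maxAge, System::nanoTime);
  }

  RefreshingValue(Supplier<T> loader, Duration maxAge, LongSupplier nanoClock) {
    this.loader = Objects.requireNonNull(loader);
    this.maxAgeNanos = maxAge.toNanos();
    this.nanoClock = Objects.requireNonNull(nanoClock);
  }

  /** Returns the cached value, reloading it first if it was never loaded or has aged out. */
  synchronized T get() {
    long now = nanoClock.getAsLong();
    if (!loaded || now - loadedAtNanos >= maxAgeNanos) {
      value = loader.get();
      loadedAtNanos = now;
      loaded = true;
    }
    return value;
  }
}
```

In a pipeline, each worker would presumably hold one such instance and call `get()` while processing elements, so the data is refreshed per worker on wall-clock time rather than through windowing and triggers, which is why it does not feel like the Beam way.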

Related Issues: https://github.com/spotify/scio/issues/3521 , https://github.com/spotify/scio/issues/3201 , https://github.com/spotify/scio/issues/1190 , https://github.com/spotify/scio/issues/2525

[1] https://lists.apache.org/thread.html/%[email protected]
[2] https://lists.apache.org/thread.html/a5d804685a5810594a7860709fbcd6d3a22ead6e871fc3073a65ef1e@%3Cuser.beam.apache.org%3E
[3] https://lists.apache.org/thread.html/681de1ae372951988a00b9affa7480f3117d3cae6dae9ee2c69baba4@%3Cuser.beam.apache.org%3E

[4]

2020-12-07 23:34:17.483 EST Error message from worker: java.lang.RuntimeException: Exception while fetching side input:
org.apache.beam.runners.dataflow.worker.StateFetcher.fetchSideInput(StateFetcher.java:217)
org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.fetchSideInput(StreamingModeExecutionContext.java:303) 
org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.access$500(StreamingModeExecutionContext.java:70) 
org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext$StepContext.issueSideInputFetch(StreamingModeExecutionContext.java:665) 
org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext$UserStepContext.issueSideInputFetch(StreamingModeExecutionContext.java:728) 
org.apache.beam.runners.dataflow.worker.StreamingSideInputFetcher.getReadyWindows(StreamingSideInputFetcher.java:137)
org.apache.beam.runners.dataflow.worker.StreamingSideInputDoFnRunner.startBundle(StreamingSideInputDoFnRunner.java:53)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.reallyStartBundle(SimpleParDoFn.java:312) 
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.startBundle(SimpleParDoFn.java:236) 
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.start(ParDoOperation.java:36) 
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77) 
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1400) 
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:156) 
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1101) 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view.
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050) 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952) 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4871) 
org.apache.beam.runners.dataflow.worker.StateFetcher.fetchSideInput(StateFetcher.java:196) 
org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.fetchSideInput(StreamingModeExecutionContext.java:303) 
org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.access$500(StreamingModeExecutionContext.java:70) 
org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext$StepContext.issueSideInputFetch(StreamingModeExecutionContext.java:665) 
org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext$UserStepContext.issueSideInputFetch(StreamingModeExecutionContext.java:728) 
org.apache.beam.runners.dataflow.worker.StreamingSideInputFetcher.getReadyWindows(StreamingSideInputFetcher.java:137) 
org.apache.beam.runners.dataflow.worker.StreamingSideInputDoFnRunner.startBundle(StreamingSideInputDoFnRunner.java:53) 
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.reallyStartBundle(SimpleParDoFn.java:312) 
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.startBundle(SimpleParDoFn.java:236) 
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.start(ParDoOperation.java:36) 
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77) 
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1400) 
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:156) 
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1101) 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view.
org.apache.beam.sdk.values.PCollectionViews$SingletonViewFn.apply(PCollectionViews.java:451) 
org.apache.beam.sdk.values.PCollectionViews$SingletonViewFn.apply(PCollectionViews.java:379) 
org.apache.beam.runners.dataflow.worker.StateFetcher.lambda$fetchSideInput$2(StateFetcher.java:177) 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4876) 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528) 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277) 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154) 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044) 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952) 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4871) 
org.apache.beam.runners.dataflow.worker.StateFetcher.fetchSideInput(StateFetcher.java:196) 
org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.fetchSideInput(StreamingModeExecutionContext.java:303) 
org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.access$500(StreamingModeExecutionContext.java:70) 
org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext$StepContext.issueSideInputFetch(StreamingModeExecutionContext.java:665) 
org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext$UserStepContext.issueSideInputFetch(StreamingModeExecutionContext.java:728) 
org.apache.beam.runners.dataflow.worker.StreamingSideInputFetcher.getReadyWindows(StreamingSideInputFetcher.java:137) 
org.apache.beam.runners.dataflow.worker.StreamingSideInputDoFnRunner.startBundle(StreamingSideInputDoFnRunner.java:53) 
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.reallyStartBundle(SimpleParDoFn.java:312) 
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.startBundle(SimpleParDoFn.java:236) 
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.start(ParDoOperation.java:36) 
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77) 
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1400) 
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:156) 
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1101) 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
java.base/java.lang.Thread.run(Thread.java:834)
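
The root-cause message in the trace says the side input held more than one element when it was read as a singleton. The toy model below only illustrates that check and why reducing a window's contents to a single element avoids it. It is a sketch and not Beam's actual implementation: `SingletonViewModel`, `asSingleton`, and `latestOnly` are hypothetical names, and whether repeated trigger firings are what produce the extra elements here is an assumption, not something the trace confirms.

```java
import java.util.List;
import java.util.NoSuchElementException;

/** Toy model only: mimics the size check named in the root-cause message. */
final class SingletonViewModel {
  private SingletonViewModel() {}

  /** Returns the single element of a window, or throws if there are zero or several. */
  static <T> T asSingleton(List<T> elementsInWindow) {
    if (elementsInWindow.isEmpty()) {
      throw new NoSuchElementException("no element available for this window yet");
    }
    if (elementsInWindow.size() > 1) {
      throw new IllegalArgumentException(
          "PCollection with more than one element accessed as a singleton view.");
    }
    return elementsInWindow.get(0);
  }

  /** Keeps only the most recently added element, so the singleton check can pass. */
  static <T> List<T> latestOnly(List<T> elementsInWindow) {
    return elementsInWindow.isEmpty()
        ? List.of()
        : List.of(elementsInWindow.get(elementsInWindow.size() - 1));
  }
}
```

Under this model, `asSingleton(List.of("v1", "v2"))` fails with the same message as the trace, while `asSingleton(latestOnly(List.of("v1", "v2")))` returns `"v2"`.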

syodage • Dec 08 '20 19:12

@syodage Did you figure out a workaround for this? I am facing the exact same problem.

nullobject • Mar 29 '21 00:03

Hey! Bumping this issue, since I've stumbled upon the same thing.

Thanks for your thorough report @syodage

lurecas • Mar 29 '21 09:03

@syodage Any update on this issue?

brunsgaard • Dec 15 '21 21:12