[Bug]: RedisIO#readKeyPatterns failing with OutOfMemory on version 2.39.0

Open djaneluz opened this issue 3 years ago • 11 comments

What happened?

I'm using RedisIO to read and write cached values, and it worked fine with version 2.38.0.

When I moved to version 2.39.0 I started getting errors like:

Execution of work for computation 'P13' for key '<binary key, not printable>' failed with out-of-memory. Work will not be retried locally. Heap dump not written.

With stacktrace:

java.lang.OutOfMemoryError: unable to create native thread: possibly out of memory or process/resource limits reached
	at java.base/java.lang.Thread.start0(Native Method)
	at java.base/java.lang.Thread.start(Thread.java:803)
	at java.base/java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:937)
	at java.base/java.util.concurrent.ThreadPoolExecutor.ensurePrestart(ThreadPoolExecutor.java:1583)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:346)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:562)
	at java.base/java.util.concurrent.Executors$DelegatedScheduledExecutorService.schedule(Executors.java:779)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker$ProcessContext.onClaimed(OutputAndTimeBoundedSplittableProcessElementInvoker.java:312)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers$RestrictionTrackerObserver.tryClaim(RestrictionTrackers.java:60)
	at org.apache.beam.sdk.io.redis.RedisIO$ReadFn.processElement(RedisIO.java:399)
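
For readers following the trace: the top frames are plain JDK code. `ScheduledThreadPoolExecutor.schedule()` calls `delayedExecute()`, which calls `ensurePrestart()`, which starts a worker thread if the pool has none yet, and that `Thread.start()` is the call that fails here. The sketch below only illustrates this lazy thread start using the JDK alone. It does not reproduce the Beam failure and makes no claim about how many executors or threads the runner creates. The class and method names are hypothetical, and it uses `ScheduledThreadPoolExecutor` directly (rather than the delegated wrapper seen in the trace) so that the pool size can be inspected.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of Beam or RedisIO.
class LazyWorkerStartDemo {

    /** Returns {pool size before the first schedule(), pool size after it}. */
    static int[] poolSizesAroundFirstSchedule() {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        try {
            // A freshly constructed pool has not started any worker thread.
            int before = executor.getPoolSize();

            // schedule() -> delayedExecute() -> ensurePrestart() -> addWorker() -> Thread.start()
            // This is the same JDK path as the top frames of the stack trace.
            executor.schedule(() -> { }, 1, TimeUnit.HOURS);

            int after = executor.getPoolSize();
            return new int[] {before, after};
        } finally {
            // Cancel the pending task and stop the worker thread.
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        int[] sizes = poolSizesAroundFirstSchedule();
        System.out.println("before=" + sizes[0] + ", after=" + sizes[1]);
    }
}
```

Running `main` prints `before=0, after=1`: the first `schedule()` call is what creates the native thread, so that is where an exhausted thread limit surfaces.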

The error happens when calling RedisIO.readKeyPatterns().

The pipeline fails and gets stuck in the step: .../ReadRedis/ParDo(Read)/ParMultiDo(Read)/ProcessKeyedElements.out0

When I move back to version 2.38.0, the problem no longer happens.

Issue Priority

Priority: 2

Issue Component

Component: io-java-redis

djaneluz, Jun 13 '22 18:06

This may be related to #15549, the latest big change in RedisIO.

@MiguelAnzoWizeline @benWize Could you take a look, please?

aromanenko-dev, Jun 14 '22 15:06

Hi @aromanenko-dev! Miguel, who made the larger changes in RedisIO, is no longer working on this project, but I will sync with him and provide a response in a couple of days.

benEng, Jun 15 '22 16:06

@benWize Many thanks!

aromanenko-dev, Jun 15 '22 18:06

@benWize Kind ping on this. Do you have any news by chance?

aromanenko-dev, Jul 18 '22 16:07

Hi @aromanenko-dev, sorry for the late response. Miguel is busy and can't take the issue, but we will find someone else on our team to take it.

benEng, Jul 19 '22 16:07

Hi, could you give more details about how to reproduce this error?

roger-mike, Jul 25 '22 16:07

Ping @djaneluz

aromanenko-dev, Jul 25 '22 16:07

Hello @roger-mike!

I have a streaming pipeline that consumes Pub/Sub messages, extracts keys, groups them by window (FixedWindows of 5 min), and calls Redis to get the values, which are used as a side input to another step.

Something like:

    // Read from Pub/Sub, parse, key each message, and group per 5-minute fixed window.
    final PCollection<KV<String, Iterable<MyMessage>>> myMessageKV = pipeline
        .apply("ReadPubSubMessages", PubsubIO.readMessages().fromSubscription(options.getSubscription()))
        .apply("ExtractAndParse", ParDo.of(new ExtractAndParse()))
        .apply("MapMessageWithKey",
            MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptor.of(MyMessage.class)))
                .via(m -> KV.of(m.getMyKey(), m)))
        .apply("5MinFixedWindow", Window.into(FixedWindows.of(Duration.standardMinutes(5))))
        .apply("GroupByKey", GroupByKey.create());

    // Look up the grouped keys in Redis and expose the results as a multimap side input.
    final PCollectionView<Map<String, Iterable<String>>> myCache = myMessageKV
        .apply("ExtractKeys", Keys.create())
        .apply("ReadRedis", RedisIO.readKeyPatterns()
            .withEndpoint(redisHost, REDIS_PORT)
            .withOutputParallelization(false))
        .apply("ViewAsMultiMapFromCache", View.asMultimap());

    // Enrich each group of messages using the Redis values from the side input.
    final PCollectionTuple outputs = myMessageKV
        .apply("EnrichMessage", ParDo.of(new EnrichMessages())
            .withSideInput(REF_CODE_CACHE_TAG_ID, myCache));

    ...

I just ran the pipeline again with Beam version 2.41.0 and got the same error. With version 2.38.0 it works just fine.

Let me know if you need any more information.

Thanks

djaneluz, Sep 22 '22 19:09

Hello, is there any update on this? Thanks

djaneluz, Feb 09 '23 20:02

I tested the pipeline with Apache Beam version 2.56.0 and the problem still happens.

djaneluz, May 17 '24 16:05

ping @roger-mike @aromanenko-dev

djaneluz, May 17 '24 16:05