iceberg icon indicating copy to clipboard operation
iceberg copied to clipboard

DecoderResolver may lead to OOM of flink jobs writing to iceberg tables

Open ldwnt opened this issue 3 years ago • 2 comments

Apache Iceberg version

0.13.1

Query engine

Flink

Please describe the bug 🐞

I have a flink job writing to iceberg tables. When the job runs for several days, an OOM occurs. The reason is described below:

The DecoderResolver holds a ThreadLocal variable of a two-layer map:

    private static final ThreadLocal<Map<Schema, Map<Schema, ResolvingDecoder>>> DECODER_CACHES = ThreadLocal.withInitial(() -> {
        return (new MapMaker()).weakKeys().makeMap();
    });

    private static ResolvingDecoder resolve(Decoder decoder, Schema readSchema, Schema fileSchema) throws IOException {
        Map<Schema, Map<Schema, ResolvingDecoder>> cache = (Map)DECODER_CACHES.get();
        Map<Schema, ResolvingDecoder> fileSchemaToResolver = (Map)cache.computeIfAbsent(readSchema, (k) -> {
            return Maps.newHashMap();
        });
...
    }

The outer map has a weak key while the inner map has a strong one. As the inner map holds a reference to a Schema object, the outer map holding the same weak reference to the Schema object will not release the weak key. That leads to the OOM.

What I suggest is to change the inner map to one with weak key, too:

        Map<Schema, ResolvingDecoder> fileSchemaToResolver = (Map)cache.computeIfAbsent(readSchema, (k) -> {
//            return Maps.newHashMap();
            return new WeakHashMap<>();
        });

So far it seems working with my jobs.

ldwnt avatar Aug 28 '22 10:08 ldwnt

@ldwnt while your analysis on the inner map makes sense to me, I have a couple of questions.

  • shouldn't DecoderResolver only be used by reader (not writer)?
  • I assume you confirmed the memory issue from a heap dump?

stevenzwu avatar Sep 07 '22 02:09 stevenzwu

@stevenzwu

  • shouldn't DecoderResolver only be used by reader (not writer)?

I'm not familiar with Iceberg source code, but are meta files (manifest list/manifest) read during writing and that involves DecoderResolver?

  • I assume you confirmed the memory issue from a heap dump?

Yes, please refer to the snapshot below: image

ldwnt avatar Sep 14 '22 14:09 ldwnt

@ldwnt @stevenzwu I meet the same issue and hope for the official fix.

magus0219 avatar Sep 28 '22 02:09 magus0219

@ldwnt can you create a PR for the weak reference fix?

Regarding the OOM, does your use case have many different read schemas and file schemas?

stevenzwu avatar Sep 28 '22 04:09 stevenzwu

@ldwnt can you create a PR for the weak reference fix?

Sure, I'll cerate one.

Regarding the OOM, does your use case have many different read schemas and file schemas?

I have a job writing to around 100 iceberg tables. The table schema barely changes, but snapshots and manifests are continuously generated due to incoming data.

ldwnt avatar Sep 29 '22 11:09 ldwnt

Checking your heap dump. The whole map has retained memory of 109 MB, which doesn't seem too much? is that the main cause of the OOM?

Avro Schema implements equals and hashCode. If there is no frequent schema change, the size of the DecoderResolver cache should remain stable. there is no memory leak. Now I think I understand why the file schema is not weak reference. Otherwise, we would invalidate the cached resolver on the file schema and create a new resolver for every data file. So it seems that the current behavior is the right behavior.

I have a job writing to around 100 iceberg tables. The table schema barely changes, but snapshots and manifests are continuously generated due to incoming data.

We also have a similar setup internally. We didn't run into OOM issue. Not sure what's the difference here.

Did you run all 100 tables and writer threads in a single JVM? how big is the JVM heap size?

stevenzwu avatar Sep 30 '22 20:09 stevenzwu

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

github-actions[bot] avatar Apr 06 '23 00:04 github-actions[bot]

Our flink app that commits every minute had caused OOM after a month. This may be because each commit generates a avro reader/writer to read/write the manifest.

With the current implementation, when readSchema and fileSchema are the same reference (this can happen), the weak map entry is not released even if the reference holder disappears. This is because the reference continues to exist as a key for the inner map.

So the fileSchema used as the key needs to be a cloned one.

takeono avatar Apr 14 '23 02:04 takeono

We have met a similar problem and I have submitted a PR for this. @stevenzwu Could you help to review it?

ConeyLiu avatar Jun 07 '23 09:06 ConeyLiu