Account for TrinoS3StreamingOutputStream#buffer in writer retained memory

Open sopel39 opened this issue 2 years ago • 10 comments

Currently, TrinoS3StreamingOutputStream#buffer is not accounted for by writers. Hence it can cause an OOM instead of failing queries gracefully.

sopel39, Sep 06 '22 22:09

Alternatively, we could skip buffering there and rely on OutputStreamSliceOutput (which is accounted for and already buffers) instead.

cc @electrum @losipiuk @linzebing

sopel39, Sep 06 '22 22:09

See Slack discussion for context https://trinodb.slack.com/archives/CGB0QHWSW/p1662502274619329?thread_ts=1662298857.240529&cid=CGB0QHWSW

sopel39, Sep 06 '22 22:09

Alternatively, we could skip buffering there and rely on OutputStreamSliceOutput (which is accounted for and already buffers) instead.

cc @electrum @losipiuk @linzebing

The buffer in OutputStreamSliceOutput is tiny, and for S3 streaming we need a much larger one. AWS requires at least 5MB per part in a multipart upload (except for the last part).

I think we need to account for the memory used by TrinoS3StreamingOutputStream. It can be done directly in OutputStreamOrcDataSink, or OutputStreamSliceOutput.getRetainedBytes can account for the size of the OutputStream it wraps. The issue with the latter approach is that the OutputStream interface does not expose the needed information. We can work around that by adding a MemoryAware interface to airlift which exposes a getRetainedSize method. Then we can make TrinoS3StreamingOutputStream implement that interface. Then OutputStreamOrcDataSink.getRetainedSize() can check whether the wrapped OutputStream is an instance of MemoryAware, and account for the extra memory used in that case.
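A minimal sketch of that idea, for illustration only. `MemoryAware` here is the hypothetical interface proposed above (it is not an existing airlift type), and `BufferingStream` and `RetainedSizes` are made-up stand-ins for TrinoS3StreamingOutputStream and for the check in OutputStreamOrcDataSink.getRetainedSize():

```java
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical interface, as proposed in this comment; not an existing airlift API.
interface MemoryAware
{
    long getRetainedSize();
}

// Stand-in for a stream that buffers eagerly, like TrinoS3StreamingOutputStream.
class BufferingStream
        extends OutputStream
        implements MemoryAware
{
    private final OutputStream delegate;
    private final byte[] buffer;
    private int position;

    BufferingStream(OutputStream delegate, int bufferSize)
    {
        if (bufferSize <= 0) {
            throw new IllegalArgumentException("bufferSize must be positive");
        }
        this.delegate = delegate;
        this.buffer = new byte[bufferSize];
    }

    @Override
    public void write(int b)
            throws IOException
    {
        if (position == buffer.length) {
            flushBuffer();
        }
        buffer[position++] = (byte) b;
    }

    @Override
    public void flush()
            throws IOException
    {
        flushBuffer();
        delegate.flush();
    }

    private void flushBuffer()
            throws IOException
    {
        delegate.write(buffer, 0, position);
        position = 0;
    }

    @Override
    public long getRetainedSize()
    {
        // The whole buffer is retained, regardless of how much of it is filled.
        return buffer.length;
    }
}

// Stand-in for the instanceof check a data sink could perform.
class RetainedSizes
{
    static long retainedSizeOf(OutputStream stream)
    {
        if (stream instanceof MemoryAware) {
            return ((MemoryAware) stream).getRetainedSize();
        }
        // Streams that do not report their memory contribute nothing.
        return 0;
    }
}
```

With this shape, a sink would add `RetainedSizes.retainedSizeOf(wrappedStream)` to whatever it already reports, so a 16MB S3 buffer shows up in the writer's retained memory.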

losipiuk, Sep 08 '22 10:09

Is https://github.com/trinodb/trino/issues/9879 related (or duplicate) as well?

hashhar, Sep 08 '22 10:09

From @sopel39 on Slack:

TrinoS3StreamingOutputStream will by default use a 16MB buffer (per partition).

findepi, Sep 08 '22 13:09

We can work around that by adding a MemoryAware interface to airlift which exposes a getRetainedSize method. Then we can make TrinoS3StreamingOutputStream implement that interface.

I think OutputStreams can be wrapped (e.g. io.trino.hdfs.TrinoFileSystemCache.OutputStreamWrapper), so unwrapping should really be recursive. This is not easy because OutputStream doesn't have a built-in method for unwrapping.
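To illustrate the recursive unwrapping, here is a rough sketch. It assumes every wrapper opts in to a hypothetical `WrappedStream` interface that exposes its delegate, since `OutputStream` offers no such method; `MemoryAware`, `PassThroughWrapper`, `FixedBufferStream`, and `StreamMemory` are likewise illustrative names, not Trino or airlift classes:

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.OutputStream;

// Hypothetical interface from the proposal above.
interface MemoryAware
{
    long getRetainedSize();
}

// Hypothetical opt-in interface that lets a wrapper expose the stream it wraps.
interface WrappedStream
{
    OutputStream getDelegate();
}

// Stand-in for a wrapper such as TrinoFileSystemCache.OutputStreamWrapper.
class PassThroughWrapper
        extends FilterOutputStream
        implements WrappedStream
{
    PassThroughWrapper(OutputStream delegate)
    {
        super(delegate);
    }

    @Override
    public OutputStream getDelegate()
    {
        // FilterOutputStream keeps the wrapped stream in the protected field "out".
        return out;
    }
}

// Stand-in for a buffering stream that reports a fixed retained size.
class FixedBufferStream
        extends ByteArrayOutputStream
        implements MemoryAware
{
    private final long retainedSize;

    FixedBufferStream(long retainedSize)
    {
        this.retainedSize = retainedSize;
    }

    @Override
    public long getRetainedSize()
    {
        return retainedSize;
    }
}

class StreamMemory
{
    // Sums the retained size of every MemoryAware stream in the wrapper chain.
    static long retainedSizeOf(OutputStream stream)
    {
        if (stream == null) {
            return 0;
        }
        long own = (stream instanceof MemoryAware) ? ((MemoryAware) stream).getRetainedSize() : 0;
        if (stream instanceof WrappedStream) {
            return own + retainedSizeOf(((WrappedStream) stream).getDelegate());
        }
        // A wrapper that does not implement WrappedStream hides everything below it.
        return own;
    }
}
```

The last comment in the sketch is the weak spot: any wrapper that does not implement the opt-in interface silently ends the walk, which is why this approach is hard to make reliable.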

I also think both TrinoS3FileSystem.TrinoS3StreamingOutputStream and io.airlift.slice.OutputStreamSliceOutput should allocate their buffers lazily. They should also increase the buffer size incrementally in order to prevent overallocation for small inserts.
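A rough sketch of what lazy, incremental allocation could look like. `LazyGrowingBuffer` is a made-up class, and the doubling policy and size parameters are assumptions for illustration, not what either class does today:

```java
import java.util.Arrays;

// Illustrative buffer: allocates nothing until the first write, then doubles up to a cap.
class LazyGrowingBuffer
{
    private static final byte[] EMPTY = new byte[0];

    private final int initialSize;
    private final int maxSize;
    private byte[] buffer = EMPTY;
    private int position;

    LazyGrowingBuffer(int initialSize, int maxSize)
    {
        if (initialSize <= 0 || maxSize < initialSize) {
            throw new IllegalArgumentException("require 0 < initialSize <= maxSize");
        }
        this.initialSize = initialSize;
        this.maxSize = maxSize;
    }

    // Returns false when the buffer is full at maxSize; the caller should then
    // flush the contents (for example, upload a part) and call reset().
    boolean write(byte b)
    {
        if (position == buffer.length) {
            if (buffer.length == maxSize) {
                return false;
            }
            int newSize = (buffer.length == 0)
                    ? initialSize
                    : (int) Math.min((long) buffer.length * 2, maxSize);
            buffer = Arrays.copyOf(buffer, newSize);
        }
        buffer[position++] = b;
        return true;
    }

    // Discards buffered bytes but keeps the already allocated array.
    void reset()
    {
        position = 0;
    }

    int size()
    {
        return position;
    }

    // Reports the allocated capacity, which is what memory accounting should see.
    long getRetainedSize()
    {
        return buffer.length;
    }
}
```

A small insert then retains only the initial allocation rather than the full 16MB, and `getRetainedSize()` grows in step with the actual allocation.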

sopel39, Sep 08 '22 19:09

Is #9879 related (or duplicate) as well?

Good question. I don't know, @hashhar. cc @arhimondr

findepi, Sep 09 '22 07:09

I don't think it's a duplicate. Improving memory efficiency, fixing leaks != accounting for memory

sopel39, Sep 09 '22 08:09

We can work around that by adding a MemoryAware interface to airlift which exposes a getRetainedSize method. Then we can make TrinoS3StreamingOutputStream implement that interface.

I think OutputStreams can be wrapped (e.g. io.trino.hdfs.TrinoFileSystemCache.OutputStreamWrapper), so unwrapping should really be recursive. This is not easy because OutputStream doesn't have a built-in method for unwrapping.

Good point. Yet whichever path we go with, we need to do the unwrapping somewhere, unless we migrate away from OutputStream altogether. Or maybe the other option would be to pass the OutputStream around together with some function which is able to compute its memory usage. I would need to play with the code to see what is feasible.
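The "stream plus a function" option could look roughly like the sketch below. `AccountedOutputStream` is a hypothetical holder type invented for this illustration; nothing like it is claimed to exist in Trino:

```java
import java.io.OutputStream;
import java.util.function.LongSupplier;

// Hypothetical holder: pairs a stream with a callback that reports its memory usage,
// so callers never need to unwrap the stream to find out what it retains.
final class AccountedOutputStream
{
    private final OutputStream stream;
    private final LongSupplier retainedSize;

    AccountedOutputStream(OutputStream stream, LongSupplier retainedSize)
    {
        this.stream = stream;
        this.retainedSize = retainedSize;
    }

    OutputStream stream()
    {
        return stream;
    }

    long getRetainedSize()
    {
        // Evaluated on every call, so it reflects the stream's current state.
        return retainedSize.getAsLong();
    }
}
```

Whoever creates the stream knows its concrete type and can supply the callback at that point, for example `new AccountedOutputStream(bytes, () -> bytes.size())` for a `ByteArrayOutputStream`, where `size()` serves as an approximation of the retained bytes. Wrappers added later can pass the callback along unchanged.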

I also think both TrinoS3FileSystem.TrinoS3StreamingOutputStream and io.airlift.slice.OutputStreamSliceOutput should also allocate buffer lazily. Also they should incrementally increase buffer size in order to prevent overallocation for small inserts

The buffer in OutputStreamSliceOutput is 4K, so I do not think it is necessary there.

losipiuk, Sep 09 '22 13:09

Yet another option is to pass the query context in a static way. We already wrap FileSystem calls with doAs. Maybe we could pass the memory context in the same way.
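One way to read "in a static way" is a thread-local that is set around the file system call, similar in spirit to how doAs scopes an identity. The sketch below is only an assumption about what that could look like; `WriterMemoryContext` and `TrackedBuffer` are invented names, and an `AtomicLong` stands in for a real memory context:

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Hypothetical static holder for the "current" memory context of the calling thread.
final class WriterMemoryContext
{
    private static final ThreadLocal<AtomicLong> CURRENT = new ThreadLocal<>();

    private WriterMemoryContext() {}

    // Runs the action with the given counter installed, then restores the previous one.
    static <T> T withContext(AtomicLong reservedBytes, Supplier<T> action)
    {
        AtomicLong previous = CURRENT.get();
        CURRENT.set(reservedBytes);
        try {
            return action.get();
        }
        finally {
            if (previous == null) {
                CURRENT.remove();
            }
            else {
                CURRENT.set(previous);
            }
        }
    }

    // Called by code that allocates; a no-op when no context is installed.
    static void reserve(long bytes)
    {
        AtomicLong context = CURRENT.get();
        if (context != null) {
            context.addAndGet(bytes);
        }
    }
}

// Stand-in for a stream that allocates its buffer in the constructor.
final class TrackedBuffer
{
    final byte[] data;

    TrackedBuffer(int size)
    {
        this.data = new byte[size];
        WriterMemoryContext.reserve(size);
    }
}
```

A caveat with this sketch: the thread-local is only visible on the thread that installed it, so allocations made later (for example, a lazily allocated buffer) or on another thread would go unaccounted unless the stream captures the context at construction time.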

sopel39, Sep 21 '22 11:09