Account for TrinoS3StreamingOutputStream#buffer in writer retained memory
Currently, TrinoS3StreamingOutputStream#buffer is not accounted for by writers. Hence it can cause an OOM rather than failing queries gracefully.
Alternatively, we could skip buffering there entirely and rely on OutputStreamSliceOutput (which is accounted for and already does buffering) instead.
cc @electrum @losipiuk @linzebing
See Slack discussion for context https://trinodb.slack.com/archives/CGB0QHWSW/p1662502274619329?thread_ts=1662298857.240529&cid=CGB0QHWSW
Alternatively, we could skip buffering there entirely and rely on OutputStreamSliceOutput (which is accounted for and already does buffering) instead. cc @electrum @losipiuk @linzebing
The buffer in OutputStreamSliceOutput is tiny, and for S3 streaming we need a much larger one: AWS requires at least 5MB in a single request.
I think we need to account for the memory used by TrinoS3StreamingOutputStream. It can be done directly in OutputStreamOrcDataSink, or OutputStreamSliceOutput.getRetainedBytes can account for the size of the OutputStream it wraps.
The issue with the latter approach is that the OutputStream interface does not expose the needed information.
We can work around that by adding a MemoryAware interface to airlift which exposes a getRetainedSize method. Then we can make TrinoS3StreamingOutputStream implement that interface.
Then OutputStreamOrcDataSink.getRetainedSize() can check whether the wrapped OutputStream is an instance of MemoryAware and account for the extra memory in that case.
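A minimal sketch of that approach, assuming a hypothetical MemoryAware interface in airlift; the stream class below is a simplified stand-in for TrinoS3StreamingOutputStream, not its real implementation:

```java
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical airlift interface exposing the retained memory of an object.
interface MemoryAware
{
    long getRetainedSize();
}

// Simplified stand-in for TrinoS3StreamingOutputStream: it holds a large
// in-memory upload buffer and reports it through MemoryAware.
class BufferingS3OutputStream
        extends OutputStream
        implements MemoryAware
{
    private final byte[] buffer = new byte[16 * 1024 * 1024];
    private int position;

    @Override
    public void write(int b)
            throws IOException
    {
        if (position == buffer.length) {
            throw new IOException("multipart flush to S3 omitted in this sketch");
        }
        buffer[position++] = (byte) b;
    }

    @Override
    public long getRetainedSize()
    {
        return buffer.length;
    }
}

// Sketch of the check OutputStreamOrcDataSink.getRetainedSize() could perform
// on the stream it wraps, on top of what OutputStreamSliceOutput reports.
final class RetainedSize
{
    private RetainedSize() {}

    static long of(long sliceOutputRetainedBytes, OutputStream wrapped)
    {
        long size = sliceOutputRetainedBytes;
        if (wrapped instanceof MemoryAware memoryAware) {
            size += memoryAware.getRetainedSize();
        }
        return size;
    }
}
```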
Is https://github.com/trinodb/trino/issues/9879 related (or duplicate) as well?
From @sopel39 on Slack:
TrinoS3StreamingOutputStream will by default use a buffer of 16MB (per partition).
We can work around that by adding a MemoryAware interface to airlift which exposes a getRetainedSize method. Then we can make TrinoS3StreamingOutputStream implement that interface.
I think OutputStreams can be wrapped (e.g. io.trino.hdfs.TrinoFileSystemCache.OutputStreamWrapper), so unwrapping should really be recursive. This is not easy because OutputStream doesn't have a built-in method for unwrapping.
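A sketch of what recursive unwrapping could look like if wrappers opted in through a hypothetical Unwrappable interface (neither that interface nor a delegate accessor exists on OutputStream today); MemoryAware is the same interface as in the sketch above:

```java
import java.io.OutputStream;

// MemoryAware as in the earlier sketch.
interface MemoryAware
{
    long getRetainedSize();
}

// Hypothetical interface a wrapper like TrinoFileSystemCache.OutputStreamWrapper
// could implement to expose the stream it delegates to.
interface Unwrappable
{
    OutputStream getDelegate();
}

final class StreamRetainedSize
{
    private StreamRetainedSize() {}

    // Walk the whole wrapper chain and sum the memory reported by every
    // MemoryAware stream in it.
    static long of(OutputStream stream)
    {
        long size = 0;
        OutputStream current = stream;
        while (current != null) {
            if (current instanceof MemoryAware memoryAware) {
                size += memoryAware.getRetainedSize();
            }
            current = (current instanceof Unwrappable unwrappable) ? unwrappable.getDelegate() : null;
        }
        return size;
    }
}
```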
I also think both TrinoS3FileSystem.TrinoS3StreamingOutputStream and io.airlift.slice.OutputStreamSliceOutput should allocate their buffers lazily. They should also incrementally increase the buffer size to prevent overallocation for small inserts.
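A sketch of lazy, geometrically growing buffer allocation; the constants and growth policy are illustrative, not what either class does today:

```java
// Sketch: allocate the write buffer lazily and grow it geometrically up to a
// maximum, so small inserts never pin a full-size buffer. In the real S3
// stream, hitting the maximum would trigger a multipart upload flush instead
// of the exception used here.
class LazyGrowingBuffer
{
    private static final int INITIAL_SIZE = 64 * 1024;
    private static final int MAX_SIZE = 16 * 1024 * 1024;

    private byte[] buffer = new byte[0]; // nothing allocated until the first write
    private int position;

    void write(byte[] data, int offset, int length)
    {
        ensureCapacity(position + length);
        System.arraycopy(data, offset, buffer, position, length);
        position += length;
    }

    long retainedSize()
    {
        return buffer.length;
    }

    private void ensureCapacity(int required)
    {
        if (required <= buffer.length) {
            return;
        }
        if (required > MAX_SIZE) {
            throw new IllegalStateException("buffer full, flush required");
        }
        int newSize = Math.max(buffer.length, INITIAL_SIZE);
        while (newSize < required) {
            newSize = Math.min(newSize * 2, MAX_SIZE);
        }
        byte[] grown = new byte[newSize];
        System.arraycopy(buffer, 0, grown, 0, position);
        buffer = grown;
    }
}
```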
Is #9879 related (or duplicate) as well?
Good question. I don't know, @hashhar. cc @arhimondr
I don't think it's a duplicate. Improving memory efficiency, fixing leaks != accounting for memory
We can work around that by adding a MemoryAware interface to airlift which exposes a getRetainedSize method. Then we can make TrinoS3StreamingOutputStream implement that interface.
I think OutputStreams can be wrapped (e.g. io.trino.hdfs.TrinoFileSystemCache.OutputStreamWrapper), so unwrapping should really be recursive. This is not easy because OutputStream doesn't have a built-in method for unwrapping.
Good point. Yet whichever path we go with, we need to do unwrapping somewhere unless we migrate away from OutputStream entirely. Or maybe the other option would be to pass the OutputStream around together with some function which is able to compute its memory usage. I would need to play with the code to see what is feasible.
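A sketch of that last idea, using made-up names: the component that creates the stream knows how it buffers, so it can hand out a size function alongside the stream and nobody has to unwrap anything.

```java
import java.io.OutputStream;
import java.util.function.LongSupplier;

import static java.util.Objects.requireNonNull;

// Sketch: bundle an OutputStream with a supplier of its retained memory, so the
// code that creates the stream (and knows its buffering) provides the
// accounting, and downstream callers never need to unwrap the stream.
record MemoryTrackedOutputStream(OutputStream stream, LongSupplier retainedBytes)
{
    MemoryTrackedOutputStream
    {
        requireNonNull(stream, "stream is null");
        requireNonNull(retainedBytes, "retainedBytes is null");
    }
}
```

A wrapper such as OutputStreamWrapper would then just forward the supplier it was given rather than implement any unwrapping.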
I also think both TrinoS3FileSystem.TrinoS3StreamingOutputStream and io.airlift.slice.OutputStreamSliceOutput should allocate their buffers lazily. They should also incrementally increase the buffer size to prevent overallocation for small inserts.
The buffer in OutputStreamSliceOutput is 4K, so I do not think it is necessary there.
Yet another option is to pass the query context in a static way. We already wrap FileSystem calls with doAs; maybe we could pass the memory context in the same way.
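A sketch of passing the memory context statically, loosely analogous to how file system calls are wrapped with doAs; the holder class is an assumption, and an AtomicLong stands in for the real memory context:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;

// Sketch: a thread-local holder that makes the query's memory accounting visible
// to code (such as a file system output stream) that has no direct reference to
// the query context. An AtomicLong stands in for the real memory context.
final class StaticMemoryContext
{
    private static final ThreadLocal<AtomicLong> RETAINED_BYTES = new ThreadLocal<>();

    private StaticMemoryContext() {}

    // Analogous to wrapping a call with doAs: set the context, run, then clear it.
    static <T> T withMemoryContext(AtomicLong retainedBytes, Callable<T> action)
            throws Exception
    {
        RETAINED_BYTES.set(retainedBytes);
        try {
            return action.call();
        }
        finally {
            RETAINED_BYTES.remove();
        }
    }

    // Called by the stream whenever it allocates or releases buffer memory.
    static void addRetainedBytes(long delta)
    {
        AtomicLong counter = RETAINED_BYTES.get();
        if (counter != null) {
            counter.addAndGet(delta);
        }
    }
}
```

Like doAs, this only helps if the allocation happens on the thread that set the context, so it would not cover background upload threads.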