
[Support]: S3 express bucket using a Switching Filesystem Logic

Open divincode opened this issue 1 year ago • 4 comments

Description


Trino's existing S3 file system is built on AWS SDK v1, but supporting S3 Express One Zone buckets (https://aws.amazon.com/s3/storage-classes/express-one-zone/) requires a file system built on SDK v2.

To bridge the two, this change implements switching file system logic.

Testing

Tested with Hive, Iceberg, and Delta; the scenarios below work fine.

Added fs.s3-express.enabled, which is true by default. It can be turned off in case of any regressions on standard bucket paths. This flag lets us selectively enable the S3 native file system only for S3 Express.
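For context on the switching approach: S3 Express One Zone directory buckets can be recognized purely from the bucket name, which always ends in the suffix `--x-s3` (e.g. `vinaydev--use1-az6--x-s3` in the tests below). A minimal sketch of that routing check; the class and method names here are hypothetical illustrations, not Trino's actual code:

```java
// Hypothetical sketch of how a switching file system could decide whether a
// location points at an S3 Express One Zone "directory" bucket. Directory
// bucket names always end in "--x-s3", so routing can key off the bucket
// portion of the location. Not Trino's actual implementation.
public final class S3ExpressDetector
{
    private S3ExpressDetector() {}

    // Extract the bucket name from an s3:// or s3a:// location
    static String bucketOf(String location)
    {
        String withoutScheme = location.replaceFirst("^s3a?://", "");
        int slash = withoutScheme.indexOf('/');
        return (slash < 0) ? withoutScheme : withoutScheme.substring(0, slash);
    }

    // Directory buckets (S3 Express One Zone) end in "--x-s3"
    static boolean isS3ExpressBucket(String location)
    {
        return bucketOf(location).endsWith("--x-s3");
    }

    public static void main(String[] args)
    {
        System.out.println(isS3ExpressBucket("s3://vinaydev--use1-az6--x-s3/schemas/t1/")); // true
        System.out.println(isS3ExpressBucket("s3://my-standard-bucket/data/"));             // false
    }
}
```

With a check like this, the standard (SDK v1) file system keeps handling ordinary buckets while only `--x-s3` locations are routed to the native SDK v2 implementation.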

trino:vinaydev_s3express_test> CREATE SCHEMA vinaydev_s3express_test_trino_435

                          ->   WITH
                          ->  (LOCATION = 's3://vinaydev--use1-az6--x-s3/schemas/vinaydev_s3express_test_trino_435/');
CREATE SCHEMA

============================= Hive ============================ 

trino:vinaydev_s3express_test> use hive.vinaydev_s3express_test_trino_435; 
trino:vinaydev_s3express_test_trino_435> create table t1 as select 1 as c1, 'str' as c2;
CREATE TABLE: 1 row
trino:vinaydev_s3express_test_trino_435> insert into t1 values (2, 'str'); 
trino:vinaydev_s3express_test_trino_435> select "$path", * from t1;

                                                                   $path                                                                    | c1 | c2
---------------------------------------------------------------------------------------------------------------------------------------------+----+-----
 s3://vinaydev--use1-az6--x-s3/schemas/vinaydev_s3express_test_trino_435/t1/20231214_104808_00011_pw5bz_1111ee1a-cb26-4e20-b3b9-dcd97ab4fb79 | 2  | str
 s3://vinaydev--use1-az6--x-s3/schemas/vinaydev_s3express_test_trino_435/t1/20231214_104347_00009_pw5bz_6433338a-31d2-49c5-9ac2-0e7485dc35ce | 1  | str
(2 rows)

============================= Iceberg ============================

trino:vinaydev_s3express_test_trino_435> use iceberg.vinaydev_s3express_test;
trino:vinaydev_s3express_test_trino_435> create table t1_iceberg_parquet as select * from hive.vinaydev_s3express_test_trino_435.t1;
trino:vinaydev_s3express_test_trino_435> select "$path", * from t1_iceberg_parquet;

                                                                                                  $path                                                                                                   | c1 | c2
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----+-----
 s3://vinaydev--use1-az6--x-s3/schemas/vinaydev_s3express_test_trino_435/t1_iceberg_parquet-f1f0dee6ca644745bf41ffee391d9291/data/20231214_105110_00021_pw5bz-a956107a-5fb1-4e55-b8a4-8d3ac5d02bda.parquet | 2  | str
 s3://vinaydev--use1-az6--x-s3/schemas/vinaydev_s3express_test_trino_435/t1_iceberg_parquet-f1f0dee6ca644745bf41ffee391d9291/data/20231214_105110_00021_pw5bz-a956107a-5fb1-4e55-b8a4-8d3ac5d02bda.parquet | 1  | str
(2 rows)

trino:vinaydev_s3express_test_trino_435> update t1_iceberg_parquet set c2 = 'up1' where c1 = 1;
trino:vinaydev_s3express_test_trino_435> delete from t1_iceberg_parquet where c1 = 1;
trino:vinaydev_s3express_test_trino_435> insert into t1_iceberg_parquet values (3, 'str');
trino:vinaydev_s3express_test_trino_435> select "$path", * from t1_iceberg_parquet;

                                                                                                  $path                                                                                                   | c1 | c2
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----+-----
 s3://vinaydev--use1-az6--x-s3/schemas/vinaydev_s3express_test_trino_435/t1_iceberg_parquet-f1f0dee6ca644745bf41ffee391d9291/data/20231214_105426_00026_pw5bz-6546d0c4-fa36-40ab-bc02-fbdd4bbd1390.parquet | 3  | str
 s3://vinaydev--use1-az6--x-s3/schemas/vinaydev_s3express_test_trino_435/t1_iceberg_parquet-f1f0dee6ca644745bf41ffee391d9291/data/20231214_105110_00021_pw5bz-a956107a-5fb1-4e55-b8a4-8d3ac5d02bda.parquet | 2  | str
(2 rows)

============================= Delta ============================

trino:vinaydev_s3express_test_trino_435> use delta.vinaydev_s3express_test_trino_435;
USE
trino:vinaydev_s3express_test_trino_435> create table t1_delta_parquet as select * from hive.vinaydev_s3express_test_trino_435.t1;
trino:vinaydev_s3express_test_trino_435> select * from t1_delta_parquet;
 c1 | c2
----+-----

1 | str
2 | str

(2 rows)

trino:vinaydev_s3express_test_trino_435> update t1_delta_parquet set c2 = 'up1' where c1 = 1;
trino:vinaydev_s3express_test_trino_435> select * from t1_delta_parquet;
 c1 | c2
----+-----

2 | str
1 | up1

(2 rows)

trino:vinaydev_s3express_test_trino_435> delete from t1_delta_parquet where c1 = 1;
trino:vinaydev_s3express_test_trino_435> select * from t1_delta_parquet;
 c1 | c2
----+-----

2 | str

(1 row)

trino:vinaydev_s3express_test_trino_435> insert into t1_delta_parquet values (3, 'str');
trino:vinaydev_s3express_test_trino_435> select * from t1_delta_parquet;
 c1 | c2
----+-----

3 | str
2 | str

(2 rows)

trino:vinaydev_s3express_test_trino_435> select "$path", * from t1_delta_parquet;

                                                                                         $path                                                                                            | c1 | c2

Release notes

( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
( ) Release notes are required, with the following suggested text:

# Section
* Fix some things. ({issue}`issuenumber`)

divincode avatar Feb 07 '24 14:02 divincode

Thank you for your pull request and welcome to our community. We could not parse the GitHub identity of the following contributors: Vinay Devadiga. This is most likely caused by a git client misconfiguration; please make sure to:

  1. Check whether your git client is configured with an email to sign commits: git config --list | grep email
  2. If not, set it up using git config --global user.email [email protected]
  3. Make sure that the git commit email is configured in your GitHub account settings, see https://github.com/settings/emails

cla-bot[bot] avatar Feb 07 '24 14:02 cla-bot[bot]

I believe the plan is to enable fs.native-s3.enabled by default and get rid of the v1 SDK code in the near future. I don't think we want to add a flag to enable the new filesystem just for S3 Express; you can enable it for everything and report any issues you run into with the new filesystem implementation. cc: @electrum

raunaqmorarka avatar Feb 07 '24 15:02 raunaqmorarka

@raunaqmorarka that's correct. Amazon has deprecated the v1 SDK, so we'll remove the old FS in the future. We plan to make the new one the default soon.

@divincode please enable the new FS for all your workloads and let us know if you run into any issues.

electrum avatar Feb 07 '24 19:02 electrum
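For anyone following along, the suggested change is a one-line catalog property. A minimal hedged sketch of a Hive catalog properties file with the native S3 file system enabled; only fs.native-s3.enabled is named in this thread, and the metastore and region values here are purely illustrative:

```properties
# etc/catalog/hive.properties -- illustrative values
connector.name=hive
hive.metastore.uri=thrift://example-metastore:9083
# switch from the legacy (SDK v1) file system to the native SDK v2 one
fs.native-s3.enabled=true
s3.region=us-east-1
```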

Hi @electrum, while testing S3 Express with various TPC-DS queries I see throttling issues.

2024-02-17T14:29:25.994Z	ERROR	query-execution-6	io.trino.execution.scheduler.PipelinedQueryScheduler	Failure in distributed stage for query 20240217_142908_00009_rixz6
io.trino.spi.TrinoException: Failed to list directory: s3a://vinaydev--usw2-az1--x-s3/perf_tpcds_partitioned30M_3TB_parquet_2000tasks_default/web_sales/ws_sold_date_sk=2451472
	at io.trino.plugin.hive.fs.HiveFileIterator$FileStatusIterator.processException(HiveFileIterator.java:176)
	at io.trino.plugin.hive.fs.HiveFileIterator$FileStatusIterator.<init>(HiveFileIterator.java:145)
	at io.trino.plugin.hive.fs.HiveFileIterator.<init>(HiveFileIterator.java:57)
	at io.trino.plugin.hive.BackgroundHiveSplitLoader.createInternalHiveSplitIterator(BackgroundHiveSplitLoader.java:744)
	at io.trino.plugin.hive.BackgroundHiveSplitLoader.loadPartition(BackgroundHiveSplitLoader.java:542)
	at io.trino.plugin.hive.BackgroundHiveSplitLoader.loadSplits(BackgroundHiveSplitLoader.java:391)
	at io.trino.plugin.hive.BackgroundHiveSplitLoader$HiveSplitLoaderTask.process(BackgroundHiveSplitLoader.java:305)
	at io.trino.plugin.hive.util.ResumableTasks$1.run(ResumableTasks.java:38)
	at io.trino.$gen.Trino_435____20240217_142422_2.run(Unknown Source)
	at io.airlift.concurrent.BoundedExecutor.drainQueue(BoundedExecutor.java:79)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.io.IOException: Failed to list location: s3a://vinaydev--usw2-az1--x-s3/perf_tpcds_partitioned30M_3TB_parquet_2000tasks_default/web_sales/ws_sold_date_sk=2451472
	at io.trino.filesystem.s3.S3FileSystem.listFiles(S3FileSystem.java:191)
	at io.trino.filesystem.manager.SwitchingFileSystem.listFiles(SwitchingFileSystem.java:110)
	at io.trino.filesystem.tracing.TracingFileSystem.lambda$listFiles$4(TracingFileSystem.java:109)
	at io.trino.filesystem.tracing.Tracing.withTracing(Tracing.java:47)
	at io.trino.filesystem.tracing.TracingFileSystem.listFiles(TracingFileSystem.java:109)
	at io.trino.plugin.hive.fs.CachingDirectoryLister.listFilesRecursively(CachingDirectoryLister.java:97)
	at io.trino.plugin.hive.fs.TransactionScopeCachingDirectoryLister.createListingRemoteIterator(TransactionScopeCachingDirectoryLister.java:97)
	at io.trino.plugin.hive.fs.TransactionScopeCachingDirectoryLister.lambda$listInternal$0(TransactionScopeCachingDirectoryLister.java:78)
	at com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4938)
	at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3576)
	at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2318)
	at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2191)
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2081)
	at com.google.common.cache.LocalCache.get(LocalCache.java:4019)
	at com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4933)
	at io.trino.cache.EvictableCache.get(EvictableCache.java:112)
	at io.trino.plugin.hive.fs.TransactionScopeCachingDirectoryLister.listInternal(TransactionScopeCachingDirectoryLister.java:78)
	at io.trino.plugin.hive.fs.TransactionScopeCachingDirectoryLister.listFilesRecursively(TransactionScopeCachingDirectoryLister.java:70)
	at io.trino.plugin.hive.fs.HiveFileIterator$FileStatusIterator.<init>(HiveFileIterator.java:140)
	... 11 more
Caused by: software.amazon.awssdk.services.s3.model.S3Exception: Reduce your request rate. (Service: S3, Status Code: 503, Request ID: 01fee93ae100018db778d5660509ae0718291124, Extended Request ID: egIUQKBtBIjscMw)
	at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handleErrorResponse(AwsXmlPredicatedResponseHandler.java:156)
	at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handleResponse(AwsXmlPredicatedResponseHandler.java:108)
	at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handle(AwsXmlPredicatedResponseHandler.java:85)
	at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handle(AwsXmlPredicatedResponseHandler.java:43)
	at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler$Crc32ValidationResponseHandler.handle(AwsSyncClientHandler.java:93)
	at software.amazon.awssdk.core.internal.handler.BaseClientHandler.lambda$successTransformationResponseHandler$7(BaseClientHandler.java:279)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:50)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:38)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:72)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:55)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:39)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:81)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:36)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56)
	at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:50)

Any suggestion on which configs I can use to reduce the number of S3 list requests? This kind of control doesn't seem to exist in the native file system: S3FileSystemConfig is not as comprehensive as the old file system config, and the only relevant config, S3 max connections, does not seem to be used in the new file system based on a code search. Am I missing something? Are the old file system configs bound to the new one as well?

divincode avatar Feb 17 '24 14:02 divincode

This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua

github-actions[bot] avatar Mar 11 '24 17:03 github-actions[bot]

@divincode are you still seeing issues with the latest codebase from Trino 442? Maybe rebase and test again?

mosabua avatar Mar 19 '24 17:03 mosabua

@mosabua, after debugging, it's not a Trino issue. S3 Express has a certain list limit, so it was throttling. While working on this I found that the native file system does not have a mechanism to set the number of retries. Maybe I can work on that. I will raise a PR for it, but I need to figure out the CLA thing first.

divincode avatar Mar 22 '24 06:03 divincode
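The 503 "Reduce your request rate" failure above is the textbook case for retries with exponential backoff and jitter. As a sketch of what a configurable retry mechanism could look like; this is illustrative standalone code, not the AWS SDK's or Trino's implementation:

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;

// Illustrative sketch of retry-with-exponential-backoff-and-jitter for
// throttled (HTTP 503) requests. Names and structure are hypothetical.
public final class ThrottleRetry
{
    static class ThrottledException extends RuntimeException {}

    static <T> T callWithRetries(Supplier<T> call, int maxRetries, long baseDelayMillis)
            throws InterruptedException
    {
        for (int attempt = 0; ; attempt++) {
            try {
                return call.get();
            }
            catch (ThrottledException e) {
                if (attempt >= maxRetries) {
                    throw e; // give up after the configured number of retries
                }
                // full jitter: sleep a random duration up to base * 2^attempt
                long cap = baseDelayMillis << Math.min(attempt, 16);
                Thread.sleep(ThreadLocalRandom.current().nextLong(cap + 1));
            }
        }
    }

    public static void main(String[] args)
            throws InterruptedException
    {
        int[] calls = {0};
        // fails twice with a throttle error, then succeeds
        String result = callWithRetries(() -> {
            if (calls[0]++ < 2) {
                throw new ThrottledException();
            }
            return "ok";
        }, 5, 1);
        System.out.println(result + " after " + calls[0] + " attempts"); // prints "ok after 3 attempts"
    }
}
```

Exposing maxRetries and baseDelayMillis as configuration properties is the kind of knob the comment above is asking for; randomized (jittered) sleeps also keep many workers from retrying in lockstep against the same throttled prefix.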

It would be great if you could help us tune the Trino defaults so that they work correctly out-of-the-box. We're using the AWS SDK defaults, so it's disappointing that it doesn't correctly handle retries for throttling errors, but maybe Trino's workloads will require custom settings.

cc @pettyjamesm

electrum avatar Mar 22 '24 07:03 electrum