[Improve][Connector-V2][Iceberg] Add hadoop s3 catalog e2e testcase
Purpose of this pull request
We use S3 as the Iceberg warehouse storage. When catalog_type is hadoop, only HDFS is supported, so we customized the connector to make it work with S3. I want to contribute this customized code, so I am opening this pull request.
Does this PR introduce any user-facing change?
Yes, we can now read Iceberg tables stored in S3.
Config example:
```hocon
source {
  iceberg {
    catalog_name = "seatunnel"
    iceberg.catalog.config = {
      "type" = "hadoop"
      "warehouse" = "s3a://your_bucket/spark/warehouse/"
    }
    hadoop.config = {
      "fs.s3a.path.style.access" = "true"
      "fs.s3a.connection.ssl.enabled" = "false"
      "fs.s3a.signing.algorithm" = "S3SignerType"
      "fs.s3a.encryption.algorithm" = "AES256"
      "fs.s3a.connection.timeout" = "3000"
      "fs.s3a.aws.credentials.provider" = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
      "fs.s3a.endpoint" = "http://minio:9000"
      "fs.s3a.access.key" = "xxx"
      "fs.s3a.secret.key" = "xxx"
      "fs.defaultFS" = "s3a://xxx"
    }
    namespace = "your_iceberg_database"
    table = "your_iceberg_table"
    result_table_name = "iceberg_test"
  }
}
```
How was this patch tested?
Added an e2e test case: connector-iceberg-s3-e2e.
Check list
- [ ] If any new Jar binary package is added in your PR, please add a License Notice according to the New License Guide
- [ ] If necessary, please update the documentation to describe the new feature. https://github.com/apache/seatunnel/tree/dev/docs
- [ ] If you are contributing the connector code, please check that the following files are updated:
- Update the change log in the connector document. For more details you can refer to connector-v2
- Update plugin-mapping.properties and add new connector information in it
- Update the pom file of seatunnel-dist
- [ ] Update the release-note.
cc @ic4y
Shall we add a test case for this feature?
I don't think it's going to be easy because it requires an S3 bucket.
I think we can create a minio container in e2e test case for s3a file read.
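A minimal sketch of what such a MinIO test container might look like, assuming the Testcontainers GenericContainer API and the public minio/minio image; the class name, credentials, and network alias are illustrative and only chosen to line up with the fs.s3a.* settings in the config example above:

```java
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;

public class IcebergS3MinioSketch {
    private static final Network NETWORK = Network.newNetwork();

    // Start a MinIO server that the SeaTunnel job container can reach as http://minio:9000.
    static GenericContainer<?> startMinio() {
        GenericContainer<?> minio =
                new GenericContainer<>("minio/minio:latest")
                        .withNetwork(NETWORK)
                        .withNetworkAliases("minio")                   // host part of fs.s3a.endpoint
                        .withEnv("MINIO_ROOT_USER", "minioadmin")      // used as fs.s3a.access.key
                        .withEnv("MINIO_ROOT_PASSWORD", "minioadmin")  // used as fs.s3a.secret.key
                        .withCommand("server", "/data")
                        .withExposedPorts(9000);
        minio.start();
        return minio;
    }
}
```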
@EricJoy2048 Thank you so much for the review. I made an additional commit today by adding the e2e test case. However, your previous review was automatically dismissed by the system. I apologize for that.
> I think we can create a minio container in e2e test case for s3a file read.
I created a MinIO test container and added the e2e test case. It passes on Spark 3, but fails on Flink/SeaTunnel/Spark 2.
I think I need to add the hadoop-aws/aws-java-sdk jar files to the test container Docker image and need your help.
- spark2: hadoop-aws-2.7.5, aws-java-sdk-1.7.4

I'm also looking to see if there are other workarounds.
Thanks @4chicat , I will take a look in a few days.
I modified the PR so that the e2e tests only run on the SeaTunnel engine and Spark 3.3 (excluding Flink and Spark 2.4).
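For reference, this kind of restriction is usually expressed with the @DisabledOnContainer annotation from seatunnel-e2e-common; a hedged sketch, where the test class name, the Spark 2.4 container identifier, and the reason text are assumptions rather than the actual test code:

```java
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;

// Hypothetical test class; the field values below are illustrative assumptions.
@DisabledOnContainer(
        value = {"spark:2.4.3"},   // assumed identifier of the Spark 2.4 container
        type = {EngineType.FLINK}, // skip all Flink containers
        disabledReason = "Flink and Spark 2.4 images lack compatible hadoop-aws/aws-java-sdk jars for s3a")
public class IcebergSourceWithS3IT extends TestSuiteBase {
    // test cases omitted
}
```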
@Hisoka-X @ic4y PTAL. @Carl-Zhou-CN @liugddx
Looks like this PR brings a new leaked thread in the SeaTunnel server. Can you try closing these leaking threads? If not, please add it to the known issues list. https://github.com/apache/seatunnel/blob/92f847c99f219077b4d45c25d2b379d42879f613/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java#L291
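If the leak turns out to come from a thread pool the connector creates, a generic way to stop it is to shut the pool down in close(); a minimal, hypothetical sketch (the class and pool are illustrative, not the connector's actual code):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class IcebergScanWorkerPool implements AutoCloseable {
    // Hypothetical pool a reader might use for planning/reading splits.
    private final ExecutorService workers =
            Executors.newFixedThreadPool(
                    4,
                    r -> {
                        Thread t = new Thread(r, "iceberg-s3-worker");
                        t.setDaemon(true); // daemon threads do not keep the server JVM alive
                        return t;
                    });

    @Override
    public void close() throws InterruptedException {
        // Shut the pool down explicitly so no worker threads leak into the engine.
        workers.shutdown();
        if (!workers.awaitTermination(10, TimeUnit.SECONDS)) {
            workers.shutdownNow();
        }
    }
}
```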
This PR is very valuable and I think we need to look at it again and review it.
> I think we can create a minio container in e2e test case for s3a file read.
>
> I created a MinIO test container and added the e2e test case. It passes on Spark 3, but fails on Flink/SeaTunnel/Spark 2. I think I need to add the hadoop-aws/aws-java-sdk jar files to the test container Docker image and need your help.
> - spark2: hadoop-aws-2.7.5, aws-java-sdk-1.7.4
> I'm also looking to see if there are other workarounds.
Like the driver, you can add hadoop-aws and aws-java-sdk to the test container Docker image, like seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-oss-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/oss/OssFileIT.java does:
```java
@TestContainerExtension
private final ContainerExtendedFactory extendedFactory =
        container -> {
            Container.ExecResult extraCommands =
                    container.execInContainer(
                            "bash",
                            "-c",
                            "mkdir -p /tmp/seatunnel/plugins/oss/lib && cd /tmp/seatunnel/plugins/oss/lib && curl -O "
                                    + OSS_SDK_DOWNLOAD);
            Assertions.assertEquals(0, extraCommands.getExitCode());

            extraCommands =
                    container.execInContainer(
                            "bash",
                            "-c",
                            "cd /tmp/seatunnel/plugins/oss/lib && curl -O " + JDOM_DOWNLOAD);
            Assertions.assertEquals(0, extraCommands.getExitCode());

            extraCommands =
                    container.execInContainer(
                            "bash",
                            "-c",
                            "cd /tmp/seatunnel/plugins/oss/lib && curl -O "
                                    + HADOOP_ALIYUN_DOWNLOAD);
            Assertions.assertEquals(0, extraCommands.getExitCode());

            extraCommands =
                    container.execInContainer(
                            "bash",
                            "-c",
                            "cd /tmp/seatunnel/lib && curl -O " + OSS_SDK_DOWNLOAD);
            Assertions.assertEquals(0, extraCommands.getExitCode());

            extraCommands =
                    container.execInContainer(
                            "bash", "-c", "cd /tmp/seatunnel/lib && curl -O " + JDOM_DOWNLOAD);
            Assertions.assertEquals(0, extraCommands.getExitCode());

            extraCommands =
                    container.execInContainer(
                            "bash",
                            "-c",
                            "cd /tmp/seatunnel/lib && curl -O " + HADOOP_ALIYUN_DOWNLOAD);
            Assertions.assertEquals(0, extraCommands.getExitCode());
        };
```
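For this PR, an analogous factory would download hadoop-aws and aws-java-sdk instead. A hedged sketch, where the download-URL constants, versions, and target directory are assumptions rather than the final test code:

```java
// Assumed constants; the real versions depend on the Hadoop client bundled with each engine image.
private static final String HADOOP_AWS_DOWNLOAD =
        "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.1.4/hadoop-aws-3.1.4.jar";
private static final String AWS_SDK_DOWNLOAD =
        "https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.271/aws-java-sdk-bundle-1.11.271.jar";

@TestContainerExtension
private final ContainerExtendedFactory extendedFactory =
        container -> {
            Container.ExecResult extraCommands =
                    container.execInContainer(
                            "bash",
                            "-c",
                            "cd /tmp/seatunnel/lib && curl -O " + HADOOP_AWS_DOWNLOAD
                                    + " && curl -O " + AWS_SDK_DOWNLOAD);
            Assertions.assertEquals(0, extraCommands.getExitCode());
        };
```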