
Unable to create target format Delta with source format Iceberg when the source table is on S3

Open · rajender07 opened this issue 9 months ago · 11 comments

I followed the documentation "Creating your first interoperable table" and was able to build utilities-0.1.0-SNAPSHOT-bundled.jar successfully.

I initiated a PySpark session using the command below. The Spark version is 3.4.1, running on Amazon EMR 6.14:

pyspark --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" --conf "spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog" --conf "spark.sql.catalog.spark_catalog.type=hive"

Created an Iceberg table using the commands below:

data = [("James", "Smith", "01012020", "M", "3000"),
        ("Michael", "", "02012021", "M", "4000"),
        ("Robert", "Williams", "03012023", "M", "4000"),
        ("Maria", "Jones", "04012024", "F", "4000"),
        ("Jen", "Brown", "05012025", "F", "-1")]

columns = ["firstname", "lastname", "dob", "gender", "salary"]

df = spark.createDataFrame(data, columns)

spark.sql("""CREATE TABLE IF NOT EXISTS iceberg_table
             (firstname string, lastname string, dob string, gender string, salary string)
             USING iceberg""")

df.writeTo("iceberg_table").append()

I can see the data and metadata directories under the table name on S3.

Created my_config.yaml as described in the documentation (attached: my_config.txt).

Executed the command below; it fails because metadata/version-hint.text is not available:

sudo java -jar ./utilities-0.1.0-SNAPSHOT-bundled.jar --datasetConfig my_config.yaml

2024-04-30 10:24:25 INFO org.apache.xtable.conversion.ConversionController:240 - No previous InternalTable sync for target. Falling back to snapshot sync.
2024-04-30 10:24:25 WARN org.apache.iceberg.hadoop.HadoopTableOperations:325 - Error reading version hint file s3:///iceberg_table_1/metadata/version-hint.text
java.io.FileNotFoundException: No such file or directory: s3:////iceberg_table_1/metadata/version-hint.text
    at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3801) ~[utilities-0.1.0-SNAPSHOT-bundled.jar:0.1.0-SNAPSHOT]
    at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3652) ~[utilities-0.1.0-SNAPSHOT-bundled.jar:0.1.0-SNAPSHOT]
    at org.apache.hadoop.fs.s3a.S3AFileSystem.extractOrFetchSimpleFileStatus(S3AFileSystem.java:5288) ~[utilities-0.1.0-SNAPSHOT-bundled.jar:0.1.0-SNAPSHOT]
    at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$executeOpen$6(S3AFileSystem.java:1578) ~[utilities-0.1.0-SNAPSHOT-bundled.jar:0.1.0-SNAPSHOT]
    at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499) ~[utilities-0.1.0-SNAPSHOT-bundled.jar:0.1.0-SNAPSHOT]

rajender07 · Apr 30 '24 15:04

I was looking into the documentation and understand that if the source is an Iceberg table I need to include catalog.yaml as well. But I am not sure what the value for catalogImpl should be in my case. Any insights on this would be very helpful.

catalogImpl: io.my.CatalogImpl
catalogName: name
catalogOptions: # all other options are passed through in a map
  key1: value1
  key2: value2

rajender07 · Apr 30 '24 18:04

Hi @rajender07! The error message points to the problem: the version-hint.text file was not found for the source table format (Iceberg). Do you see it on S3? This is the metadata file Iceberg writes when used with a Hadoop catalog, and XTable needs it to translate into the target Delta format.

The important part to understand here is that Iceberg needs a catalog to get started. Your config currently connects Iceberg to a Hive catalog, but I don't see a thrift URL or anything similar here:

pyspark --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" --conf "spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog" --conf "spark.sql.catalog.spark_catalog.type=hive"

Can you instead use a Hadoop catalog and configure it with something like this:

spark.sql.catalog.hadoop_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hadoop_prod.type = hadoop
spark.sql.catalog.hadoop_prod.warehouse = s3a://your-bucket
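
With that catalog in place, the table is created through the hadoop_prod catalog, so Iceberg writes a metadata/version-hint.text file under the warehouse path. A minimal sketch of the table creation (the db namespace below is a placeholder, not something from this thread):

# Hypothetical namespace "db"; the table lands at s3a://your-bucket/db/iceberg_table
spark.sql("CREATE TABLE hadoop_prod.db.iceberg_table (firstname string, lastname string, dob string, gender string, salary string) USING iceberg")
df.writeTo("hadoop_prod.db.iceberg_table").append()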

dipankarmazumdar · May 10 '24 15:05

@dipankarmazumdar, thank you for looking into the issue. No, I do not see the version-hint.text file on S3. From the documentation I understand this file is created when using a Hadoop catalog; since I was using the Iceberg session catalog, it was not generated.

I will try the Hadoop catalog as you suggested and let you know the findings.

Could you also guide me on solving the issue while using the Iceberg catalog? Should I use a catalog.yaml file? If yes, I am confused about the catalogName that should be used. FYI, I have added the Thrift-related properties under /etc/spark/conf/spark-defaults.conf and /etc/spark/conf/hive-site.xml, and I have no issues connecting to my metastore and reading/writing data from it.

rajender07 · May 11 '24 13:05

@rajender07 Which catalog are you using? If it is HMS, the implementation is org.apache.iceberg.hive.HiveCatalog; the other options and the name are used to provide whatever configuration that catalog requires, like a uri for your thrift server.
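
For illustration, a catalog.yaml along those lines might look like the sketch below; the uri and warehouse values are placeholders for your environment, not values confirmed in this thread:

catalogImpl: org.apache.iceberg.hive.HiveCatalog
catalogName: hms
catalogOptions:
  uri: thrift://your-metastore-host:9083
  warehouse: s3://your-bucket/warehouse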

the-other-tim-brown · May 13 '24 02:05

@dipankarmazumdar @the-other-tim-brown

I used the Hadoop catalog as you mentioned and created a new Iceberg table. Now I can see the version-hint.text file as well.

However, when I executed the sync command it failed with the error below. Could you please assist with resolving this issue?

sudo java -jar ./utilities-0.1.0-SNAPSHOT-bundled.jar --datasetConfig my_config.yaml

2024-05-13 13:43:04 INFO org.apache.xtable.conversion.ConversionController:240 - No previous InternalTable sync for target. Falling back to snapshot sync.
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration(Lorg/apache/hadoop/fs/statistics/DurationTracker;Lorg/apache/hadoop/util/functional/CallableRaisingIOE;)Ljava/lang/Object;
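
A NoSuchMethodError like this one usually means two different Hadoop versions are meeting on the classpath: the S3A code was compiled against a newer hadoop-common than the one actually loaded at runtime. One way to check which hadoop-common wins is to print its version (a diagnostic sketch; it assumes hadoop-common is shaded into the bundled jar, which the earlier stack trace suggests):

# Prints the Hadoop version of whichever hadoop-common the JVM resolves first
java -cp ./utilities-0.1.0-SNAPSHOT-bundled.jar org.apache.hadoop.util.VersionInfo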

Here is my my_config.yaml

sourceFormat: ICEBERG
targetFormats:
  - DELTA
datasets:
  -
    tableBasePath: s3:////x4_iceberg_table
    tableName: x4_iceberg_table

rajender07 · May 14 '24 00:05

@rajender07 - I am not really sure about this particular error. However, I tried reproducing it on my end and was able to translate from ICEBERG to DELTA using the setup I suggested.

ICEBERG TABLE CONFIG & CREATION:

import pyspark
from pyspark.sql import SparkSession
import os
conf = (
    pyspark.SparkConf()
        .setAppName('app_name')
        .set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.4.3,software.amazon.awssdk:bundle:2.17.178,software.amazon.awssdk:url-connection-client:2.17.178')
        .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
        .set('spark.sql.catalog.hdfs_catalog', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.hdfs_catalog.type', 'hadoop')
        .set('spark.sql.catalog.hdfs_catalog.warehouse', 's3a://my-bucket/new_iceberg/')
        .set('spark.sql.catalog.hdfs_catalog.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
)
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Running")
spark.sql("CREATE TABLE hdfs_catalog.table1 (name string) USING iceberg")
spark.sql("INSERT INTO hdfs_catalog.table1 VALUES ('Alex'), ('Dipankar'), ('Mary')")

my_config.yaml

sourceFormat: ICEBERG
targetFormats:
  - DELTA
datasets:
  -
    tableBasePath: s3://my-bucket/new_iceberg/table1/
    tableDataPath: s3://my-bucket/new_iceberg/table1/data
    tableName: table1

Run Sync

java -jar utilities/target/utilities-0.1.0-SNAPSHOT-bundled.jar --datasetConfig my_config.yaml
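
If the sync succeeds, a _delta_log directory should appear under the tableDataPath and the result can be read back as a Delta table. A quick check (a sketch, assuming a Spark session with a Delta Lake package such as io.delta:delta-core_2.12 on the classpath):

# Read the synced table directly from the Delta data path used above
spark.read.format("delta").load("s3://my-bucket/new_iceberg/table1/data").show()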

dipankarmazumdar · May 14 '24 15:05

> @rajender07 Which catalog are you using? If it is HMS, the implementation is org.apache.iceberg.hive.HiveCatalog, the other args and name are going to be used to configure any required configurations for using this catalog like a uri for your thrift server.

@the-other-tim-brown, referring to this: when I use the Hive catalog and pass catalogImpl: org.apache.iceberg.hive.HiveCatalog, I get java.lang.NoSuchMethodException: Cannot find constructor for interface org.apache.iceberg.catalog.Catalog, while if I use org.apache.iceberg.hadoop.HadoopCatalog I get no such error. Is there anything else we need to implement if we are using the Hive catalog for our Iceberg tables?

amnchauhan · May 16 '24 14:05

@rajender07 - LMK if you were able to get past the error with the recommendation.

dipankarmazumdar · May 23 '24 19:05

@rajender07 Can you pull the latest master and try again? This is the fix: https://github.com/apache/incubator-xtable/pull/441

vinishjail97 · Jun 05 '24 05:06

@vinishjail97, Thank you. I will test the fix today and share an update.

rajender07 · Jun 05 '24 13:06

It creates an empty _delta_log dir and errors out.

-- Config file
sourceFormat: ICEBERG
targetFormats:
  - DELTA
datasets:
  -
    tableBasePath: s3://<>/prod/orders
    tableDataPath: s3://<>/prod/orders/data
    tableName: orders
    namespace: prod.db

-- Run sync
java -jar /Users/satyak/iceberg/demo/xtable/xtable-utilities-0.1.0-SNAPSHOT-bundled.jar --datasetConfig /Users/satyak/iceberg/demo/xtable/s3_orders_ice_delta.yaml

WARNING: Runtime environment or build system does not support multi-release JARs. This will impact location-based features.
2024-06-26 15:10:21 INFO org.apache.xtable.utilities.RunSync:148 - Running sync for basePath s3://<>/prod/orders for following table formats [DELTA]
2024-06-26 15:10:22 WARN org.apache.hadoop.util.NativeCodeLoader:60 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/Users/satyak/iceberg/demo/xtable/xtable-utilities-0.1.0-SNAPSHOT-bundled.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
2024-06-26 15:10:22 WARN org.apache.spark.util.Utils:72 - Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
2024-06-26 15:10:24 WARN org.apache.hadoop.metrics2.impl.MetricsConfig:136 - Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
2024-06-26 15:10:25 WARN org.apache.hadoop.fs.s3a.SDKV2Upgrade:39 - Directly referencing AWS SDK V1 credential provider com.amazonaws.auth.DefaultAWSCredentialsProviderChain. AWS SDK V1 credential providers will be removed once S3A is upgraded to SDK V2
2024-06-26 15:10:25 INFO org.apache.spark.sql.delta.storage.DelegatingLogStore:60 - LogStore LogStoreAdapter(io.delta.storage.S3SingleDriverLogStore) is used for scheme s3
2024-06-26 15:10:26 INFO org.apache.spark.sql.delta.DeltaLog:60 - Creating initial snapshot without metadata, because the directory is empty
2024-06-26 15:10:27 INFO org.apache.spark.sql.delta.InitialSnapshot:60 - [tableId=9f0c6a5d-2170-4167-b464-ec54fee685c3] Created snapshot InitialSnapshot(path=s3://ambaricloudsatya/prod/orders/data/_delta_log, version=-1, metadata=Metadata(e3727e72-0eda-476e-8cd7-bf7f85269529,null,null,Format(parquet,Map()),null,List(),Map(),Some(1719432627880)), logSegment=LogSegment(s3://ambaricloudsatya/prod/orders/data/_delta_log,-1,List(),None,-1), checksumOpt=None)
2024-06-26 15:10:28 INFO org.apache.xtable.conversion.ConversionController:240 - No previous InternalTable sync for target. Falling back to snapshot sync.
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration(Lorg/apache/hadoop/fs/statistics/DurationTracker;Lorg/apache/hadoop/util/functional/CallableRaisingIOE;)Ljava/lang/Object;
    at org.apache.hadoop.fs.s3a.Invoker.onceTrackingDuration(Invoker.java:147)
    at org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:282)
    at org.apache.hadoop.fs.s3a.S3AInputStream.lambda$lazySeek$1(S3AInputStream.java:435)
    at org.apache.hadoop.fs.s3a.Invoker.lambda$maybeRetry$3(Invoker.java:284)
    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:122)
    at org.apache.hadoop.fs.s3a.Invoker.lambda$maybeRetry$5(Invoker.java:408)
    at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:468)
    at org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:404)
    at org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:282)
    at org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:326)
    at org.apache.hadoop.fs.s3a.S3AInputStream.lazySeek(S3AInputStream.java:427)
    at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:545)
    at java.base/java.io.DataInputStream.read(DataInputStream.java:149)
    at java.base/sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
    at java.base/sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
    at java.base/sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
    at java.base/java.io.InputStreamReader.read(InputStreamReader.java:181)
    at java.base/java.io.BufferedReader.fill(BufferedReader.java:161)
    at java.base/java.io.BufferedReader.readLine(BufferedReader.java:326)
    at java.base/java.io.BufferedReader.readLine(BufferedReader.java:392)
    at org.apache.iceberg.hadoop.HadoopTableOperations.findVersion(HadoopTableOperations.java:320)
    at org.apache.iceberg.hadoop.HadoopTableOperations.refresh(HadoopTableOperations.java:104)
    at org.apache.iceberg.hadoop.HadoopTableOperations.current(HadoopTableOperations.java:84)
    at org.apache.iceberg.hadoop.HadoopTables.load(HadoopTables.java:94)
    at org.apache.xtable.iceberg.IcebergTableManager.lambda$getTable$1(IcebergTableManager.java:58)
    at java.base/java.util.Optional.orElseGet(Optional.java:369)
    at org.apache.xtable.iceberg.IcebergTableManager.getTable(IcebergTableManager.java:58)
    at org.apache.xtable.iceberg.IcebergConversionSource.initSourceTable(IcebergConversionSource.java:81)
    at org.apache.xtable.iceberg.IcebergConversionSource.getSourceTable(IcebergConversionSource.java:60)
    at org.apache.xtable.iceberg.IcebergConversionSource.getCurrentSnapshot(IcebergConversionSource.java:121)
    at org.apache.xtable.spi.extractor.ExtractFromSource.extractSnapshot(ExtractFromSource.java:38)
    at org.apache.xtable.conversion.ConversionController.syncSnapshot(ConversionController.java:183)
    at org.apache.xtable.conversion.ConversionController.sync(ConversionController.java:121)
    at org.apache.xtable.utilities.RunSync.main(RunSync.java:169)

ambaricloud · Jun 26 '24 20:06