Unable to create target format Delta with source format Iceberg when the source table is on S3
I followed the documentation "Creating your first interoperable table" and was able to build utilities-0.1.0-SNAPSHOT-bundled.jar successfully.
I initiated a PySpark session using the command below. The Spark version is 3.4.1 running on Amazon EMR 6.14:
pyspark --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" --conf "spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog" --conf "spark.sql.catalog.spark_catalog.type=hive"
Created an Iceberg table using the commands below:
data =[("James","Smith","01012020","M","3000"), ("Michael","","02012021","M","4000"), ("Robert","Williams","03012023","M","4000"), ("Maria","Jones","04012024","F","4000"), ("Jen","Brown","05012025","F","-1")]
columns=["firstname","lastname","dob","gender","salary"]
df=spark.createDataFrame(data,columns)
spark.sql("""CREATE TABLE IF NOT EXISTS iceberg_table (firstname string,lastname string,dob string,gender string,salary string) USING iceberg""");
df.writeTo("iceberg_table").append()
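To sanity-check the write, the table can be read back in the same session:
# Read the appended rows back to confirm the write succeeded
spark.table("iceberg_table").show()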
I see the data and metadata directories under the table name on S3.
Created my_config.yaml as described in the documentation (attached as my_config.txt).
Executed the command below and it fails with metadata/version-hint.text not available:
sudo java -jar ./utilities-0.1.0-SNAPSHOT-bundled.jar --datasetConfig my_config.yaml
2024-04-30 10:24:25 INFO org.apache.xtable.conversion.ConversionController:240 - No previous InternalTable sync for target. Falling back to snapshot sync.
2024-04-30 10:24:25 WARN org.apache.iceberg.hadoop.HadoopTableOperations:325 - Error reading version hint file s3:/
I was looking into the documentation and understand that if the source is an Iceberg table I need to include a catalog.yaml as well. But I am not sure what the value for catalogImpl should be in my case. Any insights on this would be very helpful.
catalogImpl: io.my.CatalogImpl
catalogName: name
catalogOptions: # all other options are passed through in a map
  key1: value1
  key2: value2
Hi @rajender07! The error clarifies the problem: it says the version-hint.text file was not found for the source table format (Iceberg). Do you see it on S3? This is the metadata file on the Iceberg side when a Hadoop catalog is used. XTable needs this file to translate into the target Delta format.
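For reference, with a Hadoop catalog the table directory on S3 typically looks like this (bucket and path names below are placeholders):
s3://your-bucket/warehouse/db/iceberg_table/
  data/                   # Parquet data files
  metadata/
    v1.metadata.json
    version-hint.text     # holds the latest metadata version number, e.g. "1"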
The important part to understand here is that Iceberg needs a catalog to get started with. Your config currently connects Iceberg to a Hive catalog, but I don't see any Thrift URL or similar here.
pyspark --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" --conf "spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog" --conf "spark.sql.catalog.spark_catalog.type=hive"
Can you instead use a Hadoop catalog and configure it with something like this:
spark.sql.catalog.hadoop_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hadoop_prod.type = hadoop
spark.sql.catalog.hadoop_prod.warehouse = s3a://your-bucket
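Putting those settings together, launching the session would look roughly like this (the bucket name is a placeholder):
pyspark --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" --conf "spark.sql.catalog.hadoop_prod=org.apache.iceberg.spark.SparkCatalog" --conf "spark.sql.catalog.hadoop_prod.type=hadoop" --conf "spark.sql.catalog.hadoop_prod.warehouse=s3a://your-bucket"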
@dipankarmazumdar, thank you for looking into the issue. No, I do not see the version-hint.text file on S3. When I looked into the documentation I understood that this file is created only while using a Hadoop catalog. Since I was using the Iceberg session catalog, it is not generated.
I will try as you suggested using Hadoop catalog and let you know the findings.
Could you please guide me on solving the issue while using the Iceberg catalog? Should I use a catalog.yaml file? If yes, I am confused about the catalogName that should be used. FYI, I have added the Thrift-related properties under /etc/spark/conf/spark-defaults.conf and /etc/spark/conf/hive-site.xml. I have no issues connecting to my metastore and reading/writing data from it.
@rajender07 Which catalog are you using? If it is HMS, the implementation is org.apache.iceberg.hive.HiveCatalog; the other args and name are used to configure any required settings for this catalog, like a uri for your Thrift server.
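For example, a catalog.yaml for an HMS-backed catalog might look like the sketch below; the catalogName, uri, and warehouse values are placeholders you would replace with your own:
catalogImpl: org.apache.iceberg.hive.HiveCatalog
catalogName: hms
catalogOptions:
  # passed through to the catalog; a Hive catalog needs at least
  # the Thrift uri of your metastore (placeholder host below)
  uri: thrift://your-metastore-host:9083
  warehouse: s3://your-bucket/warehouse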
@dipankarmazumdar @the-other-tim-brown
I used a Hadoop catalog as you mentioned and created a new Iceberg table. Now I can see the version-hint.text file as well.
However, when I executed the sync command it failed with the error below. Could you please assist with resolving this issue?
sudo java -jar ./utilities-0.1.0-SNAPSHOT-bundled.jar --datasetConfig my_config.yaml
2024-05-13 13:43:04 INFO org.apache.xtable.conversion.ConversionController:240 - No previous InternalTable sync for target. Falling back to snapshot sync.
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration(Lorg/apache/hadoop/fs/statistics/DurationTracker;Lorg/apache/hadoop/util/functional/CallableRaisingIOE;)Ljava/lang/Object;
Here is my my_config.yaml
sourceFormat: ICEBERG
targetFormats:
  - DELTA
datasets:
  - tableBasePath: s3://<>/<>/x4_iceberg_table
    tableName: x4_iceberg_table
@rajender07 - I am not really sure about this particular error. However, I tried reproducing this on my end and I was able to translate from ICEBERG to DELTA using the setup I suggested.
ICEBERG TABLE CONFIG & CREATION:
import pyspark
from pyspark.sql import SparkSession
import os
conf = (
pyspark.SparkConf()
.setAppName('app_name')
.set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.4.3,software.amazon.awssdk:bundle:2.17.178,software.amazon.awssdk:url-connection-client:2.17.178')
.set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
.set('spark.sql.catalog.hdfs_catalog', 'org.apache.iceberg.spark.SparkCatalog')
.set('spark.sql.catalog.hdfs_catalog.type', 'hadoop')
.set('spark.sql.catalog.hdfs_catalog.warehouse', 's3a://my-bucket/new_iceberg/')
.set('spark.sql.catalog.hdfs_catalog.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
)
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Running")
spark.sql("CREATE TABLE hdfs_catalog.table1 (name string) USING iceberg")
spark.sql("INSERT INTO hdfs_catalog.table1 VALUES ('Alex'), ('Dipankar'), ('Mary')")
my_config.yaml
sourceFormat: ICEBERG
targetFormats:
  - DELTA
datasets:
  - tableBasePath: s3://my-bucket/new_iceberg/table1/
    tableDataPath: s3://my-bucket/new_iceberg/table1/data
    tableName: table1
Run Sync
java -jar utilities/target/utilities-0.1.0-SNAPSHOT-bundled.jar --datasetConfig my_config.yaml
@rajender07 Which catalog are you using? If it is HMS, the implementation is org.apache.iceberg.hive.HiveCatalog; the other args and name are used to configure any required settings for this catalog, like a uri for your Thrift server.
@the-other-tim-brown Referring to this: when I'm using a Hive catalog and passing catalogImpl: org.apache.iceberg.hive.HiveCatalog, I'm getting java.lang.NoSuchMethodException: Cannot find constructor for interface org.apache.iceberg.catalog.Catalog, while if I use org.apache.iceberg.hadoop.HadoopCatalog I get no such error. Is there anything else we need to implement if we are using a Hive catalog for our Iceberg tables?
@rajender07 - LMK if you were able to get past the error with the recommendation.
@rajender07 Can you pull the latest master and try again? This is the fix: https://github.com/apache/incubator-xtable/pull/441
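If you are building from source, a sketch of pulling the latest and repackaging (assuming the project's standard Maven build):
git pull
mvn clean package -DskipTests
# the bundled jar lands at utilities/target/utilities-0.1.0-SNAPSHOT-bundled.jar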
@vinishjail97, Thank you. I will test the fix today and share an update.
It is creating an empty _delta_log dir and erroring out.
--Config File
sourceFormat: ICEBERG
targetFormats:
  - DELTA
datasets:
  - tableBasePath: s3://<>/prod/orders
    tableDataPath: s3://<>/prod/orders/data
    tableName: orders
    namespace: prod.db
--
java -jar /Users/satyak/iceberg/demo/xtable/xtable-utilities-0.1.0-SNAPSHOT-bundled.jar --datasetConfig /Users/satyak/iceberg/demo/xtable/s3_orders_ice_delta.yaml
WARNING: Runtime environment or build system does not support multi-release JARs. This will impact location-based features.
2024-06-26 15:10:21 INFO org.apache.xtable.utilities.RunSync:148 - Running sync for basePath s3://<>/prod/orders for following table formats [DELTA]
2024-06-26 15:10:22 WARN org.apache.hadoop.util.NativeCodeLoader:60 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/Users/satyak/iceberg/demo/xtable/xtable-utilities-0.1.0-SNAPSHOT-bundled.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
2024-06-26 15:10:22 WARN org.apache.spark.util.Utils:72 - Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
2024-06-26 15:10:24 WARN org.apache.hadoop.metrics2.impl.MetricsConfig:136 - Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
2024-06-26 15:10:25 WARN org.apache.hadoop.fs.s3a.SDKV2Upgrade:39 - Directly referencing AWS SDK V1 credential provider com.amazonaws.auth.DefaultAWSCredentialsProviderChain. AWS SDK V1 credential providers will be removed once S3A is upgraded to SDK V2
2024-06-26 15:10:25 INFO org.apache.spark.sql.delta.storage.DelegatingLogStore:60 - LogStore LogStoreAdapter(io.delta.storage.S3SingleDriverLogStore)
is used for scheme s3
2024-06-26 15:10:26 INFO org.apache.spark.sql.delta.DeltaLog:60 - Creating initial snapshot without metadata, because the directory is empty
2024-06-26 15:10:27 INFO org.apache.spark.sql.delta.InitialSnapshot:60 - [tableId=9f0c6a5d-2170-4167-b464-ec54fee685c3] Created snapshot InitialSnapshot(path=s3://ambaricloudsatya/prod/orders/data/_delta_log, version=-1, metadata=Metadata(e3727e72-0eda-476e-8cd7-bf7f85269529,null,null,Format(parquet,Map()),null,List(),Map(),Some(1719432627880)), logSegment=LogSegment(s3://ambaricloudsatya/prod/orders/data/_delta_log,-1,List(),None,-1), checksumOpt=None)
2024-06-26 15:10:28 INFO org.apache.xtable.conversion.ConversionController:240 - No previous InternalTable sync for target. Falling back to snapshot sync.
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration(Lorg/apache/hadoop/fs/statistics/DurationTracker;Lorg/apache/hadoop/util/functional/CallableRaisingIOE;)Ljava/lang/Object;
at org.apache.hadoop.fs.s3a.Invoker.onceTrackingDuration(Invoker.java:147)
at org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:282)
at org.apache.hadoop.fs.s3a.S3AInputStream.lambda$lazySeek$1(S3AInputStream.java:435)
at org.apache.hadoop.fs.s3a.Invoker.lambda$maybeRetry$3(Invoker.java:284)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:122)
at org.apache.hadoop.fs.s3a.Invoker.lambda$maybeRetry$5(Invoker.java:408)
at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:468)
at org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:404)
at org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:282)
at org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:326)
at org.apache.hadoop.fs.s3a.S3AInputStream.lazySeek(S3AInputStream.java:427)
at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:545)
at java.base/java.io.DataInputStream.read(DataInputStream.java:149)
at java.base/sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at java.base/sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at java.base/sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
at java.base/java.io.InputStreamReader.read(InputStreamReader.java:181)
at java.base/java.io.BufferedReader.fill(BufferedReader.java:161)
at java.base/java.io.BufferedReader.readLine(BufferedReader.java:326)
at java.base/java.io.BufferedReader.readLine(BufferedReader.java:392)
at org.apache.iceberg.hadoop.HadoopTableOperations.findVersion(HadoopTableOperations.java:320)
at org.apache.iceberg.hadoop.HadoopTableOperations.refresh(HadoopTableOperations.java:104)
at org.apache.iceberg.hadoop.HadoopTableOperations.current(HadoopTableOperations.java:84)
at org.apache.iceberg.hadoop.HadoopTables.load(HadoopTables.java:94)
at org.apache.xtable.iceberg.IcebergTableManager.lambda$getTable$1(IcebergTableManager.java:58)
at java.base/java.util.Optional.orElseGet(Optional.java:369)
at org.apache.xtable.iceberg.IcebergTableManager.getTable(IcebergTableManager.java:58)
at org.apache.xtable.iceberg.IcebergConversionSource.initSourceTable(IcebergConversionSource.java:81)
at org.apache.xtable.iceberg.IcebergConversionSource.getSourceTable(IcebergConversionSource.java:60)
at org.apache.xtable.iceberg.IcebergConversionSource.getCurrentSnapshot(IcebergConversionSource.java:121)
at org.apache.xtable.spi.extractor.ExtractFromSource.extractSnapshot(ExtractFromSource.java:38)
at org.apache.xtable.conversion.ConversionController.syncSnapshot(ConversionController.java:183)
at org.apache.xtable.conversion.ConversionController.sync(ConversionController.java:121)
at org.apache.xtable.utilities.RunSync.main(RunSync.java:169)
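A note on this failure mode: invokeTrackingDuration belongs to the IOStatistics API added in the Hadoop 3.3 line, so a NoSuchMethodError here usually means an older hadoop-common on the classpath is shadowing the one hadoop-aws expects. One way to check which copy the bundled jar carries (a sketch, assuming standard JDK tooling):
jar tf xtable-utilities-0.1.0-SNAPSHOT-bundled.jar | grep IOStatisticsBinding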