aws-sdk-pandas
aws-sdk-pandas copied to clipboard
awswrangler.athena.to_iceberg not supporting to synchronous/parallel lambda instances.
Describe the bug
wr.athena.to_iceberg(
df=df,
database='test_database',
table='my_table2',
table_location='s3://bucket-testing1/my_table2/',
temp_path=f's3://bucket-testing1/temp_path/',
keep_files=True
)
For parallel writing, If keep_files=True then it is resulting the duplicates and I tried appending the nano timestamp to the temporary path so it's unique but now I have "ICEBERG_COMMIT_ERROR"
If keep_files=False then it is giving "HIVE_CANNOT_OPEN_SPLIT NoSuchKey Error" when ingesting iceberg data in parallel
and we observed if keep_files=False then in that library entire temp_path was removed from the s3 and getting the above error.
It's not supporting to write to the iceberg table using wrangler from lambda. So, how can we overcome the above issues in lambda parallel writing to iceberg table using awswrangler.
How to Reproduce
wr.athena.to_iceberg(
df=df,
database='test_database',
table='my_table2',
table_location='s3://bucket-testing1/my_table2/',
temp_path=f's3://bucket-testing1/temp_path/',
keep_files=False
)
we observed if keep_files=False then in that library entire temp_path was removed from the s3 and resulted "HIVE_CANNOT_OPEN_SPLIT NoSuchKey Error" if you remove the particular parquet file from the temp_path instead of removing entire temp_path from s3, I think might give the above error.
Expected behavior
No response
Your project
No response
Screenshots
No response
OS
Win
Python version
3.8
AWS SDK for pandas version
12
Additional context
No response
Hi @B161851 , if you are inserting concurrently, you ned to make sure temp_path is unique and empty for each run. Also, when you got ICEBERG_COMMIT_ERROR , did the table you are inserting in exist? It might be a race condition due to multiple runs trying to create a table. Checking.
@kukushking hi, facing with same, even with two concurrent writers (lambdas). Table exists. Trying to perform upsert (MERGE INTO) operation. In my case upsert happens even on diffrent partitions (different parts of a table), so I don't think it's a race condition.
tmp_table_name = f"my_table_{uuid.uuid4()}".replace("-", "_")
tmp_path = f"s3://my_bucket/{tmp_table_name}"
wr.athena.to_iceberg(
df=processed_df,
database='my_database',
table="my_table",
table_location="s3://my_bucket/my_table",
temp_path=tmp_path,
partition_cols=["col1", "col2"],
merge_cols=["col1","col2","col3"],
keep_files=False
)
ICEBERG_COMMIT_ERROR: Failed to commit Iceberg update to table
Just wanted to bump this issue up as well.
Particular use case is uses upsert very similar to @Salatich's last comment.
Infrastructure is MSK Trigger on a Lambda.
Have had to lock lambda concurrency to 1 to avoid the ICEBERG_COMMIT_ERROR errors.
Marking this issue as stale due to inactivity. This helps our maintainers find and focus on the active issues. If this issue receives no comments in the next 7 days it will automatically be closed.
bump
bump. addressing this feature will be very helpful
All, looks like this is service-side issue. Please raise a support request.
@ChanTheDataExplorer @Salatich @vibe is it also HIVE_CANNOT_OPEN_SPLIT NoSuchKey or any other exception code? This error code may correspond to a multiple different root causes. Any additional info would be appreciated ie what is the size of your dataset? what is the key that is causing an issue and corresponding data frame? Does this consistently reproduce?
in my side it is just ICEBERG_COMMIT_ERROR
We're seeing a lot of ICEBERG_COMMIT_ERROR using the latest awswrangler with Athena and Glue when attempting parallel writes. Changing partition sizes so that no writes are ever merging into the same partition does not alleviate the problem. Little documentation on the error: https://repost.aws/knowledge-center/athena-iceberg-table-error
When working with parallel writing to an Iceberg table using awswrangler in AWS Lambda, there are some specific considerations and configurations to handle to avoid issues like duplicates, ICEBERG_COMMIT_ERROR, and HIVE_CANNOT_OPEN_SPLIT NoSuchKey Error.
Suggestions to overcome:-
- Handling Duplicates Appending a nano timestamp to the temporary path can help make each parallel write unique, but it may still lead to ICEBERG_COMMIT_ERROR due to conflicts in committing changes to the table metadata.
- Keep Files Configuration If keep_files=False, the library removes the entire temp_path, which may cause the HIVE_CANNOT_OPEN_SPLIT NoSuchKey Error when subsequent operations try to access files that no longer exist.
Best approach:- 1.Generate Unique Temporary Paths: Use a unique identifier for each Lambda invocation to ensure that temporary paths do not conflict. 2. Handle File Cleanup Manually: Instead of relying on the keep_files parameter, manage the cleanup of temporary files manually after the write operation is successful. Sample code snippet:-
import boto3
import awswrangler as wr
import time
import uuid
def write_to_iceberg(df, database, table, table_location, bucket):
unique_temp_path = f's3://{bucket}/temp_path_{uuid.uuid4()}_{int(time.time() * 1e9)}/'
try:
wr.athena.to_iceberg(
df=df,
database=database,
table=table,
table_location=table_location,
temp_path=unique_temp_path,
keep_files=True # Keep files to avoid NoSuchKey errors
)
finally:
# Clean up temporary files manually
s3 = boto3.resource('s3')
bucket = s3.Bucket(bucket)
for obj in bucket.objects.filter(Prefix=unique_temp_path.replace(f's3://{bucket}/', '')):
s3.Object(bucket.name, obj.key).delete()
# Example usage
df = # Your DataFrame
write_to_iceberg(df, 'test_database', 'my_table2', 's3://bucket-testing1/my_table2/', 'bucket-testing1')
Hope this helps, do let me know, if there are similar issues Thanks
@Siddharth-Latthe-07 want to check with you, your recommendations do not solve the ICEBERG_COMMIT_ERROR which in my case each partition is being written to by a different parallel process. Do you have any ideas on how to resolve such case?
Also @peterklingelhofer / @Salatich did you get any such luck to this issue.
@Acehaidrey Here are some of the additional strategies you can look for:-
- Coordination with Commit Locks:- One approach to avoid commit conflicts is to implement a coordination mechanism using locks. Sample code:-
import boto3
import awswrangler as wr
import time
import uuid
import json
s3 = boto3.client('s3')
def acquire_lock(lock_key):
try:
s3.put_object(Bucket='your-lock-bucket', Key=lock_key, Body='')
return True
except Exception as e:
return False
def release_lock(lock_key):
s3.delete_object(Bucket='your-lock-bucket', Key=lock_key)
def write_to_iceberg(df, database, table, table_location, bucket):
unique_temp_path = f's3://{bucket}/temp_path_{uuid.uuid4()}_{int(time.time() * 1e9)}/'
lock_key = f'iceberg_commit_lock/{table}'
# Try to acquire the lock
while not acquire_lock(lock_key):
time.sleep(0.1) # Wait before retrying
try:
wr.athena.to_iceberg(
df=df,
database=database,
table=table,
table_location=table_location,
temp_path=unique_temp_path,
keep_files=True # Keep files to avoid NoSuchKey errors
)
finally:
# Clean up temporary files manually
s3_resource = boto3.resource('s3')
bucket_resource = s3_resource.Bucket(bucket)
for obj in bucket_resource.objects.filter(Prefix=unique_temp_path.replace(f's3://{bucket}/', '')):
s3_resource.Object(bucket_resource.name, obj.key).delete()
# Release the lock
release_lock(lock_key)
# Example usage
df = # Your DataFrame
write_to_iceberg(df, 'test_database', 'my_table2', 's3://bucket-testing1/my_table2/', 'bucket-testing1')
-
Using AWS Glue for Coordination:- (Create a Glue job that reads data from S3 and writes to Iceberg. Schedule the Glue job or trigger it based on S3 events.)
-
Use Iceberg's Built-In Conflict Resolution:- Apache Iceberg provides built-in mechanisms to handle conflicts
-
Implementing Retries with Exponential Backoff:- Add retries with exponential backoff sample code:-
import boto3
import awswrangler as wr
import time
import uuid
from botocore.exceptions import ClientError
def exponential_backoff(retries):
return min(60, (2 ** retries))
def write_to_iceberg_with_retries(df, database, table, table_location, bucket, max_retries=5):
unique_temp_path = f's3://{bucket}/temp_path_{uuid.uuid4()}_{int(time.time() * 1e9)}/'
retries = 0
while retries < max_retries:
try:
wr.athena.to_iceberg(
df=df,
database=database,
table=table,
table_location=table_location,
temp_path=unique_temp_path,
keep_files=True
)
break
except ClientError as e:
retries += 1
time.sleep(exponential_backoff(retries))
if retries == max_retries:
raise e
finally:
# Clean up temporary files manually
s3_resource = boto3.resource('s3')
bucket_resource = s3_resource.Bucket(bucket)
for obj in bucket_resource.objects.filter(Prefix=unique_temp_path.replace(f's3://{bucket}/', '')):
s3_resource.Object(bucket_resource.name, obj.key).delete()
# Example usage
df = # Your DataFrame
write_to_iceberg_with_retries(df, 'test_database', 'my_table2', 's3://bucket-testing1/my_table2/', 'bucket-testing1')
Combining unique temporary paths with proper locking mechanisms, using AWS Glue for coordination, leveraging Iceberg's conflict resolution, and implementing retries can help overcome the ICEBERG_COMMIT_ERROR and other issues during parallel writes. Hope this helps, Thanks
Hi @Acehaidrey , unfortunately no luck. In my approach - I use unique names for temp_paths and update different partitions (so, there is no race I believe). Also, the table exists, so there is no race condition on creating a table.
I'm using exponential backoff - it kind of helps, but I see retry warnings in my lambdas constantly with ICEBERG_COMMIT_ERROR error.
Also, this statement confuses me (from https://repost.aws/knowledge-center/athena-iceberg-table-error):
Does it mean that iceberg doesn't support parallel writes?
Thank you @Siddharth-Latthe-07 . Think this will slow down the program indeed. But seems the case is to limit the parallelism? which isnt the solution we want to go towards :/