aws-sdk-pandas

awswrangler.athena.to_iceberg does not support concurrent/parallel Lambda instances.

Open B161851 opened this issue 1 year ago • 14 comments

Describe the bug

wr.athena.to_iceberg(
    df=df,
    database='test_database',
    table='my_table2',
    table_location='s3://bucket-testing1/my_table2/',
    temp_path=f's3://bucket-testing1/temp_path/',
    keep_files=True
)

For parallel writing, if keep_files=True the writes result in duplicates. I tried appending a nanosecond timestamp to the temporary path so it is unique for each run, but then I get ICEBERG_COMMIT_ERROR.
If keep_files=False, ingesting Iceberg data in parallel fails with HIVE_CANNOT_OPEN_SPLIT NoSuchKey; we observed that with keep_files=False the library removes the entire temp_path from S3, which leads to that error.

So writing to the Iceberg table with wrangler from Lambda is not working in this setup. How can we overcome these issues when writing to an Iceberg table in parallel from Lambda using awswrangler?

How to Reproduce

wr.athena.to_iceberg(
    df=df,
    database='test_database',
    table='my_table2',
    table_location='s3://bucket-testing1/my_table2/',
    temp_path=f's3://bucket-testing1/temp_path/',
    keep_files=False
)

We observed that with keep_files=False the library removes the entire temp_path from S3, which results in HIVE_CANNOT_OPEN_SPLIT NoSuchKey. I think removing the entire temp_path from S3, instead of only the particular Parquet files written by the current run, is what gives the above error.
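
(A minimal sketch of the per-run cleanup implied here, not from the original report: keep_files=True so the library deletes nothing, a unique prefix per run, and an explicit delete of only that prefix afterwards; bucket and prefix names are illustrative.)

import uuid
import awswrangler as wr

run_temp_path = f"s3://bucket-testing1/temp_path/{uuid.uuid4()}/"

wr.athena.to_iceberg(
    df=df,  # df is the DataFrame being ingested
    database="test_database",
    table="my_table2",
    table_location="s3://bucket-testing1/my_table2/",
    temp_path=run_temp_path,
    keep_files=True,  # prevent the library from deleting the shared temp_path
)

# Delete only the staging files written by this run, not the whole temp_path.
wr.s3.delete_objects(path=run_temp_path)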

Expected behavior

No response

Your project

No response

Screenshots

No response

OS

Win

Python version

3.8

AWS SDK for pandas version

12

Additional context

No response

B161851 avatar Jan 31 '24 13:01 B161851

Hi @B161851 , if you are inserting concurrently, you need to make sure temp_path is unique and empty for each run. Also, when you got ICEBERG_COMMIT_ERROR, did the table you are inserting into already exist? It might be a race condition due to multiple runs trying to create the table. Checking.
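
(A minimal sketch of this suggestion, not from the original comment: each invocation builds its own staging prefix and verifies it is empty before writing; the bucket and prefix names are illustrative.)

import uuid
import awswrangler as wr

# One staging prefix per concurrent run, so runs never share temp files.
temp_path = f"s3://bucket-testing1/temp/{uuid.uuid4()}/"

# Sanity check: the prefix should be empty before the write starts.
assert not wr.s3.list_objects(temp_path), f"{temp_path} is not empty"
# ...then pass temp_path to wr.athena.to_iceberg, as in the snippets above.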

kukushking avatar Feb 12 '24 11:02 kukushking

@kukushking hi, I'm facing the same issue, even with just two concurrent writers (Lambdas). The table exists. I'm trying to perform an upsert (MERGE INTO) operation. In my case the upserts even touch different partitions (different parts of the table), so I don't think it's a race condition.

tmp_table_name = f"my_table_{uuid.uuid4()}".replace("-", "_")
tmp_path = f"s3://my_bucket/{tmp_table_name}"
wr.athena.to_iceberg(
    df=processed_df,
    database='my_database',
    table="my_table",
    table_location="s3://my_bucket/my_table",
    temp_path=tmp_path,
    partition_cols=["col1", "col2"],
    merge_cols=["col1", "col2", "col3"],
    keep_files=False
)

ICEBERG_COMMIT_ERROR: Failed to commit Iceberg update to table

Salatich avatar Feb 13 '24 18:02 Salatich

Just wanted to bump this issue up as well.

The particular use case is an upsert very similar to @Salatich's last comment. The infrastructure is an MSK trigger on a Lambda.

We have had to lock Lambda concurrency to 1 to avoid the ICEBERG_COMMIT_ERROR errors.
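
(For reference, a minimal sketch of capping a function at one concurrent execution with reserved concurrency via boto3; the function name is illustrative.)

import boto3

lambda_client = boto3.client("lambda")

# Reserve exactly one concurrent execution for the writer function, so only
# one invocation (and therefore one Iceberg commit) can run at a time.
lambda_client.put_function_concurrency(
    FunctionName="iceberg-writer",  # illustrative function name
    ReservedConcurrentExecutions=1,
)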

vibe avatar Feb 20 '24 20:02 vibe

Marking this issue as stale due to inactivity. This helps our maintainers find and focus on the active issues. If this issue receives no comments in the next 7 days it will automatically be closed.

github-actions[bot] avatar Apr 20 '24 21:04 github-actions[bot]

bump

Salatich avatar Apr 24 '24 13:04 Salatich

bump. addressing this feature will be very helpful

channingdata avatar May 03 '24 04:05 channingdata

All, it looks like this is a service-side issue. Please raise a support request.

@ChanTheDataExplorer @Salatich @vibe is it also HIVE_CANNOT_OPEN_SPLIT NoSuchKey, or any other exception code? This error code may correspond to multiple different root causes. Any additional info would be appreciated, e.g. what is the size of your dataset? What is the key causing the issue, and the corresponding data frame? Does this reproduce consistently?

kukushking avatar May 13 '24 09:05 kukushking

On my side it is just ICEBERG_COMMIT_ERROR

channingdata avatar May 13 '24 10:05 channingdata

We're seeing a lot of ICEBERG_COMMIT_ERROR failures using the latest awswrangler with Athena and Glue when attempting parallel writes. Changing partition sizes so that no writes ever merge into the same partition does not alleviate the problem. There is little documentation on the error: https://repost.aws/knowledge-center/athena-iceberg-table-error

peterklingelhofer avatar May 21 '24 13:05 peterklingelhofer

When writing to an Iceberg table in parallel with awswrangler from AWS Lambda, there are some specific considerations to handle in order to avoid issues like duplicates, ICEBERG_COMMIT_ERROR, and HIVE_CANNOT_OPEN_SPLIT NoSuchKey errors. Two observations:

  1. Handling Duplicates: appending a nanosecond timestamp to the temporary path can make each parallel write unique, but it may still lead to ICEBERG_COMMIT_ERROR due to conflicts when committing changes to the table metadata.
  2. Keep Files Configuration: if keep_files=False, the library removes the entire temp_path, which may cause HIVE_CANNOT_OPEN_SPLIT NoSuchKey errors when subsequent operations try to access files that no longer exist.

Suggested approach: 1. Generate Unique Temporary Paths: use a unique identifier for each Lambda invocation to ensure that temporary paths do not conflict. 2. Handle File Cleanup Manually: instead of relying on the keep_files parameter, clean up the temporary files yourself after the write operation succeeds. Sample code snippet:

import boto3
import awswrangler as wr
import time
import uuid

def write_to_iceberg(df, database, table, table_location, bucket):
    unique_temp_path = f's3://{bucket}/temp_path_{uuid.uuid4()}_{int(time.time() * 1e9)}/'

    try:
        wr.athena.to_iceberg(
            df=df,
            database=database,
            table=table,
            table_location=table_location,
            temp_path=unique_temp_path,
            keep_files=True  # Keep files to avoid NoSuchKey errors
        )
    finally:
        # Clean up the temporary files manually, deleting only this run's prefix.
        # Note: keep the `bucket` parameter as a string here; reusing the name
        # for the Bucket resource would break the prefix computation below.
        s3 = boto3.resource('s3')
        bucket_resource = s3.Bucket(bucket)
        prefix = unique_temp_path.replace(f's3://{bucket}/', '')
        for obj in bucket_resource.objects.filter(Prefix=prefix):
            obj.delete()

# Example usage
df = ...  # your DataFrame here
write_to_iceberg(df, 'test_database', 'my_table2', 's3://bucket-testing1/my_table2/', 'bucket-testing1')

Hope this helps; do let me know if there are similar issues. Thanks

Siddharth-Latthe-07 avatar Jul 15 '24 13:07 Siddharth-Latthe-07

@Siddharth-Latthe-07 I want to check with you: your recommendations do not solve the ICEBERG_COMMIT_ERROR, and in my case each partition is being written to by a different parallel process. Do you have any ideas on how to resolve that case?

Also @peterklingelhofer / @Salatich, did you have any luck with this issue?

Acehaidrey avatar Aug 05 '24 15:08 Acehaidrey

@Acehaidrey Here are some additional strategies you can consider:

  1. Coordination with Commit Locks: one approach to avoid commit conflicts is to implement a coordination mechanism using a lock. Sample code:
import boto3
import awswrangler as wr
import time
import uuid
from botocore.exceptions import ClientError

s3 = boto3.client('s3')

def acquire_lock(lock_key):
    # Best-effort lock: refuse if the lock object already exists, otherwise create it.
    # Note: this check-then-put is not atomic, so two writers can still race;
    # it only narrows the window for concurrent commits.
    try:
        s3.head_object(Bucket='your-lock-bucket', Key=lock_key)
        return False  # lock object already exists, so the lock is held
    except ClientError:
        s3.put_object(Bucket='your-lock-bucket', Key=lock_key, Body=b'')
        return True

def release_lock(lock_key):
    s3.delete_object(Bucket='your-lock-bucket', Key=lock_key)

def write_to_iceberg(df, database, table, table_location, bucket):
    unique_temp_path = f's3://{bucket}/temp_path_{uuid.uuid4()}_{int(time.time() * 1e9)}/'
    lock_key = f'iceberg_commit_lock/{table}'

    # Try to acquire the lock
    while not acquire_lock(lock_key):
        time.sleep(0.1)  # Wait before retrying

    try:
        wr.athena.to_iceberg(
            df=df,
            database=database,
            table=table,
            table_location=table_location,
            temp_path=unique_temp_path,
            keep_files=True  # Keep files to avoid NoSuchKey errors
        )
    finally:
        # Clean up the temporary files manually, deleting only this run's prefix
        s3_resource = boto3.resource('s3')
        bucket_resource = s3_resource.Bucket(bucket)
        prefix = unique_temp_path.replace(f's3://{bucket}/', '')
        for obj in bucket_resource.objects.filter(Prefix=prefix):
            obj.delete()

        # Release the lock
        release_lock(lock_key)

# Example usage
df = ...  # your DataFrame here
write_to_iceberg(df, 'test_database', 'my_table2', 's3://bucket-testing1/my_table2/', 'bucket-testing1')
  2. Using AWS Glue for Coordination: create a Glue job that reads the data from S3 and writes to Iceberg, then schedule the Glue job or trigger it based on S3 events (a minimal sketch follows the retry example below).

  3. Use Iceberg's Built-In Conflict Resolution: Apache Iceberg provides built-in mechanisms to handle commit conflicts.

  4. Implementing Retries with Exponential Backoff: add retries with exponential backoff. Sample code:

import boto3
import awswrangler as wr
import time
import uuid
from botocore.exceptions import ClientError

def exponential_backoff(retries):
    # Capped exponential backoff: 2, 4, 8, 16, 32, 60, 60, ... seconds
    return min(60, (2 ** retries))

def write_to_iceberg_with_retries(df, database, table, table_location, bucket, max_retries=5):
    unique_temp_path = f's3://{bucket}/temp_path_{uuid.uuid4()}_{int(time.time() * 1e9)}/'
    retries = 0

    while retries < max_retries:
        try:
            wr.athena.to_iceberg(
                df=df,
                database=database,
                table=table,
                table_location=table_location,
                temp_path=unique_temp_path,
                keep_files=True
            )
            break
        except ClientError as e:
            # Depending on the failure mode, awswrangler may surface other
            # exception types; widen this clause if needed.
            retries += 1
            if retries == max_retries:
                raise e
            time.sleep(exponential_backoff(retries))
        finally:
            # Clean up the temporary files after each attempt, deleting only this run's prefix
            s3_resource = boto3.resource('s3')
            bucket_resource = s3_resource.Bucket(bucket)
            prefix = unique_temp_path.replace(f's3://{bucket}/', '')
            for obj in bucket_resource.objects.filter(Prefix=prefix):
                obj.delete()

# Example usage
df = ...  # your DataFrame here
write_to_iceberg_with_retries(df, 'test_database', 'my_table2', 's3://bucket-testing1/my_table2/', 'bucket-testing1')
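
(A minimal sketch of the Glue coordination idea from point 2, assuming a pre-existing Glue job whose maximum concurrent runs is set to 1; the job name and argument key are illustrative, not from this thread.)

import boto3

glue = boto3.client("glue")

# Hand the staged S3 prefix to a Glue job that performs the actual Iceberg
# write; with the job's max concurrent runs set to 1, commits are serialized
# by the job instead of by many concurrent Lambda invocations.
glue.start_job_run(
    JobName="iceberg-writer-job",  # illustrative job name
    Arguments={"--staged_prefix": "s3://my_bucket/staged/run-123/"},
)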

Combining unique temporary paths with a proper locking mechanism, using AWS Glue for coordination, leveraging Iceberg's conflict resolution, and implementing retries can help overcome the ICEBERG_COMMIT_ERROR and the other issues seen during parallel writes. Hope this helps. Thanks

Siddharth-Latthe-07 avatar Aug 05 '24 15:08 Siddharth-Latthe-07

Hi @Acehaidrey , unfortunately no luck. In my approach I use unique names for the temp_paths and update different partitions (so there is no race, I believe). Also, the table already exists, so there is no race condition on creating the table.

I'm using exponential backoff. It kind of helps, but I constantly see retry warnings with the ICEBERG_COMMIT_ERROR error in my Lambdas.

Also, this statement confuses me (from https://repost.aws/knowledge-center/athena-iceberg-table-error; screenshot not reproduced here). Does it mean that Iceberg doesn't support parallel writes?

Salatich avatar Aug 05 '24 16:08 Salatich

Thank you @Siddharth-Latthe-07. I think this will indeed slow the program down, and it seems the approach amounts to limiting the parallelism, which isn't the solution we want to go towards :/

Acehaidrey avatar Aug 07 '24 03:08 Acehaidrey