
Shopify<>GCS (State Cache: Postgres): Frozen After Completion

Guilherme-B opened this issue on Jan 21, 2025 · 2 comments

I'm running PyAirbyte to sync a Shopify source to a Google Cloud Storage destination, using a Postgres cache as the state cache. The sync itself runs smoothly and takes around 15 minutes in total, as shown in the screenshot below:

[Screenshot: sync log output showing completion after roughly 15 minutes]

The odd thing is that the process never completes after this point: it just hangs, and the Python process never exits.

Below are the code and the logs:

Code:

from __future__ import annotations

import datetime

import airbyte as ab
from airbyte.caches import PostgresCache

def get_shopify_source() -> ab.Source:
    return ab.get_source(
        "source-shopify",
        config={
            "shop": "",
            "credentials": {
                "auth_method": "api_password",
                "api_password": "",
            },
        },
        # docker_image=True,
        streams=[
            "blogs",
            "collections",
            "collects",
            "countries",
            "custom_collections",
            "customer_address",
            "customers",
            "discount_codes",
            "draft_orders",
            "fulfillment_orders",
            "inventory_items",
            "inventory_levels",
            "locations",
            "metafield_collections",
            "metafield_customers",
            "metafield_products",
            "metafield_shops",
            "metafield_smart_collections",
            "order_risks",
            "orders",
            "pages",
            "price_rules",
            "product_images",
            "product_variants",
            "products",
            # "products_graph_ql",
            "shop",
            "smart_collections",
            "tender_transactions",
            "transactions",
        ],
    )

def get_gcs_destination() -> ab.Destination:
    # Destination configuration
    gcs_config = {
        "gcs_bucket_name": "",
        "gcs_bucket_path": "",
        "gcs_bucket_region": "",
        "credential": {
            "credential_type": "HMAC_KEY",
            "hmac_key_access_id": "",
            "hmac_key_secret": "",
        },
        "format": {
            "format_type": "Parquet",
            "compression_codec": "UNCOMPRESSED",
            "block_size_mb": 128,
        },
    }

    destination = ab.get_destination(
        name="destination-gcs",
        config=gcs_config,
        install_if_missing=True,
        docker_image=False,
    )

    return destination


def main() -> None:
    """Test writing from the source to the destination."""
    source = get_shopify_source()
    source.check()
    destination = get_gcs_destination()
    destination.check()

    # Define a Postgres Cache and pass the necessary configuration
    pg_state_cache = PostgresCache(
        host="localhost",
        port=5432,
        username="X",
        password="X",
        database="pyairbyte_demo",
        schema_name="airbyte_internal",
    )

    # Override the internal connector names with a client prefix
    # (note: `_name` is a private attribute)
    destination._name = "client_" + destination.name
    source._name = "client_" + source.name

    write_result: ab.WriteResult = destination.write(
        source_data=source,
        state_cache=pg_state_cache,
        cache=False,
    )

    print(
        f"Completed writing {write_result.processed_records:,} records "
        f"to destination at {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}.",
    )


if __name__ == "__main__":
    main()

— Guilherme-B, Jan 21, 2025

@Guilherme-B - Thanks for raising this. I don't see anything wrong with your setup. Are you still running into this or did you find a workaround?

I see there are a lot of log messages without descriptions in the destination log. That seems to be internal to the destination though, and not something we can necessarily debug from the PyAirbyte side.

The only theories I can suggest are:

1. GCS is somehow locked during finalization. If so, you would see the Docker container for the GCS destination still running.
2. The GCS destination finishes, but PyAirbyte for some reason isn't properly shutting down its threads.

It might be worthwhile to tail the destination logs via Docker, and/or to try another destination to determine whether the issue is connector-specific or PyAirbyte-specific.

Let me know if any of this helps or if you have any other clues/observations that might point us to a diagnosis. Thanks!

— aaronsteers, Jan 24, 2025

Hey @aaronsteers !

I found a temporary workaround by syncing each stream separately. It mostly solves the issue, although it still occurs on some occasions. The high number of streams might be the culprit, since a single Faker stream works fine.
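For reference, the per-stream workaround can be sketched roughly like this. This is a hypothetical helper, not code from the issue; it assumes PyAirbyte's `Source.get_available_streams()` and `Source.select_streams()` methods plus the same `Destination.write()` call as in the snippet above:

```python
def sync_streams_individually(source, destination, state_cache) -> int:
    """Run one destination.write() per stream instead of a single big sync.

    Sketch only: `source` and `destination` are assumed to be PyAirbyte
    Source/Destination objects configured as in the snippet above.
    """
    synced = 0
    for stream_name in source.get_available_streams():
        # Narrow the selected catalog to a single stream, then sync it.
        source.select_streams([stream_name])
        destination.write(source_data=source, state_cache=state_cache, cache=False)
        synced += 1
    return synced
```

Because state is tracked in the Postgres cache per stream, an individual hung sync can at least be retried without redoing the streams that already finished.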

I will keep trying to debug the issue. It may well be unclosed threads, given that the logs report the write to the destination as successful!
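If unclosed threads are the suspect, a quick stdlib-only check (a generic sketch, not PyAirbyte-specific) is to dump every live thread's stack once the sync reports success; any lingering non-daemon thread is what keeps the interpreter from exiting:

```python
import sys
import threading
import traceback

def dump_live_threads() -> list[str]:
    """Print a stack trace for every live thread and return the names of
    non-daemon threads (the ones that keep the process alive on exit)."""
    for thread_id, frame in sys._current_frames().items():
        print(f"--- thread {thread_id} ---", file=sys.stderr)
        traceback.print_stack(frame, file=sys.stderr)
    return [t.name for t in threading.enumerate() if not t.daemon]

# Call this right after destination.write() returns; on a healthy run,
# only 'MainThread' should remain in the returned list.
print(dump_live_threads())
```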

— Guilherme-B, Feb 4, 2025