
Writing DeltaTable to Azure with partitioning fails and goes into endless loop w/o error message

Open stefnba opened this issue 2 years ago • 22 comments

Environment

Delta-rs version: 0.12.0

Binding: Python

Environment:

  • Cloud provider: Azure Storage Gen2
  • OS: Apple M1 Pro with Apple Sonoma (14.0)
  • Other: Python 3.10.11

Bug

What happened: I have an Arrow dataset (~120M records, ~2.8 GB as Parquet files) with the following schema that I want to write to ADLS Gen2, partitioned by the columns exchange_code and year:

schema = pa.schema(
    [
        pa.field("date", pa.date32()),
        pa.field("open", pa.float64()),
        pa.field("high", pa.float64()),
        pa.field("low", pa.float64()),
        pa.field("close", pa.float64()),
        pa.field("adjusted_close", pa.float64()),
        pa.field("volume", pa.int64()),
        pa.field("exchange_code", pa.string()),
        pa.field("security_code", pa.string()),
        pa.field("year", pa.int32()),
        pa.field("created_at", pa.timestamp("us")),
    ]
)

It works successfully when:

  • writing to the local filesystem (takes about 1 min) using deltalake.write_deltalake
  • writing to Azure without any partitioning using deltalake.write_deltalake and the storage_options arg
  • using pyarrow.dataset.write_dataset with partitions and adlfs.AzureBlobFileSystem as the filesystem arg
  • writing only the first 1-5M records to Azure using deltalake.write_deltalake with the above-mentioned partitioning

But not when:

  • writing the full dataset to Azure using deltalake.write_deltalake with the above-mentioned partitioning.

There is no error message - the write just runs forever without creating any directories or files. In some cases it created 2-3 partition directories at the beginning, but then nothing happened thereafter.

Column exchange_code has 15 distinct values and year has ~10-80 distinct values depending on the exchange.
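For context, every distinct (exchange_code, year) pair becomes its own Hive-style partition directory, so the fan-out can be large (rough, illustrative arithmetic using the figures above):

```python
# Rough upper bound on the number of (exchange_code, year) partitions
# this write fans out to (illustrative figures from the report above).
exchanges = 15   # distinct exchange_code values
max_years = 80   # upper end of distinct years per exchange
print(exchanges * max_years)  # up to ~1200 partition directories
```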

What you expected to happen: DeltaTable to be created successfully.

How to reproduce it:

Here is my code:

import os
import polars as pl
import pyarrow.dataset as ds
from deltalake import DeltaTable, write_deltalake
import pyarrow as pa

account_name = os.getenv("AZURE_STORAGE_ACCOUNT_NAME")
client = os.getenv("AZURE_CLIENT_ID")
tenant = os.getenv("AZURE_TENANT_ID")
secret = os.getenv("AZURE_CLIENT_SECRET")

storage_options = {"account_name": account_name, "client_id": client, "client_secret": secret, "tenant_id": tenant}

schema = pa.schema(
    [
        pa.field("date", pa.date32()),
        pa.field("open", pa.float64()),
        pa.field("high", pa.float64()),
        pa.field("low", pa.float64()),
        pa.field("close", pa.float64()),
        pa.field("adjusted_close", pa.float64()),
        pa.field("volume", pa.int64()),
        pa.field("exchange_code", pa.string()),
        pa.field("security_code", pa.string()),
        pa.field("year", pa.int32()),
        pa.field("created_at", pa.timestamp("us")),
    ]
)

dataset = ds.dataset(source="arrow_dataset", format="parquet", schema=schema)

write_deltalake(
    data=dataset.to_batches(),
    table_or_uri="abfs://temp/testDelta",
    storage_options=storage_options,
    schema=schema,
    partition_by=["exchange_code", "year"],
)

If the dataset is needed, please let me know.

More details: I have tried different values for the args max_rows_per_file, max_rows_per_group, min_rows_per_group, and max_open_files, but with no success.

stefnba avatar Oct 25 '23 12:10 stefnba

@stefnba which PyArrow version are you using?

ion-elgreco avatar Nov 02 '23 08:11 ion-elgreco

PyArrow version 12, and I just tried version 14 - same issue. For some reason I cannot install version 13 (need to investigate why when I have more time).

Fyi, when I use PyArrow directly to write a partitioned dataset to Azure, it works fine.

stefnba avatar Nov 02 '23 15:11 stefnba

@roeap it seems the issue was already flagged by @stefnba.

Since it works fine with PyArrow directly I would guess it's the delta filesystem handler. @stefnba can you confirm how you use pyarrow directly, did you create your own file system handler with the pyarrow filesystem?

ion-elgreco avatar Nov 09 '23 21:11 ion-elgreco

Hmm I'm guessing there's some sort of GIL deadlock. If there's some way to reliably repro, then it wouldn't be hard to attach lldb and confirm where it's happening. (We'd have to run with a development version of the library to get the symbols though).
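On the Python side, a quick complement to lldb (a generic sketch, not specific to delta-rs) is to register faulthandler before the hanging call, so that sending a signal to the stuck process dumps every thread's Python stack without killing it:

```python
import faulthandler
import signal

# Register before starting the write that hangs. Afterwards,
# `kill -USR1 <pid>` makes the process print all Python thread
# tracebacks to stderr without terminating it (Unix only).
faulthandler.register(signal.SIGUSR1)

# ... then run the call under investigation, e.g. write_deltalake(...)
```

Note this only shows Python frames; a hang inside native code still needs lldb/gdb for the full picture.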

Though what I don't get is that creating a directory is a no-op here:

https://github.com/delta-io/delta-rs/blob/9b938308490a60808443ee36b4f7b2c3055a6c15/python/src/filesystem.rs#L86

But you are seeing partition directories being made?
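For reference, the reason create_dir can be a no-op (a minimal hypothetical sketch, not the actual delta-rs handler): flat-namespace object stores have no real directories, and key prefixes only come into existence when objects are written under them:

```python
# Hypothetical sketch of a PyArrow-style filesystem handler method for an
# object store: there is nothing to create, because "directories" exist
# only implicitly as key prefixes once objects are written under them.
class ObjectStoreHandlerSketch:
    def create_dir(self, path: str, recursive: bool = True) -> None:
        # No-op: a later write of e.g. "exchange_code=XETRA/part-0.parquet"
        # is what makes the prefix visible in listings.
        return None
```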

wjones127 avatar Nov 10 '23 02:11 wjones127

@wjones127 maybe we can have a chat on how I could run that debug build. I have one pipeline this is occurring on, and I could probably recreate the issue with dummy data.

Also check this discussion on slack: https://delta-users.slack.com/archives/C013LCAEB98/p1699482282569879?thread_ts=1699481918.955769&cid=C013LCAEB98

ion-elgreco avatar Nov 10 '23 11:11 ion-elgreco

@roeap it seems the issue was already flagged by @stefnba.

Since it works fine with PyArrow directly I would guess it's the delta filesystem handler. @stefnba can you confirm how you use pyarrow directly, did you create your own file system handler with the pyarrow filesystem?

@ion-elgreco Thanks for following up on this issue.

I'm on vacation and don't have access to our repo, so I can't copy-paste the exact code that's working, but it should be close to the following.

I'm using adlfs as an fsspec-compatible filesystem interface.

import pyarrow.dataset as ds
from adlfs import AzureBlobFileSystem

filesystem = AzureBlobFileSystem(account_name="my_account_name", anon=False)

# read local
input_ds = ds.dataset("path/to/local/ds")

# write to Azure using adlfs filesystem
ds.write_dataset(
	data=input_ds,
	base_dir="container/blob",
	filesystem=filesystem,
	partitioning_flavor="hive",
	partitioning=["col1", "col2"]
)

Hope that helps. I'm back in two weeks and then able to provide the working code.

stefnba avatar Nov 10 '23 13:11 stefnba

But you are seeing partition directories being made?

@wjones127 In some cases 1-2 partition dirs are created and a couple of sub-dirs; in other cases none are created. Files are never created - all dirs are empty.

stefnba avatar Nov 10 '23 13:11 stefnba

@roeap it seems the issue was already flagged by @stefnba. Since it works fine with PyArrow directly I would guess it's the delta filesystem handler. @stefnba can you confirm how you use pyarrow directly, did you create your own file system handler with the pyarrow filesystem?

@ion-elgreco Thanks for following up on this issue.

I'm on vacation and don't have access to our repo, so I can't copy paste the exact code that's working but it should be close to the following.

I'm using adlfs as an fsspec-compatible filesystem interface.

import pyarrow.dataset as ds
from adlfs import AzureBlobFileSystem

filesystem = AzureBlobFileSystem(account_name="my_account_name", anon=False)

# read local
input_ds = ds.dataset("path/to/local/ds")

# write to Azure using adlfs filesystem
ds.write_dataset(
	data=input_ds,
	base_dir="container/blob",
	filesystem=filesystem,
	partitioning_flavor="hive",
	partitioning=["col1", "col2"]
)

Hope that helps. I'm back in two weeks and then able to provide the working code.

Ok, then two temp workarounds are to pass an AzureBlobFileSystem to write_deltalake, since that is then passed to the pyarrow dataset, or to limit max_open_files.
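The second workaround would look roughly like this (untested sketch; max_open_files is forwarded to pyarrow.dataset.write_dataset by the pyarrow-based writer):

```python
# Untested sketch of the max_open_files workaround: cap how many
# partition files the pyarrow writer keeps open at once.
write_kwargs = dict(
    table_or_uri="abfs://temp/testDelta",
    partition_by=["exchange_code", "year"],
    max_open_files=32,  # low value: fewer concurrent handles, slower write
)
# from deltalake import write_deltalake
# write_deltalake(data=dataset.to_batches(), schema=schema,
#                 storage_options=storage_options, **write_kwargs)
```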

ion-elgreco avatar Nov 10 '23 14:11 ion-elgreco

Ok, then two temp workarounds are to pass an AzureBlobFileSystem to write_deltalake, since that is then passed to the pyarrow dataset, or to limit max_open_files.

Thanks for the suggestion. I remember that I tried to do that but when passing the arg filesystem to write_deltalake, it throws a NotImplementedError.

I guess it's because of this line in python/deltalake/writer.py:

if filesystem is not None:
    raise NotImplementedError("Filesystem support is not yet implemented.  #570")

Is there another way? FYI, I hadn't fully read or tried the suggestions in #570 back then.

stefnba avatar Nov 10 '23 17:11 stefnba

Hmm I missed that, I thought it was implemented. Then I'll need to look a bit deeper at the code there.

In the linked issue, Will mentions that it is possible to use object store file systems though.

ion-elgreco avatar Nov 10 '23 17:11 ion-elgreco

@stefnba for me it consistently happens more often on pyarrow v8 than on pyarrow v14. I need to find some time to debug it - Will gave some guidance on how to start doing that - but at least I can get it to occur more quickly with pyarrow v8.

ion-elgreco avatar Nov 14 '23 18:11 ion-elgreco

I wanted to re-create this issue on my side but so far have been unable to.

I tried matching @stefnba's setup as closely as I could (Mac M2, Python 3.10.7, a generated dataset with a narrower schema - only 3 columns, but with 800 million rows and about 900 total partitions over 2 columns).

Tried different versions of pyarrow (from v14 down to v9; v8 causes a segmentation error), and also tried both the maturin develop and maturin build options.

A single write takes about 5 minutes; I'm running it in a loop now with pyarrow v9 and it's on attempt 9 - still going. What else can I try to cause the failure?

r3stl355 avatar Nov 21 '23 11:11 r3stl355

@r3stl355 can you try writing this from a databricks notebook to ADLS?

I'm mainly running into the issue with Azure <-> Azure connections.

ion-elgreco avatar Nov 21 '23 11:11 ion-elgreco

Running it now on Azure Databricks notebook - still the same @ion-elgreco - started a loop with pyarrow v8 (deltalake is latest though, 0.13). A bit faster than on my laptop and no issues 🤷

Running command...
- Attempt 0: 2023-11-21 13:32:36.625640
- Attempt 1: 2023-11-21 13:35:17.566879
- Attempt 2: 2023-11-21 13:37:58.235321
- Attempt 3: 2023-11-21 13:40:42.188006
- Attempt 4: 2023-11-21 13:43:24.954625
- Attempt 5: 2023-11-21 13:46:07.584661
- Attempt 6: 2023-11-21 13:48:47.521191

r3stl355 avatar Nov 21 '23 13:11 r3stl355

@r3stl355 can you share the exact code you're running?

Is the databricks instance behind a vnet?

ion-elgreco avatar Nov 21 '23 13:11 ion-elgreco

Yes, it is VNET-injected workspace.

Also, it looks like @stefnba is having a similar problem running from his local machine, so the issue may not be unique to Azure -> Azure.

r3stl355 avatar Nov 21 '23 14:11 r3stl355

I'm running this (on Databricks, my source parquet files are in DBFS root)

%pip install deltalake
import os
import datetime as dt
import pyarrow.dataset as ds
from deltalake import DeltaTable, write_deltalake
import pyarrow as pa

SCOPE = "delta-rs"
account_name = dbutils.secrets.get(SCOPE, "AZURE_STORAGE_ACCOUNT_NAME")
client = dbutils.secrets.get(SCOPE, "AZURE_CLIENT_ID")
tenant = dbutils.secrets.get(SCOPE, "AZURE_TENANT_ID")
secret = dbutils.secrets.get(SCOPE, "AZURE_CLIENT_SECRET")

storage_options = {"account_name": account_name, "client_id": client, "client_secret": secret, "tenant_id": tenant}
schema = pa.schema(
    [
        pa.field("date", pa.timestamp("us")),
        pa.field("code", pa.string()),
        pa.field("year", pa.int64()),
    ]
)

# Read local dataset (Parquet)
SOURCE_DIR = "/dbfs/parquet"
dataset = ds.dataset(SOURCE_DIR)

target_uri = "abfs://temp/from_databricks"
test_count = 10
for i in range(0, test_count):
    print(f"- Attempt {i}: {dt.datetime.now()}")
    write_deltalake(
        data=dataset.to_batches(),
        table_or_uri=target_uri,
        mode="overwrite",
        storage_options=storage_options,
        schema=schema,
        partition_by=["code", "year"],
    )

r3stl355 avatar Nov 21 '23 14:11 r3stl355

@r3stl355 can you try this:

I am doing this with pyarrow v8; it already hangs on the first or second iteration.

import pyarrow as pa
import polars as pl
import os
from azureml.opendatasets import NycTlcGreen
from datetime import datetime
from dateutil import parser

start_date = parser.parse('2018-01-01')
end_date = parser.parse('2019-12-31')
nyc_tlc = NycTlcGreen(start_date=start_date, end_date=end_date)
nyc_tlc_df = pl.from_pandas(nyc_tlc.to_pandas_dataframe())

nyc_tlc_df = nyc_tlc_df.with_columns(
    pl.col('lpepPickupDatetime').dt.strftime("%G%m").alias('year_month'),
    pl.all().cast(pl.Utf8)
)

for i in range(250):
    # root_dir and storage_options are defined elsewhere
    nyc_tlc_df.write_delta(
        os.path.join(root_dir, '_TEMP_WRITE_DEBUG'),
        storage_options=storage_options,
        mode='overwrite',
        delta_write_options={"partition_by": ['year_month']},
    )
    print(i)

ion-elgreco avatar Nov 21 '23 16:11 ion-elgreco

Is this on Databricks? I get

/local_disk0/.ephemeral_nfs/envs/pythonEnv-2e004da5-e00b-47ff-b78a-3bcba4398610/lib/python3.10/site-packages/azureml/opendatasets/dataaccess/_blob_accessor.py:519: Warning: Please install azureml-dataset-runtimeusing pip install azureml-dataset-runtime
  warnings.warn(
NotImplementedError: Linux distribution ubuntu 22.04 does not have automatic support. 
Missing packages: {'liblttng-ust.so.0'}
.NET Core 3.1 can still be used via `dotnetcore2` if the required dependencies are installed.
Visit https://aka.ms/dotnet-install-linux for Linux distro specific .NET Core install instructions.
Follow your distro specific instructions to install `dotnet-runtime-*` and replace `*` with `3.1.23`.

r3stl355 avatar Nov 21 '23 16:11 r3stl355

Is this on Databricks? I get

/local_disk0/.ephemeral_nfs/envs/pythonEnv-2e004da5-e00b-47ff-b78a-3bcba4398610/lib/python3.10/site-packages/azureml/opendatasets/dataaccess/_blob_accessor.py:519: Warning: Please install azureml-dataset-runtimeusing pip install azureml-dataset-runtime
  warnings.warn(
NotImplementedError: Linux distribution ubuntu 22.04 does not have automatic support. 
Missing packages: {'liblttng-ust.so.0'}
.NET Core 3.1 can still be used via `dotnetcore2` if the required dependencies are installed.
Visit https://aka.ms/dotnet-install-linux for Linux distro specific .NET Core install instructions.
Follow your distro specific instructions to install `dotnet-runtime-*` and replace `*` with `3.1.23`.

No, actually in an Azure ML VM. Databricks was giving me the same issue, even though the Azure docs said it would work on Databricks :S

ion-elgreco avatar Nov 21 '23 16:11 ion-elgreco

Quick update in case someone wants to follow up (I don't know how much time I'll have in the next few days). After lots of back and forth between different environments, we now have a single Parquet file that consistently gets stuck on an attempt to write to Azure storage. The file is not large - about 200 MB - and I'm only creating 23 partition folders, with the file in each partition about 15 MB. On the first run it created one file and one partition folder and got stuck. This is using pyarrow v9; I'll try other versions.

The Parquet file is created from NYC taxi data as suggested by @ion-elgreco, so I can put it up for download somewhere if anyone is interested in trying it out, together with the code.

When it's stuck, the terminal becomes non-responsive to Ctrl+C - is that a sign that the main thread is stuck waiting on a lock or I/O?

r3stl355 avatar Nov 21 '23 21:11 r3stl355

the latest:

  • looks like the issue happens here, on the attempt to close the stream; once one of the threads is blocked, everything stops: https://github.com/delta-io/delta-rs/blob/72505449e9538371fe5fda35d545dbd662facd07/python/src/filesystem.rs#L529
  • usually it is intermittent, but today it happens on every run; I am not sure if that is related to Azure resources or my laptop's resources
  • all writes to the stream in that struct succeed - I put logs at the start and end of the writes and can see every call to write succeed
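The instrumentation above was in the Rust handler; as an illustration only, the same entry/exit logging around write and close can be sketched in Python to show which call a hang lands in:

```python
import sys

# Illustrative only (the real logging was added in the Rust filesystem
# handler): wrap a file-like object and log entry/exit of each call, so
# a hang that appears after "close start" but before "close end" points
# at the close path rather than at write().
class LoggedStream:
    def __init__(self, inner, log=sys.stderr):
        self._inner = inner
        self._log = log

    def write(self, data):
        print(f"write start ({len(data)} bytes)", file=self._log)
        n = self._inner.write(data)
        print("write end", file=self._log)
        return n

    def close(self):
        print("close start", file=self._log)
        self._inner.close()
        print("close end", file=self._log)
```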

r3stl355 avatar Nov 24 '23 16:11 r3stl355