Improve Parquet upload experience (schema coercion, error reporting)


When uploading a Parquet file to an existing table, the file's schema currently has to match the table's schema exactly, even if the uploaded file's types could be coerced into the table's types without information loss.

For example, if I create a table with a TIMESTAMP column (which is the Arrow Timestamp("ns", tz=None) type) and then use PyArrow to write a Parquet file for it (PyArrow's Parquet writer doesn't seem to support ns granularity and falls back to ms), the upload fails with a schema mismatch error: Execution error: The table public.test_upload already exists but has a different schema than the one provided.
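
For reference, here's a minimal standalone sketch of the fallback (separate from the full repro further down): ask PyArrow for a ns column, write it to Parquet and check which granularity the file actually stores.

from io import BytesIO

import pyarrow as pa
import pyarrow.parquet as pq

# Write one timestamp[ns] value to an in-memory Parquet file...
ns_table = pa.table({"ts": pa.array([0], type=pa.timestamp("ns", tz=None))})
buf = BytesIO()
pq.write_table(ns_table, buf)
buf.seek(0)

# ...and inspect the physical Parquet schema: in my case the stored unit is ms,
# not ns (newer Parquet format versions / PyArrow releases may preserve ns).
print(pq.ParquetFile(buf).schema)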

  • We should try to coerce the upload into the target schema. In this case, ms can be upcast to ns with no information loss (see the sketch after this list), but maybe we should go further and follow the same semantics as INSERT: pad missing columns with NULLs, cast e.g. strings to timestamps, and omit columns that are present in the new file but not in the table. As it stands, I have no way of fixing the error from my side, since PyArrow refuses to write an ns column to the Parquet file.
  • The error should be more precise (showing which column(s) the mismatch is in).
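
To back the first point up, here's a minimal sketch (plain PyArrow, not Seafowl code) showing that the ms -> ns upcast itself is lossless:

import pyarrow as pa

# Arrow can upcast ms -> ns without losing information; this is the kind of
# coercion the upload endpoint could apply against the target table schema.
ms_values = pa.array([1_640_995_200_000], type=pa.timestamp("ms"))  # 2022-01-01T00:00:00
ns_values = ms_values.cast(pa.timestamp("ns"))
print(ns_values.type)  # timestamp[ns]
print(ns_values[0])    # 2022-01-01 00:00:00

Full reproduction script for the schema mismatch error:
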
import json
from io import BytesIO, StringIO
from typing import Optional, Any

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import requests

SCHEMA = {
    "text_column": "TEXT",
    "timestamp_column": "TIMESTAMP",
}

# Even though we ask for "ns" here, the written Parquet file actually ends up with "ms"
ARROW_TYPES = {"TEXT": pa.string(), "TIMESTAMP": pa.timestamp("ns", tz=None)}
ARROW_SCHEMA = pa.schema([(c, ARROW_TYPES[ct.upper()]) for c, ct in SCHEMA.items()])

TARGET_SCHEMA = "public"
TARGET_TABLE = "test_upload"

CSV_DATA_1 = """text_column,timestamp_column
january,2022-01-01T12:34:56.789012
february,2022-02-01T12:34:56.789012
"""

# Second CSV file; it's not actually used in this example
CSV_DATA_2 = """text_column,timestamp_column
march,2022-03-01T12:34:56.789012
april,2022-04-01T12:34:56.789012
"""

SEAFOWL = "http://localhost:8080"


def query_seafowl(endpoint: str, sql: str, access_token: Optional[str] = None) -> Any:
    headers = {"Content-Type": "application/json"}
    if access_token:
        headers["Authorization"] = f"Bearer {access_token}"
    response = requests.post(f"{endpoint}/q", json={"query": sql}, headers=headers)

    if not response.ok:
        print(response.text)
    response.raise_for_status()
    if response.text:
        return [json.loads(t) for t in response.text.strip().split("\n")]
    return None


def upload_to_seafowl(
    endpoint: str, data: bytes, access_token: Optional[str] = None
) -> None:
    headers = {}
    if access_token:
        headers["Authorization"] = f"Bearer {access_token}"
    response = requests.post(
        f"{endpoint}/upload/{TARGET_SCHEMA}/{TARGET_TABLE}",
        files={"file": ("data.parquet", data)},
        headers=headers,
    )
    if not response.ok:
        print(response.text)
    response.raise_for_status()

    return None


def csv_to_parquet(csv_data: str) -> bytes:
    output = BytesIO()
    data = StringIO(csv_data)

    with pq.ParquetWriter(output, schema=ARROW_SCHEMA, compression="zstd") as writer:
        df = pd.read_csv(data)
        df["timestamp_column"] = pd.to_datetime(df["timestamp_column"])
        writer.write_batch(
            pa.RecordBatch.from_pandas(df=df, preserve_index=False, schema=ARROW_SCHEMA)
        )

    output.seek(0)
    return output.read()


parquet_data_1 = csv_to_parquet(CSV_DATA_1)
parquet_data_2 = csv_to_parquet(CSV_DATA_2)

# Drop the target table if it already exists
if query_seafowl(
    SEAFOWL,
    f"SELECT 1 AS exists FROM information_schema.tables "
    f"WHERE table_name = '{TARGET_TABLE}' AND table_schema = '{TARGET_SCHEMA}'",
):
    query_seafowl(SEAFOWL, f"DROP TABLE {TARGET_SCHEMA}.{TARGET_TABLE}")

# Create the table
query_seafowl(
    SEAFOWL,
    f"CREATE TABLE {TARGET_SCHEMA}.{TARGET_TABLE} ("
    + ",\n".join(f"{cname} {ctype}" for cname, ctype in SCHEMA.items())
    + ")",
)

# Upload the file
# Because the Parquet file that PyArrow writes ends up with timestamp[ms] while our
# CREATE TABLE (... TIMESTAMP) column uses the Arrow timestamp[ns] type, the schemas
# are deemed incompatible and the upload fails
upload_to_seafowl(SEAFOWL, parquet_data_1)
print(query_seafowl(SEAFOWL, f"SELECT * FROM {TARGET_SCHEMA}.{TARGET_TABLE}"))
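
Not part of the repro, but since parquet_data_1 is still in scope after the upload fails, a quick interactive sanity check is to compare the physical schema PyArrow actually wrote with the Arrow schema we asked for:

# Hypothetical diagnostic, not in the original script: show the timestamp
# granularity stored in the generated Parquet file versus the requested schema.
print(pq.ParquetFile(BytesIO(parquet_data_1)).schema)
print(ARROW_SCHEMA)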

mildbyte, Oct 28 '22 10:10