
Scan does not work as expected

ndrluis opened this issue 1 year ago · 15 comments

I'm testing using the iceberg-rest image from Tabular as the catalog.

Here's the docker-compose.yml file:

version: '3.8'

services:
  rest:
    image: tabulario/iceberg-rest:0.10.0
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
      - CATALOG_WAREHOUSE=s3://warehouse/
      - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
      - CATALOG_S3_ENDPOINT=http://minio:9000
    depends_on:
      - minio
    ports:
      - "8181:8181"
    networks:
      iceberg_net:

  minio:
    image: minio/minio:RELEASE.2024-03-07T00-43-48Z
    environment:
      - MINIO_ROOT_USER=admin
      - MINIO_ROOT_PASSWORD=password
      - MINIO_DOMAIN=minio
    networks:
      iceberg_net:
        aliases:
          - warehouse.minio
    expose:
      - 9001
      - 9000
    ports:
      - "9000:9000"
      - "9001:9001"
    command: [ "server", "/data", "--console-address", ":9001" ]

  mc:
    depends_on:
      - minio
    image: minio/mc:RELEASE.2024-03-07T00-31-49Z
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
    entrypoint: >
      /bin/sh -c "
        until (/usr/bin/mc config host add minio http://minio:9000 admin password) do
          echo '...waiting...' && sleep 1;
        done;
        /usr/bin/mc mb minio/warehouse;
        /usr/bin/mc policy set public minio/warehouse;
        tail -f /dev/null
      "
    networks:
      iceberg_net:

networks:
  iceberg_net:

I created some data with PyIceberg:

from pyiceberg.catalog import load_catalog
import pyarrow as pa
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, DoubleType

catalog = load_catalog(
    "demo",
    **{
        "type": "rest",
        "uri": "http://localhost:8181",
        "s3.endpoint": "http://localhost:9000",
        "s3.access-key-id": "admin",
        "s3.secret-access-key": "password",
        "warehouse": "demo",
    },
)

catalog.create_namespace_if_not_exists("default")

schema = Schema(
    NestedField(1, "city", StringType(), required=False),
    NestedField(2, "lat", DoubleType(), required=False),
    NestedField(3, "long", DoubleType(), required=False),
)

tbl = catalog.create_table_if_not_exists("default.cities", schema=schema)

df = pa.Table.from_pylist(
    [
        {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029},
        {"city": "San Francisco", "lat": 37.773972, "long": -122.431297},
        {"city": "Drachten", "lat": 53.11254, "long": 6.0989},
        {"city": "Paris", "lat": 48.864716, "long": 2.349014},
    ],
)

tbl.append(df)

And queried it with PyIceberg to verify that the data is there:

from pyiceberg.catalog import load_catalog
from pyiceberg.table import Table

catalog = load_catalog(
    "demo",
    **{
        "type": "rest",
        "uri": "http://localhost:8181/",
        "s3.endpoint": "http://localhost:9000",
        "s3.access-key-id": "admin",
        "s3.secret-access-key": "password",
        "warehouse": "demo",
    },
)

tbl: Table = catalog.load_table("default.cities")

res = tbl.scan().to_arrow()

print(len(res))

It returns 4.

And then with the Rust implementation:

use std::collections::HashMap;

use futures::TryStreamExt;
use iceberg::{
    io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY},
    Catalog, TableIdent,
};
use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};

#[tokio::main]
async fn main() {
    // Create catalog
    let config = RestCatalogConfig::builder()
        .uri("http://localhost:8181".to_string())
        .warehouse("demo".to_string())
        .props(HashMap::from([
            (S3_ENDPOINT.to_string(), "http://localhost:9000".to_string()),
            (S3_ACCESS_KEY_ID.to_string(), "admin".to_string()),
            (S3_SECRET_ACCESS_KEY.to_string(), "password".to_string()),
            (S3_REGION.to_string(), "us-east-1".to_string()),
        ]))
        .build();

    let catalog = RestCatalog::new(config);

    // Load the table created by PyIceberg above
    let table = catalog
        .load_table(&TableIdent::from_strs(["default", "cities"]).unwrap())
        .await
        .unwrap();

    // Build a full-table scan and stream the results as Arrow batches
    let scan = table.scan().select_all().build().unwrap();
    let batch_stream = scan.to_arrow().await.unwrap();

    dbg!(scan);

    let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

    dbg!(batches.len());
}

It's returning nothing.

We have to define the S3 configuration because the Tabular image does not return the S3 credentials in the getConfig response.

ndrluis avatar Jul 28 '24 20:07 ndrluis

I performed another test using the Tabular catalog, attempting to scan the nyc_taxi_yellow table in the examples namespace of the sandbox warehouse, but it returned no results.

ndrluis avatar Jul 28 '24 21:07 ndrluis

I found the problem. I don’t know how to solve it, but I will try.

The while let Some(Ok(task)) = tasks.next().await statement is hiding some errors. In my previous attempt, I was trying to run it without the S3 credentials and was not receiving the access-denied error. This happens because the pattern matches only Ok values: when tasks.next() yields an Err, the loop simply ends and the error is never exposed to the user.

While testing with Tabular, I'm receiving a 403 error from S3. So, we have two issues to solve.

One is to expose the reading errors to the user, and the other is to understand why we are getting these access denied errors.

ndrluis avatar Jul 28 '24 22:07 ndrluis

For the Tabular example, I encountered an 'access denied' problem: the FileIO does not work with remote signing. For the MinIO example, the problem was solved when I replaced the while let with a match on the result of tasks.next() so the error is returned.
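
For illustration, here's a minimal sketch of that fix, assuming a stream that yields Result items; the type aliases and function are stand-ins, not the actual iceberg-rust internals:

use futures::{Stream, StreamExt};

// Stand-ins for the real task and error types; illustrative only.
type FileScanTask = String;
type ScanError = String;

async fn collect_tasks(
    mut tasks: impl Stream<Item = Result<FileScanTask, ScanError>> + Unpin,
) -> Result<Vec<FileScanTask>, ScanError> {
    let mut collected = Vec::new();
    // Before the fix: `while let Some(Ok(task)) = tasks.next().await { ... }`
    // ends the loop on the first Err item without surfacing it. Matching on
    // the Result propagates the error to the caller instead:
    while let Some(result) = tasks.next().await {
        match result {
            Ok(task) => collected.push(task),
            Err(e) => return Err(e),
        }
    }
    Ok(collected)
}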

ndrluis avatar Jul 28 '24 22:07 ndrluis

To scan with remote signing, we need to implement the equivalent of this:

https://github.com/apache/iceberg/blob/main/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3V4RestSignerClient.java

ndrluis avatar Jul 28 '24 23:07 ndrluis

Hi, does remote signing mean presigning in S3?

Xuanwo avatar Jul 29 '24 04:07 Xuanwo

I'm guessing https://github.com/apache/iceberg-rust/pull/498 should close this issue. Would you like to verify it?

Xuanwo avatar Jul 29 '24 05:07 Xuanwo

@Xuanwo

Hi, does remote signing mean presigning in S3?

Yes and no. I'm not sure if this is the flow, because I haven't found any documentation; this is based on my understanding from reading the Python implementation.

It's a presign process, but it's not the client's responsibility to presign. getConfig will return s3.signer.uri, and loadTable will return s3.remote-signing-enabled as true along with some other S3 configurations. With that, we need to "presign" each request using the token returned by loadTable.

This is the specification for the server responsible for the signing: s3-signer-open-api.yaml
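
For reference, here's a rough sketch of what the signing call could look like, with the request and response shapes taken from my reading of s3-signer-open-api.yaml; the struct names and the helper itself are assumptions, not an existing iceberg-rust API:

use std::collections::HashMap;

use reqwest::Client;
use serde::{Deserialize, Serialize};

// Assumed body shapes per s3-signer-open-api.yaml; header values are
// lists of strings.
#[derive(Serialize)]
struct S3SignRequest {
    region: String,
    uri: String,
    method: String,
    headers: HashMap<String, Vec<String>>,
}

#[derive(Deserialize)]
struct S3SignResponse {
    uri: String,
    headers: HashMap<String, Vec<String>>,
}

// `signer_uri` comes from s3.signer.uri in the getConfig response;
// `token` is the one returned by loadTable.
async fn sign_s3_request(
    client: &Client,
    signer_uri: &str,
    token: &str,
    request: S3SignRequest,
) -> reqwest::Result<S3SignResponse> {
    client
        .post(format!("{signer_uri}/v1/aws/s3/sign"))
        .bearer_auth(token)
        .json(&request)
        .send()
        .await?
        .error_for_status()?
        .json::<S3SignResponse>()
        .await
}

The client would then replay the original S3 request against the returned URI with the returned headers applied.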

I'm guessing https://github.com/apache/iceberg-rust/pull/498 should close this issue. Would you like to verify it?

I'm not comfortable closing this issue without a regression test that guarantees the expected behavior.

ndrluis avatar Jul 29 '24 14:07 ndrluis

I'm not comfortable closing this issue without a regression test that guarantees the expected behavior.

+1 on this. Currently we don't have regression tests on the whole read path, which involves integrating with external systems such as Spark.

liurenjie1024 avatar Jul 29 '24 14:07 liurenjie1024

It's a presign process, but it's not the client's responsibility to presign. getConfig will return s3.signer.uri, and loadTable will return s3.remote-signing-enabled as true along with some other S3 configurations. With that, we need to "presign" each request using the token returned by loadTable.

Got it. So, we need to support presigning in the REST catalog. Could you help by creating an issue for this? I'll review this section and draft a plan for its implementation.

Currently we don't have regression tests on the whole read path, which involves integrating with external systems such as Spark.

I think we can start with very basic tests, like just scanning the whole table.

Xuanwo avatar Jul 29 '24 14:07 Xuanwo

I think we can start with very basic tests, like just scanning the whole table.

The reason I didn't start this yet is that I want to do it after the DataFusion integration. @ZENOTME and I did integration tests in icelake before, and I have to say that without SQL engine support, it's painful to maintain those tests.

liurenjie1024 avatar Jul 29 '24 14:07 liurenjie1024

@ZENOTME and I did integration tests in icelake before, and I have to say that without SQL engine support, it's painful to maintain those tests.

I agree that we need a SQL engine to make testing easier.

However, maintaining basic unit tests based on fs or memory should be straightforward, right? We don't need separate test modules; just implement them as unit tests in the REST catalog. For example, it could be as simple as...

// catalog / file IO setup goes here
let table = setup_test_table(); // placeholder for whatever creates the table

let scan = table.scan().select_all().build().unwrap();
let batch_stream = scan.to_arrow().await.unwrap();

dbg!(scan);

let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
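
To make that sketch something a regression test can assert on, the scan-and-count part could be factored into a helper like this (a sketch; the table would come from whatever fs- or memory-backed setup is elided above):

use futures::TryStreamExt;
use iceberg::table::Table;

// Scan the whole table and count the rows that come back.
async fn count_rows(table: &Table) -> iceberg::Result<usize> {
    let scan = table.scan().select_all().build()?;
    let batch_stream = scan.to_arrow().await?;
    let batches: Vec<_> = batch_stream.try_collect().await?;
    Ok(batches.iter().map(|b| b.num_rows()).sum())
}

A test would then just assert that count_rows matches the number of rows written during setup.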

Xuanwo avatar Jul 29 '24 16:07 Xuanwo

Got it. So, we need to support presigning in the REST catalog. Could you help by creating an issue for this? I'll review this section and draft a plan for its implementation.

Issue #504 created

ndrluis avatar Jul 29 '24 18:07 ndrluis

@ZENOTME and I did integration tests in icelake before, and I have to say that without SQL engine support, it's painful to maintain those tests.

I agree that we need a SQL engine to make testing easier.

However, maintaining basic unit tests based on fs or memory should be straightforward, right? We don't need separate test modules; just implement them as unit tests in the REST catalog.

Correctly writing data into Iceberg is not supported yet, so we need external systems such as Spark to ingest data. Checking in pre-generated Parquet files may be an approach, but that requires maintaining binaries in the repo.

liurenjie1024 avatar Jul 30 '24 14:07 liurenjie1024

Correctly writing data into Iceberg is not supported yet, so we need external systems such as Spark to ingest data. Checking in pre-generated Parquet files may be an approach, but that requires maintaining binaries in the repo.

I've got some code in the perf-testing branch that might help. It downloads NYC taxi data and uses MinIO, the REST catalog, and a Spark container to create a table and insert that data into it.

https://github.com/apache/iceberg-rust/pull/497

sdd avatar Jul 31 '24 17:07 sdd

I have fixed the issue where errors were not returned to the user in https://github.com/apache/iceberg-rust/pull/535

sdd avatar Aug 10 '24 00:08 sdd

I believe this should have been fixed. Please feel free to open a new issue if the problem still exists.

Xuanwo avatar Aug 19 '24 16:08 Xuanwo