
Stream load local Parquet file

alanpaulkwan opened this issue 3 years ago • 15 comments

I imagine loading Parquet directly would be a lot more performant, given that the bandwidth required would be lower, the types are pre-defined, and the columnar storage is similar.

alanpaulkwan avatar Nov 13 '22 03:11 alanpaulkwan

@alanpaulkwan It's a good feature. We do not currently support Parquet in stream load because we are refactoring the ingestion framework and trying to unify the ingestion interface. We may need time to discuss whether the old interface should support it. Could you elaborate on which cases require Parquet in stream load?

chaoyli avatar Nov 17 '22 19:11 chaoyli

The use case is simple: I am working on a local file with a scripting language like Python/R and want to insert it into StarRocks. Parquet is faster to write, takes less space (and is thus faster to transfer), and programs like R/Python write Parquet just as easily as they write .csv files. It also avoids having to maintain Hive/Hudi/Delta Lake etc. just to load Parquet files.

Most of my files are stored as Parquet; right now I must convert them to .csv, or route them through ClickHouse, to get them into StarRocks.

alanpaulkwan avatar Nov 17 '22 23:11 alanpaulkwan
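To make that workaround concrete, here is a minimal sketch in Python, assuming pandas and requests are installed; the file name, FE address, database, table, and credentials are placeholders rather than anything from this thread. One caveat: the FE answers a Stream Load with a 307 redirect to a BE (curl needs --location-trusted for this), so with requests it can be simpler to target a BE's HTTP port directly.

# Minimal sketch of the current workaround: read the local Parquet file,
# round-trip it through CSV, and push it via the Stream Load HTTP API.
# All names, the endpoint, and the credentials below are placeholders.
import io

import pandas as pd
import requests

df = pd.read_parquet("companies.parquet")        # hypothetical local file
buf = io.StringIO()
df.to_csv(buf, index=False, header=False)        # the lossy CSV round-trip

resp = requests.put(
    "http://fe_host:8030/api/mydb/companies/_stream_load",  # placeholder
    data=buf.getvalue().encode("utf-8"),
    headers={
        "label": "companies_csv_1",              # hypothetical load label
        "column_separator": ",",                 # Stream Load defaults to \t
        "Expect": "100-continue",
    },
    auth=("root", ""),                           # placeholder credentials
)
print(resp.json())

This is exactly the detour the request is about eliminating: the CSV hop discards the Parquet type information and reintroduces text parsing on the server side.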

A cool add-on might be to leverage Parquet's self-describing feature to infer a schema for the Parquet file without the user having to define one.

alanpaulkwan avatar Nov 17 '22 23:11 alanpaulkwan
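Since the schema travels with the file, that inference can even be prototyped client-side today. A sketch with pyarrow follows; the Arrow-to-StarRocks type map is illustrative and deliberately partial, and the file/table names are placeholders.

# Sketch of client-side schema inference from Parquet metadata.
import pyarrow.parquet as pq

ARROW_TO_SR = {                                  # partial, assumed mapping
    "int32": "INT", "int64": "BIGINT",
    "float": "FLOAT", "double": "DOUBLE",
    "string": "STRING", "bool": "BOOLEAN",
    "date32[day]": "DATE",
}

schema = pq.read_schema("companies.parquet")     # hypothetical file
cols = ",\n  ".join(
    f"`{f.name}` {ARROW_TO_SR.get(str(f.type), 'STRING')}" for f in schema
)
# Depending on the StarRocks version, a DISTRIBUTED BY clause may be needed.
print(f"CREATE TABLE IF NOT EXISTS companies (\n  {cols}\n);")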

@alanpaulkwan So currently you have to define a schema beforehand?

chaoyli avatar Nov 18 '22 04:11 chaoyli

Yes, I define a schema first.

I am going to reiterate this suggestion. I have a very simple file containing company names. If I stream load a CSV, rows get filtered because the comma in 'HEDGE FUND NAME, LLC' is mistaken for a delimiter. That simply isn't tenable, I think; Parquet is designed for exactly this purpose. Right now, to use Parquet, I have to bounce it off a MinIO bucket, guess the schema, etc. to get it into StarRocks. It's really painful.

alanpaulkwan avatar Mar 14 '23 14:03 alanpaulkwan
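Until Parquet is supported, one way to dodge the embedded-comma problem is to write the CSV with a separator that cannot occur in the data and declare it in the Stream Load request. A sketch, with placeholder host, table, and credentials:

# Sketch of the delimiter workaround: \x01 as separator, declared via the
# column_separator header (StarRocks accepts the hex notation \\x01).
import pandas as pd
import requests

df = pd.DataFrame({"id": [1], "name": ["HEDGE FUND NAME, LLC"]})
df.to_csv("funds.csv", sep="\x01", index=False, header=False)

with open("funds.csv", "rb") as f:
    resp = requests.put(
        "http://fe_host:8030/api/mydb/funds/_stream_load",  # placeholder
        data=f.read(),
        headers={"column_separator": "\\x01", "Expect": "100-continue"},
        auth=("root", ""),                       # placeholder credentials
    )
print(resp.json())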

https://github.com/StarRocks/starrocks/issues/26060

alberttwong avatar Jul 11 '23 18:07 alberttwong

The broker isn't available in the allin1 image yet.

alter system add broker local_load "172.26.199.40:8000";

load label xxxx1 (
    data infile("file:///home/disk1/zhaoheng/ssb-poc-0.10.0/ssb-poc/output/data_dir/user_behavior_sample_data.parquet")
    into table user_behavior
    format as "parquet"
    (UserID, ItemID, CategoryID, BehaviorType, Timestamp)
) with broker local_load
properties("timeout"="3600");

alberttwong avatar Jul 11 '23 18:07 alberttwong

@alberttwong Are the commands above sufficient to add the broker to the running allin1 image, or are those the commands that would work once the broker is added? I tried the above and I am seeing type:ETL_RUN_FAIL; msg:failed to find alive broker: local_load when I try to load the Parquet file.

DanRoscigno avatar Jul 25 '23 13:07 DanRoscigno

The command will work once the broker is added to the allin1 image, which is not done yet. Stay tuned.

kevincai avatar Jul 25 '23 13:07 kevincai

@DanRoscigno The broker in the allin1 image won't be ready until the 3.1 GA release. If you are using the Linux binary install, this will work.

alberttwong avatar Jul 25 '23 14:07 alberttwong

I have built a private allin1 image, lvlouisaslia/allin1-ubi:test, based on PR #28240; a broker service is added into the image.

I tried the following scenario:

Example Parquet file downloaded: userdata1.parquet.

  • create test database
MySQL [(none)]> create database test;
Query OK, 0 rows affected (0.00 sec)

MySQL [(none)]> use test;
Database changed
  • create test table
CREATE TABLE IF NOT EXISTS sr_user (
    id         int,
    first_name string,
    last_name  string,
    email      string,
    gender     string,
    ip_address string,
    cc         string,
    country    string,
    birthdate  string,
    salary     double,
    title      string,
    comments   string
) DISTRIBUTED BY HASH(id);

MySQL [test]> select count(*) from sr_user;
+----------+
| count(*) |
+----------+
|        0 |
+----------+
1 row in set (0.11 sec)

  • check the broker name
MySQL [test]> show broker;
+--------------+-----------+------+-------+---------------------+---------------------+--------+
| Name         | IP        | Port | Alive | LastStartTime       | LastUpdateTime      | ErrMsg |
+--------------+-----------+------+-------+---------------------+---------------------+--------+
| allin1broker | 127.0.0.1 | 8000 | true  | 2023-07-30 10:11:57 | 2023-07-30 10:16:57 |        |
+--------------+-----------+------+-------+---------------------+---------------------+--------+
1 row in set (0.00 sec)
  • create broker load job from local file
MySQL [test]> load label BROKER_LOAD_LOCAL_FILE (
    data infile("file:///data/deploy/starrocks/fe/meta/userdata1.parquet")
    into table sr_user
    format as "parquet"
    (id, first_name, last_name, email, gender, ip_address, cc, country, birthdate, salary, title, comments)
) with broker allin1broker
properties("timeout"="3600");
Query OK, 0 rows affected (0.04 sec)

MySQL [test]> show load \G
*************************** 1. row ***************************
         JobId: 11082
         Label: BROKER_LOAD_LOCAL_FILE
         State: FINISHED
      Progress: ETL:100%; LOAD:100%
          Type: BROKER
      Priority: NORMAL
      ScanRows: 1000
  FilteredRows: 0
UnselectedRows: 0
      SinkRows: 1000
       EtlInfo: NULL
      TaskInfo: resource:N/A; timeout(s):3600; max_filter_ratio:0.0
      ErrorMsg: NULL
    CreateTime: 2023-07-30 10:14:02
  EtlStartTime: 2023-07-30 10:14:08
 EtlFinishTime: 2023-07-30 10:14:08
 LoadStartTime: 2023-07-30 10:14:08
LoadFinishTime: 2023-07-30 10:14:08
   TrackingSQL: 
    JobDetails: {"All backends":{"079c44be-518f-4a10-8fe4-aa6aeb8dcdc6":[11003]},"FileNumber":1,"FileSize":113629,"InternalTableLoadBytes":162150,"InternalTableLoadRows":1000,"ScanBytes":113629,"ScanRows":1000,"TaskNumber":1,"Unfinished backends":{"079c44be-518f-4a10-8fe4-aa6aeb8dcdc6":[]}}
1 row in set (0.00 sec)
  • check the loaded data
MySQL [test]> select count(*) from sr_user;
+----------+
| count(*) |
+----------+
|     1000 |
+----------+
1 row in set (0.04 sec)

kevincai avatar Jul 30 '23 10:07 kevincai

https://github.com/StarRocks/starrocks/issues/23625

alberttwong avatar Jul 31 '23 18:07 alberttwong

If I understand correctly, this requires the local broker to be running on the same machine where SR is running?

Would this use case be able to accommodate the scenario where I have my laptop in one place and SR running remotely?

alanpaulkwan avatar Aug 21 '23 05:08 alanpaulkwan

As of right now, we still do not support stream load for Parquet files.

alberttwong avatar Feb 06 '24 16:02 alberttwong

@alanpaulkwan Will the Parquet files loaded from local disk be larger than 4GB? Even if we implement this in Stream Load, it would need to buffer a whole Parquet file in memory, or save the entire loaded file to the BE's disk and read it back from local disk to continue the load.

jaogoy avatar Apr 28 '24 09:04 jaogoy
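If the size is a concern, one mitigation would be to split a large file by row group on the client and issue several smaller loads, so only one slice is in memory at a time. A sketch with pyarrow; the file name is a placeholder and the actual send step is elided.

# Sketch: bound client-side memory by re-serializing one row group at a
# time instead of shipping a multi-GB Parquet file in a single request.
import io

import pyarrow.parquet as pq

pf = pq.ParquetFile("big.parquet")               # hypothetical large file
for i in range(pf.num_row_groups):
    table = pf.read_row_group(i)                 # one slice in memory
    buf = io.BytesIO()
    pq.write_table(table, buf)                   # standalone Parquet slice
    payload = buf.getvalue()
    # ... send `payload` as the body of one load request ...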

I'd like to reiterate this request as well, as stream load is definitely the most flexible ingestion option. Broker load has had performance issues and sometimes takes a long time to start up, and I'd prefer relying on my own parallel ingestion code. Particularly when handling millions of Parquet files, the other ingestion options end up far slower than I'd like, presumably due to scanning/buffering.

Apache Doris has implemented Parquet support for stream load, and it's ended up being the most performant ingestion option for me so far.

Regarding the file limit, it makes sense to me that individual Parquet files would need to fit within a configurable memory limit and should be expected to be read entirely into memory. As long as parallelism doesn't make this OOM trivially, having a limit should be fine.

johnpyp avatar Aug 11 '24 01:08 johnpyp
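For what it's worth, that memory-limit argument composes with client-side parallelism as long as the worker count is capped, so per-file memory times workers stays predictable. A sketch with a placeholder endpoint and file set (CSV today; the same harness would carry over to Parquet bodies if Stream Load gained support):

# Sketch of parallel ingestion with bounded concurrency: each worker holds
# at most one file in flight, so peak memory is roughly max_workers x file
# size. Endpoint, table, credentials, and paths are all placeholders.
import glob
from concurrent.futures import ThreadPoolExecutor

import requests

def stream_load(path: str) -> int:
    with open(path, "rb") as f:
        resp = requests.put(
            "http://fe_host:8030/api/mydb/events/_stream_load",  # placeholder
            data=f.read(),
            headers={"column_separator": ",", "Expect": "100-continue"},
            auth=("root", ""),                   # placeholder credentials
        )
    return resp.status_code

files = glob.glob("out/*.csv")                   # hypothetical file set
with ThreadPoolExecutor(max_workers=8) as pool:  # bounded parallelism
    print(list(pool.map(stream_load, files)))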

The broker part for the allin1 image is done.

Assigning to @jaogoy for continued discussion of stream load support for Parquet files.

kevincai avatar Aug 23 '24 03:08 kevincai

It would be good to support loading Parquet files through Stream Load. However, currently few users load Parquet files through Stream Load, because most Parquet files are on cloud storage.

jaogoy avatar Aug 26 '24 07:08 jaogoy