Redshift -> Postgres sync fail
Issue Description
I tried to create a minimal reproducible example; it fails on a different problem than the one I had on my actual table.
- Sling version (sling --version): 1.2.10
- Operating System (linux, mac, windows): mac
Initialized tables:
REDSHIFT
create table base_table (
id varchar,
updated_at timestamp,
uuid_col varchar,
another_timestamp timestamp,
value_col int
);
CREATE OR REPLACE PROCEDURE insert_values()
AS $$
DECLARE
i INT := 0;
BEGIN
FOR i IN 1..1000 LOOP
INSERT INTO base_table VALUES
(md5(random()::text), GETDATE(), '9c67d793-dfeb-49be-9f02-0c543027471c', GETDATE(), 1);
END LOOP;
END;
$$ LANGUAGE plpgsql;
CALL insert_values();
POSTGRES
create table target_table (
id varchar,
updated_at timestamptz,
uuid_col uuid,
another_timestamp timestamptz,
value_col int
);
- Replication Configuration:
{
  "source": "SLING_REDSHIFT",
  "target": "SLING_PG",
  "streams": {
    "public.base_table": {
      "mode": "truncate",
      "object": "public.target_table",
      "primary_key": ["id"],
      "update_key": "updated_at",
      "source_options": {
        "columns": {
          "updated_at": "datetime",
          "another_timestamp": "datetime"
        },
        "transforms": {
          "uuid_col": ["parse_uuid"]
        }
      }
    }
  }
}
- Log Output (please run command with -d):
2024-05-24 13:56:31 INF [1 / 1] running stream public.base_table
2024-05-24 13:56:31 DBG Sling version: 1.2.10 (darwin arm64)
2024-05-24 13:56:31 DBG type is db-db
2024-05-24 13:56:31 DBG using source options: {"empty_as_null":false,"null_if":"NULL","datetime_format":"AUTO","max_decimals":-1,"columns":{"another_timestamp":"datetime","updated_at":"datetime"},"transforms":{"uuid_col":["parse_uuid"]}}
2024-05-24 13:56:31 DBG using target options: {"datetime_format":"auto","file_max_rows":0,"max_decimals":-1,"use_bulk":true,"add_new_columns":true,"adjust_column_type":false,"column_casing":"source"}
2024-05-24 13:56:31 INF connecting to source database (redshift)
2024-05-24 13:56:31 DBG opened "redshift" connection (conn-redshift-EnA)
2024-05-24 13:56:33 INF connecting to target database (postgres)
2024-05-24 13:56:33 DBG opened "postgres" connection (conn-postgres-HXX)
2024-05-24 13:56:34 INF reading from source database
2024-05-24 13:56:35 INF unloading from redshift to s3
2024-05-24 13:56:35 DBG unload ('select * from "public"."base_table"')
to 's3://sling-temp/temp/stream/1716548195213.csv/u01-'
credentials 'aws_access_key_id=***;aws_secret_access_key=***'
gzip allowoverwrite CSV PARALLEL
2024-05-24 13:56:37 DBG Unloaded to s3://sling-temp/temp/stream/1716548195213.csv
2024-05-24 13:56:37 DBG reading datastream from s3://sling-temp/temp/stream/1716548195213.csv [format=csv]
2024-05-24 13:56:37 DBG merging csv readers of 4 files [concurrency=10] from s3://sling-temp/temp/stream/1716548195213.csv
2024-05-24 13:56:37 DBG processing reader from s3://sling-temp/temp/stream/1716548195213.csv/u01-0003_part_00.gz
2024-05-24 13:56:37 DBG processing reader from s3://sling-temp/temp/stream/1716548195213.csv/u01-0001_part_00.gz
2024-05-24 13:56:37 DBG processing reader from s3://sling-temp/temp/stream/1716548195213.csv/u01-0002_part_00.gz
2024-05-24 13:56:37 DBG processing reader from s3://sling-temp/temp/stream/1716548195213.csv/u01-0000_part_00.gz
2024-05-24 13:56:38 DBG could not detect delimiter. Using ","
2024-05-24 13:56:38 INF execution failed
2024-05-24 13:56:38 INF ~ dataflow error while waiting for ready state
--- fs.go:954 func1 ---
--- fs.go:1188 MergeReaders ---
~ Unable to merge paths at s3://sling-temp/temp/stream/1716548195213.csv
--- datastream.go:1110 ConsumeCsvReaderChl ---
~ Error consuming reader for fileType: 'csv'
--- datastream.go:1099 func1 ---
~ could not get reader
--- csv.go:254 getReader ---
Unable to detect number of columns since `header=false`. Need to pass property `fields_per_rec`
context canceled
2024-05-24 13:56:38 INF Sling Replication Completed in 6s | SLING_REDSHIFT -> SLING_PG | 0 Successes | 1 Failures
fatal:
--- proc.go:267 main ---
--- sling_cli.go:442 main ---
--- sling_cli.go:474 cliInit ---
--- cli.go:286 CliProcess ---
~ failure running replication (see docs @ https://docs.slingdata.io/sling-cli)
--- sling_run.go:188 processRun ---
--------------------------- public.base_table ---------------------------
~ execution failed
--- task_run.go:99 func1 ---
~ Could not ReadFromDB
--- task_run.go:523 runDbToDb ---
~ Could not BulkExportFlow
--- task_run_read.go:195 ReadFromDB ---
~ Could not read S3 Path for UNLOAD: s3://sling-temp/temp/stream/1716548195213.csv
--- database_redshift.go:189 BulkExportFlow ---
~ error getting dataflow
--- fs.go:551 ReadDataflow ---
--- fs.go:988 GetDataflow ---
~ dataflow error while waiting for ready state
--- dataflow.go:616 WaitReady ---
--- fs.go:954 func1 ---
--- fs.go:1188 MergeReaders ---
~ Unable to merge paths at s3://sling-temp/temp/stream/1716548195213.csv
--- datastream.go:1110 ConsumeCsvReaderChl ---
~ Error consuming reader for fileType: 'csv'
--- datastream.go:1099 func1 ---
~ could not get reader
--- csv.go:254 getReader ---
Unable to detect number of columns since `header=false`. Need to pass property `fields_per_rec`
Sling command failed:
Just pushed this commit.
Could you try building the binary from branch v1.2.11 and test?
Actually, since you're on a Mac, I just compiled it and uploaded it here: https://f.slingdata.io/sling-mac-20240524.zip
Solved it, thanks 🙏
When running on a sample it works (I did create table temp as select * from source_table limit 1000 and it synced well),
but when running on the full data (15M rows) I get the following error:
2024-05-24 23:10:44 DBG create table if not exists "public"."target_table_tmp" ("id" varchar(65500),
"updated_at" timestamp,
"store_id" text,
"metric" text,
"total_exposures" bigint)
2024-05-24 23:10:45 INF streaming data
9m8s 3,711,796 6785 r/s 1.7 GB | 73% MEM | 17% CPU
2024-05-24 23:19:56 DBG drop table if exists "public"."target_table_tmp"
2024-05-24 23:19:57 DBG table "public"."target_table_tmp" dropped
2024-05-24 23:19:57 DBG closed "postgres" connection (conn-postgres-qXU)
2024-05-24 23:19:57 INF execution failed
2024-05-24 23:19:57 INF ~ execution failed
--- task_run.go:99 func1 ---
--- task_run.go:541 runDbToDb ---
--- task_run_write.go:311 WriteToDb ---
--- database.go:2418 BulkImportFlow ---
~ could not bulk import
--- database.go:2411 func1 ---
--- datastream.go:814 func9 ---
--- datastream.go:2558 next ---
~ Error reading file
--- datastream.go:1195 func2 ---
record on line 45520: wrong number of fields
context canceled
--- datastream.go:899 func9 ---
~ error during iteration
--- datastream.go:809 2 ---
--- datastream.go:814 func9 ---
--- datastream.go:2558 next ---
~ Error reading file
--- datastream.go:1195 func2 ---
record on line 45520: wrong number of fields
context canceled
context canceled
--- task_run.go:99 func1 ---
~ Could not WriteToDb
--- task_run.go:541 runDbToDb ---
~ could not insert into "public"."target_table_tmp"
--- task_run_write.go:317 WriteToDb ---
--- datastream.go:814 func9 ---
--- datastream.go:2558 next ---
~ Error reading file
--- datastream.go:1195 func2 ---
record on line 45520: wrong number of fields
context canceled
--- datastream.go:899 func9 ---
~ error during iteration
--- datastream.go:809 2 ---
--- datastream.go:814 func9 ---
--- datastream.go:2558 next ---
~ Error reading file
--- datastream.go:1195 func2 ---
record on line 45520: wrong number of fields
context canceled
--- task_run.go:99 func1 ---
--- task_run.go:541 runDbToDb ---
--- task_run_write.go:311 WriteToDb ---
--- database.go:2418 BulkImportFlow ---
~ could not bulk import
--- database.go:2411 func1 ---
--- datastream.go:814 func9 ---
--- datastream.go:2558 next ---
~ Error reading file
--- datastream.go:1195 func2 ---
record on line 45520: wrong number of fields
context canceled
--- datastream.go:899 func9 ---
~ error during iteration
--- datastream.go:809 2 ---
--- datastream.go:814 func9 ---
--- datastream.go:2558 next ---
~ Error reading file
--- datastream.go:1195 func2 ---
record on line 45520: wrong number of fields
context canceled
2024-05-24 23:19:57 INF Sling Replication Completed in 14m 49s | SLING_REDSHIFT -> SLING_PG | 0 Successes | 1 Failures
fatal:
--- proc.go:271 main ---
--- sling_cli.go:442 main ---
--- sling_cli.go:474 cliInit ---
--- cli.go:286 CliProcess ---
~ failure running replication (see docs @ https://docs.slingdata.io/sling-cli)
--- sling_run.go:188 processRun ---
--------------------------- public.source_table ---------------------------
--- proc.go:271 main ---
--- sling_cli.go:442 main ---
--- sling_cli.go:474 cliInit ---
--- cli.go:286 CliProcess ---
--- sling_run.go:186 processRun ---
--- sling_run.go:427 runReplication ---
--- sling_run.go:367 runTask ---
~ execution failed
--- task_run.go:138 Execute ---
--- task_run.go:99 func1 ---
--- task_run.go:541 runDbToDb ---
--- task_run_write.go:311 WriteToDb ---
--- database.go:2418 BulkImportFlow ---
~ could not bulk import
--- database.go:2411 func1 ---
--- datastream.go:814 func9 ---
--- datastream.go:2558 next ---
~ Error reading file
--- datastream.go:1195 func2 ---
record on line 45520: wrong number of fields
--- datastream.go:899 func9 ---
~ error during iteration
--- datastream.go:809 2 ---
--- task_run.go:99 func1 ---
~ Could not WriteToDb
--- task_run.go:541 runDbToDb ---
~ could not insert into "public"."target_table_tmp"
--- task_run_write.go:317 WriteToDb ---
--- task_run.go:99 func1 ---
--- task_run.go:541 runDbToDb ---
--- task_run_write.go:311 WriteToDb ---
--- database.go:2418 BulkImportFlow ---
~ could not bulk import
--- database.go:2411 func1 ---
Perhaps setting the delimiter (source_options.delimiter) would help? See https://docs.slingdata.io/sling-cli/run/configuration#source
Interesting 🤔, that's strange. Redshift must be outputting a different number of columns at some point, unless it's not quoting fields with newlines or escaping properly? What you could do is manually run the unload command and inspect the output CSVs.
The progress line below cuts off at 3.7M rows while the table has 15M rows, so I suspect it may just be an OOM problem (it doesn't have enough memory to load all the parts?), but then I'd expect it to simply crash rather than finish writing all the logs. Does that make sense?
9m8s 3,711,796 6785 r/s 1.7 GB | 73% MEM | 17% CPU
If it were OOM, the process would just be killed, right? We wouldn't even see the error. You could also monitor the memory to confirm.
Another idea is to try reading the manually unloaded files with Python to see if it parses them correctly.
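Something like this minimal sketch would do it (assuming you download the gzipped parts locally first, e.g. with aws s3 cp; the glob pattern is just illustrative). It reports any row whose field count differs from the first row's:

import csv
import glob
import gzip

expected = None
for path in sorted(glob.glob("u01-*_part_00.gz")):
    with gzip.open(path, "rt", newline="") as f:
        reader = csv.reader(f)  # the csv module handles quoted fields and embedded newlines
        for line_no, row in enumerate(reader, start=1):
            if expected is None:
                expected = len(row)
            elif len(row) != expected:
                print(f"{path}:{line_no}: {len(row)} fields, expected {expected}")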
I didn't find the problem; the CSVs look fine to me. However, I changed the export to use Parquet and it worked: https://github.com/slingdata-io/sling-cli/pull/309/files
Now I get the error pq: invalid input syntax for type timestamp with time zone: "2024-05-01 00:43:11.319289 +0000 UTC"
I see that datetime_format in the target options only applies to file targets.
Interesting, I'm a bit perplexed about those CSVs not working...
For this timestamp error, I'm wondering if Redshift writes the timestamp as a string when exporting as Parquet. Same exercise: could you manually unload as Parquet and inspect the schema/values?
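For example, a quick sketch with pyarrow (an assumption on my part; any Parquet reader works, and the file name is illustrative):

import pyarrow.parquet as pq

table = pq.read_table("u01-0000_part_00.parquet")  # one locally downloaded Parquet part
print(table.schema)                    # check how the timestamp columns are typed
print(table.slice(0, 5).to_pydict())   # peek at the first few values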
Redshift does export them as timestamp[ns] type (example value 2024-02-15 02:34:12.862000)
Ok, that should work then, I'll take a closer look.
About the CSV: I have a column with mixed text and numeric values, and I had to manually set its type to string. However, it still only works partially: when I try to sync all the data I still get "wrong number of fields", but when I do a backfill that splits the data into 2 ranges, each sync works.
Another error on another table:
~ could not copy data
--- database_postgres.go:205 BulkImportStream ---
~ could not execute statement
--- database_postgres.go:190 func2 ---
pq: invalid input syntax for type double precision: "N"
It fails on a double column which has nulls (also a good reason to use Parquet). Is it possible to get more information on such errors, like the column name?
It fails on double column which has null (also good reason to use parquet)
In the file core/dbio/templates/redshift.yaml, in copy_to_s3, can you remove the NULL '\N' part?
I recently added it in the open PR and haven't tested it yet.
Is it possible to get more information on such errors like the column name?
No, that's the error returned by the driver.
I found that escaping the null option in the Redshift unload (NULL '\\N') solved the null problems.
About the wrong number of fields error:
I've found that it only happens on large files when they are processed in parallel.
I tried adding the env var CONCURRENCY_LIMIT=1 but it didn't help; I still saw merging csv readers of 4 files [concurrency=10].
In order to force the files to be processed one by one, I added SLING_MERGE_READERS=false and changed this line
https://github.com/slingdata-io/sling-cli/blob/b38e6c0085afc21a5a41e523ec871119144b43f9/core/dbio/filesys/fs.go#L977
into
if g.In(fs.FsType(), dbio.TypeFileLocal, dbio.TypeFileS3) {
Yeah, I have to spend some time on this. I'm quite busy with everything at the moment, so I haven't been able to do much. I just pushed a commit from a change I made this weekend. Can you rebase to the latest state of branch v1.2.11 and try again?
same error on latest v1.2.11
Do you have a dataset that's erroring that you could share with me so I can test? I tested with a 10M-record (~2GB) file that I have (exporting from Redshift to PG) and it works just fine. I need to be able to reproduce the error...
Sent you an email
Hey @flarco, I still get this error for large datasets. It seems like it has to do with parallel reading of the CSV; is there a way to force it to run sequentially?
--- task_run.go:99 func1 ---
--- task_run.go:541 runDbToDb ---
--- task_run_write.go:317 WriteToDb ---
--- database.go:2417 BulkImportFlow ---
~ could not bulk import
--- database.go:2410 func1 ---
--- datastream.go:814 func9 ---
--- datastream.go:2547 next ---
~ Error reading file
--- datastream.go:1195 func2 ---
record on line 1653: wrong number of fields
context canceled
--- datastream.go:899 func9 ---
~ error during iteration
--- datastream.go:809 2 ---
--- datastream.go:814 func9 ---
--- datastream.go:2547 next ---
~ Error reading file
--- datastream.go:1195 func2 ---
record on line 1653: wrong number of fields
context canceled
context canceled
--- task_run.go:99 func1 ---
~ Could not WriteToDb
--- task_run.go:541 runDbToDb ---
~ could not insert into "analytics_sync_v4"."exposures_outcomes_aggregation_tmp"
--- task_run_write.go:323 WriteToDb ---
--- datastream.go:814 func9 ---
--- datastream.go:2547 next ---
~ Error reading file
--- datastream.go:1195 func2 ---
record on line 1653: wrong number of fields
context canceled
--- datastream.go:899 func9 ---
~ error during iteration
--- datastream.go:809 2 ---
--- datastream.go:814 func9 ---
--- datastream.go:2547 next ---
~ Error reading file
--- datastream.go:1195 func2 ---
record on line 1653: wrong number of fields
context canceled
--- task_run.go:99 func1 ---
--- task_run.go:541 runDbToDb ---
--- task_run_write.go:317 WriteToDb ---
--- database.go:2417 BulkImportFlow ---
~ could not bulk import
--- database.go:2410 func1 ---
--- datastream.go:814 func9 ---
--- datastream.go:2547 next ---
~ Error reading file
--- datastream.go:1195 func2 ---
record on line 1653: wrong number of fields
context canceled
--- datastream.go:899 func9 ---
~ error during iteration
--- datastream.go:809 2 ---
--- datastream.go:814 func9 ---
--- datastream.go:2547 next ---
~ Error reading file
--- datastream.go:1195 func2 ---
record on line 1653: wrong number of fields
context canceled
I just added logic to read the concurrency prop here: https://github.com/slingdata-io/sling-cli/pull/318/commits/325e76f2fb1bc3477d5efd53a500c38301aaab6e
Try specifying concurrency: 1 as part of your target connection credentials. The default is 10.
I changed it and I can see it being used:
2024-06-30 10:47:19 DBG using target options: {"concurrency":1,"datetime_format":"auto","file_max_rows":0,"max_decimals":-1,"use_bulk":true,"add_new_columns":true,"adjust_column_type":false,"column_casing":"source"}
but when merging the CSV readers it's still using concurrency=10, and it fails:
2024-06-30 10:48:37 DBG merging csv readers of 4 files [concurrency=10] from redshift://[REDACTED]:7e%5E%25LGi%2A%242xp2h@[REDACTED]:5439/prod
I added SLING_MERGE_READERS=false and it seems to work now
interesting, thanks for reporting