Custom connector not loading data as expected
Topic
Custom connector to a REST API not loading all data (Airbyte 0.50.38)
Relevant information
I have created a source that connects to an API via URL. The connection was tested in the browser over HTTP. It returns 70k rows, about 12 MB in volume, which is not that much, I think. The API link has no additional arguments and no paging, it just asks for credentials.
After seeing that it works, I created the source in an Airbyte container (Win 10 + Docker).
The connection works, but not as expected: although the source returns 70k rows, only 20k were loaded into my DB.
I tried both append and full refresh.
In full refresh mode, only 20k rows are loaded.
Using append mode, my DB grows by 20k rows each sync, eventually exceeding the 70k being returned, which suggested to me that the primary key columns are not working and Airbyte fills my tables with duplicates. And that is indeed the case: there are many duplicates in my tables.
To sum up, neither way works:
- full refresh only inserts 20k rows (and I didn't set up any limits, plus the source API was tested and returns 70k rows)
- append mode inserts duplicate values even though I filled in the primary key columns to avoid that
- there is also no dedup mode visible (even though it exists in the sync documentation; see the sketch after the YAML below)
Is there any way to make this custom connection work? Or should I code a Python connector for it?
I tried to find some tutorials about this issue but didn't find any. In one doc I saw a limit of 10k rows max due to low memory, but here I have 12 MB of data (that should not be a problem for memory?) and 20k rows do get inserted.
Here is the YAML code:
spec:
  type: Spec
  connection_specification:
    type: object
    $schema: http://json-schema.org/draft-07/schema#
    required:
      - username
    properties:
      password:
        type: string
        order: 1
        title: Password
        always_show: true
        airbyte_secret: true
      username:
        type: string
        order: 0
        title: Username
    additionalProperties: true
type: DeclarativeSource
check:
  type: CheckStream
  stream_names:
    - Customers
streams:
  - name: Customers
    type: DeclarativeStream
    retriever:
      type: SimpleRetriever
      paginator:
        type: NoPagination
      requester:
        path: APICustomers
        type: HttpRequester
        url_base: https://myapiurl
        http_method: GET
        authenticator:
          type: BasicHttpAuthenticator
          password: '{{ config[''password''] }}'
          username: '{{ config[''username''] }}'
        request_headers: {}
        request_body_json: {}
        request_parameters: {}
      record_selector:
        type: RecordSelector
        extractor:
          type: DpathExtractor
          field_path:
            - value
      partition_router: []
    primary_key:
      - Document_Type
      - Document_No
    schema_loader:
      type: InlineSchemaLoader
      schema:
        type: object
        $schema: http://json-schema.org/schema#
        properties:
          City: { type: string }
          Name: { type: string }
          Open: { type: boolean }
          Amount: { type: number }
          Address: { type: string }
          Due_Date: { type: string }
          Amount_LCY: { type: number }
          Customer_No: { type: string }
          Document_No: { type: string }
          Posting_Date: { type: string }
          Currency_Code: { type: string }
          Document_Type: { type: string }
          Territory_Code: { type: string }
          AuxiliaryIndex1: { type: number }
          AuxiliaryIndex2: { type: string }
          AuxiliaryIndex3: { type: string }
          Credit_Limit_LCY: { type: number }
          Remaining_Amount: { type: number }
          Salesperson_Code: { type: string }
          Salesperson_Name: { type: string }
          CustClassifiation: { type: string }
          Remaining_Amt_LCY: { type: number }
          Country_Region_Code: { type: string }
          VAT_Registration_No: { type: string }
          Gen_Bus_Posting_Group: { type: string }
          Customer_Posting_Group: { type: string }
  - name: Sales
    type: DeclarativeStream
    retriever:
      type: SimpleRetriever
      paginator:
        type: NoPagination
      requester:
        path: APISales
        type: HttpRequester
        url_base: https://myapiurl
        http_method: GET
        authenticator:
          type: BasicHttpAuthenticator
          password: '{{ config[''password''] }}'
          username: '{{ config[''username''] }}'
        request_headers: {}
        request_body_json: {}
        request_parameters: {}
      record_selector:
        type: RecordSelector
        extractor:
          type: DpathExtractor
          field_path:
            - value
      partition_router: []
    primary_key:
      - Document_Type
      - Documentt_No
    schema_loader:
      type: InlineSchemaLoader
      schema:
        type: object
        $schema: http://json-schema.org/schema#
        properties:
          Item_No: { type: string }
          Salespers: { type: string }
          Customer_No: { type: string }
          Entryt_Type: { type: string }
          Item_Vendor: { type: string }
          Documentt_No: { type: string }
          Customer_City: { type: string }
          Customer_Name: { type: string }
          Document_Type: { type: string }
          Item_Category: { type: string }
          Postingt_Date: { type: string }
          Customer_Chain: { type: string }
          Entry_Quantity: { type: number }
          Locationt_Code: { type: string }
          Item_Cateogory1: { type: string }
          Item_Cateogory2: { type: string }
          Customer_Balance: { type: number }
          Item_Description: { type: string }
          Item_Inv_Posting: { type: string }
          Salesperson_Name: { type: string }
          Customer_Territory: { type: string }
          Product_Group_Code: { type: string }
          Customer_Dimension_1: { type: string }
          Customer_Dimension_2: { type: string }
          Customer_Dimension_3: { type: string }
          Customer_Dimension_4: { type: string }
          Customer_Price_Group: { type: string }
          Customer_Salesperson: { type: string }
          Sales_Amount_Exl_VAT: { type: number }
          Customer_Posting_Group: { type: string }
          VAT_Prod_Posting_Group: { type: string }
version: 0.57.0
metadata:
  autoImportSchema:
    Sales: true
    Customers: true
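About the missing dedup mode from the list above: as far as I can tell from the sync-mode docs, the deduped mode is only offered once a stream supports incremental sync, i.e. once it defines a cursor. Below is a minimal sketch of what I would try adding at the stream level, assuming Posting_Date can serve as the cursor and that it comes back as a plain YYYY-MM-DD string (both are my assumptions, not something from my current config):

    incremental_sync:
      type: DatetimeBasedCursor
      cursor_field: Posting_Date    # assumption: usable as a cursor
      datetime_format: '%Y-%m-%d'   # assumption: actual format of Posting_Date
      start_datetime:
        type: MinMaxDatetime
        datetime: '2020-01-01'      # arbitrary starting point
        datetime_format: '%Y-%m-%d'

Since the API takes no parameters, I left out start_time_option / end_time_option, so nothing would be sent to the server; whether the client-side cursor handling alone is good enough for my case is something I have not verified.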
I also tried to experiment with pagination, but this raised more questions: why is Airbyte using an extra 2 GB of RAM for data that is at most 12 MB (measured as plain text)? Also, I never finished the request (I cancelled it for taking too long); it takes about 1 minute in a web browser but 7+ minutes in Airbyte, which does not make sense to me either. So there seems to be a problem with simple pagination too, and with memory management, if a 12 MB request needs 2 GB of RAM.
I think this issue may be closed. I found that the reason was that there actually is pagination, but a non-standard one using the OData nextLink, which caused the trouble. In any case, the software should warn me about it instead of just running the same query in an infinite loop and taking 2 GB of memory. Also, the fact that deduplication does not work / is not as smart as it could be pushed me toward another, more low-level solution (code) that can handle the data using Airflow :)
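For anyone hitting the same thing, what I would try instead of NoPagination is a cursor paginator that follows the next-page URL from the OData response. This is only a sketch based on the low-code docs, assuming the response exposes the full next-page URL under @odata.nextLink and that injecting it via RequestPath makes the CDK request that URL directly (I have not re-run my connector with it):

      paginator:
        type: DefaultPaginator
        pagination_strategy:
          type: CursorPagination
          cursor_value: '{{ response.get("@odata.nextLink", "") }}'
          stop_condition: '{{ "@odata.nextLink" not in response }}'
        page_token_option:
          type: RequestPath

With a working stop condition like this, the sync should also stop looping instead of re-requesting the same page forever, which is probably where the 2 GB of RAM went.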