PyAirbyte
🐛 Bug: Google Ads fails due to nested primary keys
The Google Ads source is failing due to nested primary keys. Reported in Slack here: https://airbytehq-team.slack.com/archives/C06FZ238P8W/p1712842623104989?thread_ts=1712842217.390879&cid=C06FZ238P8W
Example error message:
- `NotImplementedError: Nested primary keys are not yet supported. Found: ad_group.id`
Source code:
- https://github.com/airbytehq/PyAirbyte/blob/ce20b56c708d8814e775ce5391ce9ba295497be1/airbyte/_future_cdk/sql_processor.py#L802-L806
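Judging from the `joined_pks` and `msg` lines visible in the traceback further down, the linked check appears to join each primary-key path with `.` and then reject any joined key that contains a `.`. A minimal sketch of that behavior, assuming primary keys use the Airbyte protocol shape `list[list[str]]` (one list of path segments per key column); `joined_primary_keys` is a hypothetical name, not PyAirbyte's actual function:

```python
def joined_primary_keys(primary_key: list[list[str]]) -> list[str]:
    """Join each PK path with '.' and reject any joined key containing '.'.

    Hypothetical re-creation of the linked check, not PyAirbyte's code.
    """
    joined_pks = [".".join(path) for path in primary_key]
    for pk in joined_pks:
        if "." in pk:
            msg = f"Nested primary keys are not yet supported. Found: {pk}"
            raise NotImplementedError(msg)
    return joined_pks


# A genuinely nested path and a single top-level column whose name
# contains a dot both join to the same string, so both are rejected.
for pk in ([["ad_group", "id"]], [["ad_group.id"]]):
    try:
        joined_primary_keys(pk)
    except NotImplementedError as err:
        print(err)
```

If the Google Ads source declares `ad_group.id` as a single top-level column name (the shape `[["ad_group.id"]]`), a check like this would reject it even though nothing is nested. That is an assumption worth confirming against the stream's catalog.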
As a first step toward a fix, we should find out (1) which stream is causing this error and (2) whether that stream is a standard stream or a dynamic one derived from the user's workspace and/or config.
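For question (1), one approach is to scan every stream's declared primary key for a `.` and classify each hit. A minimal sketch, assuming you have already extracted a mapping of stream name to primary key in the protocol shape `list[list[str]]`; `classify_dotted_pks` and the example catalog data are hypothetical, for illustration only:

```python
from typing import Optional

# One list of path segments per key column, e.g. [["id"]] or [["payload", "id"]].
PrimaryKey = Optional[list[list[str]]]


def classify_dotted_pks(
    pk_by_stream: dict[str, PrimaryKey],
) -> dict[str, list[tuple[str, str]]]:
    """Return, per stream, each PK column whose joined name contains '.'.

    A hit is labeled "nested path" if its path has more than one segment,
    or "dotted column name" if it is a single segment containing '.'.
    Streams without such PK columns are omitted from the result.
    """
    findings: dict[str, list[tuple[str, str]]] = {}
    for stream_name, primary_key in pk_by_stream.items():
        for path in primary_key or []:
            joined = ".".join(path)
            if "." not in joined:
                continue  # this key column would pass a joined-string check
            kind = "nested path" if len(path) > 1 else "dotted column name"
            findings.setdefault(stream_name, []).append((joined, kind))
    return findings


# Hypothetical catalog data, not taken from the Google Ads connector.
example = {
    "users": [["id"]],
    "ad_group": [["ad_group.id"]],
    "events": [["payload", "id"]],
    "no_pk": None,
}
print(classify_dotted_pks(example))
```

In PyAirbyte, such a mapping could presumably be built from `source.configured_catalog.streams`, using each entry's `stream.name` and `primary_key`; treat that as an assumption to check against your installed version.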
I'm also facing this error. @aaronsteers, do we have an RCA for this since the issue was first raised?
Sample code I am running:
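```python
import airbyte as ab

# google_payload holds the connector settings (defined elsewhere in my code)
source_gads = ab.get_source('source-google-ads', install_if_missing=True, config=google_payload.get('configuration'))
source_gads.select_streams(['ad_group'])
result = source_gads.read()
```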
Response:
```
Read Progress
Started reading at 14:08:55.
Read 1,562 records over 7 seconds (223.1 records / second).
Wrote 1,562 records over 1 batches.
Finished reading at 14:09:03.
Started finalizing streams at 14:09:03.
Finalized 0 batches over 0 seconds.
Failed `source-google-ads` read operation at 19:39:03.
```
Error stacktrace:
```
NotImplementedError                       Traceback (most recent call last)
File ~/pixis/pyAirbyte-prefect-POC/myenv/lib/python3.11/site-packages/airbyte/sources/base.py:804, in Source.read(self, cache, streams, write_strategy, force_full_refresh, skip_validation)
    803 try:
--> 804 cache_processor.process_airbyte_messages(
    805 self._read_with_catalog(
    806 catalog=self.configured_catalog,
    807 state=state_provider,
    808 ),
    809 write_strategy=write_strategy,
    810 )
    812 # TODO: We should catch more specific exceptions here

File ~/pixis/pyAirbyte-prefect-POC/myenv/lib/python3.11/site-packages/airbyte/_future_cdk/record_processor.py:217, in RecordProcessorBase.process_airbyte_messages(self, messages, write_strategy)
    215 # We've finished processing input data.
    216 # Finalize all received records and state messages:
--> 217 self.write_all_stream_data(
    218 write_strategy=write_strategy,
    219 )
    221 self.cleanup_all()

File ~/pixis/pyAirbyte-prefect-POC/myenv/lib/python3.11/site-packages/airbyte/_future_cdk/record_processor.py:226, in RecordProcessorBase.write_all_stream_data(self, write_strategy)
    225 for stream_name in self.catalog_provider.stream_names:
--> 226 self.write_stream_data(
    227 stream_name,
    228 write_strategy=write_strategy,
    229 )

File ~/pixis/pyAirbyte-prefect-POC/myenv/lib/python3.11/site-packages/airbyte/_future_cdk/sql_processor.py:510, in SqlProcessorBase.write_stream_data(self, stream_name, write_strategy)
    509 try:
--> 510 self._write_temp_table_to_final_table(
    511 stream_name=stream_name,
    512 temp_table_name=temp_table_name,
    513 final_table_name=final_table_name,
    514 write_strategy=write_strategy,
    515 )
    516 finally:

File ~/pixis/pyAirbyte-prefect-POC/myenv/lib/python3.11/site-packages/airbyte/_future_cdk/sql_processor.py:693, in SqlProcessorBase._write_temp_table_to_final_table(self, stream_name, temp_table_name, final_table_name, write_strategy)
    692 """Write the temp table into the final table using the provided write strategy."""
--> 693 has_pks: bool = bool(self._get_primary_keys(stream_name))
    694 has_incremental_key: bool = bool(self._get_incremental_key(stream_name))

File ~/pixis/pyAirbyte-prefect-POC/myenv/lib/python3.11/site-packages/airbyte/_future_cdk/sql_processor.py:792, in SqlProcessorBase._get_primary_keys(self, stream_name)
    791 msg = f"Nested primary keys are not yet supported. Found: {pk}"
--> 792 raise NotImplementedError(msg)
    794 return joined_pks

NotImplementedError: Nested primary keys are not yet supported. Found: ad_group.id

The above exception was the direct cause of the following exception:

AirbyteConnectorFailedError               Traceback (most recent call last)
Cell In[30], line 4
      1 import time
      3 start_time = time.time()
----> 4 result = source_gads.read()
      7 end_time = time.time()
      8 print(f"Time required : {end_time - start_time}")

File ~/pixis/pyAirbyte-prefect-POC/myenv/lib/python3.11/site-packages/airbyte/sources/base.py:815, in Source.read(self, cache, streams, write_strategy, force_full_refresh, skip_validation)
    813 except Exception as ex:
    814 self._log_sync_failure(cache=cache, exception=ex)
--> 815 raise exc.AirbyteConnectorFailedError(
    816 log_text=self._last_log_messages,
    817 ) from ex
    819 self._log_sync_success(cache=cache)
    820 return ReadResult(
    821 processed_records=self._processed_records,
    822 cache=cache,
    823 processed_streams=[stream.stream.name for stream in self.configured_catalog.streams],
    824 )

AirbyteConnectorFailedError: AirbyteConnectorFailedError: Connector failed.
Log output:
Found 1 customers: ['<AD_ACCOUNT_ID_REMOVED>']
Marking stream ad_group as STARTED
Syncing stream: ad_group
I0000 00:00:1721398140.339362 84332 check_gcp_environment_no_op.cc:29] ALTS: Platforms other than Linux and Windows are not supported
Marking stream ad_group as RUNNING
Read 1562 records from ad_group stream
Marking stream ad_group as STOPPED
Finished syncing ad_group
SourceGoogleAds runtimes:
Syncing stream ad_group 0:00:03.165605
Finished syncing SourceGoogleAds
```
I'm getting the same error message here.
My code:
```python
import airbyte as ab

source = ab.get_source("source-google-ads")
source.set_config(
    {
        "credentials": {
            "developer_token": "...",
            "client_id": "...",
            "client_secret": "...",
            "refresh_token": "...",
        },
        "customer_id": "...",
    }
)
source.check()
source.select_streams(["campaign_criterion"])
read_result: ab.ReadResult = source.read()
```
raises:
```
NotImplementedError: Nested primary keys are not yet supported. Found: campaign_criterion.resource_name
```
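Both reports name a single dotted identifier (`ad_group.id` and `campaign_criterion.resource_name`). If, as seems plausible but is not confirmed in this thread, these are top-level column names that merely contain a dot, one possible direction is to reject a key only when its path does not have exactly one segment, instead of testing the joined string for `.`. A minimal sketch under that assumption; `primary_key_columns` is a hypothetical name, not PyAirbyte's API:

```python
from typing import Optional


def primary_key_columns(primary_key: Optional[list[list[str]]]) -> list[str]:
    """Return PK column names, rejecting only paths that are not a single segment.

    A one-segment path is a top-level column, even if its name contains a dot
    (for example "ad_group.id"). Paths with zero or several segments are rejected.
    """
    if not primary_key:
        return []
    columns: list[str] = []
    for path in primary_key:
        if len(path) != 1:
            msg = f"Primary key path must have exactly one segment. Found: {path}"
            raise NotImplementedError(msg)
        columns.append(path[0])
    return columns


print(primary_key_columns([["ad_group.id"]]))  # ['ad_group.id']
print(primary_key_columns([["campaign_criterion.resource_name"]]))
```

The design choice here is to inspect the structure of the path rather than the characters in the column name, so that dotted names and genuinely nested keys are no longer conflated.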