airbyte icon indicating copy to clipboard operation
airbyte copied to clipboard

Normalized destinations: Use _airbyte_normalized_at for incremental queries

Open edgao opened this issue 2 years ago • 3 comments

What

As described in https://github.com/airbytehq/airbyte/issues/21831, deeply-nested, sparsely-populated objects may see unexpectedly high amounts of duplicate records after normalization.

Use > _airbyte_normalized_at for incremental queries (where possible) to prevent this. After this change, duplication will be limited to 2x (because we query the raw tables via >= _airbyte_emitted_at).

How

_airbyte_normalized_at represents when a record was inserted/updated by normalization. It's generated using dbt's current_timestamp() macro, which provides a millis-precision (or better) destination-server timestamp. So we should be OK to rely on it as a monotonically-increasing cursor across syncs.

This means we won't normalize records multiple times (again, other than from the raw table), and therefore will no longer see the runaway duplication behavior.

Recommended reading order

  1. incremental.sql - this defines a dbt macro incremental_clause. I've (a) renamed the args for clarity (it's no longer always the emitted_at column) and (b) added a comparisonOperator arg.
  2. stream_processor.py

🚨 User Impact 🚨

Are there any breaking changes? What is the end result perceived by the user? If yes, please merge this PR with the 🚨🚨 emoji so changelog authors can further highlight this if needed.

Pre-merge Checklist

Expand the relevant checklist and delete the others.

New Connector

Community member or Airbyter

  • [ ] Community member? Grant edit access to maintainers (instructions)
  • [ ] Secrets in the connector's spec are annotated with airbyte_secret
  • [ ] Unit & integration tests added and passing. Community members, please provide proof of success locally e.g: screenshot or copy-paste unit, integration, and acceptance test output. To run acceptance tests for a Python connector, follow instructions in the README. For java connectors run ./gradlew :airbyte-integrations:connectors:<name>:integrationTest.
  • [ ] Code reviews completed
  • [ ] Documentation updated
    • [ ] Connector's README.md
    • [ ] Connector's bootstrap.md. See description and examples
    • [ ] docs/integrations/<source or destination>/<name>.md including changelog. See changelog example
    • [ ] docs/integrations/README.md
    • [ ] airbyte-integrations/builds.md
  • [ ] PR name follows PR naming conventions

Airbyter

If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.

  • [ ] Create a non-forked branch based on this PR and test the below items on it
  • [ ] Build is successful
  • [ ] If new credentials are required for use in CI, add them to GSM. Instructions.
  • [ ] /test connector=connectors/<name> command is passing
  • [ ] New Connector version released on Dockerhub by running the /publish command described here
  • [ ] After the connector is published, connector added to connector index as described here
  • [ ] Seed specs have been re-generated by building the platform and committing the changes to the seed spec files, as described here
Updating a connector

Community member or Airbyter

  • [ ] Grant edit access to maintainers (instructions)
  • [ ] Secrets in the connector's spec are annotated with airbyte_secret
  • [ ] Unit & integration tests added and passing. Community members, please provide proof of success locally e.g: screenshot or copy-paste unit, integration, and acceptance test output. To run acceptance tests for a Python connector, follow instructions in the README. For java connectors run ./gradlew :airbyte-integrations:connectors:<name>:integrationTest.
  • [ ] Code reviews completed
  • [ ] Documentation updated
    • [ ] Connector's README.md
    • [ ] Connector's bootstrap.md. See description and examples
    • [ ] Changelog updated in docs/integrations/<source or destination>/<name>.md including changelog. See changelog example
  • [ ] PR name follows PR naming conventions

Airbyter

If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.

  • [ ] Create a non-forked branch based on this PR and test the below items on it
  • [ ] Build is successful
  • [ ] If new credentials are required for use in CI, add them to GSM. Instructions.
  • [ ] /test connector=connectors/<name> command is passing
  • [ ] New Connector version released on Dockerhub and connector version bumped by running the /publish command described here
Connector Generator
  • [ ] Issue acceptance criteria met
  • [ ] PR name follows PR naming conventions
  • [ ] If adding a new generator, add it to the list of scaffold modules being tested
  • [ ] The generator test modules (all connectors with -scaffold in their name) have been updated with the latest scaffold by running ./gradlew :airbyte-integrations:connector-templates:generator:testScaffoldTemplates then checking in your changes
  • [ ] Documentation which references the generator is updated as needed

Tests

Unit

Put your unit tests output here.

Integration

Put your integration tests output here.

Acceptance

Put your acceptance tests output here.

edgao avatar Jan 27 '23 21:01 edgao

Affected Connector Report

NOTE ⚠️ Changes in this PR affect the following connectors. Make sure to do the following as needed:

  • Run integration tests
  • Bump connector or module version
  • Add changelog
  • Publish the new version

✅ Sources (0)

Connector Version Changelog Publish
  • See "Actionable Items" below for how to resolve warnings and errors.

❌ Destinations (16)

Connector Version Changelog Publish
destination-bigquery 1.2.13
destination-bigquery-denormalized 1.2.12
(diff seed version)
destination-clickhouse 0.2.2
(changelog missing)
destination-clickhouse-strict-encrypt 0.2.2 🔵
(ignored)
🔵
(ignored)
destination-jdbc 0.3.14 🔵
(ignored)
🔵
(ignored)
destination-mssql 0.1.22
destination-mssql-strict-encrypt 0.1.22 🔵
(ignored)
🔵
(ignored)
destination-mysql 0.1.20
destination-mysql-strict-encrypt 0.1.21
(mismatch: 0.1.20)
🔵
(ignored)
🔵
(ignored)
destination-oracle 0.1.19
destination-oracle-strict-encrypt 0.1.19 🔵
(ignored)
🔵
(ignored)
destination-postgres 0.3.26
destination-postgres-strict-encrypt 0.3.26 🔵
(ignored)
🔵
(ignored)
destination-redshift 0.3.56
destination-snowflake 0.4.47
destination-tidb 0.1.0
  • See "Actionable Items" below for how to resolve warnings and errors.

👀 Other Modules (1)

  • base-normalization

Actionable Items

(click to expand)

Category Status Actionable Item
Version
mismatch
The version of the connector is different from its normal variant. Please bump the version of the connector.

doc not found
The connector does not seem to have a documentation file. This can be normal (e.g. basic connector like source-jdbc is not published or documented). Please double-check to make sure that it is not a bug.
Changelog
doc not found
The connector does not seem to have a documentation file. This can be normal (e.g. basic connector like source-jdbc is not published or documented). Please double-check to make sure that it is not a bug.

changelog missing
There is no chnagelog for the current version of the connector. If you are the author of the current version, please add a changelog.
Publish
not in seed
The connector is not in the seed file (e.g. source_definitions.yaml), so its publication status cannot be checked. This can be normal (e.g. some connectors are cloud-specific, and only listed in the cloud seed file). Please double-check to make sure that it is not a bug.

diff seed version
The connector exists in the seed file, but the latest version is not listed there. This usually means that the latest version is not published. Please use the /publish command to publish the latest version.

github-actions[bot] avatar Jan 27 '23 21:01 github-actions[bot]

/test connector=connectors/destination-snowflake

:clock2: connectors/destination-snowflake https://github.com/airbytehq/airbyte/actions/runs/4028314663 :white_check_mark: connectors/destination-snowflake https://github.com/airbytehq/airbyte/actions/runs/4028314663 Python tests coverage:

Name                                                              Stmts   Miss  Cover
-------------------------------------------------------------------------------------
normalization/transform_config/__init__.py                            2      0   100%
normalization/transform_catalog/reserved_keywords.py                 14      0   100%
normalization/transform_catalog/__init__.py                           2      0   100%
normalization/destination_type.py                                    14      0   100%
normalization/__init__.py                                             4      0   100%
normalization/transform_catalog/destination_name_transformer.py     166      8    95%
normalization/transform_catalog/table_name_registry.py              174     34    80%
normalization/transform_config/transform.py                         189     48    75%
normalization/transform_catalog/utils.py                             51     14    73%
normalization/transform_catalog/dbt_macro.py                         22      7    68%
normalization/transform_catalog/catalog_processor.py                147     80    46%
normalization/transform_catalog/transform.py                         61     38    38%
normalization/transform_catalog/stream_processor.py                 595    400    33%
-------------------------------------------------------------------------------------
TOTAL                                                              1441    629    56%

Build Passed

Test summary info:

All Passed

edgao avatar Jan 27 '23 21:01 edgao

/test connector=bases/base-normalization

:clock2: bases/base-normalization https://github.com/airbytehq/airbyte/actions/runs/4028714268 :white_check_mark: bases/base-normalization https://github.com/airbytehq/airbyte/actions/runs/4028714268 Python tests coverage:

Name                                                              Stmts   Miss  Cover
-------------------------------------------------------------------------------------
normalization/transform_config/__init__.py                            2      0   100%
normalization/transform_catalog/reserved_keywords.py                 14      0   100%
normalization/transform_catalog/__init__.py                           2      0   100%
normalization/destination_type.py                                    14      0   100%
normalization/__init__.py                                             4      0   100%
normalization/transform_catalog/destination_name_transformer.py     166      8    95%
normalization/transform_catalog/table_name_registry.py              174     34    80%
normalization/transform_config/transform.py                         189     48    75%
normalization/transform_catalog/utils.py                             51     14    73%
normalization/transform_catalog/dbt_macro.py                         22      7    68%
normalization/transform_catalog/catalog_processor.py                147     80    46%
normalization/transform_catalog/transform.py                         61     38    38%
normalization/transform_catalog/stream_processor.py                 595    400    33%
-------------------------------------------------------------------------------------
TOTAL                                                              1441    629    56%
	 Name                                                 Stmts   Miss  Cover   Missing
	 ----------------------------------------------------------------------------------
	 source_acceptance_test/base.py                          12      4    67%   16-19
	 source_acceptance_test/config.py                       141      5    96%   87, 93, 239, 243-244
	 source_acceptance_test/conftest.py                     211     95    55%   36, 42-44, 49, 54, 77, 83, 89-91, 110, 115-117, 123-125, 131-132, 137-138, 143, 149, 158-167, 173-178, 193, 217, 248, 254, 262-267, 275-285, 293-306, 311-317, 324-335, 342-358
	 source_acceptance_test/plugin.py                        69     25    64%   22-23, 31, 36, 120-140, 144-148
	 source_acceptance_test/tests/test_core.py              402    115    71%   53, 58, 93-104, 109-116, 120-121, 125-126, 308, 346-363, 376-387, 391-396, 402, 435-440, 478-485, 528-530, 533, 598-606, 618-621, 626, 682-683, 689, 692, 728-738, 751-776
	 source_acceptance_test/tests/test_incremental.py       160     14    91%   58-65, 70-83, 246
	 source_acceptance_test/utils/asserts.py                 39      2    95%   62-63
	 source_acceptance_test/utils/common.py                  94     10    89%   16-17, 32-38, 72, 75
	 source_acceptance_test/utils/compare.py                 62     23    63%   21-51, 68, 97-99
	 source_acceptance_test/utils/connector_runner.py       133     33    75%   24-27, 46-47, 50-54, 57-58, 73-75, 78-80, 83-85, 88-90, 93-95, 124-125, 159-161, 208
	 source_acceptance_test/utils/json_schema_helper.py     107     13    88%   30-31, 38, 41, 65-68, 96, 120, 192-194
	 ----------------------------------------------------------------------------------
	 TOTAL                                                 1609    339    79%
Name                                                              Stmts   Miss  Cover
-------------------------------------------------------------------------------------
normalization/transform_config/__init__.py                            2      0   100%
normalization/transform_catalog/reserved_keywords.py                 14      0   100%
normalization/transform_catalog/__init__.py                           2      0   100%
normalization/destination_type.py                                    14      0   100%
normalization/__init__.py                                             4      0   100%
normalization/transform_catalog/destination_name_transformer.py     166      8    95%
normalization/transform_catalog/table_name_registry.py              174     34    80%
normalization/transform_config/transform.py                         189     48    75%
normalization/transform_catalog/utils.py                             51     14    73%
normalization/transform_catalog/dbt_macro.py                         22      7    68%
normalization/transform_catalog/catalog_processor.py                147     80    46%
normalization/transform_catalog/transform.py                         61     38    38%
normalization/transform_catalog/stream_processor.py                 595    400    33%
-------------------------------------------------------------------------------------
TOTAL                                                              1441    629    56%
Name                                                              Stmts   Miss  Cover
-------------------------------------------------------------------------------------
normalization/transform_config/__init__.py                            2      0   100%
normalization/transform_catalog/reserved_keywords.py                 14      0   100%
normalization/transform_catalog/__init__.py                           2      0   100%
normalization/destination_type.py                                    14      0   100%
normalization/__init__.py                                             4      0   100%
normalization/transform_catalog/utils.py                             51      1    98%
normalization/transform_catalog/destination_name_transformer.py     166      5    97%
normalization/transform_catalog/stream_processor.py                 595     35    94%
normalization/transform_catalog/catalog_processor.py                147     12    92%
normalization/transform_catalog/dbt_macro.py                         22      3    86%
normalization/transform_catalog/table_name_registry.py              174     51    71%
normalization/transform_catalog/transform.py                         61     22    64%
normalization/transform_config/transform.py                         189     77    59%
-------------------------------------------------------------------------------------
TOTAL                                                              1441    206    86%

Build Passed

Test summary info:

	 =========================== short test summary info ============================
	 SKIPPED [1] integration_tests/test_drop_scd_overwrite.py:56: DestinationType.ORACLE does not support incremental sync with schema change yet
	 SKIPPED [1] integration_tests/test_drop_scd_overwrite.py:56: DestinationType.TIDB does not support incremental sync with schema change yet
	 SKIPPED [3] integration_tests/test_ephemeral.py:102: ephemeral materialization isn't supported in ClickHouse yet
	 SKIPPED [1] integration_tests/test_ephemeral.py:59: Skipping test for column limit, because in MySQL, the max number of columns is limited by row size (8KB)
	 SKIPPED [1] integration_tests/test_normalization.py:144: DestinationType.CLICKHOUSE is disabled as it doesnt support schema change in incremental yet (column type changes)
	 SKIPPED [1] integration_tests/test_normalization.py:82: Destinations DestinationType.CLICKHOUSE does not support nested streams
	 SKIPPED [1] integration_tests/test_normalization.py:147: DestinationType.MSSQL is disabled as it doesnt fully support schema change in incremental yet
	 SKIPPED [2] integration_tests/test_normalization.py:135: DestinationType.MYSQL does not support incremental yet
	 SKIPPED [1] integration_tests/test_normalization.py:135: DestinationType.ORACLE does not support incremental yet
	 SKIPPED [1] integration_tests/test_normalization.py:82: Destinations DestinationType.ORACLE does not support nested streams
	 SKIPPED [1] integration_tests/test_normalization.py:144: DestinationType.SNOWFLAKE is disabled as it doesnt support schema change in incremental yet (column type changes)
	 SKIPPED [1] integration_tests/test_normalization.py:144: DestinationType.TIDB is disabled as it doesnt support schema change in incremental yet (column type changes)
	 [32m================= [32m[1m47 passed[0m, [33m15 skipped[0m[32m in 4237.37s (1:10:37)[0m[32m ==================[0m
	 =========================== short test summary info ============================
	 SKIPPED [1] integration_tests/test_drop_scd_overwrite.py:56: DestinationType.ORACLE does not support incremental sync with schema change yet
	 SKIPPED [1] integration_tests/test_drop_scd_overwrite.py:56: DestinationType.TIDB does not support incremental sync with schema change yet
	 SKIPPED [3] integration_tests/test_ephemeral.py:102: ephemeral materialization isn't supported in ClickHouse yet
	 SKIPPED [1] integration_tests/test_ephemeral.py:59: Skipping test for column limit, because in MySQL, the max number of columns is limited by row size (8KB)
	 SKIPPED [1] integration_tests/test_normalization.py:82: Destinations DestinationType.CLICKHOUSE does not support nested streams
	 SKIPPED [1] integration_tests/test_normalization.py:144: DestinationType.CLICKHOUSE is disabled as it doesnt support schema change in incremental yet (column type changes)
	 SKIPPED [1] integration_tests/test_normalization.py:147: DestinationType.MSSQL is disabled as it doesnt fully support schema change in incremental yet
	 SKIPPED [2] integration_tests/test_normalization.py:135: DestinationType.MYSQL does not support incremental yet
	 SKIPPED [1] integration_tests/test_normalization.py:82: Destinations DestinationType.ORACLE does not support nested streams
	 SKIPPED [1] integration_tests/test_normalization.py:135: DestinationType.ORACLE does not support incremental yet
	 SKIPPED [1] integration_tests/test_normalization.py:144: DestinationType.SNOWFLAKE is disabled as it doesnt support schema change in incremental yet (column type changes)
	 SKIPPED [1] integration_tests/test_normalization.py:144: DestinationType.TIDB is disabled as it doesnt support schema change in incremental yet (column type changes)
	 [32m================= [32m[1m47 passed[0m, [33m15 skipped[0m[32m in 3975.89s (1:06:15)[0m[32m ==================[0m

edgao avatar Jan 27 '23 22:01 edgao

/test connector=bases/base-normalization

:clock2: bases/base-normalization https://github.com/airbytehq/airbyte/actions/runs/4049293072

edgao avatar Jan 30 '23 23:01 edgao

/test connector=connectors/destination-snowflake

:clock2: connectors/destination-snowflake https://github.com/airbytehq/airbyte/actions/runs/4049288790 :white_check_mark: connectors/destination-snowflake https://github.com/airbytehq/airbyte/actions/runs/4049288790 Python tests coverage:

Name                                                              Stmts   Miss  Cover
-------------------------------------------------------------------------------------
normalization/transform_config/__init__.py                            2      0   100%
normalization/transform_catalog/reserved_keywords.py                 14      0   100%
normalization/transform_catalog/__init__.py                           2      0   100%
normalization/destination_type.py                                    14      0   100%
normalization/__init__.py                                             4      0   100%
normalization/transform_catalog/destination_name_transformer.py     166      8    95%
normalization/transform_catalog/table_name_registry.py              174     34    80%
normalization/transform_config/transform.py                         189     48    75%
normalization/transform_catalog/utils.py                             51     14    73%
normalization/transform_catalog/dbt_macro.py                         22      7    68%
normalization/transform_catalog/catalog_processor.py                147     80    46%
normalization/transform_catalog/transform.py                         61     38    38%
normalization/transform_catalog/stream_processor.py                 595    400    33%
-------------------------------------------------------------------------------------
TOTAL                                                              1441    629    56%

Build Passed

Test summary info:

All Passed

edgao avatar Jan 30 '23 23:01 edgao

closing in favor of https://github.com/airbytehq/airbyte/pull/22381

edgao avatar Feb 04 '23 01:02 edgao