🐛 Source Mixpanel - Add new datetime formats for cohort_members stream state; reset obsolete state for cohort_members stream
## What
### Error 1: cohorts stream

```
File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py", line 136, in read
yield from self._read_stream(
File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py", line 236, in _read_stream
for record_data_or_message in record_iterator:
File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/streams/core.py", line 126, in read
slices = self.stream_slices(
File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/declarative_stream.py", line 142, in stream_slices
return self.retriever.stream_slices()
File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py", line 375, in stream_slices
return self.stream_slicer.stream_slices()
File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py", line 168, in stream_slices
start_datetime = self._calculate_earliest_possible_value(self._select_best_end_datetime())
File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py", line 174, in _calculate_earliest_possible_value
cursor_datetime = self._calculate_cursor_datetime_from_state(self.get_stream_state())
File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py", line 185, in _calculate_cursor_datetime_from_state
return self.parse_date(stream_state[self._cursor_field.eval(self.config)])
File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py", line 234, in parse_date
raise ValueError(f"No format in {self.cursor_datetime_formats} matching {date}")
ValueError: No format in ['%Y-%m-%d %H:%M:%S', '%Y-%m-%dT%H:%M:%S%z'] matching 2024-04-16T14:32:42
```
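
The failure is easy to reproduce with plain `datetime.strptime`: the saved state value uses a `T` separator but carries no timezone suffix, so neither configured format matches. A minimal sketch (mirroring, not copying, the CDK's `parse_date`) showing that adding `%Y-%m-%dT%H:%M:%S` to the cursor datetime formats resolves it:

```python
from datetime import datetime

# The two formats from the error message: the first expects a space
# separator, the second requires a timezone offset (%z).
cursor_datetime_formats = ["%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S%z"]
state_value = "2024-04-16T14:32:42"

def parse_date(date: str, formats: list) -> datetime:
    for fmt in formats:
        try:
            return datetime.strptime(date, fmt)
        except ValueError:
            continue
    raise ValueError(f"No format in {formats} matching {date}")

# Adding the missing timezone-less ISO format (the fix) makes the state parseable.
cursor_datetime_formats.append("%Y-%m-%dT%H:%M:%S")
print(parse_date(state_value, cursor_datetime_formats))  # 2024-04-16 14:32:42
```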
### Error 2: cohort_members stream

```
File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py", line 136, in read
yield from self._read_stream(
File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py", line 236, in _read_stream
for record_data_or_message in record_iterator:
File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/streams/core.py", line 145, in read
for record_data_or_message in records:
File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/declarative_stream.py", line 120, in read_records
yield from self.retriever.read_records(self.get_json_schema(), stream_slice)
File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py", line 324, in read_records
for stream_data in self._read_pages(record_generator, self.state, _slice):
File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py", line 294, in _read_pages
next_page_token = self._next_page_token(response)
File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py", line 258, in _next_page_token
return self._paginator.next_page_token(response, self._records_from_last_response)
File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/requesters/paginators/default_paginator.py", line 105, in next_page_token
self._token = self.pagination_strategy.next_page_token(response, last_records)
File "/airbyte/integration_code/source_mixpanel/components.py", line 278, in next_page_token
if self._total and page_number is not None and self._total > self.page_size * (page_number + 1):
AttributeError: 'EngagePaginationStrategy' object has no attribute '_total'
```
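
The real `EngagePaginationStrategy` lives in `source_mixpanel/components.py`; below is only a simplified sketch of the fix, with the response keys (`page`, `total`, `session_id`) assumed from Mixpanel's Engage API rather than taken from this PR. The point is that `_total` must be initialized before `next_page_token` dereferences it:

```python
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class EngagePaginationStrategySketch:
    page_size: int
    # The fix: initialize `_total` up front so the comparison in
    # next_page_token() can never raise AttributeError.
    _total: Optional[int] = field(default=None, init=False)

    def next_page_token(self, response_json: dict) -> Optional[dict]:
        page_number = response_json.get("page")
        if self._total is None:
            # Assumed: the first response reports the total record count.
            self._total = response_json.get("total")
        if self._total and page_number is not None and self._total > self.page_size * (page_number + 1):
            return {"page": page_number + 1, "session_id": response_json.get("session_id")}
        return None
```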
### Error 3: cohort_members stream

```
File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py", line 136, in read
yield from self._read_stream(
File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py", line 222, in _read_stream
stream_instance.state = stream_state # type: ignore # we check that state in the dir(stream_instance)
File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/declarative_stream.py", line 85, in state
self.retriever.state = state
File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py", line 385, in state
self.cursor.set_initial_state(value)
File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py", line 86, in set_initial_state
for state in stream_state["states"]:
KeyError: 'states'
```
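
The per-partition cursor iterates over `stream_state["states"]`, so legacy global state (a bare cursor value) blows up during `set_initial_state`. A minimal sketch (not the connector's actual code) of the obsolete-state reset this PR describes:

```python
# Legacy state looked roughly like {"date": "2024-04-16T14:32:42"}; the
# per-partition cursor expects a "states" list instead. Discard anything
# that lacks it so the stream restarts from empty per-partition state.
def migrate_stream_state(stream_state: dict) -> dict:
    if "states" not in stream_state:
        return {"states": []}  # reset obsolete state
    return stream_state

assert migrate_stream_state({"date": "2024-04-16T14:32:42"}) == {"states": []}
```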
### Problem 4: non-unique primary key for cohort_members stream

https://github.com/airbytehq/airbyte/issues/37833

Old key: `distinct_id`. New key: `distinct_id`, `cohort_id`.
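
A two-record illustration of the collision, assuming a user belongs to two cohorts (values are made up):

```python
# The same distinct_id appears once per cohort membership, so keying on
# distinct_id alone collapses these two rows into one.
records = [
    {"distinct_id": "user-1", "cohort_id": 1111},
    {"distinct_id": "user-1", "cohort_id": 2222},
]
assert len({r["distinct_id"] for r in records}) == 1                     # old key collides
assert len({(r["distinct_id"], r["cohort_id"]) for r in records}) == 2   # composite key is unique
```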
## How

- Added new datetime formats for the cohort_members stream state.
- Changed the cohort_members stream state to the new per-partition format.
- Initialized `_total` in `EngagePaginationStrategy`.
- Changed the primary key to `distinct_id`, `cohort_id`.
## Review guide
## User Impact

Breaking changes for the CohortMembers stream:
- State is reset to the new, empty per-partition format.
- The primary key is changed to a unique composite key (based on the `distinct_id` and `cohort_id` fields), since the previous key was not unique and did not account for a user belonging to several different cohorts.
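
For reference, a sketch of the per-partition state shape involved in this breaking change; the exact `partition`/`cursor` field contents here are an assumption based on the CDK's per-partition cursor, not taken from this PR:

```python
# After the reset, the stream starts from the empty variant; subsequent
# syncs populate one entry per partition (here, per cohort).
empty_state = {"states": []}
populated_state = {
    "states": [
        {"partition": {"id": 1111}, "cursor": {"date": "2024-04-16T14:32:42"}},
        {"partition": {"id": 2222}, "cursor": {"date": "2024-04-10T00:00:00"}},
    ]
}
```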
## Can this PR be safely reverted and rolled back?
- [ ] YES 💚
- [x] NO ❌
/format-fix
Format-fix job started... Check job output.
✅ Changes applied successfully. (c5f43d8165cefc63db3d207b485fe6fc93f2c829)
Regression test: https://github.com/airbytehq/airbyte/actions/runs/9082610503
I have double-checked the regression tests you ran, and I found that cohorts were not tested at all.
- As I said before, cohorts and cohort_members are semi-incremental streams, so each sync reads all data and covers 95% of the regular incremental path. Of course, incremental testing with data is better.
- So I have tested incremental syncs for the cohorts and cohort_members streams with a custom configured_catalog and state. The current connector version works fine, but:
  - the number of records differs because the old client-side filtering did not work correctly on non-chronological data;
  - the number of requests can also differ because of retries (API rate limits).
Comments on the regression tool report:
- "Untested streams (not in configured catalog or without data)": there is a difference between a stream that was not configured to run and one that ran but returned 0 records (which is very common for incremental streams).
- OVERALL: the report does not indicate errors that happened during the sync; they can only be found in command_execution_artifacts/source-mixpanel/read-with-state/dev/stdout.log. Right now the report says the stream returned 0 records (see the previous item), but there is a difference between an incremental stream legitimately reading 0 records (because its state is up to date) and one failing during state initialization, hitting API rate limits, or encountering other problems.
- "Requested URLs": it would probably be good to show the HTTP body (if specified) alongside each URL. Otherwise we have to inspect the raw data (http_dump.har) to understand why the same URL appears several times: retries, or different bodies.
Left a few comments. Main questions:
- Can semi-incremental be released separately from the bug fixes?
- Can the filters rely only on stream_interval instead of stream_state?
- "semi-incremental": without filtering, each incremental sync returns all records. Filtering is implemented to support the same 'incremental' behavior that existed in the Python version of this connector (2.2.0). When we discussed this change, it was agreed that we must implement the same behavior as before low-code; otherwise it would be treated as another breaking change. Please correct me if this is a wrong statement.
- "stream_interval": I have rewritten the filters to use stream_interval. It is indeed much more convenient than stream_state and stream_slice, especially for streams with partitioning. A sketch of what this filtering amounts to is below.