
🐛 Source Mixpanel - Add new datetime formats for state for cohort_members stream; add obsolete state reset for cohort_members stream

Open midavadim opened this issue 1 year ago • 5 comments

What

Error 1 - cohorts stream

  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py", line 136, in read
    yield from self._read_stream(
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py", line 236, in _read_stream
    for record_data_or_message in record_iterator:
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/streams/core.py", line 126, in read
    slices = self.stream_slices(
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/declarative_stream.py", line 142, in stream_slices
    return self.retriever.stream_slices()
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py", line 375, in stream_slices
    return self.stream_slicer.stream_slices()
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py", line 168, in stream_slices
    start_datetime = self._calculate_earliest_possible_value(self._select_best_end_datetime())
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py", line 174, in _calculate_earliest_possible_value
    cursor_datetime = self._calculate_cursor_datetime_from_state(self.get_stream_state())
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py", line 185, in _calculate_cursor_datetime_from_state
    return self.parse_date(stream_state[self._cursor_field.eval(self.config)])
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py", line 234, in parse_date
    raise ValueError(f"No format in {self.cursor_datetime_formats} matching {date}")
ValueError: No format in ['%Y-%m-%d %H:%M:%S', '%Y-%m-%dT%H:%M:%S%z'] matching 2024-04-16T14:32:42
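To illustrate the failure, here is a minimal sketch (not the actual CDK code) of how the cursor tries each configured format in order. The state value `2024-04-16T14:32:42` uses a `T` separator but has no timezone, so neither original format matches; adding `%Y-%m-%dT%H:%M:%S` to the list fixes the parse:

```python
from datetime import datetime

# The two original formats plus the one added by this fix.
CURSOR_DATETIME_FORMATS = [
    "%Y-%m-%d %H:%M:%S",    # original: space-separated, no timezone
    "%Y-%m-%dT%H:%M:%S%z",  # original: T-separated with timezone offset
    "%Y-%m-%dT%H:%M:%S",    # added: T-separated, no timezone (the failing value)
]

def parse_date(date: str) -> datetime:
    # Try each format in order, mirroring the CDK's behavior of raising
    # only when no configured format matches the state value.
    for fmt in CURSOR_DATETIME_FORMATS:
        try:
            return datetime.strptime(date, fmt)
        except ValueError:
            continue
    raise ValueError(f"No format in {CURSOR_DATETIME_FORMATS} matching {date}")
```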

Error 2 - cohort_members stream

  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py", line 136, in read
    yield from self._read_stream(
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py", line 236, in _read_stream
    for record_data_or_message in record_iterator:
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/streams/core.py", line 145, in read
    for record_data_or_message in records:
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/declarative_stream.py", line 120, in read_records
    yield from self.retriever.read_records(self.get_json_schema(), stream_slice)
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py", line 324, in read_records
    for stream_data in self._read_pages(record_generator, self.state, _slice):
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py", line 294, in _read_pages
    next_page_token = self._next_page_token(response)
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py", line 258, in _next_page_token
    return self._paginator.next_page_token(response, self._records_from_last_response)
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/requesters/paginators/default_paginator.py", line 105, in next_page_token
    self._token = self.pagination_strategy.next_page_token(response, last_records)
  File "/airbyte/integration_code/source_mixpanel/components.py", line 278, in next_page_token
    if self._total and page_number is not None and self._total > self.page_size * (page_number + 1):
AttributeError: 'EngagePaginationStrategy' object has no attribute '_total'
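The AttributeError happens because `_total` is only assigned after the first page's response is processed, so any code path that reaches `next_page_token` before then crashes. A toy sketch (not the actual connector class; method signature simplified) of the fix, initializing the attribute up front:

```python
class EngagePaginationStrategySketch:
    def __init__(self, page_size: int):
        self.page_size = page_size
        self._total = None  # fix: always defined, even before the first response
        self._page = 0

    def next_page_token(self, total_from_response=None):
        # In the real strategy the total comes from the API response;
        # here it is passed in directly for illustration.
        if total_from_response is not None:
            self._total = total_from_response
        page_number = self._page
        # Safe now: self._total exists (None) even if no response was seen yet.
        if self._total and page_number is not None and self._total > self.page_size * (page_number + 1):
            self._page += 1
            return {"page": self._page}
        return None
```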

Error 3 - cohort_members stream

  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py", line 136, in read
    yield from self._read_stream(
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py", line 222, in _read_stream
    stream_instance.state = stream_state  # type: ignore # we check that state in the dir(stream_instance)
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/declarative_stream.py", line 85, in state
    self.retriever.state = state
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py", line 385, in state
    self.cursor.set_initial_state(value)
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py", line 86, in set_initial_state
    for state in stream_state["states"]:
KeyError: 'states'
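The KeyError arises because the per-partition cursor expects state shaped as `{"states": [...]}`, while a legacy flat state from an older connector version has no `"states"` key. A hypothetical sketch of the obsolete-state reset described in this PR (the guard logic is an illustration, not the exact connector code):

```python
def set_initial_state_safe(stream_state: dict) -> list:
    """Return the per-partition state list, resetting obsolete flat state."""
    if "states" not in stream_state:
        # Obsolete (pre-per-partition) state, e.g. {"date": "2024-04-16"}:
        # reset to an empty per-partition state instead of crashing.
        return []
    return stream_state["states"]
```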

Problem 4 - non-unique primary key for cohort_members stream (https://github.com/airbytehq/airbyte/issues/37833)

Old key: distinct_id
New key: distinct_id, cohort_id

How

  1. Added new datetime formats to the state parsing for the cohort_members stream.
  2. Changed the cohort_members stream state to the new per-partition format.
  3. Initialized `_total` in the pagination strategy.
  4. Changed the primary key to distinct_id, cohort_id.
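As a toy illustration of item 4 (not connector code): with the old single-field key, the same user appearing in two cohorts collapses into one record when deduplicating by primary key, while the composite key keeps both:

```python
records = [
    {"distinct_id": "u1", "cohort_id": 10},
    {"distinct_id": "u1", "cohort_id": 20},  # same user, different cohort
]

# Old key: distinct_id only -> second record overwrites the first.
dedup_old = {r["distinct_id"]: r for r in records}

# New key: (distinct_id, cohort_id) -> both memberships survive.
dedup_new = {(r["distinct_id"], r["cohort_id"]): r for r in records}
```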

Review guide

User Impact

Breaking change for the CohortMembers stream:

  • State is migrated to the new (initially empty) per-partition format.
  • The primary key is changed to a unique composite key based on the 'distinct_id' and 'cohort_id' fields, since the previous key was not unique and could not represent a user belonging to several cohorts.

Can this PR be safely reverted and rolled back?

  • [ ] YES 💚
  • [x] NO ❌

midavadim avatar May 08 '24 18:05 midavadim


/format-fix

Format-fix job started... Check job output.

✅ Changes applied successfully. (c5f43d8165cefc63db3d207b485fe6fc93f2c829)

lazebnyi avatar May 14 '24 15:05 lazebnyi

Regression test: https://github.com/airbytehq/airbyte/actions/runs/9082610503

midavadim avatar May 14 '24 16:05 midavadim

/format-fix

Format-fix job started... Check job output.

midavadim avatar May 14 '24 17:05 midavadim

/format-fix

Format-fix job started... Check job output.

midavadim avatar May 14 '24 17:05 midavadim

I have double-checked the regression tests that you ran, and found that cohorts were not tested at all.

  1. As I said before, cohorts and cohort_members are semi-incremental streams, so each sync reads all data, which covers 95% of a regular incremental sync. Of course, incremental with data is better.

  2. So I have tested incremental syncs for the cohorts and cohort_members streams with a custom configured_catalog and state. The current connector version works fine, but:

  • the number of records is different because the old client-side filtering did not work correctly on non-chronological data;
  • the number of requests can also differ because of retries (API rate limits).

Comments for Regression tool report:

  1. "Untested streams (not in configured catalog or without data)": there is a difference between a stream that was not configured to run and one that ran but returned 0 records (which is very common for incremental streams).
  2. Overall: the report does not indicate errors that happened during the sync; they can only be found in command_execution_artifacts/source-mixpanel/read-with-state/dev/stdout.log. Currently the report just says a stream returned 0 records (see item 1), but there is a difference between a stream reading 0 records (because it is an incremental stream with an up-to-date state) and one failing during state initialization, due to API rate limits, or for other reasons.
  3. "Requested URLs": it would probably be good to show the HTTP body (if specified) alongside the URLs; otherwise we need to inspect the raw data (http_dump.har) to understand why the same URL appears several times - retries or different bodies.

midavadim avatar May 17 '24 11:05 midavadim

Left a few comments. Main questions:

  • Can semi-incremental be released separately from the bug fixes?
  • Can the filters rely only on stream_interval instead of stream_state?

  1. "Semi-incremental": without filtering, each incremental sync returns all records. Filtering is implemented to support the proper 'incremental' sync that existed in the Python version of this connector (2.2.0). When we discussed this change, it was agreed that we must implement the same behavior as before low-code; otherwise it would be treated as another breaking change. Please correct me if this is a wrong statement.

  2. "stream_interval": I have rewritten the filters to use stream_interval. It is indeed much more convenient than stream_state and stream_slice, especially for streams with partitioning.
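For context, the kind of interval-based client-side filter discussed above can be sketched as follows (field names like `created` and the interval shape are assumptions for illustration, not the connector's actual manifest):

```python
def filter_records(records, stream_interval):
    # Keep only records whose cursor value falls inside the current
    # slice's window; ISO-8601 date strings compare correctly as strings.
    return [
        r for r in records
        if stream_interval["start_time"] <= r["created"] <= stream_interval["end_time"]
    ]
```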

midavadim avatar May 20 '24 21:05 midavadim