
fix: Initialize `max_replication_key_value` via `SELECT max(<replication_key>) ...` before starting a native `BATCH` sync

Open kgpayne opened this issue 2 years ago • 20 comments

UPDATE (from @aaronsteers) per https://github.com/meltano/sdk/issues/976#issuecomment-1292805141:

Since this is high urgency, I took an initial pass at the break here, on tap-snowflake:

  • https://github.com/MeltanoLabs/tap-snowflake/pull/13

That work can be migrated here, but I wanted to start with a 'real' implementation for testing and to prove out the design approach.

Details

Singer SDK Version

0.10.0

Python Version

3.8

Bug scope

Taps (catalog, state, stream maps, etc.)

Operating System

macOS

Description

When passing previous state to a tap with batch mode enabled, that state is not made available via the documented `Stream.get_starting_timestamp()` and `Stream.get_starting_replication_key_value()` methods.

I believe this is because a starting_replication_value is never initialized, and the methods above depend on it to retrieve state. This is easiest to see by comparing the `Stream._sync_records()` and `Stream._sync_batches()` methods on `singer_sdk.streams.core.Stream` (snippets below). I think the critical missing call is `self._write_starting_replication_value(current_context)`.

https://github.com/meltano/sdk/blob/main/singer_sdk/streams/core.py#L1034

Code

    def _sync_records(
        self,
        context: dict | None = None,
        write_messages: bool = True,
    ) -> Generator[dict, Any, Any]:
        """Sync records, emitting RECORD and STATE messages.

        Args:
            context: Stream partition or context dictionary.
            write_messages: Whether to write Singer messages to stdout.

        Raises:
            InvalidStreamSortException: TODO

        Yields:
            Each record from the source.
        """
        record_count = 0
        current_context: dict | None
        context_list: list[dict] | None
        context_list = [context] if context is not None else self.partitions
        selected = self.selected

        for current_context in context_list or [{}]:
            partition_record_count = 0
            current_context = current_context or None
            state = self.get_context_state(current_context)
            state_partition_context = self._get_state_partition_context(current_context)
            self._write_starting_replication_value(current_context)
            child_context: dict | None = (
                None if current_context is None else copy.copy(current_context)
            )

            for record_result in self.get_records(current_context):
                ...

    def _sync_batches(
        self,
        batch_config: BatchConfig,
        context: dict | None = None,
    ) -> None:
        """Sync batches, emitting BATCH messages.

        Args:
            batch_config: The batch configuration.
            context: Stream partition or context dictionary.
        """
        for encoding, manifest in self.get_batches(batch_config, context):
            self._write_batch_message(encoding=encoding, manifest=manifest)
            self._write_state_message()
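
For reference, a minimal sketch of what `_sync_batches` could look like with the missing call added, mirroring `_sync_records` above (this is an assumption about the fix, not the SDK's actual implementation):

    def _sync_batches(
        self,
        batch_config: BatchConfig,
        context: dict | None = None,
    ) -> None:
        """Sync batches, emitting BATCH messages (sketch).

        Args:
            batch_config: The batch configuration.
            context: Stream partition or context dictionary.
        """
        # Assumed fix: initialize starting_replication_value so that
        # get_starting_timestamp() and get_starting_replication_key_value()
        # can see previous state in batch mode, as they do in _sync_records().
        self._write_starting_replication_value(context)

        for encoding, manifest in self.get_batches(batch_config, context):
            self._write_batch_message(encoding=encoding, manifest=manifest)
            self._write_state_message()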

kgpayne avatar Sep 16 '22 14:09 kgpayne

@edgarrmondragon I guess incremental replication with BATCH is still an open question, but it seems to me that this is worth adding for cases where i) users enable BATCH having previously run incrementally, or ii) users self-manage a state.json (as I have done for testing). It is so close to 'just working', at least on the select side 😅

kgpayne avatar Sep 16 '22 15:09 kgpayne

Yeah, so the implementation of `Stream._sync_batches` is missing the following:

  • Initial state via `Stream._write_starting_replication_value`
  • State update via `State._increment_stream_state(record, context=...)`

They should both be called at some point, but the latter relies on record-by-record sync, so it isn't really useful here...

edgarrmondragon avatar Sep 16 '22 16:09 edgarrmondragon

Makes sense 🙂 In terms of the mechanism to retrieve the bookmark (because it cannot be automatically inferred from the batch itself without decompressing and reading the last line of the last file 😱), what do you think of allowing the user to return it? Something like:

for encoding, manifest, state in self.get_batches(batch_config, context):
    ...

where `state` is just the max replication key value for that given manifest (defaulting to `None`)? For tap-snowflake I would then start a transaction and retrieve the max value before executing the copy command 🚀
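
For illustration, a rough sketch of that idea against a SQL stream. The three-tuple yield is the proposed signature above, and the engine access, `fully_qualified_name`, and `_unload_to_files` helper are hypothetical names:

import sqlalchemy

def get_batches(self, batch_config, context=None):
    """Yield (encoding, manifest, state) triples (proposed signature)."""
    with self.connector._engine.connect() as conn:  # engine access is assumed
        with conn.begin():
            # Resolve the bookmark before unloading, in the same transaction,
            # so the emitted state matches the unloaded data exactly.
            max_value = conn.execute(
                sqlalchemy.text(
                    f"SELECT MAX({self.replication_key}) "
                    f"FROM {self.fully_qualified_name}"
                )
            ).scalar()
            # _unload_to_files() stands in for the COPY INTO implementation.
            for encoding, manifest in self._unload_to_files(batch_config, context):
                yield encoding, manifest, max_value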

kgpayne avatar Sep 16 '22 17:09 kgpayne

~~Note to self: @aaronsteers - reply here after some thoughts.~~ ✅

@kgpayne - thanks for raising this! @edgarrmondragon and I spoke about it today in our regular 1:1.

What about this approach?

  1. Run a "min/max query" before starting the batch.
    • Before extracting a batch, we use generic SQLAlchemy methods to collect the starting_replication_key_value and ending_replication_key_value (a SQLAlchemy sketch follows this list).
    • Non-SQL implementations would need their own way of initializing these values, but presumably all SQL-based implementations could use something built into the SDK, such as:
      SELECT 
          min({self.replication_key}) AS starting_replication_key_value,
          max({self.replication_key}) AS ending_replication_key_value
      FROM {self.table_name} AS source_table
      WHERE {self.replication_key} >= {self.get_starting_replication_key_value()}
      
    • The reason for running this before the batch executes is that running it afterwards would, in some cases, create a gap between the max value at the time of unload and the max value at the time of the post-query. That gap could cause data loss, which we want to avoid.
  2. Use the min/max to limit the batch.
    • When extracting the batch, developers can optionally limit their output to be within the bounds of min/max (inclusive).
    • If records cannot be limited to the min/max range, then some records may arrive more than once, which is acceptable according to the spec.
      • This should affect only a small number of sources: those with native BATCH support but no support for range filtering. Certainly not ideal, but we may want to consider simply declaring that STATE isn't supported for those streams when run in BATCH mode. In effect, BATCH on streams with no range-filtering capability would require a replication mode of FULL_TABLE instead of INCREMENTAL.
  3. Send STATE messages as usual, but with output from the "min/max query".
    • The value of ending_replication_key_value would determine the STATE message content after the batch is transmitted.
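
As referenced in step 1 above, here is a sketch of that query built with generic SQLAlchemy constructs (the `get_table` helper and engine access are assumptions about the connector API):

import sqlalchemy as sa

def get_replication_key_bounds(self):
    """Return (starting, ending) replication key values for the batch (sketch)."""
    table = self.connector.get_table(self.fully_qualified_name)  # assumed helper
    rk_col = table.columns[self.replication_key]
    query = sa.select(
        sa.func.min(rk_col).label("starting_replication_key_value"),
        sa.func.max(rk_col).label("ending_replication_key_value"),
    )
    # Constrain to records at or after the previous bookmark, if one exists.
    start_value = self.get_starting_replication_key_value(context=None)
    if start_value is not None:
        query = query.where(rk_col >= start_value)
    with self.connector._engine.connect() as conn:  # engine access is assumed
        return tuple(conn.execute(query).one())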

Other notes:

aaronsteers avatar Sep 19 '22 20:09 aaronsteers

@aaronsteers approach sounds good 👍 What are the next steps? Would you like me to work on this, or will we ship tap-snowflake without incremental support in batch mode for now?

kgpayne avatar Sep 26 '22 10:09 kgpayne

@kgpayne - Let's ship batch messaging as soon as it's available on tap- and target-snowflake. We need the example implementation as soon as possible for community members to reference.

If we can put in placeholders, as discussed, that light up once the upstream SDK capability is implemented, that would be ideal.

Code comments and a note in the readme, with cross-links to the logged issue(s), would be much appreciated.

aaronsteers avatar Sep 26 '22 15:09 aaronsteers

@aaronsteers based on your comment above, how does:

Other notes:

square with #1011? In my mind, they are the same feature, i.e. min, max and limit values are passed in the get_records/get_batches signatures, and in the incremental case the value of max is derived from a SQLAlchemy MAX(replication_key) 🤔 The tap would then i) only get records <= the max and ii) emit the max as its state value.

It would be good to get to a complete spec for tap-snowflake, as it sounded from our 1:1 like having INCREMENTAL support sooner rather than later is preferable? Not least to set the overall pattern for anyone wanting to build their own BigQuery/Redshift etc. with incremental support 😅

@edgarrmondragon FYI

kgpayne avatar Oct 26 '22 17:10 kgpayne

@kgpayne - Following on from our past conversations, I logged a new proposal that suggests a StreamFilter object that can be passed to get_records(), get_batches(), get_url_params(), etc.

  • https://github.com/meltano/sdk/issues/1119

This would eliminate the need for the developer to directly call get_starting_replication_key_value(), get_replication_key_signpost(), or get_starting_timestamp(). The developer can then also completely ignore self.replication_method and self.replication_key, since all constraints are precalculated and baked into the filter object that is passed to the method.

This also bakes in the max_record_limit constraint discussed elsewhere.
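
To make the idea concrete, a hypothetical sketch of what such a StreamFilter object could carry (field names are illustrative; the actual proposal lives in #1119):

from __future__ import annotations

from dataclasses import dataclass
from typing import Any

@dataclass(frozen=True)
class StreamFilter:
    """Precalculated sync constraints passed to get_records()/get_batches().

    The SDK would derive these from replication_method, replication_key,
    state, and config before the sync starts, so stream implementations
    never need to consult those attributes directly.
    """

    start_value: Any = None        # replaces get_starting_replication_key_value()
    end_value: Any = None          # replaces get_replication_key_signpost()
    start_timestamp: Any = None    # replaces get_starting_timestamp()
    max_record_limit: int | None = None  # bakes in the record-limit constraint

A stream's get_records(context, stream_filter) would then simply apply start_value/end_value in its own query or request parameters.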

cc @edgarrmondragon

aaronsteers avatar Oct 26 '22 23:10 aaronsteers

@kgpayne and @edgarrmondragon - The above discussion regarding how to communicate constraints to get_records() and get_batches() is important, but I don't think it should necessarily be a blocker here.

The critical path item here, I think, is to do something like the following to capture the max_replication_key_value without touching any records ourselves:

  1. Before the SDK calls SQLStream.get_batches(), we use SQLAlchemy to run the equivalent of SELECT MIN(replication_key), MAX(replication_key) FROM source_table (a sketch follows this list).
    • The min value has no function as of now, so we can omit it from the query if we want.
  2. We set the internal signpost value to the max replication key value.
    • This has no critical function and is purely cosmetic for our purposes. It does not filter records, contrary to what we were previously thinking, because it is unsafe for us to filter without then doing another 'catch-up sync' afterwards.
  3. After the batch messages are sent, the same signpost value (the MAX(replication_key) from our original query) should be set as the max_replication_key_value, and would then be sent automatically in the subsequent STATE message for that stream.
  4. On subsequent executions: any implementation of SQLStream.get_batches() that consults either get_starting_replication_key_value() or the new/proposed StreamFilter argument will immediately get incremental support.
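
Putting steps 1-3 together, a sketch of how the sequencing could look inside `_sync_batches` (the bounds helper is the hypothetical one sketched earlier; the signpost and state helpers exist on `Stream`, though their use here is illustrative):

def _sync_batches(self, batch_config, context=None):
    # Step 1: run the min/max query before any batch is unloaded.
    _, max_rk_value = self.get_replication_key_bounds()  # hypothetical helper

    # Step 2: record the max as the signpost. This is cosmetic for our
    # purposes; it does not filter records, since filtering without a
    # follow-up catch-up sync would be unsafe.
    self._write_replication_key_signpost(context, max_rk_value)

    for encoding, manifest in self.get_batches(batch_config, context):
        self._write_batch_message(encoding=encoding, manifest=manifest)

    # Step 3: promote the signpost to the bookmark, then emit STATE so the
    # max is sent automatically in the stream's next STATE message.
    self._increment_stream_state(
        {self.replication_key: max_rk_value}, context=context
    )
    self._write_state_message()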

Does this sound right?

@kgpayne - If this sounds reasonable, feel free to pick it up as you have availability. Delivering sooner will mean less rework for those who build on the tap-snowflake example over the coming weeks.

aaronsteers avatar Oct 27 '22 00:10 aaronsteers

@kgpayne - Do I remember correctly that you had this fixed, either in SDK or in the Snowflake implementation?

aaronsteers avatar Feb 16 '23 22:02 aaronsteers

@aaronsteers I think you might be thinking of this PR 🙂:

  • https://github.com/MeltanoLabs/tap-snowflake/pull/13

kgpayne avatar Feb 16 '23 22:02 kgpayne

@aaronsteers I think you might be thinking of this PR 🙂:

@kgpayne Thanks. I didn't realize until now that you'd added the commits to the existing PR. We should definitely try to merge that and get it into the SDK. I'll see if I can get it merged while you are out next week, and maybe Derek can assist with adapting it to the SDK.

aaronsteers avatar Feb 17 '23 18:02 aaronsteers

As the fix (linked above) has now been merged into tap-snowflake, the next step is to port it to the SDK.

kgpayne avatar Apr 06 '23 15:04 kgpayne

What's the progress of this?

luisvicenteatprima avatar Jun 30 '23 07:06 luisvicenteatprima

ping @kgpayne since this is assigned to you. I've added it to the SDK v1 issue as well.

tayloramurphy avatar Jul 04 '23 13:07 tayloramurphy

@tayloramurphy @luisvicenteatprima this is implemented in tap-snowflake but not yet ported to the SDK. It's marked as v1, so it should hopefully be picked up in this cadence 👍

@edgarrmondragon FYI this was one of my next-up items, which would be good to hand over 🐣

kgpayne avatar Jul 05 '23 11:07 kgpayne

This is on my TODO list for the week.

Update: PR in https://github.com/meltano/sdk/pull/1894

edgarrmondragon avatar Aug 01 '23 00:08 edgarrmondragon

Any progress on this? There haven't been any updates in the PR for almost 2 months.

luisvicenteatprima avatar Oct 11 '23 09:10 luisvicenteatprima

@edgarrmondragon

luisvicenteatprima avatar Oct 11 '23 09:10 luisvicenteatprima

Bumping to see if there's any progress on this? Thank you!

cooley-hi avatar Jan 04 '24 17:01 cooley-hi