fix: Initialize `max_replication_key_value` via `SELECT max(<replication_key>) ...` before starting a native `BATCH` sync
UPDATE (from @aaronsteers) per https://github.com/meltano/sdk/issues/976#issuecomment-1292805141:
Since this is high urgency, I took an initial pass over the break here on tap-snowflake:
- https://github.com/MeltanoLabs/tap-snowflake/pull/13
That work can be migrated here but I wanted to start with a 'real' implementation for testing and to prove the design approach.
Details

Singer SDK Version: 0.10.0
Python Version: 3.8
Bug scope: Taps (catalog, state, stream maps, etc.)
Operating System: macOS

Description
When passing previous state to a tap with batch mode enabled, that state is not made available via the documented `Stream.get_starting_timestamp()` and `Stream.get_starting_replication_key_value()` methods.

I believe this is because the starting replication value is never set up, which the methods above depend on to retrieve state. This is easiest to see when comparing the `Stream._sync_records()` and `Stream._sync_batches()` methods on `singer_sdk.streams.core.Stream` (snippets below). I think the critical missing call is `self._write_starting_replication_value(current_context)`.
https://github.com/meltano/sdk/blob/main/singer_sdk/streams/core.py#L1034
Code
```python
def _sync_records(
    self,
    context: dict | None = None,
    write_messages: bool = True,
) -> Generator[dict, Any, Any]:
    """Sync records, emitting RECORD and STATE messages.

    Args:
        context: Stream partition or context dictionary.
        write_messages: Whether to write Singer messages to stdout.

    Raises:
        InvalidStreamSortException: TODO

    Yields:
        Each record from the source.
    """
    record_count = 0
    current_context: dict | None
    context_list: list[dict] | None
    context_list = [context] if context is not None else self.partitions
    selected = self.selected

    for current_context in context_list or [{}]:
        partition_record_count = 0
        current_context = current_context or None
        state = self.get_context_state(current_context)
        state_partition_context = self._get_state_partition_context(current_context)
        self._write_starting_replication_value(current_context)
        child_context: dict | None = (
            None if current_context is None else copy.copy(current_context)
        )

        for record_result in self.get_records(current_context):
            ...
```
```python
def _sync_batches(
    self,
    batch_config: BatchConfig,
    context: dict | None = None,
) -> None:
    """Sync batches, emitting BATCH messages.

    Args:
        batch_config: The batch configuration.
        context: Stream partition or context dictionary.
    """
    for encoding, manifest in self.get_batches(batch_config, context):
        self._write_batch_message(encoding=encoding, manifest=manifest)
        self._write_state_message()
```
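For reference, a minimal sketch of where the missing call could slot in, mirroring `_sync_records()` above (illustrative only, not the fix that was ultimately shipped):

```python
def _sync_batches(
    self,
    batch_config: BatchConfig,
    context: dict | None = None,
) -> None:
    """Sketch: _sync_batches with the missing call added."""
    # Hypothetical addition: populate starting_replication_value so that
    # get_starting_timestamp() / get_starting_replication_key_value() can
    # see the incoming state, just as _sync_records() does.
    self._write_starting_replication_value(context)

    for encoding, manifest in self.get_batches(batch_config, context):
        self._write_batch_message(encoding=encoding, manifest=manifest)
        self._write_state_message()
```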
@edgarrmondragon I guess incremental replication with `BATCH` is still an open question, but it seems to me that this is worth adding for cases where i) users first enable `BATCH` having previously run incrementally, or ii) users self-manage a `state.json` (as I have done for testing). It is so close to 'just working', at least on the select side.
Yeah, so the implementation of `Stream._sync_batches` is missing the following:

- Initial state via `Stream._write_starting_replication_value`
- State update via `Stream._increment_stream_state(record, context=...)`

They should be called at some point, but the latter relies on record-by-record sync, so it's not really useful here...
Makes sense. In terms of the mechanism to retrieve the bookmark (because it cannot be automatically inferred from the batch itself without decompressing and reading the last line of the last file), what do you think of allowing the user to return it? Something like:

```python
for encoding, manifest, state in self.get_batches(batch_config, context):
    ...
```

where `state` is just the max replication key value for that given manifest (defaulting to `None`)? For `tap-snowflake` I would then start a transaction and retrieve the max value before executing the `COPY` command.
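If `get_batches()` were extended that way, the consuming side might look roughly like the loop below. This is a sketch only; the three-element yield and the use of `_increment_stream_state` here are assumptions about a possible implementation, not current SDK behaviour:

```python
# Sketch: assumes get_batches() yields (encoding, manifest, state), where
# state is the max replication key value for that manifest (or None).
for encoding, manifest, state in self.get_batches(batch_config, context):
    self._write_batch_message(encoding=encoding, manifest=manifest)
    if state is not None and self.replication_key:
        # Advance the stream bookmark from the reported max value.
        self._increment_stream_state({self.replication_key: state}, context=context)
    self._write_state_message()
```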
~~Note to self: @aaronsteers - reply here after some thoughts.~~
@kgpayne - thanks for raising this! @edgarrmondragon and I spoke about it today in our regular 1:1.
What about this approach?

- Run a "min/max query" before starting the batch.
  - Before extracting a batch, we use generic SQLAlchemy methods to collect the `starting_replication_key_value` and `ending_replication_key_value`.
  - Non-SQL implementations would need their own way of initializing these values, but presumably all SQL-based implementations could use something built into the SDK, such as:

    ```sql
    SELECT min({self.replication_key}) AS starting_replication_key_value,
           max({self.replication_key}) AS ending_replication_key_value
    FROM {self.table_name} AS source_table
    WHERE {self.replication_key} >= {self.get_starting_replication_key_value()}
    ```

  - The reason for running this before the batch executes is that running it afterwards could, in some cases, leave a gap between the max value at the time of unload and the max value at the time of the post-query. That gap could and would cause data loss, which we want to avoid.
- Use the min/max to limit the batch.
  - When extracting the batch, developers can optionally limit their output to be within the bounds of min/max (inclusive).
  - If records cannot be limited to the min/max range, then some records may arrive more than once - which is acceptable according to spec.
    - This should be a small number of sources: those that have native BATCH support but no support for range filtering. Certainly not ideal, but we may want to consider simply declaring that `STATE` isn't supported for those streams when run in BATCH mode, which would mean that `BATCH` on streams with no batch filtering capability would effectively require a replication mode of `FULL_TABLE` instead of `INCREMENTAL`.
- Send STATE messages as usual, but with output from the "min/max query".
  - The value of `ending_replication_key_value` would determine the `STATE` message content after the batch is transmitted.

Other notes:

- If we want to keep function signatures mostly the same, the starting value could be gotten via `Stream.get_starting_replication_key_value()` and the ending value via `Stream.get_replication_key_signpost()`. The signpost definition is essentially the same function as we have here: the max value we expect to see before we start limiting.
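For illustration, the "min/max query" could be issued with plain SQLAlchemy along these lines. This is a sketch only; the helper name and parameters are hypothetical and not part of the SDK:

```python
import sqlalchemy as sa


def get_replication_key_bounds(connection, table_name, replication_key, starting_value=None):
    """Hypothetical helper: return (starting, ending) replication key values.

    Mirrors the min/max query above. Identifiers are interpolated directly,
    so they must come from trusted catalog metadata, not user input.
    """
    sql = (
        f"SELECT MIN({replication_key}) AS starting_replication_key_value, "
        f"MAX({replication_key}) AS ending_replication_key_value "
        f"FROM {table_name} AS source_table"
    )
    params = {}
    if starting_value is not None:
        sql += f" WHERE {replication_key} >= :starting_value"
        params["starting_value"] = starting_value

    row = connection.execute(sa.text(sql), params).one()
    return row.starting_replication_key_value, row.ending_replication_key_value
```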
@aaronsteers approach sounds good. What are the next steps? Would you like me to work on this, or will we ship `tap-snowflake` without incremental support in batch mode for now?
@kgpayne - Let's ship batch messaging as soon as it's available on tap- and target-snowflake. We need the example implementation as soon as possible for community members to reference.
If we can put in placeholders, as discussed, that light up once the upstream SDK capability is implemented, that would be ideal.
Code comments and a note in the readme, with cross-links to the logged issue(s), would be much appreciated.
@aaronsteers based on your comment above, how does:

> Other notes:
>
> - If we want to keep function signatures mostly the same, the starting value could be gotten via `Stream.get_starting_replication_key_value()` and the ending value could be gotten via `Stream.get_replication_key_signpost()`. The signpost definition is essentially the same function as we have here: the max value we expect to see before we start limiting.

square with #1011? In my mind, they are the same feature - i.e. min, max and limit values are passed in the `get_records`/`get_batches` signatures, and in the incremental case the value of `max` is derived from a SQLAlchemy `MAX(replication_key)`. The tap would then i) only get records `<=` the `max` and ii) emit the `max` as its state value.

It would be good to get to a complete spec for `tap-snowflake`, as it sounded from our 1:1 like having `INCREMENTAL` support sooner rather than later is preferable? Not least to set the overall pattern for anyone wanting to build their own BigQuery/Redshift etc. with incremental support.
@edgarrmondragon FYI
@kgpayne - Following on from our past conversations, I logged a new proposal that suggests a `StreamFilter` object that can be passed to `get_records()`, `get_batches()`, `get_url_params()`, etc.

- https://github.com/meltano/sdk/issues/1119

This would eliminate the need for the developer to directly call `get_starting_replication_key_value()`, `get_replication_key_signpost()`, or `get_starting_timestamp()`. The developer can then also completely ignore `self.replication_method` and `self.replication_key`, since all constraints are precalculated and baked into the filter object that is sent to the method.

This also bakes in the `max_record_limit` constraint discussed elsewhere.

cc @edgarrmondragon
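To make the idea concrete, the filter object could look something like the sketch below. This is purely a guess at the shape - see issue #1119 for the actual proposal - and every field name here is an assumption:

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class StreamFilter:
    """Hypothetical shape for the proposed filter object (see #1119).

    All constraints arrive precalculated, so a developer implementing
    get_records()/get_batches() never has to consult replication_method,
    replication_key, or the get_starting_* helpers directly.
    """

    replication_key: str | None = None
    start_value: Any = None          # e.g. from the stored STATE bookmark
    end_value: Any = None            # e.g. the precomputed signpost / max value
    max_record_limit: int | None = None
```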
@kgpayne and @edgarrmondragon - The above discussion regarding how to communicate constraints to `get_records()` and `get_batches()` is important, but I don't think it necessarily needs to be a blocker here.

The critical path item here, I think, is to do something like the following to capture the `max_replication_key_value` even while not touching any records ourselves:

- Before the SDK calls `SQLStream.get_batches()`, we run, using SQLAlchemy, the equivalent of `SELECT MIN(replication_key), MAX(replication_key) FROM source_table`.
  - The min value has no function as of now, so we can omit it from the query if we want.
- We set the internal signpost value to the max replication key value.
  - This has no critical function and is purely cosmetic for our purposes. It does not filter records - contrary to what we previously thought - because it is unsafe for us to do so without ensuring we then do another 'catch-up sync' afterwards.
- After the batch messages are sent, the same signpost value (`MAX(replication_key)` from our original query) should be set as the `max_replication_key_value` - and would then be sent automatically in the subsequent `STATE` message for that stream.
- On subsequent executions, any implementation of `SQLStream.get_batches()` that consults either `get_starting_replication_key_value()` or the new/proposed `StreamFilter` argument will immediately get incremental support.

Does this sound right?
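For concreteness, the steps above might translate into something like the following inside a SQL stream's batch sync path. Treat this purely as a sketch: `get_replication_key_bounds` is the illustrative helper from earlier in this thread, and attribute names such as `connector.connection` and `fully_qualified_name` are assumptions about the `SQLStream` internals rather than a confirmed design:

```python
def _sync_batches(
    self,
    batch_config: BatchConfig,
    context: dict | None = None,
) -> None:
    """Sketch of the proposed flow; not the shipped implementation."""
    self._write_starting_replication_value(context)

    max_value = None
    if self.replication_key:
        # 1. Min/max query before the unload (the min value is unused for now).
        _, max_value = get_replication_key_bounds(
            self.connector.connection,
            self.fully_qualified_name,
            self.replication_key,
            self.get_starting_replication_key_value(context),
        )

    # 2. Record the signpost; informational only, records are not filtered by it.
    if max_value is not None:
        self._write_replication_key_signpost(context, max_value)

    # 3. Emit BATCH messages as usual.
    for encoding, manifest in self.get_batches(batch_config, context):
        self._write_batch_message(encoding=encoding, manifest=manifest)

    # 4. Advance the bookmark to the pre-unload max, then emit STATE.
    if max_value is not None:
        self._increment_stream_state(
            {self.replication_key: max_value}, context=context
        )
    self._write_state_message()
```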
@kgpayne - If this sounds reasonable, feel free to pick it up as you have availability. Delivering sooner will mean less rework for those who build on the `tap-snowflake` example over the coming weeks.
@kgpayne - Do I remember correctly that you had this fixed, either in SDK or in the Snowflake implementation?
@aaronsteers I think you might be thinking of this PR:
- https://github.com/MeltanoLabs/tap-snowflake/pull/13
@kgpayne Thanks. I didn't realize until now that you'd added the commits to the existing PR. We should definitely try to merge that and get it into the SDK. I'll see if I can get it merged while you are out next week, and maybe Derek can assist with adapting it to the SDK.
As the fix (linked above) has now been merged into `tap-snowflake`, the next step is to port it to the SDK.
What's the progress on this?
ping @kgpayne since this is assigned to you. I've added it to the SDK v1 issue as well.
@tayloramurphy @luisvicenteatprima this is implemented in `tap-snowflake` but not yet ported to the SDK. It's marked as v1, so it should hopefully be picked up in this cadence.
@edgarrmondragon FYI this was one of my next-up items, which would be good to hand over.
This is on my TODO list for the week.
Update: PR in https://github.com/meltano/sdk/pull/1894
Any progress on this? There haven't been any updates in the PR for almost 2 months.
@edgarrmondragon
Bumping to see if there's any progress on this? Thank you!