Add paradigm for stream filter constraints and max_records constraints
Currently, developers are left to inspect self.replication_key, self.get_starting_replication_key_value(), and self.get_starting_timestamp() themselves within get_records() or get_batches(). This isn't great, for a number of reasons.
To make the expected filters much more explicit, this proposal suggests that we pass something like a StreamFilter object to methods like get_records() and get_batches(), and perhaps also to methods like get_url_params(), which may be able to pass queries down to the API call.
- By localizing the filter to each method call, we also open up options for multiple partitions of the same dataset to be queried simultaneously.
- By handling the filter rulesets generically, we unlock use cases that also want a "max" constraint, such as #922, which in turn unlocks parallel processing of time partitions, as noted above.
- Developers get the option of using the generic apply, like filtered_records = filters.apply(unfiltered_records) and include: bool = filters.eval(record_dict).
- Alternatively, developers can loop through the StreamFilter.filters set and handle each filter in a custom manner if needed, such as sending eligible constraints as filters to the remote API. (A sketch of this option follows below.)
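As a rough illustration of that second option, a REST tap might push eligible constraints down as API query parameters. This is only a sketch under the proposed signature: the filters argument and the updated_since query parameter are assumptions for illustration, not existing SDK or API behavior.

from operator import ge

def get_url_params(self, context, next_page_token, filters):
    # Hypothetical: assumes the proposed `filters` argument and an API
    # that accepts an "updated_since" query parameter.
    params = {}
    for record_filter in filters.filters:
        if record_filter.property_name == "updated_at" and record_filter.operator is ge:
            # Push the constraint down to the API rather than filtering
            # records client-side.
            params["updated_since"] = record_filter.property_value
    return params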
Pseudocode
Some possible pseudocode to get a feel for how this might look:
class Stream:
    # ...
    def get_batches(
        self,
        batch_config: BatchConfig,
        context: dict | None = None,
        filters: StreamFilter | None = None,
    ) -> Iterable[tuple[BaseBatchFileEncoding, list[str]]]:
        """Batch generator function.

        Developers are encouraged to override this method to customize batching
        behavior for databases, bulk APIs, etc.

        Args:
            batch_config: Batch config for this stream.
            context: Stream partition or context dictionary.
            filters: A StreamFilter object defining any restrictions which
                should be applied to the dataset.

        Yields:
            A tuple of (encoding, manifest) for each batch.
        """
from operator import le, lt, gt, ge, eq, ne
from typing import Any, Callable, Iterable


class RecordFilter:
    def __init__(
        self,
        property_name: str,
        property_value: Any,
        operator: Callable[[Any, Any], bool],
    ):
        # Only support a finite set of operators:
        if operator not in [le, lt, gt, ge, eq, ne]:
            raise ValueError(f"Unsupported operator: {operator.__name__}")
        self.property_name = property_name
        self.property_value = property_value
        self.operator = operator

    def eval(self, record: dict) -> bool:
        """Return True to keep, False to exclude."""
        return self.operator(record[self.property_name], self.property_value)
import itertools
from dataclasses import dataclass


@dataclass
class StreamFilter:
    filters: list[RecordFilter]
    max_record_limit: int | None = None

    def eval(self, record: dict) -> bool:
        """Return True to keep the record, False to exclude."""
        return all(record_filter.eval(record) for record_filter in self.filters)

    def apply(self, records: Iterable[dict]) -> Iterable[dict]:
        """Can be called against a set of records to return only those which match."""
        matching = (record for record in records if self.eval(record))
        if self.max_record_limit is not None:
            yield from itertools.islice(matching, self.max_record_limit)
        else:
            yield from matching
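For a feel of the generic path end to end, usage might look something like this (the property name, value, and limit are placeholders):

from operator import ge

# Keep records updated on or after 2023-01-01, capped at 1,000 records:
stream_filter = StreamFilter(
    filters=[RecordFilter("updated_at", "2023-01-01", ge)],
    max_record_limit=1000,
)
filtered_records = stream_filter.apply(unfiltered_records)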
Implementation for SQL taps
For SQL taps, we obviously would not use the inline Python-based evaluators, but instead we could map the filter constraints to WHERE clause filters and LIMIT restrictions, passed generically to SQLAlchemy.
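As a minimal sketch of that mapping (assuming SQLAlchemy 1.4+; build_query() and its inputs are hypothetical names, not existing SDK API):

import sqlalchemy as sa

def build_query(table: sa.Table, stream_filter: StreamFilter) -> sa.Select:
    query = sa.select(table)
    for record_filter in stream_filter.filters:
        column = table.columns[record_filter.property_name]
        # The stdlib operator functions (le, lt, gt, ...) invoke SQLAlchemy's
        # overloaded column comparisons, producing WHERE clause expressions.
        query = query.where(record_filter.operator(column, record_filter.property_value))
    if stream_filter.max_record_limit is not None:
        query = query.limit(stream_filter.max_record_limit)
    return query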
As it relates to the get_batches() method, this could be introduced as a breaking change (the sooner the better). Since there are no 'stable' BATCH message implementations as of this writing, it should be acceptable to make this change.
Supporting existing get_records() implementations in a backwards-compatible manner
Regarding existing taps that already implement get_records():
- Internally we can add a new Stream.filter_records() method that automatically applies the filter set to the records produced by Stream.get_records() - probably after Stream.post_process(), to ensure the properties are in the expected place. (A sketch of this default hook follows this list.)
- For performance reasons, we can advise developers to override Stream.filter_records() to no-op any filters they've already handled in get_records().
- When SDK 1.0 releases, we would update the signature of get_records(), perhaps still preserving the generic Stream.filter_records() for convenience in the default implementation.
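As a rough sketch of that default hook (the method name follows this proposal; the call site inside the SDK's sync loop is assumed, not current SDK behavior):

class Stream:
    # ...
    def filter_records(
        self,
        records: Iterable[dict],
        filters: StreamFilter,
    ) -> Iterable[dict]:
        """Apply the filter set to post-processed records.

        The SDK would call this after post_process(), so filter properties
        are in their expected place. Developers who already handle some
        constraints inside get_records() can override this to no-op the
        redundant client-side checks.
        """
        return filters.apply(records)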
Related
This probably also resolves:
- https://github.com/meltano/sdk/issues/226
And would make this a pretty easy fast follow:
- https://github.com/meltano/sdk/issues/922