
Add paradigm for stream filter constraints and max_records constraints

Open • aaronsteers opened this issue on Oct 26, 2022 • 4 comments

Currently, developers are left to inspect self.replication_key, self.get_starting_replication_key_value(), and self.get_starting_timestamp() themselves within get_records() or get_batches(). This isn't great for a number of reasons: the expected filtering behavior is implicit, and every stream implementation has to re-create it by hand.
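
To illustrate the status quo, here is a minimal sketch of the current pattern; the stream class and the fetch_source_rows() helper are hypothetical, but get_starting_replication_key_value() is the real SDK method named above:

from typing import Iterable

from singer_sdk.streams import Stream

class MyStream(Stream):
    replication_key = "updated_at"

    def get_records(self, context: dict | None) -> Iterable[dict]:
        # Today, each stream hand-rolls its own incremental filtering.
        start_value = self.get_starting_replication_key_value(context)
        for record in fetch_source_rows():  # hypothetical data-source helper
            # Assumes replication-key values compare meaningfully (e.g. ISO timestamps).
            if start_value is None or record["updated_at"] >= start_value:
                yield record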

To make the expected filters much more explicit, this proposal suggests that we pass something like a StreamFilter object to methods like get_records() and get_batches(), and perhaps also to methods like get_url_params(), which may be able to push the filters down to the API call.

  • By localizing the filter to each method call, we also open up the option of querying multiple partitions of the same dataset simultaneously.
  • By handling the filter rulesets generically, we unlock use cases that also want a "max" constraint, such as #922, which in turn unlocks parallel processing of time partitions, as noted above.
  • Developers get the option of using the generic implementations, e.g. filtered_records = filters.apply(unfiltered_records) and include: bool = filters.eval(record_dict).
  • Alternatively, developers can loop through the StreamFilter.filters set and handle each filter in a custom manner if needed, such as sending eligible constraints as filters to the remote API (see the sketch after this list).
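
To make the API-pushdown case concrete, here is a hedged sketch; the stream class, the updated_since parameter, and the new filters argument on get_url_params() are all hypothetical, and it assumes the StreamFilter/RecordFilter shapes sketched in the pseudocode below:

from operator import ge
from typing import Any

from singer_sdk.streams import RESTStream

class MyAPIStream(RESTStream):
    def get_url_params(
        self,
        context: dict | None,
        next_page_token: Any,
        filters: StreamFilter,  # proposed new argument
    ) -> dict:
        params: dict = {}
        for f in filters.filters:
            # Push down only the constraints this particular API understands;
            # anything else is left for generic in-memory filtering.
            if f.property_name == "updated_at" and f.operator is ge:
                params["updated_since"] = f.property_value  # hypothetical API param
        return params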

Pseudocode

Some possible pseudocode to give a feel for how this might look:

class Stream:
    # ...
    def get_batches(
        self,
        batch_config: BatchConfig,
        context: dict | None = None,
        filters: StreamFilter | None = None,
    ) -> Iterable[tuple[BaseBatchFileEncoding, list[str]]]:
        """Batch generator function.

        Developers are encouraged to override this method to customize batching
        behavior for databases, bulk APIs, etc.

        Args:
            batch_config: Batch config for this stream.
            context: Stream partition or context dictionary.
            filters: A StreamFilter object defining any restrictions which should be applied to the dataset.

        Yields:
            A tuple of (encoding, manifest) for each batch.
        """
        ...
from operator import eq, ge, gt, le, lt, ne
from typing import Any, Callable

class RecordFilter:
    def __init__(self, property_name: str, property_value: Any, operator: Callable[[Any, Any], bool]):
        # Only support a finite set of operators:
        if operator not in [le, lt, gt, ge, eq, ne]:
            raise ValueError(f"Unsupported operator: {operator.__name__}")

        self.property_name = property_name
        self.property_value = property_value
        self.operator = operator

    def eval(self, record: dict) -> bool:
        """Return True to keep, False to exclude."""
        return self.operator(record[self.property_name], self.property_value)

import itertools
from dataclasses import dataclass, field
from typing import Iterable

@dataclass
class StreamFilter:
    filters: list[RecordFilter] = field(default_factory=list)
    max_record_limit: int | None = None

    def eval(self, record: dict) -> bool:
        """Return True to keep the record, False to exclude."""
        return all(f.eval(record) for f in self.filters)

    def apply(self, records: Iterable[dict]) -> Iterable[dict]:
        """Can be called against a set of records to return only those which match."""
        matches = (record for record in records if self.eval(record))
        if self.max_record_limit is not None:
            yield from itertools.islice(matches, self.max_record_limit)
        else:
            yield from matches
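
A quick usage sketch tying these together (the property name and values are illustrative):

from operator import ge

# Keep records where updated_at >= the starting bookmark, capped at 1000 records.
filters = StreamFilter(
    filters=[RecordFilter("updated_at", "2022-10-01", ge)],
    max_record_limit=1000,
)

records = [{"updated_at": "2022-09-30"}, {"updated_at": "2022-10-26"}]
filtered_records = list(filters.apply(records))  # [{"updated_at": "2022-10-26"}]
include: bool = filters.eval({"updated_at": "2022-10-26"})  # True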

Implementation for SQL taps

For SQL taps, we obviously would not use the inline Python-based evaluators; instead, we could map the filter constraints to WHERE clause filters and LIMIT restrictions, passed generically to SQLAlchemy.
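
A minimal sketch of that mapping, assuming SQLAlchemy 2.x and a Table object in hand; the helper name is hypothetical, and it relies on the fact that the operator-module callables (le, ge, etc.) produce SQL expressions when applied to SQLAlchemy columns:

import sqlalchemy

def build_filtered_query(table: sqlalchemy.Table, stream_filter: StreamFilter) -> sqlalchemy.Select:
    # Hypothetical helper: translate each RecordFilter into a WHERE clause.
    query = sqlalchemy.select(table)
    for f in stream_filter.filters:
        # e.g. ge(table.c["updated_at"], value) renders as "updated_at >= :param".
        query = query.where(f.operator(table.c[f.property_name], f.property_value))
    if stream_filter.max_record_limit is not None:
        query = query.limit(stream_filter.max_record_limit)
    return query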

As it relates to the get_batches() method, this could be introduced as a breaking change (the sooner the better). Since there are no 'stable' BATCH message implementations as of this writing, it should be acceptable to make this change.

Implementing get_records() support in a backwards-compatible manner

In regards to existing taps that already implement get_records():

  1. Internally we can add a new Stream.filter_records() method that automatically applies the filter set to the records produced by Stream.get_records(), probably after Stream.post_process(), to ensure the properties are in the expected place (see the sketch after this list).
  2. For performance reasons, we can advise developers to override Stream.filter_records() to no-op any filters they've already handled in get_records().
  3. When SDK 1.0 releases, we would update the signature of get_records(), perhaps still preserving the generic Stream.filter_records() for convenience in the default implementation.
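
A minimal sketch of what that default wiring could look like; the _generate_filtered_records() name and the exact hook point are assumptions for illustration, not the SDK's actual internals:

from typing import Iterable

class Stream:
    # ...
    def filter_records(self, records: Iterable[dict], filters: StreamFilter) -> Iterable[dict]:
        """Default implementation; override to no-op any filters already handled in get_records()."""
        return filters.apply(records)

    def _generate_filtered_records(self, context: dict | None, filters: StreamFilter) -> Iterable[dict]:
        # Hypothetical internal hook: post_process() runs first so that
        # property names are in their expected place before filtering.
        processed = (
            self.post_process(record, context)
            for record in self.get_records(context)
        )
        yield from self.filter_records(processed, filters)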

Related

This probably also resolves:

  • https://github.com/meltano/sdk/issues/226

And would make this a pretty easy fast follow:

  • https://github.com/meltano/sdk/issues/922

aaronsteers • Oct 26, 2022