
Make "internal stream map" transformations available to developers

Open aaronsteers opened this issue 3 years ago • 7 comments

[stub]

aaronsteers avatar Nov 30 '22 17:11 aaronsteers

As an SDK user, I would like to leverage stream maps from Stream classes in a developer-friendly way.

Currently, you can implement `stream_maps` for a stream, but it requires that you define transformation mappings as strings to be interpreted internally by `simpleeval` (there is some nuance to this, and I wasn't able to get it to work for our use case yet). The code required to achieve this is close to how you might define a stream map in JSON; as a result it is quite long-winded, and the mapping expressions do not benefit from the syntactic/semantic validation an IDE provides.

from singer_sdk.mapper import CustomStreamMap


class LogsStream(Auth0Stream):
    """Define logs stream."""

    name = "stream_auth0_logs"
    path = "/logs"
    primary_keys = ["log_id"]
    replication_key = "log_id"
    schema = LogObject.schema
    log_expired = False
    stream_maps = [
        CustomStreamMap(
            name,
            None,
            schema,
            primary_keys,
            {
                "scope": "record.get('scope') and record['scope'].split()",
            },
            None,
        )
    ]
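
For comparison, here is roughly what the same mapping looks like when expressed declaratively: a `stream_maps` entry in the tap's config, evaluated internally with simpleeval. The config below is illustrative only (it mirrors the expression from the example above rather than any real project config).

# Illustrative config only: the same transformation as a simpleeval expression
# string under the `stream_maps` setting.
config = {
    "stream_maps": {
        "stream_auth0_logs": {
            "scope": "record.get('scope') and record['scope'].split()",
        },
    },
}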

As a POC, I started implementing what this might eventually look like as a `record_transforms` property:

class LogsStream(Auth0Stream):
    """Define logs stream."""

    name = "stream_auth0_logs"
    path = "/logs"
    primary_keys = ["log_id"]
    replication_key = "log_id"
    schema = LogObject.schema
    log_expired = False

    def jwtConfigurationExpToExpiresIn(self, value: str, record: dict):
        return record["expires_in"]

    @property
    def record_transforms(self):
        return {
            "scopes": {     # scopes is defined as `ArrayType` in `LogObject` schema
                str: str.split,     # function to apply if value is of type `str`
                list: lambda value, record: map(str.lower, value.split())     # function to apply if value is of type `list`
            },
            "jwt_configuration": {
                "exp": {    # nested properties supported
                    str: self.jwtConfigurationExpToExpiresIn
                }
            }
        }

`record_transforms` is applied in `Stream.post_process`:

from typing import Any, Callable, Dict, Type, TypeVar, Union

_T = TypeVar("_T")

# Property name -> either a nested coercion map or a mapping of
# value type -> transform callable taking (value, record).
CoercionMapType = Dict[str, Union["CoercionMapType", Dict[Type[_T], Callable[[_T, dict], Any]]]]

...

class Stream(metaclass=abc.ABCMeta):
    """Abstract base class for tap streams."""

    ...

    def post_process(self, row: dict, context: dict | None = None) -> dict | None:
        """As needed, append or transform raw data to match expected structure.

        Optional. This method gives developers an opportunity to "clean up" the results
        prior to returning records to the downstream tap - for instance: cleaning,
        renaming, or appending properties to the raw record result returned from the
        API.

        Developers may also return `None` from this method to filter out
        invalid or not-applicable records from the stream.

        Args:
            row: Individual record in the stream.
            context: Stream partition or context dictionary.

        Returns:
            The resulting record dict, or `None` if the record should be excluded.
        """
        
        row = _apply_record_transforms(row, self.record_transforms)
        return row

    @property
    def record_transforms(self) -> dict[str, CoercionMapType]:
        """A map defining how record property values of a given type should be transformed.
    
        Returns:
            A `dict` mapping property keys either to nested coercion maps (for nested properties) or to mappings of value types to transform functions.
        """
        return {}

def _apply_record_transforms(obj: dict, coercion_map: CoercionMapType, record: dict | None = None):
    # `record` is the top-level record, threaded through so that transforms on
    # nested properties can still reference other record fields.
    record = obj if record is None else record

    for key, value in coercion_map.items():
        obj_value = obj.get(key)

        if obj_value is None:
            continue

        if any(isinstance(v, dict) for v in value.values()):
            # Nested property map: recurse into the nested object.
            _apply_record_transforms(obj_value, value, record)
            continue

        transform_fn = value[type(obj_value)]
        obj[key] = transform_fn(obj_value, record)

    return obj
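
To make the intended behaviour concrete, here is a small illustrative run of `_apply_record_transforms` against a hand-written record and coercion map. The field names and values are made up for the example, and it assumes the `(value, record)` transform signature used in the sketch above.

# Hypothetical record and coercion map, purely for illustration.
row = {
    "scopes": "read:logs read:users",
    "jwt_configuration": {"exp": "1700000000"},
    "expires_in": 86400,
}
transforms = {
    "scopes": {str: lambda value, record: value.split()},
    "jwt_configuration": {
        "exp": {str: lambda value, record: record["expires_in"]},  # nested property
    },
}

print(_apply_record_transforms(row, transforms))
# {'scopes': ['read:logs', 'read:users'], 'jwt_configuration': {'exp': 86400}, 'expires_in': 86400}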

Ideally, this would integrate with the existing stream maps implementation, but a refactor of some sort is probably necessary to support native Python transform mappings and processing of nested properties.
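
One possible shape for that integration, assuming the existing `singer_sdk.mapper.StreamMap` interface (`transform` / `get_filter_result`) stays roughly as it is today; the class name and the `record_transforms` constructor argument are hypothetical:

from singer_sdk.mapper import StreamMap


class CallableStreamMap(StreamMap):
    """Hypothetical StreamMap that applies native Python transforms.

    Sketch only: reuses `_apply_record_transforms` from above instead of
    compiling simpleeval expression strings.
    """

    def __init__(self, *args, record_transforms: dict | None = None, **kwargs):
        super().__init__(*args, **kwargs)
        self.record_transforms = record_transforms or {}

    def transform(self, record: dict) -> dict | None:
        # Apply the callable-based coercion map to each record.
        return _apply_record_transforms(record, self.record_transforms)

    def get_filter_result(self, record: dict) -> bool:
        # This sketch does not filter records.
        return True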


Related to https://github.com/meltano/sdk/issues/792

ReubenFrankel avatar Nov 30 '22 23:11 ReubenFrankel

This has been marked as stale because it is unassigned, and has not had recent activity. It will be closed after 21 days if no further activity occurs. If this should never go stale, please add the evergreen label, or request that it be added.

stale[bot] avatar Jul 18 '23 04:07 stale[bot]

Still relevant IMO.

ReubenFrankel avatar Jul 18 '23 08:07 ReubenFrankel

This has been marked as stale because it is unassigned, and has not had recent activity. It will be closed after 21 days if no further activity occurs. If this should never go stale, please add the evergreen label, or request that it be added.

stale[bot] avatar Jul 17 '24 08:07 stale[bot]

Still relevant

ReubenFrankel avatar Jul 17 '24 09:07 ReubenFrankel

This has been marked as stale because it is unassigned, and has not had recent activity. It will be closed after 21 days if no further activity occurs. If this should never go stale, please add the evergreen label, or request that it be added.

stale[bot] avatar Jul 17 '25 10:07 stale[bot]

Still relevant IMO

ReubenFrankel avatar Jul 17 '25 15:07 ReubenFrankel