feat: `streams.core.Stream` should read the `key_properties` automatically.
Feature scope
Taps (catalog, state, stream maps, tests, etc.)
Description
If `key_properties` is present in the schema, then the `primary_keys` attribute should be derived from it. This will allow tap authors to keep the definition contained in 1 place (within the schema) and not have to override the method `primary_keys` for each stream.
`key_properties` is in the Singer spec. However, I was not able to find `primary_keys` being set in these classes (`Stream`, `GraphQLStream`, `RESTStream`).
The `SQLStream` class defines `primary_keys` as `self._singer_catalog_entry.metadata.root.table_key_properties or []`. This is likely safe: if the key properties are available in the metadata, they should be used. That said, I haven't read enough code to determine exactly how the key properties are set in the metadata.
Hey @haleemur, thanks for filing!
Can you say more about where a `key_properties` definition would fit in a stream's JSON schema? What JSON schema features could we use for this? Or would this be a new top-level attribute that's not part of the JSON schema spec?
> This will allow tap authors to keep the definition contained in 1 place

Then we should probably include `replication_key` in there as well.
> and not have to override the method `primary_keys` for each stream.

I'm not sure I follow. You can have a parent class define `primary_keys` to avoid duplication.
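The parent-class approach could look roughly like the sketch below. `BaseStream` is a stand-in for the SDK's stream class, not the real `singer_sdk` API, and `AcmeStream`, `UsersStream`, and `OrdersStream` are hypothetical names used only for illustration.

```python
from __future__ import annotations


class BaseStream:
    """Stand-in for an SDK stream base class (not the real singer_sdk API)."""

    name: str = ""
    primary_keys: list[str] = []


class AcmeStream(BaseStream):
    """Hypothetical tap-wide parent: most streams in this tap are keyed on `id`."""

    primary_keys = ["id"]


class UsersStream(AcmeStream):
    name = "users"  # inherits primary_keys from AcmeStream


class OrdersStream(AcmeStream):
    name = "orders"
    primary_keys = ["order_id", "line_number"]  # override only where the key differs
```

This removes repetition when many streams share a key, but each key is still declared in Python rather than next to the schema.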
Hmm, maybe I misunderstood the flow of data, and as a result my idea is not very clearly expressed. Please let me know if I'm incorrect in the following design.
I see this pattern in several plugin code bases. It requires the developer to specify details about 1 structure in a couple of different places.
example (taken from tap-klaviyo):
- specify the schema in the schemas path
- define the stream as a class inheriting from some [common class](https://github.com/MeltanoLabs/tap-klaviyo/blob/main/tap_klaviyo/streams.py#L16) (in this case `KlaviyoStream` <- `RESTStream` <- `Stream`)
- set the property `primary_keys` to a list of fields in the class.
In this example the schema is specified in 1 place but the primary key is specified elsewhere. Maybe there's an opportunity to define `primary_keys` in the same place if the schema file has the top-level attribute `key_properties` or `$key_properties` (confession: I don't know what the significance of the `$` sigil is in the tap's JSON schema files).
example (using the same stream as above):
Maybe the schema file could be written as follows. The `$key_properties` line is the only material edit; the other edits just condense the schema vertically.

```json
{
  "$key_properties": ["id"],
  "properties": {
    "name": {
      "type": ["string", "null"]
    },
    "id": {
      "description": "The event ID",
      "type": ["string", "null"]
    },
    "type": {
      "description": "The event type",
      "type": ["string", "null"]
    },
    "datetime": {
      "description": "Event timestamp in ISO 8601 format (YYYY-MM-DDTHH:MM:SS.mmmmmm)",
      "format": "date-time",
      "type": ["string", "null"]
    },
    "attributes": {
      "properties": {
        "metric_id": {
          "description": "The metric ID",
          "type": ["string", "null"]
        },
        "profile_id": {
          "description": "Profile ID of the associated profile, if available",
          "type": ["string", "null"]
        },
        "timestamp": {
          "description": "Event timestamp in seconds",
          "type": ["integer", "null"]
        },
        "event_properties": {
          "description": "Event properties, can include attribution data, identifiers and extra properties",
          "type": ["object", "null"]
        },
        "datetime": {
          "description": "Event timestamp in ISO 8601 format (YYYY-MM-DDTHH:MM:SS.mmmmmm)",
          "format": "date-time",
          "type": ["string", "null"]
        },
        "uuid": {
          "description": "A unique identifier for the event, this can be used as a cursor in pagination",
          "type": ["string", "null"]
        }
      },
      "type": ["object", "null"]
    }
  }
}
```
This would allow the tap's class to be made more concise. If we similarly read the `name` & `replication_key` from some attribute specified in the schema, we can avoid a couple more attributes that need to be declared in Python code. Sticking with the example above, the `EventsStream` class could be expressed a little more concisely, but more importantly, all the declarative bits of the stream (schema, primary key & replication key) are specified in 1 place.
```python
class EventsStream(KlaviyoStream):
    """Define custom stream."""

    path = "/events"
    schema_filepath = SCHEMAS_DIR / "event.json"

    def post_process(
        self,
        row: dict,
        context: dict | None = None,  # noqa: ARG002
    ) -> dict | None:
        row["datetime"] = row["attributes"]["datetime"]
        return row

    @property
    def is_sorted(self) -> bool:
        return True
```
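To make the "read it from the schema" idea concrete, here is a minimal sketch of pulling the declarative bits out of a schema dict before the remaining schema is used. Everything in it is an assumption for illustration: `split_stream_annotations` is a hypothetical helper, not an SDK function, and `$replication_key` is an invented attribute name that mirrors `$key_properties`.

```python
from __future__ import annotations

import copy

# Hypothetical annotation names; neither is part of the JSON Schema spec.
KEY_PROPERTIES = "$key_properties"
REPLICATION_KEY = "$replication_key"


def split_stream_annotations(schema: dict) -> tuple[dict, list[str], str | None]:
    """Return (schema without annotations, key properties, replication key)."""
    cleaned = copy.deepcopy(schema)  # leave the caller's schema untouched
    key_properties = list(cleaned.pop(KEY_PROPERTIES, []))
    replication_key = cleaned.pop(REPLICATION_KEY, None)

    # Fail early if an annotation names a field the schema does not declare.
    declared = cleaned.get("properties", {})
    named = [*key_properties, *([replication_key] if replication_key else [])]
    for field in named:
        if field not in declared:
            raise ValueError(f"{field!r} is not a top-level property of the schema")
    return cleaned, key_properties, replication_key


event_schema = {
    "$key_properties": ["id"],
    "$replication_key": "datetime",
    "properties": {
        "id": {"type": ["string", "null"]},
        "datetime": {"type": ["string", "null"], "format": "date-time"},
    },
}
schema, primary_keys, replication_key = split_stream_annotations(event_schema)
```

Stripping the custom attributes from the returned schema is a design choice in this sketch, so that downstream consumers only see standard JSON Schema keywords. Whether the SDK should strip or pass them through is an open question.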
We could support this in a backward-compatible manner. When the `Stream` class reads the schema, if it finds the property `$key_properties`, it could set the stream's primary key from it. Maybe we could add the following logic to the base `Stream` class's `__init__` method. All existing code would still be valid, and upgrading the SDK would still be safe, as this still supports setting the property `primary_keys` in a child class.
```python
if "$key_properties" in self.schema and not self.primary_keys:
    self.primary_keys = self.schema["$key_properties"]
```
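A self-contained sketch of how that fallback would behave is below. The `Stream` class here is a simplified stand-in written for this example, not the SDK's actual base class, and `LegacyStream` is a hypothetical name for a stream that still sets `primary_keys` explicitly.

```python
from __future__ import annotations


class Stream:
    """Stand-in for the SDK base class; only the fallback logic is modelled."""

    schema: dict = {}
    primary_keys: list[str] = []

    def __init__(self) -> None:
        # Fall back to the schema only when the subclass declared no key,
        # so existing taps that set primary_keys keep their behaviour.
        if "$key_properties" in self.schema and not self.primary_keys:
            self.primary_keys = list(self.schema["$key_properties"])


class EventsStream(Stream):
    # New style: the key lives in the schema.
    schema = {"$key_properties": ["id"], "properties": {"id": {"type": "string"}}}


class LegacyStream(Stream):
    # Old style: an explicit attribute still wins over the schema.
    schema = {"$key_properties": ["id"], "properties": {"uuid": {"type": "string"}}}
    primary_keys = ["uuid"]
```

In this sketch an explicit `primary_keys` always takes precedence, which is what keeps the change backward compatible. A stream with neither declaration ends up with an empty key, as before.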