Support `TRANSFORM USING <SELECT>` for sources, maybe for tables.
Feature request
Sources of data, including both sources and tables, could support a `TRANSFORM USING <SELECT ...>` clause that applies a transform to the data provided by the source before it is made available to those who would use it.
This clause could be used for a variety of tasks, but can also be arbitrarily deferred to the rendering of dataflows. Examples include:
- Parsing and transformation of data.
- Managing data expiration using `WHERE mz_now() < expire_ts` temporal filters.
- Applying `TopK`-style `UPSERT` transforms.
- Grouping and aggregation.
In the general case, any SELECT clause can be attached to the source or table definition and installed atop the source in any rendered dataflow. In more specific cases, the associated expressions can be transformed to pull down transformation, filtering, and other organization into the storaged implementation, before data land in persist. In each of these cases, we have the potential to dramatically reduce the amount of retained data.
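As a concrete sketch of the proposal, the expiration example above might read as follows (hypothetical syntax; the connection, topic, and `expire_ts` column are invented for illustration):

```sql
-- Hypothetical: TRANSFORM USING is proposed, not implemented.
CREATE SOURCE events
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'events')
  FORMAT JSON
  TRANSFORM USING
    -- Temporal filter, ideally pushed down into storaged so that
    -- expired records never need to be retained in persist.
    SELECT * FROM events WHERE mz_now() < expire_ts;
```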
There is no fundamental reason that the SELECT clause needs to reference only the one source of data, but I can imagine that in the early days we may only be able to implement the transform capped atop the one source. More generally, transforming a source of data using other existing data assets (other sources, tables) checks out, but is unlikely to be implementable in the near future.
Tables?
Doing this for tables seems as easy and valuable as for sources, but introduces some limits: one would no longer certainly be able to apply UPDATE or DELETE to transformed tables, as we could not certainly read back their input contents. At the same time, providing declarative mechanisms for transformations, expiration, grouping, etc that can be interposed before data are recorded has the same value there as it does for sources.
At the moment, sources have substantially more evaluation autonomy than tables, which are handled entirely in the environmentd process. For this reason, we might want to skip tables until they are made more robust.
Alternatives
We could alternately allow a `CREATE TEMP SOURCE` command that creates a, say, transaction-scoped source, from which one would write

```sql
CREATE MATERIALIZED VIEW name AS SELECT ... FROM temp_source;
```
In fact, all sources could be realized this way, as creating a materialized view from a temporary source. Perhaps all sources should be temporary.
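A sketch of the pair of statements under this alternative (hypothetical syntax; `CREATE TEMP SOURCE`, the connection, and the topic name are all invented):

```sql
-- Hypothetical: a transaction-scoped source...
CREATE TEMP SOURCE temp_source
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'events')
  FORMAT JSON;

-- ...from which a durable materialized view applies the transform.
CREATE MATERIALIZED VIEW name AS
  SELECT ... FROM temp_source;
```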
Subtasks/pre-requisites
- [x] #17491
@heeringa and I discussed today the possibility of allowing multiple transforms on the same source. This gives you the same flexibility as creating a temp source followed by a materialized view, though perhaps not elegantly.
Candidate syntaxes:
```sql
CREATE SOURCE ...
DEMUX WITH (
    sub1 AS SELECT …,
    sub2 AS SELECT …,
)
```

```sql
CREATE SOURCE ...
TRANSFORM WITH (
    sub1 AS SELECT …,
    sub2 AS SELECT …,
)
```
The idea is that sub1 and sub2 would be "subsources", just like you get with PostgreSQL sources.
One comment folks have had is that effecting UPSERT behavior is non-trivial with SQL because to match Kafka-style upserts you need to look for tombstone records without valid data. This is fundamentally hard to surface in SQL because .. no data. The tombstones are an important part of cleaning up using SQL, communicating that a key should be dropped even without the arrival of new valid data.
This seems like it shouldn't be too hard to work around, if that were important to us (e.g. if we wanted to avoid a proliferation of ENVELOPE types and parameters and custom code paths to handle the same reasoning in multiple ways). Right now we surface various tidbits with each record: Kafka keys, partitions, offsets, timestamps, headers. Having an additional column `NULL_PAYLOAD` or the like would be the .. SQL way .. to represent enum-style data.
```sql
CREATE SOURCE ...
TRANSFORM USING
    SELECT DISTINCT ON (key) *
    FROM raw_source
    ORDER BY key, TIMESTAMP
    HAVING NULL_PAYLOAD = false
```
I'm not actually sure how DISTINCT ON works with WHERE / HAVING; the filter needs to be applied after the DISTINCT ON though, and putting a WHERE next to the FROM would not be correct.
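One way to sidestep that question is to apply the filter in an outer query, after the `DISTINCT ON` has run (a sketch reusing the names above):

```sql
SELECT *
FROM (
    SELECT DISTINCT ON (key) *
    FROM raw_source
    ORDER BY key, TIMESTAMP
) AS latest
WHERE NULL_PAYLOAD = false;
```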
Porting over some thoughts from Slack about how to make FORMAT AVRO produce NULL in the face of NULL Kafka message payloads.
The design I had in mind was `ENVELOPE NOTFLAT` (better name pending), where the source has exactly one nullable column named `data` (maybe a better name here too?), and the type of that column would be exactly the type of the top-level Avro type.

So to express `ENVELOPE UPSERT` in SQL, you'd use `ENVELOPE NOTFLAT` to get the right semantics for top-level NULLs, do your upsert logic using `TRANSFORM USING`, and then flatten out your record if desired by writing `SELECT data.f1, data.f2, data.f3`, etc.

In other words: I think the implicit default of `ENVELOPE FLATTEN` was a mistake, and that probably should have been behavior that you have to explicitly opt in to.
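Putting the pieces together, `ENVELOPE UPSERT` might then be spelled in SQL as follows (hypothetical syntax throughout; `NOTFLAT`, `TRANSFORM USING`, and the field names `f1`/`f2` are placeholders):

```sql
CREATE SOURCE events
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'events')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE NOTFLAT  -- one nullable `data` column of the top-level Avro type
  TRANSFORM USING
    -- Upsert: keep the latest record per key; NULL `data` is a tombstone.
    SELECT (data).f1, (data).f2
    FROM (SELECT DISTINCT ON (key) * FROM events ORDER BY key, off DESC) AS latest
    WHERE data IS NOT NULL;
```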
One mystery this led me to is how to express tombstones with custom timestamps/orders. A Kafka message may have a null payload, but it will have a key and a timestamp, which is enough to mark it as a tombstone. But if the payload is null then it is ambiguous what its "custom timestamp" should be, and whether it should clobber things it comes after or not. Probably we just say "oh, I guess it should, use the real timestamp in that case", but if that's the case perhaps we don't need to surface the null payload in SQL at all.
Recording some discussion from today. I believe you can simulate the UPSERT operator in SQL today with something like:
```sql
CREATE TABLE t (key text, value text, ts timestamp, off bigint);

-- Keep, for each key, the record with the greatest offset,
-- then drop tombstones (NULL values).
CREATE VIEW upsert AS
SELECT * FROM (
    SELECT DISTINCT ON (key) * FROM t ORDER BY key, off DESC
) AS latest
WHERE value IS NOT NULL;
```
To be used with Kafka sources that emit NULL values to indicate deletions, we'd need #17491 as well.
To decrease memory use when `t` is not monotonic (as in the example above), `OPTIONS (EXPECTED GROUP SIZE = 1)` should do the trick (#18121), provided only a few updates are expected for each key.
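Concretely, the hint would attach to the `DISTINCT ON` subquery (a sketch; the exact placement and spelling of the `OPTIONS` clause follows this issue's wording and may differ in practice):

```sql
CREATE VIEW upsert AS
SELECT * FROM (
    SELECT DISTINCT ON (key) *
    FROM t
    OPTIONS (EXPECTED GROUP SIZE = 1)  -- few updates expected per key
    ORDER BY key, off DESC
) AS latest
WHERE value IS NOT NULL;
```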