
Retain granularity of upstream updates ("strict reclocking")

Open benesch opened this issue 1 year ago • 2 comments

Feature request

Summary

Make it possible for users to indicate that they want each update in an upstream system (Kafka, PostgreSQL) reflected individually in Materialize, rather than consolidated with neighboring updates.

Problem description

(h/t @pH14)

Let’s say I have a key k and its value is 0. In Postgres if I then run:

UPDATE "table" SET value = 1 WHERE key = 'k';
SELECT pg_sleep(1);  -- wait a second
UPDATE "table" SET value = 2 WHERE key = 'k';
SELECT pg_sleep(1);  -- wait a second
UPDATE "table" SET value = 3 WHERE key = 'k';

Then in my SUBSCRIBE (columns: mz_timestamp, mz_diff, key, value) I’ll see each individual change:

1693231879000	-1	k	0
1693231879000	1	k	1
1693231885000	-1	k	1
1693231885000	1	k	2
1693231888000	-1	k	2
1693231888000	1	k	3

But if the updates in PostgreSQL are instead made in quick succession, I may see only a single, consolidated update:

1693231943000	-1	k	0
1693231943000	1	k	3
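A rough model of why the intermediate values vanish: the sketch below treats reclocking as rounding each upstream commit time up to the end of a fixed-width interval, then consolidating updates that share a timestamp. The 1000 ms interval, the `reclock` helper, and the commit times are assumptions chosen for illustration; this is not Materialize's actual implementation.

```python
from collections import Counter


def reclock(updates, interval_ms=1000):
    """Hypothetical model of reclocking plus consolidation.

    updates: (commit_time_ms, diff, key, value) tuples from the upstream system.
    Each commit time is rounded up to the end of its interval; updates that
    land on the same (timestamp, key, value) have their diffs summed, and rows
    whose diffs cancel to zero are dropped.
    """
    totals = Counter()
    for commit_ms, diff, key, value in updates:
        ts = -(-commit_ms // interval_ms) * interval_ms  # ceiling to interval end
        totals[(ts, key, value)] += diff
    return sorted((ts, d, k, v) for (ts, k, v), d in totals.items() if d != 0)


# Three UPDATEs committed seconds apart (hypothetical commit times): each survives.
slow = [
    (1693231878500, -1, "k", 0), (1693231878500, 1, "k", 1),
    (1693231884500, -1, "k", 1), (1693231884500, 1, "k", 2),
    (1693231887500, -1, "k", 2), (1693231887500, 1, "k", 3),
]
# The same UPDATEs committed 100 ms apart: the intermediate values cancel out.
fast = [
    (1693231942100, -1, "k", 0), (1693231942100, 1, "k", 1),
    (1693231942200, -1, "k", 1), (1693231942200, 1, "k", 2),
    (1693231942300, -1, "k", 2), (1693231942300, 1, "k", 3),
]
print(len(reclock(slow)))  # 6
print(reclock(fast))  # [(1693231943000, -1, 'k', 0), (1693231943000, 1, 'k', 3)]
```

Nothing is lost in the sense of correctness property #2 below: the final state is the same, only the intermediate states are no longer observable.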

Details

Here is the current statement of correctness property #2:

Materialize respects the explicit or implied event order of its sources. This includes partial orders. In practice this means Materialize assigns new timestamps to events in sources. This assignment of new timestamps is called reclocking. Once reclocking occurs, it defines a permanent order of events within Materialize. For sources without transactional semantics (basic Kafka streams), Materialize reclocks the events to reflect the order in which the source emits each event. For sources that do have transactional semantics (Postgres, Debezium), Materialize reclocks events to respect the transactional boundaries. For partially ordered events x and y:

  1. if x < y in the source, then x ≤ y in Materialize
  2. if x = y in the source, then x = y in Materialize

I believe the behavior we'd want to allow users to opt into is changing (1) to "if x < y in the source, then x < y in Materialize." Note the change from ≤ to <.
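To make the difference between the two variants of rule (1) concrete, here is a small checker. It is a sketch under a simplifying assumption: the source order is modeled as a total order given by a hypothetical per-event position (for example a commit LSN), which is a special case of the partial orders the property covers. The `respects_order` name and its parameters are illustrative only.

```python
from itertools import permutations


def respects_order(src_time, mz_time, strict=False):
    """Check both rules of correctness property #2.

    src_time: event -> position in the source order (e.g. a commit LSN);
              assumed totally ordered here for simplicity.
    mz_time:  event -> reclocked Materialize timestamp.
    strict:   False checks rule 1 as written (x < y implies x <= y);
              True checks the proposed variant (x < y implies x < y).
    """
    for x, y in permutations(src_time, 2):
        if src_time[x] < src_time[y]:
            ok = mz_time[x] < mz_time[y] if strict else mz_time[x] <= mz_time[y]
            if not ok:
                return False
        elif src_time[x] == src_time[y] and mz_time[x] != mz_time[y]:
            return False  # rule 2: same source transaction, same timestamp
    return True


# Three PostgreSQL transactions (hypothetical LSNs) committed in quick
# succession and all reclocked to the same Materialize timestamp.
src = {"u1": 100, "u2": 200, "u3": 300}
mz = {"u1": 1693231943000, "u2": 1693231943000, "u3": 1693231943000}
print(respects_order(src, mz))               # True: allowed today
print(respects_order(src, mz, strict=True))  # False: violates strict reclocking
```

Under the strict variant, the three transactions would each need a distinct, increasing timestamp.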

In terms of syntax, it's useful to consider https://github.com/MaterializeInc/materialize/issues/13762. In the framing of that issue, materialized views currently have REFRESH ON COMMIT semantics, and we want to allow users to downgrade to REFRESH BEST EFFORT semantics.

Sources are exactly the opposite. They currently have REFRESH BEST EFFORT semantics, and we want to allow users to upgrade to REFRESH ON COMMIT semantics. And so the syntax could look like:

CREATE SOURCE ...
WITH (
    REFRESH = ON COMMIT
);

See also

  • https://github.com/MaterializeInc/materialize/issues/19322
  • https://github.com/MaterializeInc/materialize/issues/13762
  • https://github.com/MaterializeInc/materialize/issues/17936

benesch · Aug 28 '23 15:08

> For sources that do have transactional semantics (Postgres, Debezium) Materialize reclocks events to respect the transactional boundaries.

> I believe the behavior we'd want to allow users to opt into is changing (1) to "if x < y in the source, then x < y in Materialize." Note the change from ≤ to <.

Not sure if it was already implied, but it seems like this behavior affects non-transactional sources too, like a plain Kafka topic ingested as an UPSERT source. If we see multiple messages for the same key within one timestamp interval, today I'd expect us to emit a diff only for the last value, while a user might expect to observe each individual update.
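A minimal sketch of that UPSERT case, assuming messages are grouped by a rounded-up timestamp interval as in the problem description. The `upsert_diffs` helper, the 1000 ms interval, and the `(interval, sequence)` batch labels used for the strict mode are all hypothetical; they illustrate the requested behavior, not how Materialize implements UPSERT.

```python
def upsert_diffs(messages, interval_ms=1000, strict=False):
    """Hypothetical model of an UPSERT source over (time_ms, key, value) messages.

    Non-strict (the current behavior described above): messages are grouped by
    reclocked timestamp, and only the net change per key in each group is emitted.
    Strict (the requested opt-in): every message gets its own batch, labeled
    (interval timestamp, sequence number), so each update is observable.
    Deletes (null payloads) are not modeled.
    """
    batches = {}  # batch label -> {key: last value seen in that batch}
    for seq, (time_ms, key, value) in enumerate(messages):
        ts = -(-time_ms // interval_ms) * interval_ms  # ceiling to interval end
        label = (ts, seq) if strict else ts
        batches.setdefault(label, {})[key] = value  # later messages overwrite earlier ones

    state = {}  # key -> value most recently emitted downstream
    out = []
    for label in sorted(batches):
        for key, value in batches[label].items():
            old = state.get(key)
            if old == value:
                continue  # no net change for this key in this batch
            if old is not None:
                out.append((label, -1, key, old))
            out.append((label, 1, key, value))
            state[key] = value
    return out


msgs = [
    (1693231940000, "k", 0),
    (1693231942100, "k", 1),
    (1693231942200, "k", 2),
    (1693231942300, "k", 3),
]
print(upsert_diffs(msgs))
# [(1693231940000, 1, 'k', 0), (1693231943000, -1, 'k', 0), (1693231943000, 1, 'k', 3)]
print(len(upsert_diffs(msgs, strict=True)))  # 7
```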

pH14 · Aug 28 '23 15:08

@frankmcsherry mentions that https://github.com/MaterializeInc/materialize/issues/17936 is related. I agree! If we were to try to map each PostgreSQL transaction to an "epoch milliseconds" timestamp in Materialize and two updates arrived in the same millisecond, we'd need to force the second update to wait until the next millisecond. If the rate of updates were sustained at more than one update per millisecond, we'd fall further and further behind. With hybrid logical timestamps, we'd introduce a second, logical component of the timestamp. If we used (say) a 32-bit logical timestamp, we'd be able to absorb up to 2^32 distinct updates per millisecond.
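The arithmetic can be sketched by comparing the two strategies on a sustained load of two updates per millisecond. Both `millis_only` and `HybridClock` are hypothetical illustrations; the `(physical_ms, logical)` pair is an assumed encoding, not Materialize's actual timestamp type. Python tuples compare lexicographically, so strictly increasing pairs give the strict order that rule (1) would require.

```python
LOGICAL_BITS = 32
LOGICAL_MAX = (1 << LOGICAL_BITS) - 1  # largest 32-bit logical component


def millis_only(arrivals_ms):
    """Give each update a distinct millisecond by pushing collisions forward."""
    last = -1
    out = []
    for t in arrivals_ms:
        last = max(t, last + 1)  # the next free millisecond, possibly in the future
        out.append(last)
    return out


class HybridClock:
    """Assign strictly increasing (physical_ms, logical) timestamps."""

    def __init__(self):
        self.last = (0, -1)

    def next(self, wall_ms):
        phys, logical = self.last
        if wall_ms > phys:
            self.last = (wall_ms, 0)  # wall clock moved on: reset the logical part
        elif logical < LOGICAL_MAX:
            self.last = (phys, logical + 1)  # same millisecond: bump the logical part
        else:
            self.last = (phys + 1, 0)  # logical space exhausted: borrow the next ms
        return self.last


sustained = [ms for ms in range(1000) for _ in range(2)]  # 2 updates/ms for 1 s
forced = millis_only(sustained)
print(forced[-1] - sustained[-1])  # 1000: ms of lag after 1 s, and still growing

hlc = HybridClock()
stamps = [hlc.next(t) for t in sustained]
print(stamps[-1])  # (999, 1): no lag; the logical component absorbs collisions
```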

benesch · Aug 28 '23 20:08