kafka-tutorials icon indicating copy to clipboard operation
kafka-tutorials copied to clipboard

Working with grouped messages

Open gklijs opened this issue 3 years ago • 0 comments

Describe the use case We want to keep track of the current address for some persons. We get updates on their location per country, but these updates contain all the persons moved into that country that 'day'. So we need to break those down in single messages using EXPLODE, before we can create the JOIN for the updates.

Provide the ksqlDB application Document the full set of ksqlDB statements that represent the use case.

statements as exucuted

    CREATE TABLE IF NOT EXISTS persons (id_key STRING PRIMARY KEY) WITH (KAFKA_TOPIC = 'persons', VALUE_FORMAT = 'PROTOBUF');
    CREATE STREAM IF NOT EXISTS address_updates (country_id STRING KEY) WITH (KAFKA_TOPIC = 'address-updates', VALUE_FORMAT = 'PROTOBUF');
    CREATE STREAM IF NOT EXISTS exploded_address_updates AS SELECT country_id, country, EXPLODE(persons) AS person_id, EXPLODE(addresses)
    AS address FROM address_updates;
    CREATE STREAM IF NOT EXISTS persons_with_address AS SELECT persons.id_key AS id, persons.first_name as first_name, persons.last_name
    as last_name, persons.birthday as birthday, country, address FROM exploded_address_updates JOIN persons ON
    exploded_address_updates.person_id = persons.id_key;
    "#;

gklijs avatar Nov 11 '21 06:11 gklijs