kafka-tutorials
Working with grouped messages
Describe the use case
We want to keep track of the current address of a set of persons. Location updates arrive grouped per country: each update contains all the persons who moved into that country on a given 'day'. We therefore need to break each update down into single messages using EXPLODE before we can JOIN the updates with the persons.
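To make the EXPLODE step concrete, here is a minimal Python sketch of what happens to one grouped update. It is only a simulation of the semantics, not ksqlDB code. It assumes `persons` and `addresses` are parallel arrays, and the sample values and the helper name `explode_update` are hypothetical. When several EXPLODE calls appear in one SELECT, ksqlDB zips their results position by position and fills the shorter array with NULL, which `zip_longest` mirrors here.

```python
from itertools import zip_longest


def explode_update(update):
    """Yield one record per (person_id, address) pair of a grouped update.

    Hypothetical helper: mimics two EXPLODE calls in a single SELECT, which
    ksqlDB zips position by position, padding the shorter array with NULL.
    """
    for person_id, address in zip_longest(update["persons"], update["addresses"]):
        yield {
            "country_id": update["country_id"],
            "country": update["country"],
            "person_id": person_id,
            "address": address,
        }


# Hypothetical grouped update for one country on one 'day'.
grouped = {
    "country_id": "NL",
    "country": "Netherlands",
    "persons": ["p1", "p2"],
    "addresses": ["Dam 1, Amsterdam", "Coolsingel 40, Rotterdam"],
}

exploded = list(explode_update(grouped))
for record in exploded:
    print(record)
```

Each output record carries the country fields of the original update plus exactly one person and one address, which is the shape the later JOIN needs.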
Provide the ksqlDB application
Document the full set of ksqlDB statements that represent the use case.
-- Persons keyed by id; the value columns are inferred from the Protobuf schema.
CREATE TABLE IF NOT EXISTS persons (id_key STRING PRIMARY KEY)
  WITH (KAFKA_TOPIC = 'persons', VALUE_FORMAT = 'PROTOBUF');

-- Grouped address updates, one message per country.
CREATE STREAM IF NOT EXISTS address_updates (country_id STRING KEY)
  WITH (KAFKA_TOPIC = 'address-updates', VALUE_FORMAT = 'PROTOBUF');

-- Break each grouped update down into one message per person.
CREATE STREAM IF NOT EXISTS exploded_address_updates AS
  SELECT country_id,
         country,
         EXPLODE(persons) AS person_id,
         EXPLODE(addresses) AS address
  FROM address_updates;

-- Enrich each single update with the person's details.
CREATE STREAM IF NOT EXISTS persons_with_address AS
  SELECT persons.id_key AS id,
         persons.first_name AS first_name,
         persons.last_name AS last_name,
         persons.birthday AS birthday,
         country,
         address
  FROM exploded_address_updates
  JOIN persons ON exploded_address_updates.person_id = persons.id_key;
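The final stream-table JOIN can be pictured as a lookup of each exploded update in the current state of the persons table. The Python sketch below is a simplified simulation under stated assumptions: the table is modelled as a plain dict keyed by `id_key`, the sample rows and the helper name `join_with_persons` are hypothetical, and timing aspects of a real stream-table join are ignored. Since JOIN in ksqlDB is an inner join by default, updates without a matching person produce no output.

```python
def join_with_persons(exploded_updates, persons_table):
    """Yield one enriched record per update that has a matching person.

    Hypothetical helper: simulates an inner stream-table join by looking up
    each update's person_id in a dict that stands in for the persons table.
    """
    for update in exploded_updates:
        person = persons_table.get(update["person_id"])
        if person is None:
            continue  # inner join: an update without a known person is dropped
        yield {
            "id": update["person_id"],
            "first_name": person["first_name"],
            "last_name": person["last_name"],
            "birthday": person["birthday"],
            "country": update["country"],
            "address": update["address"],
        }


# Hypothetical table state, keyed by id_key.
persons_table = {
    "p1": {"first_name": "Ada", "last_name": "Jansen", "birthday": "1990-01-01"},
}

# Hypothetical exploded updates; "p9" is not present in the table.
exploded_updates = [
    {"country_id": "NL", "country": "Netherlands",
     "person_id": "p1", "address": "Dam 1, Amsterdam"},
    {"country_id": "NL", "country": "Netherlands",
     "person_id": "p9", "address": "Coolsingel 40, Rotterdam"},
]

joined = list(join_with_persons(exploded_updates, persons_table))
for record in joined:
    print(record)
```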