ksql
ksql copied to clipboard
KSQL Test Runner: "Tests failed: null" error when changing stream names
Describe the bug
To test our migrations, we take all the SQL code in our migration files and combine them into one .sql file. This is used as input for the KSQL test runner. However, one of the (several) bugs we run into is that when we create a stream, drop it and then create the same stream but with a different name strange things, including the test failed message, happen.
To Reproduce
We are using confluentinc/ksqldb-server:0.23.1 to run our tests.
Simplified example code:
CREATE OR REPLACE STREAM EXAMPLE_ALLOCATIONS (
metadata VARCHAR,
data VARCHAR,
_key STRUCT<KEY_1 VARCHAR>) WITH (
KAFKA_TOPIC = 'ingest.example'
, VALUE_FORMAT='JSON'
, KEY_FORMAT='KAFKA'
);
CREATE OR REPLACE STREAM "STREAM_EXAMPLE_ALLOCATIONS" WITH (
KAFKA_TOPIC='app.example.example-allocations-rekey'
, VALUE_FORMAT='JSON'
, KEY_FORMAT='KAFKA'
) AS SELECT _key AS example_allocation_id,
metadata,
data,
_key
FROM "EXAMPLE_ALLOCATIONS"
EMIT CHANGES;
CREATE OR REPLACE STREAM MODEL_EXAMPLE_ALLOCATIONS_CHANGED_EVENT WITH (
KAFKA_TOPIC='app.example.example-allocation-changed-event-raw'
, VALUE_FORMAT='JSON'
, KEY_FORMAT='KAFKA'
)
AS SELECT
allocation.example_allocation_id
-- other stuff
FROM STREAM_EXAMPLE_ALLOCATIONS allocation
EMIT CHANGES;
-- drop all of the above
DROP STREAM MODEL_EXAMPLE_ALLOCATIONS_CHANGED_EVENT DELETE TOPIC;
DROP STREAM STREAM_EXAMPLE_ALLOCATIONS DELETE TOPIC;
DROP STREAM EXAMPLE_ALLOCATIONS;
-- recreating them - but this time the names are singular isntead of plural!
CREATE OR REPLACE STREAM EXAMPLE_ALLOCATION (
metadata VARCHAR,
data VARCHAR,
_key STRUCT<KEY_1 VARCHAR>) WITH (
KAFKA_TOPIC = 'ingest.example'
, VALUE_FORMAT='JSON'
, KEY_FORMAT='KAFKA'
);
CREATE OR REPLACE STREAM "STREAM_EXAMPLE_ALLOCATION" WITH (
KAFKA_TOPIC='app.example.example-allocation-rekey'
, VALUE_FORMAT='JSON'
, KEY_FORMAT='KAFKA'
) AS SELECT _key AS example_allocation_id,
metadata,
data,
_key
FROM "EXAMPLE_ALLOCATION"
EMIT CHANGES;
CREATE OR REPLACE STREAM MODEL_EXAMPLE_ALLOCATION_CHANGED_EVENT WITH (
KAFKA_TOPIC='app.example.example-allocation-changed-event-raw'
, VALUE_FORMAT='JSON'
, KEY_FORMAT='KAFKA'
)
AS SELECT
allocation.example_allocation_id
-- other stuff
FROM STREAM_EXAMPLE_ALLOCATION allocation
EMIT CHANGES;
Input and output shouldn't matter much, seems the error happens earlier:
{
"inputs": [
{
"topic": "ingest.example",
"timestamp": 0,
"key": "example-2130469",
"value": {
"metadata": {},
"data": {},
"_key": {
"key_1": "stuff"
}
}
}
]
}
{
"outputs": [
{
"topic": "app.example.example-allocation-changed-event-raw",
"timestamp": 0,
"key": "KEY",
"value": {}
}
]
}
Expected behavior
In the above example, I would expect a failure due to a mismatch in output. (Or in the original case, I expected my tests to pass.)
Actual behaviour
Instead we get:
...
[2022-07-06 08:36:18,931] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_CSAS_MODEL_EXAMPLE_ALLOCATION_CHANGED_EVENT_3-3d2c5d9b-ab22-4354-b983-9873aa39d25e-StreamThread-4] Creating consumer client (org.apache.kafka.streams.processor.internals.StreamThread:393)
[2022-07-06 08:36:18,947] INFO stream-thread [main] task [0_0] Initialized (org.apache.kafka.streams.processor.internals.StreamTask:240)
[2022-07-06 08:36:18,949] INFO stream-thread [main] task [0_0] Restored and ready to run (org.apache.kafka.streams.processor.internals.StreamTask:265)
>>>>> Test failed: null
[2022-07-06 08:36:18,950] INFO stream-client [_confluent-ksql-some.ksql.service.idquery_CSAS_STREAM_EXAMPLE_ALLOCATION_2-cf9be1a7-6abe-4e76-be47-82f296b1132b] State transition from CREATED to PENDING_SHUTDOWN (org.apache.kafka.streams.KafkaStreams:332)
[2022-07-06 08:36:18,950] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_CSAS_STREAM_EXAMPLE_ALLOCATION_2-cf9be1a7-6abe-4e76-be47-82f296b1132b-StreamThread-1] Informed to shut down (org.apache.kafka.streams.processor.internals.StreamThread:1117)
[2022-07-06 08:36:18,951] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_CSAS_STREAM_EXAMPLE_ALLOCATION_2-cf9be1a7-6abe-4e76-be47-82f296b1132b-StreamThread-1] State transition from CREATED to PENDING_SHUTDOWN (org.apache.kafka.streams.processor.internals.StreamThread:229)
[2022-07-06 08:36:18,951] INFO stream-thread [_confluent-ksql-some.ksql.service.idquery_CSAS_STREAM_EXAMPLE_ALLOCATION_2-cf9be1a7-6abe-4e76-be47-82f296b1132b-StreamThread-1] Shutting down (org.apache.kafka.streams.processor.internals.StreamThread:1131)
I think something goes wrong with dropping the streams, but not sure how. In our code, when we do a drop but don't change the names of the streams for plural to singular, everything works again.
Any ideas or workarounds? And if fixing bugs with the test runner is not a priority because you are moving to a new tester, any updates on that one?