KafkaIngress e2e tests seem to be flaky
The KafkaIngress e2e test failed on GHA https://github.com/restatedev/restate/actions/runs/12976017546/job/36188054014?pr=2546#step:11:126. It looks as if the subscription starts before the corresponding Bifrost logs are provisioned:
2025-01-26T15:54:44.248084Z DEBUG restate_ingress_kafka::consumer_task
Starting topic 'counter' partition '0' consumption loop
restate.subscription.id: sub_13Mwxl4GSHoM9kMdZvfYVhf
messaging.consumer.group.name: "sub_13Mwxl4GSHoM9kMdZvfYVhf"
on rs:ingress-5
2025-01-26T15:54:44.255111Z DEBUG restate_core::partitions
No known node for partition - requesting refresh
partition_id: PartitionId(15)
on rs:ingress-4
in restate_ingress_http::rpc_request_dispatcher::call invocation
request_id: 01JJHN4TH6C2CKXCJJTCF4GK5R
invocation_id: inv_1hzh7Pqebx2t6tsJYQZsI7Dr3ttsYQtk9H
in restate_ingress_http::handler::service_handler::ingress
restate.invocation.id: inv_1hzh7Pqebx2t6tsJYQZsI7Dr3ttsYQtk9H
restate.invocation.target: Counter/{key}/get
2025-01-26T15:54:44.294222Z DEBUG restate_core::partitions
No known node for partition - requesting refresh
partition_id: PartitionId(2)
on rs:ingress-4
in restate_ingress_http::rpc_request_dispatcher::call invocation
request_id: 01JJHN4TH6TN6ETK4XSYMESC4D
invocation_id: inv_1eyQWJItNe7z6p2wcwytITyHskmhz83Alj
in restate_ingress_http::handler::service_handler::ingress
restate.invocation.id: inv_1eyQWJItNe7z6p2wcwytITyHskmhz83Alj
restate.invocation.target: Counter/{key}/get
2025-01-26T15:54:44.319394Z DEBUG restate_core::partitions
No known node for partition - requesting refresh
partition_id: PartitionId(15)
on rs:ingress-4
in restate_ingress_http::rpc_request_dispatcher::call invocation
request_id: 01JJHN4TH6C2CKXCJJTCF4GK5R
invocation_id: inv_1hzh7Pqebx2t6tsJYQZsI7Dr3ttsYQtk9H
in restate_ingress_http::handler::service_handler::ingress
restate.invocation.id: inv_1hzh7Pqebx2t6tsJYQZsI7Dr3ttsYQtk9H
restate.invocation.target: Counter/{key}/get
2025-01-26T15:54:44.346912Z INFO restate_ingress_kafka::consumer_task
Processing Kafka ingress request
on rs:ingress-4
in restate_ingress_kafka::consumer_task::kafka_ingress_consume
otel.name: "kafka_ingress_consume"
messaging.system: "kafka"
messaging.operation: "receive"
messaging.source.name: "event-handler"
messaging.destination.name: service://Proxy/oneWayCall
restate.subscription.id: sub_127bckbuL86uiVfBP2m9WFj
messaging.consumer.group.name: "sub_127bckbuL86uiVfBP2m9WFj"
2025-01-26T15:54:44.347000Z DEBUG restate_ingress_kafka::consumer_task
Stopping consumer with id sub_127bckbuL86uiVfBP2m9WFj
on rs:worker-0
2025-01-26T15:54:44.346945Z ERROR restate_bifrost::appender
error: unknown log '10'
on rs:ingress-4
in restate_bifrost::appender::append
log_id: 10
in restate_ingress_kafka::consumer_task::kafka_ingress_consume
otel.name: "kafka_ingress_consume"
messaging.system: "kafka"
messaging.operation: "receive"
messaging.source.name: "event-handler"
messaging.destination.name: service://Proxy/oneWayCall
restate.subscription.id: sub_127bckbuL86uiVfBP2m9WFj
messaging.consumer.group.name: "sub_127bckbuL86uiVfBP2m9WFj"
2025-01-26T15:54:44.349404Z WARN restate_ingress_kafka::subscription_controller::task_orchestrator
Consumer task for subscription sub_127bckbuL86uiVfBP2m9WFj unexpectedly returned error: ingress dispatcher channel is closed
on rs:worker-0
2025-01-26T15:54:44.350351Z INFO restate_ingress_kafka::consumer_task
Processing Kafka ingress request
on rs:ingress-4
in restate_ingress_kafka::consumer_task::kafka_ingress_consume
otel.name: "kafka_ingress_consume"
messaging.system: "kafka"
messaging.operation: "receive"
messaging.source.name: "counter"
messaging.destination.name: service://Counter/add
restate.subscription.id: sub_13Mwxl4GSHoM9kMdZvfYVhf
messaging.consumer.group.name: "sub_13Mwxl4GSHoM9kMdZvfYVhf"
2025-01-26T15:54:44.350380Z ERROR restate_bifrost::appender
error: unknown log '2'
on rs:ingress-4
in restate_bifrost::appender::append
log_id: 2
in restate_ingress_kafka::consumer_task::kafka_ingress_consume
otel.name: "kafka_ingress_consume"
messaging.system: "kafka"
messaging.operation: "receive"
messaging.source.name: "counter"
messaging.destination.name: service://Counter/add
restate.subscription.id: sub_13Mwxl4GSHoM9kMdZvfYVhf
messaging.consumer.group.name: "sub_13Mwxl4GSHoM9kMdZvfYVhf"
2025-01-26T15:54:44.350437Z DEBUG restate_ingress_kafka::consumer_task
Stopping consumer with id sub_13Mwxl4GSHoM9kMdZvfYVhf
on rs:worker-3
2025-01-26T15:54:44.351787Z DEBUG restate_admin::cluster_controller::logs_controller
Proposing new logs configuration: Logs { ... }
on rs:worker-2
2025-01-26T15:54:44.352957Z WARN restate_ingress_kafka::subscription_controller::task_orchestrator
Consumer task for subscription sub_13Mwxl4GSHoM9kMdZvfYVhf unexpectedly returned error: ingress dispatcher channel is closed
on rs:worker-3
I assume that the retry timeout is too long for the test to finish within the configured 10 seconds.
default => KafkaIngress => handleEventInEventHandler(URL, int, Client)
MethodSource [className = 'dev.restate.sdktesting.tests.KafkaIngress', methodName = 'handleEventInEventHandler', methodParameterTypes = 'java.net.URL, int, dev.restate.sdk.client.Client']
=> org.awaitility.core.ConditionTimeoutException: Condition with alias 'Updates from Kafka are visible in the counter' didn't complete within 10 seconds because assertion condition defined as a Lambda expression in dev.restate.sdktesting.tests.UtilsKt$untilAsserted$2
expected: 6L
but was: 0L.
org.awaitility.core.ConditionAwaiter.await(ConditionAwaiter.java:167)
org.awaitility.core.AssertionCondition.await(AssertionCondition.java:119)
org.awaitility.core.AssertionCondition.await(AssertionCondition.java:31)
org.awaitility.core.ConditionFactory.until(ConditionFactory.java:1006)
org.awaitility.core.ConditionFactory.untilAsserted(ConditionFactory.java:790)
dev.restate.sdktesting.tests.UtilsKt$untilAsserted$2.invokeSuspend(utils.kt:36)
kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:102)
kotlinx.coroutines.internal.LimitedDispatcher$Worker.run(LimitedDispatcher.kt:113)
kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:96)
kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:589)
kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:816)
kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:720)
kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:707)
Caused by: org.opentest4j.AssertionFailedError:
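For context, the condition that times out here is an Awaitility poll with a 10-second cap, wrapped by the suspend helper `untilAsserted` in utils.kt. A minimal Kotlin sketch of that shape, with `readCounter` as a hypothetical stand-in for however the test fetches the counter value via `dev.restate.sdk.client.Client` (not the actual test code):

```kotlin
import java.time.Duration
import org.assertj.core.api.Assertions.assertThat
import org.awaitility.Awaitility.await

// Sketch only: `readCounter` is a placeholder for the real ingress client call.
fun assertKafkaUpdatesVisible(readCounter: () -> Long) {
    await("Updates from Kafka are visible in the counter")
        .atMost(Duration.ofSeconds(10)) // the window the failing run reports
        .untilAsserted { assertThat(readCounter()).isEqualTo(6L) }
}
```

If the Kafka events only get dispatched after the consumer tasks are retried (as the appender errors above suggest), a fixed 10-second window leaves little headroom.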
@slinkydeveloper could you take a look at this issue?
Presumably another instance: https://github.com/restatedev/restate/actions/runs/13319028159/job/37200208521#step:11:151
I'm gonna increase the timeout of this one, because it just seems to be slow. If you look at the timings at the start of the test, you can see it took a long time to pull the Kafka image. Also, if you look at the state table dump, you can see that both of them have 6 as their state.
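Assuming the fix is just to widen that polling window, the change presumably looks something like this (reusing the hypothetical helper shape sketched above; the new duration is a guess, not the value actually committed):

```kotlin
await("Updates from Kafka are visible in the counter")
    .atMost(Duration.ofSeconds(60)) // was 10 seconds; exact new value is a guess
    .untilAsserted { assertThat(readCounter()).isEqualTo(6L) }
```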
This sounds very reasonable. Thanks @slinkydeveloper 🙏
The problem hasn't occurred since increasing the timeouts. Closing this issue as done.
It seems to have happened again: https://github.com/restatedev/restate/actions/runs/18652107682/job/53174722538?pr=3917#step:12:364. @slinkydeveloper could we further harden the test case, or is this a real problem?
Maybe https://github.com/restatedev/restate/pull/3285 could help with figuring out whether something is wrong here.
Another instance that happened on a release CI run: https://github.com/restatedev/restate/actions/runs/18655391303/job/53231622011#step:7:442.
https://github.com/restatedev/restate/actions/runs/19312605011/job/55237473233 and https://github.com/restatedev/restate/actions/runs/19237930392/job/54994152559 look suspiciously like duplicates of this issue.
Another instance: https://github.com/restatedev/restate/actions/runs/19573572194/job/56054623314?pr=4052
Yet another instance: https://github.com/restatedev/restate/actions/runs/19630219623/job/56211186381?pr=4053