
KafkaIngress e2e tests seem to be flaky

Open • tillrohrmann opened this issue 11 months ago • 9 comments

The KafkaIngress e2e test failed on GitHub Actions (GHA): https://github.com/restatedev/restate/actions/runs/12976017546/job/36188054014?pr=2546#step:11:126. It looks as if the subscription starts consuming before the corresponding logs are provisioned:

2025-01-26T15:54:44.248084Z DEBUG restate_ingress_kafka::consumer_task
  Starting topic 'counter' partition '0' consumption loop
    restate.subscription.id: sub_13Mwxl4GSHoM9kMdZvfYVhf
    messaging.consumer.group.name: "sub_13Mwxl4GSHoM9kMdZvfYVhf"
on rs:ingress-5
2025-01-26T15:54:44.255111Z DEBUG restate_core::partitions
  No known node for partition - requesting refresh
    partition_id: PartitionId(15)
on rs:ingress-4
  in restate_ingress_http::rpc_request_dispatcher::call invocation
    request_id: 01JJHN4TH6C2CKXCJJTCF4GK5R
    invocation_id: inv_1hzh7Pqebx2t6tsJYQZsI7Dr3ttsYQtk9H
  in restate_ingress_http::handler::service_handler::ingress
    restate.invocation.id: inv_1hzh7Pqebx2t6tsJYQZsI7Dr3ttsYQtk9H
    restate.invocation.target: Counter/{key}/get
2025-01-26T15:54:44.294222Z DEBUG restate_core::partitions
  No known node for partition - requesting refresh
    partition_id: PartitionId(2)
on rs:ingress-4
  in restate_ingress_http::rpc_request_dispatcher::call invocation
    request_id: 01JJHN4TH6TN6ETK4XSYMESC4D
    invocation_id: inv_1eyQWJItNe7z6p2wcwytITyHskmhz83Alj
  in restate_ingress_http::handler::service_handler::ingress
    restate.invocation.id: inv_1eyQWJItNe7z6p2wcwytITyHskmhz83Alj
    restate.invocation.target: Counter/{key}/get
2025-01-26T15:54:44.319394Z DEBUG restate_core::partitions
  No known node for partition - requesting refresh
    partition_id: PartitionId(15)
on rs:ingress-4
  in restate_ingress_http::rpc_request_dispatcher::call invocation
    request_id: 01JJHN4TH6C2CKXCJJTCF4GK5R
    invocation_id: inv_1hzh7Pqebx2t6tsJYQZsI7Dr3ttsYQtk9H
  in restate_ingress_http::handler::service_handler::ingress
    restate.invocation.id: inv_1hzh7Pqebx2t6tsJYQZsI7Dr3ttsYQtk9H
    restate.invocation.target: Counter/{key}/get
2025-01-26T15:54:44.346912Z INFO restate_ingress_kafka::consumer_task
  Processing Kafka ingress request
on rs:ingress-4
  in restate_ingress_kafka::consumer_task::kafka_ingress_consume
    otel.name: "kafka_ingress_consume"
    messaging.system: "kafka"
    messaging.operation: "receive"
    messaging.source.name: "event-handler"
    messaging.destination.name: service://Proxy/oneWayCall
    restate.subscription.id: sub_127bckbuL86uiVfBP2m9WFj
    messaging.consumer.group.name: "sub_127bckbuL86uiVfBP2m9WFj"
2025-01-26T15:54:44.347000Z DEBUG restate_ingress_kafka::consumer_task
  Stopping consumer with id sub_127bckbuL86uiVfBP2m9WFj
on rs:worker-0
2025-01-26T15:54:44.346945Z ERROR restate_bifrost::appender
    error: unknown log '10'
on rs:ingress-4
  in restate_bifrost::appender::append
    log_id: 10
  in restate_ingress_kafka::consumer_task::kafka_ingress_consume
    otel.name: "kafka_ingress_consume"
    messaging.system: "kafka"
    messaging.operation: "receive"
    messaging.source.name: "event-handler"
    messaging.destination.name: service://Proxy/oneWayCall
    restate.subscription.id: sub_127bckbuL86uiVfBP2m9WFj
    messaging.consumer.group.name: "sub_127bckbuL86uiVfBP2m9WFj"
2025-01-26T15:54:44.349404Z WARN restate_ingress_kafka::subscription_controller::task_orchestrator
  Consumer task for subscription sub_127bckbuL86uiVfBP2m9WFj unexpectedly returned error: ingress dispatcher channel is closed
on rs:worker-0
2025-01-26T15:54:44.350351Z INFO restate_ingress_kafka::consumer_task
  Processing Kafka ingress request
on rs:ingress-4
  in restate_ingress_kafka::consumer_task::kafka_ingress_consume
    otel.name: "kafka_ingress_consume"
    messaging.system: "kafka"
    messaging.operation: "receive"
    messaging.source.name: "counter"
    messaging.destination.name: service://Counter/add
    restate.subscription.id: sub_13Mwxl4GSHoM9kMdZvfYVhf
    messaging.consumer.group.name: "sub_13Mwxl4GSHoM9kMdZvfYVhf"
2025-01-26T15:54:44.350380Z ERROR restate_bifrost::appender
    error: unknown log '2'
on rs:ingress-4
  in restate_bifrost::appender::append
    log_id: 2
  in restate_ingress_kafka::consumer_task::kafka_ingress_consume
    otel.name: "kafka_ingress_consume"
    messaging.system: "kafka"
    messaging.operation: "receive"
    messaging.source.name: "counter"
    messaging.destination.name: service://Counter/add
    restate.subscription.id: sub_13Mwxl4GSHoM9kMdZvfYVhf
    messaging.consumer.group.name: "sub_13Mwxl4GSHoM9kMdZvfYVhf"
2025-01-26T15:54:44.350437Z DEBUG restate_ingress_kafka::consumer_task
  Stopping consumer with id sub_13Mwxl4GSHoM9kMdZvfYVhf
on rs:worker-3
2025-01-26T15:54:44.351787Z DEBUG restate_admin::cluster_controller::logs_controller
  Proposing new logs configuration: Logs { ... }
on rs:worker-2
2025-01-26T15:54:44.352957Z WARN restate_ingress_kafka::subscription_controller::task_orchestrator
  Consumer task for subscription sub_13Mwxl4GSHoM9kMdZvfYVhf unexpectedly returned error: ingress dispatcher channel is closed
on rs:worker-3

I suspect that the retry timeout after such a failure is too long for the test to finish within its configured 10-second timeout.

default => KafkaIngress => handleEventInEventHandler(URL, int, Client)
  MethodSource [className = 'dev.restate.sdktesting.tests.KafkaIngress', methodName = 'handleEventInEventHandler', methodParameterTypes = 'java.net.URL, int, dev.restate.sdk.client.Client']
  => org.awaitility.core.ConditionTimeoutException: Condition with alias 'Updates from Kafka are visible in the counter' didn't complete within 10 seconds because assertion condition defined as a Lambda expression in dev.restate.sdktesting.tests.UtilsKt$untilAsserted$2 
expected: 6L
 but was: 0L.
     org.awaitility.core.ConditionAwaiter.await(ConditionAwaiter.java:167)
     org.awaitility.core.AssertionCondition.await(AssertionCondition.java:119)
     org.awaitility.core.AssertionCondition.await(AssertionCondition.java:31)
     org.awaitility.core.ConditionFactory.until(ConditionFactory.java:1006)
     org.awaitility.core.ConditionFactory.untilAsserted(ConditionFactory.java:790)
     dev.restate.sdktesting.tests.UtilsKt$untilAsserted$2.invokeSuspend(utils.kt:36)
     kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
     kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:102)
     kotlinx.coroutines.internal.LimitedDispatcher$Worker.run(LimitedDispatcher.kt:113)
     kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:96)
     kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:589)
     kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:816)
     kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:720)
     kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:707)
   Caused by: org.opentest4j.AssertionFailedError: 

tillrohrmann, Jan 27 '25
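A little arithmetic can test the hypothesis in the issue. The hypothesis is that the first append hits an unprovisioned log and that the retry which follows arrives too late for the 10-second await. The sketch below is a toy model built on assumptions that are hypothetical and not taken from Restate:

- the consumer makes its first append attempt at t=0;
- every failed attempt is retried after an exponentially growing delay;
- an append succeeds once the log has been provisioned.

The class name `RetryBudget` and the parameters (500 ms initial delay, factor 2, 8 s cap) are made up for illustration. They are not Restate's actual retry or restart policy.

```java
import java.time.Duration;

// Hypothetical toy model; not Restate's actual retry/restart policy.
final class RetryBudget {
    /**
     * Returns the time at which the first append succeeds, assuming (hypothetically):
     * attempt 0 happens at t=0, an attempt at time t succeeds iff t >= provisionedAt,
     * and each failed attempt is retried after min(initial * factor^n, cap).
     */
    static Duration firstSuccessfulAppend(Duration provisionedAt, Duration initial, double factor, Duration cap) {
        long now = 0;                      // elapsed ms since the consumer started
        double delay = initial.toMillis(); // next backoff delay in ms (uncapped)
        while (now < provisionedAt.toMillis()) { // append fails with "unknown log"
            now += (long) Math.min(delay, cap.toMillis());
            delay *= factor;
        }
        return Duration.ofMillis(now);
    }

    public static void main(String[] args) {
        Duration budget = Duration.ofSeconds(10); // Awaitility timeout in the failing test
        Duration initial = Duration.ofMillis(500), cap = Duration.ofSeconds(8); // made-up values
        for (long provisionMs : new long[] {5, 1_000, 4_000, 8_000}) {
            Duration t = firstSuccessfulAppend(Duration.ofMillis(provisionMs), initial, 2.0, cap);
            System.out.printf("log provisioned after %d ms -> first successful append at %d ms (%s)%n",
                provisionMs, t.toMillis(), t.compareTo(budget) <= 0 ? "within budget" : "exceeds budget");
        }
    }
}
```

With these made-up numbers, a log provisioned 4 s after the consumer starts is first written at 7.5 s. A log provisioned at 8 s is not written until 15.5 s, which is past the 10-second budget. Exponential backoff can overshoot the moment the log becomes available by a wide margin, and any slowness earlier in the test eats into the same budget.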

@slinkydeveloper could you take a look at this issue?

tillrohrmann, Jan 27 '25

Presumably another instance: https://github.com/restatedev/restate/actions/runs/13319028159/job/37200208521#step:11:151

tillrohrmann, Feb 14 '25

I'm going to increase the timeout for this test, because it seems to simply be slow. The timings at the start of the test show that pulling the Kafka image took a long time. Also, the state table dump shows that both of them have a state of 6, the value the test expects.

slinkydeveloper, Feb 14 '25
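The sketch below makes the effect of the timeout increase concrete. It is a minimal, dependency-free version of the poll-until-asserted pattern that Awaitility's `untilAsserted` implements. Per the stack trace, the failing test reaches Awaitility through a Kotlin `untilAsserted` helper in `utils.kt`. The class `Await`, its parameters, and the simulated counter are hypothetical. In the real test, the knob would be the timeout handed to Awaitility, for example via `atMost`. A longer timeout does not make anything faster. It only gives a slow CI run more time before the assertion is declared failed.

```java
import java.time.Duration;

// Hypothetical stand-in for Awaitility-style polling; not the real test helper.
final class Await {
    /** Re-runs `assertion` every `pollInterval` until it stops throwing AssertionError or `timeout` elapses. */
    static void untilAsserted(Runnable assertion, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            try {
                assertion.run();
                return; // assertion passed
            } catch (AssertionError e) {
                // Overflow-safe comparison of nanoTime values.
                if (System.nanoTime() - deadline >= 0) {
                    throw new AssertionError("Condition didn't complete within " + timeout, e);
                }
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                throw new IllegalStateException("Interrupted while awaiting condition", ie);
            }
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        // Simulated counter: reads 0 for the first 300 ms, then 6 (the value the test expects).
        Runnable counterIsSix = () -> {
            long value = System.nanoTime() - start < 300_000_000L ? 0L : 6L;
            if (value != 6L) throw new AssertionError("expected: 6L but was: " + value + "L");
        };
        try {
            untilAsserted(counterIsSix, Duration.ofMillis(100), Duration.ofMillis(20));
        } catch (AssertionError e) {
            System.out.println("short timeout: " + e.getMessage());
        }
        untilAsserted(counterIsSix, Duration.ofSeconds(5), Duration.ofMillis(20));
        System.out.println("long timeout: condition met");
    }
}
```

The same condition fails under the short timeout and passes under the long one. This mirrors why a slow run, such as one that spends a long time pulling the Kafka image, can fail with "expected: 6L but was: 0L" even though the state eventually reaches 6.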

This sounds very reasonable. Thanks @slinkydeveloper 🙏

tillrohrmann, Feb 14 '25

The problem hasn't occurred since the timeouts were increased. Closing this issue as done.

tillrohrmann, Apr 02 '25

It seems to have happened again: https://github.com/restatedev/restate/actions/runs/18652107682/job/53174722538?pr=3917#step:12:364. @slinkydeveloper, could we further harden the test case, or is this a real problem?

tillrohrmann, Oct 20 '25

Maybe https://github.com/restatedev/restate/pull/3285 could help with figuring out whether something is wrong here.

tillrohrmann, Oct 20 '25

Another instance that happened on a release CI run: https://github.com/restatedev/restate/actions/runs/18655391303/job/53231622011#step:7:442.

tillrohrmann, Oct 21 '25

https://github.com/restatedev/restate/actions/runs/19312605011/job/55237473233 and https://github.com/restatedev/restate/actions/runs/19237930392/job/54994152559 look suspiciously like duplicates of this issue.

tillrohrmann, Nov 13 '25

Another instance: https://github.com/restatedev/restate/actions/runs/19573572194/job/56054623314?pr=4052

tillrohrmann, Nov 21 '25

Yet another instance: https://github.com/restatedev/restate/actions/runs/19630219623/job/56211186381?pr=4053

tillrohrmann, Nov 26 '25