DataflowTemplates
[Bug]: Flaky test: StreamingDataGeneratorIT#testFakeMessagesToKafka
Related Template(s)
StreamingDataGenerator
Template Version
HEAD
What happened?
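StreamingDataGeneratorIT#testFakeMessagesToKafka is flaky because of connection issues with the Kafka broker on self-hosted GitHub Actions workers. In the run logged below, the broker times out while allocating a producer ID block (the INIT_PRODUCER_ID requests fail), and tearDown then fails to delete the Kafka topic with a TimeoutException.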
Relevant log output
[docker-java-stream-2024204139] INFO org.apache.beam.it.testcontainers.TestContainerResourceManager - confluentinc/cp-kafka:7.3.1: [2024-07-26 02:48:53,854] ERROR [KafkaApi-1] Unexpected error handling request RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, clientId=producer-6, correlationId=64) -- InitProducerIdRequestData(transactionalId=null, transactionTimeoutMs=2147483647, producerId=-1, producerEpoch=-1) with context RequestContext(header=RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, clientId=producer-6, correlationId=64), connectionId='172.17.0.17:9093-10.128.0.16:37716-248', clientAddress=/10.128.0.16, principal=User:ANONYMOUS, listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, clientInformation=ClientInformation(softwareName=apache-kafka-java, softwareVersion=3.7.0), fromPrivilegedListener=false, principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@61ef1d46]) (kafka.server.KafkaApis)
[docker-java-stream-2024204139] INFO org.apache.beam.it.testcontainers.TestContainerResourceManager - confluentinc/cp-kafka:7.3.1: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for next producer ID block
[docker-java-stream-2024204139] INFO org.apache.beam.it.testcontainers.TestContainerResourceManager - confluentinc/cp-kafka:7.3.1: [2024-07-26 02:48:53,934] INFO [BrokerToControllerChannelManager broker=1 name=forwarding] Disconnecting from node 1 due to request timeout. (org.apache.kafka.clients.NetworkClient)
[docker-java-stream-2024204139] INFO org.apache.beam.it.testcontainers.TestContainerResourceManager - confluentinc/cp-kafka:7.3.1: [2024-07-26 02:48:53,934] INFO [BrokerToControllerChannelManager broker=1 name=forwarding] Cancelled in-flight API_VERSIONS request with correlation id 169 due to node 1 being disconnected (elapsed time since creation: 30030ms, elapsed time since send: 30030ms, request timeout: 30000ms) (org.apache.kafka.clients.NetworkClient)
[docker-java-stream-2024204139] INFO org.apache.beam.it.testcontainers.TestContainerResourceManager - confluentinc/cp-kafka:7.3.1: [2024-07-26 02:48:53,934] INFO [BrokerToControllerChannelManager broker=1 name=forwarding]: Recorded new controller, from now on will use node b010d8cb7964:9092 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
[docker-java-stream-2024204139] INFO org.apache.beam.it.testcontainers.TestContainerResourceManager - confluentinc/cp-kafka:7.3.1: [2024-07-26 02:48:53,934] WARN [RPC ProducerId Manager 1]: Timed out when requesting AllocateProducerIds from the controller. (kafka.coordinator.transaction.RPCProducerIdManager)
[docker-java-stream-2024204139] INFO org.apache.beam.it.testcontainers.TestContainerResourceManager - confluentinc/cp-kafka:7.3.1: [2024-07-26 02:48:53,934] ERROR [KafkaApi-1] Unexpected error handling request RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, clientId=producer-16, correlationId=64) -- InitProducerIdRequestData(transactionalId=null, transactionTimeoutMs=2147483647, producerId=-1, producerEpoch=-1) with context RequestContext(header=RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, clientId=producer-16, correlationId=64), connectionId='172.17.0.17:9093-10.128.0.16:37736-248', clientAddress=/10.128.0.16, principal=User:ANONYMOUS, listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, clientInformation=ClientInformation(softwareName=apache-kafka-java, softwareVersion=3.7.0), fromPrivilegedListener=false, principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@3bd007d2]) (kafka.server.KafkaApis)
[docker-java-stream-2024204139] INFO org.apache.beam.it.testcontainers.TestContainerResourceManager - confluentinc/cp-kafka:7.3.1: org.apache.kafka.common.errors.TimeoutException: The request timed out.
[docker-java-stream-2024204139] INFO org.apache.beam.it.testcontainers.TestContainerResourceManager - confluentinc/cp-kafka:7.3.1: [2024-07-26 02:48:53,984] INFO [BrokerToControllerChannelManager broker=1 name=forwarding]: Recorded new controller, from now on will use node b010d8cb7964:9092 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
...
[kafka-admin-client-thread | adminclient-1] INFO org.apache.kafka.clients.admin.internals.AdminMetadataManager - [AdminClient clientId=adminclient-1] Metadata update failed
org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call. Call: fetchMetadata
[pool-1-thread-11] ERROR org.apache.beam.it.kafka.KafkaResourceManager - Failed to delete kafka topic.
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call. Call: deleteTopics
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2005)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
at org.apache.beam.it.kafka.KafkaResourceManager.cleanupAll(KafkaResourceManager.java:197)
at org.apache.beam.it.common.utils.ResourceManagerUtils.cleanResources(ResourceManagerUtils.java:153)
at com.google.cloud.teleport.v2.templates.StreamingDataGeneratorIT.tearDown(StreamingDataGeneratorIT.java:99)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at org.junit.internal.runners.statements.RunAfters.invokeMethod(RunAfters.java:46)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:33)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.apache.maven.surefire.junitcore.pc.Scheduler$1.run(Scheduler.java:410)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call. Call: deleteTopics
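One possible mitigation for the tearDown failure above, as a minimal sketch rather than a proposed patch: retry the timed-out call a bounded number of times with a growing delay. Everything here is hypothetical and not part of KafkaResourceManager or the test: the class name RetryWithBackoff, the retry helper, and its parameters are made up for illustration. To stay free of a Kafka dependency, the sketch detects timeouts by the exception's simple class name anywhere in the cause chain, which would match org.apache.kafka.common.errors.TimeoutException wrapped in an ExecutionException as in the trace; real code would more likely check the Kafka type directly.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: bounded retry with doubling delay for timeout-like failures. */
final class RetryWithBackoff {

  private RetryWithBackoff() {}

  /**
   * Runs the action up to maxAttempts times. Only failures whose cause chain
   * contains a class named "TimeoutException" are retried; anything else is
   * rethrown immediately. The delay doubles after each failed attempt.
   */
  static <T> T retry(Callable<T> action, int maxAttempts, long initialDelayMillis)
      throws Exception {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be >= 1");
    }
    long delay = initialDelayMillis;
    Exception last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return action.call();
      } catch (Exception e) {
        if (!isTimeout(e)) {
          throw e; // not a timeout: do not mask it with retries
        }
        last = e;
        if (attempt < maxAttempts) {
          Thread.sleep(delay);
          delay *= 2;
        }
      }
    }
    throw last; // every attempt timed out
  }

  /** Assumption: a timeout is recognized by simple class name, to avoid a Kafka dependency. */
  static boolean isTimeout(Throwable t) {
    for (Throwable cur = t; cur != null; cur = cur.getCause()) {
      if (cur.getClass().getSimpleName().equals("TimeoutException")) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) throws Exception {
    // Simulated flaky call: times out twice (wrapped like a future's get()), then succeeds.
    AtomicInteger calls = new AtomicInteger();
    String result =
        retry(
            () -> {
              if (calls.incrementAndGet() < 3) {
                throw new ExecutionException(new TimeoutException("simulated timeout"));
              }
              return "deleted";
            },
            5,
            10L);
    System.out.println(result + " after " + calls.get() + " attempts");
  }
}
```

Retrying only helps if the broker recovers within the retry window. If the container stays unreachable, as the repeated controller timeouts in the log may suggest, the last TimeoutException is still thrown after the final attempt.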