spring-kafka
A child container from a previous run stops the ConcurrentMessageListenerContainer after it has been restarted
In what version(s) of Spring for Apache Kafka are you seeing this issue?
3.3
Describe the bug
Once a ConcurrentContainer has been stopped, its former child containers should no longer be able to stop it. However, in some scenarios they still can.
Scenario:
Concurrency: 2
ConcurrentContainer: CMain; child containers: C0, C1.
- ConcurrentContainer started.
  CMain -- running. C0 -- running. C1 -- running.
- ConcurrentContainer stopped.
  CMain -- not running. C0 -- delinked (message processing is still in progress). C1 -- delinked.
- ConcurrentContainer started again. This is permitted because `stop` was called first. Nothing is wrong here and it should be allowed; it is just not good practice.
  CMain -- running. C2 -- running. C3 -- running.
  C0 -- delinked (message processing is still in progress).
- C0 throws an error while processing. This stops the running ConcurrentContainer!
  CMain -- not running. C2 -- delinked. C3 -- delinked.
  C0 -- delinked.
To Reproduce
/**
 * Reproduces a restarted {@link ConcurrentMessageListenerContainer} being stopped by a
 * child container left over from its previous run.
 * <p>
 * The scenario runs in four phases:
 * <ol>
 * <li>The listener thread that receives the first record blocks on {@code firstLatch}.</li>
 * <li>The parent is stopped while that thread is still blocked, then started again with
 * fresh child containers.</li>
 * <li>The latch is released and the stale listener throws.</li>
 * <li>With {@link CommonContainerStoppingErrorHandler} installed, the parent is stopped.</li>
 * </ol>
 * The final assertions pin that (reported) behavior.
 */
@Test
public void testFencedContainerFailed() throws Exception {
	this.logger.info("Start testFencedContainerFailed");
	Map<String, Object> props = KafkaTestUtils.consumerProps("test1", "true",
			embeddedKafka);
	AtomicReference<Properties> overrides = new AtomicReference<>();
	DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props) {

		@Override
		protected Consumer<Integer, String> createKafkaConsumer(String groupId, String clientIdPrefix,
				String clientIdSuffixArg, Properties properties) {
			overrides.set(properties);
			return super.createKafkaConsumer(groupId, clientIdPrefix, clientIdSuffixArg, properties);
		}

	};
	ContainerProperties containerProps = new ContainerProperties(topic1);
	containerProps.setLogContainerConfig(true);
	containerProps.setClientId("client");
	containerProps.setAckMode(ContainerProperties.AckMode.RECORD);
	final CountDownLatch secondRunLatch = new CountDownLatch(5);
	final Set<String> listenerThreadNames = new ConcurrentSkipListSet<>();
	// Both child consumer threads invoke the same listener concurrently,
	// so payloads must be collected in a thread-safe structure (not a plain ArrayList).
	final BlockingQueue<String> payloads = new LinkedBlockingQueue<>();
	final CountDownLatch processingLatch = new CountDownLatch(1);
	// Released by the test once the container has been stopped and restarted.
	final CountDownLatch firstLatch = new CountDownLatch(1);
	AtomicBoolean first = new AtomicBoolean(true);
	containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
		if (first.getAndSet(false)) {
			// Park the listener thread that gets the first record until firstLatch is released,
			// then fail so the error handler runs on behalf of what is by then a stale child.
			try {
				firstLatch.await(100, TimeUnit.SECONDS);
			}
			catch (InterruptedException e) {
				// Restore the interrupt flag before propagating.
				Thread.currentThread().interrupt();
				throw new RuntimeException(e);
			}
			throw new NullPointerException();
		}
		ConcurrentMessageListenerContainerTests.this.logger.info("auto: " + message);
		listenerThreadNames.add(Thread.currentThread().getName());
		payloads.add(message.value());
		secondRunLatch.countDown();
		processingLatch.countDown();
	});
	ConcurrentMessageListenerContainer<Integer, String> container =
			new ConcurrentMessageListenerContainer<>(cf, containerProps);
	container.setConcurrency(2);
	container.setBeanName("testAuto");
	container.setChangeConsumerThreadName(true);
	// Any listener exception stops the container; this is what lets the stale child stop the parent.
	container.setCommonErrorHandler(new CommonContainerStoppingErrorHandler());
	BlockingQueue<KafkaEvent> events = new LinkedBlockingQueue<>();
	CountDownLatch concurrentContainerStopLatch = new CountDownLatch(1);
	CountDownLatch consumerStoppedEventLatch = new CountDownLatch(1);
	container.setApplicationEventPublisher(e -> {
		events.add((KafkaEvent) e);
		if (e instanceof ConcurrentContainerStoppedEvent) {
			concurrentContainerStopLatch.countDown();
		}
		if (e instanceof ConsumerStoppedEvent) {
			consumerStoppedEventLatch.countDown();
		}
	});
	CountDownLatch interceptedSecondRun = new CountDownLatch(5);
	container.setRecordInterceptor((record, consumer) -> {
		interceptedSecondRun.countDown();
		return record;
	});

	// Phase 1: first run with two child containers.
	container.start();
	MessageListenerContainer childContainer0 = container.getContainers().get(0);
	MessageListenerContainer childContainer1 = container.getContainers().get(1);
	ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
	assertThat(container.getAssignedPartitions()).hasSize(2);
	Map<String, Collection<TopicPartition>> assignments = container.getAssignmentsByClientId();
	assertThat(assignments).hasSize(2);
	assertThat(assignments.get("client-0")).isNotNull();
	assertThat(assignments.get("client-1")).isNotNull();
	Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
	ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
	KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
	template.setDefaultTopic(topic1);
	template.sendDefault(0, 0, "foo");
	template.sendDefault(1, 2, "bar");
	template.sendDefault(0, 0, "baz");
	template.sendDefault(1, 2, "qux");
	template.flush();
	assertThat(container.metrics()).isNotNull();
	assertThat(container.isInExpectedState()).isTrue();
	assertThat(childContainer0.isRunning()).isTrue();
	assertThat(childContainer1.isRunning()).isTrue();
	assertThat(container.isChildRunning()).isTrue();
	assertThat(processingLatch.await(60, TimeUnit.SECONDS)).isTrue();

	// Phase 2: stop the parent while one listener thread is still parked in the first record.
	container.stop();
	// The parked child keeps isChildRunning() true even though the parent and children report stopped.
	assertThat(container.isChildRunning()).isTrue();
	assertThat(container.isRunning()).isFalse();
	assertThat(childContainer0.isRunning()).isFalse();
	assertThat(childContainer1.isRunning()).isFalse();
	assertThat(consumerStoppedEventLatch.await(30, TimeUnit.SECONDS)).isTrue();
	assertThat(container.isChildRunning()).isTrue();
	assertThat(listenerThreadNames).containsAnyOf("testAuto-0", "testAuto-1");
	// No ConcurrentContainerStoppedEvent while a child is still processing.
	assertThat(concurrentContainerStopLatch.await(30, TimeUnit.SECONDS)).isFalse();
	template.sendDefault(0, 0, "FOO");
	template.sendDefault(1, 2, "BAR");
	template.sendDefault(0, 0, "BAZ");
	template.sendDefault(1, 2, "QUX");
	template.flush();

	// Phase 3: restart is permitted since stop() was called first; new child containers are created.
	container.start();
	assertThat(container.isRunning()).isTrue();
	assertThat(container.getContainers().stream().allMatch(MessageListenerContainer::isRunning))
			.isTrue();

	// Phase 4: release the stale listener; it throws and the error handler stops the container.
	firstLatch.countDown();
	// These assertions document the reported defect: the newly running container gets stopped
	// by the stale child. Invert them once stale children are fenced off from the parent.
	assertThat(concurrentContainerStopLatch.await(30, TimeUnit.SECONDS)).isTrue();
	assertThat(container.isRunning()).isFalse();
	assertThat(container.getContainers().stream().anyMatch(MessageListenerContainer::isRunning))
			.isFalse();
	this.logger.info("Stop testFencedContainerFailed");
}
Please advise whether this is a valid scenario.