spring-kafka
childContainer of previous run is stopping ConcurrentContainer after a new start
In what version(s) of Spring for Apache Kafka are you seeing this issue?
3.3
Describe the bug
Once a ConcurrentContainer has been stopped, its child containers should no longer be able to stop it. However, there are scenarios where this is still possible.
Scenario:
Concurrency: 2
ConcurrentContainer: CMain. Child containers: C0, C1.
1. ConcurrentContainer started.
   CMain -- running. C0 -- running. C1 -- running.
2. ConcurrentContainer stopped.
   CMain -- not running. C0 -- delinked (message processing is still happening). C1 -- delinked.
3. ConcurrentContainer started. This is permitted because stop was called beforehand. Nothing is wrong here and it should be allowed; it is just not good practice.
   CMain -- running. C2 -- running. C3 -- running. C0 -- delinked (message processing is still happening).
4. C0 has thrown an error while processing. This stops the running ConcurrentContainer!
   CMain -- not running. C2 -- delinked. C3 -- delinked. C0 -- delinked.
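The steps above can be replayed with a small toy model. This is only a sketch: `Parent`, `Child`, and `childFailed` are hypothetical names invented for illustration and are not the spring-kafka classes or their real logic. It assumes only what the scenario states, namely that a child from the first run still holds a reference to the parent and that the parent does not check which run a failing child belongs to.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: hypothetical classes, NOT the spring-kafka implementation.
class StaleChildDemo {

    static class Parent {
        private final List<Child> children = new ArrayList<>();
        private boolean running;

        void start(int concurrency) {
            children.clear(); // children of the previous run are delinked
            for (int i = 0; i < concurrency; i++) {
                children.add(new Child(this));
            }
            running = true;
        }

        void stop() {
            // The parent is marked stopped, but a child may still be processing a record.
            running = false;
        }

        // Unguarded callback: any child, current or stale, can stop the parent.
        void childFailed(Child child) {
            running = false;
            children.clear();
        }

        boolean isRunning() {
            return running;
        }

        List<Child> getChildren() {
            return children;
        }
    }

    static class Child {
        private final Parent parent;

        Child(Parent parent) {
            this.parent = parent;
        }

        void failWhileProcessing() {
            parent.childFailed(this);
        }
    }

    // Replays steps 1 to 4 and returns {running after restart, running after C0 fails}.
    static boolean[] replay() {
        Parent cMain = new Parent();
        cMain.start(2);                          // step 1: C0 and C1 are created
        Child c0 = cMain.getChildren().get(0);   // C0 is still busy with a record
        cMain.stop();                            // step 2
        cMain.start(2);                          // step 3: C2 and C3 are created
        boolean runningAfterRestart = cMain.isRunning();
        c0.failWhileProcessing();                // step 4: stale C0 reports a failure
        return new boolean[] {runningAfterRestart, cMain.isRunning()};
    }

    public static void main(String[] args) {
        boolean[] result = replay();
        System.out.println("running after restart: " + result[0]);
        System.out.println("running after stale C0 failure: " + result[1]);
    }
}
```

In this model the parent reports running after the restart and not running once the stale C0 fails, which mirrors step 4.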
To Reproduce
@Test
public void testFencedContainerFailed() throws Exception {
    this.logger.info("Start testFencedContainerFailed");
    Map<String, Object> props = KafkaTestUtils.consumerProps("test1", "true",
            embeddedKafka);
    AtomicReference<Properties> overrides = new AtomicReference<>();
    DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props) {
        @Override
        protected Consumer<Integer, String> createKafkaConsumer(String groupId, String clientIdPrefix,
                String clientIdSuffixArg, Properties properties) {
            overrides.set(properties);
            return super.createKafkaConsumer(groupId, clientIdPrefix, clientIdSuffixArg, properties);
        }
    };
    ContainerProperties containerProps = new ContainerProperties(topic1);
    containerProps.setLogContainerConfig(true);
    containerProps.setClientId("client");
    containerProps.setAckMode(ContainerProperties.AckMode.RECORD);
    final CountDownLatch secondRunLatch = new CountDownLatch(5);
    final Set<String> listenerThreadNames = new ConcurrentSkipListSet<>();
    final List<String> payloads = new ArrayList<>();
    final CountDownLatch processingLatch = new CountDownLatch(1);
    final CountDownLatch firstLatch = new CountDownLatch(1);
    AtomicBoolean first = new AtomicBoolean(true);
    containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
        if (first.getAndSet(false)) {
            try {
                firstLatch.await(100, TimeUnit.SECONDS);
                throw new NullPointerException();
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        ConcurrentMessageListenerContainerTests.this.logger.info("auto: " + message);
        listenerThreadNames.add(Thread.currentThread().getName());
        payloads.add(message.value());
        secondRunLatch.countDown();
        processingLatch.countDown();
    });
    ConcurrentMessageListenerContainer<Integer, String> container =
            new ConcurrentMessageListenerContainer<>(cf, containerProps);
    container.setConcurrency(2);
    container.setBeanName("testAuto");
    container.setChangeConsumerThreadName(true);
    container.setCommonErrorHandler(new CommonContainerStoppingErrorHandler());
    BlockingQueue<KafkaEvent> events = new LinkedBlockingQueue<>();
    CountDownLatch concurrentContainerStopLatch = new CountDownLatch(1);
    CountDownLatch consumerStoppedEventLatch = new CountDownLatch(1);
    container.setApplicationEventPublisher(e -> {
        events.add((KafkaEvent) e);
        if (e instanceof ConcurrentContainerStoppedEvent) {
            concurrentContainerStopLatch.countDown();
        }
        if (e instanceof ConsumerStoppedEvent) {
            consumerStoppedEventLatch.countDown();
        }
    });
    CountDownLatch interceptedSecondRun = new CountDownLatch(5);
    container.setRecordInterceptor((record, consumer) -> {
        interceptedSecondRun.countDown();
        return record;
    });
    container.start();
    MessageListenerContainer childContainer0 = container.getContainers().get(0);
    MessageListenerContainer childContainer1 = container.getContainers().get(1);
    ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
    assertThat(container.getAssignedPartitions()).hasSize(2);
    Map<String, Collection<TopicPartition>> assignments = container.getAssignmentsByClientId();
    assertThat(assignments).hasSize(2);
    assertThat(assignments.get("client-0")).isNotNull();
    assertThat(assignments.get("client-1")).isNotNull();
    Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
    ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
    KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
    template.setDefaultTopic(topic1);
    template.sendDefault(0, 0, "foo");
    template.sendDefault(1, 2, "bar");
    template.sendDefault(0, 0, "baz");
    template.sendDefault(1, 2, "qux");
    template.flush();
    assertThat(container.metrics()).isNotNull();
    assertThat(container.isInExpectedState()).isTrue();
    assertThat(childContainer0.isRunning()).isTrue();
    assertThat(childContainer1.isRunning()).isTrue();
    assertThat(container.isChildRunning()).isTrue();
    assertThat(processingLatch.await(60, TimeUnit.SECONDS)).isTrue();
    container.stop();
    assertThat(container.isChildRunning()).isTrue();
    assertThat(container.isRunning()).isFalse();
    assertThat(childContainer0.isRunning()).isFalse();
    assertThat(childContainer1.isRunning()).isFalse();
    assertThat(consumerStoppedEventLatch.await(30, TimeUnit.SECONDS)).isTrue();
    assertThat(container.isChildRunning()).isTrue();
    assertThat(listenerThreadNames).containsAnyOf("testAuto-0", "testAuto-1");
    assertThat(concurrentContainerStopLatch.await(30, TimeUnit.SECONDS)).isFalse();
    template.sendDefault(0, 0, "FOO");
    template.sendDefault(1, 2, "BAR");
    template.sendDefault(0, 0, "BAZ");
    template.sendDefault(1, 2, "QUX");
    template.flush();
    // permitted since stop is called prior.
    container.start();
    assertThat(container.isRunning()).isTrue();
    assertThat(container.getContainers().stream().allMatch(containerL -> containerL.isRunning()))
            .isTrue();
    firstLatch.countDown();
    // Running container is stopped!
    assertThat(concurrentContainerStopLatch.await(30, TimeUnit.SECONDS)).isTrue();
    assertThat(container.isRunning()).isFalse();
    assertThat(container.getContainers().stream().anyMatch(containerL -> containerL.isRunning()))
            .isFalse();
    this.logger.info("Stop testFencedContainerFailed");
}
Please suggest if this is a valid scenario.
What are your 1, 2, 3, 4?
Different scenarios, or steps, or states of the same scenario?
Isn't this a result of all the changes you have introduced recently?
C0 has thrown error while processing. This would stop the running ConcurrentContainer !!!!
This indeed must not happen. The failure in one child container must not affect all the others.
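One way to picture such a guard is a per-run token that a child must present when it reports a failure. The sketch below is purely illustrative: `Parent`, `start`, `childFailed`, and the generation counter are hypothetical names, not spring-kafka APIs, and nothing in this thread says the project handles it this way.

```java
// Hypothetical sketch of a "generation" guard; NOT the spring-kafka implementation.
class GenerationGuardSketch {

    static class Parent {
        private int generation;   // incremented on every start
        private boolean running;

        // Starts a new run and returns the token handed to the children of that run.
        synchronized int start() {
            running = true;
            return ++generation;
        }

        synchronized void stop() {
            running = false;
        }

        // Returns true only if the failure was honoured, i.e. it came from the current run.
        synchronized boolean childFailed(int childGeneration) {
            if (childGeneration != generation) {
                return false; // stale child from an earlier run: ignore the request
            }
            running = false;
            return true;
        }

        synchronized boolean isRunning() {
            return running;
        }
    }

    // A child of the first run fails after the parent has been restarted.
    static boolean parentSurvivesStaleFailure() {
        Parent parent = new Parent();
        int firstRun = parent.start();
        parent.stop();
        parent.start();
        boolean honoured = parent.childFailed(firstRun);
        return !honoured && parent.isRunning();
    }

    // A child of the current run fails: the parent is stopped, as before.
    static boolean currentFailureStillStops() {
        Parent parent = new Parent();
        int run = parent.start();
        boolean honoured = parent.childFailed(run);
        return honoured && !parent.isRunning();
    }

    public static void main(String[] args) {
        System.out.println("survives stale failure: " + parentSurvivesStaleFailure());
        System.out.println("current failure still stops: " + currentFailureStillStops());
    }
}
```

With a token like this, a failure in a child of the current run still stops the parent, while a late failure from a child of an earlier run is ignored.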
Does it happen even in previous versions, even before your recent changes?
Isn't this a result of all the changes you have introduced recently?
I have not introduced any bugs. This bug is a different one, and it exists in previous versions as well. I have provided a JUnit test that reproduces the scenario. Please review it and let me know whether this is a valid scenario that needs to be fixed.
What are your 1, 2, 3, 4? Different scenarios, or steps, or states of the same scenario? Isn't this a result of all the changes you have introduced recently?
These are the steps to follow to get an overview of the bug; they are not different scenarios. Please let me know if I need to provide more information about this issue.
This bug is not caused by my changes.
No problem!
I will need more time to investigate this. We may ask if @sobychacko has some cycles to look into this quicker. Thanks
If you are OK with it, I will try to fix the issue and provide a PR. I have some ideas about it.
@LokeshAlamuri What's the status of this issue? I know we have closed the other PR you submitted before. Can we close this issue as well, based on the comments on the PR?
As discussed in the PR I submitted, the reported issue is a valid one. It exists in previous versions as well. Please consider it and take appropriate action on this bug.