
childContainer of previous run is stopping ConcurrentContainer after a new start

Open LokeshAlamuri opened this issue 1 year ago • 5 comments

In what version(s) of Spring for Apache Kafka are you seeing this issue?

3.3

Describe the bug

Once a ConcurrentContainer has been stopped, a child container from that run should no longer be able to stop the ConcurrentContainer. However, there are scenarios where this can still happen.

Scenario:

Concurrency: 2

ConcurrentContainer: CMain. Child containers: C0, C1.

  1. ConcurrentContainer started.

    CMain -- running. C0 -- running. C1 -- running.

  2. ConcurrentContainer stopped.

    CMain -- not running. C0 -- delinked (still processing a message). C1 -- delinked.

  3. ConcurrentContainer started again. This is permitted because stop was called first, so it should be allowed, even though restarting while an old child is still processing is not good practice.

    CMain -- running. C2 -- running. C3 -- running.

    C0 -- delinked (still processing a message).

  4. C0 throws an error while processing. This stops the running ConcurrentContainer!

    CMain -- not running. C2 -- delinked. C3 -- delinked.

    C0 -- delinked.
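The four steps above can be modelled without Kafka at all. The following is only a toy sketch: `Parent`, `Child`, `childFailed` and the `guardStaleChildren` flag are hypothetical names invented for illustration and are not spring-kafka classes or APIs. It assumes that a failing child simply calls back into its parent to stop it, and it shows one possible guard (ignore a child that is no longer in the parent's current child list), which is not necessarily how the library should be fixed.

```java
import java.util.ArrayList;
import java.util.List;

public class StaleChildDemo {

    /** Toy stand-in for the concurrent (parent) container; hypothetical, not the spring-kafka class. */
    static class Parent {
        private final List<Child> children = new ArrayList<>();
        private final boolean guardStaleChildren;
        private boolean running;

        Parent(boolean guardStaleChildren) {
            this.guardStaleChildren = guardStaleChildren;
        }

        synchronized void start(int concurrency) {
            children.clear();
            for (int i = 0; i < concurrency; i++) {
                children.add(new Child(this));
            }
            running = true;
        }

        synchronized void stop() {
            running = false;
            // Children are delinked here, but in the real scenario they may still be processing.
            children.clear();
        }

        /** Called by a child whose listener failed (assumed behaviour of a container-stopping error handler). */
        synchronized void childFailed(Child child) {
            if (guardStaleChildren && !children.contains(child)) {
                return; // The child belongs to an earlier run, so it must not stop the current one.
            }
            stop();
        }

        synchronized boolean isRunning() {
            return running;
        }

        synchronized List<Child> getChildren() {
            return new ArrayList<>(children);
        }
    }

    /** Toy stand-in for a child container; it keeps a reference to its parent even after being delinked. */
    static class Child {
        private final Parent parent;

        Child(Parent parent) {
            this.parent = parent;
        }

        void failWhileProcessing() {
            parent.childFailed(this);
        }
    }

    /** Replays steps 1 to 4 and reports whether CMain is still running afterwards. */
    static boolean parentSurvivesStaleFailure(boolean guard) {
        Parent cMain = new Parent(guard);
        cMain.start(2);                         // step 1: C0 and C1 are created
        Child c0 = cMain.getChildren().get(0);
        cMain.stop();                           // step 2: C0 and C1 are delinked
        cMain.start(2);                         // step 3: C2 and C3 are created
        c0.failWhileProcessing();               // step 4: the stale C0 fails
        return cMain.isRunning();
    }

    public static void main(String[] args) {
        System.out.println("without guard, still running: " + parentSurvivesStaleFailure(false));
        System.out.println("with guard, still running: " + parentSurvivesStaleFailure(true));
    }
}
```

Without the guard the restarted parent ends up stopped, which mirrors step 4. With the guard the stale child's failure is ignored and C2 and C3 keep running.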

To Reproduce

@Test
	public void testFencedContainerFailed() throws Exception {
		this.logger.info("Start testFencedContainerFailed");
		Map<String, Object> props = KafkaTestUtils.consumerProps("test1", "true",
				embeddedKafka);
		AtomicReference<Properties> overrides = new AtomicReference<>();
		DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props) {
			@Override
			protected Consumer<Integer, String> createKafkaConsumer(String groupId, String clientIdPrefix,
																	String clientIdSuffixArg, Properties properties) {
				overrides.set(properties);
				return super.createKafkaConsumer(groupId, clientIdPrefix, clientIdSuffixArg, properties);
			}
		};
		ContainerProperties containerProps = new ContainerProperties(topic1);
		containerProps.setLogContainerConfig(true);
		containerProps.setClientId("client");
		containerProps.setAckMode(ContainerProperties.AckMode.RECORD);

		final CountDownLatch secondRunLatch = new CountDownLatch(5);
		final Set<String> listenerThreadNames = new ConcurrentSkipListSet<>();
		final List<String> payloads = new ArrayList<>();
		final CountDownLatch processingLatch = new CountDownLatch(1);
		final CountDownLatch firstLatch = new CountDownLatch(1);

		AtomicBoolean first = new AtomicBoolean(true);

		containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
			if (first.getAndSet(false)) {
				try {
					firstLatch.await(100, TimeUnit.SECONDS);
					throw new NullPointerException();
				}
				catch (InterruptedException e) {
					throw new RuntimeException(e);
				}
			}
			ConcurrentMessageListenerContainerTests.this.logger.info("auto: " + message);
			listenerThreadNames.add(Thread.currentThread().getName());
			payloads.add(message.value());
			secondRunLatch.countDown();
			processingLatch.countDown();
		});

		ConcurrentMessageListenerContainer<Integer, String> container =
				new ConcurrentMessageListenerContainer<>(cf, containerProps);
		container.setConcurrency(2);
		container.setBeanName("testAuto");
		container.setChangeConsumerThreadName(true);
		container.setCommonErrorHandler(new CommonContainerStoppingErrorHandler());

		BlockingQueue<KafkaEvent> events = new LinkedBlockingQueue<>();
		CountDownLatch concurrentContainerStopLatch = new CountDownLatch(1);
		CountDownLatch consumerStoppedEventLatch = new CountDownLatch(1);

		container.setApplicationEventPublisher(e -> {
			events.add((KafkaEvent) e);
			if (e instanceof ConcurrentContainerStoppedEvent) {
				concurrentContainerStopLatch.countDown();
			}
			if (e instanceof ConsumerStoppedEvent) {
				consumerStoppedEventLatch.countDown();
			}
		});

		CountDownLatch interceptedSecondRun = new CountDownLatch(5);
		container.setRecordInterceptor((record, consumer) -> {
			interceptedSecondRun.countDown();
			return record;
		});

		container.start();

		MessageListenerContainer childContainer0 = container.getContainers().get(0);
		MessageListenerContainer childContainer1 = container.getContainers().get(1);

		ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
		assertThat(container.getAssignedPartitions()).hasSize(2);
		Map<String, Collection<TopicPartition>> assignments = container.getAssignmentsByClientId();
		assertThat(assignments).hasSize(2);
		assertThat(assignments.get("client-0")).isNotNull();
		assertThat(assignments.get("client-1")).isNotNull();

		Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
		ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
		KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
		template.setDefaultTopic(topic1);
		template.sendDefault(0, 0, "foo");
		template.sendDefault(1, 2, "bar");
		template.sendDefault(0, 0, "baz");
		template.sendDefault(1, 2, "qux");
		template.flush();

		assertThat(container.metrics()).isNotNull();
		assertThat(container.isInExpectedState()).isTrue();
		assertThat(childContainer0.isRunning()).isTrue();
		assertThat(childContainer1.isRunning()).isTrue();
		assertThat(container.isChildRunning()).isTrue();

		assertThat(processingLatch.await(60, TimeUnit.SECONDS)).isTrue();

		container.stop();

		assertThat(container.isChildRunning()).isTrue();
		assertThat(container.isRunning()).isFalse();
		assertThat(childContainer0.isRunning()).isFalse();
		assertThat(childContainer1.isRunning()).isFalse();

		assertThat(consumerStoppedEventLatch.await(30, TimeUnit.SECONDS)).isTrue();

		assertThat(container.isChildRunning()).isTrue();

		assertThat(listenerThreadNames).containsAnyOf("testAuto-0", "testAuto-1");

		assertThat(concurrentContainerStopLatch.await(30, TimeUnit.SECONDS)).isFalse();


		template.sendDefault(0, 0, "FOO");
		template.sendDefault(1, 2, "BAR");
		template.sendDefault(0, 0, "BAZ");
		template.sendDefault(1, 2, "QUX");
		template.flush();

		// Permitted, since stop() was called earlier.
		container.start();

		assertThat(container.isRunning()).isTrue();
		assertThat(container.getContainers().stream().allMatch(containerL -> containerL.isRunning()))
				.isTrue();

		firstLatch.countDown();

		// The running container is stopped by the child from the previous run!
		assertThat(concurrentContainerStopLatch.await(30, TimeUnit.SECONDS)).isTrue();

		assertThat(container.isRunning()).isFalse();
		assertThat(container.getContainers().stream().anyMatch(containerL -> containerL.isRunning()))
				.isFalse();

		this.logger.info("Stop testFencedContainerFailed");
	}

Please advise whether this is a valid scenario.

LokeshAlamuri avatar Aug 20 '24 17:08 LokeshAlamuri

What are your 1, 2, 3, 4? Different scenarios, or steps, or states of the same scenario? Isn't this a result of all the changes you have introduced recently?

> C0 has thrown error while processing. This would stop the running ConcurrentContainer !!!!

This indeed must not happen. A failure in one child container must not affect all the others.

Does it happen even in previous versions, even before your recent changes?

artembilan avatar Aug 20 '24 17:08 artembilan

> Isn't this a result of all the changes you have introduced recently?

I have not introduced any bugs. This is a different bug, and it also exists in previous versions. I have provided a JUnit test to reproduce the scenario. Please review it and advise whether this is a valid scenario that needs to be fixed.

LokeshAlamuri avatar Aug 21 '24 17:08 LokeshAlamuri

> What are your 1, 2, 3, 4? Different scenarios, or steps, or states of the same scenario? Isn't this a result of all the changes you have introduced recently?

These are the steps to follow to get an overview of the bug; they are not different scenarios. Please let me know if I need to provide more information about this issue.

This bug is not caused by my changes.

LokeshAlamuri avatar Aug 21 '24 17:08 LokeshAlamuri

No problem!

I will need more time to investigate this. We may ask if @sobychacko has some cycles to look into this quicker. Thanks

artembilan avatar Aug 21 '24 17:08 artembilan

If that is OK with you, I will try to fix the issue and submit a PR. I have some ideas for it.

LokeshAlamuri avatar Aug 21 '24 17:08 LokeshAlamuri

@LokeshAlamuri What's the status of this issue? I know we closed the other PR you submitted earlier. Can we also close this issue, based on the comments on the PR?

sobychacko avatar Nov 13 '25 02:11 sobychacko

As discussed in the PR I submitted, the reported issue is valid, and it also exists in previous versions. Please consider it and take appropriate action on this bug.

LokeshAlamuri avatar Nov 14 '25 13:11 LokeshAlamuri