spring-kafka icon indicating copy to clipboard operation
spring-kafka copied to clipboard

childContainer of previous run is stopping ConcurrentContainer after a new start

Open LokeshAlamuri opened this issue 6 months ago • 5 comments

In what version(s) of Spring for Apache Kafka are you seeing this issue?

3.3

Describe the bug

Once a ConcurrentContainer has been stopped, a child container left over from that run should not be able to stop the ConcurrentContainer again. However, there are scenarios where this is possible.

Scenario:

Concurrency: 2

ConcurrentContainer:: CMain Child containers: C0, C1.

  1. ConcurrentContainer started.

    CMain -- running. C0 -- running. C1 -- running.

  2. ConcurrentContainer stopped.

    CMain -- not running. C0 -- delinked. (message processing is happening) C1 -- delinked.

  3. ConcurrentContainer started again. This is permitted because stop was called beforehand, so nothing is wrong here and it should be allowed; restarting this way is just not good practice.

    CMain -- running. C2 -- running. C3 -- running.

    C0 -- delinked. (message processing is happening)

  4. C0 throws an error while processing. This stops the running ConcurrentContainer!

    CMain -- not running. C2 -- delinked. C3 -- delinked.

    C0 -- delinked.

To Reproduce

/**
 * Reproducer: a listener invocation that began during the first run of a
 * {@code ConcurrentMessageListenerContainer} fails only after the container has been
 * stopped and started again, and the test asserts that the restarted container then
 * ends up stopped.
 *
 * <p>Sequence exercised below:
 * <ol>
 * <li>start the container with concurrency 2 and send four records;</li>
 * <li>the first listener invocation blocks on {@code firstLatch};</li>
 * <li>stop the container, then start it again;</li>
 * <li>release {@code firstLatch} so the blocked invocation throws;</li>
 * <li>assert that a {@code ConcurrentContainerStoppedEvent} is published and that the
 * container and all of its current children report not running.</li>
 * </ol>
 *
 * <p>NOTE(review): when it passes, the test spends a full 30 seconds on the
 * "no stop event yet" assertion, because that assertion only succeeds on timeout.
 *
 * @throws Exception if a latch wait is interrupted or any container operation fails
 */
@Test
	public void testFencedContainerFailed() throws Exception {
		this.logger.info("Start testFencedContainerFailed");
		Map<String, Object> props = KafkaTestUtils.consumerProps("test1", "true",
				embeddedKafka);
		// Captures the Properties handed to createKafkaConsumer; never read in this method.
		AtomicReference<Properties> overrides = new AtomicReference<>();
		DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props) {
			@Override
			protected Consumer<Integer, String> createKafkaConsumer(String groupId, String clientIdPrefix,
																	String clientIdSuffixArg, Properties properties) {
				overrides.set(properties);
				return super.createKafkaConsumer(groupId, clientIdPrefix, clientIdSuffixArg, properties);
			}
		};
		ContainerProperties containerProps = new ContainerProperties(topic1);
		containerProps.setLogContainerConfig(true);
		containerProps.setClientId("client");
		containerProps.setAckMode(ContainerProperties.AckMode.RECORD);

		// Counted down by the listener but never awaited in this method.
		final CountDownLatch secondRunLatch = new CountDownLatch(5);
		final Set<String> listenerThreadNames = new ConcurrentSkipListSet<>();
		// NOTE(review): plain ArrayList appended to from the listener; with concurrency 2 the
		// listener presumably runs on more than one thread, so this is unsynchronized - confirm.
		final List<String> payloads = new ArrayList<>();
		// Released by the first listener invocation that completes normally.
		final CountDownLatch processingLatch = new CountDownLatch(1);
		// Holds the very first listener invocation until the test releases it after the restart.
		final CountDownLatch firstLatch = new CountDownLatch(1);

		// True only for the first listener invocation across all consumer threads.
		AtomicBoolean first = new AtomicBoolean(true);

		containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
			if (first.getAndSet(false)) {
				try {
					// Wait up to 100 seconds, then fail whether the latch was released or the
					// wait timed out: the NullPointerException is thrown in both cases.
					firstLatch.await(100, TimeUnit.SECONDS);
					throw new NullPointerException();
				}
				catch (InterruptedException e) {
					// NOTE(review): the interrupt flag is not restored before rethrowing.
					throw new RuntimeException(e);
				}
			}
			// Every later invocation records the thread name and payload, then counts down.
			ConcurrentMessageListenerContainerTests.this.logger.info("auto: " + message);
			listenerThreadNames.add(Thread.currentThread().getName());
			payloads.add(message.value());
			secondRunLatch.countDown();
			processingLatch.countDown();
		});

		ConcurrentMessageListenerContainer<Integer, String> container =
				new ConcurrentMessageListenerContainer<>(cf, containerProps);
		container.setConcurrency(2);
		container.setBeanName("testAuto");
		container.setChangeConsumerThreadName(true);
		// Presumably stops the container when the listener throws - verify against the
		// CommonContainerStoppingErrorHandler documentation.
		container.setCommonErrorHandler(new CommonContainerStoppingErrorHandler());

		// Every published event is queued here; the queue is never read in this method.
		BlockingQueue<KafkaEvent> events = new LinkedBlockingQueue<>();
		CountDownLatch concurrentContainerStopLatch = new CountDownLatch(1);
		CountDownLatch consumerStoppedEventLatch = new CountDownLatch(1);

		// Record each event and release the matching latch for the two event types of interest.
		container.setApplicationEventPublisher(e -> {
			events.add((KafkaEvent) e);
			if (e instanceof ConcurrentContainerStoppedEvent) {
				concurrentContainerStopLatch.countDown();
			}
			if (e instanceof ConsumerStoppedEvent) {
				consumerStoppedEventLatch.countDown();
			}
		});

		// Counts intercepted records; this latch is never awaited in this method.
		CountDownLatch interceptedSecondRun = new CountDownLatch(5);
		container.setRecordInterceptor((record, consumer) -> {
			interceptedSecondRun.countDown();
			return record;
		});

		// ---- First run ----
		container.start();

		// Keep references to the first-run children so they can be inspected after stop().
		MessageListenerContainer childContainer0 = container.getContainers().get(0);
		MessageListenerContainer childContainer1 = container.getContainers().get(1);

		ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
		assertThat(container.getAssignedPartitions()).hasSize(2);
		Map<String, Collection<TopicPartition>> assignments = container.getAssignmentsByClientId();
		assertThat(assignments).hasSize(2);
		assertThat(assignments.get("client-0")).isNotNull();
		assertThat(assignments.get("client-1")).isNotNull();

		// Send two records to each of partitions 0 and 1.
		Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
		ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
		KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
		template.setDefaultTopic(topic1);
		template.sendDefault(0, 0, "foo");
		template.sendDefault(1, 2, "bar");
		template.sendDefault(0, 0, "baz");
		template.sendDefault(1, 2, "qux");
		template.flush();

		assertThat(container.metrics()).isNotNull();
		assertThat(container.isInExpectedState()).isTrue();
		assertThat(childContainer0.isRunning()).isTrue();
		assertThat(childContainer1.isRunning()).isTrue();
		assertThat(container.isChildRunning()).isTrue();

		// Wait until at least one listener invocation other than the blocked one has completed.
		assertThat(processingLatch.await(60, TimeUnit.SECONDS)).isTrue();

		// ---- Stop while the first listener invocation is still blocked on firstLatch ----
		container.stop();

		// The container and both first-run children report not running, yet isChildRunning()
		// is still expected to be true - presumably because of the blocked listener; confirm.
		assertThat(container.isChildRunning()).isTrue();
		assertThat(container.isRunning()).isFalse();
		assertThat(childContainer0.isRunning()).isFalse();
		assertThat(childContainer1.isRunning()).isFalse();

		// At least one ConsumerStoppedEvent must have been published.
		assertThat(consumerStoppedEventLatch.await(30, TimeUnit.SECONDS)).isTrue();

		assertThat(container.isChildRunning()).isTrue();

		assertThat(listenerThreadNames).containsAnyOf("testAuto-0", "testAuto-1");

		// Expect no ConcurrentContainerStoppedEvent so far: await() returns false only on
		// timeout, so this line blocks for the full 30 seconds when the assertion holds.
		assertThat(concurrentContainerStopLatch.await(30, TimeUnit.SECONDS)).isFalse();


		template.sendDefault(0, 0, "FOO");
		template.sendDefault(1, 2, "BAR");
		template.sendDefault(0, 0, "BAZ");
		template.sendDefault(1, 2, "QUX");
		template.flush();

		// ---- Second run: restarting is permitted because stop() was called beforehand ----
		container.start();

		assertThat(container.isRunning()).isTrue();
		assertThat(container.getContainers().stream().allMatch(containerL -> containerL.isRunning()))
				.isTrue();

		// Release the listener invocation left over from the first run; it now throws.
		firstLatch.countDown();

		// The behavior being reported: the restarted (running) container gets stopped, as
		// signalled by a ConcurrentContainerStoppedEvent within 30 seconds.
		assertThat(concurrentContainerStopLatch.await(30, TimeUnit.SECONDS)).isTrue();

		assertThat(container.isRunning()).isFalse();
		assertThat(container.getContainers().stream().anyMatch(containerL -> containerL.isRunning()))
				.isFalse();

		this.logger.info("Stop testFencedContainerFailed");
	}

Please suggest if this is a valid scenario.

LokeshAlamuri avatar Aug 20 '24 17:08 LokeshAlamuri