[BUG] Topic cannot be deleted after producing transactional messages

Open BewareMyPower opened this issue 3 years ago • 5 comments

Describe the bug

To Reproduce

  • Kafka Client: 3.1.0
  • Pulsar: 2.10.0
  • KoP: 2.10.1.1

KoP configs:

messagingProtocols=kafka
protocolHandlerDirectory=./protocols
allowAutoTopicCreationType=partitioned
kafkaListeners=PLAINTEXT://127.0.0.1:9092
brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
brokerDeleteInactiveTopicsEnabled=false
brokerDeleteInactivePartitionedTopicMetadataEnabled=true
brokerDeduplicationEnabled=true
kafkaTransactionCoordinatorEnabled=true
brokerDeleteInactiveTopicsMode=delete_when_subscriptions_caught_up

First, create the partitioned topic via pulsar-admin:

./bin/pulsar-admin topics create-partitioned-topic my-topic-5 -p 1

Then, run the following Kafka application:

        // Connect to the KoP listener configured in kafkaListeners.
        final var bootstrapServers = "localhost:9092";
        final var topic = "my-topic-5";
        final var props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Setting a transactional ID makes this a transactional producer.
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "X0");
        // Lombok's @Cleanup closes the producer when it goes out of scope.
        @Cleanup final var producer = new KafkaProducer<String, String>(props);
        producer.initTransactions();
        producer.beginTransaction();
        // Send one record with a null key to partition 0 inside the transaction.
        producer.send(new ProducerRecord<>(topic, 0, null, "hello"));
        producer.commitTransaction();

Then, try to delete the topic:

$ ./bin/pulsar-admin topics delete-partitioned-topic my-topic-5     
2022-07-06T16:45:48,350+0800 [AsyncHttpClient-7-1] WARN  org.apache.pulsar.client.admin.internal.BaseResource - [http://localhost:8080/admin/v2/persistent/public/default/my-topic-5/partitions?force=false&deleteSchema=false] Failed to perform http delete request: javax.ws.rs.ClientErrorException: HTTP 412 Precondition Failed
Topic has active producers/subscriptions

Reason: Topic has active producers/subscriptions

Expected behavior

The topic should be deleted.

Additional context

Here are the essential broker logs:

2022-07-06T16:45:48,317+0800 [pulsar-web-47-2] ERROR org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - [null] Failed to delete topic persistent://public/default/my-topic-5-partition-0
org.apache.pulsar.broker.service.BrokerServiceException$TopicBusyException: Topic has 1 connected producers/consumers
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$delete$32(PersistentTopic.java:1205) ~[org.apache.pulsar-pulsar-broker-2.10.0.jar:2.10.0]
	at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) ~[?:1.8.0_332]
	at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683) ~[?:1.8.0_332]
	at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010) ~[?:1.8.0_332]
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.delete(PersistentTopic.java:1142) ~[org.apache.pulsar-pulsar-broker-2.10.0.jar:2.10.0]
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.delete(PersistentTopic.java:1074) ~[org.apache.pulsar-pulsar-broker-2.10.0.jar:2.10.0]
	at org.apache.pulsar.broker.service.persistent.PersistentTopic.delete(PersistentTopic.java:1069) ~[org.apache.pulsar-pulsar-broker-2.10.0.jar:2.10.0]
	at org.apache.pulsar.broker.service.BrokerService.deleteTopic(BrokerService.java:1037) ~[org.apache.pulsar-pulsar-broker-2.10.0.jar:2.10.0]
	at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.internalDeleteTopic(PersistentTopicsBase.java:1020) ~[org.apache.pulsar-pulsar-broker-2.10.0.jar:2.10.0]
	at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.internalDeleteTopic(PersistentTopicsBase.java:1011) ~[org.apache.pulsar-pulsar-broker-2.10.0.jar:2.10.0]
	at org.apache.pulsar.broker.admin.v2.PersistentTopics.deleteTopic(PersistentTopics.java:961) ~[org.apache.pulsar-pulsar-broker-2.10.0.jar:2.10.0]

Here are the internal stats of my-topic-5:

$ ./bin/pulsar-admin topics partitioned-stats-internal my-topic-5
{
  "metadata" : {
    "partitions" : 1
  },
  "partitions" : {
    "persistent://public/default/my-topic-5-partition-0" : {
      "entriesAddedCounter" : 2,
      "numberOfEntries" : 2,
      "totalSize" : 156,
      "currentLedgerEntries" : 2,
      "currentLedgerSize" : 156,
      "lastLedgerCreatedTimestamp" : "2022-07-06T16:44:42.352+08:00",
      "waitingCursorsCount" : 0,
      "pendingAddEntriesCount" : 0,
      "lastConfirmedEntry" : "236:1",
      "state" : "LedgerOpened",
      "ledgers" : [ {
        "ledgerId" : 236,
        "entries" : 0,
        "size" : 0,
        "offloaded" : false,
        "underReplicated" : false
      } ],
      "cursors" : {
        "pulsar.dedup" : {
          "markDeletePosition" : "236:-1",
          "readPosition" : "236:0",
          "waitingReadOp" : false,
          "pendingReadOps" : 0,
          "messagesConsumedCounter" : 0,
          "cursorLedger" : 237,
          "cursorLedgerLastEntry" : 0,
          "individuallyDeletedMessages" : "[]",
          "lastLedgerSwitchTimestamp" : "2022-07-06T16:44:42.363+08:00",
          "state" : "Open",
          "numberOfEntriesSinceFirstNotAckedMessage" : 1,
          "totalNonContiguousDeletedMessagesRange" : 0,
          "subscriptionHavePendingRead" : false,
          "subscriptionHavePendingReplayRead" : false,
          "properties" : { }
        }
      },
      "schemaLedgers" : [ ],
      "compactedLedger" : {
        "ledgerId" : -1,
        "entries" : -1,
        "size" : -1,
        "offloaded" : false,
        "underReplicated" : false
      }
    }
  }
}

BewareMyPower avatar Jul 06 '22 08:07 BewareMyPower

With an idempotent (non-transactional) producer, the topic can be deleted. It looks like this bug is not related to Pulsar.

        final var bootstrapServers = "localhost:9092";
        final var topic = "my-topic-4";
        final var props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Idempotence only: no transactional ID is set, so no transaction is started.
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        @Cleanup final var producer = new KafkaProducer<String, String>(props);
        producer.send(new ProducerRecord<>(topic, 0, null, "hello"));

The stats are similar (no producer, only one pulsar.dedup non-durable cursor), but the topic can be deleted.

$ ./bin/pulsar-admin topics partitioned-stats-internal my-topic-4   
{
  "metadata" : {
    "partitions" : 1
  },
  "partitions" : {
    "persistent://public/default/my-topic-4-partition-0" : {
      "entriesAddedCounter" : 1,
      "numberOfEntries" : 1,
      "totalSize" : 51,
      "currentLedgerEntries" : 1,
      "currentLedgerSize" : 51,
      "lastLedgerCreatedTimestamp" : "2022-07-06T16:35:10.711+08:00",
      "waitingCursorsCount" : 0,
      "pendingAddEntriesCount" : 0,
      "lastConfirmedEntry" : "234:0",
      "state" : "LedgerOpened",
      "ledgers" : [ {
        "ledgerId" : 234,
        "entries" : 0,
        "size" : 0,
        "offloaded" : false,
        "underReplicated" : false
      } ],
      "cursors" : {
        "pulsar.dedup" : {
          "markDeletePosition" : "234:-1",
          "readPosition" : "234:0",
          "waitingReadOp" : false,
          "pendingReadOps" : 0,
          "messagesConsumedCounter" : 0,
          "cursorLedger" : 235,
          "cursorLedgerLastEntry" : 0,
          "individuallyDeletedMessages" : "[]",
          "lastLedgerSwitchTimestamp" : "2022-07-06T16:35:10.724+08:00",
          "state" : "Open",
          "numberOfEntriesSinceFirstNotAckedMessage" : 1,
          "totalNonContiguousDeletedMessagesRange" : 0,
          "subscriptionHavePendingRead" : false,
          "subscriptionHavePendingReplayRead" : false,
          "properties" : { }
        }
      },
      "schemaLedgers" : [ ],
      "compactedLedger" : {
        "ledgerId" : -1,
        "entries" : -1,
        "size" : -1,
        "offloaded" : false,
        "underReplicated" : false
      }
    }
  }
}
$ ./bin/pulsar-admin topics delete-partitioned-topic my-topic-4  

BewareMyPower avatar Jul 06 '22 08:07 BewareMyPower

You can delete the topic with the --force option when it has connected producers or consumers.

zhouxy0809 avatar Jul 18 '22 09:07 zhouxy0809

Yeah, but this issue is more about why the topic cannot be deleted. Deleting the topic with the -f option works, but if some producers were still connected, the topic could be recreated automatically, which might lead to unexpected results.

BTW, the cause of this issue is that an InternalProducer object, which extends the broker's Producer class, is still writing the transaction marker when the topic is deleted.
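
To illustrate the cause described above, here is a toy model of a topic that rejects a non-forced delete while any producer is attached. It is only a sketch: ToyTopic, ToyMarkerDemo, and the producer name are hypothetical and are not the Pulsar or KoP classes. The only assumption carried over from the issue is that an internal producer stays attached while the transaction marker is being written.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Toy model only: hypothetical classes, not the Pulsar or KoP implementation.
class ToyTopic {
    private final Set<String> producers = ConcurrentHashMap.newKeySet();

    void addProducer(String name) {
        producers.add(name);
    }

    void removeProducer(String name) {
        producers.remove(name);
    }

    // A non-forced delete is rejected while any producer is still attached.
    void delete() {
        if (!producers.isEmpty()) {
            throw new IllegalStateException(
                    "Topic has " + producers.size() + " connected producers/consumers");
        }
    }
}

class ToyMarkerDemo {
    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic();
        // Assumed: the internal producer is attached while the marker write is in flight.
        topic.addProducer("internal-marker-writer");
        try {
            topic.delete();
        } catch (IllegalStateException e) {
            System.out.println("delete rejected: " + e.getMessage());
        }
        // Assumed: the internal producer detaches once the marker has been written.
        topic.removeProducer("internal-marker-writer");
        topic.delete();
        System.out.println("delete succeeded");
    }
}
```

In this model the Kafka client never appears in the producer set, which matches the observation that the stats show no producer even though the delete is rejected.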

BewareMyPower avatar Jul 18 '22 14:07 BewareMyPower

Thanks for your prompt reply and helpful suggestion. Has any commit fixed this issue so far?

zhouxy0809 avatar Jul 21 '22 01:07 zhouxy0809

See https://github.com/streamnative/kop/pull/1388. Actually, the topic can be deleted after waiting for a while: at the time producer.commitTransaction() returns, the related transaction marker might not have been written yet. Once the marker has been written successfully, the topic can be deleted.
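
Since the delete succeeds once the marker has been written, one possible workaround on affected versions is to retry the delete a few times with a short delay. The helper below is a minimal sketch using only the JDK. RetryDelete, Action, and the parameter names are hypothetical, and the delete call itself is left to the caller.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper: retries an action until it succeeds or attempts run out.
class RetryDelete {
    @FunctionalInterface
    interface Action {
        void run() throws Exception;
    }

    // Returns the number of attempts used; rethrows the last failure if all attempts fail.
    static int retry(Action action, int maxAttempts, long delayMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                action.run();
                return attempt;
            } catch (Exception e) {
                last = e;
                // Wait before the next attempt, but not after the final one.
                if (attempt < maxAttempts) {
                    TimeUnit.MILLISECONDS.sleep(delayMillis);
                }
            }
        }
        throw last;
    }
}
```

The action passed in could, for example, wrap a call to the Pulsar Java admin client such as admin.topics().deletePartitionedTopic("my-topic-5"), assuming an admin client named admin has been created elsewhere. The attempt count and delay are arbitrary and would need tuning for a real deployment.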

BewareMyPower avatar Jul 21 '22 03:07 BewareMyPower