[BUG] Topic cannot be deleted after producing transactional messages
Describe the bug
To Reproduce
- Kafka Client: 3.1.0
- Pulsar: 2.10.0
- KoP: 2.10.1.1
KoP configs:
messagingProtocols=kafka
protocolHandlerDirectory=./protocols
allowAutoTopicCreationType=partitioned
kafkaListeners=PLAINTEXT://127.0.0.1:9092
brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
brokerDeleteInactiveTopicsEnabled=false
brokerDeleteInactivePartitionedTopicMetadataEnabled=true
brokerDeduplicationEnabled=true
kafkaTransactionCoordinatorEnabled=true
brokerDeleteInactiveTopicsMode=delete_when_subscriptions_caught_up
First, create the partitioned topic via pulsar-admin:
./bin/pulsar-admin topics create-partitioned-topic my-topic-5 -p 1
Then, run the following Kafka application:
final var bootstrapServers = "localhost:9092";
final var topic = "my-topic-5";
final var props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "X0");
@Cleanup final var producer = new KafkaProducer<String, String>(props);
producer.initTransactions();
producer.beginTransaction();
producer.send(new ProducerRecord<>(topic, 0, null, "hello"));
producer.commitTransaction();
Then, try to delete the topic:
$ ./bin/pulsar-admin topics delete-partitioned-topic my-topic-5
2022-07-06T16:45:48,350+0800 [AsyncHttpClient-7-1] WARN org.apache.pulsar.client.admin.internal.BaseResource - [http://localhost:8080/admin/v2/persistent/public/default/my-topic-5/partitions?force=false&deleteSchema=false] Failed to perform http delete request: javax.ws.rs.ClientErrorException: HTTP 412 Precondition Failed
Topic has active producers/subscriptions
Reason: Topic has active producers/subscriptions
Expected behavior
The topic should be deleted.
Additional context
Here are the essential broker logs:
2022-07-06T16:45:48,317+0800 [pulsar-web-47-2] ERROR org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - [null] Failed to delete topic persistent://public/default/my-topic-5-partition-0
org.apache.pulsar.broker.service.BrokerServiceException$TopicBusyException: Topic has 1 connected producers/consumers
at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$delete$32(PersistentTopic.java:1205) ~[org.apache.pulsar-pulsar-broker-2.10.0.jar:2.10.0]
at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) ~[?:1.8.0_332]
at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683) ~[?:1.8.0_332]
at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010) ~[?:1.8.0_332]
at org.apache.pulsar.broker.service.persistent.PersistentTopic.delete(PersistentTopic.java:1142) ~[org.apache.pulsar-pulsar-broker-2.10.0.jar:2.10.0]
at org.apache.pulsar.broker.service.persistent.PersistentTopic.delete(PersistentTopic.java:1074) ~[org.apache.pulsar-pulsar-broker-2.10.0.jar:2.10.0]
at org.apache.pulsar.broker.service.persistent.PersistentTopic.delete(PersistentTopic.java:1069) ~[org.apache.pulsar-pulsar-broker-2.10.0.jar:2.10.0]
at org.apache.pulsar.broker.service.BrokerService.deleteTopic(BrokerService.java:1037) ~[org.apache.pulsar-pulsar-broker-2.10.0.jar:2.10.0]
at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.internalDeleteTopic(PersistentTopicsBase.java:1020) ~[org.apache.pulsar-pulsar-broker-2.10.0.jar:2.10.0]
at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.internalDeleteTopic(PersistentTopicsBase.java:1011) ~[org.apache.pulsar-pulsar-broker-2.10.0.jar:2.10.0]
at org.apache.pulsar.broker.admin.v2.PersistentTopics.deleteTopic(PersistentTopics.java:961) ~[org.apache.pulsar-pulsar-broker-2.10.0.jar:2.10.0]
Here are the internal stats of my-topic-5:
$ ./bin/pulsar-admin topics partitioned-stats-internal my-topic-5
{
  "metadata" : {
    "partitions" : 1
  },
  "partitions" : {
    "persistent://public/default/my-topic-5-partition-0" : {
      "entriesAddedCounter" : 2,
      "numberOfEntries" : 2,
      "totalSize" : 156,
      "currentLedgerEntries" : 2,
      "currentLedgerSize" : 156,
      "lastLedgerCreatedTimestamp" : "2022-07-06T16:44:42.352+08:00",
      "waitingCursorsCount" : 0,
      "pendingAddEntriesCount" : 0,
      "lastConfirmedEntry" : "236:1",
      "state" : "LedgerOpened",
      "ledgers" : [ {
        "ledgerId" : 236,
        "entries" : 0,
        "size" : 0,
        "offloaded" : false,
        "underReplicated" : false
      } ],
      "cursors" : {
        "pulsar.dedup" : {
          "markDeletePosition" : "236:-1",
          "readPosition" : "236:0",
          "waitingReadOp" : false,
          "pendingReadOps" : 0,
          "messagesConsumedCounter" : 0,
          "cursorLedger" : 237,
          "cursorLedgerLastEntry" : 0,
          "individuallyDeletedMessages" : "[]",
          "lastLedgerSwitchTimestamp" : "2022-07-06T16:44:42.363+08:00",
          "state" : "Open",
          "numberOfEntriesSinceFirstNotAckedMessage" : 1,
          "totalNonContiguousDeletedMessagesRange" : 0,
          "subscriptionHavePendingRead" : false,
          "subscriptionHavePendingReplayRead" : false,
          "properties" : { }
        }
      },
      "schemaLedgers" : [ ],
      "compactedLedger" : {
        "ledgerId" : -1,
        "entries" : -1,
        "size" : -1,
        "offloaded" : false,
        "underReplicated" : false
      }
    }
  }
}
Using an idempotent producer works. It looks like this bug is not related to Pulsar.
final var bootstrapServers = "localhost:9092";
final var topic = "my-topic-4";
final var props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
@Cleanup final var producer = new KafkaProducer<String, String>(props);
producer.send(new ProducerRecord<>(topic, 0, null, "hello"));
The stats are similar (no producer, only one pulsar.dedup non-durable cursor), but the topic can be deleted.
$ ./bin/pulsar-admin topics partitioned-stats-internal my-topic-4
{
  "metadata" : {
    "partitions" : 1
  },
  "partitions" : {
    "persistent://public/default/my-topic-4-partition-0" : {
      "entriesAddedCounter" : 1,
      "numberOfEntries" : 1,
      "totalSize" : 51,
      "currentLedgerEntries" : 1,
      "currentLedgerSize" : 51,
      "lastLedgerCreatedTimestamp" : "2022-07-06T16:35:10.711+08:00",
      "waitingCursorsCount" : 0,
      "pendingAddEntriesCount" : 0,
      "lastConfirmedEntry" : "234:0",
      "state" : "LedgerOpened",
      "ledgers" : [ {
        "ledgerId" : 234,
        "entries" : 0,
        "size" : 0,
        "offloaded" : false,
        "underReplicated" : false
      } ],
      "cursors" : {
        "pulsar.dedup" : {
          "markDeletePosition" : "234:-1",
          "readPosition" : "234:0",
          "waitingReadOp" : false,
          "pendingReadOps" : 0,
          "messagesConsumedCounter" : 0,
          "cursorLedger" : 235,
          "cursorLedgerLastEntry" : 0,
          "individuallyDeletedMessages" : "[]",
          "lastLedgerSwitchTimestamp" : "2022-07-06T16:35:10.724+08:00",
          "state" : "Open",
          "numberOfEntriesSinceFirstNotAckedMessage" : 1,
          "totalNonContiguousDeletedMessagesRange" : 0,
          "subscriptionHavePendingRead" : false,
          "subscriptionHavePendingReplayRead" : false,
          "properties" : { }
        }
      },
      "schemaLedgers" : [ ],
      "compactedLedger" : {
        "ledgerId" : -1,
        "entries" : -1,
        "size" : -1,
        "offloaded" : false,
        "underReplicated" : false
      }
    }
  }
}
$ ./bin/pulsar-admin topics delete-partitioned-topic my-topic-4
You can delete the topic with the --force option when it has connected producers or consumers.
Yeah, but this issue is more about why the topic cannot be deleted. It's okay to delete the topic with the -f option, but if there are still producers, the topic could be re-created automatically, which might lead to unexpected results.
BTW, the cause of this issue is that an InternalProducer object, which extends the broker's Producer class, is still writing the transaction marker when the topic is deleted.
Thanks for your prompt reply and helpful suggestion. Have any commits fixed this issue so far?
See https://github.com/streamnative/kop/pull/1388. Actually, the topic can be deleted after waiting for a while, because the related transaction marker might not have been written yet at the time producer.commitTransaction() returns. Once the marker is written successfully, the topic can be deleted.
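Until a fix is available, one possible workaround is to retry the deletion for a short while instead of using -f. Below is a minimal sketch of that idea, not code from KoP or Pulsar. The names DeleteWithRetry, TopicBusyException and DeleteAttempt are hypothetical, and the delete call is passed in as a lambda so the sketch has no Pulsar dependency. The retry count and delay are arbitrary assumptions, since how long the marker write takes is not stated here.

```java
import java.time.Duration;

/** Sketch: retry a topic deletion until the broker no longer reports the topic as busy. */
final class DeleteWithRetry {

    /** Hypothetical stand-in for the "HTTP 412 Precondition Failed" error seen above. */
    static final class TopicBusyException extends Exception {
        TopicBusyException(String message) {
            super(message);
        }
    }

    /** A single deletion attempt; it fails while the topic still has a connected producer. */
    interface DeleteAttempt {
        void run() throws TopicBusyException;
    }

    /**
     * Runs the attempt up to maxAttempts times, sleeping for delay after each failure.
     * Returns the number of attempts used, or rethrows the last failure if all attempts fail.
     */
    static int deleteWithRetry(DeleteAttempt attempt, int maxAttempts, Duration delay)
            throws TopicBusyException, InterruptedException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        TopicBusyException last = null;
        for (int i = 1; i <= maxAttempts; i++) {
            try {
                attempt.run();
                return i;
            } catch (TopicBusyException e) {
                last = e;
                if (i < maxAttempts) {
                    Thread.sleep(delay.toMillis());
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        // Simulated broker: the topic is reported as busy for the first two attempts.
        final int[] calls = {0};
        int used = deleteWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new TopicBusyException("Topic has 1 connected producers/consumers");
            }
        }, 5, Duration.ofMillis(10));
        System.out.println("deleted after " + used + " attempts");
    }
}
```

With the Pulsar Java admin client, the lambda would presumably wrap the partitioned-topic delete call and translate the client's precondition-failed exception into TopicBusyException. That wiring is an assumption and is left out here.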