pulsar-client-go
pulsar-client-go copied to clipboard
consumers stop receiving new data from pulsar
Expected behavior
the consumer should consume and receive all data.
Actual behavior
the consumer stops receiving new data from pulsar after have started. and "availablePermits" : -16,
Steps to reproduce
Start a consumer (go client v0.8.1), the consumer will receive some data after it has started immediately, and then stops receiving. This phenomenon can recur stably.
System configuration
Pulsar version: 2.10.1 KoP version: 2.10.1.6
https://github.com/streamnative/pulsar/issues/4830
./pulsar-admin topics partitioned-stats BD_YD_OrgInsight_STEP_50partition can see : "availablePermits" : -16
Topic "BD_YD_OrgInsight_STEP_50partition" is a partitioned topic, used by Kafka-on-Pulsar (KoP)。 kafka consumers work well and java consumers (java pulsar client 2.8.1) work well, but the go consumers(client v0.8.1) stops receiving new data after starting and this phenomenon can recur stably.
some partitions are
[root@kxnhhj6f bin]# ./pulsar-admin topics stats BD_YD_OrgInsight_STEP_50partition-partition-11 { "msgRateIn" : 6.222211471623512, "msgThroughputIn" : 47694.13981734731, "msgRateOut" : 6.222212615125944, "msgThroughputOut" : 47755.259599212506, "bytesInCounter" : 696903598, "msgInCounter" : 90958, "bytesOutCounter" : 535431349, "msgOutCounter" : 69779, "averageMsgSize" : 7665.142857142858, "msgChunkPublished" : false, "storageSize" : 697780372, "backlogSize" : 589186359, "publishRateLimitedTimes" : 0, "earliestMsgPublishTimeInBacklogs" : 0, "offloadedStorageSize" : 0, "lastOffloadLedgerId" : 0, "lastOffloadSuccessTimeStamp" : 0, "lastOffloadFailureTimeStamp" : 0, "publishers" : [ { "accessMode" : "Shared", "msgRateIn" : 0.0, "msgThroughputIn" : 0.0, "averageMsgSize" : 0.0, "chunkedMessageRate" : 0.0, "producerId" : 837687384786673193, "supportsPartialProducer" : false, "metadata" : { }, "producerName" : "pulsar-cluster-2-333", "connectedSince" : "2022-09-05T10:52:48.374+08:00", "address" : "/10.181.116.117:58506" }, { "accessMode" : "Shared", "msgRateIn" : 0.0, "msgThroughputIn" : 0.0, "averageMsgSize" : 0.0, "chunkedMessageRate" : 0.0, "producerId" : 837687384786674305, "supportsPartialProducer" : false, "metadata" : { }, "producerName" : "pulsar-cluster-2-405", "connectedSince" : "2022-09-05T11:00:47.073+08:00", "address" : "/10.181.116.121:45752" }, { "accessMode" : "Shared", "msgRateIn" : 6.222211471623512, "msgThroughputIn" : 47694.13981734731, "averageMsgSize" : 7665.142857142857, "chunkedMessageRate" : 0.0, "producerId" : 837687384786696451, "supportsPartialProducer" : false, "metadata" : { }, "producerName" : "pulsar-cluster-2-935", "connectedSince" : "2022-09-05T14:21:48.029+08:00", "address" : "/10.181.116.119:50970" } ], "waitingPublishers" : 0, "subscriptions" : { "sub1" : { "msgRateOut" : 6.222212615125944, "msgThroughputOut" : 47755.259599212506, "bytesOutCounter" : 401514905, "msgOutCounter" : 52277, "msgRateRedeliver" : 0.0, "messageAckRate" : 4.111104673121193, "chunkedMessageRate" : 0, "msgBacklog" : 0, "backlogSize" : 0, "earliestMsgPublishTimeInBacklog" : 0, "msgBacklogNoDelayed" : 0, "blockedSubscriptionOnUnackedMsgs" : false, "msgDelayed" : 0, "unackedMessages" : 0, "type" : "Exclusive", "activeConsumerName" : "f383c", "msgRateExpired" : 0.0, "totalMsgExpired" : 0, "lastExpireTimestamp" : 0, "lastConsumedFlowTimestamp" : 1662358934897, "lastConsumedTimestamp" : 1662358973448, "lastAckedTimestamp" : 1662358973548, "lastMarkDeleteAdvancedTimestamp" : 1662358973548, "consumers" : [ { "msgRateOut" : 6.222212615125944, "msgThroughputOut" : 47755.259599212506, "bytesOutCounter" : 401514905, "msgOutCounter" : 52277, "msgRateRedeliver" : 0.0, "messageAckRate" : 4.111104673121193, "chunkedMessageRate" : 0.0, "consumerName" : "f383c", "availablePermits" : 723, "unackedMessages" : 0, "avgMessagesPerEntry" : 1, "blockedConsumerOnUnackedMsgs" : false, "lastAckedTimestamp" : 1662358973548, "lastConsumedTimestamp" : 1662358973448, "metadata" : { }, "connectedSince" : "2022-09-05T10:31:24.3+08:00", "address" : "/10.180.224.86:23350", "clientVersion" : "2.8.1" } ], "isDurable" : true, "isReplicated" : false, "allowOutOfOrderDelivery" : false, "consumersAfterMarkDeletePosition" : { }, "nonContiguousDeletedMessagesRanges" : 0, "nonContiguousDeletedMessagesRangesSerializedSize" : 0, "subscriptionProperties" : { }, "durable" : true, "replicated" : false }, "dsjcashflow_test1" : { "msgRateOut" : 0.0, "msgThroughputOut" : 0.0, "bytesOutCounter" : 133916444, "msgOutCounter" : 17502, "msgRateRedeliver" : 0.0, "messageAckRate" : 0.0, "chunkedMessageRate" : 0, "msgBacklog" : 75963, "backlogSize" : 0, "earliestMsgPublishTimeInBacklog" : 0, "msgBacklogNoDelayed" : 75963, "blockedSubscriptionOnUnackedMsgs" : false, "msgDelayed" : 0, "unackedMessages" : 0, "type" : "Exclusive", "activeConsumerName" : "dsjcashflow_test1", "msgRateExpired" : 0.0, "totalMsgExpired" : 0, "lastExpireTimestamp" : 0, "lastConsumedFlowTimestamp" : 1662345688063, "lastConsumedTimestamp" : 1662345688133, "lastAckedTimestamp" : 1662345692140, "lastMarkDeleteAdvancedTimestamp" : 1662345692140, "consumers" : [ { "msgRateOut" : 0.0, "msgThroughputOut" : 0.0, "bytesOutCounter" : 26795637, "msgOutCounter" : 3501, "msgRateRedeliver" : 0.0, "messageAckRate" : 0.0, "chunkedMessageRate" : 0.0, "consumerName" : "dsjcashflow_test1", "availablePermits" : -1, "unackedMessages" : 0, "avgMessagesPerEntry" : 1, "blockedConsumerOnUnackedMsgs" : false, "lastAckedTimestamp" : 1662345692140, "lastConsumedTimestamp" : 1662345688133, "metadata" : { }, "connectedSince" : "2022-09-05T10:41:13.551+08:00", "address" : "/10.180.231.207:34358" } ], "isDurable" : true, "isReplicated" : false, "allowOutOfOrderDelivery" : false, "consumersAfterMarkDeletePosition" : { }, "nonContiguousDeletedMessagesRanges" : 0, "nonContiguousDeletedMessagesRangesSerializedSize" : 0, "subscriptionProperties" : { }, "durable" : true, "replicated" : false } }, "replication" : { }, "deduplicationStatus" : "Enabled", "nonContiguousDeletedMessagesRanges" : 0, "nonContiguousDeletedMessagesRangesSerializedSize" : 0, "compaction" : { "lastCompactionRemovedEventCount" : 0, "lastCompactionSucceedTimestamp" : 0, "lastCompactionFailedTimestamp" : 0, "lastCompactionDurationTimeInMills" : 0 } }
[root@kxnhhj6f bin]# ./pulsar-admin topics stats-internal BD_YD_OrgInsight_STEP_50partition-partition-11 { "entriesAddedCounter" : 90224, "numberOfEntries" : 90223, "totalSize" : 699139477, "currentLedgerEntries" : 40223, "currentLedgerSize" : 313576368, "lastLedgerCreatedTimestamp" : "2022-09-05T11:10:32.71+08:00", "waitingCursorsCount" : 1, "pendingAddEntriesCount" : 0, "lastConfirmedEntry" : "1469:40222", "state" : "LedgerOpened", "ledgers" : [ { "ledgerId" : 1036, "entries" : 50000, "size" : 385563109, "offloaded" : false, "underReplicated" : false }, { "ledgerId" : 1469, "entries" : 0, "size" : 0, "offloaded" : false, "underReplicated" : false } ], "cursors" : { "dsjcashflow_test1" : { "markDeletePosition" : "1036:14082", "readPosition" : "1036:14690", "waitingReadOp" : false, "pendingReadOps" : 0, "messagesConsumedCounter" : 14084, "cursorLedger" : 1035, "cursorLedgerLastEntry" : 189, "individuallyDeletedMessages" : "[]", "lastLedgerSwitchTimestamp" : "2022-09-05T09:15:08.997+08:00", "state" : "Open", "numberOfEntriesSinceFirstNotAckedMessage" : 608, "totalNonContiguousDeletedMessagesRange" : 0, "subscriptionHavePendingRead" : false, "subscriptionHavePendingReplayRead" : false, "properties" : { } }, "pulsar.dedup" : { "markDeletePosition" : "1469:39265", "readPosition" : "1469:39266", "waitingReadOp" : false, "pendingReadOps" : 0, "messagesConsumedCounter" : 89267, "cursorLedger" : 1564, "cursorLedgerLastEntry" : 15, "individuallyDeletedMessages" : "[]", "lastLedgerSwitchTimestamp" : "2022-09-05T13:20:18.336+08:00", "state" : "Open", "numberOfEntriesSinceFirstNotAckedMessage" : 1, "totalNonContiguousDeletedMessagesRange" : 0, "subscriptionHavePendingRead" : false, "subscriptionHavePendingReplayRead" : false, "properties" : { } }, "sub1" : { "markDeletePosition" : "1469:40221", "readPosition" : "1469:40223", "waitingReadOp" : true, "pendingReadOps" : 0, "messagesConsumedCounter" : 90223, "cursorLedger" : 1343, "cursorLedgerLastEntry" : 8038, "individuallyDeletedMessages" : "[]", "lastLedgerSwitchTimestamp" : "2022-09-05T10:31:24.268+08:00", "state" : "Open", "numberOfEntriesSinceFirstNotAckedMessage" : 2, "totalNonContiguousDeletedMessagesRange" : 0, "subscriptionHavePendingRead" : true, "subscriptionHavePendingReplayRead" : false, "properties" : { } } }, "schemaLedgers" : [ ], "compactedLedger" : { "ledgerId" : -1, "entries" : -1, "size" : -1, "offloaded" : false, "underReplicated" : false } }
Maybe #835 would be fix this problem.