`AlreadyClosed` exception when using `topicsPattern`

Open dlecocq opened this issue 4 years ago • 4 comments

We are unable to cleanly close our consumer when using topicsPattern:

```
2020-10-27 11:15:43.304 INFO  [0x70000d6d5000] ConnectionPool:85 | Created connection for pulsar://pulsar.data-access-platform-portal.docker:6650
2020-10-27 11:15:43.307 INFO  [0x70000ef61000] ClientConnection:343 | [192.168.99.1:58802 -> 192.168.99.100:6650] Connected to broker
Closing consumer...
(node:35996) UnhandledPromiseRejectionWarning: Error: Failed to close consumer: AlreadyClosed
(node:35996) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 1)
(node:35996) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.
```

That's the result from running this code snippet (with the appropriate docker container running):

```javascript
const Bluebird = require('bluebird')
const Pulsar = require('pulsar-client')

const doit = async () => {
  const client = new Pulsar.Client({
    webServiceUrl: 'http://pulsar.data-access-platform-portal.docker:8080',
    serviceUrl: 'pulsar://pulsar.data-access-platform-portal.docker:6650'
  })
  const consumer = await client.subscribe({
    topicsPattern: `non-persistent://public/default/topic`,
    subscription: 'test-subscription',
    subscriptionType: 'Shared',
    subscriptionInitialPosition: 'Earliest'
  })

  // Rather than push messages to consume, we're just waiting for a second
  await Bluebird.delay(1000)

  console.log('Closing consumer...')
  // We've also tried omitting this line, but it still produces errors
  await consumer.close()
  console.log('Closing client...')
  await client.close()
}

doit()
```

We've tried this with node versions 10.16.3, 12.16.0, and 14.11.0 and pulsar 2.6.1.

I suspect the root cause is that the consumer implementation in this library assumes there is only a single consumer: https://github.com/apache/pulsar-client-node/blob/f010fb2ed3c530af8006ba3d3a7b73c6b46f6507/src/Consumer.cc#L178 overwrites the wrapper's consumer with each invocation. With a single topic this wouldn't be a problem, but with several it seems that the last consumer "wins" and becomes the consumer saved in the wrapper. There may also be a race condition over whether the listener handler is stitched up correctly from the config (https://github.com/apache/pulsar-client-node/blob/f010fb2ed3c530af8006ba3d3a7b73c6b46f6507/src/Consumer.cc#L184, after which the consumer config is deleted).

Looking at ClientImpl in the pulsar C++ code, it appears that it also takes responsibility for closing any open consumers and producers. Since ClientImpl appears to correctly track all newly-created consumers, we also tried relying on the client alone to close them, but that still ends with AlreadyClosed being thrown.

All that said, it is a little tricky to track all of it because of the number of levels of indirection - from the JS to the C++ extension to the C bindings which delegate out to the C++ client code. So we might be way off 😆

dlecocq avatar Oct 27 '20 17:10 dlecocq

~~it seems that this exception doesn't occur with persistent topicsPattern. 🤔~~ this was wrong. what mattered was whether or not matching topics exist.

kimula avatar Oct 29 '20 09:10 kimula

so far i've checked that

  • topicsPattern → topic or topics: works
  • other options aren't related
  • it is not related whether or not the RE is valid.
  • an equivalent C++ client code works
  • consumer.close → consumer.unsubscribe: though AlreadyClosed doesn't occur while unsubscribing, it does while closing the client.

i'll continue investigation.
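
For anyone retrying the comparison, here is a minimal sketch of the three subscribe variants, reusing the topic name and subscription settings from the original snippet. `subscribeOptions` and `TOPIC` are hypothetical names introduced only for illustration; the resulting object would be passed to `client.subscribe(...)`.

```javascript
// Hypothetical helper (not part of pulsar-client): builds the subscribe
// options for each variant so they can be swapped in the repro script.
const TOPIC = 'non-persistent://public/default/topic'

function subscribeOptions (variant) {
  // Options shared by every variant, copied from the original snippet.
  const base = {
    subscription: 'test-subscription',
    subscriptionType: 'Shared',
    subscriptionInitialPosition: 'Earliest'
  }
  switch (variant) {
    case 'pattern': // the variant that ends in AlreadyClosed on close
      return Object.assign({}, base, { topicsPattern: TOPIC })
    case 'topic': // single explicit topic: reported to work
      return Object.assign({}, base, { topic: TOPIC })
    case 'topics': // explicit list of topics: reported to work
      return Object.assign({}, base, { topics: [TOPIC] })
    default:
      throw new Error('Unknown variant: ' + variant)
  }
}

// Usage (needs a running broker):
//   const consumer = await client.subscribe(subscribeOptions('topics'))
```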

kimula avatar Nov 04 '20 02:11 kimula

AlreadyClosed occurs when there are no matching topics for topicsPattern. in that case consumers_ is empty and closeAsync passes ResultAlreadyClosed to the callback. (the same thing also happens with the C++ client, but it is not explicitly output.)

it may be because with topicsPattern a consumer doesn't dynamically create topics.
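
Until this is fixed, one possible workaround is to treat AlreadyClosed on close as "nothing was open" instead of letting the rejection go unhandled. The sketch below is only that: `closeIgnoringAlreadyClosed` is a hypothetical helper, and matching on the message suffix is an assumption based on the `Failed to close consumer: AlreadyClosed` line in the log above, not on a documented error format.

```javascript
// Hypothetical helper (not part of pulsar-client): closes a consumer or
// client and swallows only the AlreadyClosed rejection described above.
async function closeIgnoringAlreadyClosed (closable) {
  try {
    await closable.close()
  } catch (err) {
    // Assumption: the error message ends with the result name, as in
    // "Failed to close consumer: AlreadyClosed" from the log.
    if (err instanceof Error && /AlreadyClosed$/.test(err.message)) {
      return false // nothing was left to close
    }
    throw err // any other failure is still reported to the caller
  }
  return true // closed normally
}

// Usage inside the repro script:
//   await closeIgnoringAlreadyClosed(consumer)
//   await closeIgnoringAlreadyClosed(client)
```

This hides the symptom only; it does not change what happens inside the native client when no topics match the pattern.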

kimula avatar Nov 20 '20 02:11 kimula

We're still seeing this behavior on the latest version v1.8.1

mguay22 avatar Apr 09 '23 23:04 mguay22