
node-rdkafka e2e tests failing

Open robinfehr opened this issue 2 years ago • 9 comments

Description

I have implemented the cooperative rebalance in node-rdkafka, and when I first started, the e2e tests were working. Soon I realized that they don't always run through; sporadically they fail.

I've now extracted a test that can be run directly with node (without mocha), using tape instead.

The test sometimes fails and sometimes runs through. The behaviour is inconsistent.

What seems to trigger it is the CLOSE of the consumer. Since I'm quite new to the internals of the librdkafka lib, some help would be appreciated. Meanwhile, I'll try to dig and see if I can find more.
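
For reference, librdkafka selects the rebalance protocol via the partition.assignment.strategy property. Below is a minimal sketch of a consumer opting into cooperative rebalancing at the config level (the group id is a placeholder, the rest mirrors the test below); note that the extracted test does not set this property, which is why the logs further down show REBALANCE (EAGER).

var Kafka = require(".");

// Minimal sketch: opting a consumer into cooperative (incremental)
// rebalancing. "cooperative-sticky" is the standard librdkafka value
// for partition.assignment.strategy; the group id is a placeholder.
var coopConsumer = new Kafka.KafkaConsumer(
  {
    "metadata.broker.list": process.env.KAFKA_HOST || "localhost:9092",
    "group.id": "kafka-mocha-grp-coop",
    "partition.assignment.strategy": "cooperative-sticky",
  },
  {
    "auto.offset.reset": "largest",
  }
);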

How to reproduce

This is the test; it can be added to the root of node-rdkafka and run simply with node filename.js:

1) add file to the root of node-rdkafka
2) npm install tape
3) node filename.js
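// Extracted repro: produce one message with headers, consume it, then
// disconnect the consumer and producer (the hang occurs during the
// consumer disconnect).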
var crypto = require("crypto");
var test = require("tape");
var Kafka = require(".");
var eventListener = require("./e2e/listener");

var kafkaBrokerList = process.env.KAFKA_HOST || "localhost:9092";

function getProducer(t) {
  return new Promise((res) => {
    let producer = new Kafka.Producer(
      {
        "client.id": "kafka-mocha",
        "metadata.broker.list": kafkaBrokerList,
        "fetch.wait.max.ms": 1,
        debug: "all",
        dr_cb: true,
      },
      {
        "produce.offset.report": true,
      }
    );
    producer.connect({}, function (err, d) {
      console.log("connecting p");
      t.ifError(err, "no error after connecting");
      t.equal(typeof d, "object", "metadata should be returned");
      res(producer);
    });
  });
}

function getConsumer(t) {
  return new Promise((res) => {
    const grp = "kafka-mocha-grp-" + crypto.randomBytes(20).toString("hex");
    let consumer = new Kafka.KafkaConsumer(
      {
        "metadata.broker.list": kafkaBrokerList,
        "group.id": grp,
        "fetch.wait.max.ms": 1000,
        "session.timeout.ms": 10000,
        "enable.auto.commit": false,
        "enable.partition.eof": true,
        debug: "all",
        // paused: true,
      },
      {
        "auto.offset.reset": "largest",
      }
    );

    consumer.connect({}, function (err, d) {
      console.log("connecting c");
      t.ifError(err, "no error after connecting");
      t.equal(typeof d, "object", "metadata should be returned");
      res(consumer);
    });
  });
}

function disconnectProducer(t, producer) {
  return new Promise((res) => {
    producer.disconnect(function (err) {
      t.ifError(err, "no error after disconnecting");
      res();
    });
  });
}

function disconnectConsumer(t, consumer) {
  return new Promise((res) => {
    consumer.disconnect(function (err) {
      t.ifError(err, "no error after disconnecting");
      res();
    });
  });
}

function assert_headers_match(t, expectedHeaders, messageHeaders) {
  t.equal(
    expectedHeaders.length,
    messageHeaders.length,
    "Headers length does not match expected length"
  );
  for (var i = 0; i < expectedHeaders.length; i++) {
    var expectedKey = Object.keys(expectedHeaders[i])[0];
    var messageKey = Object.keys(messageHeaders[i]);
    t.equal(messageKey.length, 1, "Expected only one Header key");
    t.equal(
      expectedKey,
      messageKey[0],
      "Expected key does not match message key"
    );
    var expectedValue = Buffer.isBuffer(expectedHeaders[i][expectedKey])
      ? expectedHeaders[i][expectedKey].toString()
      : expectedHeaders[i][expectedKey];
    var actualValue = messageHeaders[i][expectedKey].toString();
    t.equal(
      expectedValue.toString(),
      actualValue.toString(),
      "invalid message header"
    );
  }
}

function run_headers_test(t, headers, consumer, producer, topic) {
  return new Promise((res) => {
    var key = "key";

    crypto.randomBytes(4096, function (ex, buffer) {
      producer.setPollInterval(10);

      consumer.on("data", function (message) {
        console.log("tf", message);
        t.equal(
          buffer.toString(),
          message.value.toString(),
          "invalid message value"
        );
        t.equal(key.toString(), message.key.toString(), "invalid message key");
        t.equal(topic, message.topic, "invalid message topic");
        t.ok(message.offset >= 0, "invalid message offset");
        assert_headers_match(t, headers, message.headers);
        res();
      });

      consumer.subscribe([topic]);
      consumer.consume();

      setTimeout(function () {
        var timestamp = new Date().getTime();
        console.log("producing now", topic, key);
        producer.produce(topic, null, buffer, key, timestamp, "", headers);
      }, 2000);
    });
  });
}

test.only("should be able to produce and consume messages with one header value as float: consumeLoop", async function (t) {
  var topic = "test";
  const consumer = await getConsumer(t);
  eventListener(consumer);
  const producer = await getProducer(t);
  eventListener(producer);

  var headers = [{ key: "value" }];
  await run_headers_test(t, headers, consumer, producer, topic);
  await disconnectConsumer(t, consumer);
  await disconnectProducer(t, producer);
  t.end();
});

Debug-Logs in the working case

The callback of the close fn gets called; see ok 13 no error after disconnecting below.

ok 12 invalid message header
{ severity: 7, fac: 'CONSUME' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Enqueue 1 message(s) (4099 bytes, 1 ops) on test [0] fetch queue (qlen 0, v4, last_offset 10517, 0 ctrl msgs, 0 aborted msgsets, uncompressed)
{ severity: 7, fac: 'FETCH' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Fetch topic test [0] at offset 10518 (v4)
{ severity: 7, fac: 'FETCH' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Fetch 1/1/1 toppar(s)
{ severity: 7, fac: 'SEND' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Sent FetchRequest (v11, 94 bytes @ 0, CorrId 9)
{ severity: 7, fac: 'BROADCAST' } [thrd:localhost:9092/bootstrap]: Broadcasting state change
{ severity: 7, fac: 'CLOSE' } [thrd:app]: Closing consumer
{ severity: 7, fac: 'CLOSE' } [thrd:app]: Waiting for close events
{ severity: 7, fac: 'CGRPOP' } [thrd:main]: Group "kafka-mocha-grp-2e4fe3f045b1a61d486a77c30ea383641134240c" received op TERMINATE in state up (join-state steady)
{ severity: 7, fac: 'CGRPTERM' } [thrd:main]: Terminating group "kafka-mocha-grp-2e4fe3f045b1a61d486a77c30ea383641134240c" in state up with 1 partition(s)
{ severity: 7, fac: 'UNSUBSCRIBE' } [thrd:main]: Group "kafka-mocha-grp-2e4fe3f045b1a61d486a77c30ea383641134240c": unsubscribe from current subscription of size 1 (leave group=true, has joined=true, rdkafka-8a0e7a22-02bc-4e72-add5-07273cc1de53, join-state steady)
{ severity: 7, fac: 'SUBSCRIPTION' } [thrd:main]: Group "kafka-mocha-grp-2e4fe3f045b1a61d486a77c30ea383641134240c": clearing subscribed topics list (1)
{ severity: 7, fac: 'SUBSCRIPTION' } [thrd:main]: Group "kafka-mocha-grp-2e4fe3f045b1a61d486a77c30ea383641134240c": effective subscription list changed from 1 to 0 topic(s):
{ severity: 7, fac: 'GRPLEADER' } [thrd:main]: Group "kafka-mocha-grp-2e4fe3f045b1a61d486a77c30ea383641134240c": resetting group leader info: unsubscribe
{ severity: 7, fac: 'REBALANCE' } [thrd:main]: Group "kafka-mocha-grp-2e4fe3f045b1a61d486a77c30ea383641134240c" initiating rebalance (EAGER) in state up (join-state steady) with 1 assigned partition(s): unsubscribe
{ severity: 7, fac: 'CGRPJOINSTATE' } [thrd:main]: Group "kafka-mocha-grp-2e4fe3f045b1a61d486a77c30ea383641134240c" changed join state steady -> wait-unassign-call (state up)
{ severity: 7, fac: 'CLEARASSIGN' } [thrd:main]: Clearing current assignment of 1 partition(s)
{ severity: 7, fac: 'CGRPJOINSTATE' } [thrd:main]: Group "kafka-mocha-grp-2e4fe3f045b1a61d486a77c30ea383641134240c" changed join state wait-unassign-call -> wait-unassign-to-complete (state up)
{ severity: 7, fac: 'DUMP' } [thrd:main]: Assignment dump (started_cnt=1, wait_stop_cnt=0)
{ severity: 7, fac: 'DUMP_ALL' } [thrd:main]: List with 0 partition(s):
{ severity: 7, fac: 'DUMP_PND' } [thrd:main]: List with 0 partition(s):
{ severity: 7, fac: 'DUMP_QRY' } [thrd:main]: List with 0 partition(s):
{ severity: 7, fac: 'DUMP_REM' } [thrd:main]: List with 1 partition(s):
{ severity: 7, fac: 'DUMP_REM' } [thrd:main]:  test [0] offset STORED
{ severity: 7, fac: 'BARRIER' } [thrd:main]: test [0]: rd_kafka_toppar_op_fetch_stop:2363: new version barrier v5
{ severity: 7, fac: 'CONSUMER' } [thrd:main]: Stop consuming test [0] (v5)
{ severity: 7, fac: 'BARRIER' } [thrd:main]: test [0]: rd_kafka_toppar_op_pause_resume:2424: new version barrier v6
{ severity: 7, fac: 'RESUME' } [thrd:main]: Resume test [0] (v6)
{ severity: 7, fac: 'DESP' } [thrd:main]: Removing (un)desired topic test [0]
{ severity: 7, fac: 'REMOVE' } [thrd:main]: Removing test [0] from assignment (started=true, pending=false, queried=false, stored offset=10518)
{ severity: 7, fac: 'REMOVE' } [thrd:main]: Served 1 removed partition(s), with 1 offset(s) to commit
{ severity: 7, fac: 'ASSIGNMENT' } [thrd:main]: Current assignment of 0 partition(s) with 0 pending adds, 0 offset queries, 1 partitions awaiting stop and 0 offset commits in progress
{ severity: 7, fac: 'ASSIGNMENT' } [thrd:main]: Group "kafka-mocha-grp-2e4fe3f045b1a61d486a77c30ea383641134240c": clearing group assignment
{ severity: 7, fac: 'CLEARASSIGN' } [thrd:main]: No current assignment to clear
{ severity: 7, fac: 'DUMP' } [thrd:main]: Assignment dump (started_cnt=1, wait_stop_cnt=1)
{ severity: 7, fac: 'DUMP_ALL' } [thrd:main]: List with 0 partition(s):
{ severity: 7, fac: 'DUMP_PND' } [thrd:main]: List with 0 partition(s):
{ severity: 7, fac: 'DUMP_QRY' } [thrd:main]: List with 0 partition(s):
{ severity: 7, fac: 'DUMP_REM' } [thrd:main]: List with 0 partition(s):
{ severity: 7, fac: 'ASSIGNMENT' } [thrd:main]: Current assignment of 0 partition(s) with 0 pending adds, 0 offset queries, 1 partitions awaiting stop and 0 offset commits in progress
{ severity: 7, fac: 'CGRPTERM' } [thrd:main]: Group "kafka-mocha-grp-2e4fe3f045b1a61d486a77c30ea383641134240c": waiting for 1 toppar(s), assignment in progress, 0 commit(s) (state up, join-state wait-unassign-to-complete) before terminating
{ severity: 7, fac: 'CGRPTERM' } [thrd:main]: Group "kafka-mocha-grp-2e4fe3f045b1a61d486a77c30ea383641134240c": waiting for 1 toppar(s), assignment in progress, 0 commit(s) (state up, join-state wait-unassign-to-complete) before terminating
{ severity: 7, fac: 'OP' } [thrd:main]: test [0] received op FETCH_STOP (v5) in fetch-state active (opv4)
{ severity: 7, fac: 'FETCH' } [thrd:main]: Stopping fetch for test [0] in state active (v5)
{ severity: 7, fac: 'PARTSTATE' } [thrd:main]: Partition test [0] changed fetch state active -> stopping
{ severity: 7, fac: 'STORETERM' } [thrd:main]: test [0]: offset store terminating
{ severity: 7, fac: 'PARTSTATE' } [thrd:main]: Partition test [0] changed fetch state stopping -> stopped
{ severity: 7, fac: 'OP' } [thrd:main]: test [0] received op PAUSE (v6) in fetch-state stopped (opv5)
{ severity: 7, fac: 'RESUME' } [thrd:main]: Not resuming test [0]: partition is not paused by library
{ severity: 7, fac: 'CGRPTERM' } [thrd:main]: Group "kafka-mocha-grp-2e4fe3f045b1a61d486a77c30ea383641134240c": waiting for 1 toppar(s), assignment in progress, 0 commit(s) (state up, join-state wait-unassign-to-complete) before terminating
{ severity: 7, fac: 'CGRPOP' } [thrd:main]: Group "kafka-mocha-grp-2e4fe3f045b1a61d486a77c30ea383641134240c" received op PARTITION_LEAVE in state up (join-state wait-unassign-to-complete) for test [0]
{ severity: 7, fac: 'PARTDEL' } [thrd:main]: Group "kafka-mocha-grp-2e4fe3f045b1a61d486a77c30ea383641134240c": delete test [0]
{ severity: 7, fac: 'CGRPTERM' } [thrd:main]: Group "kafka-mocha-grp-2e4fe3f045b1a61d486a77c30ea383641134240c": waiting for 0 toppar(s), assignment in progress, 0 commit(s) (state up, join-state wait-unassign-to-complete) before terminating
{ severity: 7, fac: 'STOPSERVE' } [thrd:main]: All partitions awaiting stop are now stopped: serving assignment
{ severity: 7, fac: 'DUMP' } [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
{ severity: 7, fac: 'DUMP_ALL' } [thrd:main]: List with 0 partition(s):
{ severity: 7, fac: 'DUMP_PND' } [thrd:main]: List with 0 partition(s):
{ severity: 7, fac: 'DUMP_QRY' } [thrd:main]: List with 0 partition(s):
{ severity: 7, fac: 'DUMP_REM' } [thrd:main]: List with 0 partition(s):
{ severity: 7, fac: 'ASSIGNDONE' } [thrd:main]: Group "kafka-mocha-grp-2e4fe3f045b1a61d486a77c30ea383641134240c": assignment operations done in join-state wait-unassign-to-complete (rebalance rejoin=false)
{ severity: 7, fac: 'UNASSIGN' } [thrd:main]: Group "kafka-mocha-grp-2e4fe3f045b1a61d486a77c30ea383641134240c": unassign done in state up (join-state wait-unassign-to-complete)
{ severity: 7, fac: 'MEMBERID' } [thrd:main]: Group "kafka-mocha-grp-2e4fe3f045b1a61d486a77c30ea383641134240c": updating member id "rdkafka-8a0e7a22-02bc-4e72-add5-07273cc1de53" -> ""
{ severity: 7, fac: 'LEAVE' } [thrd:main]: Group "kafka-mocha-grp-2e4fe3f045b1a61d486a77c30ea383641134240c": leave (in state up)
{ severity: 7, fac: 'LEAVE' } [thrd:main]: localhost:9092/1: Leaving group
{ severity: 7, fac: 'NOREJOIN' } [thrd:main]: Group "kafka-mocha-grp-2e4fe3f045b1a61d486a77c30ea383641134240c": Not rejoining group without an assignment: Unassignment done: no subscribed topics
{ severity: 7, fac: 'CGRPJOINSTATE' } [thrd:main]: Group "kafka-mocha-grp-2e4fe3f045b1a61d486a77c30ea383641134240c" changed join state wait-unassign-to-complete -> init (state up)
{ severity: 7, fac: 'CGRPTERM' } [thrd:main]: Group "kafka-mocha-grp-2e4fe3f045b1a61d486a77c30ea383641134240c": waiting for 0 toppar(s), 0 commit(s), wait-leave, (state up, join-state init) before terminating
{ severity: 7, fac: 'SEND' } [thrd:GroupCoordinator]: GroupCoordinator/1: Sent LeaveGroupRequest (v1, 125 bytes @ 0, CorrId 9)
{ severity: 7, fac: 'RECV' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Received FetchResponse (v11, 66 bytes, CorrId 9, rtt 1001.99ms)
{ severity: 7, fac: 'FETCH' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Topic test [0] MessageSet size 0, error "Success", MaxOffset 10518, LSO 10518, Ver 4/4
{ severity: 7, fac: 'FETCH' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Topic test [0] in state stopped at offset 10517 (2/100000 msgs, 0/65536 kb queued, opv 4) is not fetchable: not in active fetch state
{ severity: 7, fac: 'FETCHADD' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Removed test [0] from fetch list (0 entries, opv 4): not in active fetch state
ok 13 no error after disconnecting
{ severity: 7, fac: 'WAKEUP' } [thrd:app]: localhost:9092/1: Wake-up: flushing
{ severity: 7, fac: 'WAKEUP' } [thrd:app]: Wake-up sent to 1 broker thread in state >= UP: flushing
{ severity: 7, fac: 'DESTROY' } [thrd:app]: Terminating instance (destroy flags none (0x0))
{ severity: 7, fac: 'TERMINATE' } [thrd:app]: Interrupting timers
{ severity: 7, fac: 'TERMINATE' } [thrd:app]: Sending TERMINATE to internal main thread
{ severity: 7, fac: 'TERMINATE' } [thrd:app]: Joining internal main thread
{ severity: 7, fac: 'TERMINATE' } [thrd:main]: Internal main thread terminating
{ severity: 7, fac: 'DESTROY' } [thrd:main]: Destroy internal
{ severity: 7, fac: 'BROADCAST' } [thrd:main]: Broadcasting state change
{ severity: 7, fac: 'DESTROY' } [thrd:main]: Removing all topics
{ severity: 7, fac: 'PARTCNT' } [thrd:main]: Topic test partition count changed from 1 to 0
{ severity: 7, fac: 'REMOVE' } [thrd:main]: test [0] no longer reported in metadata
{ severity: 7, fac: 'BRKMIGR' } [thrd:main]: test [0] 0x104834600 sending final LEAVE for removal by localhost:9092/1
{ severity: 7, fac: 'TOPPARREMOVE' } [thrd:main]: Removing toppar test [-1] 0x10482f400
{ severity: 7, fac: 'DESTROY' } [thrd:main]: test [-1]: 0x10482f400 DESTROY_FINAL
{ severity: 7, fac: 'DESTROY' } [thrd:main]: Sending TERMINATE to localhost:9092/1
{ severity: 7, fac: 'TOPBRK' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Topic test [0]: leaving broker (0 messages in xmitq, next broker (none), rktp 0x104834600)
{ severity: 7, fac: 'FETCHADD' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Removed test [0] from active list (0 entries, opv 0): leaving
{ severity: 7, fac: 'TOPBRK' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Topic test [0]: no next broker, failing 0 message(s) in partition queue
{ severity: 7, fac: 'BROADCAST' } [thrd:localhost:9092/bootstrap]: Broadcasting state change
{ severity: 7, fac: 'TOPPARREMOVE' } [thrd:localhost:9092/bootstrap]: Removing toppar test [0] 0x104834600
{ severity: 7, fac: 'DESTROY' } [thrd:localhost:9092/bootstrap]: test [0]: 0x104834600 DESTROY_FINAL
{ severity: 7, fac: 'TERMINATE' } [thrd:main]: Purging reply queue
{ severity: 7, fac: 'TERMINATE' } [thrd:main]: Decommissioning internal broker
{ severity: 7, fac: 'TERMINATE' } [thrd:main]: Join 2 broker thread(s)
{ severity: 7, fac: 'TERM' } [thrd::0/internal]: :0/internal: Received TERMINATE op in state INIT: 2 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
{ severity: 7, fac: 'TERM' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Received TERMINATE op in state UP: 2 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
{ severity: 7, fac: 'FAIL' } [thrd::0/internal]: :0/internal: Client is terminating (after 3051ms in state INIT) (_DESTROY)
{ severity: 7, fac: 'STATE' } [thrd::0/internal]: :0/internal: Broker changed state INIT -> DOWN
{ severity: 7, fac: 'BROADCAST' } [thrd::0/internal]: Broadcasting state change
{ severity: 7, fac: 'BUFQ' } [thrd::0/internal]: :0/internal: Purging bufq with 0 buffers
{ severity: 7, fac: 'BUFQ' } [thrd::0/internal]: :0/internal: Purging bufq with 0 buffers
{ severity: 7, fac: 'BUFQ' } [thrd::0/internal]: :0/internal: Updating 0 buffers on connection reset
{ severity: 7, fac: 'BRKTERM' } [thrd::0/internal]: :0/internal: terminating: broker still has 2 refcnt(s), 0 buffer(s), 0 partition(s)
{ severity: 7, fac: 'TERMINATE' } [thrd::0/internal]: :0/internal: Handle is terminating in state DOWN: 1 refcnts (0x1058143a0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
{ severity: 7, fac: 'FAIL' } [thrd::0/internal]: :0/internal: Broker handle is terminating (after 0ms in state DOWN) (_DESTROY)
{ severity: 7, fac: 'BUFQ' } [thrd::0/internal]: :0/internal: Purging bufq with 0 buffers
{ severity: 7, fac: 'BUFQ' } [thrd::0/internal]: :0/internal: Purging bufq with 0 buffers
{ severity: 7, fac: 'BUFQ' } [thrd::0/internal]: :0/internal: Updating 0 buffers on connection reset
{ severity: 7, fac: 'FAIL' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Client is terminating (after 3045ms in state UP) (_DESTROY)
{ severity: 7, fac: 'STATE' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Broker changed state UP -> DOWN
{ severity: 7, fac: 'BROADCAST' } [thrd:localhost:9092/bootstrap]: Broadcasting state change
{ severity: 7, fac: 'BUFQ' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Purging bufq with 0 buffers
{ severity: 7, fac: 'BUFQ' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Purging bufq with 0 buffers
{ severity: 7, fac: 'BUFQ' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Updating 0 buffers on connection reset
{ severity: 7, fac: 'BRKTERM' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: terminating: broker still has 2 refcnt(s), 0 buffer(s), 0 partition(s)
{ severity: 7, fac: 'TERMINATE' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Handle is terminating in state DOWN: 1 refcnts (0x1058151a0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
{ severity: 7, fac: 'FAIL' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Broker handle is terminating (after 0ms in state DOWN) (_DESTROY)
{ severity: 7, fac: 'BUFQ' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Purging bufq with 0 buffers
{ severity: 7, fac: 'BUFQ' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Purging bufq with 0 buffers
{ severity: 7, fac: 'BUFQ' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Updating 0 buffers on connection reset
ok 14 no error after disconnecting

Debug-Logs in the non-working case

The callback of the close fn doesn't get called; there is no ok 13 no error after disconnecting.

ok 12 invalid message header
{ severity: 7, fac: 'CONSUME' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Enqueue 1 message(s) (4099 bytes, 1 ops) on test [0] fetch queue (qlen 0, v4, last_offset 10515, 0 ctrl msgs, 0 aborted msgsets, uncompressed)
{ severity: 7, fac: 'FETCH' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Fetch topic test [0] at offset 10516 (v4)
{ severity: 7, fac: 'FETCH' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Fetch 1/1/1 toppar(s)
{ severity: 7, fac: 'SEND' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Sent FetchRequest (v11, 94 bytes @ 0, CorrId 9)
{ severity: 7, fac: 'BROADCAST' } [thrd:localhost:9092/bootstrap]: Broadcasting state change
{ severity: 7, fac: 'RECV' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Received FetchResponse (v11, 66 bytes, CorrId 9, rtt 1001.12ms)
{ severity: 7, fac: 'FETCH' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Topic test [0] MessageSet size 0, error "Success", MaxOffset 10516, LSO 10516, Ver 4/4
{ severity: 7, fac: 'FETCH' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Fetch topic test [0] at offset 10516 (v4)
{ severity: 7, fac: 'FETCH' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Fetch 1/1/1 toppar(s)
{ severity: 7, fac: 'CLOSE' } [thrd:app]: Closing consumer
{ severity: 7, fac: 'CLOSE' } [thrd:app]: Waiting for close events
{ severity: 7, fac: 'SEND' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Sent FetchRequest (v11, 94 bytes @ 0, CorrId 10)
{ severity: 7, fac: 'BROADCAST' } [thrd:localhost:9092/bootstrap]: Broadcasting state change
{ severity: 7, fac: 'CGRPOP' } [thrd:main]: Group "kafka-mocha-grp-3c28dccd1edc8f45b58c2bbcd38f624d7b2eac03" received op TERMINATE in state up (join-state steady)
{ severity: 7, fac: 'CGRPTERM' } [thrd:main]: Terminating group "kafka-mocha-grp-3c28dccd1edc8f45b58c2bbcd38f624d7b2eac03" in state up with 1 partition(s)
{ severity: 7, fac: 'UNSUBSCRIBE' } [thrd:main]: Group "kafka-mocha-grp-3c28dccd1edc8f45b58c2bbcd38f624d7b2eac03": unsubscribe from current subscription of size 1 (leave group=true, has joined=true, rdkafka-9eb52c4d-ab40-4cbb-9408-0a7af1179a82, join-state steady)
{ severity: 7, fac: 'SUBSCRIPTION' } [thrd:main]: Group "kafka-mocha-grp-3c28dccd1edc8f45b58c2bbcd38f624d7b2eac03": clearing subscribed topics list (1)
{ severity: 7, fac: 'SUBSCRIPTION' } [thrd:main]: Group "kafka-mocha-grp-3c28dccd1edc8f45b58c2bbcd38f624d7b2eac03": effective subscription list changed from 1 to 0 topic(s):
{ severity: 7, fac: 'GRPLEADER' } [thrd:main]: Group "kafka-mocha-grp-3c28dccd1edc8f45b58c2bbcd38f624d7b2eac03": resetting group leader info: unsubscribe
{ severity: 7, fac: 'REBALANCE' } [thrd:main]: Group "kafka-mocha-grp-3c28dccd1edc8f45b58c2bbcd38f624d7b2eac03" initiating rebalance (EAGER) in state up (join-state steady) with 1 assigned partition(s): unsubscribe
{ severity: 7, fac: 'CGRPJOINSTATE' } [thrd:main]: Group "kafka-mocha-grp-3c28dccd1edc8f45b58c2bbcd38f624d7b2eac03" changed join state steady -> wait-unassign-call (state up)
{ severity: 7, fac: 'CLEARASSIGN' } [thrd:main]: Clearing current assignment of 1 partition(s)
{ severity: 7, fac: 'CGRPJOINSTATE' } [thrd:main]: Group "kafka-mocha-grp-3c28dccd1edc8f45b58c2bbcd38f624d7b2eac03" changed join state wait-unassign-call -> wait-unassign-to-complete (state up)
{ severity: 7, fac: 'DUMP' } [thrd:main]: Assignment dump (started_cnt=1, wait_stop_cnt=0)
{ severity: 7, fac: 'DUMP_ALL' } [thrd:main]: List with 0 partition(s):
{ severity: 7, fac: 'DUMP_PND' } [thrd:main]: List with 0 partition(s):
{ severity: 7, fac: 'DUMP_QRY' } [thrd:main]: List with 0 partition(s):
{ severity: 7, fac: 'DUMP_REM' } [thrd:main]: List with 1 partition(s):
{ severity: 7, fac: 'DUMP_REM' } [thrd:main]:  test [0] offset STORED
{ severity: 7, fac: 'BARRIER' } [thrd:main]: test [0]: rd_kafka_toppar_op_fetch_stop:2363: new version barrier v5
{ severity: 7, fac: 'CONSUMER' } [thrd:main]: Stop consuming test [0] (v5)
{ severity: 7, fac: 'BARRIER' } [thrd:main]: test [0]: rd_kafka_toppar_op_pause_resume:2424: new version barrier v6
{ severity: 7, fac: 'RESUME' } [thrd:main]: Resume test [0] (v6)
{ severity: 7, fac: 'DESP' } [thrd:main]: Removing (un)desired topic test [0]
{ severity: 7, fac: 'REMOVE' } [thrd:main]: Removing test [0] from assignment (started=true, pending=false, queried=false, stored offset=10516)
{ severity: 7, fac: 'REMOVE' } [thrd:main]: Served 1 removed partition(s), with 1 offset(s) to commit
{ severity: 7, fac: 'ASSIGNMENT' } [thrd:main]: Current assignment of 0 partition(s) with 0 pending adds, 0 offset queries, 1 partitions awaiting stop and 0 offset commits in progress
{ severity: 7, fac: 'ASSIGNMENT' } [thrd:main]: Group "kafka-mocha-grp-3c28dccd1edc8f45b58c2bbcd38f624d7b2eac03": clearing group assignment
{ severity: 7, fac: 'CLEARASSIGN' } [thrd:main]: No current assignment to clear
{ severity: 7, fac: 'DUMP' } [thrd:main]: Assignment dump (started_cnt=1, wait_stop_cnt=1)
{ severity: 7, fac: 'DUMP_ALL' } [thrd:main]: List with 0 partition(s):
{ severity: 7, fac: 'DUMP_PND' } [thrd:main]: List with 0 partition(s):
{ severity: 7, fac: 'DUMP_QRY' } [thrd:main]: List with 0 partition(s):
{ severity: 7, fac: 'DUMP_REM' } [thrd:main]: List with 0 partition(s):
{ severity: 7, fac: 'ASSIGNMENT' } [thrd:main]: Current assignment of 0 partition(s) with 0 pending adds, 0 offset queries, 1 partitions awaiting stop and 0 offset commits in progress
{ severity: 7, fac: 'CGRPTERM' } [thrd:main]: Group "kafka-mocha-grp-3c28dccd1edc8f45b58c2bbcd38f624d7b2eac03": waiting for 1 toppar(s), assignment in progress, 0 commit(s) (state up, join-state wait-unassign-to-complete) before terminating
{ severity: 7, fac: 'CGRPTERM' } [thrd:main]: Group "kafka-mocha-grp-3c28dccd1edc8f45b58c2bbcd38f624d7b2eac03": waiting for 1 toppar(s), assignment in progress, 0 commit(s) (state up, join-state wait-unassign-to-complete) before terminating
{ severity: 7, fac: 'OP' } [thrd:main]: test [0] received op FETCH_STOP (v5) in fetch-state active (opv4)
{ severity: 7, fac: 'FETCH' } [thrd:main]: Stopping fetch for test [0] in state active (v5)
{ severity: 7, fac: 'PARTSTATE' } [thrd:main]: Partition test [0] changed fetch state active -> stopping
{ severity: 7, fac: 'STORETERM' } [thrd:main]: test [0]: offset store terminating
{ severity: 7, fac: 'PARTSTATE' } [thrd:main]: Partition test [0] changed fetch state stopping -> stopped
{ severity: 7, fac: 'OP' } [thrd:main]: test [0] received op PAUSE (v6) in fetch-state stopped (opv5)
{ severity: 7, fac: 'RESUME' } [thrd:main]: Not resuming test [0]: partition is not paused by library
{ severity: 7, fac: 'CGRPTERM' } [thrd:main]: Group "kafka-mocha-grp-3c28dccd1edc8f45b58c2bbcd38f624d7b2eac03": waiting for 1 toppar(s), assignment in progress, 0 commit(s) (state up, join-state wait-unassign-to-complete) before terminating
{ severity: 7, fac: 'CGRPOP' } [thrd:main]: Group "kafka-mocha-grp-3c28dccd1edc8f45b58c2bbcd38f624d7b2eac03" received op PARTITION_LEAVE in state up (join-state wait-unassign-to-complete) for test [0]
{ severity: 7, fac: 'PARTDEL' } [thrd:main]: Group "kafka-mocha-grp-3c28dccd1edc8f45b58c2bbcd38f624d7b2eac03": delete test [0]
{ severity: 7, fac: 'CGRPTERM' } [thrd:main]: Group "kafka-mocha-grp-3c28dccd1edc8f45b58c2bbcd38f624d7b2eac03": waiting for 0 toppar(s), assignment in progress, 0 commit(s) (state up, join-state wait-unassign-to-complete) before terminating
{ severity: 7, fac: 'STOPSERVE' } [thrd:main]: All partitions awaiting stop are now stopped: serving assignment
{ severity: 7, fac: 'DUMP' } [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
{ severity: 7, fac: 'DUMP_ALL' } [thrd:main]: List with 0 partition(s):
{ severity: 7, fac: 'DUMP_PND' } [thrd:main]: List with 0 partition(s):
{ severity: 7, fac: 'DUMP_QRY' } [thrd:main]: List with 0 partition(s):
{ severity: 7, fac: 'DUMP_REM' } [thrd:main]: List with 0 partition(s):
{ severity: 7, fac: 'ASSIGNDONE' } [thrd:main]: Group "kafka-mocha-grp-3c28dccd1edc8f45b58c2bbcd38f624d7b2eac03": assignment operations done in join-state wait-unassign-to-complete (rebalance rejoin=false)
{ severity: 7, fac: 'UNASSIGN' } [thrd:main]: Group "kafka-mocha-grp-3c28dccd1edc8f45b58c2bbcd38f624d7b2eac03": unassign done in state up (join-state wait-unassign-to-complete)
{ severity: 7, fac: 'MEMBERID' } [thrd:main]: Group "kafka-mocha-grp-3c28dccd1edc8f45b58c2bbcd38f624d7b2eac03": updating member id "rdkafka-9eb52c4d-ab40-4cbb-9408-0a7af1179a82" -> ""
{ severity: 7, fac: 'LEAVE' } [thrd:main]: Group "kafka-mocha-grp-3c28dccd1edc8f45b58c2bbcd38f624d7b2eac03": leave (in state up)
{ severity: 7, fac: 'LEAVE' } [thrd:main]: localhost:9092/1: Leaving group
{ severity: 7, fac: 'NOREJOIN' } [thrd:main]: Group "kafka-mocha-grp-3c28dccd1edc8f45b58c2bbcd38f624d7b2eac03": Not rejoining group without an assignment: Unassignment done: no subscribed topics
{ severity: 7, fac: 'CGRPJOINSTATE' } [thrd:main]: Group "kafka-mocha-grp-3c28dccd1edc8f45b58c2bbcd38f624d7b2eac03" changed join state wait-unassign-to-complete -> init (state up)
{ severity: 7, fac: 'CGRPTERM' } [thrd:main]: Group "kafka-mocha-grp-3c28dccd1edc8f45b58c2bbcd38f624d7b2eac03": waiting for 0 toppar(s), 0 commit(s), wait-leave, (state up, join-state init) before terminating
{ severity: 7, fac: 'SEND' } [thrd:GroupCoordinator]: GroupCoordinator/1: Sent LeaveGroupRequest (v1, 125 bytes @ 0, CorrId 9)
{ severity: 7, fac: 'RECV' } [thrd:GroupCoordinator]: GroupCoordinator/1: Received LeaveGroupResponse (v1, 6 bytes, CorrId 9, rtt 10.89ms)
{ severity: 7, fac: 'LEAVEGROUP' } [thrd:main]: LeaveGroup response received in state up
{ severity: 7, fac: 'CGRPSTATE' } [thrd:main]: Group "kafka-mocha-grp-3c28dccd1edc8f45b58c2bbcd38f624d7b2eac03" changed state up -> term (join-state init)
{ severity: 7, fac: 'BROADCAST' } [thrd:main]: Broadcasting state change
{ severity: 7, fac: 'ASSIGNMENT' } [thrd:main]: Group "kafka-mocha-grp-3c28dccd1edc8f45b58c2bbcd38f624d7b2eac03": clearing group assignment
{ severity: 7, fac: 'COORDCLEAR' } [thrd:main]: Group "kafka-mocha-grp-3c28dccd1edc8f45b58c2bbcd38f624d7b2eac03" broker localhost:9092/1 is no longer coordinator
{ severity: 7, fac: 'NODENAME' } [thrd:main]: GroupCoordinator/1: Broker nodename changed from "localhost:9092" to ""
{ severity: 7, fac: 'NODEID' } [thrd:main]: GroupCoordinator/1: Broker nodeid changed from 1 to -1
{ severity: 7, fac: 'CGRPTERM' } [thrd:main]: Consumer group sub-system terminated (will enqueue reply)
{ severity: 7, fac: 'CLOSE' } [thrd:app]: Consumer closed
{ severity: 7, fac: 'DESTROY' } [thrd:app]: Terminating instance (destroy flags NoConsumerClose (0x8))
{ severity: 7, fac: 'TERMINATE' } [thrd:app]: Terminating consumer group handler
{ severity: 7, fac: 'FAIL' } [thrd:GroupCoordinator]: GroupCoordinator: Closing connection due to nodename change (after 3052ms in state UP) (_TRANSPORT)
{ severity: 7, fac: 'STATE' } [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state UP -> DOWN
{ severity: 7, fac: 'BROADCAST' } [thrd:GroupCoordinator]: Broadcasting state change
{ severity: 7, fac: 'BUFQ' } [thrd:GroupCoordinator]: GroupCoordinator: Purging bufq with 0 buffers
{ severity: 7, fac: 'BUFQ' } [thrd:GroupCoordinator]: GroupCoordinator: Purging bufq with 0 buffers
{ severity: 7, fac: 'BUFQ' } [thrd:GroupCoordinator]: GroupCoordinator: Updating 0 buffers on connection reset
{ severity: 7, fac: 'BRKTERM' } [thrd:GroupCoordinator]: GroupCoordinator: terminating: broker still has 4 refcnt(s), 0 buffer(s), 0 partition(s)
{ severity: 7, fac: 'METADATA' } [thrd:GroupCoordinator]: Requesting metadata for 1/1 topics: broker down
{ severity: 7, fac: 'METADATA' } [thrd:GroupCoordinator]: localhost:9092/1: Request metadata for 1 topic(s): broker down
{ severity: 7, fac: 'TERMINATE' } [thrd:GroupCoordinator]: GroupCoordinator: Handle is terminating in state DOWN: 4 refcnts (0x105051da0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
{ severity: 7, fac: 'STATE' } [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state DOWN -> INIT
{ severity: 7, fac: 'BROADCAST' } [thrd:GroupCoordinator]: Broadcasting state change
{ severity: 7, fac: 'TERMINATE' } [thrd:main]: Internal main thread terminating
{ severity: 7, fac: 'DESTROY' } [thrd:main]: Destroy internal
{ severity: 7, fac: 'BROADCAST' } [thrd:main]: Broadcasting state change
{ severity: 7, fac: 'TERMINATE' } [thrd:app]: Interrupting timers
{ severity: 7, fac: 'TERMINATE' } [thrd:app]: Sending TERMINATE to internal main thread
{ severity: 7, fac: 'TERMINATE' } [thrd:app]: Joining internal main thread
{ severity: 7, fac: 'DESTROY' } [thrd:main]: Removing all topics
{ severity: 7, fac: 'PARTCNT' } [thrd:main]: Topic test partition count changed from 1 to 0
{ severity: 7, fac: 'REMOVE' } [thrd:main]: test [0] no longer reported in metadata
{ severity: 7, fac: 'BRKMIGR' } [thrd:main]: test [0] 0x10404c600 sending final LEAVE for removal by localhost:9092/1
{ severity: 7, fac: 'TOPPARREMOVE' } [thrd:main]: Removing toppar test [-1] 0x104018000
{ severity: 7, fac: 'DESTROY' } [thrd:main]: test [-1]: 0x104018000 DESTROY_FINAL
{ severity: 7, fac: 'DESTROY' } [thrd:main]: Sending TERMINATE to localhost:9092/1
{ severity: 7, fac: 'DESTROY' } [thrd:main]: Sending TERMINATE to GroupCoordinator
{ severity: 7, fac: 'TERMINATE' } [thrd:main]: Purging reply queue
{ severity: 7, fac: 'TERMINATE' } [thrd:main]: Decommissioning internal broker
{ severity: 7, fac: 'TERMINATE' } [thrd:main]: Join 3 broker thread(s)
{ severity: 7, fac: 'FETCH' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Topic test [0] in state stopped at offset 10515 (0/100000 msgs, 0/65536 kb queued, opv 4) is not fetchable: forced removal
{ severity: 7, fac: 'FETCHADD' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Removed test [0] from fetch list (0 entries, opv 4): forced removal
{ severity: 7, fac: 'TOPBRK' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Topic test [0]: leaving broker (0 messages in xmitq, next broker (none), rktp 0x10404c600)
{ severity: 7, fac: 'TOPBRK' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Topic test [0]: no next broker, failing 0 message(s) in partition queue
{ severity: 7, fac: 'BROADCAST' } [thrd:localhost:9092/bootstrap]: Broadcasting state change
{ severity: 7, fac: 'TERM' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Received TERMINATE op in state UP: 5 refcnts, 0 toppar(s), 0 active toppar(s), 1 outbufs, 1 waitresps, 0 retrybufs
{ severity: 7, fac: 'TERM' } [thrd:GroupCoordinator]: GroupCoordinator: Received TERMINATE op in state INIT: 2 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
{ severity: 7, fac: 'FAIL' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Client is terminating (after 3058ms in state UP) (_DESTROY)
{ severity: 7, fac: 'STATE' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Broker changed state UP -> DOWN
{ severity: 7, fac: 'BROADCAST' } [thrd:localhost:9092/bootstrap]: Broadcasting state change
{ severity: 7, fac: 'BUFQ' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Purging bufq with 1 buffers
{ severity: 7, fac: 'FAIL' } [thrd:GroupCoordinator]: GroupCoordinator: Client is terminating (after 0ms in state INIT) (_DESTROY)
{ severity: 7, fac: 'BUFQ' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Purging bufq with 1 buffers
{ severity: 7, fac: 'STATE' } [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state INIT -> DOWN
{ severity: 7, fac: 'BUFQ' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Updating 0 buffers on connection reset
{ severity: 7, fac: 'BROADCAST' } [thrd:GroupCoordinator]: Broadcasting state change
{ severity: 7, fac: 'BRKTERM' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: terminating: broker still has 3 refcnt(s), 0 buffer(s), 0 partition(s)
{ severity: 7, fac: 'BUFQ' } [thrd:GroupCoordinator]: GroupCoordinator: Purging bufq with 0 buffers
{ severity: 7, fac: 'TERMINATE' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Handle is terminating in state DOWN: 2 refcnts (0x1048617a0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
{ severity: 7, fac: 'STATE' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Broker changed state DOWN -> INIT
{ severity: 7, fac: 'BROADCAST' } [thrd:localhost:9092/bootstrap]: Broadcasting state change
{ severity: 7, fac: 'BUFQ' } [thrd:GroupCoordinator]: GroupCoordinator: Purging bufq with 0 buffers
{ severity: 7, fac: 'BUFQ' } [thrd:GroupCoordinator]: GroupCoordinator: Updating 0 buffers on connection reset
{ severity: 7, fac: 'BRKTERM' } [thrd:GroupCoordinator]: GroupCoordinator: terminating: broker still has 2 refcnt(s), 0 buffer(s), 0 partition(s)
{ severity: 7, fac: 'TERMINATE' } [thrd:GroupCoordinator]: GroupCoordinator: Handle is terminating in state DOWN: 1 refcnts (0x105051da0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
{ severity: 7, fac: 'FAIL' } [thrd:GroupCoordinator]: GroupCoordinator: Broker handle is terminating (after 0ms in state DOWN) (_DESTROY)
{ severity: 7, fac: 'BUFQ' } [thrd:GroupCoordinator]: GroupCoordinator: Purging bufq with 0 buffers
{ severity: 7, fac: 'BUFQ' } [thrd:GroupCoordinator]: GroupCoordinator: Purging bufq with 0 buffers
{ severity: 7, fac: 'BUFQ' } [thrd:GroupCoordinator]: GroupCoordinator: Updating 0 buffers on connection reset
{ severity: 7, fac: 'TERM' } [thrd::0/internal]: :0/internal: Received TERMINATE op in state INIT: 2 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
{ severity: 7, fac: 'FAIL' } [thrd::0/internal]: :0/internal: Client is terminating (after 3064ms in state INIT) (_DESTROY)
{ severity: 7, fac: 'STATE' } [thrd::0/internal]: :0/internal: Broker changed state INIT -> DOWN
{ severity: 7, fac: 'BROADCAST' } [thrd::0/internal]: Broadcasting state change
{ severity: 7, fac: 'BUFQ' } [thrd::0/internal]: :0/internal: Purging bufq with 0 buffers
{ severity: 7, fac: 'BUFQ' } [thrd::0/internal]: :0/internal: Purging bufq with 0 buffers
{ severity: 7, fac: 'BUFQ' } [thrd::0/internal]: :0/internal: Updating 0 buffers on connection reset
{ severity: 7, fac: 'BRKTERM' } [thrd::0/internal]: :0/internal: terminating: broker still has 2 refcnt(s), 0 buffer(s), 0 partition(s)
{ severity: 7, fac: 'TERMINATE' } [thrd::0/internal]: :0/internal: Handle is terminating in state DOWN: 1 refcnts (0x105052ba0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
{ severity: 7, fac: 'FAIL' } [thrd::0/internal]: :0/internal: Broker handle is terminating (after 0ms in state DOWN) (_DESTROY)
{ severity: 7, fac: 'BUFQ' } [thrd::0/internal]: :0/internal: Purging bufq with 0 buffers
{ severity: 7, fac: 'BUFQ' } [thrd::0/internal]: :0/internal: Purging bufq with 0 buffers
{ severity: 7, fac: 'BUFQ' } [thrd::0/internal]: :0/internal: Updating 0 buffers on connection reset
{ severity: 7, fac: 'TERMINATE' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Handle is terminating in state INIT: 2 refcnts (0x1048617a0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
{ severity: 7, fac: 'TERMINATE' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Handle is terminating in state INIT: 2 refcnts (0x1048617a0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
{ severity: 7, fac: 'TERMINATE' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Handle is terminating in state INIT: 2 refcnts (0x1048617a0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
{ severity: 7, fac: 'TERMINATE' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Handle is terminating in state INIT: 2 refcnts (0x1048617a0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
{ severity: 7, fac: 'TERMINATE' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Handle is terminating in state INIT: 2 refcnts (0x1048617a0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
{ severity: 7, fac: 'TERMINATE' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Handle is terminating in state INIT: 2 refcnts (0x1048617a0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
{ severity: 7, fac: 'TERMINATE' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Handle is terminating in state INIT: 2 refcnts (0x1048617a0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
{ severity: 7, fac: 'TERMINATE' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Handle is terminating in state INIT: 2 refcnts (0x1048617a0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
{ severity: 7, fac: 'TERMINATE' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Handle is terminating in state INIT: 2 refcnts (0x1048617a0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
{ severity: 7, fac: 'TERMINATE' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Handle is terminating in state INIT: 2 refcnts (0x1048617a0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
{ severity: 7, fac: 'TERMINATE' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Handle is terminating in state INIT: 2 refcnts (0x1048617a0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
{ severity: 7, fac: 'TERMINATE' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Handle is terminating in state INIT: 2 refcnts (0x1048617a0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
{ severity: 7, fac: 'TERMINATE' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Handle is terminating in state INIT: 2 refcnts (0x1048617a0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
{ severity: 7, fac: 'TERMINATE' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Handle is terminating in state INIT: 2 refcnts (0x1048617a0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
{ severity: 7, fac: 'TERMINATE' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Handle is terminating in state INIT: 2 refcnts (0x1048617a0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
{ severity: 7, fac: 'TERMINATE' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Handle is terminating in state INIT: 2 refcnts (0x1048617a0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
{ severity: 7, fac: 'TERMINATE' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Handle is terminating in state INIT: 2 refcnts (0x1048617a0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
{ severity: 7, fac: 'TERMINATE' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Handle is terminating in state INIT: 2 refcnts (0x1048617a0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
{ severity: 7, fac: 'TERMINATE' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Handle is terminating in state INIT: 2 refcnts (0x1048617a0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
{ severity: 7, fac: 'TERMINATE' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Handle is terminating in state INIT: 2 refcnts (0x1048617a0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
{ severity: 7, fac: 'TERMINATE' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Handle is terminating in state INIT: 2 refcnts (0x1048617a0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
{ severity: 7, fac: 'TERMINATE' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Handle is terminating in state INIT: 2 refcnts (0x1048617a0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
{ severity: 7, fac: 'TERMINATE' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Handle is terminating in state INIT: 2 refcnts (0x1048617a0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
{ severity: 7, fac: 'TERMINATE' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Handle is terminating in state INIT: 2 refcnts (0x1048617a0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
{ severity: 7, fac: 'TERMINATE' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Handle is terminating in state INIT: 2 refcnts (0x1048617a0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
{ severity: 7, fac: 'TERMINATE' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Handle is terminating in state INIT: 2 refcnts (0x1048617a0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
{ severity: 7, fac: 'TERMINATE' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Handle is terminating in state INIT: 2 refcnts (0x1048617a0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
{ severity: 7, fac: 'TERMINATE' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Handle is terminating in state INIT: 2 refcnts (0x1048617a0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
{ severity: 7, fac: 'TERMINATE' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Handle is terminating in state INIT: 2 refcnts (0x1048617a0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
{ severity: 7, fac: 'TERMINATE' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Handle is terminating in state INIT: 2 refcnts (0x1048617a0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
{ severity: 7, fac: 'TERMINATE' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Handle is terminating in state INIT: 2 refcnts (0x1048617a0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf

Additional Info

After having debugged for a while, I've found that the issue occurs after this delete: https://github.com/Blizzard/node-rdkafka/blob/master/src/kafka-consumer.cc#L88. It then gets stuck in the loop in librdkafka.

If I remove the delete, the tests all work fine.

I cannot reproduce the issue on Linux, only on macOS Monterey 12.3.1 (21E258).

The Kafka broker logs looked fine; this is not Kafka related.

Checklist

IMPORTANT: We will close issues where the checklist has not been completed.

Please provide the following information:

  • [x] librdkafka version (release number or git tag): v1.9.1
  • [x] Apache Kafka version: confluentinc/cp-kafka:latest (https://hub.docker.com/layers/cp-kafka/confluentinc/cp-kafka/latest/images/sha256-6a9da0b9cbba850cf95b58b6d958518b3b5fbe788c262c7a1763acf7faf2af79?context=explore)
  • [x] Operating system: macos - monterey 12.3.1 (21E258)
  • [x] Provide logs (with debug=.. as necessary) from librdkafka
  • [x] Provide broker log excerpts -
  • [x] Critical issue tbd

robinfehr · Jul 10 '22 21:07

@robinfehr I don't think this is only an issue with macOS, as it also fails and gets stuck when running in k8s, which uses a Linux container. It's strange that the tests only fail on mac. I'm also using a mac and tested with my app running in a local k8s cluster on my mac and also in a dev k8s cluster running on a Linux machine. Both resulted in a stuck producer state.

o2themar · Jul 14 '22 20:07

@robinfehr I've been trying to test the code you provided, and it seems to be getting stuck at "producing now test key", which is line 130. I'm running the code against a local Kafka broker running in Docker.

o2themar · Jul 18 '22 19:07

@o2themar that is weird, since that works for me; it only gets into the loop when I try to disconnect. Just to make sure, do you have the topic created?

robinfehr · Jul 18 '22 21:07

@robinfehr Yes I have created the topic 'test' and even verified it was there.

o2themar · Jul 19 '22 10:07
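
For anyone reproducing this, the "test" topic can also be created programmatically. A minimal sketch using node-rdkafka's AdminClient; the broker address and the partition/replication counts are assumptions for a local single-broker setup:

var Kafka = require("node-rdkafka");

// Minimal sketch: create the "test" topic against a local broker.
var admin = Kafka.AdminClient.create({
  "client.id": "kafka-admin",
  "metadata.broker.list": "localhost:9092",
});

admin.createTopic(
  { topic: "test", num_partitions: 1, replication_factor: 1 },
  function (err) {
    if (err) console.error("createTopic failed:", err);
    admin.disconnect();
  }
);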

OK, that was strange: when I restarted the container after creating the topic and reran the tests, it started working. For some reason it was not finding the topic, and rebooting the container got it working.

o2themar · Jul 19 '22 11:07

@robinfehr So I was able to replicate the issue. I had to run the script in a loop in order to get it to fail and get stuck (see the rerun sketch below); it takes fewer than 20 random runs to get it into that state, though sometimes it passes all 20. It wasn't failing when I was running it manually. In the failing case I narrowed it down to rd_kafka_toppar_destroy_final not getting called the second time at the end, to remove the topic from the broker address; before that comes rd_kafka_toppar_destroy. I don't know why it wouldn't get called, since I'm not too familiar with C, but if I had to guess it could have something to do with the locking getting missed and running into a race condition. Successful runs always call TOPPARREMOVE and DESTROY near the end twice, but the failing runs only do it once.

Success has these two while failure doesn't:

{ severity: 7, fac: 'TOPPARREMOVE' } [thrd:localhost:9092/bootstrap]: Removing toppar test [0] 0x10587ec00
{ severity: 7, fac: 'DESTROY' } [thrd:localhost:9092/bootstrap]: test [0]: 0x10587ec00 DESTROY_FINAL

The above is different from the first call, which does this:

{ severity: 7, fac: 'TOPPARREMOVE' } [thrd:main]: Removing toppar test [-1] 0x10504f000
{ severity: 7, fac: 'DESTROY' } [thrd:main]: test [-1]: 0x10504f000 DESTROY_FINAL

o2themar · Jul 19 '22 16:07
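
The rerun loop mentioned above can look like this; a minimal sketch, where filename.js and the 20-run budget are assumptions rather than anything in the repo:

var { execFileSync } = require("child_process");

// Rerun the extracted test until it hangs or fails. A hung consumer
// close never returns, so each run gets a hard timeout.
for (var run = 1; run <= 20; run++) {
  console.log("run", run);
  try {
    execFileSync("node", ["filename.js"], { stdio: "inherit", timeout: 30000 });
  } catch (e) {
    console.log("run", run, "failed or timed out:", e.message);
    break;
  }
}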

Some more findings. I found the loop where it was printing the logs. It seems to be stuck in this loop and it cannot get out.

o2themar · Jul 19 '22 20:07

> Some more findings. I found the loop where it was printing the logs. It seems to be stuck in this loop and it cannot get out.

Exactly, that was the loop I was referring to in the Additional Info section above 👍 Are you also on a mac, or were you able to reproduce it on another OS?

robinfehr · Jul 20 '22 12:07

> Some more findings. I found the loop where it was printing the logs. It seems to be stuck in this loop and it cannot get out.

> Exactly, that was the loop I was referring to in the Additional Info section above 👍 Are you also on a mac, or were you able to reproduce it on another OS?

Yes, I am on a mac, and I was able to reproduce your issue. I noticed that when I added some debug lines it got harder to reproduce. I'm wondering if it's a timing issue that is causing it to get into the stuck state?

o2themar · Jul 20 '22 14:07

The repeating { severity: 7, fac: 'TERMINATE' } [thrd:localhost:9092/bootstrap]: localhost:9092/1: Handle is terminating in state INIT: 2 refcnts (0x1048617a0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf lines indicate that there are still references to that broker.

When this happens it is typically because the application/bindings have not followed the termination sequence and are still holding some object; it could be an rd_kafka_message_t, rd_kafka_topic_t, rd_kafka_topic_partition_list_t, etc. See https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#termination

edenhill · Nov 01 '22 14:11
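
At the binding level, the termination sequence from that document boils down to letting the consumer close fully complete before anything else is torn down. A minimal sketch in node-rdkafka terms, assuming the consumer and producer objects from the test file above:

// Minimal sketch of an orderly shutdown, assuming the consumer and
// producer from the test above. The consumer's disconnect() callback
// must fire (i.e. rd_kafka_consumer_close() must complete underneath)
// before the handle is destroyed; holding on to librdkafka-backed
// objects past that point keeps broker refcounts from reaching zero.
function shutdown(consumer, producer) {
  return new Promise(function (resolve, reject) {
    consumer.unsubscribe();
    consumer.disconnect(function (err) {
      if (err) return reject(err);
      producer.disconnect(function (err2) {
        if (err2) return reject(err2);
        resolve();
      });
    });
  });
}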

What I've seen during the debugging session is that even though there should be forwarding here (screenshot omitted), the RD_KAFKA_OP_TERMINATE never appears in rkq during the polling in rd_kafka_consumer_close():

while ((rko = rd_kafka_q_pop(rkq, RD_POLL_INFINITE, 0))) {

Instead, it seems to be added to rkcg->rkcg_ops (not sure if those should be forwarded; I don't yet have enough knowledge of librdkafka).

(screenshot omitted)

I'll try again to see if there are any outstanding objects.

robinfehr · Nov 01 '22 15:11

While this hasn't been fixed in node-rdkafka yet for the standard consumer, it is related to the rebalance_cb, which gets invoked after the disconnect is initiated. If the rebalance that revokes the partitions doesn't get processed properly, the disconnect will hang indefinitely. @edenhill do you think I should create a PR here to improve the docs, stating that this REBALANCE must be processed, or is it even a bug that this doesn't time out? (I would have to read the specs.)

Closing this ticket for now since it is solved for my case and nothing has to be changed in librdkafka. A fix for node-rdkafka will follow.

robinfehr · Nov 09 '22 21:11
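
For reference, a rebalance_cb that processes both branches looks like this; a minimal sketch following the pattern documented in the node-rdkafka README (broker address and group id are placeholders). If the revoke branch never runs after the disconnect is initiated, the close keeps waiting, which matches the hang above:

var Kafka = require(".");

var consumer = new Kafka.KafkaConsumer(
  {
    "metadata.broker.list": "localhost:9092",
    "group.id": "kafka-mocha-grp-rebalance",
    rebalance_cb: function (err, assignment) {
      if (err.code === Kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
        this.assign(assignment);
      } else if (err.code === Kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
        // This revocation also fires while disconnecting; if it is not
        // handled, the final unassign never completes and
        // rd_kafka_consumer_close() waits forever.
        this.unassign();
      }
    },
  },
  {}
);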