kafkajs

Add manual assignment of TopicPartitions to Consumer

Open JaapRood opened this issue 5 years ago • 26 comments

When using Kafka for stream processing, it's common to have to manually assign topic-partitions for consumption rather than subscribing and receiving an assignment through the group.

For example, when processing messages from a partition the consumer subscribes to, you might have to keep some state. To make sure this state is durable, it's often replicated to a changelog, also implemented as a Kafka topic. That way, when the consumer is assigned another partition or crashes, the state can be restored. To make sure the right processing state is restored before processing continues, the changelog is written to the partition of the changelog topic with the same number as the input partition being processed, a practice called co-partitioning. By simply consuming the changelog topic from the same partition number you're processing, you're guaranteed to restore the right state.

In the above example, subscribing to a ConsumerGroup won't work: you've already been assigned a partition for the input topic and you need exactly that same partition number. There are other examples, too, like replicating a changelog as a local cache on every node, ready to be queried through HTTP. In that case, you want to consume all partitions rather than be assigned just a couple.
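Both patterns come down to computing a list of topic-partitions yourself instead of letting the group hand one out. A minimal sketch, assuming a Kafka Streams-style changelog name (`<application.id>-<store name>-changelog`); `TopicPartition`, `changelogPartitionsFor` and `allPartitions` are hypothetical helpers, not kafkajs APIs:

```typescript
// Hypothetical shape; kafkajs exposes no public type for a manual assignment.
interface TopicPartition {
  topic: string;
  partition: number;
}

// Assumes the Kafka Streams naming convention "<application.id>-<store name>-changelog".
function changelogTopic(applicationId: string, storeName: string): string {
  return `${applicationId}-${storeName}-changelog`;
}

// Co-partitioning: state for input partition N lives in partition N of the
// changelog, so the partitions to restore mirror the input assignment.
// Duplicates (several input topics sharing a partition number) are removed.
function changelogPartitionsFor(
  inputAssignment: TopicPartition[],
  applicationId: string,
  storeName: string,
): TopicPartition[] {
  const topic = changelogTopic(applicationId, storeName);
  const partitions = Array.from(new Set(inputAssignment.map((tp) => tp.partition)));
  return partitions.sort((a, b) => a - b).map((partition) => ({ topic, partition }));
}

// Cache replica: every node reads every partition, with no group involved.
function allPartitions(topic: string, partitionCount: number): TopicPartition[] {
  return Array.from({ length: partitionCount }, (_, partition) => ({ topic, partition }));
}
```

For example, `changelogPartitionsFor([{ topic: "orders", partition: 3 }], "billing", "totals")` yields `[{ topic: "billing-totals-changelog", partition: 3 }]`, which is the list a manual assign would need in order to restore that partition's state.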

The documentation of the official Java client describes it as well:

In the previous examples, we subscribed to the topics we were interested in and let Kafka dynamically assign a fair share of the partitions for those topics based on the active consumers in the group. However, in some cases you may need finer control over the specific partitions that are assigned. For example:

  • If the process is maintaining some kind of local state associated with that partition (like a local on-disk key-value store), then it should only get records for the partition it is maintaining on disk.
  • If the process itself is highly available and will be restarted if it fails (perhaps using a cluster management framework like YARN, Mesos, or AWS facilities, or as part of a stream processing framework). In this case there is no need for Kafka to detect the failure and reassign the partition since the consuming process will be restarted on another machine.

Proposed solution

Like the Java KafkaConsumer does, allow calling consumer.assign with an array of topic partitions instead of consumer.subscribe.

To make this practical, implementing consumer.assignments() might be necessary too, returning the list of topic partitions assigned to the consumer (through either subscription or manual assignment).

JaapRood • May 29 '19 13:05

I'm pretty happy to take this one on, but won't be able to start for a while. We're still right in the middle of our switch to using KafkaJS for all our stream processing and don't need manual assignment until we get to the more complicated tasks (where, as described above, we use Kafka for the changelog of stateful processing).

JaapRood • May 29 '19 13:05

Nice, let's spec this change and have a project to track the progress.

tulios • Jun 04 '19 21:06

This is something I'm interested in too. I'm working on a project similar to the one described by @JaapRood, and I see a lot of use cases: implementing stream processing, KTables or an HTTP < Kafka proxy.
I would love to use kafkajs in my topic UI / browser project, but for that I need fine control over what is assigned to the consumer. Right now I'm forced to use an alternative library ;/

patrykwegrzyn • Jun 11 '19 19:06

Hello guys! Is that in progress?

AlexKonstantinov1991 • Nov 05 '19 11:11

Forces beyond my control (client work) keep pushing this back on my schedule, so I thought I'd at least share the approach @tulios has helped outline.

To allow us to ship initial support within a reasonable time frame, the biggest constraint we should add is that assignments have to be made before consumer.run and can't be changed while running.

The API would look as follows:

const consumer = kafka.consumer({ groupId: 'my-group' })
await consumer.connect()
await consumer.assign({
  topic: 'test-topic',
  partitions: [
    { partition: 0, offset: '30' },
    { partition: 1, offset: '8' },
  ]
})

await consumer.run({ /* define eachMessage or eachBatch */ })
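Before storing anything, the consumer would presumably flatten and validate this argument. A hypothetical sketch (`normalizeAssignment` is not a kafkajs function): it keeps offsets as strings, as in the example above, because Kafka offsets are 64-bit and don't fit safely in a JavaScript number, and it rejects negative or non-integer partitions, non-numeric offsets and duplicate partitions.

```typescript
// Shape of the argument in the proposal above (hypothetical, not a kafkajs type).
interface AssignRequest {
  topic: string;
  partitions: { partition: number; offset: string }[];
}

interface TopicPartitionOffset {
  topic: string;
  partition: number;
  offset: string;
}

// Flatten and validate an assign() request into one entry per partition.
function normalizeAssignment(request: AssignRequest): TopicPartitionOffset[] {
  if (!request.topic) throw new Error("topic is required");
  const seen = new Set<number>();
  return request.partitions.map(({ partition, offset }) => {
    if (!Number.isInteger(partition) || partition < 0) {
      throw new Error(`invalid partition ${partition}`);
    }
    if (seen.has(partition)) {
      throw new Error(`partition ${partition} assigned more than once`);
    }
    seen.add(partition);
    // Offsets must be non-negative integer strings; they stay strings to avoid precision loss.
    if (!/^\d+$/.test(offset)) {
      throw new Error(`invalid offset "${offset}" for partition ${partition}`);
    }
    return { topic: request.topic, partition, offset };
  });
}
```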

For the use cases I've described in this issue and have seen others describe, being able to change assignments while a Consumer is running is definitely a major convenience, but the assumption that assignments can't change is spread throughout Consumer, Runner and ConsumerGroup. Several major changes would be required to make that work. However, if we assume assignments are static while the consumer runs, a couple of changes should do it:

  • Make SubscriptionState aware of the difference between manually assigned and subscribed topics, exposing both sets through a method (like subscriptionState.paused())
  • Create the ConsumerGroup with the manually assigned topics.
  • Only let consumerGroup.sync accept assignments for topics we subscribed to.
  • Prevent Runner from joining and syncing the consumerGroup when there's no subscribed topics. Since all rebalance code is contained in the runner, fencing the join and sync operations should allow safe support for manual assignments.
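The bookkeeping in these bullets could look roughly like the class below. `AssignmentState` is a hypothetical stand-in, not kafkajs's internal SubscriptionState: it keeps manual and group-provided partitions apart, drops group assignments for topics that weren't subscribed, and tells the Runner to skip join/sync when nothing is subscribed. Refusing to both subscribe to and manually assign the same topic is an assumption of this sketch.

```typescript
interface TopicPartition {
  topic: string;
  partition: number;
}

// Hypothetical bookkeeping sketch; not kafkajs's SubscriptionState.
class AssignmentState {
  private readonly subscribedTopics = new Set<string>();
  private readonly manual = new Map<string, Set<number>>();
  private groupAssigned = new Map<string, Set<number>>();

  subscribe(topic: string): void {
    if (this.manual.has(topic)) throw new Error(`${topic} is already manually assigned`);
    this.subscribedTopics.add(topic);
  }

  assign(topic: string, partitions: number[]): void {
    if (this.subscribedTopics.has(topic)) throw new Error(`${topic} is already subscribed`);
    this.manual.set(topic, new Set(partitions));
  }

  // Result of consumerGroup.sync: keep only topics we actually subscribed to.
  applyGroupAssignment(assignment: Record<string, number[]>): void {
    this.groupAssigned = new Map(
      Object.keys(assignment)
        .filter((topic) => this.subscribedTopics.has(topic))
        .map((topic): [string, Set<number>] => [topic, new Set(assignment[topic])]),
    );
  }

  // The Runner would skip join/sync entirely when nothing is subscribed.
  shouldJoinGroup(): boolean {
    return this.subscribedTopics.size > 0;
  }

  // Manual partitions first, then group-provided ones, like the proposed consumer.assignments().
  assignments(): TopicPartition[] {
    const result: TopicPartition[] = [];
    [this.manual, this.groupAssigned].forEach((source) =>
      source.forEach((partitions, topic) =>
        partitions.forEach((partition) => result.push({ topic, partition })),
      ),
    );
    return result;
  }
}
```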

There are plenty of details along the way that should be hashed out (for example what to do with the GROUP_JOIN instrumentation event), but I imagine more of that would become clear as we work towards a first implementation.

While assignments can't be changed while the consumer is running, nothing should prevent a user from stopping the consumer, changing assignments and starting again:

/* assume running consumer */

await consumer.stop()

await consumer.assign({
  topic: 'test-topic',
  partitions: [
    { partition: 1, offset: '8' },
    { partition: 4, offset: '299' },
  ]
})

await consumer.run({ /* define eachMessage or eachBatch */ })
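The stop/assign/run cycle amounts to a small state machine. A hypothetical guard (not part of kafkajs) enforcing "assign only while stopped" might look like this; `startFetching` stands in for the Runner's fetch loop, and replacing rather than merging a topic's previous assignment is a design choice of the sketch, not something the proposal specifies.

```typescript
// Hypothetical types and names; the proposal above only fixes the argument shape.
interface Assignment {
  topic: string;
  partitions: { partition: number; offset: string }[];
}

class StaticAssignmentLifecycle {
  private running = false;
  private assignments: Assignment[] = [];

  assign(assignment: Assignment): void {
    if (this.running) {
      throw new Error("stop the consumer before changing assignments");
    }
    // A new assign() for a topic replaces that topic's previous assignment.
    this.assignments = this.assignments
      .filter((a) => a.topic !== assignment.topic)
      .concat(assignment);
  }

  // startFetching stands in for the Runner's fetch loop.
  async run(startFetching: (assignments: Assignment[]) => Promise<void>): Promise<void> {
    if (this.running) throw new Error("consumer is already running");
    this.running = true;
    // Hand the loop a copy so later changes cannot leak into a running fetch.
    await startFetching(this.assignments.slice());
  }

  async stop(): Promise<void> {
    this.running = false;
  }
}
```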

JaapRood • Nov 20 '19 09:11

Any progress?

jaysequr • Dec 13 '19 12:12

Nope, you @jaysequr?

JaapRood • Dec 13 '19 13:12

I really like how this library is written in general. I think the API is very clean and straightforward, so it'd be such a waste for this to be a dealbreaker for us. I'm happy to take this on, @JaapRood, if you're backing out.

anonimitoraf • Jun 01 '20 00:06

Go for it @anonimitoraf, with my new job I can't really justify putting the required time towards this. More than happy to assist anyone else attempting it, though!

JaapRood • Jun 01 '20 08:06

Hey @anonimitoraf (or anyone else), was there any progress made on this? Is there maybe an ETA for this feature?

sOfekS • Oct 20 '20 11:10

Hey, sorry didn't/haven't gotten around to doing this (simply because life got too busy). Do you want to give it a shot @sOfekS ?

anonimitoraf • Oct 26 '20 04:10

Just to clarify: will this new API (assign instead of subscribe) support creating consumers without specifying "group.id" in the consumer constructor? I'd like to prevent a consumer group from being registered at all.

Question from this related issue: https://github.com/tulios/kafkajs/issues/853

pimpelsang • Nov 24 '20 10:11

@pimpelsang that's right. Rather than relying on the ConsumerGroup mechanism to provide which partitions are being consumed, the Consumer would read from the topic+partition manually provided directly. Each Consumer configured in this way should run irrespective of others like it, so using a group would not make sense.

JaapRood • Nov 24 '20 11:11

@JaapRood hi it's been a while, any new updates?

shaniMaayan • Oct 17 '21 18:10

Nothing has changed since June 2020:

Go for it @anonimitoraf, with my new job I can't really justify putting the required time towards this. More than happy to assist anyone else attempting it, though!

JaapRood • Oct 19 '21 07:10

Anyone taking this on? I see we have partitionAssigners but the API seems very unclear, not sure if it's useful for this use case.

dwinrick-lever • Feb 16 '22 21:02

No, partition assigners are the mechanism by which assignments are made within a consumer group. For this use case, there would be no consumer group.

Nevon • Feb 17 '22 07:02

I think what was confusing for me about the API today was that memberAssignment is a Buffer, maybe it should be a more self documenting type alias like EncodedMemberAssignment, thoughts?

export type GroupMemberAssignment = { memberId: string; memberAssignment: Buffer }

Also it's confusing as to what userData is...

For those wondering, here's what my single-partition assigner looks like:

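import { PartitionAssigner, AssignerProtocol } from 'kafkajs'

// topic and partition were free variables in the original; take them as arguments.
const singlePartitionAssigner = (topic: string, partition: number): PartitionAssigner => () => ({
  name: 'SinglePartitionAssigner',
  version: 1,
  async assign({ members }) {
    // memberId must be one of the ids the coordinator reported for this
    // generation (not a placeholder); every member gets the same single partition.
    return members.map(({ memberId }) => ({
      memberId,
      memberAssignment: AssignerProtocol.MemberAssignment.encode({
        version: this.version,
        assignment: { [topic]: [partition] },
        userData: Buffer.from([]), // opaque bytes for the assignor; unused here
      }),
    }))
  },
  protocol({ topics }) {
    return {
      name: this.name,
      metadata: AssignerProtocol.MemberMetadata.encode({
        version: this.version,
        topics,
        userData: Buffer.from([]), // opaque bytes for the assignor; unused here
      }),
    }
  },
})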
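On the `Buffer` and `userData` questions: as far as we can tell, `AssignerProtocol.MemberAssignment.encode` produces the Kafka consumer protocol's assignment layout, which is an int16 version, an array of topics (each with an int32 partition array), then `userData` as length-prefixed bytes. `userData` is opaque to the broker and to other members; it lets an assignor carry extra state (the Java sticky assignor, for instance, uses it to remember previous assignments), and an empty buffer is fine when there is nothing to send. The hypothetical codec below mirrors that layout with plain `Uint8Array`/`DataView` so it runs without kafkajs; it illustrates what the bytes contain and is not a replacement for `AssignerProtocol`.

```typescript
// Hypothetical codec mirroring the consumer-protocol assignment layout (big-endian):
//   version: int16
//   topics:  int32 count, then per topic: name (int16 length + bytes), int32 count + int32 partitions
//   userData: int32 length + bytes
interface DecodedAssignment {
  version: number;
  assignment: Record<string, number[]>;
  userData: Uint8Array; // opaque to everyone except the assignor that wrote it
}

// Kafka topic names are limited to ASCII letters, digits, '.', '_' and '-',
// so one byte per character is enough here.
function asciiBytes(s: string): Uint8Array {
  return Uint8Array.from(s, (c) => c.charCodeAt(0));
}

function encodeAssignment(a: DecodedAssignment): Uint8Array {
  const topics = Object.keys(a.assignment).map((name) => ({
    name: asciiBytes(name),
    partitions: a.assignment[name],
  }));
  let size = 2 + 4 + 4 + a.userData.length; // version, topic count, userData length + bytes
  for (const t of topics) size += 2 + t.name.length + 4 + 4 * t.partitions.length;

  const bytes = new Uint8Array(size);
  const view = new DataView(bytes.buffer);
  let offset = 0;
  view.setInt16(offset, a.version); offset += 2;
  view.setInt32(offset, topics.length); offset += 4;
  for (const t of topics) {
    view.setInt16(offset, t.name.length); offset += 2;
    bytes.set(t.name, offset); offset += t.name.length;
    view.setInt32(offset, t.partitions.length); offset += 4;
    for (const p of t.partitions) { view.setInt32(offset, p); offset += 4; }
  }
  view.setInt32(offset, a.userData.length); offset += 4;
  bytes.set(a.userData, offset);
  return bytes;
}

function decodeAssignment(bytes: Uint8Array): DecodedAssignment {
  const view = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength);
  let offset = 0;
  const version = view.getInt16(offset); offset += 2;
  const topicCount = view.getInt32(offset); offset += 4;
  const assignment: Record<string, number[]> = {};
  for (let i = 0; i < topicCount; i++) {
    const nameLength = view.getInt16(offset); offset += 2;
    const name = String.fromCharCode(...Array.from(bytes.subarray(offset, offset + nameLength)));
    offset += nameLength;
    const partitionCount = view.getInt32(offset); offset += 4;
    const partitions: number[] = [];
    for (let j = 0; j < partitionCount; j++) { partitions.push(view.getInt32(offset)); offset += 4; }
    assignment[name] = partitions;
  }
  const userDataLength = view.getInt32(offset); offset += 4;
  // A length of -1 means "null bytes" on the wire; treat it as empty.
  const userData = userDataLength < 0 ? new Uint8Array(0) : bytes.slice(offset, offset + userDataLength);
  return { version, assignment, userData };
}
```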

dwinrick-lever • Feb 17 '22 21:02

The unsubscribe method would really be very useful for a number of use cases. Without it, you need to instantiate a new consumer and it's quite difficult to manage that.

paulovitorweb • Feb 24 '22 18:02

Hi everyone. Has there been any progress here? As it stands it is only possible to publish with a partition key, but on the consumption side you have to specify a partition number.

vertho • Apr 28 '22 07:04


For publishing, it is possible to publish with a partition key or partition number (refer to message structure):

  • If a partition is specified in the message, use it.
  • If no partition is specified but a key is present, choose a partition based on a hash (murmur2) of the key.
  • If no partition or key is present, choose a partition in a round-robin fashion.
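The three rules can be sketched as follows. `murmur2` follows the Java client's 32-bit murmur2 (seed `0x9747b28c`), and the keyed partition is that hash masked to a non-negative int, modulo the partition count, which as far as we know is the scheme kafkajs's default partitioner follows. `makePartitioner` and `OutgoingMessage` are hypothetical names, and the round-robin branch is simplified (it ignores partition availability).

```typescript
// Java-compatible 32-bit murmur2 over the key's bytes.
function murmur2(data: Uint8Array): number {
  const m = 0x5bd1e995;
  const r = 24;
  const length = data.length;
  let h = 0x9747b28c ^ length; // seed ^ length, coerced to int32
  const length4 = Math.floor(length / 4);
  for (let i = 0; i < length4; i++) {
    const i4 = i * 4;
    let k = data[i4] | (data[i4 + 1] << 8) | (data[i4 + 2] << 16) | (data[i4 + 3] << 24);
    k = Math.imul(k, m);
    k ^= k >>> r;
    k = Math.imul(k, m);
    h = Math.imul(h, m);
    h ^= k;
  }
  // Mix in the 1-3 trailing bytes, falling through like the Java switch.
  const tail = length & ~3;
  switch (length % 4) {
    case 3:
      h ^= data[tail + 2] << 16;
    // falls through
    case 2:
      h ^= data[tail + 1] << 8;
    // falls through
    case 1:
      h ^= data[tail];
      h = Math.imul(h, m);
  }
  h ^= h >>> 13;
  h = Math.imul(h, m);
  h ^= h >>> 15;
  return h;
}

interface OutgoingMessage {
  key?: string;
  partition?: number;
}

// Returns a partitioner applying the three rules for a topic with numPartitions partitions.
function makePartitioner(numPartitions: number): (message: OutgoingMessage) => number {
  let counter = 0;
  const encoder = new TextEncoder();
  return (message) => {
    // Rule 1: an explicit partition wins.
    if (message.partition !== undefined) return message.partition;
    // Rule 2: hash the UTF-8 key bytes; masking with 0x7fffffff makes the hash non-negative.
    if (message.key !== undefined) {
      return (murmur2(encoder.encode(message.key)) & 0x7fffffff) % numPartitions;
    }
    // Rule 3: round-robin over partition numbers.
    return counter++ % numPartitions;
  };
}
```

Given the topic's partition count, the same function also tells a consumer which partition number a key was written to.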


For consuming, at the moment, there is no way to specify a partition number. Currently, KafkaJS does not support manual assignment via assign (which is the rationale for this issue).

lamweili • May 08 '22 08:05

Nothing has changed since June 2020, waiting for someone willing to invest the time to take this on:

[...] with my new job I can't really justify putting the required time towards this. More than happy to assist anyone else attempting it, though!

JaapRood • May 09 '22 10:05

In case it's of any use to others seeking to subscribe multiple group members to all partitions within a given topic, here's my solution using partitionAssigners which seems to work. Thanks @dwinrick-lever for the single-partition example above!

The partition assigner generated by the generateAllPartitionAssigner function (given your topic name) will get the metadata for your topic + pull all the partition IDs, then assign all group members to all partitions.

import { Kafka, PartitionAssigner, AssignerProtocol } from "kafkajs";

const generateAllPartitionAssigner =
  (topic: string): PartitionAssigner =>
  ({ cluster }) => ({
    name: "AllPartitionAssigner",
    version: 1,
    protocol({ topics }) {
      return {
        name: this.name,
        metadata: AssignerProtocol.MemberMetadata.encode({
          version: this.version,
          topics,
          userData: Buffer.from([]),
        }),
      };
    },
    assign: async ({ members }) => {
      await cluster.connect();
      await cluster.refreshMetadata();
      const partitionMetadata = cluster.findTopicPartitionMetadata(topic);
      const availablePartitions = partitionMetadata.map((pm) => pm.partitionId);

      return members.map((member) => ({
        memberId: member.memberId,
        memberAssignment: AssignerProtocol.MemberAssignment.encode({
          version: 1,
          assignment: { [topic]: availablePartitions },
          userData: Buffer.from([]),
        }),
      }));
    },
  });

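// clientId and broker address are placeholders for your own cluster.
const kafka = new Kafka({ clientId: "my-app", brokers: ["localhost:9092"] });

const consumer = kafka.consumer({
  groupId: "my.super.special.group.name",
  partitionAssigners: [generateAllPartitionAssigner("YOUR.TOPIC.NAME.HERE")],
});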

tbaggaley • Jun 14 '22 15:06

Hey y'all, any update on this one? I'm writing a command line tool to consume messages from a given topic for analysis and automation purposes. This tool would only need to read messages coming in, and I want to avoid being assigned to a group and committing an offset so as not to interrupt the services consuming these messages.

Thank you!

etlr • Sep 16 '22 01:09

Replying to @dwinrick-lever's single-partition assigner comment above: No luck.

lazyboson • May 03 '23 17:05

Hello @lazyboson, any progress on this issue? I'm really willing to take this forward. It would be great to have this functionality in one of my projects.

Sigoloh • Dec 29 '23 13:12