pulsar-client-go
Expose GetHashingFunction() method on producer
Motivation
Expose GetHashingFunction() so that users can construct the hash function needed to call NewDefaultRouter.
Modifications
Exposed GetHashingFunction() and added unit tests.
Verifying this change
- [ ] Make sure that the change passes the CI checks.
(Please pick either of the following options)
This change added tests and can be verified as follows:
- Added unit tests in producer_test.go
Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes
- Dependencies (does it add or upgrade a dependency): (no)
- The public API: (no)
- The schema: (no)
- The default values of configurations: (no)
- The wire protocol: (no)
Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable / docs / GoDocs / not documented)
- If a feature is not applicable for documentation, explain why?
- If a feature is not documented yet in this PR, please create a followup issue for adding the documentation
In the MessageRouter, you're getting a TopicMetadata object which will tell you the number of partitions: https://github.com/apache/pulsar-client-go/blob/f17deac7ac47919aa9e46d11d33638672a24d126/pulsar/producer.go#L57
Wouldn't this already solve the problem?
IIUC, the TopicMetadata gives us the number of partitions at the time the MessageRouter is invoked. For our use case, we need to record the number of partitions each time a new partition key is defined.
I probably shouldn't paste the link here, but DSP-28866 is the Jira ticket this change is for.
I have updated the pull request to also expose the GetHashingFunction() method so that users can call NewDefaultRouter.
Our plan for using these changes can be seen at https://cd.splunkdev.com/data-availability/s2s-svc/-/merge_requests/937/diffs
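A minimal sketch of the "record the partition count when a new key is defined" idea, assuming the hash function is passed in and the current partition count is supplied by the caller (for example from the router's TopicMetadata). The names `stickyRouter`, `Route` and `placeholderHash` are hypothetical and not part of pulsar-client-go; CRC-32 is only a stand-in for the client's own hashing function.

```go
package main

import (
	"fmt"
	"hash/crc32"
	"sync"
)

// stickyRouter (hypothetical) remembers, per partition key, how many partitions
// the topic had when that key was first routed, and keeps hashing modulo that
// count so existing keys stay put when partitions are added.
type stickyRouter struct {
	mu       sync.Mutex
	hashFunc func(string) uint32
	seen     map[string]uint32 // key -> partition count recorded on first use
}

func newStickyRouter(hashFunc func(string) uint32) *stickyRouter {
	return &stickyRouter{hashFunc: hashFunc, seen: make(map[string]uint32)}
}

// Route picks a partition for key. currentPartitions must be non-zero; it is
// only consulted the first time a key is seen.
func (r *stickyRouter) Route(key string, currentPartitions uint32) int {
	r.mu.Lock()
	defer r.mu.Unlock()
	n, ok := r.seen[key]
	if !ok {
		// New key: record the partition count in effect right now.
		n = currentPartitions
		r.seen[key] = n
	}
	return int(r.hashFunc(key) % n)
}

// placeholderHash stands in for the client's hashing function (CRC-32 here).
func placeholderHash(s string) uint32 {
	return crc32.ChecksumIEEE([]byte(s))
}

func main() {
	r := newStickyRouter(placeholderHash)
	before := r.Route("order-42", 4)
	after := r.Route("order-42", 8) // topic grew from 4 to 8 partitions
	fmt.Println(before == after)    // true: the key keeps its original partition
}
```

Keys first seen after the topic grows are hashed against the new count. A real implementation would also need to bound or persist the map; this sketch keeps it in memory only.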
I'm not convinced exposing NumPartitions() would help here. All the custom logic should be implementable within a custom message router.
In any case, there's already a way to get the partitions for a topic: Client.TopicPartitions().
ACK, I didn't realize there was already a method for that. That should work. I've removed the changes related to NumPartitions().
Unless I'm missing the point, can't MessageRouter already meet our needs? For example:
producer, err := client.CreateProducer(ProducerOptions{
	Topic: "my-partitioned-topic",
	MessageRouter: func(msg *ProducerMessage, tm TopicMetadata) int {
		fmt.Println("Routing message ", msg, " -- Partitions: ", tm.NumPartitions())
		return 2
	},
})
Thanks for taking a look. MessageRouter is what we need. We'd like to set MessageRouter to a function that works like the default router but with a custom number of partitions, e.g.:
internalRouter := pulsarClient.NewDefaultRouter(
	getHashingFunction(producerOptions.HashingScheme),
	producerOptions.BatchingMaxMessages,
	producerOptions.BatchingMaxSize,
	producerOptions.BatchingMaxPublishDelay,
	producerOptions.DisableBatching)

messageRouter := func(message *pulsarClient.ProducerMessage, metadata pulsarClient.TopicMetadata) int {
	// customNumberOfPartitions could be different from metadata.NumPartitions() here
	return internalRouter(message, customNumberOfPartitions)
}
The problem is that getHashingFunction is not exported, so we can't use it to supply the hash function to NewDefaultRouter.
Sorry for the late reply, @hunter2046. All the options that NewDefaultRouter takes are already exposed as parameters: you can set them in ProducerOptions, and the default router is built from those values.
Thanks. I understand that those options, including MessageRouter, are set in ProducerOptions. My use case (as mentioned in https://github.com/apache/pulsar-client-go/pull/507#issuecomment-822802546) is that I need a custom MessageRouter that behaves like the internalRouter except that it uses a different number of partitions, so that producers don't start sending messages to a different partition while the number of partitions is being increased.
To do that, I need to create the internalRouter myself by calling NewDefaultRouter. I could certainly supply my own hashing function, but because I want to mimic what the internalRouter does, I would like to use the same hashing function. That is why I'd like getHashingFunction to be exposed. I hope this makes sense.
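To illustrate why the hashing function matters, here is a minimal sketch of key-based routing in the style of the default router: hash the key, then take it modulo the partition count. `javaStringHash` is a byte-wise version of Java's `String.hashCode` and is assumed, not confirmed, to match the client's Java-compatible scheme for ASCII keys; `routeKey` and `fnv32a` are hypothetical helpers. With the same partition count, a different hash function can send the same key to a different partition, which is exactly what reusing the client's own hashing function avoids.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// javaStringHash computes h = 31*h + c over the key's bytes (Java String.hashCode style).
func javaStringHash(s string) uint32 {
	var h uint32
	for i := 0; i < len(s); i++ {
		h = 31*h + uint32(s[i])
	}
	return h
}

// fnv32a is an unrelated hash (32-bit FNV-1a from the standard library), used only for contrast.
func fnv32a(s string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(s)) // hash.Hash's Write never returns an error
	return h.Sum32()
}

// routeKey reduces the key's hash modulo the partition count.
func routeKey(key string, hashFunc func(string) uint32, numPartitions uint32) int {
	return int(hashFunc(key) % numPartitions)
}

func main() {
	const partitions = 4
	fmt.Println(routeKey("ab", javaStringHash, partitions)) // 1 (3105 % 4)
	fmt.Println(routeKey("ab", fnv32a, partitions))         // 2
}
```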