
[improve][pip] PIP-359: Support custom message listener executor for specific subscription

Open AuroraTwinkle opened this issue 1 year ago • 28 comments

Motivation

PIP-359 Implementation PR: https://github.com/apache/pulsar/pull/22861

Modifications

Verifying this change

  • [x] Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • [ ] Dependencies (add or upgrade a dependency)
  • [ ] The public API
  • [ ] The schema
  • [ ] The default values of configurations
  • [ ] The threading model
  • [ ] The binary protocol
  • [ ] The REST endpoints
  • [ ] The admin CLI options
  • [ ] The metrics
  • [ ] Anything that affects deployment

Documentation

  • [ ] doc
  • [ ] doc-required
  • [x] doc-not-needed
  • [ ] doc-complete

Matching PR in forked repository

PR in forked repository:

AuroraTwinkle avatar Jun 13 '24 11:06 AuroraTwinkle

Why can't this logic be written by the person writing the listener? I think it adds extra complexity to an already complicated client

asafm avatar Jun 14 '24 17:06 asafm

Yeah, the solution you mentioned can also solve this problem. But on the contrary, I think providing a unified builder configuration would be easier to use.

AuroraTwinkle avatar Jun 14 '24 17:06 AuroraTwinkle

IMO the surface area the client already exposes is way too big. It feels like anyone who wanted to scratch their own itch just added their feature without holding back. All of this needs to be maintained. OTel has a contrib repo where people can contribute their plugins. In your case, that would mean contributing a listener implementation.

asafm avatar Jun 14 '24 21:06 asafm

You make a very valid point. We do indeed need a plugin library that allows contributors to customize Pulsar according to their own needs without introducing more complexity into Pulsar. Do you have the opportunity to promote this matter? If so, I think we can continue to push forward with this. And take this proposal as the first test case.

Otherwise, it might be better to first promote this optimization, so that those who need it can start using it.

I am very interested in the plugin library you mentioned, and perhaps later we can work together to promote its implementation in Pulsar. However, it would be unwise to block other optimizations from entering before that.

liangyepianzhou avatar Jun 15 '24 15:06 liangyepianzhou

Great idea! Are you interested in building and promoting a similar contrib repo for Pulsar? This PIP may be a good opportunity to do this, and we can build it together. @asafm @liangyepianzhou

AuroraTwinkle avatar Jun 15 '24 15:06 AuroraTwinkle

I currently work on Pulsar in my free time, since my new company doesn't use Pulsar at all. I can help review a PIP to open such a repo. You can read the OTel policy on how they handle it and use that as the basis for the contrib repo PIP. If you are also doing this in your free time, we can try to work on it together, but it will be at a slower pace.

asafm avatar Jun 15 '24 19:06 asafm

We also participate in this work in our spare time, and progress is indeed slower. However, if we can achieve something significant together, it would be a very cool thing.

liangyepianzhou avatar Jun 16 '24 05:06 liangyepianzhou

@asafm Since we all agree that the plugin library is still a long way from being realized, it's best that we do not block other optimizations until then. It would be best to let this PIP continue along its original process, so that those who need this feature can use it first. We can migrate some functionality after the plugin library is completed. What do you think?

liangyepianzhou avatar Jun 17 '24 00:06 liangyepianzhou

With the help of @liangyepianzhou I was able to fully figure out all the context required to understand what this PIP is all about. If I summarize it: When using a Consumer, you have two options to consume messages:

  • Synchronously, by calling consumer.receive()
  • Asynchronously, by registering a MessageListener interface, when building the Consumer. When this method is used, you can't also use consumer.receive().

In the asynchronous way, the consumer invokes the MessageListener instance using a thread taken from an internal ExecutorService (i.e. thread pool). The problem comes when you build and use 2 consumers from the same PulsarClient: those 2 consumers share the same thread pool for calling their MessageListeners, so a slow listener on one consumer can delay the other.

I suggest a different solution, if I may:

In the ConsumerBuilder there is the following method to register a MessageListener: .messageListener(myMessageListener)

I suggest we expand it to: .messageListener(myMessageListener, myExecutorService)

This will allow the user to provide a separate executor service (thread pool) to be used when executing myMessageListener for that consumer.
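The contention described above can be sketched with plain java.util.concurrent, without any Pulsar dependency. This is only an illustration under simplifying assumptions: the class and method names are hypothetical, and the "shared pool" is reduced to a single thread to make the effect deterministic, whereas the real client pool may have more threads.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SharedListenerPoolDemo {

    /**
     * Runs a blocked "slow listener" and a trivial "fast listener".
     * Returns true if the fast one completes while the slow one is still blocked.
     */
    public static boolean fastFinishesWhileSlowBlocked(boolean sharePool) {
        ExecutorService slowPool = Executors.newSingleThreadExecutor();
        ExecutorService fastPool = sharePool ? slowPool : Executors.newSingleThreadExecutor();
        CountDownLatch releaseSlow = new CountDownLatch(1);
        CountDownLatch fastDone = new CountDownLatch(1);
        try {
            // Listener of consumer A: stuck until it is released in the finally block.
            slowPool.execute(() -> {
                try {
                    releaseSlow.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // Listener of consumer B: finishes as soon as it gets a thread.
            fastPool.execute(fastDone::countDown);
            return fastDone.await(500, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            releaseSlow.countDown();
            slowPool.shutdown();
            fastPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("shared pool:    " + fastFinishesWhileSlowBlocked(true));
        System.out.println("separate pools: " + fastFinishesWhileSlowBlocked(false));
    }
}
```

With a shared pool the fast listener has to wait behind the slow one; with a pool per consumer it should complete independently.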

So summarizing my request for changes:

  1. Rewrite the motivation and background knowledge to include all relevant information (see my example above).
  2. Switch to a different API for providing that executor service.

asafm avatar Jun 18 '24 07:06 asafm

Okay, good suggestions! I will update it later as suggested. Thank you all @asafm @liangyepianzhou

AuroraTwinkle avatar Jun 18 '24 07:06 AuroraTwinkle

I have made the changes to the PIP as suggested, PTAL @asafm @liangyepianzhou Thanks!

AuroraTwinkle avatar Jun 18 '24 10:06 AuroraTwinkle

The use case looks a little strange. Why not

        Consumer<Long> consumer = pulsarClient.newConsumer(Schema.INT64)
                .topic("persistent://public/default/my-topic")
                .subscriptionName("my-subscription2")
                // the lambda parameter is named `c` so it does not clash with the `consumer` variable
                .messageListener((c, message) -> {
                    // process message
                    c.acknowledgeAsync(message);
                })
                // Introduce a new `processExecutor` method
                .processExecutor(Executors.newCachedThreadPool())
                .subscribe();

or

        Consumer<Long> consumer = pulsarClient.newConsumer(Schema.INT64)
                .topic("persistent://public/default/my-topic")
                .subscriptionName("my-subscription2")
                // Add a new overload that accepts `MessageListener, ExecutorService`
                .messageListener((c, message) -> {
                    // process message
                    c.acknowledgeAsync(message);
                }, Executors.newCachedThreadPool())
                .subscribe();

dao-jun avatar Jun 18 '24 12:06 dao-jun

Agree! Using the Function to implement this feature does indeed reduce development costs while ensuring that partition messages are sent in an orderly manner. However, it increases the cost of use for the user.

liangyepianzhou avatar Jun 18 '24 12:06 liangyepianzhou

@dao-jun @liangyepianzhou Thanks for your suggestions! I have updated the usage example. And the following points make it impossible to simply pass the ExecutorService interface:

  1. For Key_Shared or Failover subscriptions, we need to preserve message order. If we simply pass an ExecutorService, we cannot guarantee that messages with the same key or from the same partition will be processed by the same thread (unless the ExecutorService passed in is a SingleThreadExecutor).
  2. Also see this link: https://github.com/apache/pulsar/pull/22861#discussion_r1630111105

private void startConsumerWithMessageListener(String topic, String subscriptionName) throws PulsarClientException {
    // Example 1: key_shared (messages with the same key run on the same thread)
    MessageListenerExecutor keySharedExecutor = getKeySharedMessageListenerExecutor(subscriptionName);
    Consumer<Long> keySharedConsumer =
            pulsarClient.newConsumer(Schema.INT64)
                    .topic(topic)
                    .subscriptionName(subscriptionName)
                    // the message listener will be executed in the executor set below
                    .messageListener((c1, msg) -> {
                        log.info("Received message [{}] in the listener", msg.getValue());
                        c1.acknowledgeAsync(msg);
                    })
                    .messageListenerExecutor(keySharedExecutor)
                    .subscribe();


    // Example 2: partition_ordered (messages from the same partition run on the same thread)
    MessageListenerExecutor partitionOrderedExecutor = getPartitionOrderdMessageListenerExecutor(subscriptionName);
    Consumer<Long> partitionOrderedConsumer =
            pulsarClient.newConsumer(Schema.INT64)
                    .topic(topic)
                    .subscriptionName(subscriptionName)
                    // the message listener will be executed in the executor set below
                    .messageListener((c1, msg) -> {
                        log.info("Received message [{}] in the listener", msg.getValue());
                        c1.acknowledgeAsync(msg);
                    })
                    .messageListenerExecutor(partitionOrderedExecutor)
                    .subscribe();

    // Example 3: out-of-order (any thread of a 10-thread pool may run the listener)
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    Consumer<Long> outOfOrderConsumer =
            pulsarClient.newConsumer(Schema.INT64)
                    .topic(topic)
                    .subscriptionName(subscriptionName)
                    // the executor set below ignores the message, so no ordering is guaranteed
                    .messageListener((c1, msg) -> {
                        log.info("Received message [{}] in the listener", msg.getValue());
                        c1.acknowledgeAsync(msg);
                    })
                    .messageListenerExecutor((message, runnable) -> executorService.execute(runnable))
                    .subscribe();
}

private static MessageListenerExecutor getKeySharedMessageListenerExecutor(String subscriptionName) {
    ExecutorProvider executorProvider = new ExecutorProvider(10, subscriptionName + "listener-executor-");

    return (message, runnable) -> {
        byte[] key = "".getBytes(StandardCharsets.UTF_8);
        if (message.hasKey()) {
            key = message.getKeyBytes();
        } else if (message.hasOrderingKey()) {
            key = message.getOrderingKey();
        }
        // Select a thread by message key to execute the runnable.
        // That is, listener tasks with the same ordering key
        // will be executed by the same thread.
        ExecutorService executorService = executorProvider.getExecutor(key);
        // executorService is a SingleThreadExecutor
        executorService.execute(runnable);
    };
}

private static MessageListenerExecutor getPartitionOrderdMessageListenerExecutor(String subscriptionName) {
    ExecutorProvider executorProvider = new ExecutorProvider(10, subscriptionName + "listener-executor-");

    return (message, runnable) -> {
        // Select a thread by partition topic name to execute the runnable.
        // That is, listener tasks from the same partition topic
        // will be executed by the same thread.
        ExecutorService executorService =
                executorProvider.getExecutor(message.getTopicName().getBytes(StandardCharsets.UTF_8));
        // executorService is a SingleThreadExecutor
        executorService.execute(runnable);
    };
}
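
For readers who want to try the "same key, same thread" idea without a broker, here is a minimal stdlib-only sketch. It is not the ExecutorProvider used above; KeyOrderedExecutorSketch and its methods are hypothetical names, and hashing the key bytes to pick a single-threaded worker is an assumption about one reasonable way to do the selection.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class KeyOrderedExecutorSketch {
    private final ExecutorService[] workers;

    public KeyOrderedExecutorSketch(int numThreads) {
        workers = new ExecutorService[numThreads];
        for (int i = 0; i < numThreads; i++) {
            workers[i] = Executors.newSingleThreadExecutor();
        }
    }

    /** The same key bytes always map to the same single-threaded worker. */
    public void execute(byte[] key, Runnable task) {
        int index = Math.floorMod(Arrays.hashCode(key), workers.length);
        workers[index].execute(task);
    }

    /** Stops accepting tasks and waits for the queued ones to finish. */
    public void shutdownAndWait() {
        for (ExecutorService worker : workers) {
            worker.shutdown();
        }
        try {
            for (ExecutorService worker : workers) {
                worker.awaitTermination(5, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Submits `count` tasks for each of three keys; true if every key saw them in submission order. */
    public static boolean preservesPerKeyOrder(int numThreads, int count) {
        KeyOrderedExecutorSketch executor = new KeyOrderedExecutorSketch(numThreads);
        String[] keys = {"key-a", "key-b", "key-c"};
        List<List<Integer>> seen = new ArrayList<List<Integer>>();
        for (int k = 0; k < keys.length; k++) {
            seen.add(Collections.synchronizedList(new ArrayList<Integer>()));
        }
        for (int i = 0; i < count; i++) {
            for (int k = 0; k < keys.length; k++) {
                final int seq = i;
                final List<Integer> sink = seen.get(k);
                executor.execute(keys[k].getBytes(StandardCharsets.UTF_8), () -> sink.add(seq));
            }
        }
        executor.shutdownAndWait();
        for (List<Integer> sink : seen) {
            if (sink.size() != count) {
                return false;
            }
            for (int i = 0; i < count; i++) {
                if (sink.get(i) != i) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("per-key order preserved: " + preservesPerKeyOrder(4, 200));
    }
}
```

Different keys may still land on the same worker; that costs parallelism but does not break per-key ordering.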

AuroraTwinkle avatar Jun 18 '24 13:06 AuroraTwinkle

With the help of @liangyepianzhou I was able to fully figure out all the context required to understand what this PIP is all about. If I summarize it: When using a Consumer, you have two options to consume messages:

  • Synchronously, by calling consumer.receive()
  • Asynchronously, by registering a MessageListener interface, when building the Consumer. When this method is used, you can't also use consumer.receive().

In the asynchronous way, the MessageListener instance is called by the consumer, hence it is doing that with a thread taken from its own internal ExecutorService (i.e. thread pool). The problem comes when you build and use 2 consumers from the same PulsarClient. It so happens that those 2 consumers will share the same thread pool to call the Message Listeners. One can be slower than the other.

I suggest a different solution, if I may:

In the ConsumerBuilder there is the following method to register a MessageListener: .messageListener(myMessageListener)

I suggest we expand it to: .messageListener(myMessageListener, myExecutorService)

This will allow the user to provide a separate executor service (thread pool) to be used when executing myMessageListener for that consumer.

So summarizing my request for changes:

  1. Rewrite the motivation and background knowledge to include all relevant information (see my example above).
  2. Switch to a different API for providing that executor service.

I have made the changes to the PIP as suggested, PTAL @asafm @liangyepianzhou Thanks!

After further discussion with @lhotari (see more discussions at https://github.com/apache/pulsar/pull/22861), for the second point, we both think that Builders usually configure each property separately, so I will stick to the previous interface! Thanks! @asafm @liangyepianzhou (Of course, what you said is also a good suggestion. Thanks again!)

AuroraTwinkle avatar Jun 18 '24 15:06 AuroraTwinkle

I support the motivation; isolating the MessageListener executor is indeed a good idea. But I have reservations about the implementation.

dao-jun avatar Jun 18 '24 15:06 dao-jun

I have explained why we cannot simply use java.util.concurrent.ExecutorService at https://github.com/apache/pulsar/pull/22902#issuecomment-2176107246. Do you agree with this opinion? If not, please provide more detailed suggestions and I will try my best to make it better.

AuroraTwinkle avatar Jun 18 '24 15:06 AuroraTwinkle

I understand that you introduced MessageListenerExecutor to keep message consumption in order, but my point is about the API design.

I have read the implementation PR, and a few points seem strange to me:

  1. The MessageListenerExecutor interface
public interface MessageListenerExecutor {
    void execute(Message<?> message, Runnable runnable);
}

The purpose of the interface is to select an executor based on the message and run the runnable on it, right? Why not use the following pattern instead? Exposing the message and returning an executor would be enough.

public interface MessageListenerExecutorSelector {
    ExecutorService select(Message<?> message);
}

  2. No default implementations for MessageListenerExecutor

Default implementations serve as examples: users can follow the pattern to develop their own extensions, and they can be used out of the box.

Everything else is OK.

dao-jun avatar Jun 18 '24 16:06 dao-jun

Or

public interface MessageListenerExecutorSelector {
    ExecutorService select(Message<?> message);
    ExecutorService selectOrdering(Message<?> message);
}

If the subscriptionType is KeyShared, call selectOrdering; otherwise call select.

dao-jun avatar Jun 18 '24 16:06 dao-jun

For the first point, I don't see how that is better than the current design. On the contrary, the interface you mentioned has a shortcoming: the select method returns an ExecutorService instance, which may contain any number of threads, so the order is still not guaranteed.
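
One way to compare the two shapes under discussion is to note that a selector can always be wrapped as an executor, as the sketch below shows. Message here is a tiny stand-in interface rather than Pulsar's real type, and the class name and fromSelector helper are hypothetical; only the two interface signatures are taken from this thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ExecutorShapesSketch {

    /** Stand-in for Pulsar's Message<T>; only what this sketch needs. */
    interface Message<T> {
        String getKey();
    }

    /** Shape from the PIP, as quoted in this thread. */
    interface MessageListenerExecutor {
        void execute(Message<?> message, Runnable runnable);
    }

    /** Alternative shape proposed in this thread. */
    interface MessageListenerExecutorSelector {
        ExecutorService select(Message<?> message);
    }

    /** Any selector can be adapted to the execute(message, runnable) shape. */
    static MessageListenerExecutor fromSelector(MessageListenerExecutorSelector selector) {
        return (message, runnable) -> selector.select(message).execute(runnable);
    }

    /** Dispatches one task through the adapter; true if the task actually ran. */
    public static boolean adapterRunsTask() {
        ExecutorService single = Executors.newSingleThreadExecutor();
        AtomicBoolean ran = new AtomicBoolean(false);
        try {
            MessageListenerExecutor executor = fromSelector(message -> single);
            Message<String> msg = () -> "key-1";
            executor.execute(msg, () -> ran.set(true));
            single.shutdown();
            single.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            single.shutdown();
        }
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println("task ran through adapter: " + adapterRunsTask());
    }
}
```

The reverse adaptation is not generally possible, because an execute implementation does not have to be backed by an ExecutorService at all. In both shapes, whether ordering holds depends on what the implementer dispatches to.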

For the second point, refer to the previous discussion: https://github.com/apache/pulsar/pull/22861#discussion_r1635850544

AuroraTwinkle avatar Jun 18 '24 17:06 AuroraTwinkle

Also see: https://github.com/apache/pulsar/pull/22902#issuecomment-2176610042

AuroraTwinkle avatar Jun 18 '24 17:06 AuroraTwinkle

Hi, Everyone! @lhotari @codelipenghui @asafm @liangyepianzhou @dao-jun @poorbarcode @Technoboy-

I have sent a voting email for PIP-359: https://lists.apache.org/thread/oo3jdvq3b6bv6p4n7x7sdvypw4gp6hpk

Thanks!

AuroraTwinkle avatar Jun 18 '24 17:06 AuroraTwinkle

@AuroraTwinkle Please select only one documentation label in your PR description.

github-actions[bot] avatar Jun 19 '24 04:06 github-actions[bot]

I have reviewed the comments made by @lhotari but I'm sorry I still don't understand.

ConsumerBuilder<T> messageListenerExecutor(MessageListenerExecutor messageListenerExecutor);

This basically "spills the guts/internals" of the way the consumer is implemented. This is super hard to understand for a normal user. I haven't seen anything like it in Kafka. Have you guys looked at how other messaging systems are doing it?

The reason I'm asking is that I think it's too complicated. I asked on the PR comments some questions to help clarify since I lack some background to try to come up with another idea.

You guys moved to the vote phase way too soon. I mean, you posted your comments 13 hours ago. I don't want to hold anyone back of course, but you're changing quite a substantial public API here. @lhotari I think we need at least one more set of eyes on this solution before we can proceed.

asafm avatar Jun 19 '24 07:06 asafm

I have reviewed the comments made by @lhotari but I'm sorry I still don't understand.

ConsumerBuilder<T> messageListenerExecutor(MessageListenerExecutor messageListenerExecutor);

This basically "spills the guts/internals" of the way the consumer is implemented. This is super hard to understand for a normal user. I haven't seen anything like it in Kafka. Have you guys looked at how other messaging systems are doing it?

@asafm I believe that this concern could be handled by improving the explanation of the concept and abstraction that this exposes. I agree that it's not a good practice to add abstractions that are implementation specific. This abstraction isn't implementation specific, but the problem is that 95% of the users don't need it. In a multi-topic consumer when a MessageListener is used, there might be a requirement to have different priorities how the messages for a specific topic are processed. It is up to the implementer how to implement this logic.
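
As a rough illustration of the per-topic prioritization mentioned above, an implementer could route listener tasks by topic name. The sketch below uses stand-in Message and MessageListenerExecutor interfaces instead of the real Pulsar types, and the topic suffix, thread names, and class name are all made up for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TopicRoutingSketch {

    /** Stand-in for Pulsar's Message<T>; only the topic name is needed here. */
    interface Message<T> {
        String getTopicName();
    }

    /** Same signature as the interface quoted in this thread. */
    interface MessageListenerExecutor {
        void execute(Message<?> message, Runnable runnable);
    }

    /** Thread factory that gives its threads a fixed, recognizable name. */
    static ThreadFactory named(String name) {
        return runnable -> {
            Thread thread = new Thread(runnable, name);
            thread.setDaemon(true);
            return thread;
        };
    }

    /** Routes one listener task for the given topic; returns the name of the thread that ran it. */
    public static String threadFor(String topic) {
        ExecutorService priority = Executors.newSingleThreadExecutor(named("priority-listener"));
        ExecutorService common = Executors.newSingleThreadExecutor(named("common-listener"));
        // Hypothetical rule: the "orders" topic gets a dedicated thread, everything else shares one.
        MessageListenerExecutor router = (message, runnable) -> {
            boolean urgent = message.getTopicName().endsWith("/orders");
            (urgent ? priority : common).execute(runnable);
        };
        CompletableFuture<String> threadName = new CompletableFuture<>();
        Message<String> msg = () -> topic;
        router.execute(msg, () -> threadName.complete(Thread.currentThread().getName()));
        try {
            return threadName.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            priority.shutdown();
            common.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(threadFor("persistent://public/default/orders"));
        System.out.println(threadFor("persistent://public/default/audit-log"));
    }
}
```

A slow listener on the low-priority topics then cannot delay the dedicated thread.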

The reason I'm asking is that I think it's too complicated. I asked on the PR comments some questions to help clarify since I lack some background to try to come up with another idea.

That's a valid point. I think that we would need to come up with a way to add extensions to the Pulsar client without cluttering the basic use cases. 95% of Pulsar users will never need this messageListenerExecutor, but we'd like to make it possible to do this for the remaining 5% of users. Perhaps we should prioritize finding a way to add such extensions to the Pulsar client.

You guys moved to the vote phase way too soon. I mean, you posted your comments 13 hours ago. I don't want to hold anyone back of course, but you're changing quite a substantial public API here. @lhotari I think we need at least one more set of eyes on this solution before we can proceed.

It's great to see that this PIP has interest. Even if something is accepted with a vote, that doesn't mean we cannot make changes later, before releasing the feature. In this case, there's a clear need to solve the multi-topic consumer queuing problem when using a MessageListener. We have to find a way to address that problem.

lhotari avatar Jun 19 '24 07:06 lhotari

One possible idea for avoiding cluttering the builder interfaces with more and more configuration options would be to support some type of extension pattern.

In this case, there could be this type of method on the ConsumerBuilder interface:

<E extends ConsumerBuilderExtension<T>> E extensionBuilder(Class<E> consumerBuilderExtensionClass);

then these interfaces:

interface MessageListenerExecutorConsumerBuilderExtension<T> extends ConsumerBuilderExtension<T> {
      MessageListenerExecutorConsumerBuilderExtension<T> messageListenerExecutor(MessageListenerExecutor messageListenerExecutor);
}
interface ConsumerBuilderExtension<T> {
    ConsumerBuilder<T> parent();
}

then it would be possible to call .extensionBuilder(MessageListenerExecutorConsumerBuilderExtension.class).messageListenerExecutor(....).parent().subscribe()

Obviously this is more complexity, but it would address the problem of cluttering the main interfaces with configuration options that are only for very specific requirements.
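
A self-contained sketch of this extension pattern, to make the call chain concrete. Everything in it is hypothetical: SketchConsumerBuilder is a stand-in that only records options in a map, the extension is looked up through a class-keyed factory registry (one possible way to create the extension instance), and build() simply returns the recorded options.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.Function;

public class ExtensionBuilderSketch {

    /** Base type for optional builder extensions; parent() returns to the main builder. */
    interface BuilderExtension {
        SketchConsumerBuilder parent();
    }

    /** Hypothetical extension that carries the rarely needed listener-executor option. */
    static final class ListenerExecutorExtension implements BuilderExtension {
        private final SketchConsumerBuilder parent;

        ListenerExecutorExtension(SketchConsumerBuilder parent) {
            this.parent = parent;
        }

        ListenerExecutorExtension messageListenerExecutor(Executor executor) {
            parent.options.put("messageListenerExecutor", executor);
            return this;
        }

        @Override
        public SketchConsumerBuilder parent() {
            return parent;
        }
    }

    /** Stand-in for a consumer builder; it only records options in a map. */
    static final class SketchConsumerBuilder {
        final Map<String, Object> options = new HashMap<>();
        private final Map<Class<?>, Function<SketchConsumerBuilder, BuilderExtension>> factories =
                new HashMap<>();

        SketchConsumerBuilder() {
            // Each known extension registers a factory keyed by its class.
            factories.put(ListenerExecutorExtension.class, ListenerExecutorExtension::new);
        }

        SketchConsumerBuilder topic(String topic) {
            options.put("topic", topic);
            return this;
        }

        /** The main interface exposes one generic entry point instead of one method per option. */
        <E extends BuilderExtension> E extensionBuilder(Class<E> type) {
            Function<SketchConsumerBuilder, BuilderExtension> factory = factories.get(type);
            if (factory == null) {
                throw new IllegalArgumentException("Unknown extension: " + type.getName());
            }
            return type.cast(factory.apply(this));
        }

        Map<String, Object> build() {
            return new HashMap<>(options);
        }
    }

    /** Builds through the extension and checks that both options were recorded. */
    public static boolean exampleKeepsExecutor() {
        Executor inline = Runnable::run;
        Map<String, Object> options = new SketchConsumerBuilder()
                .topic("my-topic")
                .extensionBuilder(ListenerExecutorExtension.class)
                .messageListenerExecutor(inline)
                .parent()
                .build();
        return options.get("messageListenerExecutor") == inline
                && "my-topic".equals(options.get("topic"));
    }

    public static void main(String[] args) {
        System.out.println("extension option recorded: " + exampleKeepsExecutor());
    }
}
```

The trade-off is visible even in the toy version: the main builder stays small, but users have to discover the extension class before they can find the option.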

lhotari avatar Jun 19 '24 07:06 lhotari

What do you think about this? @asafm @liangyepianzhou

AuroraTwinkle avatar Jun 19 '24 08:06 AuroraTwinkle

Obviously this is more complexity, but it would address the problem of cluttering the main interfaces with configuration options that are only for very specific requirements.

Hi @lhotari, this is a truly innovative concept that showcases a profound level of technical proficiency and expertise. However, as you've mentioned, it does seem overly complex.

I suggest we revert to the initial design approach; I'm convinced that the first iteration of the API design was the most succinct and user-friendly. What are your thoughts? cc @asafm @AuroraTwinkle

  pulsarClient.newConsumer().topic("topic").subscriptionName("subName")
          .messageListener((consumer, msg) -> {}, new ExecutorProvider(10, "your_poolName"))
          .subscribe();

liangyepianzhou avatar Jun 29 '24 16:06 liangyepianzhou

The PIP has been approved with 5 binding votes and 1 non-binding vote, and the vote stayed open for more than 72 hours, so I am merging it.

liangyepianzhou avatar Jul 16 '24 02:07 liangyepianzhou