
add: parameters to control number and the distribution of messages in a micro-batch

Open atezs82 opened this issue 2 years ago • 7 comments

If one or more of the topics read by the connector has a considerably large backlog, the current implementation gives us no way to place an upper limit on the number of messages that are processed at once in a Spark Streaming pipeline. This PR attempts to address the problem by adding an experimental parameter called maxEntriesPerTrigger (it behaves like the maxOffsetsPerTrigger parameter of the Kafka connector: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#creating-a-kafka-source-for-batch-queries). If the parameter is set, only the specified number of entries is read from the source. We diverge a little from the Kafka connector here, since that one limits actual messages, while with this approach we can only limit Pulsar entries.

The feature is based on the Pulsar Admin API call https://pulsar.apache.org/docs/en/2.7.3/admin-api-topics/#get-internal-stats, which can return all ledgers and entries for a topic. We need this approach so that the task can be achieved with the existing functionality of the Pulsar server side.

In addition, since the backlog sizes of our input topics differ widely, I have added two further parameters, forwardStrategy and ensureEntriesPerTopic, so that topics with a very low backlog also keep being forwarded if needed, and so that we have options for consuming backlogs in different fashions.
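To make the entry limit described above more concrete, here is a minimal sketch of how an end position could be derived from per-ledger entry counts. It is not the code from the PR: the names EntryLimitSketch, Ledger, Position and advance are hypothetical, and the ledger list is assumed to have been filled from the internal stats response, sorted by ledger ID.

```java
import java.util.Arrays;
import java.util.List;

public final class EntryLimitSketch {

    /** Hypothetical stand-in for one ledger row of a topic's internal stats. */
    static final class Ledger {
        final long id;
        final long entries;
        Ledger(long id, long entries) { this.id = id; this.entries = entries; }
    }

    /** Position of one entry; entryId -1 means "nothing read from this ledger yet". */
    static final class Position {
        final long ledgerId;
        final long entryId;
        Position(long ledgerId, long entryId) { this.ledgerId = ledgerId; this.entryId = entryId; }
        @Override public String toString() { return ledgerId + ":" + entryId; }
    }

    /**
     * Walks the ledgers in order, starting after the last processed entry,
     * and returns the position reached after at most maxEntries entries.
     */
    static Position advance(List<Ledger> ledgers, Position last, long maxEntries) {
        if (maxEntries <= 0) {
            return last; // no budget: stay where we are
        }
        long budget = maxEntries;
        Position result = last;
        for (Ledger ledger : ledgers) {
            if (ledger.id < last.ledgerId) {
                continue; // ledger lies entirely before the last processed entry
            }
            long firstUnread = (ledger.id == last.ledgerId) ? last.entryId + 1 : 0;
            long available = ledger.entries - firstUnread;
            if (available <= 0) {
                continue; // nothing left to read in this ledger
            }
            long take = Math.min(available, budget);
            result = new Position(ledger.id, firstUnread + take - 1);
            budget -= take;
            if (budget == 0) {
                break; // limit reached
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Ledger> ledgers = Arrays.asList(
                new Ledger(10, 5), new Ledger(11, 3), new Ledger(12, 4));
        // Last processed entry is 10:2, so entries 10:3 and 10:4 are still unread.
        System.out.println(advance(ledgers, new Position(10, 2), 4));   // prints 11:1
        System.out.println(advance(ledgers, new Position(10, 2), 100)); // prints 12:3
    }
}
```

With a limit larger than the backlog, the sketch simply stops at the last available entry, which matches the intent of an upper bound rather than an exact batch size.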

Please let me know what you think about this draft change. I can add further integration/unit tests and make final documentation changes if you agree with this approach outlined here.

We have also seen some (somewhat troubling) news about the creation of a brand new connector for Spark Streaming. Can you please share additional information on that subject? Thanks in advance!

atezs82 avatar Oct 11 '21 07:10 atezs82

One thing to add here: as you might have seen in the PR, unlike the Kafka connector we are forwarding by entries, each of which can contain one or more messages. The reason is that we could not find a suitable method for forwarding by an exact number of messages using a Reader in the getOffset call. Please let me know if you think such a method exists in Pulsar 2.7.x; I can modify the code accordingly (the current solution is somewhat suboptimal and unpredictable because of this).

We have a close candidate though: I think we can use https://pulsar.apache.org/docs/en/next/admin-api-topics/#examine-messages, but the problem there is that it can only look up messages relative to the first or the last message on a topic. If we can make this method work for any message ID, I think it could be used for this feature as well. I was also thinking that we could modify the Reader interface so that it can forward itself by n messages (on the other hand, that would require 2 API calls per topic to get the forwarded message ID; we currently issue only 1, which already causes problems when we read ~100 topics using a connector, due to the large number of API requests).

Please let me know what you think about this.
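The unpredictability mentioned above can be illustrated with a small sketch. It assumes that each entry carries a known number of messages (with producer-side batching, one entry can hold many); the class EntriesVsMessagesSketch and the sample batch sizes are made up for illustration and do not come from the PR.

```java
public final class EntriesVsMessagesSketch {

    /** Counts the messages covered by the first maxEntries entries. */
    static long messagesCovered(int[] messagesPerEntry, int maxEntries) {
        int limit = Math.min(Math.max(maxEntries, 0), messagesPerEntry.length);
        long total = 0;
        for (int i = 0; i < limit; i++) {
            total += messagesPerEntry[i];
        }
        return total;
    }

    public static void main(String[] args) {
        int[] unbatched = {1, 1, 1, 1};   // one message per entry
        int[] batched = {1, 50, 3, 200};  // batching packs several messages per entry
        // The same entry limit yields very different message counts.
        System.out.println(messagesCovered(unbatched, 3)); // prints 3
        System.out.println(messagesCovered(batched, 3));   // prints 54
    }
}
```

An entry limit therefore bounds the number of messages only from below; the actual size of a micro-batch depends on how the producers batched their messages.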

atezs82 avatar Nov 11 '21 07:11 atezs82

Can we separate the changes of introducing maxMesagesPerBatch config and the forward strategy feature into two PRs?

I can do that if needed, will add a new one about strategies then.

If we are calling to get all topics' stats in each micro-batch, I'm afraid there will be performance issues.

We currently invoke a single admin.topics().getInternalStats(topic) to get the internal stats for the topic we want to stream from (there can be multiple topics, though, since topicsPattern can be specified as a parameter, which means one admin API request per topic), very much like when the connector fetches the latest message ID. Is there something that I'm overlooking here? How can we be more efficient?

the "forward" used here is not very intuitive.

Will change the naming.

Is it possible to construct an ending messageID and then check if it exists against Pulsar?

I believe we can do that, since we already construct it, but would that cause additional load on the Pulsar cluster? I think this can be achieved by creating a reader/consumer for this purpose. What do you think? Do you see any simpler way of doing that? (I just do not want to place more load on the Pulsar cluster than is needed.)

atezs82 avatar Dec 17 '21 14:12 atezs82

I have modified the PR with the following:

  • removed different "forward strategies" (code and documentation) - we can have a separate PR for that in the future if this one is accepted
  • renamed feature to fetchNextOffsetWithMaxEntries - we might want to use something like this when implementing the different strategies if that is needed.

I can make further changes based on the answers to the questions above (see my previous comment).

atezs82 avatar Dec 20 '21 11:12 atezs82

Yes. I will resolve the conflicts shortly. I was waiting for your opinion about the questions raised in https://github.com/streamnative/pulsar-spark/pull/63#issuecomment-996760872 before moving forward with this.

atezs82 avatar Dec 11 '22 14:12 atezs82

Rebased on top of master; I am now looking for a way to fix the Codacy warnings without increasing the code complexity too much in the process. In the meantime, please let me know if there are any answers to my questions from https://github.com/streamnative/pulsar-spark/pull/63#issuecomment-996760872. Thanks in advance!

atezs82 avatar Dec 13 '22 09:12 atezs82

I have slightly reworked some of the imperative logic inside TopicInternalStatsUtils to make it a step more Scala-ish; please let me know what you think. Thanks in advance!

atezs82 avatar Dec 20 '22 15:12 atezs82

Yes. I will resolve the conflicts shortly. I was waiting for your opinion about the questions raised in https://github.com/streamnative/pulsar-spark/pull/63#issuecomment-996760872 before moving forward with this.

Thanks for your contribution. I'll review this PR later this week. I was still busy working on flink-connector-pulsar.

syhily avatar Jan 10 '23 02:01 syhily