
Question: How can I consume my Kafka topic as fast as possible?

Open pocman opened this issue 9 years ago • 22 comments

Hi,

I'm using Kafka between Ruby and Scala, and I noticed that my Ruby HermannConsumer is consuming messages faster than my AkkaConsumer.

I can set commitConfig = CommitConfig(commitInterval = Some(10 millis)) in kafkaProps, but I'm not sure that it's the right thing to do.

How can I consume my Kafka topic as fast as possible?

pocman avatar May 20 '15 16:05 pocman

In other words, when producing a constant flux of messages into my topic, my receiver only receives messages every second. How can I increase this rate?

pocman avatar May 21 '15 08:05 pocman

I am having a similar issue. Did you manage to find a solution to yours? Thanks :)

coreyauger avatar Feb 03 '16 21:02 coreyauger

When you commit every 10 milliseconds you are writing to ZooKeeper every 10 milliseconds. Your commit interval should be much, much larger. Try 10 seconds?
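For example (a sketch based on the CommitConfig usage quoted above; the exact constructor and defaults depend on your akka-kafka version):

```scala
import scala.concurrent.duration._
import com.sclasen.akka.kafka.CommitConfig

// Commit roughly every 10 seconds instead of every 10 milliseconds,
// so ZooKeeper is written to far less often.
val commitConfig = CommitConfig(commitInterval = Some(10.seconds))
```

The trade-off is that a larger interval means more messages may be redelivered after a crash, since fewer offsets have been committed.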

sclasen avatar Feb 03 '16 21:02 sclasen

and @pocman sorry for the huge lag, gmail started putting my github notifications in spam.

sclasen avatar Feb 03 '16 21:02 sclasen

Thanks for the reply @sclasen. I seem to have about a 1 to 3 second delay before I receive a message to my receiver actor.

val props = AkkaConsumerProps.forSystemWithFilter(
      system = actorSystem,
      zkConnect = config.zookeeperHosts,
      topicFilter = filter,
      group = subscriberSet.map(_.name) getOrElse UUID.randomUUID().toString,
      streams = config.numStreams,
      keyDecoder = new StringDecoder(),
      msgDecoder = new KafkaJsonDecoder[T](mapper),
      receiver = actorSystem.actorOf(Props(classOf[ConsumerActor[T]], consumer))
    )

    val akkaConsumer = new AkkaConsumer(props)
    akkaConsumer.start()
    consumerRegistry.put(topicPattern.pattern, akkaConsumer)

And then the ConsumerActor

class ConsumerActor[T](consumer: Consumer[T]) extends Actor with Logging {
  def receive = {
    case event: PublishedEvent[T] =>
      logger.trace(s"Received Event [$event]")
      try {
        consumer(event) 
        logger.trace(s"Event Processing complete for [$event]")
      } catch {
        case ex: Exception =>
          logger.error(s"Error Processing Event(id=${event.eventId})", ex)
      }
      sender ! StreamFSM.Processed
  }
}

Any idea where that would be occurring? Thanks for your work :)

coreyauger avatar Feb 03 '16 21:02 coreyauger

@coreyauger can you set the log level to DEBUG and send me the output?

sclasen avatar Feb 03 '16 21:02 sclasen

Hi @sclasen .. thanks again for your help in this.
I have put a small dump of the log file here https://gist.github.com/coreyauger/0ce0b743c4e987218238

The log explodes fast as akka is logging "heartbeats" for the cluster and other things that turn my local log file into a mess. I hope there is something in there that helps.

There are a lot of messages similar to this: stream=stream1 state=Unused msg=Drain outstanding=0

I suspect that this is more what you are looking for?

Let me know if I can provide anything else :)

coreyauger avatar Feb 03 '16 22:02 coreyauger

This is helpful. Things appear to be working as expected to me.

when you see things like this

[DEBUG] [02/03/2016 13:56:27.251] [core-akka.actor.default-dispatcher-27] [akka.tcp://[email protected]:44691/user/$B/stream3] stream=stream3 at=transition from=Processing to=Unused
[DEBUG] [02/03/2016 13:56:27.251] [core-akka.actor.default-dispatcher-25] [akka.tcp://[email protected]:44691/user/$B/stream0] stream=stream0 at=transition from=Processing to=Unused
[DEBUG] [02/03/2016 13:56:27.251] [core-akka.actor.default-dispatcher-30] [akka.tcp://[email protected]:44691/user/$B/stream1] stream=stream1 at=transition from=Processing to=Unused
[DEBUG] [02/03/2016 13:56:27.251] [core-akka.actor.default-dispatcher-19] [akka.tcp://[email protected]:44691/user/$B/stream2] stream=stream2 at=transition from=Processing to=Unused

it means that the underlying kafka consumer iterator hasNext() is returning false, meaning there are no messages to consume. When this happens the stream goes into 'Unused' and will not attempt to consume again until the next commit.

What is your use case here? akka-kafka is designed more for high throughput while still giving some processing/commit guarantees. Commit is a blocking operation in that when a commit happens we wait until all in-flight messages are processed. Latencies on low-throughput topics are not expected to be very low.

sclasen avatar Feb 03 '16 22:02 sclasen

related https://github.com/sclasen/akka-kafka/issues/23

sclasen avatar Feb 03 '16 22:02 sclasen

There are a few use cases.. but one of which is IM or (chat) based messaging.. so low latency is certainly required for this.

In this case, am I better off adopting a new kind of consumer, or are you saying that I should not use Kafka for this?

I found this http://stackoverflow.com/questions/20520492/how-to-minimize-the-latency-involved-in-kafka-messaging-framework

which led me to believe that newer versions of Kafka can achieve this low latency.

Thanks again :)

coreyauger avatar Feb 03 '16 22:02 coreyauger

Well, I'm not sure about Kafka being suitable for low latency. To quote the highest-ranked answer at the link provided: "I wouldn't use Kafka for anything where latency needed to be consistently less than 1-2 seconds."

I suppose it depends on how you design and tune things, but generally I don't think akka-kafka is a good fit for a low-latency, low-throughput chat application.

How do the topics and partitions work? Is there a consumer for each user?

I think you will have a much better chance of success for this particular use case if you just use the Kafka consumer directly (this library uses it under the covers) with auto commit enabled. It will keep polling on the iterator during commits and doesn't have any notion of an unused state. You may lose the odd message during hard crashes, but that seems totally fine for a chat app imo.
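For anyone following along, the auto-commit setup described above would look roughly like this with the 0.8.x high-level consumer. This is a sketch: the ZooKeeper address and group id are placeholders, and the connector line is commented out since it needs Kafka on the classpath.

```scala
import java.util.Properties

// Sketch: configure the old (0.8.x) high-level consumer with auto commit.
val props = new Properties()
props.put("zookeeper.connect", "localhost:2181")   // placeholder address
props.put("group.id", "chat-consumer")             // placeholder group id
props.put("auto.commit.enable", "true")            // commit offsets in the background
props.put("auto.commit.interval.ms", "1000")       // roughly once a second
// val connector = kafka.consumer.Consumer.create(new kafka.consumer.ConsumerConfig(props))

println(props.getProperty("auto.commit.enable"))   // prints "true"
```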

sclasen avatar Feb 03 '16 22:02 sclasen

I will take a look at this. Would there be any interest if I submitted a PR for another type of low-latency consumer for this lib?

Or is this use case best left for my own lib?

Let me know either way. Your help is much appreciated :) Thanks again!

coreyauger avatar Feb 03 '16 22:02 coreyauger

Sure, happy to take a look at a PR, please do!


sclasen avatar Feb 03 '16 22:02 sclasen

Hi @sclasen ,

I have built an initial actor that is working well for my use case. It is modelled on the Kafka high-level consumer. https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

Here is the code I have so far. https://github.com/coreyauger/akka-kafka/blob/master/src/main/scala/com/sclasen/akka/kafka/AkkaHighLevelConsumer.scala

Please take a look, and if it is something you think we should have, I will clean it up and add it to the documentation. I will also branch the code and squash the commits into a single commit before I open a PR.

Thanks :)

coreyauger avatar Feb 07 '16 19:02 coreyauger

Thanks! I think you can get rid of the hasNext stuff; since it is only used to avoid blocking, you can just loop on calling next(), which will block until messages arrive, which is fine in this usage.

Otherwise I fear the code will spin on hasNext and burn CPU.


sclasen avatar Feb 07 '16 20:02 sclasen

That was indeed my original implementation.. but I continue to get the time-out exception. There is most likely a setting that I can pass in for an infinite wait... I will have a look for that. :)

coreyauger avatar Feb 08 '16 16:02 coreyauger

Yeah, you will still have to handle the timeout exception in the next() call. Which is probably good: you can then check whether the consumer has been shut down and exit.
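That shutdown-aware loop could be sketched like this. In real code the catch would be on kafka.consumer.ConsumerTimeoutException; a stand-in exception keeps this example runnable without Kafka on the classpath, and the names here are illustrative.

```scala
// Stand-in for kafka.consumer.ConsumerTimeoutException.
class TimeoutStandIn extends RuntimeException

var shutdownRequested = false

// Block on next(); a timeout is just an opportunity to check whether we should stop.
def consumeLoop[A](next: () => A, handle: A => Unit): Unit = {
  var running = true
  while (running) {
    try handle(next())
    catch {
      case _: TimeoutStandIn =>
        if (shutdownRequested) running = false
    }
  }
}

// Usage: drain three messages, then signal shutdown via a "timeout".
val messages = Iterator("a", "b", "c")
val seen = scala.collection.mutable.Buffer[String]()
consumeLoop(
  () => if (messages.hasNext) messages.next() else { shutdownRequested = true; throw new TimeoutStandIn },
  (m: String) => seen += m
)
println(seen.mkString(",")) // prints "a,b,c"
```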


sclasen avatar Feb 08 '16 19:02 sclasen

adding this as a ref http://mail-archives.apache.org/mod_mbox/kafka-users/201408.mbox/%3CCACim9RkJ6+6jJKzP3UQKQPbsOUsboifBoySpOYPm=2RPp9XUcw@mail.gmail.com%3E

> If you want to restart the consumer in handling the timeout exception, then you should probably just increase the timeout value in the configs to avoid it throwing timeout exception.

Config:

consumer.timeout.ms (default: -1): By default, this value is -1 and a consumer blocks indefinitely if no new message is available for consumption. Setting the value to a positive integer causes a timeout exception to be thrown to the consumer if no message is available for consumption after the specified timeout.

The blocking in my case will happen on numStreams threads for each topic.
http://doc.akka.io/docs/akka/2.2.3/general/actor-systems.html#Blocking_Needs_Careful_Management

coreyauger avatar Feb 08 '16 19:02 coreyauger

Since a timeout and auto commit are required for the high-level consumer to work, would it be best to append these arguments when the actor is creating the config? That way we can leave the settings intact for the other types of actors.

I plan to use the system with services that support both actors with different goals..
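The override-at-creation idea could be sketched like this. The withHighLevelDefaults helper and the timeout value are hypothetical, chosen for illustration; the property names are the 0.8.x consumer ones.

```scala
import java.util.Properties

// Hypothetical helper: copy the caller's consumer config and force the two
// settings the high-level actor needs, leaving the original Properties
// intact for the other actor types.
def withHighLevelDefaults(base: Properties): Properties = {
  val copy = new Properties()
  copy.putAll(base)
  copy.put("auto.commit.enable", "true")
  copy.put("consumer.timeout.ms", "400") // finite timeout so next() can wake up and check for shutdown
  copy
}

val base = new Properties()
base.put("group.id", "mixed-service")
base.put("auto.commit.enable", "false")

val tuned = withHighLevelDefaults(base)
println(base.getProperty("auto.commit.enable"))  // prints "false": original untouched
println(tuned.getProperty("auto.commit.enable")) // prints "true"
```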

coreyauger avatar Feb 08 '16 19:02 coreyauger

Updated https://github.com/coreyauger/akka-kafka/blob/master/src/main/scala/com/sclasen/akka/kafka/AkkaHighLevelConsumer.scala

coreyauger avatar Feb 08 '16 20:02 coreyauger

Want to open a PR? easier to review/discuss.


sclasen avatar Feb 09 '16 16:02 sclasen

Sounds good :)

coreyauger avatar Feb 09 '16 16:02 coreyauger