akka-kafka
Question: How can I consume my Kafka topic as fast as possible?
Hi,
I'm using Kafka between Ruby and Scala, and I noticed that my Ruby HermannConsumer is consuming messages faster than my AkkaConsumer.
I can set commitConfig = CommitConfig(commitInterval = Some(10 millis)) in kafkaProps, but I'm not sure that it's the right thing to do.
How can I consume my Kafka topic as fast as possible?
In other words, when producing a constant stream of messages to my topic, my receiver only receives a message every second. How can I increase this rate?
I am having a similar issue. Did you manage to find a solution to yours? Thanks :)
When you commit every 10 milliseconds you are writing to ZooKeeper every 10 milliseconds. Your commit interval should be much, much larger; try 10 seconds?
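As a sketch of that advice, assuming akka-kafka's CommitConfig takes an optional interval and message count (the field names here follow this thread and are an assumption, not a verified signature):

```scala
import scala.concurrent.duration._

// Sketch only: commit far less often than every 10 ms so ZooKeeper
// is not written to on every tick.
val commitConfig = CommitConfig(
  commitInterval = Some(10.seconds),  // at most one commit per 10 s
  commitAfterMsgCount = Some(10000)   // or after 10k in-flight messages
)
```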
and @pocman sorry for the huge lag, gmail started putting my github notifications in spam.
Thanks for the reply @sclasen. I seem to have about a 1 to 3 second delay before I receive a message to my receiver actor.
val props = AkkaConsumerProps.forSystemWithFilter(
  system = actorSystem,
  zkConnect = config.zookeeperHosts,
  topicFilter = filter,
  group = subscriberSet.map(_.name) getOrElse UUID.randomUUID().toString,
  streams = config.numStreams,
  keyDecoder = new StringDecoder(),
  msgDecoder = new KafkaJsonDecoder[T](mapper),
  receiver = actorSystem.actorOf(Props(classOf[ConsumerActor[T]], consumer))
)
val akkaConsumer = new AkkaConsumer(props)
akkaConsumer.start()
consumerRegistry.put(topicPattern.pattern, akkaConsumer)
And then the ConsumerActor
class ConsumerActor[T](consumer: Consumer[T]) extends Actor with Logging {
  def receive = {
    case event: PublishedEvent[T] =>
      logger.trace(s"Received Event [$event]")
      try {
        consumer(event)
        logger.trace(s"Event Processing complete for [$event]")
      } catch {
        case ex: Exception =>
          logger.error(s"Error Processing Event(id=${event.eventId})", ex)
      }
      sender ! StreamFSM.Processed
  }
}
Any idea where that would be occurring? Thanks for your work :)
@coreyauger can you set the log level to DEBUG and send me the output?
Hi @sclasen .. thanks again for your help in this.
I have put a small dump of the log file here
https://gist.github.com/coreyauger/0ce0b743c4e987218238
The log grows fast, as Akka is logging "heartbeats" for the cluster and other things that turn my local log file into a mess. I hope there is something in there that helps.
There are a lot of messages similar to this:
stream=stream1 state=Unused msg=Drain outstanding=0
I suspect that this is more what you are looking for?
Let me know if I can provide anything else :)
This is helpful. Things appear to be working as expected to me.
When you see things like this:
[DEBUG] [02/03/2016 13:56:27.251] [core-akka.actor.default-dispatcher-27] [akka.tcp://[email protected]:44691/user/$B/stream3] stream=stream3 at=transition from=Processing to=Unused
[DEBUG] [02/03/2016 13:56:27.251] [core-akka.actor.default-dispatcher-25] [akka.tcp://[email protected]:44691/user/$B/stream0] stream=stream0 at=transition from=Processing to=Unused
[DEBUG] [02/03/2016 13:56:27.251] [core-akka.actor.default-dispatcher-30] [akka.tcp://[email protected]:44691/user/$B/stream1] stream=stream1 at=transition from=Processing to=Unused
[DEBUG] [02/03/2016 13:56:27.251] [core-akka.actor.default-dispatcher-19] [akka.tcp://[email protected]:44691/user/$B/stream2] stream=stream2 at=transition from=Processing to=Unused
it means that the underlying Kafka consumer iterator's hasNext() is returning false, meaning there are no messages to consume. When this happens the stream goes into 'Unused' and will not attempt to consume again until the next commit.
What is your use case here? akka-kafka is designed more for high throughput while still giving some processing/commit guarantees. Commit is a blocking operation in that when a commit happens we wait until all in-flight messages are processed. Latencies for low-throughput topics are not expected to be very low.
related https://github.com/sclasen/akka-kafka/issues/23
There are a few use cases, but one of them is IM (chat) based messaging, so low latency is certainly required for this.
In this case, am I better off adapting a new kind of consumer, or are you saying that I should not use Kafka for this?
I found this http://stackoverflow.com/questions/20520492/how-to-minimize-the-latency-involved-in-kafka-messaging-framework
which led me to believe that newer versions of Kafka can achieve this low latency.
Thanks again :)
Well, I'm not sure about Kafka being suitable for low latency; quoting the highest-ranked answer at the link provided: "I wouldn't use Kafka for anything where latency needed to be consistently less than 1-2 seconds."
I suppose it depends on how you design and tune things, but generally I don't think akka-kafka is a good fit for a low-latency, low-throughput chat application.
How do the topics and partitions work? Is there a consumer for each user?
I think you will have a much better chance of success for this particular use case if you just use the Kafka consumer directly (this library uses it under the covers) with auto commit enabled: it will keep polling on the iterator during commits, and doesn't have any notion of an unused state. You may have the odd lost message during hard crashes, but that seems totally fine for a chat app, IMO.
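As a sketch of that direction, the old (0.8-era) high-level consumer's properties could be assembled with auto commit enabled roughly like this; the property names follow the old consumer configuration, and the interval values are placeholders to tune:

```scala
import java.util.Properties

// Hypothetical helper: properties for the 0.8-era high-level consumer with
// auto commit, so offsets are committed in the background and the iterator
// never enters anything like akka-kafka's Unused state.
def autoCommitProps(zkConnect: String, groupId: String): Properties = {
  val p = new Properties()
  p.put("zookeeper.connect", zkConnect)
  p.put("group.id", groupId)
  p.put("auto.commit.enable", "true")       // commit offsets automatically
  p.put("auto.commit.interval.ms", "1000")  // once per second
  p.put("consumer.timeout.ms", "-1")        // next() blocks until a message arrives
  p
}
```

These properties would then be handed to the consumer connector (as in the Consumer Group Example linked later in this thread), rather than going through AkkaConsumerProps.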
I will take a look at this. Would there be any interest if I submit a PR for another type of low-latency consumer for this lib?
Or is this use case best left for my own lib?
Let me know either way. Your help is much appreciated :) Thanks again!
Sure, happy to take a look at a PR, please do!
Hi @sclasen ,
I have built an initial actor that is working well for my use case. It is modelled on the Kafka high-level consumer: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
Here is the code I have so far. https://github.com/coreyauger/akka-kafka/blob/master/src/main/scala/com/sclasen/akka/kafka/AkkaHighLevelConsumer.scala
Please take a look, and if it is something you think we should have, I will clean it up and add it to the documentation. I will also branch the code and squash the commits into a single commit before I open a PR.
Thanks :)
Thanks! I think you can get rid of the hasNext stuff; since it is only used to avoid blocking, you can just loop on calling next(), which will block until messages arrive, which is fine in this usage.
Otherwise I fear the code will spin on hasNext and burn CPU.
That was indeed my original implementation, but I continue to get the timeout exception. There is most likely a setting that I can pass in for an infinite wait; I will have a look for that. :)
Yeah, you will still have to handle the timeout exception from the next() call, which is probably good: you can then check whether the consumer has been shut down and exit.
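The loop shape being described might look like the sketch below; ConsumerTimeoutException here is a local stand-in for Kafka's kafka.consumer.ConsumerTimeoutException so the sketch stays self-contained:

```scala
// Stand-in for kafka.consumer.ConsumerTimeoutException (assumption for this sketch).
class ConsumerTimeoutException extends RuntimeException

// Block on next(); on timeout, re-check the shutdown flag before blocking again.
class ConsumeLoop[A](next: () => A, handle: A => Unit) {
  @volatile var shutdown = false
  def run(): Unit =
    while (!shutdown) {
      try handle(next())
      catch {
        case _: ConsumerTimeoutException =>
          // No message within consumer.timeout.ms; loop and re-check shutdown.
      }
    }
}
```

The timeout acts as a periodic wake-up: the loop never busy-spins on hasNext, but still notices a shutdown request within one timeout interval.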
Adding this as a reference: http://mail-archives.apache.org/mod_mbox/kafka-users/201408.mbox/%3CCACim9RkJ6+6jJKzP3UQKQPbsOUsboifBoySpOYPm=2RPp9XUcw@mail.gmail.com%3E
> If you want to restart the consumer in handling the timeout exception, then you should probably just increase the timeout value in the configs to avoid it throwing the timeout exception.
Config:
consumer.timeout.ms (default: -1). By default this value is -1, and a consumer blocks indefinitely if no new message is available for consumption. Setting the value to a positive integer causes a timeout exception to be thrown to the consumer if no message is available for consumption after the specified timeout.
The blocking in my case will happen on numStreams threads for each topic.
http://doc.akka.io/docs/akka/2.2.3/general/actor-systems.html#Blocking_Needs_Careful_Management
Since a timeout and auto commit are required for the high-level consumer to work, would it be best to append these arguments when the actor creates the config? That way we can leave the settings intact for the other types of actors.
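One way to sketch that, assuming the config is built from java.util.Properties, is a hypothetical helper that copies the caller's properties and overlays only the two settings this actor needs:

```scala
import java.util.Properties

// Hypothetical: overlay only what the high-level consumer actor requires,
// leaving the caller's Properties untouched for the other actor types.
def withHighLevelOverrides(base: Properties, timeoutMs: Int): Properties = {
  val p = new Properties()
  p.putAll(base)
  p.put("consumer.timeout.ms", timeoutMs.toString) // -1 blocks indefinitely
  p.put("auto.commit.enable", "true")
  p
}
```

Because the base Properties are copied rather than mutated, the other actor types can keep consuming with their own commit settings.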
I plan to use the system with services that run both kinds of actors, each with different goals.
Updated https://github.com/coreyauger/akka-kafka/blob/master/src/main/scala/com/sclasen/akka/kafka/AkkaHighLevelConsumer.scala
Want to open a PR? easier to review/discuss.
Sounds good :)