EventStore.JVM

Can't increase the in-flight messages count for persistent subscriptions

Open chiller opened this issue 6 years ago • 1 comment

I have a persistent subscription and a Scala consumer. No matter what settings I change on the subscription or in the client's application.conf, I can't get more than 10 in-flight messages.

Here is my code:

import java.util.concurrent.Executors
import _root_.akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import eventstore.ResolvedEvent
import eventstore.akka.PersistentSubscriptionActor.ManualAck
import eventstore.akka.{EventStoreExtension, LiveProcessingStarted, PersistentSubscriptionActor}
import eventstore.core.EventStream

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.sys.process._

trait Globals {
  implicit val system = ActorSystem()
  implicit val ec =  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(30))
}

object PersistentSubscriptionExample extends App with Globals{

  val actor = system.actorOf(Props[CountPersistentStream])
  val extension = EventStoreExtension(system)

  val sub = system.actorOf(
    PersistentSubscriptionActor.props(
      connection = extension.actor,
      client = actor,
      streamId = EventStream.Id("$ce-test"),
      groupName = "test1",
      credentials = None,
      settings = extension.settings,
      autoAck = false
    ))
}

class CountPersistentStream extends Actor with ActorLogging with Globals {
  context.setReceiveTimeout(1.second)

  def receive: Receive = {
    case e: ResolvedEvent =>
      val currentSender = sender()
      Future {
        log.info(s"${e.streamId.toString}")
        Thread.sleep(1000)
        currentSender ! ManualAck(e.linkEvent.data.eventId)
      }
    case LiveProcessingStarted => log.info("live processing started")
  }
}

Looking at VisualVM, I can see that only 10 threads do work at any given time. If I set autoAck = true, all 30 threads do work. If I use the HTTP API instead, curl 'localhost:2113/subscriptions/%24ce-test/test1/15?embed=TryHarder' -H "Accept: application/vnd.eventstore.competingatom+json", then I do get 15 in-flight messages. Am I missing some configuration?

chiller, Jul 30 '19 13:07

@chiller If you have not changed the default settings of the persistent subscription, Max Subscriber Count is 10.

https://eventstore.com/docs/http-api/competing-consumers/index.html#creating-a-persistent-subscription
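For reference, a minimal sketch of changing that setting through the HTTP API from Scala, using the JDK 11+ `java.net.http` client. It assumes a node at `localhost:2113`, that a `POST` to `/subscriptions/{stream}/{group}` updates an existing group, and that the setting is named `maxSubscriberCount` in the JSON body, as described on the linked page. The object and method names are hypothetical, and whether this setting is what limits the in-flight count in this case has not been verified here.

```scala
import java.net.{URI, URLEncoder}
import java.net.http.{HttpClient, HttpRequest, HttpResponse}
import java.nio.charset.StandardCharsets
import java.util.Base64

// Hypothetical helper, not part of EventStore.JVM.
object UpdateSubscriptionSketch {
  // Assumption: a local single node on the default HTTP port.
  val baseUrl = "http://localhost:2113"

  // Builds (but does not send) the update request for an existing group.
  def updateRequest(stream: String, group: String, maxSubscriberCount: Int,
                    user: String, password: String): HttpRequest = {
    // "$ce-test" must be URL-encoded to "%24ce-test" in the path.
    val encodedStream = URLEncoder.encode(stream, StandardCharsets.UTF_8)
    val body = s"""{"maxSubscriberCount": $maxSubscriberCount}"""
    val token = Base64.getEncoder.encodeToString(
      s"$user:$password".getBytes(StandardCharsets.UTF_8))

    HttpRequest.newBuilder(URI.create(s"$baseUrl/subscriptions/$encodedStream/$group"))
      .header("Content-Type", "application/json")
      .header("Authorization", s"Basic $token")
      .POST(HttpRequest.BodyPublishers.ofString(body))
      .build()
  }

  // Sends the request and returns the HTTP status code.
  def send(request: HttpRequest): Int =
    HttpClient.newHttpClient()
      .send(request, HttpResponse.BodyHandlers.ofString())
      .statusCode()
}
```

Calling `UpdateSubscriptionSketch.send(UpdateSubscriptionSketch.updateRequest("$ce-test", "test1", 30, "admin", "changeit"))` would apply it, where `admin`/`changeit` are the stock default credentials and should be replaced on a real deployment. Settings left out of the body may fall back to their defaults, and connected consumers may be dropped when the group is updated, so check the current configuration first.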

amondnet, May 14 '20 08:05