scalatest-embedded-kafka

multiple brokers

Open cizmazia opened this issue 8 years ago • 5 comments

Please could you enable running multiple brokers?

Testing with the exactly-once guarantee turned on requires at least 3 brokers:

p.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE)

Associated errors:

ERROR KafkaApis:99 - [KafkaApi-0] Number of alive brokers '1' does not meet the required replication factor '3' for the transactions state topic (configured via 'transaction.state.log.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet.

Could not create internal topics: Found only 1 brokers,  but replication factor is 3. Decrease replication factor for internal topics via StreamsConfig parameter "replication.factor" or add more brokers to your cluster.
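
For reference, a minimal sketch of the Streams-side settings the second error points at, assuming a three-broker embedded cluster; the application id and bootstrap address below are placeholders:

import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

val p = new Properties()
p.put(StreamsConfig.APPLICATION_ID_CONFIG, "exactly-once-test") // placeholder
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:6001") // first embedded broker
p.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE)
// With three embedded brokers the Streams-internal topics can keep a
// replication factor of 3 (this is the setting the second error refers to).
p.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, "3")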

Workaround

  import scala.reflect.io.Directory

  import net.manub.embeddedkafka.EmbeddedKafkaConfig

  // Intended to live in a trait or spec that also mixes in EmbeddedKafka,
  // so that withRunningZooKeeper and startKafka are in scope.
  def withRunningKafkaBrokers[T](numberOfBrokers: Int)(body: => T)(
    implicit config: EmbeddedKafkaConfig): T = {
    withRunningZooKeeper(config.zooKeeperPort) { zkPort =>
      // Start one broker per index, each with its own broker.id, its own
      // port (kafkaPort, kafkaPort + 1, ...) and its own temporary log dir.
      val brokers = Iterator.range(0, numberOfBrokers).map { n =>
        val c = config.copy(
          zooKeeperPort = zkPort,
          kafkaPort = config.kafkaPort + n,
          customBrokerProperties = Map("broker.id" -> n.toString))
        val dir = Directory.makeTemp(s"kafka$n")
        (startKafka(c, dir), dir)
      }.toArray
      try {
        body
      } finally {
        // Shut down every broker and clean up its log directory.
        brokers.foreach { case (broker, dir) =>
          broker.shutdown()
          broker.awaitShutdown()
          dir.deleteRecursively()
        }
      }
    }
  }
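
A hypothetical usage from a ScalaTest suite could then look like the snippet below (MultiBrokerSupport is an assumed helper trait holding withRunningKafkaBrokers; the ports and topic name are illustrative only):

import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.scalatest.WordSpec

// MultiBrokerSupport is assumed to provide withRunningKafkaBrokers from above.
class MultiBrokerSpec extends WordSpec with EmbeddedKafka with MultiBrokerSupport {

  implicit val kafkaConfig: EmbeddedKafkaConfig =
    EmbeddedKafkaConfig(kafkaPort = 6001, zooKeeperPort = 6000)

  "an embedded three-broker cluster" should {
    "accept messages while all brokers are running" in {
      withRunningKafkaBrokers(3) {
        // brokers listen on ports 6001, 6002 and 6003; the implicit
        // config points the test clients at the first one
        publishStringMessageToKafka("test-topic", "message")
      }
    }
  }
}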

cizmazia avatar Sep 21 '17 16:09 cizmazia

I'm thinking about possible implications of this on memory allocation during the build. It's definitely something that could be investigated - are you running them successfully?

manub avatar Sep 21 '17 20:09 manub

Yes, I am. Thanks!

cizmazia avatar Sep 22 '17 08:09 cizmazia

Hi @cizmazia. I believe most of the work needed to achieve this is available in the latest release.

#101 addressed the problem of running tests in parallel, where the underlying issue was also one of managing brokers.

Currently, however, you would run into the issue of broker.id being hardcoded to "0".

With a 👍 from @manub I'll make the change to increment broker.id for each subsequent broker created. When all brokers are stopped, the id can be reset to "0". Alternatively, the broker.id could be passed to start().

Note that you'll need to make multiple calls to EmbeddedKafka.start() or EmbeddedKafka.startKafka() instead of withRunningKafka to achieve this.

You'll be able to stop specific instances or stop all of them as you please.
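
Purely as an illustration, and assuming both the proposed broker.id increment and that ZooKeeper is started only once for the configured port, the flow could look roughly like this (the ports and the way ZooKeeper is shared are assumptions rather than current behaviour):

import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}

// Hypothetical: start three brokers on consecutive ports against one ZooKeeper.
(0 until 3).foreach { n =>
  implicit val kafkaConfig: EmbeddedKafkaConfig =
    EmbeddedKafkaConfig(kafkaPort = 6001 + n, zooKeeperPort = 6000)
  EmbeddedKafka.start()
}

// ... run tests against localhost:6001,localhost:6002,localhost:6003 ...

EmbeddedKafka.stop() // stop all started instances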

tjheslin1 avatar Mar 29 '18 14:03 tjheslin1

Thanks @tjheslin1, seems like a sensible thing to do.

manub avatar Apr 03 '18 10:04 manub

@manub @cizmazia I'll have a PR for this soon 👍

tjheslin1 avatar Apr 03 '18 12:04 tjheslin1