gatling-kafka-plugin

Support Avro classes in ReqRep

Open neoscaler opened this issue 2 years ago • 1 comments

With the default Kafka.send method it is possible to use Avro Java classes (generated by avro-maven-plugin) as the key or value payload. The payload is then serialized as a valid Avro byte stream.

But this does not seem possible with the send method of the new ReqRepBases (used in the Request/Reply mechanism). We are seeing an error like [No implicits found for parameter evidence].

Is Avro not supported here? Or are we simply using it wrong?

neoscaler, Nov 03 '22 15:11

It is possible, but not straight out of the box: you need an implicit Serde for the Avro class in scope, as defined below. All the credit goes to @daylikon.

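import com.sksamuel.avro4s._
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.serializers.{KafkaAvroDeserializer, KafkaAvroSerializer}
import io.gatling.core.Predef._
import io.gatling.core.structure.ScenarioBuilder
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer}
import ru.tinkoff.gatling.kafka.Predef._
import ru.tinkoff.gatling.kafka.protocol.KafkaProtocol
import path.AvroClass // here goes the path to the generated avro class

import scala.concurrent.duration._      // for duration literals such as 5.seconds
import scala.jdk.CollectionConverters._ // provides .asJava (Scala 2.13+)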

// define Serde for Avro-class
// "url" is a placeholder for the Schema Registry URL(s), comma-separated
val ser =
  new KafkaAvroSerializer(
    new CachedSchemaRegistryClient("url".split(',').toList.asJava, 16),
  )

val de =
  new KafkaAvroDeserializer(
    new CachedSchemaRegistryClient("url".split(',').toList.asJava, 16),
  )

implicit val serdeClass: Serde[AvroClass] = new Serde[AvroClass] {
  override def serializer(): Serializer[AvroClass] = ser.asInstanceOf[Serializer[AvroClass]]

  override def deserializer(): Deserializer[AvroClass] = de.asInstanceOf[Deserializer[AvroClass]]
}

// Protocol description
val kafkaScn1Protocol: KafkaProtocol = kafka
  .topic("test") // placeholder: the topic name is missing from the original snippet
  .properties(
    Map(
      ProducerConfig.ACKS_CONFIG                   -> "1",
      ProducerConfig.BOOTSTRAP_SERVERS_CONFIG      -> "localhost:9092",
      ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG   -> "org.apache.kafka.common.serialization.StringSerializer",
      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> "io.confluent.kafka.serializers.KafkaAvroSerializer",
      "schema.registry.url"                        -> "url",
    ),
  )

val kafkaScn2Protocol: KafkaProtocol = kafka.requestReply
  .producerSettings(
    Map(
      ProducerConfig.ACKS_CONFIG                   -> "1",
      ProducerConfig.BOOTSTRAP_SERVERS_CONFIG      -> "localhost:9092",
      ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG   -> "org.apache.kafka.common.serialization.StringSerializer",
      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> "io.confluent.kafka.serializers.KafkaAvroSerializer",
      "schema.registry.url"                        -> "url",
    ),
  )
  .consumeSettings(
    Map(
      ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
      "schema.registry.url"                   -> "url",
    ),
  )
  .timeout(5.seconds) // assumed value: the original snippet does not show a timeout (needs scala.concurrent.duration._)

// Sending Avro-serialised messages scenario
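val scn: ScenarioBuilder = scenario("Simple request")
  .exec(
    kafka("Simple Request")
      .send[AvroClass](new AvroClass("someParam1", "someParam2")),
  )

// The request and reply topic names below are placeholders: they are missing from the original snippet
val scn2: ScenarioBuilder = scenario("RequestReply")
  .exec(
    kafka("RequestReply").requestReply
      .requestTopic("request_topic")
      .replyTopic("reply_topic")
      .send[String, AvroClass]("key", new AvroClass("someParam1", "someParam2")),
  )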
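
For background, the "No implicits found for parameter evidence" error in the question is what the Scala compiler reports when a method with a context bound cannot find a matching implicit instance. Here is a minimal, dependency-free sketch of that mechanism. `Codec`, `Order`, and `Demo.send` are hypothetical stand-ins for Kafka's `Serde`, a generated Avro class, and the plugin's request/reply `send`. This is not the plugin's actual code, only an illustration of why defining the implicit `Serde[AvroClass]` resolves the error.

```scala
// Hypothetical stand-in for org.apache.kafka.common.serialization.Serde.
trait Codec[A] {
  def encode(a: A): Array[Byte]
}

object Codec {
  // An instance in the companion object is found without any import.
  implicit val stringCodec: Codec[String] = new Codec[String] {
    def encode(a: String): Array[Byte] = a.getBytes("UTF-8")
  }
}

// Hypothetical stand-in for a generated Avro class.
final case class Order(id: String, item: String)

object Demo {
  // The context bound `V: Codec` desugars to an extra implicit "evidence" parameter.
  def send[V: Codec](value: V): Array[Byte] = implicitly[Codec[V]].encode(value)

  // Without this definition, `send(Order(...))` does not compile,
  // because no implicit Codec[Order] can be found.
  implicit val orderCodec: Codec[Order] = new Codec[Order] {
    def encode(o: Order): Array[Byte] = s"${o.id}|${o.item}".getBytes("UTF-8")
  }

  def main(args: Array[String]): Unit = {
    println(send("key").length)                             // 3
    println(new String(send(Order("1", "book")), "UTF-8")) // 1|book
  }
}
```

Removing `orderCodec` reproduces the same kind of compile-time error for `Order`, while `String` keeps working because its instance is already in implicit scope.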

3alster, Feb 06 '23 14:02