gatling-kafka-plugin
gatling-kafka-plugin copied to clipboard
Support Avro classes in ReqRep
In the default Kafka.send
method it is possible to use Avro Java classes (generated by avro-maven-plugin) as payload in key or value. The payload then gets serialized as valid Avro bytestream.
But in the new ReqRepBases (used in the Request/Reply mechanism) send
method this does not seem possible. We are seeing an error like [No implicits found for parameter evidence]
.
Is Avro not supported here? Or are we simply using it wrong?
It is possible, but not straight out of the box. All the credit goes to @daylikon
import com.sksamuel.avro4s._
import org.apache.kafka.clients.producer.ProducerConfig
import ru.tinkoff.gatling.kafka.Predef._
import ru.tinkoff.gatling.kafka.protocol.KafkaProtocol
import path.AvroClass // here goes the path to the generated avro class
// define Serde for Avro-class
val ser =
new KafkaAvroSerializer(
new CachedSchemaRegistryClient("url".split(',').toList.asJava, 16),
)
val de =
new KafkaAvroDeserializer(
new CachedSchemaRegistryClient("url".split(',').toList.asJava, 16),
)
implicit val serdeClass: Serde[AvroClass] = new Serde[AvroClass] {
override def serializer(): Serializer[AvroClass] = ser.asInstanceOf[Serializer[AvroClass]]
override def deserializer(): Deserializer[AvroClass] = de.asInstanceOf[Deserializer[AvroClass]]
}
// Protocol description
val kafkaScn1Protocol: KafkaProtocol = kafka
.topic("myTopic")
.properties(
Map(
ProducerConfig.ACKS_CONFIG -> "1",
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer",
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> "io.confluent.kafka.serializers.KafkaAvroSerializer",
"schema.registry.url" -> "url",
),
)
val kafkaScn2Protocol: KafkaProtocol = kafka.requestReply
.producerSettings(
Map(
ProducerConfig.ACKS_CONFIG -> "1",
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer",
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> "io.confluent.kafka.serializers.KafkaAvroSerializer",
"schema.registry.url" -> "url",
),
)
.consumeSettings(
Map(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
"schema.registry.url" -> "url",
),
)
.timeout(5.seconds)
// Sending Avro-serialised messages scenario
val scn: ScenarioBuilder = scenario("Simple request")
.exec(
kafka("Simple Request")
.send[AvroClass](new AvroClass("someParam1", "someParam2")),
)
val scn2: ScenarioBuilder = scenario("RequestReply")
.exec(
kafka("RequestReply").requestReply
.requestTopic("request.t")
.replyTopic("reply.t")
.send[String, AvroClass]("key", new AvroClass("someParam1", "someParam2")),
)