jmeter-java-dsl
jmeter-java-dsl copied to clipboard
Kafka base realisation
I reviewed the plugins that implement Kafka support and concluded that they are more complex and feature-heavy than the basic use case requires. In my view there are only two basic needs: configure Kafka, and produce messages to Kafka. I wrote this code in Kotlin to cover them. Maybe it will be helpful to somebody, or maybe it could be implemented in the DSL:
`import java.util.Properties import kotlin.collections.HashMap import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.clients.producer.ProducerRecord import us.abstracta.jmeter.javadsl.JmeterDsl import us.abstracta.jmeter.javadsl.java.DslJsr223Sampler
object Kafka { private val kafkaSets = HashMap<String, KafkaProducer<String, String>>()
/** Default producer properties; every entry is stored as a plain string key/value pair. */
var propsTemplate = Properties()

init {
    // No default is provided for "bootstrap.servers".
    // Example value: "host1:9092,host2:9092,host3:9092"
    listOf(
        "key.serializer" to "org.apache.kafka.common.serialization.StringSerializer",
        "value.serializer" to "org.apache.kafka.common.serialization.StringSerializer",
        "compression.type" to "none",
        "batch.size" to "16384",
        "linger.ms" to "0",
        "buffer.memory" to "33554432",
        "acks" to "all",
        "send.buffer.bytes" to "131072",
        "receive.buffer.bytes" to "32768",
        "security.protocol" to "PLAINTEXT",
        "sasl.kerberos.service.name" to "kafka",
        "sasl.mechanism" to "GSSAPI",
        "message.key.placeholder" to "KEY",
        "message.value.placeholder" to "MESSAGE",
        "kerberos.auth.enabled" to "NO",
        "java.security.auth.login.config" to "null",
        "java.security.krb5.conf" to "null",
        "ssl.enabled" to "NO",
        "ssl.key.password" to "null",
        "ssl.keystore.location" to "null",
        "ssl.keystore.password" to "null",
        "ssl.keystore.type" to "JKS",
        "ssl.truststore.location" to "null",
        "ssl.truststore.password" to "null",
        "ssl.truststore.type" to "JKS",
    ).forEach { (name, value) -> propsTemplate.setProperty(name, value) }
}
/**
 * Builds a sampler that registers a [KafkaProducer] under [configName] the first time it runs.
 *
 * The producer is configured from [propSet], with every key that [propSet] does not define
 * filled in from [propsTemplate]. The merge is done on a private copy, so neither [propSet]
 * nor [propsTemplate] is modified.
 *
 * @param configName name under which the producer is stored for later lookups
 * @param propSet producer properties; must contain "bootstrap.servers"
 * @return the sampler that performs the registration when executed
 * @throws Exception (raised inside the sampler, at run time) when "bootstrap.servers" is missing
 */
fun kafkaConfig(configName: String, propSet: Properties): DslJsr223Sampler? {
    return JmeterDsl.jsr223Sampler("KafkaConfig") {
        // The sampler body can be invoked by several JMeter threads at once; serialise the
        // check-then-create so only one producer is ever built per configuration name.
        synchronized(kafkaSets) {
            if (!kafkaSets.containsKey(configName)) {
                // Template first, caller's values second: caller-supplied entries win.
                val effectiveProps = Properties().apply {
                    putAll(propsTemplate)
                    putAll(propSet)
                }
                if (!effectiveProps.containsKey("bootstrap.servers"))
                    throw Exception("Couldn't create config without property \"bootstrap.servers\"")
                kafkaSets[configName] = KafkaProducer<String, String>(effectiveProps)
            }
        }
    }
}
/**
 * Builds a sampler that registers a [KafkaProducer] under [configName] the first time it runs,
 * using the defaults from [propsTemplate] plus the given bootstrap servers.
 *
 * The properties are assembled on a private copy, so "bootstrap.servers" is never written
 * into the shared [propsTemplate].
 *
 * @param configName name under which the producer is stored for later lookups
 * @param bootstraps value for the "bootstrap.servers" producer property
 * @return the sampler that performs the registration when executed
 */
fun kafkaConfig(configName: String, bootstraps: String): DslJsr223Sampler? {
    return JmeterDsl.jsr223Sampler("KafkaConfig") {
        // The sampler body can be invoked by several JMeter threads at once; serialise the
        // check-then-create so only one producer is ever built per configuration name.
        synchronized(kafkaSets) {
            if (!kafkaSets.containsKey(configName)) {
                val producerProps = Properties().apply {
                    putAll(propsTemplate)
                    setProperty("bootstrap.servers", bootstraps)
                }
                kafkaSets[configName] = KafkaProducer<String, String>(producerProps)
            }
        }
    }
}
/**
 * Builds a sampler that sends [bodyMessage] to [topic] through the producer registered
 * under [configName].
 *
 * NOTE(review): every record is sent with the literal key "key" - confirm this is intended.
 *
 * @throws Exception (raised inside the sampler, at run time) when no producer is registered
 *   under [configName]
 */
fun kafkaProduce(configName: String, topic: String, bodyMessage: String): DslJsr223Sampler? =
    JmeterDsl.jsr223Sampler("KafkaProducer") {
        val producer = kafkaSets[configName]
            ?: throw Exception("Need to create kafkaConfig before sending message")
        producer.send(ProducerRecord(topic, "key", bodyMessage))
    }
/**
 * Builds a sampler that sends [bodyMessage] to the given [partition] of [topic] through the
 * producer registered under [configName]. The record key is the literal "key".
 *
 * @throws Exception (raised inside the sampler, at run time) when no producer is registered
 *   under [configName]
 */
fun kafkaProduce(configName: String, topic: String, partition: Int, bodyMessage: String): DslJsr223Sampler? =
    JmeterDsl.jsr223Sampler("KafkaProducer") {
        val producer = kafkaSets[configName]
            ?: throw Exception("Need to create kafkaConfig before sending message")
        val record = ProducerRecord<String, String>(topic, partition, "key", bodyMessage)
        producer.send(record)
    }
/**
 * Builds a sampler that sends [bodyMessage] to the given [partition] of [topic] through the
 * producer registered under [configName], stamping the record with the wall-clock time
 * (`System.currentTimeMillis()`) read when the sampler executes. The record key is the
 * literal "key".
 *
 * @throws Exception (raised inside the sampler, at run time) when no producer is registered
 *   under [configName]
 */
fun kafkaProduceWithTimestamp(
    configName: String,
    topic: String,
    partition: Int,
    bodyMessage: String
): DslJsr223Sampler? =
    JmeterDsl.jsr223Sampler("KafkaProducer") {
        val producer = kafkaSets[configName]
            ?: throw Exception("Need to create kafkaConfig before sending message")
        val record = ProducerRecord<String, String>(
            topic, partition, System.currentTimeMillis(), "key", bodyMessage
        )
        producer.send(record)
    }
/**
 * Builds a sampler that sends a dynamically generated message to [topic] through the
 * producer registered under [configName].
 *
 * [createBodyMessage] is called with the sampler variables each time the sampler executes,
 * and its result becomes the record value.
 *
 * NOTE(review): every record is sent with the literal key "key" - confirm this is intended.
 *
 * @throws Exception (raised inside the sampler, at run time) when no producer is registered
 *   under [configName]
 */
fun kafkaProduce(
    configName: String,
    topic: String,
    createBodyMessage: (input: DslJsr223Sampler.SamplerVars) -> String
): DslJsr223Sampler? =
    JmeterDsl.jsr223Sampler("KafkaProducer") { vars ->
        val producer = kafkaSets[configName]
            ?: throw Exception("Need to create kafkaConfig before sending message")
        producer.send(ProducerRecord(topic, "key", createBodyMessage(vars)))
    }
/**
 * Builds a sampler that sends a dynamically generated message to the given [partition] of
 * [topic] through the producer registered under [configName].
 *
 * [createBodyMessage] is called with the sampler variables each time the sampler executes,
 * and its result becomes the record value. The record key is the literal "key".
 *
 * @throws Exception (raised inside the sampler, at run time) when no producer is registered
 *   under [configName]
 */
fun kafkaProduce(
    configName: String,
    topic: String,
    partition: Int,
    createBodyMessage: (input: DslJsr223Sampler.SamplerVars) -> String
): DslJsr223Sampler? =
    JmeterDsl.jsr223Sampler("KafkaProducer") { vars ->
        val producer = kafkaSets[configName]
            ?: throw Exception("Need to create kafkaConfig before sending message")
        val record = ProducerRecord<String, String>(topic, partition, "key", createBodyMessage(vars))
        producer.send(record)
    }
/**
 * Builds a sampler that sends a dynamically generated message to the given [partition] of
 * [topic] through the producer registered under [configName], stamping the record with the
 * wall-clock time (`System.currentTimeMillis()`) read when the sampler executes.
 *
 * [createBodyMessage] is called with the sampler variables each time the sampler executes,
 * and its result becomes the record value. The record key is the literal "key".
 *
 * @throws Exception (raised inside the sampler, at run time) when no producer is registered
 *   under [configName]
 */
fun kafkaProduceWithTimestamp(
    configName: String,
    topic: String,
    partition: Int,
    createBodyMessage: (input: DslJsr223Sampler.SamplerVars) -> String
): DslJsr223Sampler? =
    JmeterDsl.jsr223Sampler("KafkaProducer") { vars ->
        val producer = kafkaSets[configName]
            ?: throw Exception("Need to create kafkaConfig before sending message")
        // Timestamp is read before the body is generated, matching argument order.
        val record = ProducerRecord<String, String>(
            topic, partition, System.currentTimeMillis(), "key", createBodyMessage(vars)
        )
        producer.send(record)
    }
}`
We also need Kafka support in the JMeter Java DSL for async API load testing. We are using Kafka Support and the Kafka Backend Listener from the JMeter Plugins Manager and hope to have similar features in the DSL so that we can start using it. Thanks!
Can you tell me what is missing in the example I provided?
I am still learning about the DSL and how we can use it to replace a conventional JMeter-based performance test. My requirement is to create new performance/load tests for our Kafka-based async APIs. I read this thread, which talks about Kafka configuration and producing messages, but going by the description of this issue, it seems this is not yet part of the DSL and is therefore considered a future enhancement?
This is based on the JSR223 sampler, which is part of the DSL. The DSL gives you the ability to use any Java code inside it. So that's it.
Thank you for your response. I am new to both the DSL and JMeter, so I am trying to understand how I could use these methods to build a performance test using the DSL. Would it be possible for you to provide similar methods for creating a Kafka consumer?