jmeter-java-dsl

Kafka base realisation

kirillyu opened this issue 2 years ago

I reviewed the plugins that implement this and concluded that they are more complex and over-featured than the basics require. In my mind there are only two basic needs: configure Kafka and produce messages to Kafka. I wrote this code in Kotlin to cover them. Maybe it will be helpful for somebody, or maybe it could be implemented in the DSL:

```kotlin
import java.util.Properties
import kotlin.collections.HashMap
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import us.abstracta.jmeter.javadsl.JmeterDsl
import us.abstracta.jmeter.javadsl.java.DslJsr223Sampler

object Kafka {

    // One producer per named configuration, shared by all samplers of the test plan.
    private val kafkaSets = HashMap<String, KafkaProducer<String, String>>()

    // Default producer properties; anything the caller doesn't set is taken from here.
    var propsTemplate = Properties()

    init {
        // propsSet["bootstrap.servers"] = "host1:9092,host2:9092,host3:9092" - example
        propsTemplate["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
        propsTemplate["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
        propsTemplate["compression.type"] = "none"
        propsTemplate["batch.size"] = "16384"
        propsTemplate["linger.ms"] = "0"
        propsTemplate["buffer.memory"] = "33554432"
        propsTemplate["acks"] = "all"
        propsTemplate["send.buffer.bytes"] = "131072"
        propsTemplate["receive.buffer.bytes"] = "32768"
        propsTemplate["security.protocol"] = "PLAINTEXT"
        propsTemplate["sasl.kerberos.service.name"] = "kafka"
        propsTemplate["sasl.mechanism"] = "GSSAPI"
        // Some of the entries below mirror the JMeter Kafka plugin's settings rather than
        // standard producer configs; the producer will just report unknown ones as unused.
        propsTemplate["message.key.placeholder"] = "KEY"
        propsTemplate["message.value.placeholder"] = "MESSAGE"
        propsTemplate["kerberos.auth.enabled"] = "NO"
        propsTemplate["java.security.auth.login.config"] = "null"
        propsTemplate["java.security.krb5.conf"] = "null"
        propsTemplate["ssl.enabled"] = "NO"
        propsTemplate["ssl.key.password"] = "null"
        propsTemplate["ssl.keystore.location"] = "null"
        propsTemplate["ssl.keystore.password"] = "null"
        propsTemplate["ssl.keystore.type"] = "JKS"
        propsTemplate["ssl.truststore.location"] = "null"
        propsTemplate["ssl.truststore.password"] = "null"
        propsTemplate["ssl.truststore.type"] = "JKS"
    }

    fun kafkaConfig(configName: String, propSet: Properties): DslJsr223Sampler {
        return JmeterDsl.jsr223Sampler("KafkaConfig") {
            if (!kafkaSets.containsKey(configName)) {
                // Fill any property the caller didn't set from the template.
                for (prop in propsTemplate) {
                    if (!propSet.containsKey(prop.key)) {
                        propSet.setProperty(prop.key as String, prop.value as String)
                    }
                }
                if (!propSet.containsKey("bootstrap.servers"))
                    throw Exception("Couldn't create config without property \"bootstrap.servers\"")
                kafkaSets[configName] = KafkaProducer<String, String>(propSet)
            }
        }
    }

    fun kafkaConfig(configName: String, bootstraps: String): DslJsr223Sampler {
        return JmeterDsl.jsr223Sampler("KafkaConfig") {
            if (!kafkaSets.containsKey(configName)) {
                // Copy the template instead of mutating it, so other configs keep the defaults.
                val propSet = propsTemplate.clone() as Properties
                propSet.setProperty("bootstrap.servers", bootstraps)
                kafkaSets[configName] = KafkaProducer<String, String>(propSet)
            }
        }
    }

    private fun producerFor(configName: String): KafkaProducer<String, String> =
        kafkaSets[configName]
            ?: throw Exception("Need to create kafkaConfig before sending message")

    fun kafkaProduce(configName: String, topic: String, bodyMessage: String): DslJsr223Sampler {
        return JmeterDsl.jsr223Sampler("KafkaProducer") {
            producerFor(configName).send(ProducerRecord(topic, "key", bodyMessage)) // key needed?
        }
    }

    fun kafkaProduce(configName: String, topic: String, partition: Int, bodyMessage: String): DslJsr223Sampler {
        return JmeterDsl.jsr223Sampler("KafkaProducer") {
            producerFor(configName).send(ProducerRecord(topic, partition, "key", bodyMessage))
        }
    }

    fun kafkaProduceWithTimestamp(configName: String, topic: String, partition: Int, bodyMessage: String): DslJsr223Sampler {
        return JmeterDsl.jsr223Sampler("KafkaProducer") {
            producerFor(configName)
                .send(ProducerRecord(topic, partition, System.currentTimeMillis(), "key", bodyMessage))
        }
    }

    fun kafkaProduce(
        configName: String,
        topic: String,
        createBodyMessage: (input: DslJsr223Sampler.SamplerVars) -> String
    ): DslJsr223Sampler {
        return JmeterDsl.jsr223Sampler("KafkaProducer") {
            producerFor(configName).send(ProducerRecord(topic, "key", createBodyMessage(it))) // key needed?
        }
    }

    fun kafkaProduce(
        configName: String,
        topic: String,
        partition: Int,
        createBodyMessage: (input: DslJsr223Sampler.SamplerVars) -> String
    ): DslJsr223Sampler {
        return JmeterDsl.jsr223Sampler("KafkaProducer") {
            producerFor(configName).send(ProducerRecord(topic, partition, "key", createBodyMessage(it)))
        }
    }

    fun kafkaProduceWithTimestamp(
        configName: String,
        topic: String,
        partition: Int,
        createBodyMessage: (input: DslJsr223Sampler.SamplerVars) -> String
    ): DslJsr223Sampler {
        return JmeterDsl.jsr223Sampler("KafkaProducer") {
            producerFor(configName)
                .send(ProducerRecord(topic, partition, System.currentTimeMillis(), "key", createBodyMessage(it)))
        }
    }
}
```
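
For reference, a minimal sketch (untested) of how these helpers could be dropped into a test plan; the config name, broker address, topic, and message body are placeholders:

```kotlin
import us.abstracta.jmeter.javadsl.JmeterDsl.testPlan
import us.abstracta.jmeter.javadsl.JmeterDsl.threadGroup

fun main() {
    // 1 thread, 10 iterations; "localhost:9092" and "my-topic" are placeholders.
    testPlan(
        threadGroup(1, 10,
            Kafka.kafkaConfig("myCluster", "localhost:9092"),
            Kafka.kafkaProduce("myCluster", "my-topic", """{"hello":"world"}""")
        )
    ).run()
}
```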

kirillyu avatar Mar 24 '22 19:03 kirillyu

We also need Kafka support in the JMeter Java DSL for async API load testing. We are using the Kafka Support and Kafka Backend Listener plugins from the JMeter plugin manager and hope to have similar features in the DSL so that we can start using it. Thanks!

Sonal94539 avatar Jan 19 '23 22:01 Sonal94539

Can you tell me what is missing in the example I provided?

kirillyu avatar Jan 20 '23 06:01 kirillyu

I am still learning about the DSL and how we can use it to replace a conventional JMeter-based performance test. My requirement is to create new performance/load tests for our Kafka-based async APIs. I read this thread, which talks about Kafka configuration and producing messages, but as per the description of this issue it seems this is not already part of the DSL, and is therefore considered a future enhancement?

Sonal94539 avatar Jan 20 '23 19:01 Sonal94539

This is based on jsr223Sampler, which is part of the DSL. The DSL gives you the ability to use any Java code inside, so that's all you need.
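
For example, a plain jsr223Sampler can embed producer code directly, without any dedicated Kafka element. A rough, untested sketch (the broker address and topic are placeholders):

```kotlin
import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import us.abstracta.jmeter.javadsl.JmeterDsl.jsr223Sampler

// Arbitrary JVM code inside a jsr223Sampler: build a producer and send one record.
// Creating a producer per sample is inefficient; this only illustrates the idea.
val sendOneMessage = jsr223Sampler("InlineKafkaProducer") {
    val props = Properties()
    props["bootstrap.servers"] = "localhost:9092" // placeholder
    props["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
    props["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
    KafkaProducer<String, String>(props).use { producer ->
        producer.send(ProducerRecord("my-topic", "key", "hello")).get()
    }
}
```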

kirillyu avatar Jan 22 '23 10:01 kirillyu

Thank you for your response. I am new to both the DSL and JMeter, so I am trying to understand how I could use these methods to build a performance test with the DSL. Would it be possible for you to provide similar methods for creating a Kafka consumer?

Sonal94539 avatar Jan 23 '23 17:01 Sonal94539