zerocode
zerocode copied to clipboard
Send AVRO content to kafka
Hi,
We try to send kafka message using AVRO and schema registry.
Right now, we've been trying to do something like this:
on configuration
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
on test
{
"name":"send avro message",
"url":"kafka-topic:command",
"operation":"produce",
"request": {
"recordType" : "JSON",
"records": [
{
"key": "test",
"value": {
"property" : "001"
}
}
]
},
"verify": {
"status": "Ok",
"recordMetadata": "$NOT.NULL"
}
}
ZeroCode serialize this code using avro and send it. But it doesn't use the existing schema for the object. On the schema registry, we have this schema:
{
"type" : "record",
"name" : "TestKafkaMessage",
"namespace" : "com.foo.bar",
"fields" : [ {
"name" : "property",
"type" : "string"
}]
}
Instead of use the existing schema, it create a new one command-value with the content :
"string"
Expected Behaviour:
Be able to specify a schema (using file, json, or schema registry reference) to serialize json using this schema.
Is this already possible? Is there any other way?
Thanks
@deblockt , apologies! we were slightly busy with other priority tickets and our day jobs. That's why couldn't address this issue. This looks like a feature request(or question?).
We will have a look soon!
looking for the answer as well
Hi!
Currently, we are evaluating the framework to be used in a corporate context.
I would like to know if producing AVRO messages is working (with schema registry).
Is this already fixed and if yes, could you provide an example?
Thank you!
René
I have the same needs of all of you and i use a Custom Kafka Client to achieve that !
@rpapesch if my workaround in interesting you :
- generate your java AVRO POJO from your AVRO schema specification. From my side, they comes in a packaged jar
- You can create a CustomKafkaClient based on this sample : https://github.com/authorjapps/zerocode/blob/master/kafka-testing/src/main/java/org/jsmart/zerocode/kafka/MyCustomKafkaClient.java
- Then in your test, use
@UseKafkaClient(MyCustomKafkaClient.class)
Here is an extract of my Custom client (not cleaned) :
public class KafkaProduceCostingRequestAvro extends BasicKafkaClient {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProduceCostingRequestAvro.class);
// unfortunately not accessible from BasicKafkaClient
private final ObjectMapper objectMapper = new ObjectMapperProvider().get();
// unfortunately not accessible from BasicKafkaClient
private final Gson gson = new GsonSerDeProvider().get();
// unfortunately not accessible from BasicKafkaClient
@Inject(optional = true)
@Named("kafka.producer.properties")
private String producerPropertyFile;
@Override
public String execute(String brokers, String topicName, String operation, String requestJson, ScenarioExecutionState scenarioExecutionState) {
// just create an avro producer, returning your avro java pojo
Producer<String, CostingRequestPayload> producer = createAvroProducer(brokers, producerPropertyFile);
try {
// read the actual zerocode configuration
ProducerJsonRecords jsonRecords = objectMapper.readValue(requestJson, ProducerJsonRecords.class);
List<ProducerJsonRecord> records = jsonRecords.getRecords();
validateProduceRecord(records);
AtomicReference<RecordMetadata> recordMetadata = new AtomicReference<>();
records.forEach(producerJsonRecord -> {
CostingRequestPayload crp = new CostingRequestPayload();
// use Avro Deserializer to get json record from zerocode configuration
DatumReader<CostingRequestPayload> reader
= new SpecificDatumReader<>(CostingRequestPayload.class);
Decoder decoder;
try {
decoder = DecoderFactory.get().jsonDecoder(
CostingRequestPayload.getClassSchema(), producerJsonRecord.getValue().toString());
crp = reader.read(null, decoder);
} catch (IOException e) {
LOGGER.error("Deserialization error:" + e.getMessage());
}
RecordHeaders headers = new RecordHeaders();
producerJsonRecord.getHeaders().forEach((o, o2) -> headers.add(o.toString(), o2.toString().getBytes(StandardCharsets.UTF_8)));
// construct your avro record
ProducerRecord<String, CostingRequestPayload> producerRecord =
new ProducerRecord(topicName, null, null, null, crp, headers);
try {
// and send it
RecordMetadata recordMetadata1 = producer.send(producerRecord, (metadata, e) -> {
if (metadata != null) {
// Record sent successfully. Exception == null and metadata != null
LOGGER.info("Message {}-{} successfully sent to topic={} part={} off={} at time={}",
null,
producerRecord,
metadata.topic(),
metadata.partition(),
metadata.offset(),
new Date(metadata.timestamp()));
} else {
// An error occurred. Exception != null and metadata == null
// Correctly handle the exception according to your needs
// /!\ If you don't process the exception, it is "fire-and-forget" like. Send or not or maybe :-)
LOGGER.error("An error occurred during send !", e);
}
}).get();
recordMetadata.set(recordMetadata1);
} catch (InterruptedException e) {
e.printStackTrace();
error.set(e.getMessage());
} catch (ExecutionException e) {
e.printStackTrace();
error.set(e.getMessage());
}
});
// dont forget to return a status for tests assertions
return gson.toJson(new DeliveryDetails(OK, recordMetadata.get()));
} catch (Exception e) {
LOGGER.error("Unable to process json.", e);
error.set(e.getMessage());
}
if (StringUtils.isBlank(error.get())) {
return gson.toJson(new DeliveryDetails(FAILED, error.get()));
}
return gson.toJson(new DeliveryDetails(FAILED, ""));
}
}
Because i used my generated avro pojo, no need to set a schema
Because i used Avro Deserializer, the records content should be compliant - be careful to avro union. a field declated with ["null", "string"] type should be described as
"request": {
"records": [
{
"key": null,
"headers": {
"REPLY_TOPIC": "TEST"
},
"value": {
"ProductId": {
"field": {
"string": "82306457"
},
...
I hope this help !
@authorjapps i think that i could do a PR refactoring to allow the custom client to reuse some of your functions (switch fields from private to protected , etc) without the need to duplicate the code
Hi @M3lkior,
This example is in some repository so I can analyze it in more detail.
This alternative will help me a lot.
Hi @M3lkior,
This example is in some repository so I can analyze it in more detail.
This alternative will help me a lot.
Nop, it is a part from my enterprise application.
@authorjapps I am facing the same issue. Here is the log when i run the test:
2022-03-03 11:25:24,863 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - Cluster ID: lkc-oow0x
2022-03-03 11:25:25,649 [main] ERROR org.jsmart.zerocode.core.kafka.send.KafkaSender - Error in sending record. Exception : org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: "string"
2022-03-03 11:25:25,653 [main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
@erobertolima121 is this the same issue you are facing?
hi @erobertolima121 ;
If it can help, i created a gist with my helper classes in order to support AVRO in the produce step
https://gist.github.com/M3lkior/aa4f2b21a46f2d45c84b09b5b0331930