ballerina-library
NPE when using Schema registry configurations with Kafka
Description
When using the Confluent Schema Registry to validate Kafka messages, I get an NPE instead of the real error when the configuration is invalid (e.g. an invalid API key), as shown below.
Exception in thread "" java.lang.NullPointerException: Cannot invoke "io.ballerina.runtime.api.Module.getOrg()" because "module" is null
at io.ballerina.runtime.internal.values.ValueCreator.getLookupKey(ValueCreator.java:74)
at io.ballerina.runtime.api.creators.ErrorCreator.createError(ErrorCreator.java:166)
at io.ballerina.stdlib.constraint.ErrorUtils.createError(ErrorUtils.java:127)
at io.ballerina.stdlib.constraint.ErrorUtils.buildTypeConversionError(ErrorUtils.java:113)
at io.ballerina.stdlib.constraint.Constraints.validate(Constraints.java:57)
at io.ballerina.stdlib.kafka.utils.KafkaUtils.validateConstraints(KafkaUtils.java:1046)
at io.ballerina.stdlib.kafka.utils.KafkaUtils.getValuesWithIntendedType(KafkaUtils.java:1079)
at io.ballerina.stdlib.kafka.impl.KafkaListenerImpl.getResourceParameters(KafkaListenerImpl.java:213)
at io.ballerina.stdlib.kafka.impl.KafkaListenerImpl.lambda$executeResource$0(KafkaListenerImpl.java:129)
at java.base/java.lang.VirtualThread.run(VirtualThread.java:329)
I debugged the code further. The error occurs in KafkaUtils.validateConstraints[1] when it tries to validate the response type with the ballerina/constraint module utils. While building the validation error, the constraint module calls ModuleUtils.getModule()[2], which returns null because the module has not been set beforehand, and dereferencing that null value causes the NPE.
[1] https://github.com/ballerina-platform/module-ballerinax-kafka/blob/5fb9c7bdf08db5df4e97d770c801d11f16577e52/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaUtils.java#L1046
[2] https://github.com/ballerina-platform/module-ballerina-constraint/blob/9abc12328b33e42e83d145388b7e417ed8e94078/native/src/main/java/io/ballerina/stdlib/constraint/ErrorUtils.java#L127
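The failure mode described above can be illustrated with a minimal, self-contained Java sketch. It does not use the Ballerina runtime: `Module`, `ModuleUtils`, `lookupKeyUnsafe`, and `lookupKeySafe` are hypothetical stand-ins that only mimic the shape of the calls in the stack trace, and the guarded variant is one possible mitigation, not necessarily the fix the maintainers would choose.

```java
public class ModuleHolderSketch {

    // Hypothetical stand-in for io.ballerina.runtime.api.Module.
    static final class Module {
        private final String org;
        private final String name;

        Module(String org, String name) {
            this.org = org;
            this.name = name;
        }

        String getOrg() { return org; }
        String getName() { return name; }
    }

    // Hypothetical stand-in for a static module holder that is only
    // populated when an initialization hook calls setModule.
    static final class ModuleUtils {
        private static Module module; // remains null until setModule is called

        static void setModule(Module m) { module = m; }
        static Module getModule() { return module; }
    }

    // Mirrors the failing path: the module is dereferenced without a null check.
    static String lookupKeyUnsafe() {
        Module m = ModuleUtils.getModule();
        return m.getOrg() + "/" + m.getName();
    }

    // Guarded variant: fails with a descriptive error instead of a bare NPE.
    static String lookupKeySafe() {
        Module m = ModuleUtils.getModule();
        if (m == null) {
            throw new IllegalStateException(
                    "module is not initialized; cannot create the validation error");
        }
        return m.getOrg() + "/" + m.getName();
    }

    public static void main(String[] args) {
        try {
            lookupKeyUnsafe();
        } catch (NullPointerException e) {
            System.out.println("unsafe lookup failed with NPE");
        }
        try {
            lookupKeySafe();
        } catch (IllegalStateException e) {
            System.out.println("safe lookup failed with: " + e.getMessage());
        }
        // Once the holder is populated, both variants succeed.
        ModuleUtils.setModule(new Module("ballerina", "constraint"));
        System.out.println("after init: " + lookupKeySafe());
    }
}
```

In this sketch the real problem (the caller reaching the error path before the holder was populated) is hidden by the NPE in the unguarded variant, which matches the symptom reported here: the original Schema Registry error never reaches the user.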
Steps to Reproduce
- Run the code below with an invalid Schema Registry configuration (e.g. an invalid API key).
import ballerina/io;
import ballerinax/kafka;

// `baseUrl` and the `Student` record are assumed to be defined elsewhere; the issue does not show them.
listener kafka:Listener orderListener = new ("<BOOTSTRAP_URL>", {
    groupId: "bal-listener-group-id",
    topics: "students",
    securityProtocol: kafka:PROTOCOL_SASL_SSL,
    auth: {
        mechanism: kafka:AUTH_SASL_PLAIN,
        username: "<KAFKA_API_KEY>",
        password: "<KAFKA_API_SECRET>"
    },
    schemaRegistryUrl: baseUrl,
    schemaRegistryConfig: {
        "baseUrl": baseUrl,
        "originals": {
            "basic.auth.credentials.source": "USER_INFO",
            "schema.registry.basic.auth.user.info": "<SR_API_KEY>:<SR_API_SECRET>"
        },
        "headers": {}
    },
    keyDeserializerType: kafka:DES_AVRO,
    valueDeserializerType: kafka:DES_AVRO,
    offsetReset: kafka:OFFSET_RESET_EARLIEST
});

service on orderListener {
    remote function onConsumerRecord(anydata[] data) returns error? {
        io:println("Received data: ", data.toBalString());
        Student[] students = data.'map(value => check value.cloneWithType(Student));
        foreach Student student in students {
            io:println("Received: ", student);
        }
    }
}
Version
4.6.1
Environment Details (with versions)
Ballerina 2201.12.10
I encountered the same issue when using the above code. However, after changing the parameter type of the onConsumerRecord function from anydata[] to kafka:AnydataConsumerRecord[], the issue no longer occurred.
remote function onConsumerRecord(kafka:AnydataConsumerRecord[] data) returns error? {
    // ...
}
@Nuvindu I understand that anydata[] is too generic a data type, but ideally the code should not break because of that.