NPE when using Schema registry configurations with Kafka

Open gayaldassanayake opened this issue 1 month ago • 2 comments

Description

When using the Confluent Schema Registry to validate Kafka messages, I get an NPE when I use an invalid configuration (e.g., an invalid API key), instead of the real error, as shown below.

Exception in thread "" java.lang.NullPointerException: Cannot invoke "io.ballerina.runtime.api.Module.getOrg()" because "module" is null
        at io.ballerina.runtime.internal.values.ValueCreator.getLookupKey(ValueCreator.java:74)
        at io.ballerina.runtime.api.creators.ErrorCreator.createError(ErrorCreator.java:166)
        at io.ballerina.stdlib.constraint.ErrorUtils.createError(ErrorUtils.java:127)
        at io.ballerina.stdlib.constraint.ErrorUtils.buildTypeConversionError(ErrorUtils.java:113)
        at io.ballerina.stdlib.constraint.Constraints.validate(Constraints.java:57)
        at io.ballerina.stdlib.kafka.utils.KafkaUtils.validateConstraints(KafkaUtils.java:1046)
        at io.ballerina.stdlib.kafka.utils.KafkaUtils.getValuesWithIntendedType(KafkaUtils.java:1079)
        at io.ballerina.stdlib.kafka.impl.KafkaListenerImpl.getResourceParameters(KafkaListenerImpl.java:213)
        at io.ballerina.stdlib.kafka.impl.KafkaListenerImpl.lambda$executeResource$0(KafkaListenerImpl.java:129)
        at java.base/java.lang.VirtualThread.run(VirtualThread.java:329)

I debugged the code further. The error occurs in KafkaUtils.validateConstraints[1] when it tries to validate the response type using the ballerina/constraint module utilities. While building the type conversion error, the constraint module calls ModuleUtils.getModule()[2], which returns null because the module has not been set beforehand, and this leads to the NPE.

[1] https://github.com/ballerina-platform/module-ballerinax-kafka/blob/5fb9c7bdf08db5df4e97d770c801d11f16577e52/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaUtils.java#L1046

[2] https://github.com/ballerina-platform/module-ballerina-constraint/blob/9abc12328b33e42e83d145388b7e417ed8e94078/native/src/main/java/io/ballerina/stdlib/constraint/ErrorUtils.java#L127
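
To illustrate the failure mode in isolation, here is a minimal Java sketch of a "set once at init, read later" module holder. It does not use the Ballerina runtime: `Module`, `ModuleHolder`, `lookupKeyUnguarded`, and `lookupKeyGuarded` are hypothetical stand-ins written for this example, not the actual classes in the constraint module, and the guarded variant is only one possible way to surface a clearer error, not a proposed fix.

```java
public class ModuleHolderSketch {

    // Hypothetical stand-in for io.ballerina.runtime.api.Module.
    static final class Module {
        private final String org;
        private final String name;

        Module(String org, String name) {
            this.org = org;
            this.name = name;
        }

        String getOrg() {
            return org;
        }

        String getName() {
            return name;
        }
    }

    // Hypothetical holder: the module stays null until setModule is called.
    static final class ModuleHolder {
        private static Module module;

        static void setModule(Module m) {
            module = m;
        }

        static Module getModule() {
            return module;
        }

        static void reset() {
            module = null;
        }
    }

    // Unguarded: dereferences the module directly, so an unset module
    // produces a NullPointerException similar to the one in the trace.
    static String lookupKeyUnguarded() {
        Module m = ModuleHolder.getModule();
        return m.getOrg() + "/" + m.getName();
    }

    // Guarded: checks for an unset module and fails with a descriptive message.
    static String lookupKeyGuarded() {
        Module m = ModuleHolder.getModule();
        if (m == null) {
            throw new IllegalStateException("module not initialized; cannot build the error lookup key");
        }
        return m.getOrg() + "/" + m.getName();
    }

    public static void main(String[] args) {
        try {
            lookupKeyUnguarded();
        } catch (NullPointerException e) {
            System.out.println("unguarded: NullPointerException");
        }
        try {
            lookupKeyGuarded();
        } catch (IllegalStateException e) {
            System.out.println("guarded: " + e.getMessage());
        }
        // Once the module is set, both variants build the key normally.
        ModuleHolder.setModule(new Module("ballerina", "constraint"));
        System.out.println(lookupKeyGuarded());
    }
}
```

The point of the sketch is only that any code path which creates an error before the holder is populated will hit the null module, which matches the order of frames in the trace above.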

Steps to Reproduce

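  1. Run the code below.
import ballerina/io;
import ballerinax/kafka;

// `baseUrl` (the schema registry URL) and the `Student` record type are assumed to be defined elsewhere.
listener kafka:Listener orderListener = new ("<BOOTSTRAP_URL>", {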
    groupId: "bal-listener-group-id",
    topics: "students",
    securityProtocol: kafka:PROTOCOL_SASL_SSL,

    auth: {
        mechanism: kafka:AUTH_SASL_PLAIN,
        username: "<KAFKA_API_KEY>", 
        password: "<KAFKA_API_SECRET>" 
    },
    schemaRegistryUrl: baseUrl,
    schemaRegistryConfig: {
        "baseUrl": baseUrl,
        "originals": {
            "basic.auth.credentials.source": "USER_INFO",
            "schema.registry.basic.auth.user.info": "<SR_API_KEY>:<SR_API_SECRET>"
        },
        "headers": {}
    },


    keyDeserializerType: kafka:DES_AVRO,
    valueDeserializerType: kafka:DES_AVRO,
    offsetReset: kafka:OFFSET_RESET_EARLIEST
});

service on orderListener {

    remote function onConsumerRecord(anydata[] data) returns error? {
        io:println("Received data: ", data.toBalString());
        Student[] students = data.'map(value => check value.cloneWithType(Student));
        foreach Student student in students {
            io:println("Received: ", student);
        }
    }
}

Version

4.6.1

Environment Details (with versions)

Ballerina 2201.12.10

gayaldassanayake Oct 25 '25 08:10

I encountered the same issue when using the above code. However, after changing the parameter type of the onConsumerRecord function from anydata[] to kafka:AnydataConsumerRecord[], the issue no longer occurred.

remote function onConsumerRecord(kafka:AnydataConsumerRecord[] data) returns error? {
    // ...
}

Nuvindu Oct 25 '25 15:10

@Nuvindu I understand that anydata[] is too generic a data type. But ideally, the code should not break because of that.

ayeshLK Oct 25 '25 15:10