confluent-kafka-go
The ID in a message is always `-1` when `UseSchemaID` in the `serde.Serializer` is set.
Description
From here: https://github.com/confluentinc/confluent-kafka-go/blob/561a26092b1678e2ddcddbff6b2720ee4e85aaf9/schemaregistry/serde/config.go#L20-L29
I expect `UseSchemaID` to tell a Serializer to use that ID regardless of what is in Schema Registry.
When invoking `Serialize()` (I'm using the specific Avro serializer), the embedded `BaseSerializer.GetID` is used to get the schema ID of the message:
https://github.com/confluentinc/confluent-kafka-go/blob/561a26092b1678e2ddcddbff6b2720ee4e85aaf9/schemaregistry/serde/avro/avro_specific.go#L79
However, the branch that handles `UseSchemaID` looks like the snippet linked below. No line in that method assigns either the query result or `UseSchemaID` to the `id` variable:
https://github.com/confluentinc/confluent-kafka-go/blob/561a26092b1678e2ddcddbff6b2720ee4e85aaf9/schemaregistry/serde/serde.go#L147-L158
Therefore, when `UseSchemaID` >= 0, the `GetID` method effectively reduces to this:
var id = -1
...
if elses... {
}
...
return id, nil
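To make the failure mode concrete, here is a minimal, self-contained sketch of the control flow described above. It is a simplified model based only on this description, not the library's source: `config`, `lookupByID`, `getIDBuggy`, and `getIDFixed` are hypothetical names, and the registry lookup is faked in memory.

```go
package main

import (
	"errors"
	"fmt"
)

// config models only the setting discussed in this issue (hypothetical type).
type config struct {
	UseSchemaID int
}

// lookupByID is a hypothetical stand-in for a Schema Registry query.
// It only knows about schema ID 1234.
func lookupByID(id int) (string, error) {
	if id == 1234 {
		return `{"type":"string"}`, nil
	}
	return "", errors.New("schema not found")
}

// getIDBuggy mirrors the reported pattern: the UseSchemaID branch performs
// a lookup but never assigns to id, so the initial -1 is returned.
func getIDBuggy(c config) (int, error) {
	id := -1
	if c.UseSchemaID >= 0 {
		if _, err := lookupByID(c.UseSchemaID); err != nil {
			return -1, err
		}
		// Nothing is assigned to id here.
	}
	return id, nil
}

// getIDFixed shows one possible fix: assign the configured ID once the
// lookup has confirmed that the schema exists.
func getIDFixed(c config) (int, error) {
	id := -1
	if c.UseSchemaID >= 0 {
		if _, err := lookupByID(c.UseSchemaID); err != nil {
			return -1, err
		}
		id = c.UseSchemaID
	}
	return id, nil
}

func main() {
	c := config{UseSchemaID: 1234}
	buggy, _ := getIDBuggy(c)
	fixed, _ := getIDFixed(c)
	fmt.Println(buggy, fixed) // prints "-1 1234"
}
```

Whether the real fix should take the ID from the configuration or from the registry response is a design decision for the maintainers; the sketch only illustrates that some assignment is needed.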
How to reproduce
Using the specific Avro serializer example, the issue can be reproduced by setting `UseSchemaID` to any value >= 0:
url := "https://myschemaregistry.example.com"
client, err := schemaregistry.NewClient(schemaregistry.NewConfig(url))
if err != nil {
fmt.Printf("Failed to create schema registry client: %s\n", err)
os.Exit(1)
}
serdeCfg := avro.NewSerializerConfig()
serdeCfg.AutoRegisterSchemas = false
serdeCfg.UseLatestVersion = false
serdeCfg.UseSchemaID = 1234
ser, err := avro.NewSpecificSerializer(client, serde.ValueSerde, serdeCfg)
if err != nil {
fmt.Printf("Failed to create serializer: %s\n", err)
os.Exit(1)
}
var value MyStruct
payload, err := ser.Serialize(topic, &value)
// the first 5 bytes are the magic byte 0 followed by the schema ID -1 (0xFF 0xFF 0xFF 0xFF) instead of 1234
...
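To check the header of a serialized payload, a small helper like the one below may be useful. It assumes the Confluent wire format (one magic byte `0` followed by a 4-byte big-endian schema ID); `schemaIDFromPayload` is a hypothetical helper, not part of confluent-kafka-go, and the byte slices are hand-built examples rather than real serializer output.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// schemaIDFromPayload extracts the schema ID from the 5-byte header.
// Hypothetical helper for debugging; not a library function.
func schemaIDFromPayload(payload []byte) (int32, error) {
	if len(payload) < 5 {
		return 0, errors.New("payload shorter than the 5-byte header")
	}
	if payload[0] != 0 {
		return 0, fmt.Errorf("unexpected magic byte %d", payload[0])
	}
	// Bytes 1..4 hold the schema ID as a big-endian 32-bit integer.
	return int32(binary.BigEndian.Uint32(payload[1:5])), nil
}

func main() {
	// Header as it would look if the ID -1 were written out.
	bad := []byte{0x00, 0xFF, 0xFF, 0xFF, 0xFF}
	// Header as it would look for the expected ID 1234 (0x000004D2).
	good := []byte{0x00, 0x00, 0x00, 0x04, 0xD2}

	badID, _ := schemaIDFromPayload(bad)
	goodID, _ := schemaIDFromPayload(good)
	fmt.Println(badID, goodID) // prints "-1 1234"
}
```

Calling `schemaIDFromPayload(payload)` on the result of `ser.Serialize` in the reproduction above would be one way to confirm which ID actually ends up in the message.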
Checklist
Please provide the following information:
- [x] confluent-kafka-go and librdkafka version (`LibraryVersion()`): v1.9.2 / v1.9.2
- [x] Apache Kafka broker version: irrelevant
- [x] Client configuration: `ConfigMap{...}`: irrelevant
- [x] Operating system: irrelevant
- [x] Provide client logs (with `"debug": ".."` as necessary): irrelevant
- [x] Provide broker log excerpts: irrelevant
- [ ] Critical issue
We are facing this issue too. Could it be caused by this line? https://github.com/confluentinc/confluent-kafka-go/blob/master/schemaregistry/serde/serde.go#L153