confluent-kafka-go

The ID in a message is always `-1` when `UseSchemaID` in the `serde.Serializer` is set.

Open falau opened this issue 2 years ago • 1 comments

Description

From here: https://github.com/confluentinc/confluent-kafka-go/blob/561a26092b1678e2ddcddbff6b2720ee4e85aaf9/schemaregistry/serde/config.go#L20-L29

I expect UseSchemaID to tell a Serializer to use that ID regardless of what is registered in Schema Registry.

When serde.Serialize() is invoked (I'm using the specific Avro serializer), the embedded BaseSerializer.GetID is called to get the schema ID for the message: https://github.com/confluentinc/confluent-kafka-go/blob/561a26092b1678e2ddcddbff6b2720ee4e85aaf9/schemaregistry/serde/avro/avro_specific.go#L79

However, the branch that handles UseSchemaID, linked below, never assigns anything to the id variable, neither the result of the registry query nor UseSchemaID itself: https://github.com/confluentinc/confluent-kafka-go/blob/561a26092b1678e2ddcddbff6b2720ee4e85aaf9/schemaregistry/serde/serde.go#L147-L158

As a result, when UseSchemaID >= 0 the GetID method effectively reduces to:

	var id = -1
	...
	if ... {
		// the UseSchemaID >= 0 branch runs, but never assigns to id
	}
	...
	return id, nil
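
To make the pattern concrete, here is a minimal, self-contained sketch. It is not the library's code: `registry`, `lookup`, `getIDBuggy` and `getIDFixed` are hypothetical names, and the registry is modelled as an in-memory map. It only illustrates how a branch that validates the configured ID without assigning it returns the initial `-1`, and what one possible fix might look like.

```go
package main

import (
	"errors"
	"fmt"
)

// registry is a hypothetical stand-in for the Schema Registry client:
// subject -> set of schema IDs known for that subject.
type registry map[string]map[int]bool

// lookup returns an error if the ID is not registered under the subject.
func (r registry) lookup(subject string, id int) error {
	if !r[subject][id] {
		return errors.New("schema not found")
	}
	return nil
}

// getIDBuggy mirrors the shape described in this issue: the UseSchemaID
// branch checks the ID but never assigns it, so -1 is returned.
func getIDBuggy(r registry, subject string, useSchemaID int) (int, error) {
	id := -1
	if useSchemaID >= 0 {
		if err := r.lookup(subject, useSchemaID); err != nil {
			return -1, err
		}
		// id is left untouched here
	}
	return id, nil
}

// getIDFixed assigns the configured ID once the lookup succeeds
// (one possible fix, assumed for illustration).
func getIDFixed(r registry, subject string, useSchemaID int) (int, error) {
	id := -1
	if useSchemaID >= 0 {
		if err := r.lookup(subject, useSchemaID); err != nil {
			return -1, err
		}
		id = useSchemaID
	}
	return id, nil
}

func main() {
	r := registry{"my-topic-value": {1234: true}}
	buggy, _ := getIDBuggy(r, "my-topic-value", 1234)
	fixed, _ := getIDFixed(r, "my-topic-value", 1234)
	fmt.Println(buggy, fixed) // prints "-1 1234"
}
```

Whether the real fix should assign UseSchemaID directly or the ID returned by the registry query is a design decision for the maintainers; the sketch only shows that some assignment is needed.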

How to reproduce

Using the specific Avro serializer example, the issue can be reproduced by setting UseSchemaID to any value >= 0:

	url := "https://myschemaregistry.example.com"
	client, err := schemaregistry.NewClient(schemaregistry.NewConfig(url))

	if err != nil {
		fmt.Printf("Failed to create schema registry client: %s\n", err)
		os.Exit(1)
	}
	serdeCfg := avro.NewSerializerConfig()
	serdeCfg.AutoRegisterSchemas = false
	serdeCfg.UseLatestVersion = false
	serdeCfg.UseSchemaID = 1234 // any value >= 0 triggers the issue
	ser, err := avro.NewSpecificSerializer(client, serde.ValueSerde, serdeCfg)

	if err != nil {
		fmt.Printf("Failed to create serializer: %s\n", err)
		os.Exit(1)
	}
	var value MyStruct
	payload, err := ser.Serialize(topic, &value)
	// The first 5 bytes of payload are the magic byte 0 followed by
	// the schema ID, which comes out as -1 instead of 1234.
	...
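
One way to check the ID embedded in a payload is to decode its header. The sketch below assumes the usual Confluent wire format (a magic byte of 0 followed by a 4-byte big-endian schema ID) and assumes that -1 is stored in two's complement, i.e. as `ff ff ff ff`. The helper name `schemaIDFromPayload` is hypothetical and not part of confluent-kafka-go, and the byte slices are hand-written headers rather than real serializer output.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// schemaIDFromPayload reads the 5-byte header: one magic byte (0)
// followed by a 4-byte big-endian schema ID.
func schemaIDFromPayload(payload []byte) (int32, error) {
	if len(payload) < 5 {
		return 0, errors.New("payload shorter than the 5-byte header")
	}
	if payload[0] != 0 {
		return 0, fmt.Errorf("unexpected magic byte %d", payload[0])
	}
	return int32(binary.BigEndian.Uint32(payload[1:5])), nil
}

func main() {
	// Header as it would look with the ID written as -1 (assumed encoding).
	bad := []byte{0x00, 0xff, 0xff, 0xff, 0xff}
	// Header expected for UseSchemaID = 1234 (0x000004d2).
	good := []byte{0x00, 0x00, 0x00, 0x04, 0xd2}

	idBad, _ := schemaIDFromPayload(bad)
	idGood, _ := schemaIDFromPayload(good)
	fmt.Println(idBad, idGood) // prints "-1 1234"
}
```

Passing the `payload` returned by `ser.Serialize` to such a helper would show whether the configured ID made it into the message.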

Checklist

Please provide the following information:

  • [x] confluent-kafka-go and librdkafka version (LibraryVersion()): v1.9.2 / v1.9.2
  • [x] Apache Kafka broker version: irrelevant
  • [x] Client configuration: ConfigMap{...}: irrelevant
  • [x] Operating system: irrelevant
  • [x] Provide client logs (with "debug": ".." as necessary): irrelevant
  • [x] Provide broker log excerpts: irrelevant
  • [ ] Critical issue

falau, Aug 25 '22 09:08

We are facing this issue too. Could it be caused by this line? https://github.com/confluentinc/confluent-kafka-go/blob/master/schemaregistry/serde/serde.go#L153

slice-dinesh, Sep 08 '22 13:09