[bug] Schema validation not working: producing messages that are non-compliant with the schema is allowed

Open calindima opened this issue 1 year ago • 6 comments

Expected behavior

Setup:

  • topic has a schema uploaded to the registry
  • schemaEnforced is set to true
  • allowAutoUpdates is set to false
  • producer is created successfully with a JSON schema that follows the definition from the registry

Expected:

  • producer can only publish messages that are compliant with the schema

Actual behavior

The producer created with a schema can publish non-compliant messages. The payload is not validated against the schema.

Steps to reproduce

The first 3 steps from setup can be done through the admin API:

curl --request POST \
  --url $BASE_URL/admin/v2/schemas/$TENANT/$NAMESPACE/$TOPIC/schema \
  --header 'Content-Type: application/json' \
  --data '{
        "type": "JSON",
        "schema": "{\n  \"type\": \"record\",\n  \"name\": \"SchemaTest\",\n  \"fields\": [\n    {\n      \"name\": \"userName\",\n      \"type\": \"string\"\n    },\n    {\n      \"name\": \"userAge\",\n      \"type\": \"int\"\n    }\n  ]\n}",
        "properties": {}
    }'
curl --request POST \
  --url $BASE_URL/admin/v2/persistent/$TENANT/$NAMESPACE/$TOPIC/schemaValidationEnforced \
  --header 'Content-Type: application/json' \
  --data true
curl --request POST \
  --url $BASE_URL/admin/v2/namespaces/$TENANT/$NAMESPACE/isAllowAutoUpdateSchema \
  --header 'Content-Type: application/json' \
  --data false

The code:

package main

import (
	"context"
	"fmt"
	"log/slog"
	"os"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
	pulsarauth "github.com/apache/pulsar-client-go/pulsar/auth"
	"github.com/google/uuid"
)

type SchemaTest struct {
	UserName string `json:"userName" avro:"userName"`
	UserAge  int    `json:"userAge" avro:"userAge"`
}

type WrongSchema struct {
	NotUserName string `json:"notUserName" avro:"notUserName"`
	NotUserAge  int    `json:"notUserAge" avro:"notUserAge"`
}

func main() {
	slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, nil)))

	// Load settings here or hard-code them

	token := "insert-your-token-here"
	authProvider := pulsarauth.NewAuthenticationToken(token)

	pulsarClient, err := client.CreatePulsarClient(settings.Url, authProvider)
	if err != nil {
		// slog expects key/value pairs after the message; a bare err is logged under !BADKEY.
		slog.Error("Error creating the pulsar client", "error", err)
	}
	defer pulsarClient.Close()

	producerName := fmt.Sprintf("SchemaProducer-%s", uuid.NewString()[0:6])

	schemaDef := `{
  "type": "record",
  "name": "SchemaTest",
  "fields": [
    {
      "name": "userName",
      "type": "string"
    },
    {
      "name": "userAge",
      "type": "int"
    }
  ]
}`
	schemaExample := pulsar.NewJSONSchema(schemaDef, nil)

	producer, err := pulsarClient.CreateProducer(pulsar.ProducerOptions{
		Name:   producerName,
		Topic:  settings.TopicAddress,
		Schema: schemaExample,
	})
	if err != nil {
		slog.Error("Error creating the pulsar producer", "error", err)
	}
	defer producer.Close()

	for {
		//goodMsg := &SchemaTest{
		//	UserName: "JohnDoe",
		//	UserAge:  30,
		//}
		badMsg := &WrongSchema{
			NotUserName: "JohnDoe",
			NotUserAge:  30,
		}

		msgId, sendErr := producer.Send(context.Background(), &pulsar.ProducerMessage{
			//Value: goodMsg,
			Value: badMsg,
			//Schema: schemaExample,
		})
		if sendErr != nil {
			slog.Error("Error sending message", "error", sendErr)
			break
		}

		//slog.Info("Published message: ", slog.String("messageId", msgId.String()), slog.Any("message", goodMsg))
		slog.Info("Published message: ", slog.String("messageId", msgId.String()), slog.Any("message", badMsg))
		time.Sleep(time.Second * 1)
	}
}

The schema from the registry (http response dump):

{
	"version": 0,
	"type": "JSON",
	"timestamp": 1728915217796,
	"data": "{\n  \"type\": \"record\",\n  \"name\": \"SchemaTest\",\n  \"fields\": [\n    {\n      \"name\": \"userName\",\n      \"type\": \"string\"\n    },\n    {\n      \"name\": \"userAge\",\n      \"type\": \"int\"\n    }\n  ]\n}",
	"properties": {}
}

System configuration

Pulsar version: 3.0.6.8

calindima • Oct 15 '24 08:10

Pulsar does not validate the structure of JSON messages against the topic's schema. It only checks if the producer's schema definition matches the broker's topic schema. In contrast, the Avro schema validates the message structure during both encoding and decoding. This is the same for the Java client.

If you need to verify message structure compatibility when sending the messages, you could use the Avro Schema. Otherwise, you need to ensure the producer's schema matches the message structure.

Here's a similar example in Java:

        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        String schemaDef = "{\n" +
                "  \"type\": \"record\",\n" +
                "  \"name\": \"SchemaTest\",\n" +
                "  \"fields\": [\n" +
                "    { \"name\": \"userName\", \"type\": \"string\" },\n" +
                "    { \"name\": \"userAge\", \"type\": \"int\" }\n" +
                "  ]\n" +
                "}";

        SchemaInfo schemaInfo = SchemaInfo.builder()
                .name("SchemaTest")
                .type(SchemaType.JSON)
                .schema(schemaDef.getBytes())
                .build();

        GenericSchema<GenericRecord> schema = Schema.generic(schemaInfo);

        Producer<GenericRecord> producer = client.newProducer(schema)
                .topic("test-schema")
                .create();


        GenericRecord record = schema.newRecordBuilder()
                .set("notUserName", "Alice")
                .set("notUserAge", 30)
                .build();

        producer.send(record);

        System.out.println("Message sent!");

        producer.close();
        client.close();

The message is sent successfully; its fields are never validated against the schema.

RobertIndie • Oct 21 '24 10:10

I expected JsonSchema to function similarly to AvroSchema, since it's very close to it: the schema definition is Avro for both, and only the type in SchemaInfo differs, along with how message validation works, apparently. This might be a naive view, since I haven't looked at how they differ in the internals of Pulsar.

Shouldn't the client library ensure that messages sent with a Schema Producer adhere to its schema? This .NET library returns a typed producer which can't send other types of messages through it, for example.

My impression of the schema validation feature of Pulsar is that it should bring in some guardrails to get some contract between producers and consumers.

calindima • Oct 22 '24 07:10

I expected JsonSchema to function similarly to AvroSchema, since it's very close to it: the schema definition is Avro for both, and only the type in SchemaInfo differs, along with how message validation works, apparently. This might be a naive view, since I haven't looked at how they differ in the internals of Pulsar.

When the client encodes an Avro message, the Avro encoder automatically validates it against the schema. However, JSON schema encoding works differently. We do not pass the schema definition to the encoder. Instead, we encode from any object type to JSON bytes. The JSON encoder does not perform any validation. This is the same for both the Go and Java clients.
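To illustrate the point above, here is a minimal sketch using only the Go standard library. It shows that plain JSON marshaling accepts any struct, and how an application could check field names itself before sending. The helper missingFields and the avroRecord type are hypothetical and are not part of pulsar-client-go; the check covers only top-level field names, not types.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// avroRecord mirrors only the part of the record definition this sketch needs.
type avroRecord struct {
	Fields []struct {
		Name string `json:"name"`
	} `json:"fields"`
}

// WrongSchema is the non-compliant message type from the report.
type WrongSchema struct {
	NotUserName string `json:"notUserName"`
	NotUserAge  int    `json:"notUserAge"`
}

const schemaDef = `{"type":"record","name":"SchemaTest","fields":[` +
	`{"name":"userName","type":"string"},{"name":"userAge","type":"int"}]}`

// missingFields (hypothetical helper) marshals v to JSON, which succeeds for
// any struct, and returns the schema field names absent from the payload.
func missingFields(schemaDef string, v any) ([]string, error) {
	var rec avroRecord
	if err := json.Unmarshal([]byte(schemaDef), &rec); err != nil {
		return nil, fmt.Errorf("parse schema: %w", err)
	}
	payload, err := json.Marshal(v) // no schema is consulted here
	if err != nil {
		return nil, fmt.Errorf("encode message: %w", err)
	}
	var got map[string]json.RawMessage
	if err := json.Unmarshal(payload, &got); err != nil {
		return nil, fmt.Errorf("message is not a JSON object: %w", err)
	}
	var missing []string
	for _, f := range rec.Fields {
		if _, ok := got[f.Name]; !ok {
			missing = append(missing, f.Name)
		}
	}
	return missing, nil
}

func main() {
	missing, err := missingFields(schemaDef, &WrongSchema{NotUserName: "JohnDoe", NotUserAge: 30})
	if err != nil {
		panic(err)
	}
	fmt.Println(missing) // prints [userName userAge]
}
```

A check like this would run in application code before producer.Send; it does not change what the broker or the client library accepts.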

Shouldn't the client library ensure that messages sent with a Schema Producer adhere to its schema? This .NET library returns a typed producer which can't send other types of messages through it, for example. My impression of the schema validation feature of Pulsar is that it should bring in some guardrails to get some contract between producers and consumers.

Client SDKs such as .NET and Java can use generic classes to restrict the type of messages sent or received by the producer or consumer. However, the Go producer and consumer do not use generics. In fact, they are more similar to the Java client's Producer<byte[]>. Since Go has supported generic types since Go 1.18, it might be worthwhile to implement a similar feature for the Go producer and consumer.
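As a rough sketch of what such a feature could look like, the following wraps an untyped sender in a generic type so that only values of one struct type can be sent. Everything here is an assumption for illustration: rawSender, TypedProducer, NewTypedProducer, and memorySender are hypothetical names, and rawSender is a stand-in, not the pulsar-client-go Producer interface.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// rawSender is a hypothetical stand-in for an untyped producer.
type rawSender interface {
	SendBytes(ctx context.Context, payload []byte) error
}

// TypedProducer accepts only values of type T, so passing a different
// struct type to Send is a compile-time error.
type TypedProducer[T any] struct {
	inner rawSender
}

func NewTypedProducer[T any](inner rawSender) *TypedProducer[T] {
	return &TypedProducer[T]{inner: inner}
}

// Send encodes msg as JSON and forwards the bytes to the untyped sender.
func (p *TypedProducer[T]) Send(ctx context.Context, msg T) error {
	payload, err := json.Marshal(msg)
	if err != nil {
		return fmt.Errorf("encode message: %w", err)
	}
	return p.inner.SendBytes(ctx, payload)
}

type SchemaTest struct {
	UserName string `json:"userName"`
	UserAge  int    `json:"userAge"`
}

// memorySender records payloads in memory so the sketch runs without a broker.
type memorySender struct{ sent [][]byte }

func (m *memorySender) SendBytes(_ context.Context, payload []byte) error {
	m.sent = append(m.sent, payload)
	return nil
}

// sendExample sends one SchemaTest value and returns the encoded payload.
func sendExample() (string, error) {
	sink := &memorySender{}
	producer := NewTypedProducer[SchemaTest](sink)
	msg := SchemaTest{UserName: "JohnDoe", UserAge: 30}
	if err := producer.Send(context.Background(), msg); err != nil {
		return "", err
	}
	// producer.Send(ctx, WrongSchema{...}) would be rejected by the compiler.
	return string(sink.sent[0]), nil
}

func main() {
	payload, err := sendExample()
	if err != nil {
		panic(err)
	}
	fmt.Println(payload) // prints {"userName":"JohnDoe","userAge":30}
}
```

This only ties the producer to a Go type. It does not check that the Go type agrees with the schema stored in the registry.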

RobertIndie • Oct 29 '24 10:10

When the client encodes an Avro message, the Avro encoder automatically validates it against the schema. However, JSON schema encoding works differently. We do not pass the schema definition to the encoder. Instead, we encode from any object type to JSON bytes. The JSON encoder does not perform any validation. This is the same for both the Go and Java clients.

Thanks for pointing it out. I had seen this and wondered why there's no validation. If I understand correctly, there's nothing to do for now besides accepting it and working around it, e.g. by using AvroSchema instead if validation is needed.

Client SDKs such as .NET and Java can use generic classes to restrict the type of messages sent or received by the producer or consumer. However, the Go producer and consumer do not use generics. In fact, they are more similar to the Java client's Producer<byte[]>. Since Go has supported generic types since Go 1.18, it might be worthwhile to implement a similar feature for the Go producer and consumer.

I think it would be interesting to offer this. For example, I noticed that although AvroSchema offers the validation we were looking for, you can override the schema completely by using Payload instead of Value to send a message. So even with a Schema Producer you can still publish incompatible messages, which leaves a lot of checks in the hands of producers instead of having them follow a paved path for this feature. I think it would be a nicer experience to have the library guide some of these use cases.

Thanks for taking the time to respond to all of this. I'm trying to understand better the existing functionality, whether there's something we can contribute and what needs to be taken as is.

calindima • Nov 07 '24 12:11

@RobertIndie

For example, I noticed that although AvroSchema offers the validation we were looking for, you can override the schema completely by using Payload instead of Value to send a message. So even with a Schema Producer you can still publish incompatible messages, which leaves a lot of checks in the hands of producers instead of having them follow a paved path for this feature.

Would it make sense for me to contribute some extra validation to not allow using Payload if a schema is used, ensuring that you can't override schema validation? I think this could be added here.

calindima • Nov 07 '24 14:11

Would it make sense for me to contribute some extra validation to not allow using Payload if a schema is used, ensuring that you can't override schema validation? I think this could be added here.

This would break the existing behavior. I'm afraid many users already use it that way.
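Since changing the library would break existing users, one option is an opt-in check on the application side. The sketch below is only an illustration under stated assumptions: the message type is a local stand-in that mirrors just the Payload and Value fields of a producer message, and checkMessage and errRawPayload are hypothetical names, not library API.

```go
package main

import (
	"errors"
	"fmt"
)

// message is a local stand-in with only the two fields relevant here.
type message struct {
	Payload []byte // raw bytes, sent without going through the schema encoder
	Value   any    // value to be encoded by the producer's schema
}

var errRawPayload = errors.New("raw Payload is not allowed on a schema producer; set Value instead")

// checkMessage (hypothetical) rejects raw payloads when the producer was
// created with a schema, so the schema encoder cannot be bypassed.
func checkMessage(hasSchema bool, msg message) error {
	if hasSchema && msg.Payload != nil {
		return errRawPayload
	}
	return nil
}

func main() {
	raw := message{Payload: []byte(`{"notUserName":"JohnDoe"}`)}
	typed := message{Value: map[string]any{"userName": "JohnDoe", "userAge": 30}}

	fmt.Println(checkMessage(true, raw))   // rejected: schema producer with raw bytes
	fmt.Println(checkMessage(true, typed)) // accepted: goes through the schema encoder
	fmt.Println(checkMessage(false, raw))  // accepted: no schema, raw bytes are expected
}
```

Applications that want the guardrail would call such a check before Send, while existing users who rely on Payload are unaffected.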

RobertIndie • Nov 18 '24 12:11