schema-registry
schema-registry copied to clipboard
errorType:'READER_FIELD_MISSING_DEFAULT_VALUE' while checking compatibility
In the official document with BACKWARD compatibility, we can add an optional field to an old schema, which is ignored by default value, here is my code for the client to check schema compatibility:
- client.go
package compatibility
import (
"bytes"
"context"
"fmt"
"github.com/hamba/avro/v2"
"github.com/hamba/avro/v2/registry"
jsoniter "github.com/json-iterator/go"
"io"
"net"
"net/http"
"net/url"
"path"
"strconv"
"strings"
"time"
)
// credentials are used to store the basic auth credentials.
type credentials struct {
username string
password string
}
// Client is a client for the schema registry.
type Client struct {
client *http.Client
base *url.URL
creds credentials
}
// Error is returned by the registry when there is an error.
type Error struct {
StatusCode int `json:"-"`
Code int `json:"error_code"`
Message string `json:"message"`
}
type ClientFunc func(*Client)
// Request is the request to test compatibility.
type Request struct {
Schema string `json:"schema"`
References []registry.SchemaReference `json:"references"`
}
// Response is the response from the compatibility check.
type Response struct {
IsCompatible bool `json:"is_compatible"`
Messages []string `json:"messages"`
}
// defaultClient is the default HTTP client.
var defaultClient = &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 15 * time.Second,
KeepAlive: 90 * time.Second,
}).DialContext,
TLSHandshakeTimeout: 3 * time.Second,
IdleConnTimeout: 90 * time.Second,
},
Timeout: 10 * time.Second,
}
const contentType = "application/vnd.schemaregistry.v1+json"
// WithBasicAuth sets the basic auth credentials for the client.
func WithBasicAuth(username, password string) ClientFunc {
return func(c *Client) {
c.creds = credentials{username: username, password: password}
}
}
// WithHTTPClient sets the HTTP client for the client.
func WithHTTPClient(client *http.Client) ClientFunc {
return func(c *Client) {
c.client = client
}
}
// NewClient creates a new schema registry client.
func NewClient(baseURL string, opts ...ClientFunc) (*Client, error) {
u, err := url.Parse(baseURL)
if err != nil {
return nil, err
}
if !strings.HasSuffix(u.Path, "/") {
u.Path += "/"
}
c := &Client{
client: defaultClient,
base: u,
}
for _, opt := range opts {
opt(c)
}
return c, nil
}
// request performs a request to the schema registry.
func (c *Client) request(ctx context.Context, method, path string, in, out any) error {
var body io.Reader
if in != nil {
b, _ := jsoniter.Marshal(in)
body = bytes.NewReader(b)
}
// These errors are not possible as we have already parse the base URL.
u, _ := c.base.Parse(path)
req, _ := http.NewRequestWithContext(ctx, method, u.String(), body)
req.Header.Set("Content-Type", contentType)
if len(c.creds.username) > 0 || len(c.creds.password) > 0 {
req.SetBasicAuth(c.creds.username, c.creds.password)
}
resp, err := c.client.Do(req)
if err != nil {
return fmt.Errorf("could not perform request: %w", err)
}
defer func() {
_, _ = io.Copy(io.Discard, resp.Body)
_ = resp.Body.Close()
}()
if resp.StatusCode >= http.StatusBadRequest {
err := Error{StatusCode: resp.StatusCode}
_ = jsoniter.NewDecoder(resp.Body).Decode(&err)
return err
}
if out != nil {
return jsoniter.NewDecoder(resp.Body).Decode(out)
}
return nil
}
// TestCompatibility tests the compatibility of a schema with the schema registry.
//
// - The schema is tested against the subject and version provided.
//
// - If the schema is compatible with the subject and version, the response will be true.
//
// - If the schema is not compatible with the subject and version, the response will be false
// and the reason will be provided in the messages.
func (c *Client) TestCompatibility(
ctx context.Context,
subject,
version string,
schema avro.Schema,
references []registry.SchemaReference,
verbose bool,
) (response Response, err error) {
p := path.Join("compatibility", "subjects", subject, "versions", version)
if verbose {
p += "?verbose=true"
}
requestBody := Request{
Schema: schema.String(),
References: references,
}
if err := c.request(ctx, http.MethodPost, p, requestBody, &response); err != nil {
return response, err
}
return response, nil
}
// Error returns the error message.
func (e Error) Error() string {
if e.Message != "" {
return e.Message
}
return "registry error: " + strconv.Itoa(e.StatusCode)
}
- before suit test setup:
package compatibility
import (
"github.com/hamba/avro/v2/registry"
"github.com/testcontainers/testcontainers-go/network"
"source.vtvlive.vn/cellutions/testutils/containers/common"
"source.vtvlive.vn/cellutions/testutils/containers/kafka"
"source.vtvlive.vn/cellutions/testutils/containers/schemaregistry"
"source.vtvlive.vn/cellutions/testutils/containers/zookeeper"
"testing"
"time"
"context"
"github.com/docker/docker/client"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
var dockerClient *client.Client
var logFollower *common.LogFollower
var registryClient *registry.Client
var networkName string
var compatibilityClient *Client
func TestCompatibility(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Compatibility Suite")
}
var _ = BeforeSuite(func() {
ctx, _ := context.WithTimeout(context.Background(), 10*time.Minute)
nw, err := network.New(ctx)
Expect(err).ToNot(HaveOccurred())
DeferCleanup(nw.Remove, context.Background())
networkName = nw.Name
dockerClient, err = common.NewDockerClient(ctx)
Expect(err).ToNot(HaveOccurred())
DeferCleanup(dockerClient.Close)
logFollower, err = common.NewLogFollower(dockerClient, 2)
Expect(err).ToNot(HaveOccurred())
DeferCleanup(logFollower.Close)
zookeeperC, err := zookeeper.ZooKeeperContainer(ctx, dockerClient, logFollower, networkName)
Expect(err).ToNot(HaveOccurred())
DeferCleanup(zookeeperC.Terminate, context.Background())
kafkaC, err := kafka.NewKafkaCluster(
ctx, dockerClient, logFollower, networkName,
zookeeperC.InternalEndpoint(zookeeper.ZookeeperClientPort, ""), 1)
Expect(err).ToNot(HaveOccurred())
DeferCleanup(kafkaC.Terminate, context.Background())
brokerEndpoint, err := kafkaC.InternalEndpoint(0, kafka.KafkaBrokerPort, "")
Expect(err).ToNot(HaveOccurred())
schemaRegistryC, err := schemaregistry.SchemaRegistryContainer(ctx, dockerClient, logFollower, networkName, brokerEndpoint)
Expect(err).ToNot(HaveOccurred())
DeferCleanup(schemaRegistryC.Terminate, context.Background())
srEndpoint, err := schemaRegistryC.ExternalEndpoint("8085/tcp", "http")
Expect(err).ToNot(HaveOccurred())
registryClient, err = registry.NewClient(srEndpoint)
Expect(err).ToNot(HaveOccurred())
compatibilityClient, err = NewClient(srEndpoint)
err = registryClient.SetGlobalCompatibilityLevel(ctx, "BACKWARD")
Expect(err).ToNot(HaveOccurred())
})
- client_test.go
package compatibility
import (
"context"
"github.com/brianvoe/gofakeit/v6"
"github.com/hamba/avro/v2"
"github.com/hamba/avro/v2/registry"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"source.vtvlive.vn/cellutions/cases"
)
var _ = Describe("Client", Ordered, func() {
var subject string
var references []registry.SchemaReference
Describe("TestCompatibility", func() {
When("the old schema does not use references", func() {
BeforeAll(func() {
subject = cases.ToKebab(gofakeit.LoremIpsumSentence(4))
references = make([]registry.SchemaReference, 0)
oldSchema := recordSchema(
"seenow.dev.temp",
`
{
"type": "record",
"name": "SimpleRecord",
"fields": [
{
"name": "name",
"type": "string"
}
]
}`)
_, _, err := registryClient.CreateSchema(context.Background(), subject, oldSchema.String())
Expect(err).ToNot(HaveOccurred())
})
DescribeTable("should works",
func(newSchema avro.Schema, isCompatible bool) {
ctx := context.Background()
res, err := compatibilityClient.TestCompatibility(ctx, subject, "latest", newSchema, references, true)
Expect(err).ToNot(HaveOccurred())
Expect(res.IsCompatible).To(Equal(isCompatible))
},
Entry("add an optional field to a record",
recordSchema("seenow.dev.temp",
`
{
"type": "record",
"name": "SimpleRecord",
"fields": [
{
"name": "name",
"type": "string"
},
{
"name": "age",
"type": ["int", "null"],
"default": 0
}
]
}`), false,
),
Entry("delete a field from a record", Skip),
Entry("add non-optional field to a record", Skip),
)
})
When("the old schema is a union referencing records", func() {
DescribeTable("should works",
func() {
},
Entry("add an optional field to a record", Skip),
Entry("delete a field from a record", Skip),
Entry("add non-optional field to a record", Skip),
)
})
})
})
func recordSchema(namespace, schemaString string) *avro.RecordSchema {
GinkgoHelper()
defer GinkgoRecover()
schema, err := avro.ParseWithCache(schemaString, namespace, avro.DefaultSchemaCache)
Expect(err).ToNot(HaveOccurred())
return schema.(*avro.RecordSchema)
}
Im expect IsCompatible is return to true
but the value is false
and the error message is:
0 = {string} "{errorType:'READER_FIELD_MISSING_DEFAULT_VALUE', description:'The field 'age' at path '/fields/1' in the new schema has no default value and is missing in the old schema', additionalInfo:'age'}"
1 = {string} "{oldSchemaVersion: 1}"
2 = {string} "{oldSchema: '{"type":"record","name":"SimpleRecord","namespace":"seenow.dev.temp","fields":[{"name":"name","type":"string"}]}'}"
3 = {string} "{compatibility: 'BACKWARD'}"
Am I wrong?