
Problem using Kafka Connect and KSQL

Open thiagoananias opened this issue 3 years ago • 2 comments

Hello!

I'm running Kafka on Kubernetes using the helm chart.

I'm using this very simple code to insert data into the cluster.

The producer

'use strict';

const { Kafka } = require('kafkajs');
const { SchemaRegistry } = require('@kafkajs/confluent-schema-registry');
const { v4: uuidv4 } = require('uuid');

const kafka = new Kafka({
  clientId: 'my-app',
  connectionTimeout: 1000,
  brokers: [
    'kafka-confluent-cp-kafka.kafka.svc.cluster.local:9092',
  ],
});

const registry = new SchemaRegistry({ host: 'http://kafka-confluent-cp-schema-registry.kafka.svc.cluster.local:8081' });
const { readAVSCAsync } = require('@kafkajs/confluent-schema-registry');

const producer = kafka.producer();
const admin = kafka.admin();

const topic = 'customer';

const run = async () => {

  const schema = await readAVSCAsync('avro/customer.avsc');

  const data = await registry.register(schema);
  const userId = data.id;
  console.info(`User Avro registered ==> ${data.id}`);

  await admin.connect();
  await admin.createTopics({
    topics: [
      {
        topic,
        numPartitions: 3,
        replicationFactor: 3,
      },
    ],
  });
  await admin.disconnect();

  const payload = { name: 'Joker', favorite_number: 10, favorite_color: 'Purple' };
  const encodedPayload = await registry.encode(userId, payload);

  // Producing
  await producer.connect();
  await producer.send({
    topic,
    messages: [
      { key: uuidv4(), value: encodedPayload },
    ],
  });

  await producer.disconnect();
  console.info('Success!');

};

run().catch(console.error);

The consumer

'use strict';

const { Kafka, logLevel } = require('kafkajs');
const { SchemaRegistry } = require('@kafkajs/confluent-schema-registry');

const kafka = new Kafka({
  clientId: 'my-app',
  connectionTimeout: 1000,
  brokers: [
    'kafka-confluent-cp-kafka.kafka.svc.cluster.local:9092',
  ],
});

const registry = new SchemaRegistry({ host: 'http://kafka-confluent-cp-schema-registry.kafka.svc.cluster.local:8081' });
const consumer = kafka.consumer({ groupId: 'test-group' });

const run = async () => {

  // Consuming
  await consumer.connect();
  await consumer.subscribe({ topic: 'customer', fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ partition, message }) => {
      try {
        const ms = await registry.decode(message.value);
        console.log({
          partition,
          offset: message.offset,
          key: message.key.toString(),
          value: ms,
        });
        console.log('CONSUMED!!');
      } catch (error) {
        console.info(error);
      }
    },
  });

};

run().catch(console.error);

The Avro schema

{
   "namespace": "io.confluent.ksql.avro_schemas",
   "type": "record",
   "name": "customer",
   "fields": [
      {"name": "name", "type": "string"},
      {"name": "favorite_number",  "type": ["null", "int"]},
      {"name": "favorite_color", "type": ["null", "string"]}
   ]
 }

The thing is, everything works fine until I have to use Kafka Connect and ksql.

The Kafka Connect problem: I managed to create a connector that reads the messages and sends them to MongoDB, but why do I need the 'value.converter.value.subject.name.strategy' property for it to work?

{
    "name": "mongo-user-sink-connector",
    "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",

        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "key.deserializer":"org.apache.kafka.common.serialization.StringDerserializer",

        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://kafka-confluent-cp-schema-registry:8081",
        "value.converter.value.subject.name.strategy":"io.confluent.kafka.serializers.subject.RecordNameStrategy",

        "tasks.max":"1",
        "topics":"customer",
        "connection.uri":"mongoURI",
        "database":"healthcheck",
        "collection":"kafka-sink-collection"

    }
}

There are two ksql problems. First, I can't create a simple stream; the following error occurs:

CREATE STREAM CUSTOMER WITH (KAFKA_TOPIC='customer', VALUE_FORMAT='AVRO');

Schema for message values on topic customer does not exist in the Schema Registry.Subject: customer-value
Possible causes include:
- The topic itself does not exist
    -> Use SHOW TOPICS; to check
- Messages on the topic are not serialized using a format Schema Registry supports
    -> Use PRINT 'customer' FROM BEGINNING; to verify
- Messages on the topic have not been serialized using a Confluent Schema Registry supported serializer
    -> See https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html
- The schema is registered on a different instance of the Schema Registry
    -> Use the REST API to list available subjects
    https://docs.confluent.io/current/schema-registry/docs/api.html#get--subjects
- You do not have permissions to access the Schema Registry.Subject: customer-value
    -> See https://docs.confluent.io/current/schema-registry/docs/security.html

When I list the subjects from the Schema Registry, there is no 'customer-value' registered! Is this a problem?

Second, even after I add it manually, the ksql server logs this error when I run a query:

WARN stream-thread [_confluent-ksql-kafka-confluenttransient_5604733824524432010_1610479904778-81cfb00c-5d81-41f4-93ff-59d801a57908-StreamThread-1] task [0_0] Skipping record due to deserialization error. topic=[customer] partition=[0] offset=[4] (org.apache.kafka.streams.processor.internals.RecordDeserializer)
org.apache.kafka.common.errors.SerializationException: Error deserializing message from topic: customer
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic customer to Avro:

Thanks for the help

thiagoananias, Jan 12 '21 19:01

I solved my issue.

When registering the schema, pass the subject explicitly, for example:

const schema = await readAVSCAsync('avro/car.avsc');
const { id } = await registry.register(schema, { subject: 'car-value' });

I used the subject option to override the default behavior, which is to concatenate the namespace and name of the Avro schema.

But I didn't get exactly why this is needed. Is this a pattern in Confluent Kafka?

thiagoananias, Jan 12 '21 20:01

Since no one bothered to answer this: Confluent provides three ways to map from a topic to a Schema Registry subject. You MUST adhere to one of these three strategies, since Confluent tools such as ksqlDB and Connect must be able to resolve the schema for a given topic.

The default is TopicNameStrategy, which derives the subject from the topic name ({topic}-key or {topic}-value, hence the customer-value subject in the ksql error above).
RecordNameStrategy uses the name of the message type (namespace.name for Avro, the import URL for proto).
TopicRecordNameStrategy combines the topic name with the record name, like {topic}-{record name}.
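
To make the three strategies concrete, here is a minimal sketch of how each one derives a subject name. The helper `subjectFor` and its parameter names are hypothetical, written only for illustration; they are not part of kafkajs or any Confluent library.

```javascript
'use strict';

// Hypothetical helper (not a library API): derives the Schema Registry
// subject that each Confluent subject name strategy would look up.
function subjectFor(strategy, { topic, recordName, isKey = false }) {
  const suffix = isKey ? 'key' : 'value';
  switch (strategy) {
    case 'TopicNameStrategy':
      // Default: subject is the topic name plus -key or -value.
      return `${topic}-${suffix}`;
    case 'RecordNameStrategy':
      // Subject is the fully qualified record name (namespace.name for Avro).
      return recordName;
    case 'TopicRecordNameStrategy':
      // Subject combines the topic name and the record name.
      return `${topic}-${recordName}`;
    default:
      throw new Error(`Unknown strategy: ${strategy}`);
  }
}

// Record name taken from the Avro schema posted in this issue.
const customerRecord = 'io.confluent.ksql.avro_schemas.customer';

for (const strategy of ['TopicNameStrategy', 'RecordNameStrategy', 'TopicRecordNameStrategy']) {
  console.log(strategy, '=>', subjectFor(strategy, { topic: 'customer', recordName: customerRecord }));
}
```

This is why ksqlDB, using the default strategy, asked for customer-value while the schema had been registered under the namespace.name subject, and why the Connect sink only worked once RecordNameStrategy was configured.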

All Confluent tools allow you to specify which strategy is in use for a given topic. Details are here: https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#sr-schemas-subject-name-strategy
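
To check which subjects actually exist, you can call the registry's GET /subjects endpoint (the one the ksql error message points to) and compare the result with the subject the default strategy expects. A rough sketch, assuming Node 18+ for the global `fetch`; `listSubjects` and `hasValueSubject` are hypothetical helper names, and the registry URL is the one from the original post.

```javascript
'use strict';

// Hypothetical helper: lists all subjects via the Schema Registry REST API.
// Assumes Node 18+ (global fetch) and an unauthenticated registry.
async function listSubjects(registryUrl) {
  const res = await fetch(`${registryUrl}/subjects`);
  if (!res.ok) {
    throw new Error(`Schema Registry responded with HTTP ${res.status}`);
  }
  return res.json(); // JSON array of subject names
}

// Hypothetical helper: true if the subject that TopicNameStrategy expects
// for the topic's message values is present in the list.
function hasValueSubject(subjects, topic) {
  return subjects.includes(`${topic}-value`);
}

// Example usage (commented out because it needs a reachable registry):
// listSubjects('http://kafka-confluent-cp-schema-registry.kafka.svc.cluster.local:8081')
//   .then((subjects) => console.log(hasValueSubject(subjects, 'customer')));
```

With only the namespace.name subject registered, as described earlier in the thread, `hasValueSubject` would return false for the customer topic, which matches the error ksql reported.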

ideasculptor, Aug 25 '21 23:08