kafka-connect-solr

Problems using Kafka Connect Solr - Invalid Value type java.lang.String is not a supported type.

Open · crojascreangel opened this issue 8 months ago · 0 comments

Hi all!

@jcustenborder @didiez @ilosamart

Currently, I'm trying to create a connector that takes messages from a Kafka topic and sends them to a standalone Solr core. The topic messages were created using jcustenborder's spooldir CSV and line-by-line connectors. All messages were serialized as Avro and have a schema registered in the Schema Registry service, as shown in the image below.

[image: Avro schema registered in Schema Registry]

My problem is the following error:

```
ERROR [solr_connector_test_0046|task-0] WorkerSinkTask{id=solr_connector_test_0046-0} Task threw an uncaught and unrecoverable exception. The task is being killed and will not recover until manually restarted. Error: Only Schema (org.apache.kafka.connect.data.Struct) or Schema less (java.util.Map) are supported. java.lang.String is not a supported type. (org.apache.kafka.connect.runtime.WorkerSinkTask:612)
```
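If I understand the error correctly, the two supported shapes map onto converter choices: io.confluent.connect.avro.AvroConverter should deliver a Struct with a schema, while a schemaless java.util.Map would come from something like the JSON converter with schemas disabled, e.g.:

```json
{
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false"
}
```

Neither of those should produce a plain java.lang.String, which is apparently what the sink is receiving.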

According to the error, the sink only accepts messages that carry a schema (a Struct) or are schemaless Maps, and I believe my messages are stored with a schema. I checked a lot of forums, and the only recommendation I found was to change the key converter of the source connector to org.apache.kafka.connect.storage.StringConverter, but when I tried this it didn't work. The error also confuses me because the messages are not plain strings.
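For reference, the change I tried looked roughly like this (reconstructed from memory; I only swapped the key converter on the source connector):

```json
{
  "key.converter": "org.apache.kafka.connect.storage.StringConverter"
}
```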

My source connector has the following config:

{ "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector", "processing.file.extension": ".*\\.csv", "csv.first.row.as.header": "True", "finished.path": "/data/CREANGEL/administrator/DATA_DRUID_006/processed", "schema.generation.enabled": "true", "value.converter.schema.registry.url": "http://schema-registry:8081", "input.file.pattern": ".*\\.csv", "name": "CREANGEL_administrator_DATA_DRUID_006_csv", "topic": "CREANGEL_administrator_DATA_DRUID_006_csv", "error.path": "/data/CREANGEL/administrator/DATA_DRUID_006/error", "input.path": "/data/CREANGEL/administrator/DATA_DRUID_006/unprocessed", "value.converter": "io.confluent.connect.avro.AvroConverter", "key.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://schema-registry:8081" }

My sink connector has the following configuration:

{ "connector.class": "com.github.jcustenborder.kafka.connect.solr.HttpSolrSinkConnector", "solr.url": "http://192.168.230.94:15555/", "topics": "CREANGEL_administrator_DATA_DRUID_011_csv", "name": "solr_connector_test_0046" }

My Kafka Connect worker is deployed in a Docker container using the image confluentinc/cp-kafka-connect-base:6.1.0, and I installed the Solr connector manually, as suggested in the Solr sink connector documentation.

crojascreangel · Oct 20 '23 21:10