kafka-connect-jdbc

Implicit conversion error from topic to Microsoft SQL Server


Hi. I'm getting this error when trying to sink topic data to Microsoft SQL Server. Any ideas of things to try to get around this, please?

io.confluent.connect.jdbc.sink.JdbcSinkTask put - Write of 500 records failed, remainingRetries=0
java.sql.BatchUpdateException: Implicit conversion from data type nvarchar to varbinary(max) is not allowed. Use the CONVERT function to run this query.
    at com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.executeBatch(SQLServerPreparedStatement.java:2088)
    at io.confluent.connect.jdbc.sink.BufferedRecords.executeUpdates(BufferedRecords.java:221)
    at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:187)
    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:80)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:582)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:330)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
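
For reference, the same error can be reproduced directly in SQL Server, outside of Connect. A minimal sqlcmd sketch; the server, login, and table names here are illustrative, not taken from the report:

# Assumes a local SQL Server instance reachable with the sa login.
sqlcmd -S localhost -U sa -P 'Password!' \
  -Q "CREATE TABLE dbo.demo (id int, data varbinary(max))"

# Fails with the error above: implicit nvarchar -> varbinary(max) is not allowed.
sqlcmd -S localhost -U sa -P 'Password!' \
  -Q "INSERT INTO dbo.demo VALUES (1, N'hello')"

# Succeeds once the conversion is explicit, as the error message suggests.
sqlcmd -S localhost -U sa -P 'Password!' \
  -Q "INSERT INTO dbo.demo VALUES (2, CONVERT(varbinary(max), N'hello'))"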

mikementzmaersk avatar Nov 10 '21 18:11 mikementzmaersk

Can you share your sink connector config, please?

mataralhawiti avatar Nov 25 '21 09:11 mataralhawiti

Sure :-)

Pretty basic... I think it could be something to do with null values...

{ "name": "sqlsink1", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max": "1", "topics.regex": "sqlserver1.dbo.", "connection.url": "jdbc:sqlserver://sqlserver1:1433;database=test;user=kafka1;password=redacted;encrypt=true", "transforms": "unwrap,striptopic", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", "transforms.unwrap.drop.tombstones": "false", "transforms.striptopic.type" : "org.apache.kafka.connect.transforms.RegexRouter", "transforms.striptopic.regex" : "sqlserver1.dbo.(.)", "transforms.striptopic.replacement" : "$1", "auto.create": "true", "auto.evolve": "true", "insert.mode": "upsert", "delete.enabled": "true", "pk.mode": "record_key" } }

mikementzmaersk avatar Dec 08 '21 14:12 mikementzmaersk

Thanks. From this:

Implicit conversion from data type nvarchar to varbinary(max) is not allowed. Use the CONVERT function to run this query.

It seems you're trying to insert an nvarchar field into a varbinary column in the destination table. Checking your message schema in Kafka would be helpful.

Did you create the destination table upfront, or was it created by the connector?
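
For example, the value schema registered for the topic can be fetched from Schema Registry and compared against the destination column types. A sketch, assuming Schema Registry on localhost:8081 and a placeholder table name:

# Fetch the registered Avro value schema ("mytable" is a placeholder).
curl -s http://localhost:8081/subjects/sqlserver1.dbo.mytable-value/versions/latest

# List the destination column types (connection details as in the config above).
sqlcmd -S sqlserver1 -U kafka1 -P redacted -d test \
  -Q "SELECT COLUMN_NAME, DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = 'mytable'"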

mataralhawiti avatar Dec 09 '21 18:12 mataralhawiti

I've run into this problem too. It's caused by a message containing a null value for a nullable bytes field.

Here's a script to reproduce it:

#!/bin/sh -ex

curl -sO https://raw.githubusercontent.com/confluentinc/cp-all-in-one/7.0.1-post/cp-all-in-one/docker-compose.yml

# Add a SQL Server service to the Compose file.
cat <<EOF >>docker-compose.yml
  sqlserver:
    container_name: sqlserver
    image: mcr.microsoft.com/mssql/server:2019-latest
    ports:
     - 1433:1433
    environment:
     - ACCEPT_EULA=Y
     - MSSQL_AGENT_ENABLED=true
     - MSSQL_PID=Standard
     - SA_PASSWORD=Password!
EOF

docker-compose up -d connect sqlserver

# Install the JDBC sink connector plugin into the Connect container.
docker exec -i connect sh -e <<EOF
curl -sO https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-jdbc/versions/10.2.6/confluentinc-kafka-connect-jdbc-10.2.6.zip
python -m zipfile -e confluentinc-kafka-connect-jdbc-10.2.6.zip /usr/share/java
EOF

# Wait for Connect.
sh -c 'while ! curl -fs localhost:8083; do sleep 1; done'

# Register the JDBC sink connector; auto.create lets it create the destination table.
curl -H Content-Type:application/json -d @- \
  http://localhost:8083/connectors <<EOF
{
  "name": "jdbc-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:sqlserver://sqlserver:1433",
    "connection.user": "sa",
    "connection.password": "Password!",
    "auto.create": "true",
    "pk.fields": "id",
    "pk.mode": "record_value",
    "topics": "jdbc-sink"
  }
}
EOF

# Restart schema-registry in case it timed out waiting for the broker to start.
docker-compose up -d schema-registry
sh -c 'while ! curl -fs localhost:8081; do sleep 1; done'

echo '{"id": 1, "nullableBytes": null}' | 
  docker exec -i schema-registry kafka-avro-console-producer \
    --bootstrap-server broker:29092 --topic jdbc-sink \
    --property value.schema='
{
  "type": "record",
  "namespace": "test",
  "name": "Value",
  "fields": [
    {
      "name": "id",
      "type": "long"
    },
    {
      "name": "nullableBytes",
      "type": [
        "null",
        "bytes"
      ]
    }
  ]
}
'
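
Once the message is produced, the sink task fails with the implicit-conversion error. A quick way to confirm it, using the container and connector names from the script above:

# The task status should report FAILED with the BatchUpdateException trace.
curl -s localhost:8083/connectors/jdbc-sink/status

# Or grep the Connect worker log for the exception.
docker logs connect 2>&1 | grep -m1 BatchUpdateException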

nwt avatar Jan 06 '22 18:01 nwt