snowflake-kafka-connector
SCHEMA EVOLUTION: Converting a String to Number Column
Hi,
I'm explicitly casting the Date fields in my JDBC source connector so that they land in the topic as STRING. When I use the Snowflake Sink connector with schema evolution enabled, the table it creates does not match the schema in the topic: it turns that String field into a NUMBER column (the original value is a DATE), so when the DATE string is inserted into the NUMBER column, ingestion fails. Any suggestions?
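For context, the setup described above looks roughly like this. This is a sketch, not my exact config: the column name `DATECOLUMNASSTRING` and transform alias `castDate` are placeholders, and only the schema-related properties are shown. The Cast SMT is the standard Kafka Connect transform for forcing the Date column to STRING, and `snowflake.enable.schematization=true` is the property that turns on schema evolution in the Snowflake sink (it requires the Snowpipe Streaming ingestion method).

```properties
# JDBC source (placeholder names): cast the Date column to STRING before it reaches the topic
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
transforms=castDate
transforms.castDate.type=org.apache.kafka.connect.transforms.Cast$Value
transforms.castDate.spec=DATECOLUMNASSTRING:string

# Snowflake sink: Snowpipe Streaming with schema evolution (schematization) enabled
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
snowflake.ingestion.method=SNOWPIPE_STREAMING
snowflake.enable.schematization=true
```

Despite the cast on the source side, the column created by schema evolution ends up as NUMBER rather than VARCHAR, which is what produces the error below.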
ERROR LOG
```
due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:624)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:342)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:242)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:211)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
	at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.kafka.connect.errors.DataException: Error inserting Records using Streaming API with msg:The given row cannot be converted to the internal format due to invalid value: Value cannot be ingested into Snowflake column DATECOLUMNASSTRING of type NUMBER, Row Index: 0, reason: Not a valid number
	at com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.handleInsertRowsFailures(TopicPartitionChannel.java:744)
	at com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.insertBufferedRecords(TopicPartitionChannel.java:512)
	at com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.insertRecordToBuffer(TopicPartitionChannel.java:338)
	at com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2.insert(SnowflakeSinkServiceV2.java:299)
	at com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2.insert(SnowflakeSinkServiceV2.java:267)
	at com.snowflake.kafka.connector.SnowflakeSinkTask.put(SnowflakeSinkTask.java:301)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:593)
	... 11 more
Caused by: net.snowflake.ingest.utils.SFException: The given row cannot be converted to the internal format due to invalid value: Value cannot be ingested into Snowflake column DATECOLUMNASSTRING of type NUMBER, Row Index: 0, reason: Not a valid number
	at net.snowflake.ingest.streaming.internal.DataValidationUtil.valueFormatNotAllowedException(DataValidationUtil.java:884)
	at net.snowflake.ingest.streaming.internal.DataValidationUtil.validateAndParseBigDecimal(DataValidationUtil.java:557)
	at net.snowflake.ingest.streaming.internal.ParquetValueParser.getSb16Value(ParquetValueParser.java:327)
	at net.snowflake.ingest.streaming.internal.ParquetValueParser.parseColumnValueToParquet(ParquetValueParser.java:163)
	at net.snowflake.ingest.streaming.internal.ParquetRowBuffer.addRow(ParquetRowBuffer.java:203)
	at net.snowflake.ingest.streaming.internal.ParquetRowBuffer.addRow(ParquetRowBuffer.java:148)
	at net.snowflake.ingest.streaming.internal.AbstractRowBuffer.insertRows(AbstractRowBuffer.java:306)
	at net.snowflake.ingest.streaming.internal.ParquetRowBuffer.insertRows(ParquetRowBuffer.java:35)
	at net.snowflake.ingest.streaming.internal.SnowflakeStreamingIngestChannelInternal.insertRows(SnowflakeStreamingIngestChannelInternal.java:362)
	at net.snowflake.ingest.streaming.internal.SnowflakeStreamingIngestChannelInternal.insertRow(SnowflakeStreamingIngestChannelInternal.java:328)
	at com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel$InsertRowsApiResponseSupplier.get(TopicPartitionChannel.java:628)
	at com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel$InsertRowsApiResponseSupplier.get(TopicPartitionChannel.java:580)
	at dev.failsafe.Functions.lambda$toCtxSupplier$11(Functions.java:243)
	at dev.failsafe.Functions.lambda$get$0(Functions.java:46)
	at dev.failsafe.internal.FallbackExecutor.lambda$apply$0(FallbackExecutor.java:51)
	at dev.failsafe.SyncExecutionImpl.executeSync(SyncExecutionImpl.java:182)
	at dev.failsafe.FailsafeExecutor.call(FailsafeExecutor.java:438)
	at dev.failsafe.FailsafeExecutor.get(FailsafeExecutor.java:115)
	at com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.insertRowsWithFallback(TopicPartitionChannel.java:574)
	at com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.insertBufferedRecords(TopicPartitionChannel.java:500)
	... 16 more
```
@sfc-gh-tzhang Hi, can you comment on this?
@imrohankataria Are you using JSON or AVRO? And I'm not sure I fully understand your question: do you mean the field has a type of String in the topic, but schema evolution creates the column as NUMBER?