cdc-apache-cassandra
Need to restart the Cassandra node after dropping and recreating a table with different primary key fields
Let's say you create a table table1 as (key text, value text, primary key (key)) with cdc=true. The agent then caches the primary key schema in memory. If the user drops the table and recreates it with the same name, in the same keyspace, but with different primary key fields, you'll get an error like:
2022-07-08 08:25:13,561 CommitLogReadHandlerImpl.java:468 - Invalid primary key schema:
com.datastax.oss.cdc.agent.exceptions.CassandraConnectorSchemaException: Not a valid schema field: key
at com.datastax.oss.cdc.agent.AbstractPulsarMutationSender.buildAvroKey(AbstractPulsarMutationSender.java:237)
at com.datastax.oss.cdc.agent.AbstractPulsarMutationSender.sendMutationAsync(AbstractPulsarMutationSender.java:256)
at com.datastax.oss.cdc.agent.CommitLogReadHandlerImpl.sendAsync(CommitLogReadHandlerImpl.java:461)
at com.datastax.oss.cdc.agent.MutationMaker.createRecord(MutationMaker.java:32)
at com.datastax.oss.cdc.agent.MutationMaker.createRecord(MutationMaker.java:24)
at com.datastax.oss.cdc.agent.AbstractMutationMaker.insert(AbstractMutationMaker.java:28)
at com.datastax.oss.cdc.agent.CommitLogReadHandlerImpl.handleRowModifications(CommitLogReadHandlerImpl.java:344)
at com.datastax.oss.cdc.agent.CommitLogReadHandlerImpl.process(CommitLogReadHandlerImpl.java:303)
at com.datastax.oss.cdc.agent.CommitLogReadHandlerImpl.handleMutation(CommitLogReadHandlerImpl.java:238)
at org.apache.cassandra.db.commitlog.CommitLogReader.readMutation(CommitLogReader.java:477)
at org.apache.cassandra.db.commitlog.CommitLogReader.readSection(CommitLogReader.java:396)
at org.apache.cassandra.db.commitlog.CommitLogReader.readCommitLogSegment(CommitLogReader.java:243)
at org.apache.cassandra.db.commitlog.CommitLogReader.readCommitLogSegment(CommitLogReader.java:146)
at com.datastax.oss.cdc.agent.CommitLogReaderServiceImpl$1.run(CommitLogReaderServiceImpl.java:72)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Unknown Source)
After that, the agent cannot move forward, and you have to restart the agent (and therefore the node) to make it work again. Note that mutations for other tables are still sent to Pulsar.
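The stale in-memory cache described above can be illustrated with a small Java sketch. It assumes, without having checked the agent's source, that the primary key schema is cached under the keyspace and table name; TableDef, NameKeyedCache and IdKeyedCache are hypothetical names. Because Cassandra normally assigns a new table ID when a table is dropped and recreated, keying the cache by that ID (or invalidating the entry on a schema change) would avoid serving the old key schema until a restart.

```java
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class PkSchemaCacheSketch {

    // Hypothetical view of a table definition. A dropped and recreated table
    // keeps its keyspace and name but normally gets a new table id.
    record TableDef(String keyspace, String name, UUID id, List<String> pkColumns) {}

    // Assumed behavior: cache keyed only by "keyspace.table".
    // After drop + create, the old entry is still returned.
    static final class NameKeyedCache {
        private final Map<String, List<String>> cache = new ConcurrentHashMap<>();

        List<String> pkSchema(TableDef t) {
            return cache.computeIfAbsent(t.keyspace() + "." + t.name(), k -> t.pkColumns());
        }
    }

    // Alternative: cache keyed by the table id, so a recreated table gets a fresh entry.
    static final class IdKeyedCache {
        private final Map<UUID, List<String>> cache = new ConcurrentHashMap<>();

        List<String> pkSchema(TableDef t) {
            return cache.computeIfAbsent(t.id(), k -> t.pkColumns());
        }
    }

    public static void main(String[] args) {
        TableDef original = new TableDef("ks1", "table1", UUID.randomUUID(), List.of("key"));
        TableDef recreated = new TableDef("ks1", "table1", UUID.randomUUID(), List.of("a"));

        NameKeyedCache byName = new NameKeyedCache();
        byName.pkSchema(original);
        System.out.println("name-keyed: " + byName.pkSchema(recreated)); // stale: [key]

        IdKeyedCache byId = new IdKeyedCache();
        byId.pkSchema(original);
        System.out.println("id-keyed:   " + byId.pkSchema(recreated)); // fresh: [a]
    }
}
```

The name-keyed cache keeps returning [key] for the recreated table, which matches the "Not a valid schema field: key" error, while the id-keyed cache returns [a].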
Even after the node restarts, the key schema registered on the dirty topic must also be updated; otherwise you'll get:
ERROR [CdcCommitlogProcessor:3] 2022-07-08 08:35:54,658 AbstractPulsarMutationSender.java:221 - Failed to get a pulsar producer
org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException: org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: org.apache.avro.SchemaValidationException: Unable to read schema:
{
"type" : "record",
"name" : "table1",
"namespace" : "ks1",
"doc" : "Primary key schema for table ks1.table1",
"fields" : [ {
"name" : "key",
"type" : "string"
} ]
}
using schema:
{
"type" : "record",
"name" : "table1",
"namespace" : "ks1",
"doc" : "Primary key schema for table ks1.table1",
"fields" : [ {
"name" : "a",
"type" : "string"
} ]
} caused by org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: org.apache.avro.SchemaValidationException: Unable to read schema:
{
"type" : "record",
"name" : "table1",
"namespace" : "ks1",
"doc" : "Primary key schema for table ks1.table1",
"fields" : [ {
"name" : "key",
"type" : "string"
} ]
}
using schema:
{
"type" : "record",
"name" : "table1",
"namespace" : "ks1",
"doc" : "Primary key schema for table ks1.table1",
"fields" : [ {
"name" : "a",
"type" : "string"
} ]
}
at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:967)
at org.apache.pulsar.client.impl.ProducerBuilderImpl.create(ProducerBuilderImpl.java:95)
at com.datastax.oss.cdc.agent.AbstractPulsarMutationSender.lambda$getProducer$1(AbstractPulsarMutationSender.java:219)
at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(Unknown Source)
at com.datastax.oss.cdc.agent.AbstractPulsarMutationSender.getProducer(AbstractPulsarMutationSender.java:187)
at com.datastax.oss.cdc.agent.AbstractPulsarMutationSender.sendMutationAsync(AbstractPulsarMutationSender.java:251)
at com.datastax.oss.cdc.agent.CommitLogReadHandlerImpl.sendAsync(CommitLogReadHandlerImpl.java:461)
at com.datastax.oss.cdc.agent.MutationMaker.createRecord(MutationMaker.java:32)
at com.datastax.oss.cdc.agent.MutationMaker.createRecord(MutationMaker.java:24)
at com.datastax.oss.cdc.agent.AbstractMutationMaker.delete(AbstractMutationMaker.java:40)
at com.datastax.oss.cdc.agent.CommitLogReadHandlerImpl.handlePartitionDeletion(CommitLogReadHandlerImpl.java:322)
at com.datastax.oss.cdc.agent.CommitLogReadHandlerImpl.process(CommitLogReadHandlerImpl.java:280)
at com.datastax.oss.cdc.agent.CommitLogReadHandlerImpl.handleMutation(CommitLogReadHandlerImpl.java:238)
at org.apache.cassandra.db.commitlog.CommitLogReader.readMutation(CommitLogReader.java:477)
at org.apache.cassandra.db.commitlog.CommitLogReader.readSection(CommitLogReader.java:396)
at org.apache.cassandra.db.commitlog.CommitLogReader.readCommitLogSegment(CommitLogReader.java:243)
at org.apache.cassandra.db.commitlog.CommitLogReader.readCommitLogSegment(CommitLogReader.java:146)
at com.datastax.oss.cdc.agent.CommitLogReaderServiceImpl$1.run(CommitLogReaderServiceImpl.java:72)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Unknown Source)
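The second error comes from the schema compatibility check: judging by the org.apache.avro.SchemaValidationException in the log, the broker asks whether data written with the registered key schema (field key) can be read with the new one (field a). The Java sketch below is a deliberately simplified model of Avro's record resolution rule, not Pulsar's or Avro's actual implementation: a reader field that is missing from the writer schema and has no default makes the two schemas incompatible. Field and canRead are hypothetical names.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class KeySchemaCompatSketch {

    // Minimal stand-in for an Avro record field: name, type, and whether it declares a default.
    record Field(String name, String type, boolean hasDefault) {}

    // Simplified Avro record resolution: the reader (new) schema can read data written
    // with the writer (old) schema only if every reader field either exists in the writer
    // with the same type or declares a default. Writer-only fields are ignored.
    // Type promotion, aliases, unions and nested records are not modeled.
    static boolean canRead(List<Field> reader, List<Field> writer) {
        Map<String, Field> writerByName = writer.stream()
                .collect(Collectors.toMap(Field::name, Function.identity()));
        for (Field r : reader) {
            Field w = writerByName.get(r.name());
            if (w == null) {
                if (!r.hasDefault()) {
                    return false; // required reader field with no source value
                }
            } else if (!w.type().equals(r.type())) {
                return false; // same name, different type
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Key schema already registered on the topic for the original table1 (pk: key).
        List<Field> registered = List.of(new Field("key", "string", false));
        // Key schema built for the recreated table1 (pk: a).
        List<Field> recreated = List.of(new Field("a", "string", false));

        System.out.println(canRead(recreated, registered)); // false
    }
}
```

Under this model, the only ways past the check are to give the new field a default (not meaningful for a primary key) or to replace the schema registered on the topic, which is why the dirty topic schema has to be updated.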