cdc-apache-cassandra

Need to restart Cassandra node if drop + create table with different primary key fields

Open nicoloboschi opened this issue 2 years ago • 1 comment

Let's say you create a table table1 as (key text, value text, primary key (key)) with cdc=true. The agent then caches the primary key schema in memory. If the user drops the table and recreates it with the same name in the same keyspace but with different primary key fields, you'll get an error like:

2022-07-08 08:25:13,561 CommitLogReadHandlerImpl.java:468 - Invalid primary key schema:
com.datastax.oss.cdc.agent.exceptions.CassandraConnectorSchemaException: Not a valid schema field: key
        at com.datastax.oss.cdc.agent.AbstractPulsarMutationSender.buildAvroKey(AbstractPulsarMutationSender.java:237)
        at com.datastax.oss.cdc.agent.AbstractPulsarMutationSender.sendMutationAsync(AbstractPulsarMutationSender.java:256)
        at com.datastax.oss.cdc.agent.CommitLogReadHandlerImpl.sendAsync(CommitLogReadHandlerImpl.java:461)
        at com.datastax.oss.cdc.agent.MutationMaker.createRecord(MutationMaker.java:32)
        at com.datastax.oss.cdc.agent.MutationMaker.createRecord(MutationMaker.java:24)
        at com.datastax.oss.cdc.agent.AbstractMutationMaker.insert(AbstractMutationMaker.java:28)
        at com.datastax.oss.cdc.agent.CommitLogReadHandlerImpl.handleRowModifications(CommitLogReadHandlerImpl.java:344)
        at com.datastax.oss.cdc.agent.CommitLogReadHandlerImpl.process(CommitLogReadHandlerImpl.java:303)
        at com.datastax.oss.cdc.agent.CommitLogReadHandlerImpl.handleMutation(CommitLogReadHandlerImpl.java:238)
        at org.apache.cassandra.db.commitlog.CommitLogReader.readMutation(CommitLogReader.java:477)
        at org.apache.cassandra.db.commitlog.CommitLogReader.readSection(CommitLogReader.java:396)
        at org.apache.cassandra.db.commitlog.CommitLogReader.readCommitLogSegment(CommitLogReader.java:243)
        at org.apache.cassandra.db.commitlog.CommitLogReader.readCommitLogSegment(CommitLogReader.java:146)
        at com.datastax.oss.cdc.agent.CommitLogReaderServiceImpl$1.run(CommitLogReaderServiceImpl.java:72)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Unknown Source) 

After that, the agent cannot make progress on this table, and you have to restart the agent (and therefore the node) to make it work again. Note that mutations for other tables still continue to be sent to Pulsar.
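
To make the failure mode concrete, here is a minimal sketch of a schema cache that goes stale after a drop and recreate. It is not the agent's actual code: `KeySchemaCache`, `lookupByName`, and `lookupByTableId` are hypothetical names, and the sketch assumes the cached key schema is looked up by `keyspace.table` name only. Including the table ID in the cache key is one possible way to notice the change, since Cassandra generates a new ID by default when a table is recreated.

```java
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class KeySchemaCache {
    // Hypothetical cache keyed by "keyspace.table" only (assumed behavior).
    private final Map<String, List<String>> byName = new ConcurrentHashMap<>();
    // Hypothetical alternative keyed by name plus table ID.
    private final Map<String, List<String>> byNameAndId = new ConcurrentHashMap<>();

    // Returns the cached key columns; the current ones are only used on a cache miss.
    public List<String> lookupByName(String table, List<String> currentKeyColumns) {
        return byName.computeIfAbsent(table, t -> currentKeyColumns);
    }

    // Same lookup, but a recreated table (new ID) produces a cache miss.
    public List<String> lookupByTableId(String table, UUID tableId, List<String> currentKeyColumns) {
        return byNameAndId.computeIfAbsent(table + "@" + tableId, t -> currentKeyColumns);
    }

    public static void main(String[] args) {
        KeySchemaCache cache = new KeySchemaCache();
        UUID firstId = UUID.randomUUID();   // ID of the original table
        UUID secondId = UUID.randomUUID();  // ID after drop + create

        // Original table: primary key (key).
        cache.lookupByName("ks1.table1", List.of("key"));
        cache.lookupByTableId("ks1.table1", firstId, List.of("key"));

        // Recreated table: primary key (a).
        System.out.println(cache.lookupByName("ks1.table1", List.of("a")));              // prints [key] (stale)
        System.out.println(cache.lookupByTableId("ks1.table1", secondId, List.of("a"))); // prints [a]
    }
}
```

With the name-only key, the second lookup still returns the old `key` column, which matches the "Not a valid schema field: key" symptom above. Whether the agent's cache is actually keyed this way would need to be confirmed in the source.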

nicoloboschi, Jul 08 '22 08:07

Even after the node restarts, the stale schema on the Pulsar topic still has to be updated; otherwise you'll get:

ERROR [CdcCommitlogProcessor:3] 2022-07-08 08:35:54,658 AbstractPulsarMutationSender.java:221 - Failed to get a pulsar producer
org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException: org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: org.apache.avro.SchemaValidationException: Unable to read schema: 
{
  "type" : "record",
  "name" : "table1",
  "namespace" : "ks1",
  "doc" : "Primary key schema for table ks1.table1",
  "fields" : [ {
    "name" : "key",
    "type" : "string"
  } ]
}
using schema:
{
  "type" : "record",
  "name" : "table1",
  "namespace" : "ks1",
  "doc" : "Primary key schema for table ks1.table1",
  "fields" : [ {
    "name" : "a",
    "type" : "string"
  } ]
} caused by org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: org.apache.avro.SchemaValidationException: Unable to read schema: 
{
  "type" : "record",
  "name" : "table1",
  "namespace" : "ks1",
  "doc" : "Primary key schema for table ks1.table1",
  "fields" : [ {
    "name" : "key",
    "type" : "string"
  } ]
}
using schema:
{
  "type" : "record",
  "name" : "table1",
  "namespace" : "ks1",
  "doc" : "Primary key schema for table ks1.table1",
  "fields" : [ {
    "name" : "a",
    "type" : "string"
  } ]
}
        at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:967)
        at org.apache.pulsar.client.impl.ProducerBuilderImpl.create(ProducerBuilderImpl.java:95)
        at com.datastax.oss.cdc.agent.AbstractPulsarMutationSender.lambda$getProducer$1(AbstractPulsarMutationSender.java:219)
        at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(Unknown Source)
        at com.datastax.oss.cdc.agent.AbstractPulsarMutationSender.getProducer(AbstractPulsarMutationSender.java:187)
        at com.datastax.oss.cdc.agent.AbstractPulsarMutationSender.sendMutationAsync(AbstractPulsarMutationSender.java:251)
        at com.datastax.oss.cdc.agent.CommitLogReadHandlerImpl.sendAsync(CommitLogReadHandlerImpl.java:461)
        at com.datastax.oss.cdc.agent.MutationMaker.createRecord(MutationMaker.java:32)
        at com.datastax.oss.cdc.agent.MutationMaker.createRecord(MutationMaker.java:24)
        at com.datastax.oss.cdc.agent.AbstractMutationMaker.delete(AbstractMutationMaker.java:40)
        at com.datastax.oss.cdc.agent.CommitLogReadHandlerImpl.handlePartitionDeletion(CommitLogReadHandlerImpl.java:322)
        at com.datastax.oss.cdc.agent.CommitLogReadHandlerImpl.process(CommitLogReadHandlerImpl.java:280)
        at com.datastax.oss.cdc.agent.CommitLogReadHandlerImpl.handleMutation(CommitLogReadHandlerImpl.java:238)
        at org.apache.cassandra.db.commitlog.CommitLogReader.readMutation(CommitLogReader.java:477)
        at org.apache.cassandra.db.commitlog.CommitLogReader.readSection(CommitLogReader.java:396)
        at org.apache.cassandra.db.commitlog.CommitLogReader.readCommitLogSegment(CommitLogReader.java:243)
        at org.apache.cassandra.db.commitlog.CommitLogReader.readCommitLogSegment(CommitLogReader.java:146)
        at com.datastax.oss.cdc.agent.CommitLogReaderServiceImpl$1.run(CommitLogReaderServiceImpl.java:72)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Unknown Source)
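
The rejection above is consistent with Avro's schema resolution rule that a reader field without a default must also exist in the writer schema. The sketch below models only that one rule with plain Java collections. It does not call Avro or Pulsar, it ignores type matching and aliases, and `KeySchemaCompat` and `canRead` are hypothetical names used for illustration.

```java
import java.util.Map;
import java.util.Set;

public class KeySchemaCompat {
    /**
     * Simplified model of one Avro resolution rule.
     * writerFields: field names of the schema the data was written with.
     * readerFields: field name -> whether the reader field declares a default.
     */
    public static boolean canRead(Set<String> writerFields, Map<String, Boolean> readerFields) {
        for (Map.Entry<String, Boolean> field : readerFields.entrySet()) {
            boolean hasDefault = field.getValue();
            // A reader field missing from the writer schema is only readable if it has a default.
            if (!writerFields.contains(field.getKey()) && !hasDefault) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Set<String> oldKeySchema = Set.of("key");              // schema already on the topic
        Map<String, Boolean> newKeySchema = Map.of("a", false); // schema after recreating the table

        System.out.println(canRead(oldKeySchema, newKeySchema));           // prints false
        System.out.println(canRead(oldKeySchema, Map.of("key", false)));   // prints true
    }
}
```

Under this model, the new key schema with field `a` cannot read records written with the old key schema with field `key`, so the topic keeps rejecting the producer until its stored schema is replaced or removed.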

nicoloboschi, Jul 08 '22 08:07