
Document how to override default Java Driver configuration

Open msmygit opened this issue 2 years ago • 2 comments

This is for the 3.3.0_stable branch, but it would be helpful to clarify the same in the main branch too.

Today, there is no manual/documentation that explains how to pass Java Driver configuration to override parameters such as request timeouts, heartbeat timeouts, etc.

Original Error

Stacktrace 1:
23/05/03 11:50:06 ERROR DiffJobSession: Could not perform diff for Key: 10061674 %% 1 %% ABCPTSQDBRRLCKJ07
java.util.concurrent.ExecutionException: com.datastax.oss.driver.api.core.connection.HeartbeatException
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
        at datastax.astra.migrate.DiffJobSession.diffAndClear(DiffJobSession.java:107)
        at datastax.astra.migrate.DiffJobSession.lambda$getDataAndDiff$0(DiffJobSession.java:85)
        at java.util.Iterator.forEachRemaining(Iterator.java:116)
        at com.datastax.oss.driver.internal.core.cql.PagingIterableSpliterator.forEachRemaining(PagingIterableSpliterator.java:118)
        at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
        at datastax.astra.migrate.DiffJobSession.getDataAndDiff(DiffJobSession.java:69)
        at datastax.astra.migrate.DiffData$.$anonfun$diffTable$3(DiffData.scala:28)
        at datastax.astra.migrate.DiffData$.$anonfun$diffTable$3$adapted(DiffData.scala:26)
        at com.datastax.spark.connector.cql.CassandraConnector.$anonfun$withSessionDo$1(CassandraConnector.scala:104)
        at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:121)
        at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:103)
        at datastax.astra.migrate.DiffData$.$anonfun$diffTable$2(DiffData.scala:26)
        at datastax.astra.migrate.DiffData$.$anonfun$diffTable$2$adapted(DiffData.scala:25)
        at com.datastax.spark.connector.cql.CassandraConnector.$anonfun$withSessionDo$1(CassandraConnector.scala:104)
        at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:121)
        at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:103)
        at datastax.astra.migrate.DiffData$.$anonfun$diffTable$1(DiffData.scala:25)
        at datastax.astra.migrate.DiffData$.$anonfun$diffTable$1$adapted(DiffData.scala:24)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at org.apache.spark.rdd.RDD.$anonfun$foreach$2(RDD.scala:1003)
        at org.apache.spark.rdd.RDD.$anonfun$foreach$2$adapted(RDD.scala:1003)
        at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2268)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
Caused by: com.datastax.oss.driver.api.core.connection.HeartbeatException
        at com.datastax.oss.driver.internal.core.channel.HeartbeatHandler$HeartbeatRequest.fail(HeartbeatHandler.java:109)
        at com.datastax.oss.driver.internal.core.channel.ChannelHandlerRequest.fail(ChannelHandlerRequest.java:62)
        at com.datastax.oss.driver.internal.core.channel.ChannelHandlerRequest.onTimeout(ChannelHandlerRequest.java:108)
        at com.datastax.oss.driver.shaded.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
        at com.datastax.oss.driver.shaded.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170)
        at com.datastax.oss.driver.shaded.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
        at com.datastax.oss.driver.shaded.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
        at com.datastax.oss.driver.shaded.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
        at com.datastax.oss.driver.shaded.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at com.datastax.oss.driver.shaded.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at com.datastax.oss.driver.shaded.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        ... 1 more
Caused by: com.datastax.oss.driver.api.core.DriverTimeoutException: Heartbeat request: timed out after 5000 ms
        ... 10 more
Stacktrace 2:
23/05/02 13:16:35 ERROR CopyJobSession: Error occurred during Attempt#: 1
java.util.concurrent.ExecutionException: com.datastax.oss.driver.api.core.DriverTimeoutException: Query timed out after PT10S
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
        at datastax.astra.migrate.CopyJobSession.iterateAndClearWriteResults(CopyJobSession.java:197)
        at datastax.astra.migrate.CopyJobSession.getDataAndInsert(CopyJobSession.java:109)
        at datastax.astra.migrate.Migrate$.$anonfun$migrateTable$3(Migrate.scala:30)
        at datastax.astra.migrate.Migrate$.$anonfun$migrateTable$3$adapted(Migrate.scala:28)
        at com.datastax.spark.connector.cql.CassandraConnector.$anonfun$withSessionDo$1(CassandraConnector.scala:104)
        at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:121)
        at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:103)
        at datastax.astra.migrate.Migrate$.$anonfun$migrateTable$2(Migrate.scala:28)
        at datastax.astra.migrate.Migrate$.$anonfun$migrateTable$2$adapted(Migrate.scala:27)
        at com.datastax.spark.connector.cql.CassandraConnector.$anonfun$withSessionDo$1(CassandraConnector.scala:104)
        at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:121)
        at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:103)
        at datastax.astra.migrate.Migrate$.$anonfun$migrateTable$1(Migrate.scala:27)
        at datastax.astra.migrate.Migrate$.$anonfun$migrateTable$1$adapted(Migrate.scala:26)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at org.apache.spark.rdd.RDD.$anonfun$foreach$2(RDD.scala:1003)
        at org.apache.spark.rdd.RDD.$anonfun$foreach$2$adapted(RDD.scala:1003)
        at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2268)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
Caused by: com.datastax.oss.driver.api.core.DriverTimeoutException: Query timed out after PT10S
        at com.datastax.oss.driver.internal.core.cql.CqlRequestHandler.lambda$scheduleTimeout$1(CqlRequestHandler.java:206)
        at com.datastax.oss.driver.shaded.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:672)
        at com.datastax.oss.driver.shaded.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:747)
        at com.datastax.oss.driver.shaded.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:472)
        at com.datastax.oss.driver.shaded.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        ... 1 more
23/05/02 13:16:35 ERROR CopyJobSession: Error with PartitionRange -- ThreadID: 123 Processing min: 3335171328526692640 max: 3337016002934063595 -- Attempt# 1
23/05/02 13:16:35 ERROR CopyJobSession: Error stats Read#: 47, Wrote#: 0, Skipped#: 0, Error#: 47

What options were tried

Option 1

Added spark.cassandra.connection.timeoutMS 90000 in cdm.properties. This did not increase the timeout value.
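As a sketch (assuming cdm.properties follows the usual Spark properties-file key/value format), the entry looked like:

spark.cassandra.connection.timeoutMS 90000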

Option 2

Added --conf datastax-java-driver.advanced.heartbeat.timeout="90 seconds" --conf datastax-java-driver.basic.request.timeout="60 seconds" to the ./spark-submit command; it had no effect on the timeouts.
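In context, the attempted command looked roughly like the following (a sketch; the remaining arguments are elided):

./spark-submit \
--conf datastax-java-driver.advanced.heartbeat.timeout="90 seconds" \
--conf datastax-java-driver.basic.request.timeout="60 seconds" \
...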

Option 3

Attempted the below, without any luck:

./spark-submit \
--files /path/to/application.conf \
--conf spark.cassandra.connection.config.profile.path=application.conf \
...
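The application.conf referenced here is the Java Driver's standard HOCON configuration file; a minimal sketch targeting the two timeouts above (values illustrative) would be:

datastax-java-driver {
  basic.request.timeout = 60 seconds
  advanced.heartbeat.timeout = 90 seconds
}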

In this case, only the properties from application.conf are considered and everything else in cdm.properties is ignored, as we could confirm from the warning below:

23/05/02 16:20:56 WARN CassandraConnectionFactory: Ignoring all programmatic configuration, only using configuration from application.conf

Option 4

Attempted to add --driver-java-options "-Ddriver.basic.request.timeout='60 seconds'" to the ./spark-submit command, with no luck either.

msmygit, May 04 '23 13:05

CDM uses SparkConf to create CassandraConnector, so spark.cassandra.* properties should work. Note that you need to use spark.cassandra.read.timeoutMS for the request timeout, though; spark.cassandra.connection.timeoutMS sets the connection timeout.
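For example, these could be passed to spark-submit as follows (a sketch; values are illustrative and the remaining arguments are elided):

./spark-submit \
--conf spark.cassandra.read.timeoutMS=90000 \
--conf spark.cassandra.connection.timeoutMS=90000 \
...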

If you really need to set driver options precisely, your option would be to specify spark.cassandra.connection.config.profile.path; however, the current CDM implementation shares SparkConf for both the origin and target connections, so I imagine this does not work, as you already tested.

yukim, Jun 06 '23 03:06