azure-cosmosdb-spark
isStreaming is false for org.apache.spark.sql.streaming.StreamingQuery
scala> var streamingQuery = streamingQueryWriter.start()
streamingQuery: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@1822f675
18/10/08 13:56:43 ERROR MicroBatchExecution: Query [id = 766bfff3-76c5-4ad1-a3ec-9b7307989c94, runId = bcd77014-1717-427a-a0a9-015018e64880] terminated with error
java.lang.AssertionError: assertion failed: DataFrame returned by getBatch from com.microsoft.azure.cosmosdb.spark.streaming.CosmosDBSource@53333419 did not have isStreaming=true
LogicalRDD [_attachments#82, _etag#83, _rid#84, _self#85, _ts#86, id#87, name#88, project#89, version#90], false
at scala.Predef$.assert(Predef.scala:170)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:395)
I tried azure-cosmosdb-spark_2.3.0_2.11-1.2.7, azure-cosmosdb-spark_2.3.0_2.11-1.2.6, and azure-cosmosdb-spark_2.3.0_2.11-1.2.0, but none of them is working.
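For reference, here is a minimal sketch of the kind of change-feed streaming pipeline that hits this assertion, assuming a spark-shell session where spark is already defined. The endpoint, key, database, collection, query name, and checkpoint path are placeholders, and the option names should be double-checked against the connector docs:

import com.microsoft.azure.cosmosdb.spark.streaming.CosmosDBSourceProvider

// Placeholder connection settings -- replace with real account values.
val sourceConfig = Map(
  "Endpoint"                        -> "https://<account>.documents.azure.com:443/",
  "Masterkey"                       -> "<master-key>",
  "Database"                        -> "<database>",
  "Collection"                      -> "<collection>",
  "ReadChangeFeed"                  -> "true",
  "ChangeFeedQueryName"             -> "structured-streaming-repro",
  "ChangeFeedStartFromTheBeginning" -> "false",
  "InferStreamSchema"               -> "true"
)

// Read the Cosmos DB change feed as a streaming DataFrame.
val streamingDF = spark.readStream
  .format(classOf[CosmosDBSourceProvider].getName)
  .options(sourceConfig)
  .load()

// A console sink is enough to reproduce: the assertion fires in
// MicroBatchExecution when it checks the DataFrame returned by the
// source's getBatch, before any rows reach the sink.
val streamingQueryWriter = streamingDF.writeStream
  .format("console")
  .outputMode("append")
  .option("checkpointLocation", "/tmp/changefeed-checkpoint")

var streamingQuery = streamingQueryWriter.start()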
Same here, using 1.2.6. Connector settings (a sketch of how these map onto the reader options follows the stack trace below):
"InferStreamSchema" -> "false"
val rollingChangeFeed = true
val startFromTheBeginning = false
val useNextToken = true
java.lang.AssertionError: assertion failed: DataFrame returned by getBatch from com.microsoft.azure.cosmosdb.spark.streaming.CosmosDBSource@798a7829 did not have isStreaming=true
LogicalRDD [body#14520, id#14521, _rid#14522, _self#14523, _etag#14524, _attachments#14525, _ts#14526], false
at scala.Predef$.assert(Predef.scala:170)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$10.apply(MicroBatchExecution.scala:461)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$10.apply(MicroBatchExecution.scala:456)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:456)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:456)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:379)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:60)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:455)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:204)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:172)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:172)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:379)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:60)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:172)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:166)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:293)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:203)
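For what it's worth, a sketch of how the flags listed above would typically be passed to the streaming source. "InferStreamSchema" is quoted from the comment; the key names for the rolling-change-feed and next-token options are my assumption and not confirmed in this thread:

// Reader options implied by the settings above; verify the assumed key
// names (RollingChangeFeed, ChangeFeedStartFromTheBeginning,
// ChangeFeedUseNextToken) against the connector's configuration reference.
val rollingChangeFeed = true
val startFromTheBeginning = false
val useNextToken = true

val changeFeedOptions = Map(
  "ReadChangeFeed"                  -> "true",
  "InferStreamSchema"               -> "false",
  "RollingChangeFeed"               -> rollingChangeFeed.toString,
  "ChangeFeedStartFromTheBeginning" -> startFromTheBeginning.toString,
  "ChangeFeedUseNextToken"          -> useNextToken.toString
)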
Can you please share the repro notebook?