azure-cosmosdb-spark

isStreaming is false for org.apache.spark.sql.streaming.StreamingQuery

Open · lekkalraja opened this issue on Oct 08 '18 · 3 comments

scala> var streamingQuery = streamingQueryWriter.start()
streamingQuery: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@1822f675

18/10/08 13:56:43 ERROR MicroBatchExecution: Query [id = 766bfff3-76c5-4ad1-a3ec-9b7307989c94, runId = bcd77014-1717-427a-a0a9-015018e64880] terminated with error
java.lang.AssertionError: assertion failed: DataFrame returned by getBatch from com.microsoft.azure.cosmosdb.spark.streaming.CosmosDBSource@53333419 did not have isStreaming=true
LogicalRDD [_attachments#82, _etag#83, _rid#84, _self#85, _ts#86, id#87, name#88, project#89, version#90], false

    at scala.Predef$.assert(Predef.scala:170)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:395)
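For context, a query like the one above would typically be started from a setup along the following lines. This is only a minimal sketch, since the original setup was not posted: the source-provider class name, config keys, placeholder endpoint/key values, and the console sink are all assumptions. The assertion means the DataFrame produced for a micro-batch reported isStreaming = false, whereas the DataFrame returned by readStream.load() itself should report isStreaming = true.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("cosmos-changefeed").getOrCreate()

    // Assumed Cosmos DB change-feed source configuration (key names per the
    // connector docs; values are placeholders).
    val readConfig = Map(
      "Endpoint"                        -> "https://<account>.documents.azure.com:443/",
      "Masterkey"                       -> "<key>",
      "Database"                        -> "<database>",
      "Collection"                      -> "<collection>",
      "ReadChangeFeed"                  -> "true",
      "ChangeFeedStartFromTheBeginning" -> "false",
      "InferStreamSchema"               -> "true"
    )

    // The streaming DataFrame returned by load() should itself report
    // isStreaming == true; the failing assertion says the per-batch DataFrame
    // handed back by the source's getBatch did not.
    val streamingDF = spark.readStream
      .format("com.microsoft.azure.cosmosdb.spark.streaming.CosmosDBSourceProvider")
      .options(readConfig)
      .load()

    println(streamingDF.isStreaming) // expected: true

    val streamingQueryWriter = streamingDF.writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/cosmos-checkpoint")

    var streamingQuery = streamingQueryWriter.start()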

lekkalraja — Oct 08 '18, 13:10

I used the azure-cosmosdb-spark_2.3.0_2.11-1.2.7, azure-cosmosdb-spark_2.3.0_2.11-1.2.6, and azure-cosmosdb-spark_2.3.0_2.11-1.2.0 versions, but it is not working.
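For reference, those artifacts would be pulled in roughly like this in sbt. This is a sketch only: the group id com.microsoft.azure is an assumption, and since the artifact name already encodes the Spark and Scala versions, a plain % is used.

    // Hypothetical sbt dependency for the coordinates listed above
    // (group id assumed; verify against Maven Central).
    libraryDependencies += "com.microsoft.azure" % "azure-cosmosdb-spark_2.3.0_2.11" % "1.2.7"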

lekkalraja — Oct 08 '18, 13:10

Same here, using 1.2.6. Connector settings:

"InferStreamSchema" -> "false"
val rollingChangeFeed = true
val startFromTheBeginning = false
val useNextToken = true
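Those settings would presumably end up in the source's read-config map along these lines. A sketch only: apart from "InferStreamSchema", the key names are guessed from the val names above and are not confirmed against the connector source.

    // Sketch: the settings above expressed as a read-config map for the source.
    // Key names other than "InferStreamSchema" are assumptions inferred from
    // the val names in the comment.
    val readConfig = Map(
      "InferStreamSchema"               -> "false",
      "ReadChangeFeed"                  -> "true",
      "RollingChangeFeed"               -> "true",   // rollingChangeFeed
      "ChangeFeedStartFromTheBeginning" -> "false",  // startFromTheBeginning
      "ChangeFeedUseNextToken"          -> "true"    // useNextToken
    )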

java.lang.AssertionError: assertion failed: DataFrame returned by getBatch from com.microsoft.azure.cosmosdb.spark.streaming.CosmosDBSource@798a7829 did not have isStreaming=true
LogicalRDD [body#14520, id#14521, _rid#14522, _self#14523, _etag#14524, _attachments#14525, _ts#14526], false

	at scala.Predef$.assert(Predef.scala:170)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$10.apply(MicroBatchExecution.scala:461)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$10.apply(MicroBatchExecution.scala:456)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
	at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:456)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:456)
	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:379)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:60)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:455)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:204)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:172)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:172)
	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:379)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:60)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:172)
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:166)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:293)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:203)

jpsimen — Oct 11 '18, 18:10

Can you please share the repro notebook?

nomiero — Feb 04 '19, 23:02