mosaic
grid_tessellateexplode() creates jts.geom.TopologyException in instances where input geometry is invalid
Describe the bug
In version 0.4.1, calling grid_tessellateexplode() on invalid input geometries can raise topology exceptions that cause common actions such as display(), count(), and saveAsTable() to fail. The same workflow did not produce any errors in version 0.3.11.
To Reproduce
Steps to reproduce the behavior:
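from pyspark.sql import functions as f
import mosaic

# Tessellate each geometry into H3 cells at resolution 6, keeping core geometries.
df = (
    df
    .select(mosaic.grid_tessellateexplode(f.col("geom"), f.lit(6), True).alias("h3_index"), "*")
    .select(f.col("h3_index.*"), "*")
)
# Either action below fails with the TopologyException.
df.count()
df.display()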
ZIP Codes - MultiPolygon - Invalid Geometry.csv
Expected behavior
Because there is no st_makevalid() in the Mosaic library, my assumption is that one way to work around invalid geometries is to use grid_tessellateexplode() to break them apart into manageable H3 tiles. This has been my workflow of choice within the 0.3.x releases and is built into our data pipelines.
We regularly work with geometries that are irregular due to their location in coastal areas and/or upstream issues with their creation by other entities.
Screenshots
Produces errors like the following:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 13.0 failed 4 times, most recent failure: Lost task 3.3 in stage 13.0 (TID 80) (10.26.175.165 executor 0): org.locationtech.jts.geom.TopologyException: found non-noded intersection between LINESTRING ( -97.14452 31.11474, -97.14424 31.11453 ) and LINESTRING ( -97.14285 31.11526, -97.14424 31.11453 ) [ (-97.14424, 31.11453, NaN) ]
Additionally, dropping geometry columns does not resolve the issue above. The dataframe object is "sticky" and retains its Mosaic identity and the topological baggage.
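For anyone triaging which input rows are likely to trip this exception, a rough pre-check for self-crossing rings can help narrow things down. The following is a minimal pure-Python sketch, not what JTS or Mosaic do internally: the function names are hypothetical, it assumes each ring is a closed list of (x, y) tuples, and it only detects edges that properly cross. It does not catch every kind of invalidity (for example, rings that merely touch at a vertex).

```python
from itertools import combinations
from typing import List, Tuple

Point = Tuple[float, float]


def _orient(a: Point, b: Point, c: Point) -> int:
    """Sign of the cross product (b - a) x (c - a): 1 left turn, -1 right turn, 0 collinear."""
    cross = (b[0] - a[0]) * (c[1] - a[1]) - (b[1] - a[1]) * (c[0] - a[0])
    return (cross > 0) - (cross < 0)


def _segments_cross(p1: Point, p2: Point, q1: Point, q2: Point) -> bool:
    """True when the two segments cross at a point interior to both."""
    return (
        _orient(p1, p2, q1) * _orient(p1, p2, q2) < 0
        and _orient(q1, q2, p1) * _orient(q1, q2, p2) < 0
    )


def ring_self_intersects(ring: List[Point]) -> bool:
    """Return True if any two non-adjacent edges of a closed ring properly cross."""
    edges = list(zip(ring, ring[1:]))
    n = len(edges)
    for i, j in combinations(range(n), 2):
        # Adjacent edges share a vertex by construction, so skip them
        # (including the pair formed by the first and last edge).
        if j == i + 1 or (i == 0 and j == n - 1):
            continue
        if _segments_cross(*edges[i], *edges[j]):
            return True
    return False


# Hypothetical test rings: a "bow-tie" that crosses itself and a plain square.
bowtie = [(0, 0), (2, 2), (2, 0), (0, 2), (0, 0)]
square = [(0, 0), (2, 0), (2, 2), (0, 2), (0, 0)]

print(ring_self_intersects(bowtie))
print(ring_self_intersects(square))
```

The check is quadratic in the number of edges, so it is only meant for spot-checking a handful of suspect geometries, not for running across a full table.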
Additional context
Using Databricks 13.3 LTS with Mosaic 0.4.1
Could you try zero-buffering the input geometry, e.g. mosaic.st_buffer(f.col("geom"), 0), in order to force it to be made valid?
Although it is not a named topology error, I'm now getting the following array out-of-bounds error, which I assume comes from an unmatched pair of vertices in an invalid geometry.
I've implemented a zero-distance buffer like this, since we are restricted to a WKT column:
our_df = (
    spark.read.option("path", f"{path_geospatial_db}/{delta_tbl}")
    .table(f"{schema_common_geometry}.{delta_tbl}")
    .withColumn(
        column_geometry,
        mosaic.st_buffer(
            mosaic.st_setsrid(mosaic.st_geomfromwkt(egdb_geometry), f.lit(4326)),
            f.lit(0),
        ),
    )
)
This is happening:
Py4JJavaError: An error occurred while calling o1516.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 67.0 failed 4 times, most recent failure: Lost task 3.3 in stage 67.0 (TID 15931) (10.26.69.103 executor 5): java.lang.ArrayIndexOutOfBoundsException: 0
at org.locationtech.jts.geom.GeometryCollection.getGeometryN(GeometryCollection.java:116)
at com.databricks.labs.mosaic.core.geometry.polygon.MosaicPolygonJTS.toInternal(MosaicPolygonJTS.scala:15)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
at scala.collection.Iterator$ConcatIterator.hasNext(Iterator.scala:224)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
at scala.collection.Iterator$ConcatIterator.hasNext(Iterator.scala:224)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:195)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:88)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:88)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:60)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:196)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:181)
at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:146)
at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:129)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:146)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$8(Executor.scala:899)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1709)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:902)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:797)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3645)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3567)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3554)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3554)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1521)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1521)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1521)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3890)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3790)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:51)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$runJob$1(DAGScheduler.scala:1245)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1233)
at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2959)
at org.apache.spark.sql.execution.collect.Collector.$anonfun$runSparkJobs$1(Collector.scala:286)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:282)
at org.apache.spark.sql.execution.collect.Collector.$anonfun$collect$1(Collector.scala:366)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:363)
at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:117)
at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:124)
at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:126)
at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:114)
at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:94)
at org.apache.spark.sql.execution.qrc.ResultCacheManager.$anonfun$computeResult$1(ResultCacheManager.scala:557)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
at org.apache.spark.sql.execution.qrc.ResultCacheManager.collectResult$1(ResultCacheManager.scala:545)
at org.apache.spark.sql.execution.qrc.ResultCacheManager.computeResult(ResultCacheManager.scala:565)
at org.apache.spark.sql.execution.qrc.ResultCacheManager.$anonfun$getOrComputeResultInternal$1(ResultCacheManager.scala:426)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.execution.qrc.ResultCacheManager.getOrComputeResultInternal(ResultCacheManager.scala:419)
at org.apache.spark.sql.execution.qrc.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:313)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeCollectResult$1(SparkPlan.scala:519)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
at org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:516)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:492)
at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:3733)
at org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:3732)
at org.apache.spark.sql.Dataset.$anonfun$withAction$3(Dataset.scala:4546)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:959)
at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4544)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$9(SQLExecution.scala:298)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:528)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:225)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1148)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:154)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:477)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4544)
at org.apache.spark.sql.Dataset.count(Dataset.scala:3732)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397)
at py4j.Gateway.invoke(Gateway.java:306)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:199)
at py4j.ClientServerConnection.run(ClientServerConnection.java:119)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 0
at org.locationtech.jts.geom.GeometryCollection.getGeometryN(GeometryCollection.java:116)
at com.databricks.labs.mosaic.core.geometry.polygon.MosaicPolygonJTS.toInternal(MosaicPolygonJTS.scala:15)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
The ZIP code data itself seems to be the issue in this instance. @FeralCatColonist and I connected offline and are working it out.