geopyspark
Update pyramid tile image
I am trying to ingest a batch of large TIFF images, but my Spark cluster doesn't have a lot of memory or resources, so I tried to ingest the images in multiple batches.
I plan to generate a pyramid of the first TIFF image and write it to disk, then generate a pyramid of the second TIFF image and update it into the same directory.
So I tried to add update code myself:
./geopyspark-backend/geotrellis/src/main/scala/geopyspark/geotrellis/io/LayerWriterWrapper.scala
def update(
  layerName: String,
  spatialRDD: TiledRasterLayer[SpatialKey]
): Unit = {
  val id =
    spatialRDD.zoomLevel match {
      case Some(zoom) => LayerId(layerName, zoom)
      case None => LayerId(layerName, 0)
    }
  layerWriter.update(id, spatialRDD.rdd)
}

def overwrite(
  layerName: String,
  spatialRDD: TiledRasterLayer[SpatialKey]
): Unit = {
  val id =
    spatialRDD.zoomLevel match {
      case Some(zoom) => LayerId(layerName, zoom)
      case None => LayerId(layerName, 0)
    }
  layerWriter.overwrite(id, spatialRDD.rdd)
}
./geopyspark/geotrellis/catalog.py
def update(uri,
           layer_name,
           tiled_raster_layer,
           store=None):
    if tiled_raster_layer.zoom_level is None:
        Log.warn(tiled_raster_layer.pysc, "The given layer does not have a zoom_level. Writing to zoom 0.")
    if store:
        store = AttributeStore.build(store)
    else:
        store = AttributeStore.cached(uri)
    pysc = tiled_raster_layer.pysc
    writer = pysc._gateway.jvm.geopyspark.geotrellis.io.LayerWriterWrapper(
        store.wrapper.attributeStore(), uri)
    writer.update(layer_name, tiled_raster_layer.srdd)

def overwrite(uri,
              layer_name,
              tiled_raster_layer,
              store=None):
    if tiled_raster_layer.zoom_level is None:
        Log.warn(tiled_raster_layer.pysc, "The given layer does not have a zoom_level. Writing to zoom 0.")
    if store:
        store = AttributeStore.build(store)
    else:
        store = AttributeStore.cached(uri)
    pysc = tiled_raster_layer.pysc
    writer = pysc._gateway.jvm.geopyspark.geotrellis.io.LayerWriterWrapper(
        store.wrapper.attributeStore(), uri)
    writer.overwrite(layer_name, tiled_raster_layer.srdd)
Then I ingest the data like this.
def writefile(srcpath, key, destpath):
    raster_layer = gps.geotiff.get(layer_type=gps.LayerType.SPATIAL, uri=srcpath, num_partitions=100)
    raster_layer.persist(pyspark.StorageLevel.MEMORY_AND_DISK_SER)
    tiled_layer = raster_layer.tile_to_layout(layout=gps.GlobalLayout(), target_crs=3857)
    pyramid = tiled_layer.pyramid()
    for layer in pyramid.levels.values():
        gps.write(destpath, key, layer, time_unit=gps.TimeUnit.DAYS)
    raster_layer.unpersist()

def updatefile(srcpath, key, destpath):
    raster_layer = gps.geotiff.get(layer_type=gps.LayerType.SPATIAL, uri=srcpath, num_partitions=100)
    raster_layer.persist(pyspark.StorageLevel.MEMORY_AND_DISK_SER)
    tiled_layer = raster_layer.tile_to_layout(layout=gps.GlobalLayout(), target_crs=3857)
    pyramid = tiled_layer.pyramid()
    for layer in pyramid.levels.values():
        gps.update(destpath, key, layer)
    raster_layer.unpersist()

tiflist = [os.path.join(basepath, b) for b in filelist]
writefile(tiflist[0], key, destpath)
for t in tiflist[1:]:
    updatefile(t, key, destpath)
Then I get this error:
py4j.protocol.Py4JJavaError: An error occurred while calling o131.update. : geotrellis.spark.io.package$LayerOutOfKeyBoundsError: Updating rdd is out of the key index space for Layer(name = "201706", zoom = 14): KeyBounds(SpatialKey(13341,6446),SpatialKey(13460,6569)). You must reindex this layer with large enough key bounds for this update.
What should I do? Do you have any advice?
Hey, @mingnet! As I mentioned in your other issue, I'm really sorry about responding to your issue so late :slightly_frowning_face:
The reason you're getting that error is that LayerWriter.update fails when trying to update a saved catalog with a layer whose KeyBounds fall outside of the saved layer's (see here). What this means is that, unfortunately, your implementation won't work, as each layer will have different KeyBounds.
The most straightforward way around this would be to read all of your files at once and then save them together as one layer. I know you said that you're working with a small cluster, but if you can show me the script you're using as well as give me some info about your cluster, I may be able to point out places where you could improve the performance. I think we should try this first before going to the next alternative (which is more involved/complicated).
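For illustration, here's a minimal sketch of that single-layer approach. It assumes gps.geotiff.get is handed the whole list of URIs at once (it accepts a list of paths); the function name write_all and its parameters just mirror the writefile function from your question:

import os
import geopyspark as gps
import pyspark

def write_all(tiflist, key, destpath):
    # Read every GeoTIFF into one RasterLayer so the resulting layer
    # has a single set of KeyBounds covering all of the inputs.
    raster_layer = gps.geotiff.get(layer_type=gps.LayerType.SPATIAL, uri=tiflist, num_partitions=100)
    raster_layer.persist(pyspark.StorageLevel.MEMORY_AND_DISK_SER)
    tiled_layer = raster_layer.tile_to_layout(layout=gps.GlobalLayout(), target_crs=3857)
    pyramid = tiled_layer.pyramid()
    for layer in pyramid.levels.values():
        gps.write(destpath, key, layer)
    raster_layer.unpersist()

# tiflist = [os.path.join(basepath, b) for b in filelist]
# write_all(tiflist, key, destpath)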
I have tried another solution, but I still have some problems; maybe you are interested to know. I tried to generate a global KeyBounds at the beginning, so I wrote another function in the file (./geopyspark-backend/geotrellis/src/main/scala/geopyspark/geotrellis/io/LayerWriterWrapper.scala):
def writeSpatialGlobal(
  layerName: String,
  spatialRDD: TiledRasterLayer[SpatialKey],
  indexStrategy: String
): Unit = {
  val id =
    spatialRDD.zoomLevel match {
      case Some(zoom) => LayerId(layerName, zoom)
      case None => LayerId(layerName, 0)
    }
  val indexKeyBounds = KeyBounds[SpatialKey](
    SpatialKey(0, 0),
    SpatialKey(spatialRDD.rdd.metadata.layout.layoutCols, spatialRDD.rdd.metadata.layout.layoutRows))
  val indexMethod = getSpatialIndexMethod(indexStrategy)
  val keyIndex = indexMethod.createIndex(indexKeyBounds)
  layerWriter.write(id, spatialRDD.rdd, keyIndex)
}
I plan to call this function when processing the first batch, so the layer has a global KeyBounds, and then update the data from the other batches into it. But this function is very, very slow to execute; as a result, it was very difficult for me to finish even the first batch. Because I don't know enough about GeoTrellis, I don't understand why. It only generates a different index, so I think it should be as fast as the writeSpatial function.
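For reference, the Python side that calls it could mirror the update/overwrite helpers in catalog.py shown earlier (so AttributeStore is already imported there); the name write_spatial_global and the "zorder" default are just how I've sketched it here, not part of geopyspark:

def write_spatial_global(uri,
                         layer_name,
                         tiled_raster_layer,
                         index_strategy="zorder",
                         store=None):
    # Same plumbing as update/overwrite above, but calls the new
    # writeSpatialGlobal wrapper so the layer is indexed with
    # layout-wide KeyBounds instead of the bounds of its own data.
    if store:
        store = AttributeStore.build(store)
    else:
        store = AttributeStore.cached(uri)
    pysc = tiled_raster_layer.pysc
    writer = pysc._gateway.jvm.geopyspark.geotrellis.io.LayerWriterWrapper(
        store.wrapper.attributeStore(), uri)
    writer.writeSpatialGlobal(layer_name, tiled_raster_layer.srdd, index_strategy)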
@mingnet I see. Based on the work you showed, it looks like everything should work okay. What backend are you trying to write to? There can be a lot of I/O involved for some of them, which could greatly increase the running time. Other than what I just mentioned, there could be other causes for slowdown, but I won't be able to say for sure without seeing your Python code.