
overlay analysis

scially opened this issue 5 years ago · 37 comments

I ingested two shapefiles into GeoWave. How do I perform overlay analysis based on the SimpleFeatures (I can get the Geometry) that I read back from GeoWave?

scially avatar Dec 11 '18 12:12 scially

One tool that we provide is a spatial join Spark operation. Using the CLI it is geowave analytic spatialjoin, or you can follow along with our Jupyter notebook example that performs the operation. While at a small scale you can loop over the N geometries from one dataset and compare each with the M geometries from the other, those N * M comparisons become infeasible at larger scales. The spatial join Spark operation we provide is an indexed operation with, as you might expect, much better performance at larger scales. After you get the pairs of overlapping geometries you can use JTS to manipulate the result if you prefer the intersections rather than the pairs (e.g. geom1.intersection(geom2)).
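For reference, a minimal JTS sketch of that last step, assuming you already have one pair of overlapping geometries from the join output. The class and method names are just for illustration, and depending on your GeoTools/JTS version the Geometry class may live under com.vividsolutions.jts instead of org.locationtech.jts:

    import org.locationtech.jts.geom.Geometry;

    public final class OverlayStep {

      /**
       * Given one pair of overlapping geometries from the spatial join output,
       * return the actual overlap geometry instead of just keeping the pair.
       * Plain JTS; nothing GeoWave-specific happens here.
       */
      public static Geometry overlap(final Geometry left, final Geometry right) {
        final Geometry intersection = left.intersection(right);
        // intersection() can return an empty geometry when the inputs merely touch
        return intersection.isEmpty() ? null : intersection;
      }
    }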

rfecher avatar Dec 11 '18 13:12 rfecher

Thank you very much, this is exactly what I need. I will give you feedback on the results.

scially avatar Dec 11 '18 15:12 scially

I submitted my job to YARN, but there is an error: java.lang.NullPointerException at org.locationtech.geowave.core.index.persist.PersistenceUtils.fromBinary(PersistenceUtils.java:148)


18/12/17 18:21:51 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.NullPointerException
java.lang.NullPointerException
	at org.locationtech.geowave.core.index.persist.PersistenceUtils.fromBinary(PersistenceUtils.java:148)
	at org.locationtech.geowave.core.store.index.PrimaryIndex.fromBinary(PrimaryIndex.java:93)
	at org.locationtech.geowave.core.store.index.CustomNameIndex.fromBinary(CustomNameIndex.java:61)
	at org.locationtech.geowave.core.index.persist.PersistenceUtils.fromBinary(PersistenceUtils.java:148)
	at org.locationtech.geowave.core.store.metadata.AbstractGeoWavePersistence.fromValue(AbstractGeoWavePersistence.java:341)
	at org.locationtech.geowave.core.store.metadata.AbstractGeoWavePersistence.entryToValue(AbstractGeoWavePersistence.java:347)
	at org.locationtech.geowave.core.store.metadata.AbstractGeoWavePersistence.internalGetObject(AbstractGeoWavePersistence.java:293)
	at org.locationtech.geowave.core.store.metadata.AbstractGeoWavePersistence.getObject(AbstractGeoWavePersistence.java:240)
	at org.locationtech.geowave.core.store.metadata.IndexStoreImpl.getIndex(IndexStoreImpl.java:54)
	at org.locationtech.geowave.core.store.AdapterToIndexMapping.getIndices(AdapterToIndexMapping.java:69)
	at org.locationtech.geowave.analytic.spark.spatial.SpatialJoinRunner.getIndicesForAdapter(SpatialJoinRunner.java:136)
	at org.locationtech.geowave.analytic.spark.spatial.SpatialJoinRunner.createRDDFromOptions(SpatialJoinRunner.java:291)
	at org.locationtech.geowave.analytic.spark.spatial.SpatialJoinRunner.loadDatasets(SpatialJoinRunner.java:316)
	at org.locationtech.geowave.analytic.spark.spatial.SpatialJoinRunner.run(SpatialJoinRunner.java:107)
	at com.wh.GOverLay.main(GOverLay.java:58)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:635)
18/12/17 18:21:51 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: java.lang.NullPointerException)

Below is my code:

public class GOverLay {
    public static void main(String[] args)
            throws InterruptedException, ExecutionException, IOException {

        // input accumulo
        AccumuloOptions options = new AccumuloOptions();
        AccumuloRequiredOptions inputOperations = new AccumuloRequiredOptions("server7:2181,server8:2181,server9:2181",
                "accumulo",
                "geowave",
                "geowave",
                "geowave.over",
                options);


        // output accumulo
        AccumuloRequiredOptions outputOperations = new AccumuloRequiredOptions("server7:2181,server8:2181,server9:2181",
                "accumulo",
                "geowave",
                "geowave",
                "geowave.lgcy",
                options);


        SparkSession spark = SparkSession.builder()
                .config("spark.serializer","org.apache.spark.serializer.KryoSerializer")
                .config("spark.kryo.registrator", "org.locationtech.geowave.analytic.spark.GeoWaveRegistrator")
                //.config("spark.default.parallelism", "6000")
                .appName("Overlay")
                .getOrCreate();
        SpatialJoinRunner joinRunner = new SpatialJoinRunner(spark);


        // set layer1
        joinRunner.setLeftStore(inputOperations.createPluginOptions());
        joinRunner.setLeftAdapterTypeName("csztgh");
        // set layer2
        joinRunner.setRightStore(inputOperations.createPluginOptions());
        joinRunner.setRightAdapterTypeName("qmdltb");
        // set out layer
        joinRunner.setOutputStore(outputOperations.createPluginOptions());
        joinRunner.setOutputLeftAdapterTypeName("leftjoin");
        joinRunner.setOutputLeftAdapterTypeName("rightjoin");

        joinRunner.setPredicate(new GeomIntersects());
        //joinRunner.setPartCount(6000);

        joinRunner.run();
    }
}

And I package it into a jar and use SparkLauncher to submit it:

public class SparkRun {
    public static void main(String[] args)
            throws IOException, InterruptedException {

        CountDownLatch countDownLatch = new CountDownLatch(1);

        HashMap<String, String> env = new HashMap<>();

        env.put("HADOOP_CONF_DIR", "/home/hwang/hadoop-conf");
        SparkAppHandle handle = new SparkLauncher(env)
                .setAppResource("/home/hwang/eclipse-workspace/geowaveoverlay/overlay/target/overlay-1.0-SNAPSHOT.jar")
                .addJar("/home/hwang/eclipse-workspace/geowaveoverlay/overlay/target/overlay-1.0-SNAPSHOT.jar")
                .setMainClass("com.wh.GOverLay")

                .setAppName(new Date().toString())
                .setSparkHome("/home/hwang/spark-2.2.0-bin-hadoop2.6")
                .setMaster("yarn-cluster")
                .setVerbose(true)
                .setConf(SparkLauncher.DRIVER_MEMORY, "4g")
                .startApplication();
        // Use handle API to monitor / control application.
        handle.addListener(new SparkAppHandle.Listener() {
            public void stateChanged(SparkAppHandle sparkAppHandle) {
                System.out.println("state:" + sparkAppHandle.getState().toString());
            }

            public void infoChanged(SparkAppHandle sparkAppHandle) {
                System.out.println("Info:" + sparkAppHandle.getState().toString());
            }

        });
        countDownLatch.await();
    }
}

scially avatar Dec 17 '18 10:12 scially

How do you package it? Do you use the maven shade plugin? If so, make sure to use the ServicesResourceTransformer such as this. And if not, make sure the META-INF/services/* files are included and appended together when multiple libraries contribute the same file.

rfecher avatar Dec 17 '18 13:12 rfecher

Do you mean this: "/home/hwang/eclipse-workspace/geowaveoverlay/overlay/target/overlay-1.0-SNAPSHOT.jar"? I use the maven shade plugin to package a jar which includes geowave-analytic-spark-SNAPSHOT-1.0.0.jar, geowave-accumulo-datastore-SNAPSHOT-1.0.0.jar, and so on. Do you mean that when I package my jar, I need to use the ServicesResourceTransformer?

scially avatar Dec 17 '18 15:12 scially

Yes, the shade plugin by default just overwrites when multiple maven modules contribute the same file, but that behavior makes no sense for Java SPI files. The ServicesResourceTransformer is useful because it appends SPI files together instead.
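For context on why that matters, here is a small self-contained illustration of how Java SPI lookup works (the interface name is a placeholder, not a GeoWave class): ServiceLoader only sees providers listed in the merged META-INF/services file, so a shade build that overwrites those files silently drops registrations, which is consistent with the NullPointerException above in PersistenceUtils.fromBinary where a persistable class apparently could not be looked up.

    import java.util.ServiceLoader;

    // Illustration only: Java SPI discovers implementations by reading every
    // META-INF/services/<interface> file on the classpath. If the shade plugin
    // lets one jar's service file overwrite another's instead of appending them,
    // ServiceLoader simply never sees the clobbered providers.
    public class SpiLookupDemo {

      /** Placeholder SPI interface for the illustration -- not a GeoWave type. */
      public interface MyPluginSpi {}

      public static void main(final String[] args) {
        int found = 0;
        for (final MyPluginSpi plugin : ServiceLoader.load(MyPluginSpi.class)) {
          System.out.println("found provider: " + plugin.getClass().getName());
          found++;
        }
        System.out.println(found + " provider(s) on the classpath");
      }
    }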

rfecher avatar Dec 17 '18 15:12 rfecher

Thanks... I will try it, and I hope I may consult you in the future if I have any problems.

scially avatar Dec 17 '18 15:12 scially

This is my key code, but when I submit it to YARN with spark-submit, my cluster resources run out and there are some errors:

       SparkConf conf = GeoWaveSparkConf.getDefaultConfig()
                .set("spark.yarn.jars","hdfs://10.66.150.5:8020/spark-jars/*.jar")
                //.setJars(new String[]{"/home/hwang/eclipse-workspace/geowaveoverlay/overlay/target/overlay-1.0-SNAPSHOT.jar"})
                .setMaster("yarn-client");

        SparkSession spark = GeoWaveSparkConf.createDefaultSession(conf);

        GeomFunctionRegistry.registerGeometryFunctions(spark);
        GeoWaveSpatialEncoders.registerUDTs();

        SpatialJoinRunner joinRunner = new SpatialJoinRunner(spark);
        // set layer1
        joinRunner.setLeftStore(inputOperations1.createPluginOptions());
        joinRunner.setLeftAdapterTypeName("qmdltb"); //csztgh
        // set layer2
        joinRunner.setRightStore(inputOperations2.createPluginOptions());
        joinRunner.setRightAdapterTypeName("csztgh"); //qmdltb
        // set out layer
        joinRunner.setOutputStore(outputOperations.createPluginOptions());
        joinRunner.setOutputLeftAdapterTypeName("leftjoin");
        joinRunner.setOutputLeftAdapterTypeName("rightjoin");

        joinRunner.setPredicate(new GeomIntersects());
        // is it too small?
        joinRunner.setPartCount(1000);

18/12/20 18:34:34 INFO scheduler.DAGScheduler: Job 2 failed: isEmpty at TieredSpatialJoin.java:371, took 714.866837 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 148 in stage 53.0 failed 4 times, most recent failure: Lost task 148.3 in stage 53.0 (TID 101253, server7, executor 84): ExecutorLostFailure (executor 84 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 138297 ms
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2024)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2045)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2064)
        at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1354)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.RDD.take(RDD.scala:1327)
        at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply$mcZ$sp(RDD.scala:1462)
        at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1462)
        at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1462)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1461)
        at org.apache.spark.api.java.JavaRDDLike$class.isEmpty(JavaRDDLike.scala:544)
        at org.apache.spark.api.java.AbstractJavaRDDLike.isEmpty(JavaRDDLike.scala:45)
        at org.locationtech.geowave.analytic.spark.spatial.TieredSpatialJoin.join(TieredSpatialJoin.java:371)
        at org.locationtech.geowave.analytic.spark.spatial.SpatialJoinRunner.run(SpatialJoinRunner.java:114)
        at com.wh.GOverLay.main(GOverLay.java:74)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

My cluster configuration is shown in the screenshot (snipaste_2018-12-20_20-37-38), and the intersection of the two shapefiles that have been ingested into Accumulo is shown in the images below. image

snipaste_2018-12-20_20-41-58

And I have a question: when I use ArcGIS for this overlay analysis it only takes a few minutes, so why do GeoWave and Spark need so many resources and run so slowly? Is there a problem with my code? Thanks...

scially avatar Dec 20 '18 12:12 scially

The YARN running situation is shown in the screenshots below:

snipaste_2018-12-20_21-04-02

image

snipaste_2018-12-20_21-07-49

Is it necessary to increase server memory? But in that case, isn't the memory required by the program too much?

scially avatar Dec 20 '18 13:12 scially

Our expert in the spatial join implementation, @JWileczek, probably has some ideas too, but one thing I think you should try is to explicitly set min and max splits to some reasonable value (it should be an option on the geowave spatial join runner). I know it defaults to spark.default.parallelism if it's not explicitly provided and that is set in your spark config. But if that's not set, it roughly uses the number of ranges as the number of splits, which is often far too many input splits and results in a lot of unnecessary overhead.
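For reference, a hedged sketch of the two knobs mentioned in this thread, spark.default.parallelism on the Spark config and setPartCount on the join runner, both of which appear in the code above. The value 200 is a placeholder, not a recommendation:

    import org.apache.spark.SparkConf;
    import org.apache.spark.sql.SparkSession;
    import org.locationtech.geowave.analytic.spark.GeoWaveSparkConf;
    import org.locationtech.geowave.analytic.spark.spatial.SpatialJoinRunner;

    public final class SplitSettings {

      /** Pin the parallelism explicitly rather than letting the join fall back
       *  to one split per range. 200 is a placeholder value to tune. */
      public static SpatialJoinRunner runnerWithExplicitSplits() {
        final SparkConf conf = GeoWaveSparkConf.getDefaultConfig()
            .set("spark.default.parallelism", "200");
        final SparkSession spark = GeoWaveSparkConf.createDefaultSession(conf);

        final SpatialJoinRunner joinRunner = new SpatialJoinRunner(spark);
        joinRunner.setPartCount(200); // same idea, set directly on the runner
        return joinRunner;
      }
    }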

rfecher avatar Dec 20 '18 18:12 rfecher

Also, I should say that distributed processing isn't necessarily faster than local - if the job is small enough that it can be done locally, there is overhead associated with distributing it. But leveraging the distributed technologies does allow you to scale the job well beyond anything that would even be feasible within a reasonable time locally. That being said, for this to fail, something must be going on (again my inclination is too many splits, but I'm really just glancing at this and could be missing something important).

rfecher avatar Dec 20 '18 18:12 rfecher

Hi @scially, sorry you are having such issues with the spatial join - hopefully I'll be able to help you track down the issue and get you moving forward. I'll need some more information to help you in this scenario, mainly the size of the Spark cluster you are working with and the size of the datasets you are attempting to join.

When it comes to spatial joins in Spark there are many things that affect the job, and its performance is largely driven by the configuration of the Spark cluster itself. As Rich mentioned above, one thing that greatly affects the performance of the join is the number of splits (a.k.a. partitions) used for the data when it is loaded into Spark. However many splits you specify when loading the RDD, or via the setPartCount function of the runner, will be the number of partitions the resulting RDD uses when loading the data. This partition count is very important to Spark because it defines the base block size and parallelism Spark will use during the join.

The partition count can be either too large or too small depending on the size of the data we are working with and the size of the cluster we are trying to join on. Figuring out an adequate partition count is largely what these memory issues boil down to. In an ideal world we would be able to calculate this for you, but the join algorithm is not at that point yet in its current implementation, as it is a fairly complicated problem to solve.

When thinking about partition count I like to start by considering the total dataset size versus the size/memory of a single executor. In Spark, individual tasks operate on partitions of data, and a single thread of an executor can only operate on one task at a time. Using too few partitions can leave large parts of the cluster under-utilized and can still result in OOM errors depending on the data size. Using an extreme example to demonstrate the point: if we had a dataset that takes up 250gb of space and split it across 4 partitions, Spark would allow this, but to reasonably work with that data without OOM errors you would need executors capable of holding roughly 62.5gb in memory, because a task running on an executor will still try to load at least one partition's worth of data. This example is unrealistic but demonstrates the basic issue. It becomes more complicated once you consider that executors will attempt to run multiple tasks at once depending on the number of cores you've configured them to use. That, paired with the fact that some partitions will be larger than others, makes coming up with a perfect partition count very difficult. For spatial joins, it's actually recommended to err on the side of too many partitions rather than too few.

I hope this information helps you understand a little more about what is happening. To help you further, if you can provide me with the following information we can hopefully come up with some configuration settings that will help you move forward:

  1. What is the size of the data that we are attempting to join?
  2. What is the size of the YARN cluster we are working with? Master/slave node count, and the hardware resources of the master and slave nodes (RAM, disk, cores).
  3. What is the configuration for the Spark executors? Mainly memory and cores.

When working with very large datasets (over 200gb) it's not unheard of to see partition counts of more than 3000 and sometimes up to 6000. In my experience, aiming for maximum partition sizes of less than 64mb has yielded the best results, but that again depends on executor size as well.
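As a back-of-the-envelope version of that sizing logic (total input size divided by a ~64 MB target partition size), a purely illustrative helper:

    /** Rough partition-count heuristic from the discussion above: aim for
     *  partitions of at most ~64 MB of input data. Purely illustrative. */
    public final class PartitionHeuristic {
      private static final long TARGET_PARTITION_BYTES = 64L * 1024 * 1024;

      public static int suggestPartCount(final long totalInputBytes) {
        final long count =
            (totalInputBytes + TARGET_PARTITION_BYTES - 1) / TARGET_PARTITION_BYTES;
        return (int) Math.max(1, count);
      }

      public static void main(final String[] args) {
        // e.g. a 250 GB dataset works out to roughly 4000 partitions at ~64 MB each
        System.out.println(suggestPartCount(250L * 1024 * 1024 * 1024));
      }
    }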

JWileczek avatar Dec 20 '18 20:12 JWileczek

First, thanks to you and rfecher... here is my configuration:

  1. I don't know exactly how to describe the data size to you, so I will give you two indications:
    1. The size of the shapefiles for the two datasets (screenshots: shapefile-size, attribute1, attributes).

    2. The row counts in Accumulo. I ingested the two shapefiles into Accumulo; below are my commands:

geowave config addstore -t accumulo -u geowave -p geowave -i accumulo  -z server7:2181,server8:2181,server9:2181 --gwNamespace geowave.over1 overlay1

geowave config addindex -c EPSG:2384 -t spatial -ps HASH -np 4 overlay1-spatial

geowave ingest localtogw  -t 4 -f geotools-vector ./qmdltb.shp overlay1 overlay1-spatial
geowave config addstore -t accumulo -u geowave -p geowave -i accumulo  -z server7:2181,server8:2181,server9:2181 --gwNamespace geowave.over2 overlay2

geowave config addindex -c EPSG:2384 -t spatial -ps HASH -np 4 overlay2-spatial

geowave ingest localtogw  -t 4 -f geotools-vector ./csztgh.shp overlay2 overlay2-spatial

(Screenshot of the Accumulo row counts: accumulo.) I also published the layers to GeoServer (screenshot: geoserver).

  2. My cluster configuration:

    1. Cluster servers (screenshot: cpu)

    2. YARN cluster (screenshot: yarn)

  3. Spark executors:

spark2-submit  --master yarn --deploy-mode client --class com.wh.GOverLay ./overlay-1.0-SNAPSHOT.jar 6000  # 6000 is partCount

And I don't set any extra parameters, so I guess the executor memory is the default:

--executor-memory MEM       Memory per executor (e.g. 1000M, 2G) (Default: 1G).
--executor-cores NUM        Number of cores per executor. (Default: 1 in YARN mode,
                              or all available cores on the worker in standalone mode)

Does this information meet your requirements? Thanks very much...
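Since the executors are running with the 1g/1-core defaults, here is a hedged sketch of setting them explicitly on the SparkConf. spark.executor.memory, spark.executor.cores, and spark.executor.instances are standard Spark-on-YARN settings; the values below are placeholders, not recommendations for this particular cluster:

    import org.apache.spark.SparkConf;
    import org.locationtech.geowave.analytic.spark.GeoWaveSparkConf;

    // Sketch only: override the 1g / 1-core executor defaults.
    public final class ExecutorSettings {
      public static SparkConf withExplicitExecutors() {
        return GeoWaveSparkConf.getDefaultConfig()
            .set("spark.executor.memory", "4g")      // placeholder value
            .set("spark.executor.cores", "2")        // placeholder value
            .set("spark.executor.instances", "6");   // placeholder value
      }
    }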

scially avatar Dec 21 '18 02:12 scially

Here is my code:

        AccumuloOptions options = new AccumuloOptions();
        AccumuloRequiredOptions inputOperations1 = new AccumuloRequiredOptions("server7:2181,server8:2181,server9:2181",
                "accumulo",
                "geowave",
                "geowave",
                "geowave.over1", //xzq
                options);
        AccumuloRequiredOptions inputOperations2 = new AccumuloRequiredOptions("server7:2181,server8:2181,server9:2181",
                "accumulo",
                "geowave",
                "geowave",
                "geowave.over2", //xzq
                options);


        // output accumulo
        AccumuloRequiredOptions outputOperations = new AccumuloRequiredOptions("server7:2181,server8:2181,server9:2181",
                "accumulo",
                "geowave",
                "geowave",
                "geowave.lgcy",
                options);

        SparkConf conf = GeoWaveSparkConf.getDefaultConfig()
                .set("spark.yarn.jars","hdfs://10.66.150.5:8020/spark-jars/*.jar")
                //.setJars(new String[]{"/home/hwang/eclipse-workspace/geowaveoverlay/overlay/target/overlay-1.0-SNAPSHOT.jar"})
                ;//.setMaster("yarn-client");

        SparkSession spark = GeoWaveSparkConf.createDefaultSession(conf);

        GeomFunctionRegistry.registerGeometryFunctions(spark);
        GeoWaveSpatialEncoders.registerUDTs();

        SpatialJoinRunner joinRunner = new SpatialJoinRunner(spark);


        // set layer1
        joinRunner.setLeftStore(inputOperations1.createPluginOptions());
        joinRunner.setLeftAdapterTypeName("qmdltb"); //csztgh
        // set layer2
        joinRunner.setRightStore(inputOperations2.createPluginOptions());
        joinRunner.setRightAdapterTypeName("csztgh"); //qmdltb
        // set out layer
        joinRunner.setOutputStore(outputOperations.createPluginOptions());
        joinRunner.setOutputLeftAdapterTypeName("leftjoin");
        joinRunner.setOutputLeftAdapterTypeName("rightjoin");

        joinRunner.setPredicate(new GeomIntersects());
        joinRunner.setPartCount(Integer.valueOf(args[0]));


        joinRunner.run();

I set the partCount to 6, 6000, 10000, and 20000, but the program failed to execute each time.

However, I also tried the overlay analysis with another shapefile that had been ingested into GeoWave, set the partCount to 1000, and it succeeded.

SPATIAL_IDEX_HASH_64

That layer only has 9 attributes, and I overlaid the layer with itself (screenshot: xzq).

scially avatar Dec 21 '18 02:12 scially

@scially Your test data is too small. Isn't a partition count of 6000 too large?

hsg77 avatar Dec 24 '18 02:12 hsg77

@hsg77
But it still does not succeed when I set partCount=60.

scially avatar Dec 24 '18 14:12 scially

@scially When you upload layer data with GeoWave, try using the NONE partitioning strategy for now, and avoid the HASH and ROUND_ROBIN strategies at first. In my case, data uploaded with the ROUND_ROBIN strategy could not be found by a spatial query, while data uploaded with the NONE strategy could be found.

hsg77 avatar Dec 25 '18 02:12 hsg77

@hsg77 I am currently using HASH_64 and queries work quickly, but I keep running into problems when I use GeoWave's Spark module for the overlay analysis.

scially avatar Dec 25 '18 02:12 scially

@scially Is it the same problem as issue #1469? How about trying the EPSG:4326 coordinate system (WGS 84)?

hsg77 avatar Dec 25 '18 03:12 hsg77

@hsg77 That really does seem to be it. Thanks a lot, I'll try it - could you leave me a contact method? That said, I previously ran the analysis successfully with smaller shapefiles in other coordinate systems... I'll grab the latest code and try it.

scially avatar Dec 25 '18 03:12 scially

@hsg77 I updated the code, but it still doesn't work. I also tried WGS 84 coordinates and had the same problem. But with a small dataset of about 2 MB and only about 10 attributes, the overlay analysis succeeds.

scially avatar Jan 02 '19 07:01 scially

@scially You have joinRunner.setOutputLeftAdapterTypeName("leftjoin"); joinRunner.setOutputLeftAdapterTypeName("rightjoin"); - change the second call to joinRunner.setOutputRightAdapterTypeName("rightjoin");

I used version 0.9.8, stored in HBase 1.4.6. My SHP data is uploaded to a store (namespace = tablename = overInputLayer) that contains two adapters (layer_1, layer_2).

Try the following code:

        AccumuloOptions options = new AccumuloOptions();
        AccumuloRequiredOptions input = new AccumuloRequiredOptions("server7:2181,server8:2181,server9:2181",
                "accumulo",
                "geowave",
                "geowave",
                "overInputLayer",
                options);

        SparkConf conf = GeoWaveSparkConf.getDefaultConfig();
        conf.setJars(JavaSparkContext.jarOfClass(this.getClass()));
        SparkSession spark = GeoWaveSparkConf.createDefaultSession(conf);

        GeomFunctionRegistry.registerGeometryFunctions(spark);
        GeoWaveSpatialEncoders.registerUDTs();

        SpatialJoinRunner joinRunner = new SpatialJoinRunner(spark);

        joinRunner.setLeftStore(input.createPluginOptions());
        joinRunner.setRightStore(input.createPluginOptions());

        joinRunner.setLeftAdapterTypeName("layer_1");
        joinRunner.setRightAdapterTypeName("layer_2");

        // set out layer
        joinRunner.setOutputStore(outputOperations.createPluginOptions());
        joinRunner.setOutputLeftAdapterTypeName("leftjoin");
        joinRunner.setOutputRightAdapterTypeName("rightjoin");

        joinRunner.setMaster("spark://master:7077");

        joinRunner.setPredicate(new GeomIntersects());
        joinRunner.run();

hsg77 avatar Jan 08 '19 16:01 hsg77

@hsg77 OK, that was an error, thanks... I'll try it.

scially avatar Jan 09 '19 00:01 scially

@scially Which version of GeoWave are you using? Is it version 1.0? I see "org.locationtech" in your code.

hsg77 avatar Jan 09 '19 02:01 hsg77

@hsg77 1.0

scially avatar Jan 09 '19 06:01 scially

@hsg77 I have corrected the code, but it still doesn't work... I want to ask: is your program also slow at this step? image

scially avatar Jan 15 '19 05:01 scially

@scially @rfecher @JWileczek The results of my runs are as follows.

First dataset (leftTier count = 81464, rightTier count = 121830). Upload settings: partition strategy NONE, partition number 1, partCount = -1. Spatial join result: ran in 2 minutes 30 seconds, leftjoin count = 54, rightjoin count = 16.

Second dataset (leftTier count = 688462, rightTier count = 1672008). Spatial join results: (1) partition strategy NONE, partition number 1, partCount = -1: insufficient-memory error. (2) partition strategy HASH, partition number 8, partCount = -1: insufficient-memory error. (3) partition strategy HASH, partition number 64, partCount = 1000: ran in 53 minutes 56 seconds, leftjoin count = 11103, rightjoin count = 17872.

So my runs are also very slow.

hsg77 avatar Jan 15 '19 06:01 hsg77

@scially @rfecher When creating a layer, how do I set the IndexStrategy of the layer to TieredSFCIndexStrategy?

hsg77 avatar Jan 29 '19 10:01 hsg77

When you call DataStore.addType() or DataStore.addIndex(), providing the indices associated with a feature type, you have the opportunity to define whatever indexing you'd like. For example, this creates the default spatial indexing given a set of options. Following similar logic, you could replace this line with a call to TieredSFCIndexFactory such as this factory method.
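To make the shape of that wiring concrete, here is a very rough sketch. The only GeoWave names taken from the comment above are DataStore.addType() and TieredSFCIndexFactory; the import paths, the DataTypeAdapter parameter, and the helper method are assumptions that must be checked against the GeoWave version in use.

    // Rough sketch only -- verify every import and signature against your GeoWave
    // version; the index/adapter APIs moved around between 0.9.x and 1.0.
    import org.locationtech.geowave.core.store.api.DataStore;
    import org.locationtech.geowave.core.store.api.DataTypeAdapter;
    import org.locationtech.geowave.core.store.api.Index;

    public final class TieredIndexSketch {

      /**
       * Hypothetical helper: build an Index whose strategy comes from
       * TieredSFCIndexFactory rather than the default spatial index builder.
       * The real body would call one of the TieredSFCIndexFactory factory
       * methods and wrap the returned strategy in an index with a spatial
       * index model; the exact calls depend on the GeoWave version.
       */
      static Index buildTieredSpatialIndex() {
        throw new UnsupportedOperationException(
            "replace with a TieredSFCIndexFactory call for your GeoWave version");
      }

      /** Register a feature type against the custom tiered index. */
      static <T> void addTypeWithTieredIndex(final DataStore store, final DataTypeAdapter<T> adapter) {
        store.addType(adapter, buildTieredSpatialIndex());
      }
    }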

rfecher avatar Jan 29 '19 16:01 rfecher

@hsg77 I think the slowness is likely due to how Spark memory usage and garbage collection are being handled for the job. I would try again with the following settings and see if you get different results:

partition strategy: hash
partition number: 8
partcount: 3000

In addition to these settings, I generally get better results if I use a different garbage collection method for Spark than the default parallel collector. G1GC is available to Spark as a garbage collection strategy and tends to lead to better results for larger spatial join jobs. You can see more about setting up G1GC here: https://databricks.com/blog/2015/05/28/tuning-java-garbage-collection-for-spark-applications.html

For the feature counts you are trying to join I don't see you needing more than 8 partitions for the store, but you'll potentially need more for the Spark RDD (partCount) based on how your executors are set up (cores, instances, memory usage). At a partition count of roughly 3000 or more, Spark begins to use compressed partitions, so the general rule of thumb is that if a job needs 1000-2000 partitions, just bump it up to 3000+ so Spark uses less memory on task tracking itself. Getting the speedup is a combination of finding the right Spark executor/memory configuration, garbage collection configuration, and partition count configuration. I would start by modifying the garbage collection method since you have a working but slow result, and then move on from there. Hopefully this will help you get better speed out of the spatial join.
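Pulling that advice together, a hedged sketch of the suggested tuning: G1GC on the executors (passed through the standard spark.executor.extraJavaOptions setting) and a partition count in the 3000+ range. Executor memory and cores still need to be sized to the actual cluster:

    import org.apache.spark.SparkConf;
    import org.apache.spark.sql.SparkSession;
    import org.locationtech.geowave.analytic.spark.GeoWaveSparkConf;
    import org.locationtech.geowave.analytic.spark.spatial.SpatialJoinRunner;

    // Sketch of the suggested tuning: G1GC on driver and executors, and a
    // partition count bumped past the ~3000 threshold discussed above.
    public final class JoinTuning {
      public static SpatialJoinRunner tunedRunner() {
        final SparkConf conf = GeoWaveSparkConf.getDefaultConfig()
            .set("spark.executor.extraJavaOptions", "-XX:+UseG1GC")
            .set("spark.driver.extraJavaOptions", "-XX:+UseG1GC");
        final SparkSession spark = GeoWaveSparkConf.createDefaultSession(conf);

        final SpatialJoinRunner joinRunner = new SpatialJoinRunner(spark);
        joinRunner.setPartCount(3000); // per the rule of thumb above
        return joinRunner;
      }
    }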

JWileczek avatar Jan 29 '19 16:01 JWileczek