Run Gemini file-level duplicate detection on PGA
Document in the README the resources needed to successfully process 1k, 2k, 10k, 100k, and the whole PGA of .siva files.
A good start would be:
- document the known configuration of the cluster we use internally
- run Gemini hash and document how long it takes to finish
- document what CPU load, memory, and IO throughput workload it creates on that cluster (e.g. from the Sysdig Dashboard; to get access, file an issue)
- see which resource is the bottleneck
- try to optimize in order to utilize that resource better (e.g. in case of IO throughput, have more executor JVMs running on the same machine)
- see if we are hit by, and can help with, any Engine issues
> optimize in order to utilize that resource better (e.g. in case of throughput, have more executor JVMs running on the same machine)

How do we do that? We don't control the Spark cluster.

> How do we do that? We don't control the Spark cluster.
Let's measure, identify, and document the bottleneck first, set preliminary expectations on the resources needed for 100k, and then discuss the possible options we might have, e.g. this could be a powerful argument for changing https://github.com/src-d/charts/tree/master/spark to Apache Spark on k8s.
We would be able to improve the performance expectation model based on more data later on.
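For reference, "more executor JVMs on the same machine" in Spark standalone mode mostly comes down to requesting smaller executors, so that one worker can host several of them. A minimal sketch, using the standalone master that appears later in this issue and purely hypothetical resource numbers:

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: with a standalone master, asking for smaller executors
// (fewer cores / less heap each) lets a single worker machine run several
// executor JVMs instead of one large one. All numbers are hypothetical.
val spark = SparkSession.builder()
  .master("spark://fe-spark-spark-master:7077")
  .appName("gemini-hash")
  .config("spark.executor.cores", "4")    // cores per executor JVM
  .config("spark.executor.memory", "8g")  // heap per executor JVM
  .config("spark.cores.max", "48")        // cap on total cores for the app
  .getOrCreate()
```

Whether this helps depends on where the bottleneck actually is, which is why measuring comes before tuning.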
We hit https://github.com/src-d/engine/issues/323.
Thanks for keeping it updated!
BTW, super-nice issue description and example of how to reproduce 👍
The Engine issue is resolved in https://github.com/src-d/engine/releases/tag/v0.5.1
Yep. But the Engine API has changed a bit, so we need to update Gemini.
Ran Gemini on the new 1k dataset with the new Engine. And it works!!!!
The bad news is the timing: 24 min. I don't really know how to profile it, but I saw that only one job takes most of the time; most probably there is one huge repo.
The 10k run failed with https://github.com/src-d/engine/issues/332
Currently blocked by https://github.com/src-d/engine/issues/336
To move this forward, as the DR team is super busy now, can we please submit a PR to Engine that just logs RevWalkException without failing (the same way MissingObjectException is handled), and run Gemini with a custom-built version of Engine from that PR, to avoid waiting for a release?
@carlosms could you please check if https://github.com/src-d/engine/pull/347 solves the issue and allows us to move forward with https://github.com/src-d/gemini/issues/42?
If that PR is tested on real data and solves the issue, it may be worth posting that information on the PR as well.
Engine 0.5.7 was released 🎉 with many bug fixes, and discussions like https://github.com/src-d/minutes/pull/210/files#diff-a0ec2b18d53b6bebfc2a342ed864a52fR34 should raise the priority of finishing Gemini file duplication runs up to PGA size.
Title and description are updated to represent the current goal.
10k repos are processed successfully with Engine 0.5.7. The full PGA is failing with OOM with default params; we need to tune them (see the sketch after the plan below).
Plan is:
- make sure PGA is available on staging cluster
- run Gemini \w latest Engine 0.6.1 on 1k, 10k, 100k, 200k
- document how to reproduce the results
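For the OOM on the full PGA run, a minimal sketch of the kind of parameters to look at first; the values are placeholders, not a final configuration (the submit flags actually used later are in the Configuration section below):

```scala
import org.apache.spark.SparkConf

// Sketch of the first knobs to tune for the OOM; all values are placeholders.
val conf = new SparkConf()
  .set("spark.executor.memory", "16g")               // heap per executor JVM
  .set("spark.driver.memory", "8g")                  // driver heap (collects, broadcasts)
  .set("spark.files.maxPartitionBytes", "12582912")  // smaller input partitions, as in the submit config below
  .set("spark.sql.shuffle.partitions", "2000")       // more, smaller shuffle partitions
```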
PGA is being downloaded to the pipeline HDFS cluster; see `hdfs dfs -ls hdfs://hdfs-namenode/pga/siva/latest`.
WIP in the pga-alex pod, running `pga get -v -j 32 -o hdfs://hdfs-namenode:8020/pga 2>&1 | tee -a /go/pga-1.log`.
At this rate it will take ~25h to get there.
PGA download has finished 🎉 but it's a bit :suspect:, as it is only 2.4Tb, not the 2.7Tb it is rumored to be. Will verify PGA integrity first with https://github.com/src-d/datasets/issues/53.
Pre-conditions for running new Gemini on pipeline staging Apache Spark cluster:
- [x] PGA downloaded to HDFS (WIP)
- [x] deploy Feature Extractors, same as bblfsh, collocated with Spark Workers, src-d/backlog#1266
blocked by src-d/backlog#1266
- FEs are running
`pga get` second round, ETA 8h52m56s
Full PGA was downloaded to HDFS 🎉 https://github.com/src-d/datasets/issues/53#issuecomment-396528917
```
$ zgrep -o "[0-9a-z]*\.siva" ~/.pga/latest.csv.gz | sort | uniq | wc -l
239807

$ hdfs dfs -ls -R hdfs://hdfs-namenode/pga/siva/latest | grep -c "\.siva$"
239807
```
Plan
- WIP: run latest Gemini Hash \w latest Engine 0.6.3 on a single shard (~10Gb, ~1000 repos, ~1/250 of the whole) using the current staging pipeline cluster configuration
- [x] Executor page was not available on proxied Spark Web UI https://github.com/src-d/issues-infrastructure/issues/187#issuecomment-396916618
- [x] Run every stage of Gemini Hash
- [x] get Files: ~30min (50min before), 2.5Gb (17Mb \wo content), Parquet: 900Mb
- [x] extract UAST: ~1h, 2.3Gb, Parquet: 700mb https://github.com/src-d/engine/issues/402
- [x] extract Features: ~1.1h, 230Mb, Parquet: 80Mb
- [x] docFreq: 3sec, 110Mb, 73Mb json
- [x] hashFeatures: ~2h now (due to excessive locking, 50min \wo lock), 170Mb, Parquet: 20Mb
- [x] saveHashes: 16sec, hashes ~100Mb, meta ~30Mb
- [x] Run Report
- WIP: `./report` using the DB from the hash above: ~6h
- WIP: run on whole PGA
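Most stages above already report a Parquet artifact. A minimal sketch, with hypothetical paths and stage functions, of reusing those artifacts as an on-disk cache so a rerun can resume from the last finished stage instead of recomputing everything:

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SparkSession}

// Sketch with hypothetical paths: materialize each stage's output as Parquet
// and reuse it on the next run instead of recomputing the stage from scratch.
def cachedStage(spark: SparkSession, path: String)(compute: => DataFrame): DataFrame = {
  val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
  if (fs.exists(new Path(path))) {
    spark.read.parquet(path)        // stage already materialized, read it back
  } else {
    compute.write.parquet(path)     // run the stage once and persist it
    spark.read.parquet(path)        // continue the pipeline from the on-disk copy
  }
}

// Hypothetical usage with made-up stage functions and cache paths:
// val uasts    = cachedStage(spark, "hdfs:///gemini/cache/uasts")    { extractUasts(files) }
// val features = cachedStage(spark, "hdfs:///gemini/cache/features") { extractFeatures(uasts) }
```

This is the kind of checkpointing the on-disk Parquet "cache" item in the improvement directions below refers to.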
Improvement directions:
- will need more performance optimizations:
  - share a single feClient/connection per partition, instead of creating one for every row (sketched after this list)
  - uast de-serialize -> serialize -> feature extractor
  - RDD -> DataFrames
- support on-disk Parquet "cache" at every stage (as sketched above)
- process each shard individually?
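A sketch of the per-partition client sharing mentioned above; `FeClient`, its methods, and the endpoint are hypothetical stand-ins for the real feature-extractor gRPC client:

```scala
import org.apache.spark.rdd.RDD

// Hypothetical client interface standing in for the real feature-extractor gRPC client.
trait FeClient extends AutoCloseable {
  def extract(uast: Array[Byte]): Seq[String]
}
object FeClient {
  // Placeholder: a real implementation would open a gRPC channel here.
  def connect(host: String, port: Int): FeClient = ???
}

// One client per partition, reused for every row, instead of connecting once per row.
def extractFeatures(uasts: RDD[Array[Byte]]): RDD[Seq[String]] =
  uasts.mapPartitions { rows =>
    val client = FeClient.connect("feature-extractor", 9001) // hypothetical endpoint
    rows.map(uast => client.extract(uast))
    // a production version should also close the client once the iterator is exhausted
  }
```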
Blocked: the Feature Extractors deployed under https://github.com/src-d/issues-infrastructure/issues/184 are part of a new, separate Apache Spark cluster in a different k8s namespace (-n feature-extractor) that does not seem to have access to HDFS 😕
Hash has finished successfully; I'm now submitting the PRs to Gemini that enabled it.
Report is:
- `cc.makeBuckets()`: 40min
- `Report.findConnectedComponents()`: ~6h
1h for hashing ~1/250 of PGA on 3 machines of the pipeline staging cluster.
Configuration
--conf "spark.executor.memory=16g" \
--conf "spark.local.dir=/spark-temp-data" \
--conf "spark.executor.extraJavaOptions='-Djava.io.tmpdir=/spark-temp-data -Dlog4j.configuration=log4j.properties'" \
--conf "spark.driver.memory=8g" \
--conf "spark.tech.sourced.engine.skip.read.errors=true" \
--conf "spark.files.maxPartitionBytes=12582912" \
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
--conf "spark.eventLog.enabled=true" \
--conf "spark.eventLog.dir=hdfs://hdfs-namenode.default.svc.cluster.local/apps/gemini/spark-logs" \
--files src/main/resources/log4j.properties \
Command
```
time MASTER="spark://fe-spark-spark-master:7077" ./hash -v \
  -k dockergemini4 \
  -h scylladb.default.svc.cluster.local \
  hdfs://hdfs-namenode.default.svc.cluster.local/pga/siva/latest/ff | tee hash-pga-ff-4.logs
```
Output
Feature Extraction exceptions
Processed: 4304060, skipped: 120
- TimeoutException -> 109
- StatusRuntimeException -> 11
real 61m2.058s
FE exceptions
```
ERROR SparkFEClient: feature extractor error: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
ERROR SparkFEClient: feature extractor error: io.grpc.StatusRuntimeException: INTERNAL: Exception deserializing request!
FATAL vendor/golang.org/x/text/unicode/norm/tables.go: rpc error: code = ResourceExhausted desc = grpc: received message larger than max (4323138 vs. 4194304)
```
UAST extraction exceptions
```
WARN Bblfsh: FATAL src/main/java/org/yardstickframework/BenchmarkServerStartUp.java: EOF
WARN Bblfsh: FATAL xs6/extensions/crypt/crypt_ghash.js: message is not defined; unsupported: non-object root node
WARN Bblfsh: FATAL vendor/golang.org/x/text/encoding/charmap/tables.go: rpc error: code = ResourceExhausted desc = grpc: received message larger than max (5617479 vs. 4194304)
```
DB
```
$ kubectl exec -it scylladb-0 -- /bin/bash
$ cqlsh

use dockergemini4;

select count(1) from meta;
 127379

select count(1) from hashtables;
 426560
```
Thanks a lot for the detailed results, @bzz!
Question: how are we sampling the repos for each of these tests?

> Question: how are we sampling the repos for each of these tests?
Good question. We have always used just a single shard of the PGA dataset: all the repos whose .siva file names start with the prefix `/ff/`.
Overall, Apache Spark performance depends A LOT on data distribution, so attaching the .siva file size distribution histogram in ~100Mb buckets:
```
hdfs dfs -du hdfs://hdfs-namenode/pga/siva/latest/ff/ | grep "\.siva$" | awk -v "size=100048576" -f hist.awk

0           100048576   912
100048576   200097152   17
200097152   300145728   2
300145728   400194304   1
400194304   500242880   1
500242880   600291456
600291456   700340032
700340032   800388608
800388608   900437184
900437184   1000485760  1
1000485760  1100534336  1
```
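hist.awk itself is not included here; for reference, a sketch of computing the same histogram from Scala with the Hadoop FileSystem API (same ~100Mb bucket size and shard path as above; output columns are bucket start, bucket end, file count):

```scala
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch: list the shard's .siva files and count them into ~100Mb buckets,
// mirroring the awk one-liner above.
val bucket = 100048576L
val fs = FileSystem.get(new URI("hdfs://hdfs-namenode"), new Configuration())
val sizes = fs.listStatus(new Path("/pga/siva/latest/ff"))
  .filter(_.getPath.getName.endsWith(".siva"))
  .map(_.getLen)

sizes.groupBy(_ / bucket).toSeq.sortBy(_._1).foreach { case (b, files) =>
  println(s"${b * bucket}\t${(b + 1) * bucket}\t${files.length}")
}
```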
Local: 1Mb, 30k features. Cluster: 170Mb, 5.5M features.
DataFrame
local: 8sec, cluster: 4sec
```scala
val freqDf = features
  .withColumnRenamed("_1", "feature")
  .withColumnRenamed("_2", "doc")
  .select("feature", "doc")
  .distinct
  .groupBy("feature")
  .agg(count("*").alias("cnt"))
  .map(row => (row.getAs[String]("feature"), row.getAs[Long]("cnt")))
  .collect().toMap
```
RDD
local: 4sec, cluster: 5s
```scala
val freq = features.rdd
  .map { case (feature, doc, _) => (feature, doc) }
  .distinct
  .map { case (token, _) => (token, 1) }
  .reduceByKey(_ + _)
  .collectAsMap()
```
The DataFrame API does not seem to change performance much, but it still has the nice benefit of a uniform API.
There are 141 .siva files bigger than 1Gb, with the rest (260+k) being smaller. Those outliers can be moved aside to get a shorter tail of task execution times.
```
hdfs dfs -ls -R hdfs://hdfs-namenode/pga/siva/latest/ | grep "\.siva$" | awk '{if ($5 > 1073741824) print $8}' | wc -l
141

hdfs dfs -ls -R hdfs://hdfs-namenode/pga/siva/latest/ \
  | grep "\.siva$" | awk '{if ($5 > 1073741824) print $8}' \
  | cut -d '/' -f 7- \
  | xargs -I{} sh -c 'hdfs dfs -mkdir -p $(dirname hdfs://hdfs-namenode/pga/siva/biggest/{}); hdfs dfs -mv hdfs://hdfs-namenode/pga/siva/latest/{} hdfs://hdfs-namenode/pga/siva/biggest/{}'
```
After moving the biggest files, jobs fail with
```
org.apache.spark.SparkException: Kryo serialization failed: Buffer overflow. Available: 0, required: 32343809.
```
After setting `spark.kryoserializer.buffer.max=1g`, jobs fail with
```
tech.sourced.siva.SivaException: Exception at file 022c7272f0c1333a536cb319beadc4171cc8ff6a.siva: At Index footer, index size: Java implementation of siva doesn't support values greater than 9223372036854775807
```
which at this point might indicate .siva files broken during pga get.
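For reference, the buffer bump mentioned above, sketched as programmatic configuration (equivalent to adding one more --conf line to the submit command from the Configuration section):

```scala
import org.apache.spark.SparkConf

// Equivalent of --conf "spark.kryoserializer.buffer.max=1g" on the submit command.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.max", "1g")
```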