jgit-spark-connector
Cannot find siva file in complex query
Expected Behavior
The following query should not crash:
-- Repository count per language presence.
-- If any file in language X is present in HEAD of a repository,
-- it contributes one to that language's count.
-- Forks are excluded.
--
-- category: slow,enry
SELECT
    language,
    COUNT(repository_id) AS repository_count
FROM (
    SELECT DISTINCT
        t.repository_id AS repository_id,
        COALESCE(
            classifyLanguages(b.is_binary, t.path, b.content),
            'Unknown') AS language
    FROM
        tree_entries t JOIN blobs b
    ON
        t.repository_id = b.repository_id AND
        t.reference_name = b.reference_name AND
        t.blob = b.blob_id
    JOIN (
        SELECT DISTINCT
            s.repository_id AS repository_id
        FROM (
            SELECT
                hash,
                MAX(STRUCT(index, repository_id)) AS s
            FROM commits
            WHERE
                index != 1
                AND reference_name = 'refs/heads/HEAD'
                AND SIZE(parents) == 0
            GROUP BY hash
        ) AS q1
    ) AS r
    ON
        b.repository_id = r.repository_id
    WHERE
        t.reference_name = 'refs/heads/HEAD'
) AS q2
GROUP BY language
ORDER BY repository_count DESC
Current Behavior
The query crashes immediately:
Relation in the other, or the Join type is not supported.
[Stage 2:> (0 + 9) / 9][Stage 4:> (0 + 9) / 9]18/04/09 09:35:33 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 20)
ExitCodeException exitCode=1: chmod: cannot access '/tmp/spark-a937af10-3f27-436a-8542-4bbf7033adce/siva-files/06a5cabd0dc53f58e36d3103d01fdd2ecd6d232f.siva': No such file or directory
at org.apache.hadoop.util.Shell.runCommand(Shell.java:582)
at org.apache.hadoop.util.Shell.run(Shell.java:479)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:866)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:849)
at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733)
at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:225)
at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209)
at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307)
at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296)
at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328)
at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:398)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:461)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:789)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:365)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:338)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:289)
at org.apache.hadoop.fs.FileSystem.copyToLocalFile(FileSystem.java:2034)
at org.apache.hadoop.fs.FileSystem.copyToLocalFile(FileSystem.java:2003)
at org.apache.hadoop.fs.FileSystem.copyToLocalFile(FileSystem.java:1979)
at tech.sourced.engine.provider.RepositoryObjectFactory.genSivaRepository(RepositoryProvider.scala:206)
at tech.sourced.engine.provider.RepositoryObjectFactory.create(RepositoryProvider.scala:106)
at tech.sourced.engine.provider.RepositoryObjectFactory.create(RepositoryProvider.scala:100)
Possible Solution
My best guess is that one task is cleaning up a temporary siva file that another task still needs, but I haven't been able to debug this properly yet.
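To illustrate the suspicion, here is a minimal sketch of that kind of interleaving (hypothetical names and paths, not the engine's actual `RepositoryProvider` code): a local copy of a siva file shared between tasks is reference-counted, and an eager `release` deletes it as soon as the count reaches zero, so a concurrent task can find the file gone halfway through its own copy.

```scala
import java.nio.file.{Files, Path, Paths, StandardCopyOption}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical reference-counted cache of a local siva copy. If release()
// runs in another task between this task's copy and a follow-up call such
// as the chmod done by RawLocalFileSystem, that call fails with
// "No such file or directory", as in the stack trace above.
object SivaCopyRace {
  private val refs = new AtomicInteger(0)
  private val local: Path = Paths.get("/tmp/spark-local/siva-files/example.siva")

  def acquire(remote: Path): Path = {
    refs.incrementAndGet()
    if (!Files.exists(local)) {                                       // check ...
      Files.createDirectories(local.getParent)
      Files.copy(remote, local, StandardCopyOption.REPLACE_EXISTING)  // ... then copy
    }
    local
  }

  def release(): Unit = {
    // The zero check and the delete are not atomic with respect to acquire():
    // another task can bump the count back to 1 and start its copy while the
    // delete is still in flight, which is exactly the suspected interleaving.
    if (refs.decrementAndGet() == 0) Files.deleteIfExists(local)
  }
}
```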
Steps to Reproduce
- Execute the SQL query from "Expected Behavior" against a siva dataset stored in HDFS; a rough sketch of the setup is shown below.
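Roughly, the setup looks like this (a minimal sketch, not the exact notebook code: it assumes the 0.5.x `Engine(spark, sivaPath)` constructor, the engine's implicit `getReferences`/`getCommits`/`getTreeEntries`/`getBlobs` DataFrame helpers, and that the engine registers the `classifyLanguages` UDF on the session; the HDFS path is a placeholder):

```scala
import org.apache.spark.sql.SparkSession
import tech.sourced.engine._

val spark = SparkSession.builder()
  .appName("language-presence")
  .getOrCreate()

// Point the engine at the siva dataset (placeholder path).
val engine = Engine(spark, "hdfs:///path/to/siva-files")

// Register temp views under the table names used in the SQL query.
val commits = engine.getRepositories.getReferences.getCommits
commits.createOrReplaceTempView("commits")
commits.getTreeEntries.createOrReplaceTempView("tree_entries")
commits.getTreeEntries.getBlobs.createOrReplaceTempView("blobs")

// Paste the full SQL statement from "Expected Behavior" here.
val query: String = ???
spark.sql(query).show(20, truncate = false)
```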
Context
I'm trying to run a complex query containing a join with a subquery; both sides access the same repositories.
Your Environment (for bugs)
- Version used: engine-jupyter 0.5.5 on Docker
- Operating System and version: pipeline-staging cluster
- Resources needed to reproduce the problem: if you need access to a system exhibiting it, ping me.
Here's the generated physical plan:
*Sort [repository_count#138L DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(repository_count#138L DESC NULLS LAST, 200)
+- *HashAggregate(keys=[language#137], functions=[count(1)])
+- Exchange hashpartitioning(language#137, 200)
+- *HashAggregate(keys=[language#137], functions=[partial_count(1)])
+- *HashAggregate(keys=[repository_id#49, language#137], functions=[])
+- Exchange hashpartitioning(repository_id#49, language#137, 200)
+- *HashAggregate(keys=[repository_id#49, language#137], functions=[])
+- *Project [repository_id#49, coalesce(UDF(is_binary#65, path#51, content#64), Unknown) AS language#137]
+- *SortMergeJoin [repository_id#62], [repository_id#135], Inner
:- *Sort [repository_id#62 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(repository_id#62, 200)
: +- *Project [repository_id#49, path#51, repository_id#62, content#64, is_binary#65]
: +- *Filter (((blob#52 = blob_id#60) && (reference_name#50 = refs/heads/HEAD)) && (reference_name#63 = refs/heads/HEAD))
: +- *Scan GitRelation(org.apache.spark.sql.SparkSession@728e56f6,StructType(StructField(commit_hash,StringType,false), StructField(repository_id,StringType,false), StructField(reference_name,StringType,false), StructField(path,StringType,false), StructField(blob,StringType,false), StructField(blob_id,StringType,false), StructField(commit_hash,StringType,false), StructField(repository_id,StringType,false), StructField(reference_name,StringType,false), StructField(content,BinaryType,true), StructField(is_binary,BooleanType,false)),Some((((repository_id#49 = repository_id#62) && (reference_name#50 = reference_name#63)) && (blob#52 = blob_id#60))),None) [content#64,path#51,repository_id#62,reference_name#50,is_binary#65,blob_id#60,repository_id#49,reference_name#63,blob#52] PushedFilters: [EqualTo(reference_name,refs/heads/HEAD), EqualTo(reference_name,refs/heads/HEAD)], ReadSchema: struct<repository_id:string,path:string,repository_id:string,content:binary,is_binary:boolean>
+- *Sort [repository_id#135 ASC NULLS FIRST], false, 0
+- *HashAggregate(keys=[repository_id#135], functions=[])
+- Exchange hashpartitioning(repository_id#135, 200)
+- *HashAggregate(keys=[repository_id#135], functions=[])
+- *Filter isnotnull(repository_id#135)
+- SortAggregate(key=[hash#23], functions=[max(named_struct(index, index#22, repository_id, repository_id#20))])
+- *Sort [hash#23 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(hash#23, 200)
+- SortAggregate(key=[hash#23], functions=[partial_max(named_struct(index, index#22, repository_id, repository_id#20))])
+- *Sort [hash#23 ASC NULLS FIRST], false, 0
+- *Project [repository_id#20, index#22, hash#23]
+- *Filter (((size(parents#25) = 0) && NOT (index#22 = 1)) && (reference_name#21 = refs/heads/HEAD))
+- *Scan GitRelation(org.apache.spark.sql.SparkSession@728e56f6,StructType(StructField(repository_id,StringType,false), StructField(reference_name,StringType,false), StructField(index,IntegerType,false), StructField(hash,StringType,false), StructField(message,StringType,false), StructField(parents,ArrayType(StringType,false),true), StructField(parents_count,IntegerType,false), StructField(author_email,StringType,true), StructField(author_name,StringType,true), StructField(author_date,TimestampType,true), StructField(committer_email,StringType,true), StructField(committer_name,StringType,true), StructField(committer_date,TimestampType,true)),None,Some(commits)) [reference_name#21,repository_id#20,hash#23,index#22,parents#25] PushedFilters: [Not(EqualTo(index,1)), EqualTo(reference_name,refs/heads/HEAD)], ReadSchema: struct<repository_id:string,index:int,hash:string>
I was able to reduce the failing query to this:
SELECT
    t.path
FROM
    tree_entries t
    JOIN (
        SELECT DISTINCT
            repository_id
        FROM commits
    ) AS r
    ON
        t.repository_id = r.repository_id
Plan:
*Project [path#51]
+- *SortMergeJoin [repository_id#49], [repository_id#20], Inner
:- *Sort [repository_id#49 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(repository_id#49, 200)
: +- *Scan GitRelation(org.apache.spark.sql.SparkSession@728e56f6,StructType(StructField(commit_hash,StringType,false), StructField(repository_id,StringType,false), StructField(reference_name,StringType,false), StructField(path,StringType,false), StructField(blob,StringType,false)),None,Some(tree_entries)) [repository_id#49,path#51] ReadSchema: struct<repository_id:string,path:string>
+- *Sort [repository_id#20 ASC NULLS FIRST], false, 0
+- *HashAggregate(keys=[repository_id#20], functions=[])
+- Exchange hashpartitioning(repository_id#20, 200)
+- *HashAggregate(keys=[repository_id#20], functions=[])
+- *Scan GitRelation(org.apache.spark.sql.SparkSession@728e56f6,StructType(StructField(repository_id,StringType,false), StructField(reference_name,StringType,false), StructField(index,IntegerType,false), StructField(hash,StringType,false), StructField(message,StringType,false), StructField(parents,ArrayType(StringType,false),true), StructField(parents_count,IntegerType,false), StructField(author_email,StringType,true), StructField(author_name,StringType,true), StructField(author_date,TimestampType,true), StructField(committer_email,StringType,true), StructField(committer_name,StringType,true), StructField(committer_date,TimestampType,true)),None,Some(commits)) [repository_id#20] ReadSchema: struct<repository_id:string>
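For what it's worth, the same reduced query can be expressed through the DataFrame API (a sketch, assuming the `tree_entries` and `commits` temp views registered in the setup snippet above); it should compile to the same SortMergeJoin over two GitRelation scans:

```scala
// Both branches of the join scan GitRelation over the same siva files,
// which is the situation in which the crash appears.
val treeEntries = spark.table("tree_entries")
val commitRepos = spark.table("commits").select("repository_id").distinct()

val reduced = treeEntries
  .join(commitRepos, Seq("repository_id"))  // inner join, as in the SQL
  .select("path")

reduced.explain()  // should show the SortMergeJoin over two GitRelation scans
reduced.show()
```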