jgit-spark-connector

Cannot find siva file in complex query

Open smola opened this issue 7 years ago • 2 comments

Expected Behavior

The following query should not crash:

-- Repository count per language presence.
-- If any file in language X is present in HEAD of a repository,
-- it contributes one to the language count.
-- Forks are excluded.
--
-- category: slow,enry
SELECT
    language,
    COUNT(repository_id) AS repository_count
FROM (
    SELECT DISTINCT
        t.repository_id AS repository_id,
        COALESCE(
            classifyLanguages(b.is_binary, t.path, b.content),
            'Unknown') AS language
    FROM
        tree_entries t JOIN blobs b
        ON
            t.repository_id = b.repository_id AND
            t.reference_name = b.reference_name AND
            t.blob = b.blob_id
        JOIN (
            SELECT DISTINCT
                s.repository_id AS repository_id
            FROM (
                SELECT
                    hash,
                    MAX(STRUCT(index, repository_id)) AS s
                FROM commits
                WHERE
                    index != 1
                    AND reference_name = 'refs/heads/HEAD'
                    AND SIZE(parents) == 0
                GROUP BY hash
            ) AS q1
        ) AS r
        ON
            b.repository_id = r.repository_id
    WHERE
        t.reference_name = 'refs/heads/HEAD'
    ) AS q2
GROUP BY language
ORDER BY repository_count DESC

Current Behavior

The query crashes immediately:

Relation in the other, or the Join type is not supported.
[Stage 2:>                  (0 + 9) / 9][Stage 4:>                  (0 + 9) / 9]18/04/09 09:35:33 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 20)
ExitCodeException exitCode=1: chmod: cannot access '/tmp/spark-a937af10-3f27-436a-8542-4bbf7033adce/siva-files/06a5cabd0dc53f58e36d3103d01fdd2ecd6d232f.siva': No such file or directory

	at org.apache.hadoop.util.Shell.runCommand(Shell.java:582)
	at org.apache.hadoop.util.Shell.run(Shell.java:479)
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:866)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:849)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:225)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209)
	at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:398)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:461)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:789)
	at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:365)
	at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:338)
	at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:289)
	at org.apache.hadoop.fs.FileSystem.copyToLocalFile(FileSystem.java:2034)
	at org.apache.hadoop.fs.FileSystem.copyToLocalFile(FileSystem.java:2003)
	at org.apache.hadoop.fs.FileSystem.copyToLocalFile(FileSystem.java:1979)
	at tech.sourced.engine.provider.RepositoryObjectFactory.genSivaRepository(RepositoryProvider.scala:206)
	at tech.sourced.engine.provider.RepositoryObjectFactory.create(RepositoryProvider.scala:106)
	at tech.sourced.engine.provider.RepositoryObjectFactory.create(RepositoryProvider.scala:100)

Possible Solution

My best guess is that one task is cleaning up a temporary siva file while another task still needs it. I haven't been able to debug this properly yet.
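If that guess is right, one way to avoid the race would be to reference-count the local copies, so a file is deleted only when its last user releases it. The following is a minimal sketch of that idea, not the engine's actual code: the class `RefCountedLocalCopies` and its `acquire`/`release` methods are hypothetical names, `shutil.copyfile` stands in for the `copyToLocalFile` call seen in the stack trace, and it is written in Python for brevity even though the engine is Scala.

```python
import os
import shutil
import tempfile
import threading


class RefCountedLocalCopies:
    """Hypothetical helper: hands out local copies of files and deletes
    each copy only after its last user has released it."""

    def __init__(self, local_dir):
        self._local_dir = local_dir
        self._lock = threading.Lock()
        self._refs = {}  # local path -> number of active users

    def acquire(self, source_path):
        local_path = os.path.join(self._local_dir, os.path.basename(source_path))
        with self._lock:
            count = self._refs.get(local_path, 0)
            if count == 0:
                # First user: create the local copy while holding the lock,
                # so a concurrent release() cannot delete it mid-copy.
                os.makedirs(self._local_dir, exist_ok=True)
                shutil.copyfile(source_path, local_path)
            self._refs[local_path] = count + 1
        return local_path

    def release(self, local_path):
        with self._lock:
            count = self._refs[local_path] - 1
            if count == 0:
                # Last user is done: now it is safe to remove the copy.
                del self._refs[local_path]
                os.remove(local_path)
            else:
                self._refs[local_path] = count


def demo():
    """Two users share one copy; it survives until both have released it."""
    workdir = tempfile.mkdtemp()
    source = os.path.join(workdir, "example.siva")
    with open(source, "wb") as f:
        f.write(b"placeholder")

    copies = RefCountedLocalCopies(os.path.join(workdir, "siva-files"))
    first = copies.acquire(source)
    second = copies.acquire(source)

    copies.release(first)
    still_there = os.path.exists(second)  # second user still holds the copy

    copies.release(second)
    gone = not os.path.exists(second)  # removed after the last release

    shutil.rmtree(workdir)
    return still_there, gone
```

In this sketch both the copy and the delete happen under the same lock, so a cleanup in one task cannot interleave with a copy in another. Whether this matches the real failure depends on confirming the hypothesis first.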

Steps to Reproduce

  1. Execute the provided SQL query against a siva dataset in HDFS.

Context

I'm trying a complex query containing a join with a subquery. Both sides access the same repositories.

Your Environment (for bugs)

  • Version used: engine-jupyter 0.5.5 on Docker
  • Operating System and version: pipeline-staging cluster
  • Some needed resources to reproduce the problem: If you need access to a system exhibiting this problem, ping me.

smola, Apr 09 '18 09:04

Here's the generated physical plan:

*Sort [repository_count#138L DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(repository_count#138L DESC NULLS LAST, 200)
   +- *HashAggregate(keys=[language#137], functions=[count(1)])
      +- Exchange hashpartitioning(language#137, 200)
         +- *HashAggregate(keys=[language#137], functions=[partial_count(1)])
            +- *HashAggregate(keys=[repository_id#49, language#137], functions=[])
               +- Exchange hashpartitioning(repository_id#49, language#137, 200)
                  +- *HashAggregate(keys=[repository_id#49, language#137], functions=[])
                     +- *Project [repository_id#49, coalesce(UDF(is_binary#65, path#51, content#64), Unknown) AS language#137]
                        +- *SortMergeJoin [repository_id#62], [repository_id#135], Inner
                           :- *Sort [repository_id#62 ASC NULLS FIRST], false, 0
                           :  +- Exchange hashpartitioning(repository_id#62, 200)
                           :     +- *Project [repository_id#49, path#51, repository_id#62, content#64, is_binary#65]
                           :        +- *Filter (((blob#52 = blob_id#60) && (reference_name#50 = refs/heads/HEAD)) && (reference_name#63 = refs/heads/HEAD))
                           :           +- *Scan GitRelation(org.apache.spark.sql.SparkSession@728e56f6,StructType(StructField(commit_hash,StringType,false), StructField(repository_id,StringType,false), StructField(reference_name,StringType,false), StructField(path,StringType,false), StructField(blob,StringType,false), StructField(blob_id,StringType,false), StructField(commit_hash,StringType,false), StructField(repository_id,StringType,false), StructField(reference_name,StringType,false), StructField(content,BinaryType,true), StructField(is_binary,BooleanType,false)),Some((((repository_id#49 = repository_id#62) && (reference_name#50 = reference_name#63)) && (blob#52 = blob_id#60))),None) [content#64,path#51,repository_id#62,reference_name#50,is_binary#65,blob_id#60,repository_id#49,reference_name#63,blob#52] PushedFilters: [EqualTo(reference_name,refs/heads/HEAD), EqualTo(reference_name,refs/heads/HEAD)], ReadSchema: struct<repository_id:string,path:string,repository_id:string,content:binary,is_binary:boolean>
                           +- *Sort [repository_id#135 ASC NULLS FIRST], false, 0
                              +- *HashAggregate(keys=[repository_id#135], functions=[])
                                 +- Exchange hashpartitioning(repository_id#135, 200)
                                    +- *HashAggregate(keys=[repository_id#135], functions=[])
                                       +- *Filter isnotnull(repository_id#135)
                                          +- SortAggregate(key=[hash#23], functions=[max(named_struct(index, index#22, repository_id, repository_id#20))])
                                             +- *Sort [hash#23 ASC NULLS FIRST], false, 0
                                                +- Exchange hashpartitioning(hash#23, 200)
                                                   +- SortAggregate(key=[hash#23], functions=[partial_max(named_struct(index, index#22, repository_id, repository_id#20))])
                                                      +- *Sort [hash#23 ASC NULLS FIRST], false, 0
                                                         +- *Project [repository_id#20, index#22, hash#23]
                                                            +- *Filter (((size(parents#25) = 0) && NOT (index#22 = 1)) && (reference_name#21 = refs/heads/HEAD))
                                                               +- *Scan GitRelation(org.apache.spark.sql.SparkSession@728e56f6,StructType(StructField(repository_id,StringType,false), StructField(reference_name,StringType,false), StructField(index,IntegerType,false), StructField(hash,StringType,false), StructField(message,StringType,false), StructField(parents,ArrayType(StringType,false),true), StructField(parents_count,IntegerType,false), StructField(author_email,StringType,true), StructField(author_name,StringType,true), StructField(author_date,TimestampType,true), StructField(committer_email,StringType,true), StructField(committer_name,StringType,true), StructField(committer_date,TimestampType,true)),None,Some(commits)) [reference_name#21,repository_id#20,hash#23,index#22,parents#25] PushedFilters: [Not(EqualTo(index,1)), EqualTo(reference_name,refs/heads/HEAD)], ReadSchema: struct<repository_id:string,index:int,hash:string>

smola, Apr 09 '18 09:04

I was able to reduce the failing query to this:

SELECT
    t.path
FROM
    tree_entries t
    JOIN (
        SELECT DISTINCT
            repository_id
        FROM commits
    ) AS r
    ON
        t.repository_id = r.repository_id

Plan:

*Project [path#51]
+- *SortMergeJoin [repository_id#49], [repository_id#20], Inner
   :- *Sort [repository_id#49 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(repository_id#49, 200)
   :     +- *Scan GitRelation(org.apache.spark.sql.SparkSession@728e56f6,StructType(StructField(commit_hash,StringType,false), StructField(repository_id,StringType,false), StructField(reference_name,StringType,false), StructField(path,StringType,false), StructField(blob,StringType,false)),None,Some(tree_entries)) [repository_id#49,path#51] ReadSchema: struct<repository_id:string,path:string>
   +- *Sort [repository_id#20 ASC NULLS FIRST], false, 0
      +- *HashAggregate(keys=[repository_id#20], functions=[])
         +- Exchange hashpartitioning(repository_id#20, 200)
            +- *HashAggregate(keys=[repository_id#20], functions=[])
               +- *Scan GitRelation(org.apache.spark.sql.SparkSession@728e56f6,StructType(StructField(repository_id,StringType,false), StructField(reference_name,StringType,false), StructField(index,IntegerType,false), StructField(hash,StringType,false), StructField(message,StringType,false), StructField(parents,ArrayType(StringType,false),true), StructField(parents_count,IntegerType,false), StructField(author_email,StringType,true), StructField(author_name,StringType,true), StructField(author_date,TimestampType,true), StructField(committer_email,StringType,true), StructField(committer_name,StringType,true), StructField(committer_date,TimestampType,true)),None,Some(commits)) [repository_id#20] ReadSchema: struct<repository_id:string>

smola, Apr 09 '18 10:04