
Refine the pending `FIXME` in the `addFile` function

Open • xkx9431 opened this issue 4 years ago • 2 comments

I would like to help refine the remaining FIXME in the addFile function.

Please help review. If you find any inaccurate interpretation, I will revise it.

Changes:

First, addFile checks the scheme of the given path. For a path without a scheme, addFile converts it to a canonical local file URI. For a path with the `local` scheme, addFile logs the following WARN message and returns without adding the file:

File with 'local' scheme is not supported to add to file server, since it is already available on every node.

For any other scheme, addFile keeps the path as-is. In both non-local cases, addFile then creates a Hadoop Path from the resulting path.
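The scheme dispatch above can be sketched in a few lines of Scala. This is a minimal sketch, not Spark's code: `SchemeSketch` and `correctScheme` are hypothetical names, and it parses with `java.net.URI` instead of Hadoop's `Path`, so a path that is not a valid URI string (for example, one containing spaces) throws `URISyntaxException` here. `None` stands for the early return on the `local` scheme.

```scala
import java.io.File
import java.net.URI

object SchemeSketch {
  /**
   * Hypothetical helper mirroring the first step of addFile:
   *  - no scheme     -> canonical local file URI (e.g. "file:/tmp/a.txt")
   *  - "local"       -> None, standing in for "log a warning and return"
   *  - anything else -> the path unchanged
   * Assumption: java.net.URI replaces Hadoop's Path for parsing.
   */
  def correctScheme(path: String): Option[String] =
    Option(new URI(path).getScheme) match {
      case None          => Some(new File(path).getCanonicalFile.toURI.toString)
      case Some("local") => None
      case Some(_)       => Some(path)
    }
}
```

For example, `SchemeSketch.correctScheme("local:/opt/libs/a.txt")` returns `None`, while `SchemeSketch.correctScheme("hdfs://nn:8020/data/a.txt")` returns the path unchanged.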

If the path is an HTTP, HTTPS or FTP URI, addFile validates that it is a well-formed URL (SPARK-17650).
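The URL check can be illustrated with a small stand-in. This sketch relies only on `java.net.URI.toURL`; `UrlCheckSketch`, `needsUrlValidation` and `toValidUrl` are hypothetical names, and returning an `Either` instead of throwing is a deliberate departure from how addFile reports errors.

```scala
import java.net.{MalformedURLException, URI, URL}
import scala.util.{Failure, Success, Try}

object UrlCheckSketch {
  // Schemes that addFile validates as URLs instead of routing through a Hadoop FileSystem.
  val remoteSchemes: Set[String] = Set("http", "https", "ftp")

  /** Hypothetical helper: true when addFile would take the URL-validation branch. */
  def needsUrlValidation(uri: URI): Boolean =
    Option(uri.getScheme).exists(remoteSchemes.contains)

  /** Converts the URI to a java.net.URL, reporting failures as a Left instead of throwing. */
  def toValidUrl(uri: URI): Either[String, URL] =
    Try(uri.toURL) match {
      case Success(url) => Right(url)
      case Failure(e @ (_: MalformedURLException | _: IllegalArgumentException)) =>
        Left(s"URI ($uri) is not a valid URL: ${e.getMessage}")
      case Failure(e) => throw e
    }
}
```

Only URIs for which `needsUrlValidation` is true would reach `toValidUrl` in the addFile flow; every other scheme goes to the directory checks instead.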

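For any other scheme, addFile asks the path's Hadoop FileSystem whether the path is a directory. If the path is a local directory (`file` scheme) and Spark is not running in local mode, addFile throws a SparkException with the following message: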

addFile does not support local directories when not running local mode.

If the path is a directory and the recursive flag is not set, addFile throws a SparkException with the following message:

Added file $hadoopPath is a directory and recursive is not turned on.
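The two directory checks can be expressed as a pure function. In this sketch, `DirCheckSketch` and its methods are hypothetical, `isLocalMode` stands in for the driver's `isLocal` flag, and `java.nio.file.Files.isDirectory` replaces the Hadoop `FileSystem.getFileStatus` call. One difference worth noting: `getFileStatus` throws when the path does not exist, whereas `Files.isDirectory` simply returns `false`.

```scala
import java.nio.file.{Files, Path}

object DirCheckSketch {
  /** Mirrors the order of the two checks in addFile; returns the error message as a Left. */
  def checkDirectory(path: String, scheme: String, isDir: Boolean,
                     isLocalMode: Boolean, recursive: Boolean): Either[String, Unit] =
    if (!isLocalMode && scheme == "file" && isDir)
      Left("addFile does not support local directories when not running local mode.")
    else if (!recursive && isDir)
      Left(s"Added file $path is a directory and recursive is not turned on.")
    else Right(())

  /** Convenience wrapper for local paths: asks java.nio (not Hadoop) whether p is a directory. */
  def checkLocalPath(p: Path, isLocalMode: Boolean, recursive: Boolean): Either[String, Unit] =
    checkDirectory(p.toUri.toString, "file", Files.isDirectory(p), isLocalMode, recursive)
}
```

The order of the checks matters: a local directory outside local mode is rejected even when `recursive` is on.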

Validation

Preview in local VS Code mode:

The current code snippet is below:

```scala
  /**
   * Add a file to be downloaded with this Spark job on every node.
   *
   * If a file is added during execution, it will not be available until the next TaskSet starts.
   *
   * @param path can be either a local file, a file in HDFS (or other Hadoop-supported
   * filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
   * use `SparkFiles.get(fileName)` to find its download location.
   * @param recursive if true, a directory can be given in `path`. Currently directories are
   * only supported for Hadoop-supported filesystems.
   *
   * @note A path can be added only once. Subsequent additions of the same path are ignored.
   */
  def addFile(path: String, recursive: Boolean): Unit = {
    val uri = new Path(path).toUri
    val schemeCorrectedPath = uri.getScheme match {
      case null => new File(path).getCanonicalFile.toURI.toString
      case "local" =>
        logWarning("File with 'local' scheme is not supported to add to file server, since " +
          "it is already available on every node.")
        return
      case _ => path
    }

    val hadoopPath = new Path(schemeCorrectedPath)
    val scheme = new URI(schemeCorrectedPath).getScheme
    if (!Array("http", "https", "ftp").contains(scheme)) {
      val fs = hadoopPath.getFileSystem(hadoopConfiguration)
      val isDir = fs.getFileStatus(hadoopPath).isDirectory
      if (!isLocal && scheme == "file" && isDir) {
        throw new SparkException(s"addFile does not support local directories when not running " +
          "local mode.")
      }
      if (!recursive && isDir) {
        throw new SparkException(s"Added file $hadoopPath is a directory and recursive is not " +
          "turned on.")
      }
    } else {
      // SPARK-17650: Make sure this is a valid URL before adding it to the list of dependencies
      Utils.validateURL(uri)
    }

    val key = if (!isLocal && scheme == "file") {
      env.rpcEnv.fileServer.addFile(new File(uri.getPath))
    } else {
      schemeCorrectedPath
    }
    val timestamp = System.currentTimeMillis
    if (addedFiles.putIfAbsent(key, timestamp).isEmpty) {
      logInfo(s"Added file $path at $key with timestamp $timestamp")
      // Fetch the file locally so that closures which are run on the driver can still use the
      // SparkFiles API to access files.
      Utils.fetchFile(uri.toString, new File(SparkFiles.getRootDirectory()), conf,
        env.securityManager, hadoopConfiguration, timestamp, useCache = false)
      postEnvironmentUpdate()
    } else {
      logWarning(s"The path $path has been added already. Overwriting of added paths " +
        "is not supported in the current version.")
    }
  }
```
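The `@note` that a path can be added only once follows from the `putIfAbsent` call on `addedFiles`. A minimal sketch of that bookkeeping, assuming a `scala.collection.concurrent.TrieMap` as a stand-in for the driver's map (`FileRegistrySketch`, `register` and `timestampOf` are hypothetical names):

```scala
import scala.collection.concurrent.TrieMap

object FileRegistrySketch {
  // Stand-in for the driver's addedFiles: key -> timestamp (ms) of the first registration.
  private val addedFiles = TrieMap.empty[String, Long]

  /** Returns true only for the first registration of `key`; later calls keep the original timestamp. */
  def register(key: String, timestamp: Long = System.currentTimeMillis): Boolean =
    addedFiles.putIfAbsent(key, timestamp).isEmpty

  /** Timestamp recorded by the first successful registration, if any. */
  def timestampOf(key: String): Option[Long] = addedFiles.get(key)
}
```

Because `putIfAbsent` is atomic, two threads adding the same key concurrently still see exactly one `true`, which matches the single `logInfo` versus `logWarning` branch in addFile.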

xkx9431, Sep 16 '21 06:09

@jaceklaskowski, could you help review this PR? Thanks a lot.

xkx9431, Sep 17 '21 06:09


Hi @jaceklaskowski, if you think this PR is inappropriate, feel free to close it. I would also be glad to contribute more PRs if you think this one is on the right track.

xkx9431, Sep 18 '21 15:09