Local namespace symbols referenced from foreach-partition function not found.
It would appear that referencing symbols within the current namespace from the function passed to foreach-partition fails. Oddly enough, referencing protocols works, so I originally thought the problem was specific to helper functions called from protocol implementations, but it turns out calling the helper function directly is broken too. These issues are reproducible when running a Spark job with spark-submit using a cluster as the master (I'm using Mesos), but do not reproduce using "local" as the master. I'm using sparkling 1.2.5.
Here's the error I'm seeing:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 6, ip-x-x-x-x.us-west-2.compute.internal): java.lang.IllegalStateException: Attempting to call unbound fn: #'sparkling-bug.core/helper-fn
    at clojure.lang.Var$Unbound.throwArity(Var.java:43)
    at clojure.lang.AFn.invoke(AFn.java:36)
    at sparkling_bug.core.MyImpl.compute(core.clj:21)
    at sparkling_bug.core$foreach_partition_is_broken$fn__551$fn__552.invoke(core.clj:34)
    at clojure.core$map$fn__4785.invoke(core.clj:2644)
Here's a code snippet:
(ns sparkling-bug.core
  (:gen-class)
  (:require
    [sparkling.core :as spark]
    [sparkling.conf :as sparkconf]))

(defn- connect
  []
  (-> (sparkconf/spark-conf)
      (sparkconf/app-name "sparkling-bug")
      (spark/spark-context)))

(defprotocol MyProtocol
  (compute [this x] "Perform some computation"))

(defn- helper-fn [x y] (+ x y))

(defrecord MyImpl [y]
  MyProtocol
  (compute [this x] (helper-fn x y)))

(defn- map-partition-works
  [sc]
  (let [input  (spark/parallelize sc (range 100))
        worker (->MyImpl 7)
        result (spark/map-partition
                 (fn [part] (map #(compute worker %) (iterator-seq part)))
                 input)]
    (println (spark/collect result))))

(defn- foreach-partition-is-broken
  [sc]
  (let [input  (spark/parallelize sc (range 100))
        worker (->MyImpl 7)]
    (spark/foreach-partition
      (fn [part] (println (map #(compute worker %) part)))
      input)))

(defn- require-fixes-the-issue
  [sc]
  (let [input  (spark/parallelize sc (range 100))
        worker (->MyImpl 7)]
    (spark/foreach-partition
      (fn [part]
        ;; require our own namespace so that the helper fn symbol is declared
        ;; on the executor
        (require 'sparkling-bug.core)
        (println (map #(compute worker %) part)))
      input)))

(defn- calling-local-function-is-broken
  [sc]
  (let [input (spark/parallelize sc (range 100))]
    (spark/foreach-partition
      (fn [part] (println (map #(helper-fn 7 %) part)))
      input)))

(defn- calling-let-function-works
  [sc]
  (let [input  (spark/parallelize sc (range 100))
        tmp-fn helper-fn]
    (spark/foreach-partition
      (fn [part] (println (map #(tmp-fn 7 %) part)))
      input)))

(defn -main
  "Run a very simple sparkling job."
  [& _]
  (with-open [sc (connect)]
    ;; (map-partition-works sc)              ;; ok
    (foreach-partition-is-broken sc)         ;; error
    ;; (require-fixes-the-issue sc)          ;; ok
    ;; (calling-local-function-is-broken sc) ;; error
    ;; (calling-let-function-works sc)       ;; ok
    ))
Here's my project.clj:
(defproject sparkling_bug "0.1.0-SNAPSHOT"
  :description "Example of sparkling issue calling a protocol function on a record instance."
  :dependencies [[org.clojure/clojure "1.8.0"]
                 [gorillalabs/sparkling "1.2.5"]
                 [org.apache.spark/spark-core_2.10 "1.6.1"]
                 [org.apache.spark/spark-sql_2.10 "1.6.1"]]
  :aot [#".*" sparkling.serialization sparkling.destructuring]
  :main sparkling-bug.core
  :target-path "target/%s")
Here's the command I'm using to launch:
spark-submit --master mesos://localhost:5050 --conf spark.mesos.executor.home=/opt/spark-1.6.1-bin-hadoop2.6 ./target/uberjar/sparkling_bug-0.1.0-SNAPSHOT-standalone.jar
Eliminating the comp with iterator-seq in the foreach-partition implementation (and making that call in the function passed to foreach-partition instead) seems to fix this issue. I have seen issues in the past with using comp on functions from other namespaces.
https://github.com/gorillalabs/sparkling/blob/develop/src/clojure/sparkling/core.clj#L591
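For context, the linked implementation composes the supplied function with iterator-seq before wrapping it, roughly like this (a paraphrase rather than the exact source; void-function is the wrapper from sparkling.function):

;; Current shape (paraphrased): the outermost serialized fn is the comp
;; closure, whose class is declared in clojure.core, so that is the
;; namespace required on the executor.
(defn foreach-partition
  [f rdd]
  (.foreachPartition rdd (void-function (comp f iterator-seq))))

;; The change described above: let the caller apply iterator-seq so that the
;; caller's own fn is the outermost serialized fn, as with map-partition.
(defn foreach-partition-fixed
  [f rdd]
  (.foreachPartition rdd (void-function f)))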
As an example of the issues I've run into with comp: if comp is used to combine something that references our helper function with iterator-seq, the result cannot be used with sparkling. To reproduce, add this to the repro code above:
(defn- map-partition-with-comp-broken
  [sc]
  (let [input (spark/parallelize sc (range 100))]
    (->> input
         (spark/map-partition (comp (partial map #(helper-fn 7 %)) iterator-seq))
         (spark/collect)
         (prn))))
However, as can be seen in the example code above, a single function that both calls our helper and calls iterator-seq works fine.
The problem with using comp in the map-partition case would appear to be unavoidable, or by design, given the implementation of readIFn, which only calls require on the namespace that the function passed to it was declared in. Since Clojure doesn't provide any way to inspect what makes up a function, I don't see how this could be resolved. That said, it would be nice if the getting-started guide made it clearer that functions passed to sparkling methods need to be top level (i.e. not the result of a call to comp or some other function that returns a function declared in a different namespace than the function actually being called).
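A quick REPL illustration of why comp defeats that heuristic (the numeric suffixes vary):

(.getName (class (comp inc str)))
;; => "clojure.core$comp$fn__4727" -- a clojure.core class, so readIFn
;; requires clojure.core on the executor, and vars like
;; #'sparkling-bug.core/helper-fn are left unbound there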
As far as the foreach-partition function goes, I believe this can be resolved by modifying its implementation to produce a special version of VoidFunction that takes an override for the namespace of the function passed to it; a sketch of the idea follows.
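Something along these lines, perhaps (a hypothetical sketch assuming sparkling.function's void-function, not tested patch code):

(defn void-function-with-ns
  [ns-sym f]
  ;; capture an explicit namespace symbol alongside the user's fn, and
  ;; require that namespace on the executor before invoking the fn, so its
  ;; defns have actually run and its vars are bound
  (void-function
    (fn [it]
      (require ns-sym)
      (f (iterator-seq it)))))

;; foreach-partition would need to capture the caller's namespace at the
;; call site, e.g. via a macro that expands (ns-name *ns*) there.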
I've run into similar problems, sometimes accessing functions in my own namespace. I think the problem is roughly along these lines:
All requires, imports, etc. happen in the driver's JVM. However, when functions run on remote executor JVMs, the "Clojure context", as it were, hasn't gone through the same series of initialization steps (requires, defs, and so on).
I don't really know what the solution is. I'm sometimes able to work around the problem by wrapping things in a call to sparkling.function.function (or whatever is appropriate), so that the function gets serialized properly without having to refer to defs that have not happened on the remote JVM; something like the sketch below.
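For example (a sketch of what I mean; sparkling.function is the same namespace sparkling.core uses for its own wrappers):

(require '[sparkling.function :as func])

;; hand Spark a pre-wrapped function object rather than letting anything
;; compose around the bare clojure fn first
(func/void-function
  (fn [it]
    (println (map #(helper-fn 7 %) (iterator-seq it)))))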
I was able to resolve this pretty satisfactorily by partially traversing the fields of the IFn instances being passed to the serializer: /paul-amperity/sparkling. This not only fixes the foreach-partition issue but generally makes comp and partial usable.
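The idea, roughly (a hypothetical sketch of the approach, not the actual code in that branch):

;; Walk the closed-over fields of a compiled fn, collect the namespace of
;; every fn object found, and require all of them on deserialization
;; instead of only the outermost fn's namespace.
(defn- class->ns-sym
  "Derive a namespace symbol from a compiled fn's class name, e.g.
  sparkling_bug.core$helper_fn -> sparkling-bug.core. (Naive demunging; a
  real implementation would handle munged characters more carefully.)"
  [^Class cls]
  (let [cname (.getName cls)
        idx   (.indexOf cname "$")]
    (when (pos? idx)
      (symbol (.replace (subs cname 0 idx) \_ \-)))))

(defn- fn-namespaces
  "Collect the namespaces of f and of any fns it closes over; the serializer
  would write these symbols with the fn and require them on read."
  [f]
  (loop [queue [f]
         seen  #{}
         nss   #{}]
    (if-let [obj (first queue)]
      (if (contains? seen obj)
        (recur (rest queue) seen nss)
        (let [nested (for [^java.lang.reflect.Field fld (.getDeclaredFields (class obj))
                           :let [_ (.setAccessible fld true)
                                 v (.get fld obj)]
                           :when (instance? clojure.lang.AFunction v)]
                       v)]
          (recur (concat (rest queue) nested)
                 (conj seen obj)
                 (if-let [ns-sym (class->ns-sym (class obj))]
                   (conj nss ns-sym)
                   nss))))
      nss)))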
There are more issues with the existing approach of splitting the class name on "$" to determine which namespace to require. For example, the part of the class name before the "$" includes the record name when an anonymous function is declared within a protocol implementation body, and it also fails when you call functions declared in an un-required namespace (for example, when a function or protocol implementation was passed into your record constructor). This change fixes those cases as well.
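To illustrate the record-name case (class names are approximate, and ExampleRecord is hypothetical):

(defrecord ExampleRecord [y]
  MyProtocol
  (compute [this x]
    ;; this anonymous fn compiles to a class named something like
    ;; sparkling_bug.core.ExampleRecord$fn__1234; splitting on "$" yields
    ;; "sparkling_bug.core.ExampleRecord", which names the record class,
    ;; not a namespace that require can load
    ((fn [n] (helper-fn n y)) x)))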
I'm currently stuck on 1.2.x because 2.0.0 does not appear to be backward compatible with the version of Spark I'm using (1.6.1). But if there is interest in this change, I can move it to the head of the develop branch, write some tests for it, and make a pull request.