marceline
DSL, Serialization, and AOT
I'm using Storm 0.9.2-incubating and Kafka 0.8.1.1.
I prefer not to AOT my topology if possible, but I have found that if:
- The topology uses an IPartitionedTridentSpout (e.g. TransactionalTridentKafkaSpout); and,
- The remainder of the topology has a parallelism hint applied; and,
- The ns enclosing the topology is not AOT'd
I encounter unbound fn errors when deploying the topology if I use any of the Marceline DSL macros (filters, tridentfns, etc).
The smallest reproduction looks something like this:
;; assuming t aliases marceline.storm.trident
(t/deffilter filter-type
  [tuple]
  true)

(defn ->topology
  []
  (let [topology (TridentTopology.)
        spout (TransactionalTridentKafkaSpout.
               (TridentKafkaConfig.
                (ZkHosts. "zk-connect-url") "a-topic"))]
    (-> (t/new-stream topology "zk-tx-id" spout)
        (t/each ["bytes"] filter-type)
        (t/parallelism-hint 2))
    (.build topology)))
will lead to exceptions like:
java.lang.RuntimeException: java.lang.IllegalStateException: Attempting to call unbound fn: #'...filter/filter__
Changing the spout to a non-partitioned one (e.g. FixedBatchSpout), or removing the parallelism hint, allows the topology to deploy without a problem. I suspect the spout type and the parallelism hint are a bit of a red herring, though: on my simple cluster with a single worker they just ensure that some serialization takes place, and I assume that's the key thing.
I've previously encountered issues with de/serializing defrecords with carbonite/kryo/storm which are similar and related to this clojure issue: http://dev.clojure.org/jira/browse/CLJ-1208
Marceline is a pleasure to use compared to my previous approach of elaborate :gen-class constructs. AOT itself is not a killer for me, though I want to limit it where possible. Do we have a good idea of when/where it is necessary and why?
I've talked with @sorenmacbeth about this, and it sounds like it is necessary to AOT the namespaces that use e.g. deffilter. The reasoning is that those classes must be serializable, and if they are compiled on different JVMs (e.g. parallelism > 1) then they will have different generated classnames. The "anonymous" classnames come from reify, which is what marcy uses under the hood for interop. Unfortunately I see no easy or desirable way of using some other interop strategy.
(As you anticipated, this is indeed all related to serialization!)
So I think that if you wanted to limit AOT, you could define your components (filters, fns, combiners, etc) in a single AOT namespace. The topology namespace need not be AOT. This is not super elegant but at least you can isolate your AOT ns.
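As a rough sketch of that layout, a Leiningen project.clj could restrict AOT to the components namespace only. The project name and the namespace name below are hypothetical, and dependencies are left out; the only part that matters here is the :aot vector.

```clojure
;; project.clj (sketch; my-topology and my-topology.components are made-up names)
(defproject my-topology "0.1.0-SNAPSHOT"
  ;; :dependencies omitted for brevity
  ;; AOT-compile only the namespace that holds the deffilter/tridentfn
  ;; definitions; the namespace that builds the topology is not listed.
  :aot [my-topology.components])
```

The topology namespace would then require my-topology.components as usual and refer to the filters and fns defined there.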
I think the best we can do for now is to add a section to the README making it clear what the AOT requirements are. However I'm open to suggestions if @d-t-w has any!
@d-t-w I'm curious, what is the motivation to try to avoid AOT?
Thanks both. We've experienced issues previously related to:
http://dev.clojure.org/jira/browse/CLJ-1227
http://dev.clojure.org/jira/browse/CLJ-1330
Nothing insurmountable but would prefer to limit scope of AOT if possible. I thought with the Java shims that Marceline uses (ClojureFilter etc) that might be possible.
As an interesting aside, in my example above the unbound fn is 'filter__', which is generated by the deffilter macro. When ClojureFilter attempts to execute that fn it uses the Storm Utils loadClojureFn method, which should require the ns, so I'm puzzled as to how the fn is unbound. I'm no macro ninja though, so it's probably more straightforward than I understand.
Hi guys, a little further to this, I wrote a couple of blog posts:
http://derek.troywest.com/articles/trident-in-clojure/
http://derek.troywest.com/articles/finer-points-marceline/
Mostly about how fine Marceline is, but at the end of the second post I touch on AOT and why I don't think reify is the cause of this particular issue.
Ta, Derek