Mobius
Mobius copied to clipboard
createUnionStream failed for EventHub sample
Issue:
Running the EventHub sample in HD Insights Spark cluster failed with this error: [ERROR] [Microsoft.Spark.CSharp.Interop.Ipc.JvmBridge] - JVM method execution failed: Static method createUnionStream failed for class org.apache.spark.streaming.api.csharp.EventHubsUtils
Details:
I am trying to run the EventHub example. Here's my event hub parameter and the relevant part of the code:
var sparkContext = new SparkContext(new SparkConf().SetAppName("SparkCLREventHubExample"));
var eventhubsParams = new Dictionary<string, string>()
{
{"eventhubs.policyname", "myreceivepolicy"},
{"eventhubs.policykey", "Sdyx+lV/qaEPfzKOhYD8EUHIIpD70hIUCV+Jv+3Apy0="},
{"eventhubs.namespace", "sparkscratch"},
{"eventhubs.name", "sparkscratch1"},
{"eventhubs.partition.count", "2"},
{"eventhubs.consumergroup", "$default"},
{"eventhubs.checkpoint.dir", "/EventCheckpoint"},
{"eventhubs.checkpoint.interval", "10"},
};
const int windowDurationInSecs = 5;
const int slideDurationInSecs = 5;
const string checkpointPath = "/EventCheckpointSpark";
const long slideDurationInMillis = 5000;
StreamingContext sparkStreamingContext = StreamingContext.GetOrCreate(checkpointPath,
() =>
{
var ssc = new StreamingContext(sparkContext, slideDurationInMillis);
ssc.Checkpoint(checkpointPath);
var stream = EventHubsUtils.CreateUnionStream(ssc, eventhubsParams);
var countByLogLevelAndTime = stream
.Map(bytes => Encoding.UTF8.GetString(bytes))
.Filter(line => line.Contains(","))
.Map(line => line.Split(','))
.Map(columns => new KeyValuePair<string, int>(string.Format("{0},{1}", columns[0], columns[1]), 1))
.ReduceByKeyAndWindow((x, y) => x + y, (x, y) => x - y, windowDurationInSecs, slideDurationInSecs, 3)
.Map(logLevelCountPair => string.Format("{0},{1}", logLevelCountPair.Key, logLevelCountPair.Value));
countByLogLevelAndTime.ForeachRDD(countByLogLevel =>
{
//countByLogLevel.SaveAsTextFile( string.Format( "{0}/{1}", outputPath, Guid.NewGuid() ) );
var dimensionalCountCollection = countByLogLevel.Collect();
foreach ( var dimensionalCountItem in dimensionalCountCollection)
{
Console.WriteLine(dimensionalCountItem);
}
});
return ssc;
});
sparkStreamingContext.Start();
sparkStreamingContext.AwaitTermination();
I double-checked the policy name and key. I also tried the 'full' namespace with .servicebus.windows.net suffix, and also $Default instead of $default for the consumer group.
Here's the command I use to run (formatted for readability):
/home/sshuser/mobius/runtime/scripts/sparkclr-submit.sh
--master yarn
--deploy-mode client
--jars /home/sshuser/MobiusDependencies/eventhubs-client-0.9.1.jar,/home/sshuser/MobiusDependencies/qpid-amqp-1-0-client-0.32.jar,/home/sshuser/MobiusDependencies/qpid-amqp-1-0-common-0.32.jar,/home/sshuser/MobiusDependencies/spark-streaming-eventhubs_2.10-0.1.0.jar
--exe SparkClrEventHub.exe /home/sshuser/Release
In all those attempts, I get the same error:
17/03/15 00:01:09 ERROR CSharpBackendHandler: methods:
17/03/15 00:01:09 ERROR CSharpBackendHandler: public static org.apache.spark.streaming.api.java.JavaDStream org.apache.spark.streaming.api.csharp.EventHubsUtils.createUnionStream(org.apache.spark.streaming.api.java.JavaStreamingContext,scala.collection.immutable.Map,org.apache.spark.storage.StorageLevel)
17/03/15 00:01:09 ERROR CSharpBackendHandler: args:
17/03/15 00:01:09 ERROR CSharpBackendHandler: argType: org.apache.spark.streaming.api.java.JavaStreamingContext, argValue: org.apache.spark.streaming.api.java.JavaStreamingContext@47d61ec2
17/03/15 00:01:09 ERROR CSharpBackendHandler: argType: scala.collection.immutable.HashMap.HashTrieMap, argValue: Map(eventhubs.name -> sparkscratch1, eventhubs.checkpoint.dir -> /EventCheckpoint, eventhubs.consumergroup -> $default, eventhubs.partition.count -> 2, eventhubs.checkpoint.interval -> 10, eventhubs.policykey -> Sdyx+lV/qaEPfzKOhYD8EUHIIpD70hIUCV+Jv+3Apy0=, eventhubs.namespace -> sparkscratch, eventhubs.policyname -> myreceivepolicy)
17/03/15 00:01:09 ERROR CSharpBackendHandler: argType: org.apache.spark.storage.StorageLevel, argValue: StorageLevel(memory, deserialized, 1 replicas)
[2017-03-15 00:01:09,057] [1] [ERROR] [Microsoft.Spark.CSharp.Interop.Ipc.JvmBridge] - JVM method execution failed: Static method createUnionStream failed for class org.apache.spark.streaming.api.csharp.EventHubsUtils when called with 3 parameters ([Index=1, Type=JvmObjectReference, Value=10], [Index=2, Type=JvmObjectReference, Value=15], [Index=3, Type=JvmObjectReference, Value=16], )
[2017-03-15 00:01:09,057] [1] [ERROR] [Microsoft.Spark.CSharp.Interop.Ipc.JvmBridge] - java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.streaming.api.csharp.EventHubsUtils$.createUnionStream(EventHubsUtils.scala:27)
at org.apache.spark.streaming.api.csharp.EventHubsUtils.createUnionStream(EventHubsUtils.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.api.csharp.CSharpBackendHandler.handleMethodCall(CSharpBackendHandler.scala:156)
at org.apache.spark.api.csharp.CSharpBackendHandler.handleBackendRequest(CSharpBackendHandler.scala:106)
at org.apache.spark.api.csharp.CSharpBackendHandler.channelRead0(CSharpBackendHandler.scala:32)
at org.apache.spark.api.csharp.CSharpBackendHandler.channelRead0(CSharpBackendHandler.scala:28)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NoClassDefFoundError: org/apache/spark/Logging
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.streaming.eventhubs.EventHubsUtils.createUnionStream(EventHubsUtils.scala)
... 32 more
Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 44 more
[2017-03-15 00:01:09,058] [1] [ERROR] [Microsoft.Spark.CSharp.Interop.Ipc.JvmBridge] - JVM method execution failed: Static method createUnionStream failed for class org.apache.spark.streaming.api.csharp.EventHubsUtils when called with 3 parameters ([Index=1, Type=JvmObjectReference, Value=10], [Index=2, Type=JvmObjectReference, Value=15], [Index=3, Type=JvmObjectReference, Value=16], )
[2017-03-15 00:01:09,065] [1] [ERROR] [Microsoft.Spark.CSharp.Interop.Ipc.JvmBridge] -
*******************************************************************************************************************************
at Microsoft.Spark.CSharp.Interop.Ipc.JvmBridge.CallJavaMethod (Boolean isStatic, System.Object classNameOrJvmObjectReference, System.String methodName, System.Object[] parameters) <0x40ca8230 + 0x00153> in <filename unknown>:0
*******************************************************************************************************************************
Unhandled Exception:
System.Exception: JVM method execution failed: Static method createUnionStream failed for class org.apache.spark.streaming.api.csharp.EventHubsUtils when called with 3 parameters ([Index=1, Type=JvmObjectReference, Value=10], [Index=2, Type=JvmObjectReference, Value=15], [Index=3, Type=JvmObjectReference, Value=16], )
at Microsoft.Spark.CSharp.Interop.Ipc.JvmBridge.CallJavaMethod (Boolean isStatic, System.Object classNameOrJvmObjectReference, System.String methodName, System.Object[] parameters) <0x40ca8230 + 0x00153> in <filename unknown>:0
[ERROR] FATAL UNHANDLED EXCEPTION: System.Exception: JVM method execution failed: Static method createUnionStream failed for class org.apache.spark.streaming.api.csharp.EventHubsUtils when called with 3 parameters ([Index=1, Type=JvmObjectReference, Value=10], [Index=2, Type=JvmObjectReference, Value=15], [Index=3, Type=JvmObjectReference, Value=16], )
at Microsoft.Spark.CSharp.Interop.Ipc.JvmBridge.CallJavaMethod (Boolean isStatic, System.Object classNameOrJvmObjectReference, System.String methodName, System.Object[] parameters) <0x40ca8230 + 0x00153> in <filename unknown>:0
The issue here is that the versions of jar files suggested are not compatible with Mobius 2.0.200 examples. I will post the exact details.
The jar dependency files posted on this page https://github.com/Microsoft/Mobius/blob/master/notes/running-mobius-app.md#standalone-cluster
- qpid-amqp-1-0-client-0.32.jar
- qpid-amqp-1-0-common-0.32.jar
- eventhubs-client-0.9.1.jar
- spark-streaming-eventhubs_2.10-0.1.0.jar
do NOT work with Mobius 2.0.200.
I have experimented a few versions, and verified that the following combination works:
- qpid-amqp-1-0-client-0.32.jar
- qpid-amqp-1-0-common-0.32.jar
- eventhubs-client-1.0.1.jar
- spark-streaming-eventhubs_2.11-2.0.3.jar
@perthcha may I get your e-mail? I have some question. Thanks
Could you post the problem here instead if it's related to this issue? I frequently check my github account and should be able to answer the questions here.
If it's something else, please feel free to open a new issue and tag me. I will take a look as well.
I will tag you then. It is related to Kafka streaming issues.
Thanks
@perthcha thanks for getting to the root cause of the issue and sharing your findings.