
createUnionStream failed for EventHub sample

Open PerthCharern opened this issue 7 years ago • 6 comments

Issue:

Running the EventHub sample on an HDInsight Spark cluster fails with this error: [ERROR] [Microsoft.Spark.CSharp.Interop.Ipc.JvmBridge] - JVM method execution failed: Static method createUnionStream failed for class org.apache.spark.streaming.api.csharp.EventHubsUtils

Details:

I am trying to run the EventHub example. Here are my Event Hub parameters and the relevant part of the code:

var sparkContext = new SparkContext(new SparkConf().SetAppName("SparkCLREventHubExample"));
var eventhubsParams = new Dictionary<string, string>()
{
	{"eventhubs.policyname", "myreceivepolicy"},
	{"eventhubs.policykey", "Sdyx+lV/qaEPfzKOhYD8EUHIIpD70hIUCV+Jv+3Apy0="},
	{"eventhubs.namespace", "sparkscratch"},
	{"eventhubs.name", "sparkscratch1"},
	{"eventhubs.partition.count", "2"},
	{"eventhubs.consumergroup", "$default"},
	{"eventhubs.checkpoint.dir", "/EventCheckpoint"},
	{"eventhubs.checkpoint.interval", "10"},
};
const int windowDurationInSecs = 5;
const int slideDurationInSecs = 5;
const string checkpointPath = "/EventCheckpointSpark";

const long slideDurationInMillis = 5000;
StreamingContext sparkStreamingContext = StreamingContext.GetOrCreate(checkpointPath,
	() =>
	{
		var ssc = new StreamingContext(sparkContext, slideDurationInMillis);
		ssc.Checkpoint(checkpointPath);

		var stream = EventHubsUtils.CreateUnionStream(ssc, eventhubsParams);
		var countByLogLevelAndTime = stream
			.Map(bytes => Encoding.UTF8.GetString(bytes))
			.Filter(line => line.Contains(","))
			.Map(line => line.Split(','))
			.Map(columns => new KeyValuePair<string, int>(string.Format("{0},{1}", columns[0], columns[1]), 1))
			.ReduceByKeyAndWindow((x, y) => x + y, (x, y) => x - y, windowDurationInSecs, slideDurationInSecs, 3)
			.Map(logLevelCountPair => string.Format("{0},{1}", logLevelCountPair.Key, logLevelCountPair.Value));

		countByLogLevelAndTime.ForeachRDD(countByLogLevel =>
		{
			//countByLogLevel.SaveAsTextFile( string.Format( "{0}/{1}", outputPath, Guid.NewGuid() ) );

			var dimensionalCountCollection = countByLogLevel.Collect();
			
			foreach ( var dimensionalCountItem in dimensionalCountCollection)
			{
				Console.WriteLine(dimensionalCountItem);
			}
		});

		return ssc;
	});
sparkStreamingContext.Start();
sparkStreamingContext.AwaitTermination();

I double-checked the policy name and key. I also tried the 'full' namespace with .servicebus.windows.net suffix, and also $Default instead of $default for the consumer group.

Here's the command I use to run (formatted for readability):

/home/sshuser/mobius/runtime/scripts/sparkclr-submit.sh  
--master yarn 
--deploy-mode client 
--jars /home/sshuser/MobiusDependencies/eventhubs-client-0.9.1.jar,/home/sshuser/MobiusDependencies/qpid-amqp-1-0-client-0.32.jar,/home/sshuser/MobiusDependencies/qpid-amqp-1-0-common-0.32.jar,/home/sshuser/MobiusDependencies/spark-streaming-eventhubs_2.10-0.1.0.jar 
--exe SparkClrEventHub.exe /home/sshuser/Release

In all those attempts, I get the same error:

17/03/15 00:01:09 ERROR CSharpBackendHandler: methods:
17/03/15 00:01:09 ERROR CSharpBackendHandler: public static org.apache.spark.streaming.api.java.JavaDStream org.apache.spark.streaming.api.csharp.EventHubsUtils.createUnionStream(org.apache.spark.streaming.api.java.JavaStreamingContext,scala.collection.immutable.Map,org.apache.spark.storage.StorageLevel)
17/03/15 00:01:09 ERROR CSharpBackendHandler: args:
17/03/15 00:01:09 ERROR CSharpBackendHandler: argType: org.apache.spark.streaming.api.java.JavaStreamingContext, argValue: org.apache.spark.streaming.api.java.JavaStreamingContext@47d61ec2
17/03/15 00:01:09 ERROR CSharpBackendHandler: argType: scala.collection.immutable.HashMap.HashTrieMap, argValue: Map(eventhubs.name -> sparkscratch1, eventhubs.checkpoint.dir -> /EventCheckpoint, eventhubs.consumergroup -> $default, eventhubs.partition.count -> 2, eventhubs.checkpoint.interval -> 10, eventhubs.policykey -> Sdyx+lV/qaEPfzKOhYD8EUHIIpD70hIUCV+Jv+3Apy0=, eventhubs.namespace -> sparkscratch, eventhubs.policyname -> myreceivepolicy)
17/03/15 00:01:09 ERROR CSharpBackendHandler: argType: org.apache.spark.storage.StorageLevel, argValue: StorageLevel(memory, deserialized, 1 replicas)
[2017-03-15 00:01:09,057] [1] [ERROR] [Microsoft.Spark.CSharp.Interop.Ipc.JvmBridge] - JVM method execution failed: Static method createUnionStream failed for class org.apache.spark.streaming.api.csharp.EventHubsUtils when called with 3 parameters ([Index=1, Type=JvmObjectReference, Value=10], [Index=2, Type=JvmObjectReference, Value=15], [Index=3, Type=JvmObjectReference, Value=16], )
[2017-03-15 00:01:09,057] [1] [ERROR] [Microsoft.Spark.CSharp.Interop.Ipc.JvmBridge] - java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.streaming.api.csharp.EventHubsUtils$.createUnionStream(EventHubsUtils.scala:27)
        at org.apache.spark.streaming.api.csharp.EventHubsUtils.createUnionStream(EventHubsUtils.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.api.csharp.CSharpBackendHandler.handleMethodCall(CSharpBackendHandler.scala:156)
        at org.apache.spark.api.csharp.CSharpBackendHandler.handleBackendRequest(CSharpBackendHandler.scala:106)
        at org.apache.spark.api.csharp.CSharpBackendHandler.channelRead0(CSharpBackendHandler.scala:32)
        at org.apache.spark.api.csharp.CSharpBackendHandler.channelRead0(CSharpBackendHandler.scala:28)
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NoClassDefFoundError: org/apache/spark/Logging
        at java.lang.ClassLoader.defineClass1(Native Method)
        at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
        at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
        at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
        at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at org.apache.spark.streaming.eventhubs.EventHubsUtils.createUnionStream(EventHubsUtils.scala)
        ... 32 more
Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 44 more

[2017-03-15 00:01:09,058] [1] [ERROR] [Microsoft.Spark.CSharp.Interop.Ipc.JvmBridge] - JVM method execution failed: Static method createUnionStream failed for class org.apache.spark.streaming.api.csharp.EventHubsUtils when called with 3 parameters ([Index=1, Type=JvmObjectReference, Value=10], [Index=2, Type=JvmObjectReference, Value=15], [Index=3, Type=JvmObjectReference, Value=16], )
[2017-03-15 00:01:09,065] [1] [ERROR] [Microsoft.Spark.CSharp.Interop.Ipc.JvmBridge] -
*******************************************************************************************************************************
  at Microsoft.Spark.CSharp.Interop.Ipc.JvmBridge.CallJavaMethod (Boolean isStatic, System.Object classNameOrJvmObjectReference, System.String methodName, System.Object[] parameters) <0x40ca8230 + 0x00153> in <filename unknown>:0
*******************************************************************************************************************************

Unhandled Exception:
System.Exception: JVM method execution failed: Static method createUnionStream failed for class org.apache.spark.streaming.api.csharp.EventHubsUtils when called with 3 parameters ([Index=1, Type=JvmObjectReference, Value=10], [Index=2, Type=JvmObjectReference, Value=15], [Index=3, Type=JvmObjectReference, Value=16], )
  at Microsoft.Spark.CSharp.Interop.Ipc.JvmBridge.CallJavaMethod (Boolean isStatic, System.Object classNameOrJvmObjectReference, System.String methodName, System.Object[] parameters) <0x40ca8230 + 0x00153> in <filename unknown>:0
[ERROR] FATAL UNHANDLED EXCEPTION: System.Exception: JVM method execution failed: Static method createUnionStream failed for class org.apache.spark.streaming.api.csharp.EventHubsUtils when called with 3 parameters ([Index=1, Type=JvmObjectReference, Value=10], [Index=2, Type=JvmObjectReference, Value=15], [Index=3, Type=JvmObjectReference, Value=16], )
  at Microsoft.Spark.CSharp.Interop.Ipc.JvmBridge.CallJavaMethod (Boolean isStatic, System.Object classNameOrJvmObjectReference, System.String methodName, System.Object[] parameters) <0x40ca8230 + 0x00153> in <filename unknown>:0

PerthCharern avatar Mar 15 '17 00:03 PerthCharern

The issue here is that the suggested jar file versions are not compatible with the Mobius 2.0.200 examples. I will post the exact details.

PerthCharern avatar Mar 17 '17 21:03 PerthCharern

The jar dependency files posted on this page https://github.com/Microsoft/Mobius/blob/master/notes/running-mobius-app.md#standalone-cluster

  • qpid-amqp-1-0-client-0.32.jar
  • qpid-amqp-1-0-common-0.32.jar
  • eventhubs-client-0.9.1.jar
  • spark-streaming-eventhubs_2.10-0.1.0.jar

do NOT work with Mobius 2.0.200.
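
A quick sanity check for a jar set like this is the Scala binary-version suffix in the artifact name. The `NoClassDefFoundError: org/apache/spark/Logging` in the trace above is consistent with a jar built for Spark 1.x, since that class is no longer part of the public API in Spark 2.0, and the `_2.10` suffix points the same way. The sketch below is only an illustration: `scala_suffix` is a hypothetical helper (not part of Mobius), and `EXPECTED=2.11` assumes the cluster's Spark 2.0 build targets Scala 2.11. It inspects file names only, so it will not catch every incompatibility.

```shell
#!/bin/sh
# Hypothetical helper: print the Scala binary version embedded in a jar name
# (for example "2.10" for foo_2.10-0.1.0.jar), or nothing if there is no suffix.
scala_suffix() {
    name=${1##*/}                        # drop any directory part
    case "$name" in
        *_[0-9]*.[0-9]*-*)
            rest=${name##*_}             # text after the last underscore
            printf '%s\n' "${rest%%-*}"  # keep everything before the first dash
            ;;
    esac
}

# Assumption: the cluster's Spark build targets Scala 2.11.
EXPECTED=2.11

# Jar names taken from the list above.
for jar in \
    qpid-amqp-1-0-client-0.32.jar \
    qpid-amqp-1-0-common-0.32.jar \
    eventhubs-client-0.9.1.jar \
    spark-streaming-eventhubs_2.10-0.1.0.jar
do
    found=$(scala_suffix "$jar")
    if [ -n "$found" ] && [ "$found" != "$EXPECTED" ]; then
        echo "$jar: built for Scala $found, expected $EXPECTED"
    fi
done
```

Jars without a Scala suffix (the qpid and eventhubs-client jars here) are skipped, because their names carry no Scala version to compare.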

I have experimented with a few versions and verified that the following combination works:

  • qpid-amqp-1-0-client-0.32.jar
  • qpid-amqp-1-0-common-0.32.jar
  • eventhubs-client-1.0.1.jar
  • spark-streaming-eventhubs_2.11-2.0.3.jar
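
For reference, the submit command from the original report with only the two changed jars swapped in might look like the sketch below. It assumes the new jars are placed in the same /home/sshuser/MobiusDependencies directory as before; the `DEPS` and `JARS` variables are introduced here for readability. The leading `echo` prints the command for review instead of submitting the job.

```shell
#!/bin/sh
# Assumption: the replacement jars sit in the directory used in the original command.
DEPS=/home/sshuser/MobiusDependencies

# Build the comma-separated list that --jars expects (no spaces between entries).
JARS="$DEPS/eventhubs-client-1.0.1.jar"
JARS="$JARS,$DEPS/qpid-amqp-1-0-client-0.32.jar"
JARS="$JARS,$DEPS/qpid-amqp-1-0-common-0.32.jar"
JARS="$JARS,$DEPS/spark-streaming-eventhubs_2.11-2.0.3.jar"

# Print the full command; remove "echo" to actually submit the job.
echo /home/sshuser/mobius/runtime/scripts/sparkclr-submit.sh \
    --master yarn \
    --deploy-mode client \
    --jars "$JARS" \
    --exe SparkClrEventHub.exe /home/sshuser/Release
```

All other arguments are unchanged from the failing run, so any difference in behavior should come from the jar versions alone.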

PerthCharern avatar Mar 17 '17 22:03 PerthCharern

@perthcha, may I get your e-mail? I have some questions. Thanks.

Mrsevic avatar Mar 19 '17 21:03 Mrsevic

Could you post the problem here instead if it's related to this issue? I check my GitHub account frequently and should be able to answer the questions here.

If it's something else, please feel free to open a new issue and tag me. I will take a look as well.

PerthCharern avatar Mar 20 '17 18:03 PerthCharern

I will tag you then. It is related to Kafka streaming issues.

Thanks

Mrsevic avatar Mar 20 '17 20:03 Mrsevic

@perthcha thanks for getting to the root cause of the issue and sharing your findings.

skaarthik avatar Mar 20 '17 21:03 skaarthik