hadoop-pcap
Porting code to Spark 2.0
Does this code work on Spark 2.0? I tried it and got an NPE. If you know of anyone who has done this, let me know; otherwise I can try my hand at it.
Hi Manesh,
I do not know what you have tried, but this library does not support Spark APIs, just map/reduce and Hive.
However, you should be able to use the org.apache.spark.SparkContext#hadoopFile methods with classes from the net.ripe.hadoop.pcap.mr1.io package.
That is exactly what I did; let me paste the code below. It does not work with Spark 2.0+, and my guess is that it's due to a change in some Hadoop API going from 2.6 to 2.7. So maybe my question should be: does this library work with Hadoop 2.7+?
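import org.apache.hadoop.io.{LongWritable, ObjectWritable}
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}
import net.ripe.hadoop.pcap.mr1.io.PcapInputFormat
// Assumption: Packet lives in the library's net.ripe.hadoop.pcap.packet package
import net.ripe.hadoop.pcap.packet.Packet
System.setProperty("hadoop.home.dir", "D:/hadoop-2.6.5")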
val conf: SparkConf = new SparkConf().setAppName("Simple Application").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
sc.setLogLevel("ERROR")
val hadoopConf = new org.apache.hadoop.conf.Configuration(sc.hadoopConfiguration)
val jobConf = new JobConf(sc.hadoopConfiguration)
//FileInputFormat.setInputPaths(jobConf, "hdfs://localhost:9000/pcap/small_00000_20120316062947.pcap")
val something = sc.hadoopFile(
//hadoopConf,
"hdfs://localhost:9000/pcap/",
classOf[PcapInputFormat],
classOf[LongWritable],
classOf[ObjectWritable],
2
)
val another = something.map{case(k,v) => (k.get(),v.get().asInstanceOf[Packet])}
another.take(10).foreach(println)
Output is as follows. Once the data is in an RDD, we can then load it into Hive.
17/06/30 17:39:27 INFO BlockManagerMaster: Registered BlockManager
(1,dst=192.168.202.79,ip_flags_df=false,tcp_flag_ns=false,ip_header_length=20,protocol=TCP,ip_version=4,len=47,tcp_seq=2162570451,id=36140,tcp_flag_urg=false,tcp_header_length=32,fragment_offset=0,tcp_flag_cwr=false,src=192.168.229.254,ttl=254,src_port=443,tcp_flag_rst=false,fragment=false,tcp_ack=4204467708,dst_port=46117,tcp_flag_ack=true,tcp_flag_fin=false,ts_usec=1.331901E9,ip_flags_mf=false,tcp_flag_syn=false,tcp_flag_psh=true,ts=1331901000,ts_micros=0,tcp_flag_ece=false)
...
(9,dst=192.168.202.79,ip_flags_df=false,tcp_flag_ns=false,ip_header_length=20,protocol=TCP,ip_version=4,len=0,tcp_seq=3045988242,id=1572,tcp_flag_urg=false,tcp_header_length=44,fragment_offset=0,tcp_flag_cwr=false,src=192.168.229.251,ttl=127,src_port=80,tcp_flag_rst=false,fragment=false,tcp_ack=2662467557,dst_port=50465,tcp_flag_ack=true,tcp_flag_fin=false,ts_usec=1.331901E9,ip_flags_mf=false,tcp_flag_syn=true,tcp_flag_psh=false,ts=1331901000,ts_micros=0,tcp_flag_ece=false)
(10,dst=192.168.202.79,ip_flags_df=false,tcp_flag_ns=false,ip_header_length=20,protocol=TCP,ip_version=4,len=50,tcp_seq=2162570606,id=56707,tcp_flag_urg=false,tcp_header_length=32,fragment_offset=0,tcp_flag_cwr=false,src=192.168.229.254,ttl=254,src_port=443,tcp_flag_rst=false,fragment=false,tcp_ack=4204467907,dst_port=46117,tcp_flag_ack=true,tcp_flag_fin=false,ts_usec=1.331901E9,ip_flags_mf=false,tcp_flag_syn=false,tcp_flag_psh=true,ts=1331901000,ts_micros=0,tcp_flag_ece=false)
As you can see in hadoop-pcap/hadoop-pcap-lib/pom.xml, the library is compiled against hadoop-core version 2.6.0-mr1-cdh5.5.2 and hadoop-common version 2.6.0-cdh5.5.2. So, strictly speaking, it is not guaranteed to work with Hadoop 2.7.
You could try changing the versions in pom.xml and compiling your own build of the library against the required version of the Hadoop API.
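As a rough sketch of what such a change might look like: the fragment below is hypothetical and assumes the versions are written directly on the dependency entries, which may not match how the real pom.xml is organised. The target version 2.7.3 is only an example. Note that stock Apache Hadoop 2.x does not publish an MR1 hadoop-core artifact; the org.apache.hadoop.mapred classes ship in hadoop-mapreduce-client-core, so that dependency would probably have to be swapped rather than just re-versioned.

```xml
<!-- Hypothetical edit to hadoop-pcap-lib/pom.xml; 2.7.3 is an example target, not a tested one -->
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>2.7.3</version>
</dependency>
<!-- Assumed replacement for hadoop-core 2.6.0-mr1-cdh5.5.2 when building against Apache Hadoop 2.7 -->
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-mapreduce-client-core</artifactId>
  <version>2.7.3</version>
</dependency>
```

After editing, rebuilding with `mvn clean package` from the project root should show quickly whether the library still compiles against the newer API.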
Thanks Oleg, I will try to port it when I find the need/time :). It's a very useful library. Btw, my name is maHesh :)