Reliable Spark streaming
Hi
We are using Spark's reliable Kafka streaming (receiver-based, with the write-ahead log) to consume messages from a Kafka topic, transform and enrich them, and post the results to another Kafka topic.
We have configured the checkpoint directory on a shared network storage drive that is accessible from the master and all workers in the Spark cluster.
When the driver is killed for maintenance and then restarted, we observe that the streaming context fails to initialize after loading the block metadata from the configured checkpoint directory.
The same issue also occurs on a local[*] Spark cluster. Following is the error message (the same error is also reported on the distributed Spark cluster with the write-ahead log enabled):
Exception in thread "main" org.apache.spark.SparkException: org.apache.spark.streaming.dstream.WindowedDStream@919086 has not been initialized
at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
at scala.Option.orElse(Option.scala:257)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:227)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:222)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:222)
at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:90)
at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:67)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:512)
at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:584)
at com.ist.cseng.mongodb.pulladaptor.common.spark.kafka.receiver.CommonSparkReliableKafkaReceiver.startReceiver(CommonSparkReliableKafkaReceiver.java:107)
at org.cseng.mongodb.pulladaptor.spark.oplog.transform.receiver.TransformSparkReliableKafkaReceiver.main(TransformSparkReliableKafkaReceiver.java:269)
Exception in thread "Thread-45" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): akka.actor.InvalidActorNameException: actor name [Receiver-0-1437789010740] is not unique!
at akka.actor.dungeon.ChildrenContainer$NormalChildrenContainer.reserve(ChildrenContainer.scala:130)
at akka.actor.dungeon.Children$class.reserveChild(Children.scala:77)
at akka.actor.ActorCell.reserveChild(ActorCell.scala:369)
at akka.actor.dungeon.Children$class.makeChild(Children.scala:202)
at akka.actor.dungeon.Children$class.attachChild(Children.scala:42)
at akka.actor.ActorCell.attachChild(ActorCell.scala:369)
at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:552)
at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Spark Version - 1.3.1
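For context, our understanding of the checkpoint-recovery pattern recommended by the Spark Streaming programming guide is the minimal sketch below: all stream creation, windowing, and output operations happen inside a static createContext() that the factory passed to getOrCreate() delegates to. The checkpoint path, ZooKeeper address, topic name, and group id in the sketch are placeholders, not our real configuration; our actual code follows further down.

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
import org.apache.spark.streaming.kafka.KafkaUtils;

public final class CheckpointedKafkaSketch {

    // All stream setup lives in this static method so the anonymous function below does
    // not capture a non-serializable enclosing instance, and so the whole DStream graph
    // exists before the context is checkpointed. Hosts, topic and paths are placeholders.
    private static JavaStreamingContext createContext(final String checkpointDir) {
        SparkConf conf = new SparkConf()
                .setMaster("local[*]")                 // placeholder master
                .setAppName("checkpointed-kafka-sketch")
                .set("spark.streaming.receiver.writeAheadLog.enable", "true");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.milliseconds(100));
        jssc.checkpoint(checkpointDir);

        Map<String, Integer> topics = new HashMap<String, Integer>();
        topics.put("input-topic", 1);                  // placeholder topic / receiver thread count
        JavaPairReceiverInputDStream<String, String> stream =
                KafkaUtils.createStream(jssc, "zk-host:2181", "sketch-group", topics);

        JavaPairDStream<String, String> windowed =
                stream.window(Durations.seconds(30), Durations.seconds(30));
        windowed.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
            @Override
            public Void call(JavaPairRDD<String, String> rdd) {
                // transform/enrich and publish to the output topic here
                return null;
            }
        });
        return jssc;
    }

    public static void main(String[] args) {
        final String checkpointDir = "/shared/checkpoint-dir";   // placeholder path

        JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
            @Override
            public JavaStreamingContext create() {
                return createContext(checkpointDir);
            }
        };

        // First run: create() builds the graph. On restart the graph (receiver, window,
        // output operation) is recovered from the checkpoint already initialized, and
        // nothing more should be added to jssc after this call.
        JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(checkpointDir, factory);
        jssc.start();
        jssc.awaitTermination();
    }
}

We would like to confirm whether deviating from this structure (for example, wiring up the Kafka stream and window only after getOrCreate() has returned) can lead to the "has not been initialized" error above.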
Following is the code fragment; any help in this regard is highly appreciated.
Thanks, Sunder
public abstract class CommonSparkReliableKafkaReceiver implements Serializable {

    private static final long serialVersionUID = 433981586332686851L;
    private static Logger LOG = LoggerFactory.getLogger(CommonSparkReliableKafkaReceiver.class);

    protected SparkConf sparkConf = null;
    protected JavaStreamingContext ssc = null;
    protected final String master;
    protected final String appName;
    protected final long batchDurationInMillis;
    protected final String checkPointDirPath;

    public CommonSparkReliableKafkaReceiver(String master, String appName, long batchDurationInMillis, String checkPointDirPath) {
        this.master = master;
        this.appName = appName;
        this.batchDurationInMillis = batchDurationInMillis;
        this.checkPointDirPath = checkPointDirPath;
    }

    public void setupSparkConf(Map<String, String> params) {
        sparkConf = new SparkConf();
        sparkConf.setMaster(master);
        if (appName != null)
            sparkConf.setAppName(appName);
        else
            sparkConf.setAppName("AppTest");
        sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true");
    }

    public void setStreamingContext() {
        if (sparkConf.get(Constants.SPARK_MASTER) == null) {
            sparkConf.setMaster("local[*]");
        }
        String appName = null;
        try {
            appName = sparkConf.get(Constants.SPARK_APP_NAME);
        } catch (NoSuchElementException e) {
            sparkConf.setAppName("AppTest");
        }
        if (appName == null) {
            throw new RuntimeException("Missing mandatory parameter, appName");
        }
        final String directoryPath = System.getProperty("user.dir") + checkPointDirPath;
        JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
            @Override
            public JavaStreamingContext create() {
                File fDirectoryPath = new File(directoryPath);
                if (!fDirectoryPath.exists()) {
                    fDirectoryPath = Utilities.createDirectory(directoryPath);
                }
                JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(batchDurationInMillis));
                ssc.checkpoint(fDirectoryPath.getAbsolutePath());
                return ssc;
            }
        };
        ssc = JavaStreamingContext.getOrCreate(directoryPath, factory);
        LOG.info("Got reference of temp directory for checkpoint - {} ", directoryPath);
        ssc.sparkContext().sc().addSparkListener(new StatsReportListener());
    }

    protected JavaPairReceiverInputDStream<String, String> kafkaStream;

    public abstract void createKakaStream();

    public void startReceiver() {
        ssc.start();
        ssc.awaitTermination();
    }
}
public class TransformOraclePayloadSparkReceiver extends CommonSparkReliableKafkaReceiver {

    private static final long serialVersionUID = -3556450702583078717L;
    static Logger LOG = LoggerFactory.getLogger(TransformOraclePayloadSparkReceiver.class);

    protected final String zooKeeperConnectAppConfig;

    public TransformOraclePayloadSparkReceiver(final String zooKeeperConnectAppConfig, final String master,
            final String appName, final long batchDurationInMillis,
            final String checkPointDirPath) {
        super(master, appName, batchDurationInMillis, checkPointDirPath);
        this.zooKeeperConnectAppConfig = zooKeeperConnectAppConfig;
    }

    private Map<String, String> loadParams() {
        Map<String, String> params = null;
        final String zooKeeperConnect = zooKeeperConnectAppConfig;
        String prefixPath = "/oracle-gg/transform/app/";
        //suffixPath = "/config/transform-params";
        ZkClient zClient = createZkClient(zooKeeperConnect);
        // String suffixPath = null;
        try {
            String transformPramasConfigPath = prefixPath + appName + ZK_CONFIG_TRANSFORM_PARAMS;
            String transformParamsDataFromZookeper = loadJsonStringFromZookeeper(zClient, transformPramasConfigPath);
            params = parseJsonStringAsMap(transformParamsDataFromZookeper);
            //suffixPath = "/config/collection-details";
            Map<?, ?> mapInZookeper = null;
            //suffixPath = "/config/transform-rules";
            String transformRulesConfigPath = prefixPath + appName + ZK_CONFIG_TRANSFORM_RULES;
            String transformRulesDataFromZookeper = loadJsonStringFromZookeeper(zClient, transformRulesConfigPath);
            mapInZookeper = parseJsonStringAsMap(transformRulesDataFromZookeper);
            System.out.println(mapInZookeper);
            //suffixPath = "/config/partition-details";
            String pramasConfigPath = prefixPath + appName + ZK_CONFIG_PARTITION_DETAILS;
            String partitionDetailsDataFromZookeper = loadJsonStringFromZookeeper(zClient, pramasConfigPath);
            params.put(KafkaPublisherConstants.TABLE_PARTITION_MAP_JSON, partitionDetailsDataFromZookeper);
        } finally {
            closeZkClient(zClient);
        }
        return params;
    }
    @Override
    public void createKakaStream() {
        Map<String, String> params = null;
        final String zooKeeperConnect = zooKeeperConnectAppConfig;
        String prefixPath = "/oracle-gg/transform/app/";
        //suffixPath = "/config/transform-params";
        ZkClient zClient = createZkClient(zooKeeperConnect);
        final Map<String, Tuple2<String, String>> collectionToTableNameMap = new HashMap<>();
        final Map<String, String> transformRules = new HashMap<String, String>();
        // String suffixPath = null;
        try {
            String transformPramasConfigPath = prefixPath + appName + ZK_CONFIG_TRANSFORM_PARAMS;
            String transformParamsDataFromZookeper = loadJsonStringFromZookeeper(zClient, transformPramasConfigPath);
            params = parseJsonStringAsMap(transformParamsDataFromZookeper);
            //suffixPath = "/config/collection-details";
            String collectionDetailConfigPath = prefixPath + appName + ZK_CONFIG_COLLECTION_DETAILS;
            String dataFromZookeper = loadJsonStringFromZookeeper(zClient, collectionDetailConfigPath);
            Map<?, ?> mapInZookeper = parseJsonStringAsMap(dataFromZookeper);
            for (Map.Entry<?, ?> entry : mapInZookeper.entrySet()) {
                collectionToTableNameMap.put(entry.getKey().toString(), parseToTuple2(entry.getValue().toString()));
            }
            //suffixPath = "/config/transform-rules";
            String transformRulesConfigPath = prefixPath + appName + ZK_CONFIG_TRANSFORM_RULES;
            String transformRulesDataFromZookeper = loadJsonStringFromZookeeper(zClient, transformRulesConfigPath);
            mapInZookeper = parseJsonStringAsMap(transformRulesDataFromZookeper);
            System.out.println(mapInZookeper);
            //suffixPath = "/config/partition-details";
            String pramasConfigPath = prefixPath + appName + ZK_CONFIG_PARTITION_DETAILS;
            String partitionDetailsDataFromZookeper = loadJsonStringFromZookeeper(zClient, pramasConfigPath);
            params.put(KafkaPublisherConstants.TABLE_PARTITION_MAP_JSON, partitionDetailsDataFromZookeper);
        } finally {
            closeZkClient(zClient);
        }
        boolean consumeFromBegining = false;
        if (params.containsKey(CONSUME_FROM_BEGINING))
            consumeFromBegining = Boolean.parseBoolean(params.get(CONSUME_FROM_BEGINING).toString());
        params.put(Constants.GROUP_ID, params.get(Constants.GROUP_ID) + "-l");
        if (consumeFromBegining)
            ZkUtils.maybeDeletePath(String.valueOf(params.get(Constants.ZOOKEEPER_CONNECT)), "/consumers/" + String.valueOf(params.get(Constants.GROUP_ID)));
        // Add table based partitioner class
        //params.put("partitioner.class", "com.cseng.shared.utils.kafka.TablePartitioner");
        //suffixPath = "/config/table-partitions";
        //String tablePartitionConfigPath = prefixPath +appName+ suffixPath;
        params.put(Constants.TABLE_PARTITION_CONFIG_PATH, params.get(Constants.TABLE_PARTITION_CONFIG_PATH));
        final Map<String, List<String>> primaryKeyDetails = new HashMap<String, List<String>>();
        final Map<String, String> elementDigestDetails = new HashMap<String, String>();
        elementDigestDetails.put(ENABLE_CRYPTO, params.get(ENABLE_CRYPTO).toString());
        elementDigestDetails.put(CRYPTO_ALGORITHM, params.get(CRYPTO_ALGORITHM).toString());
        elementDigestDetails.put(ENCODED_SECRET_KEY, params.get(ENCODED_SECRET_KEY).toString());
        elementDigestDetails.put(ENCODED_SECRET_KEY_ALGORITHM, params.get(ENCODED_SECRET_KEY_ALGORITHM).toString());
        elementDigestDetails.put(CRYPTO_PROVIDER, params.get(CRYPTO_PROVIDER).toString());
        final Map<String, Integer> topicMap = new HashMap<String, Integer>();
        if (params.containsKey(SOURCE_TOPIC_NAME)) {
            int partitionCount = 1;
            if (params.containsKey(SOURCE_TOPIC_PARTITION_COUNT)) {
                partitionCount = Integer.parseInt(params.get(SOURCE_TOPIC_PARTITION_COUNT).toString());
            } else {
                partitionCount = Utilities.fetchPartitionMetaData(
                        params.get(METADATA_BROKER_LIST).toString(),
                        params.get(PARTITIONER_CLASS).toString(),
                        params.get(SOURCE_TOPIC_NAME).toString(),
                        params).size();
            }
            topicMap.put(params.get(SOURCE_TOPIC_NAME).toString(), partitionCount);
        } else {
            throw new RuntimeException("source.topic.name is not set");
        }
        String shutdowHookPath = prefixPath + appName + Constants.ZK_CONFIG_SHUTDOWN_HOOK_PROP; //"/config/partition-details";
        ZkClient zClient1 = createZkClient(zooKeeperConnectAppConfig);
        Utilities.updateShutdownFlag(zooKeeperConnectAppConfig, shutdowHookPath, false);
        ZkWatcher<String> zkWatcher = new ZkWatcher<>(zClient1, shutdowHookPath);
        ShutdownHookCallback<String> sShutdownHookCallback = new ShutdownHookCallback<>(zkWatcher, ssc);
        zkWatcher.start(sShutdownHookCallback);
        ssc.sparkContext().sc().addSparkListener(new JobStatusListener(zooKeeperConnectAppConfig, shutdowHookPath));
        // Create Kafka Stream
        // Start the ReliableReceiver with Window feature.
        try {
            kafkaStream = KafkaUtils.createStream(ssc, String.class, String.class,
                    StringDecoder.class, StringDecoder.class, params, topicMap,
                    StorageLevel.MEMORY_AND_DISK_SER());
            kafkaStream.checkpoint(Durations.seconds(5 * 30));
            org.apache.spark.streaming.dstream.DStream d;
            params.put(Constants.APP_NAME, appName);
            //params.put(Constants.ZOOKEEPER_CONNECT, zooKeeperConnectAppConfig);
            params.put(Constants.APP_ZOOKEEPER_PREFIX_PATH, prefixPath);
            kafkaStream.window(Durations.seconds(30), Durations.seconds(30))
                    .foreach(new ProcessPairsFromKafka(params, false, transformRules, elementDigestDetails,
                            collectionToTableNameMap, primaryKeyDetails));
        } catch (Exception e) {
            LOG.error("Exception in ProcessPairsFromKafka", e);
            if (params.containsKey(Constants.APP_NAME))
                TransformOracleGGConstants.OP_SEQ.updateFailureInZookper(params.get(Constants.ZOOKEEPER_CONNECT), params.get(Constants.APP_NAME));
        }
    }
    public static void main(final String args[]) {
        String appName = "test";
        String zooKeeperConnectAppConfig = "ma-csipd-lapp07.corp.com:11182,ma-csipd-lapp08.corp.com:11182";
        String checkPointPath = "/../checkpoint-oracle-kafka/";
        String sparkMaster = "spark://ma-csipd-lapp06.corp.com:7077,ma-csipd-lapp07.corp.com:7077";
        if ((args != null) && (args.length > 0) && (args.length > 1)) {
            // System.out.println(args[0]);
            appName = args[0];
            if (args.length > 1) {
                zooKeeperConnectAppConfig = args[1]; // "ma-csipd-lapp07.corp.com:11182,ma-csipd-lapp08.corp.com:11182";
            }
            if (args.length > 2) {
                checkPointPath = args[2]; // "/./checkpoint-oracle-kafka/";
            }
            if (args.length > 3) {
                sparkMaster = args[3]; // "local[3]";
            }
        } else {
            System.out
                    .println("Application name not provided.\nUsage is \"<appName>\" \"<zooKeeperConnectAppConfig>\" \"<checkPointPath>\" \"<sparkMaster>\"");
            System.exit(0);
            // JavaStreamingContext.
        }
        LOG.info("Parameter appName-{}, zooKeeperConnectAppConfig-{}, checkPointPath-{}, sparkMaster-{}",
                appName, zooKeeperConnectAppConfig, checkPointPath, sparkMaster);
        final TransformOraclePayloadSparkReceiver sSparkReliableKafkaReceiver = new TransformOraclePayloadSparkReceiver(
                zooKeeperConnectAppConfig, sparkMaster, appName, 100,
                checkPointPath);
        sSparkReliableKafkaReceiver.setupSparkConf(sSparkReliableKafkaReceiver.loadParams());
        sSparkReliableKafkaReceiver.setStreamingContext();
        sSparkReliableKafkaReceiver.createKakaStream();
        sSparkReliableKafkaReceiver.startReceiver();
    }
}