
How to write a DataFrame to an HBase table through Java using SHC?

Open · rashid-1989 opened this issue 6 years ago · 11 comments

rashid-1989 · Oct 09 '18 06:10

Hi, I am using the below code to write a DataFrame to an HBase table:

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog;

public class connTest {

static String hbaseCatalog = "{\r\n"
		+ "\"table\":{\"namespace\":\"NewSample\", \"name\":\"NewSample:temp\", \"tableCoder\":\"PrimitiveType\"},\r\n"
		+ "\"rowkey\":\"id1\",\r\n"
		+ "\"columns\":{\r\n"
		+ "\"ID1\":{\"cf\":\"rowkey\", \"col\":\"id1\", \"type\":\"string\"},\r\n"
		+ "\"DEPARTMENT\":{\"cf\":\"general\", \"col\":\"department\", \"type\":\"string\"},\r\n"
		+ "\"EMAIL_COUNT\":{\"cf\":\"general\", \"col\":\"emailCount\", \"type\":\"string\"},\r\n"
		+ "\"REGION\":{\"cf\":\"general\", \"col\":\"region\", \"type\":\"string\"}\r\n"
		+ "}\r\n"
		+ "}";

private static Dataset<org.apache.spark.sql.Row> withCatalog(String catalog) {
	SparkSession spark = SparkSession.builder().appName("test").master("local[*]")
			.config("spark.sql.warehouse.dir", "file:///c:/tmp/spark-warehouse")
			.config("hbase.client.retries.number", "2")
			.getOrCreate();
	SQLContext sqlContext = new SQLContext(spark);
	Map<String, String> map = new HashMap<>();
	map.put(HBaseTableCatalog.tableCatalog(), catalog);

	Dataset<org.apache.spark.sql.Row> df = sqlContext.read().options(map)
			.format("org.apache.spark.sql.execution.datasources.hbase").load();
	df.show();

	String newHbaseCatalog = "{\r\n"
			+ "\"table\":{\"namespace\":\"NewSample\", \"name\":\"newtemp\", \"tableCoder\":\"PrimitiveType\"},\r\n"
			+ "\"rowkey\":\"id1\",\r\n"
			+ "\"columns\":{\r\n"
			+ "\"ID1\":{\"cf\":\"rowkey\", \"col\":\"id1\", \"type\":\"string\"},\r\n"
			+ "\"DEPARTMENT\":{\"cf\":\"general\", \"col\":\"department\", \"type\":\"string\"},\r\n"
			+ "\"EMAIL_COUNT\":{\"cf\":\"general\", \"col\":\"emailCount\", \"type\":\"string\"},\r\n"
			+ "\"REGION\":{\"cf\":\"general\", \"col\":\"region\", \"type\":\"string\"}\r\n"
			+ "}\r\n"
			+ "}";

	Map<String, String> putMap = new HashMap<>();
	map.put(HBaseTableCatalog.tableCatalog(), newHbaseCatalog);
	map.put(HBaseTableCatalog.newTable(), "1");

	df.write().options(putMap)
			.format("org.apache.spark.sql.execution.datasources.hbase")
			.save();

	return df;
}

public static void main(String[] args) {		
	SparkSession spark = SparkSession.builder().appName("test").master("local[*]")
			.config("spark.sql.warehouse.dir", "file:///c:/tmp/spark-warehouse")
			.config("hbase.client.retries.number", "2")
			.getOrCreate();
	SparkContext sc = spark.sparkContext();
	System.out.println("printing table contents");
	withCatalog(hbaseCatalog);
	
}

}

And I am getting the below exception:

Exception in thread "main" java.util.NoSuchElementException: key not found: catalog
        at scala.collection.MapLike$class.default(MapLike.scala:228)
        at org.apache.spark.sql.catalyst.util.CaseInsensitiveMap.default(CaseInsensitiveMap.scala:28)
        at scala.collection.MapLike$class.apply(MapLike.scala:141)
        at org.apache.spark.sql.catalyst.util.CaseInsensitiveMap.apply(CaseInsensitiveMap.scala:28)
        at org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$.apply(HBaseTableCatalog.scala:185)
        at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.<init>(HBaseRelation.scala:162)
        at org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:57)
        at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:64)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:61)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:84)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
        at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:609)
        at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:609)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:609)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:234)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:775)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

rashid-1989 · Oct 09 '18 08:10

@weiqingy Could you please suggest if I am missing something here?

rashid-1989 · Oct 09 '18 08:10

I came across the same issue.

louisliu318 · Oct 09 '18 09:10

Were you able to resolve it?

rashid-1989 · Oct 09 '18 10:10

Can you attach your latest code?

louisliu318 · Oct 09 '18 10:10

Please refer to the above snippet followed by the exception. That's the code I am trying to execute.

rashid-1989 · Oct 09 '18 11:10

@louisliu318 do you need any additional details on the above?

rashid-1989 · Oct 09 '18 13:10

I am stuck here. Can someone please suggest?

rashid-1989 · Oct 10 '18 06:10

Same error in Scala.

        at scala.None$.get(Option.scala:347)
        at scala.None$.get(Option.scala:345)
        at org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$.apply(HBaseTableCatalog.scala:277)
        at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.<init>(HBaseRelation.scala:60)
        at org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(DefaultSource.scala:24)
        at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:518)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
        at org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource$.main(HBaseSource.scala:107)
        at org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource.main(HBaseSource.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:744)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

HulkSun · Jan 23 '19 04:01

Same error!

TEray · Jul 03 '19 09:07

Map<String, String> putMap = new HashMap<>();
map.put(HBaseTableCatalog.tableCatalog(), newHbaseCatalog);
map.put(HBaseTableCatalog.newTable(), "1");

df.write().options(putMap)
		.format("org.apache.spark.sql.execution.datasources.hbase")
		.save();

@rashid-1989 you should change putMap to map here: the catalog and newTable entries are put into map, so the putMap you pass to options() is empty, and SHC never finds the catalog key.
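For reference, a minimal sketch of that fix, keeping everything in one options map. writeOptions is a hypothetical name; df, newHbaseCatalog, and the imports are the ones from the snippet above:

	// Put the catalog into the SAME map that is handed to options();
	// otherwise SHC cannot find the "catalog" key and throws
	// java.util.NoSuchElementException: key not found: catalog.
	Map<String, String> writeOptions = new HashMap<>();
	writeOptions.put(HBaseTableCatalog.tableCatalog(), newHbaseCatalog);
	// Number of regions to use if SHC has to create the table;
	// the SHC README example passes "5" here.
	writeOptions.put(HBaseTableCatalog.newTable(), "5");

	df.write()
			.options(writeOptions)
			.format("org.apache.spark.sql.execution.datasources.hbase")
			.save();

With that change, save() can resolve the catalog. Reading newtemp back through the same read path shown in withCatalog is an easy way to verify the write.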

TEray · Jul 04 '19 02:07