shc
How to write a DataFrame to an HBase table through Java using SHC?
Hi, I am using the code below to write a DataFrame to an HBase table:
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog;

public class connTest {

    // Catalog for the existing source table NewSample:temp
    static String hbaseCatalog = "{"
            + "\"table\":{\"namespace\":\"NewSample\", \"name\":\"NewSample:temp\", \"tableCoder\":\"PrimitiveType\"},"
            + "\"rowkey\":\"id1\","
            + "\"columns\":{"
            + "\"ID1\":{\"cf\":\"rowkey\", \"col\":\"id1\", \"type\":\"string\"},"
            + "\"DEPARTMENT\":{\"cf\":\"general\", \"col\":\"department\", \"type\":\"string\"},"
            + "\"EMAIL_COUNT\":{\"cf\":\"general\", \"col\":\"emailCount\", \"type\":\"string\"},"
            + "\"REGION\":{\"cf\":\"general\", \"col\":\"region\", \"type\":\"string\"}"
            + "}}";

    private static Dataset<Row> withCatalog(String catalog) {
        SparkSession spark = SparkSession.builder().appName("test").master("local[*]")
                .config("spark.sql.warehouse.dir", "file:///c:/tmp/spark-warehouse")
                .config("hbase.client.retries.number", "2")
                .getOrCreate();
        SQLContext sqlContext = new SQLContext(spark);

        // Read the source table via SHC
        Map<String, String> map = new HashMap<>();
        map.put(HBaseTableCatalog.tableCatalog(), catalog);
        Dataset<Row> df = sqlContext.read().options(map)
                .format("org.apache.spark.sql.execution.datasources.hbase")
                .load();
        df.show();

        // Catalog for the target table newtemp
        String newHbaseCatalog = "{"
                + "\"table\":{\"namespace\":\"NewSample\", \"name\":\"newtemp\", \"tableCoder\":\"PrimitiveType\"},"
                + "\"rowkey\":\"id1\","
                + "\"columns\":{"
                + "\"ID1\":{\"cf\":\"rowkey\", \"col\":\"id1\", \"type\":\"string\"},"
                + "\"DEPARTMENT\":{\"cf\":\"general\", \"col\":\"department\", \"type\":\"string\"},"
                + "\"EMAIL_COUNT\":{\"cf\":\"general\", \"col\":\"emailCount\", \"type\":\"string\"},"
                + "\"REGION\":{\"cf\":\"general\", \"col\":\"region\", \"type\":\"string\"}"
                + "}}";

        // Write the DataFrame to the target table
        Map<String, String> putMap = new HashMap<>();
        map.put(HBaseTableCatalog.tableCatalog(), newHbaseCatalog);
        map.put(HBaseTableCatalog.newTable(), "1");
        df.write().options(putMap)
                .format("org.apache.spark.sql.execution.datasources.hbase")
                .save();

        return df;
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("test").master("local[*]")
                .config("spark.sql.warehouse.dir", "file:///c:/tmp/spark-warehouse")
                .config("hbase.client.retries.number", "2")
                .getOrCreate();
        SparkContext sc = spark.sparkContext();
        System.out.println("printing table contents");
        withCatalog(hbaseCatalog);
    }
}
And I get the following exception:
Exception in thread "main" java.util.NoSuchElementException: key not found: catalog
	at scala.collection.MapLike$class.default(MapLike.scala:228)
	at org.apache.spark.sql.catalyst.util.CaseInsensitiveMap.default(CaseInsensitiveMap.scala:28)
	at scala.collection.MapLike$class.apply(MapLike.scala:141)
	at org.apache.spark.sql.catalyst.util.CaseInsensitiveMap.apply(CaseInsensitiveMap.scala:28)
	at org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$.apply(HBaseTableCatalog.scala:185)
	at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.<init>(HBaseRelation.scala:162)
	at org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:57)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:64)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:61)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:84)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:609)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:609)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:609)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:234)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:775)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
@weiqingy Could you please suggest if I am missing something here?
I came across the same issue.
Were you able to resolve it?
Can you attach your latest code?
Please refer to the above snippet, followed by the exception. That's the code I am trying to execute.
@louisliu318 Do you need any additional details on the above?
I am stuck here. Can someone please suggest?
Same error in Scala:
at scala.None$.get(Option.scala:347)
at scala.None$.get(Option.scala:345)
at org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$.apply(HBaseTableCatalog.scala:277)
at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.<init>(HBaseRelation.scala:60)
at org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(DefaultSource.scala:24)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:518)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
at org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource$.main(HBaseSource.scala:107)
at org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource.main(HBaseSource.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:744)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Same error!
Map<String, String> putMap = new HashMap();
map.put(HBaseTableCatalog.tableCatalog(), newHbaseCatalog);
map.put(HBaseTableCatalog.newTable(), "1");
df.write().options(putMap)
        .format("org.apache.spark.sql.execution.datasources.hbase")
        .save();
@rashid-1989 You should change putMap to map in df.write().options(putMap). You populate map with the new catalog and the newTable option, but you pass the still-empty putMap to options(), so the writer never receives the catalog entry. That missing entry is exactly what the "key not found: catalog" exception is reporting.
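For reference, here is a minimal sketch of the corrected write path, written the other way around (populating putMap itself rather than reusing map). It assumes the newHbaseCatalog JSON and df from the snippet above:

// Populate the same map that is handed to options().
Map<String, String> putMap = new HashMap<>();
putMap.put(HBaseTableCatalog.tableCatalog(), newHbaseCatalog);
// newTable sets the number of regions to pre-split when SHC creates the
// target table. Note: SHC only creates a missing table when this value is
// greater than 3, which is why the SHC README uses "5" rather than "1".
putMap.put(HBaseTableCatalog.newTable(), "5");

df.write().options(putMap)
        .format("org.apache.spark.sql.execution.datasources.hbase")
        .save();

Either variant works, as long as the map actually passed to options() contains the HBaseTableCatalog.tableCatalog() entry: per the stack trace above, HBaseTableCatalog.apply looks that key up and throws "key not found: catalog" when it is absent.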