
support for reading tables from Glue metastore

moshir opened this issue • 12 comments

Hi folks! Is there any support for reading tables from the Glue metastore (using metorikku on EMR)?

moshir avatar Jul 27 '20 08:07 moshir

I never tried this, but if you're on EMR and Glue is connected it will simply work...

lyogev avatar Jul 28 '20 10:07 lyogev
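
For context, "Glue is connected" on EMR usually means the cluster was created with the AWS Glue Data Catalog enabled as the metastore for Spark SQL. A minimal sketch of the EMR configuration classification that turns this on (your actual cluster setup may differ):

[
  {
    "Classification": "spark-hive-site",
    "Properties": {
      "hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
    }
  }
]

With the cluster launched this way, Spark picks up the Glue catalog as its Hive metastore, so Glue databases and tables are visible from Spark SQL.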

How do you specify a table as an input then?

moshir avatar Jul 28 '20 14:07 moshir

You don't need to define anything as an input. Just use the table names from Glue inside your SQLs.

lyogev avatar Jul 29 '20 04:07 lyogev
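
For illustration, a metrics file following that advice might look like the minimal sketch below (the database, table, and dataframe names are hypothetical):

steps:
- dataFrameName: df_my_table              # hypothetical dataframe name
  sql: SELECT * FROM my_glue_db.my_table  # Glue database.table referenced directly in the SQL
output:
- dataFrameName: df_my_table
  outputType: JDBC
  outputOptions:
    saveMode: Overwrite
    dbTable: my_table

No input section is needed; the table is resolved through the cluster's metastore.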

Hello @lyogev

I can't manage to read tables from the Glue metastore.

As you say, if I understand correctly, I don't need to define anything as an input. I have the following as job_config.yaml:

metrics:
  - s3://.../metrics_config.yaml
output:
  jdbc:
    connectionUrl: ${JDBC_URL}
    user: root
    password: ${DB_PASSWORD}
    driver: "com.mysql.jdbc.Driver"
    options:
      truncate: true

appName: test_ingestion
showPreviewLines: 20
showQuery: true

In metrics_config.yaml, I use this syntax:

steps:
- dataFrameName: df_asset
  sql: SELECT * FROM my_database.table_name
output:
- dataFrameName: df_table_name
  outputType: JDBC
  outputOptions:
    saveMode: Overwrite
    dbTable: table_name

When I run this, I get an error saying the table can't be found: org.apache.spark.sql.AnalysisException: Table or view not found: my_database.table_name; line 1 pos 14;

I tried replacing the SELECT * FROM ... with SHOW DATABASES to see if the database is detected correctly, but I get the following output:

root
 |-- namespace: string (nullable = false)

+---------+
|namespace|
+---------+
|default  |
+---------+

I can't understand why I can't see my databases here.

Something else is weird: on that same EMR cluster, if I SSH into the master node and run SHOW DATABASES in a spark-sql prompt, I can indeed see my databases, so it works correctly there.

Do you have any idea? Any help would be much appreciated!

Thank you in advance.

cyrillay avatar Feb 18 '21 21:02 cyrillay

Hello @lyogev @cyrillay, I'm facing the same issue on EMR release 6.2.0 with Hadoop 3.2.1 and Spark 3.0.1.

dthauvin avatar Feb 19 '21 20:02 dthauvin

Hi @cyrillay, can you share how you are running the metorikku job on EMR? Are you running it as a step? What is the command? Also, which metorikku version are you using?

Thanks

lyogev avatar Feb 20 '21 11:02 lyogev

Hi @lyogev, I am indeed running the job on EMR as a step; this is my command:

spark-submit --driver-java-options "-DENV=dev -DJDBC_URL='****'" --master yarn --deploy-mode client --jars /home/hadoop/mysql-connector-java-5.1.49.jar --class com.yotpo.metorikku.Metorikku /home/hadoop/metorikku_2.12.jar -c s3://.../job_config.yaml

I also tried adding --conf spark.sql.catalogImplementation=hive, which instead gave a different error: Exception in thread "main" java.lang.NoSuchMethodError: com.amazonaws.util.StringUtils.trim(Ljava/lang/String;)Ljava/lang/String; at com.amazonaws.auth.profile.internal.AwsProfileNameLoader.getEnvProfileName(AwsProfileNameLoader.java:72)

I am using metorikku for Scala 2.12 (downloaded from the releases in mid-December 2020; I noticed that different jars named metorikku_2.12.jar were released), on EMR emr-6.2.0 with Spark 3.0.1.

cyrillay avatar Feb 22 '21 09:02 cyrillay

Can you also add: --conf spark.sql.catalogImplementation=hive --conf hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory?

lyogev avatar Feb 23 '21 19:02 lyogev

Hello @lyogev, I tried with the parameters you gave, but I still get the same NoSuchMethodError:

21/02/24 13:57:50 INFO StreamingQueryMetricsListener$: Initialize stream listener
21/02/24 13:57:57 WARN HiveConf: HiveConf of name hive.server2.thrift.url does not exist
Exception in thread "main" java.lang.NoSuchMethodError: com.amazonaws.util.StringUtils.trim(Ljava/lang/String;)Ljava/lang/String;
	at com.amazonaws.auth.profile.internal.AwsProfileNameLoader.getEnvProfileName(AwsProfileNameLoader.java:72)
	at com.amazonaws.auth.profile.internal.AwsProfileNameLoader.loadProfileName(AwsProfileNameLoader.java:54)
	at com.amazonaws.regions.AwsProfileRegionProvider.<init>(AwsProfileRegionProvider.java:40)
	at com.amazonaws.regions.DefaultAwsRegionProviderChain.<init>(DefaultAwsRegionProviderChain.java:23)
	at com.amazonaws.client.builder.AwsClientBuilder.<clinit>(AwsClientBuilder.java:60)
	at com.amazonaws.glue.catalog.metastore.AWSGlueClientFactory.newClient(AWSGlueClientFactory.java:48)
	at com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient.<init>(AWSCatalogMetastoreClient.java:142)
	at com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory.createMetaStoreClient(AWSGlueDataCatalogHiveClientFactory.java:20)
	at org.apache.hadoop.hive.ql.metadata.HiveUtils.createMetaStoreClient(HiveUtils.java:507)
	at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3744)
	at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3724)
	at org.apache.hadoop.hive.ql.metadata.Hive.getAllFunctions(Hive.java:3986)
	at org.apache.hadoop.hive.ql.metadata.Hive.reloadFunctions(Hive.java:251)
	at org.apache.hadoop.hive.ql.metadata.Hive.registerAllFunctionsOnce(Hive.java:234)
	at org.apache.hadoop.hive.ql.metadata.Hive.<init>(Hive.java:402)
	at org.apache.hadoop.hive.ql.metadata.Hive.create(Hive.java:335)
	at org.apache.hadoop.hive.ql.metadata.Hive.getInternal(Hive.java:315)
	at org.apache.hadoop.hive.ql.metadata.Hive.get(Hive.java:291)
	at org.apache.spark.sql.hive.client.HiveClientImpl.client(HiveClientImpl.scala:260)
	at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:286)
	at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:227)
	at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:226)
	at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:276)
	at org.apache.spark.sql.hive.client.HiveClientImpl.databaseExists(HiveClientImpl.scala:389)
	at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$databaseExists$1(HiveExternalCatalog.scala:225)
	at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
	at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:103)
	at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:225)
	at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:137)
	at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:127)
	at org.apache.spark.sql.internal.SharedState.globalTempViewManager$lzycompute(SharedState.scala:157)
	at org.apache.spark.sql.internal.SharedState.globalTempViewManager(SharedState.scala:155)
	at org.apache.spark.sql.hive.HiveSessionStateBuilder.$anonfun$catalog$2(HiveSessionStateBuilder.scala:60)
	at org.apache.spark.sql.catalyst.catalog.SessionCatalog.globalTempViewManager$lzycompute(SessionCatalog.scala:93)
	at org.apache.spark.sql.catalyst.catalog.SessionCatalog.globalTempViewManager(SessionCatalog.scala:93)
	at org.apache.spark.sql.catalyst.catalog.SessionCatalog.lookupGlobalTempView(SessionCatalog.scala:789)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveTempViews$.lookupTempView(Analyzer.scala:858)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveTempViews$$anonfun$apply$7.applyOrElse(Analyzer.scala:838)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveTempViews$$anonfun$apply$7.applyOrElse(Analyzer.scala:836)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUp$3(AnalysisHelper.scala:90)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUp$1(AnalysisHelper.scala:90)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:194)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp(AnalysisHelper.scala:86)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp$(AnalysisHelper.scala:84)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUp(LogicalPlan.scala:29)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUp$2(AnalysisHelper.scala:87)
	at org.apache.spark.sql.catalyst.trees.TreeNode.applyFunctionIfChanged$1(TreeNode.scala:380)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:416)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:248)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:414)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:362)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUp$1(AnalysisHelper.scala:87)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:194)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp(AnalysisHelper.scala:86)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp$(AnalysisHelper.scala:84)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUp(LogicalPlan.scala:29)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveTempViews$.apply(Analyzer.scala:836)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:962)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:934)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:149)
	at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
	at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
	at scala.collection.immutable.List.foldLeft(List.scala:89)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:146)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:138)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:138)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:176)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:170)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:130)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:116)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:104)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:116)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:154)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:201)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:153)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:68)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:149)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:153)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:153)
	at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:68)
	at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:58)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:98)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:607)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:602)
	at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:650)
	at com.yotpo.metorikku.metric.stepActions.Sql.run(Sql.scala:24)
	at com.yotpo.metorikku.metric.stepActions.Sql.run(Sql.scala:11)
	at com.yotpo.metorikku.metric.Metric.$anonfun$calculate$1(Metric.scala:29)
...

My spark-submit command was:

spark-submit --deploy-mode client --driver-java-options "-DENV=pprd -DJDBC_URL='****'" --master yarn --jars /home/hadoop/mysql-connector-java-5.1.49.jar --class com.yotpo.metorikku.Metorikku --conf spark.sql.catalogImplementation=hive --conf hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory /home/hadoop/metorikku_2.12.jar -c s3://.../job_config.yaml

Maybe this link could help pinpoint the issue; it could be a classpath issue... Any idea? 😞

cyrillay avatar Feb 24 '21 14:02 cyrillay

Unfortunately metorikku comes bundled with the AWS SDK (due to the DQ integration)... Can you try adding the following jar to the command via --jars: https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-core/1.11.961/aws-java-sdk-core-1.11.961.jar

LMK if that helps

lyogev avatar Feb 24 '21 15:02 lyogev

@lyogev Unfortunately I get the exact same exception 😕

spark-submit --deploy-mode client --master yarn --jars /home/hadoop/mysql-connector-java-5.1.49.jar,https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-core/1.11.961/aws-java-sdk-core-1.11.961.jar --class com.yotpo.metorikku.Metorikku --conf spark.sql.catalogImplementation=hive --conf hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory /home/hadoop/metorikku_2.12.jar -c s3://.../job_config_glue.yaml

cyrillay avatar Feb 24 '21 16:02 cyrillay

Okay, just managed to work around this issue.

Providing Metorikku from Maven with --packages avoids embedding every dependency; the dependencies are downloaded on the fly, so it is possible to exclude the aws-java-sdk that caused the problems, with --conf spark.jars.excludes=com.amazonaws:aws-java-sdk,com.amazonaws:aws-java-sdk-core

The final command that worked:

spark-submit --deploy-mode client --master yarn --class com.yotpo.metorikku.Metorikku --packages com.yotpo:metorikku_2.12:0.0.119,mysql:mysql-connector-java:8.0.23 --conf spark.sql.catalogImplementation=hive --conf hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory --conf spark.jars.excludes=com.amazonaws:aws-java-sdk,com.amazonaws:aws-java-sdk-core dummy -c s3://.../job_config.yaml

cyrillay avatar Feb 25 '21 14:02 cyrillay