
[Bug] [connector-hive] connector-hive spark-test-summary-transforms sink

Open AdkinsHan opened this issue 7 months ago • 1 comment

Search before asking

  • [x] I had searched in the issues and found no similar issues.

What happened

Test notes

The main problem is a mismatch between the Hive version Spark ships with and the Hive version actually deployed; the following error occurs frequently (see the note after the log below):
25/06/11 18:11:28 ERROR SeaTunnel: 
===============================================================================
25/06/11 18:11:28 ERROR SeaTunnel: Fatal Error, 
25/06/11 18:11:28 ERROR SeaTunnel: Please submit bug report in https://github.com/apache/seatunnel/issues
25/06/11 18:11:28 ERROR SeaTunnel: Reason:Run SeaTunnel on spark failed 
25/06/11 18:11:28 ERROR SeaTunnel: Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: Run SeaTunnel on spark failed
        at org.apache.seatunnel.core.starter.spark.command.SparkTaskExecuteCommand.execute(SparkTaskExecuteCommand.java:62)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at org.apache.seatunnel.core.starter.spark.SeaTunnelSpark.main(SeaTunnelSpark.java:35)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:958)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.seatunnel.api.table.factory.FactoryException: **ErrorCode:[API-06], ErrorDescription:[Factory initialize failed]** - Unable to create a sink for identifier 'Hive'.
        at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSink(FactoryUtil.java:250)
        at org.apache.seatunnel.core.starter.spark.execution.SinkExecuteProcessor.lambda$execute$1(SinkExecuteProcessor.java:136)
        at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
        at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
        at org.apache.seatunnel.core.starter.spark.execution.SinkExecuteProcessor.execute(SinkExecuteProcessor.java:133)
        at org.apache.seatunnel.core.starter.spark.execution.SparkExecution.execute(SparkExecution.java:74)
        at org.apache.seatunnel.core.starter.spark.command.SparkTaskExecuteCommand.execute(SparkTaskExecuteCommand.java:60)
        ... 14 more
**Caused by: java.lang.NoSuchMethodError: org.apache.hadoop.hive.metastore.HiveMetaStoreClient**.<init>(Lorg/apache/hadoop/conf/Configuration;)V
        at org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy.initializeClient(HiveMetaStoreProxy.java:97)
        at org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy.getClient(HiveMetaStoreProxy.java:83)
        at org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy.getTable(HiveMetaStoreProxy.java:153)
        at org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveTableUtils.getTableInfo(HiveTableUtils.java:42)
        at org.apache.seatunnel.connectors.seatunnel.hive.sink.HiveSink.getTableInformation(HiveSink.java:234)
        at org.apache.seatunnel.connectors.seatunnel.hive.sink.HiveSink.<init>(HiveSink.java:85)
        at org.apache.seatunnel.connectors.seatunnel.hive.sink.HiveSinkFactory.lambda$createSink$0(HiveSinkFactory.java:62)
        at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSink(FactoryUtil.java:247)
        ... 20 more
 
25/06/11 18:11:28 ERROR SeaTunnel: 
===============================================================================
Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: Run SeaTunnel on spark failed
        at org.apache.seatunnel.core.starter.spark.command.SparkTaskExecuteCommand.execute(SparkTaskExecuteCommand.java:62)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at org.apache.seatunnel.core.starter.spark.SeaTunnelSpark.main(SeaTunnelSpark.java:35)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:958)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.seatunnel.api.table.factory.FactoryException: ErrorCode:[API-06], ErrorDescription:[Factory initialize failed] - Unable to create a sink for identifier 'Hive'.
        at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSink(FactoryUtil.java:250)
        at org.apache.seatunnel.core.starter.spark.execution.SinkExecuteProcessor.lambda$execute$1(SinkExecuteProcessor.java:136)
        at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
        at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
        at org.apache.seatunnel.core.starter.spark.execution.SinkExecuteProcessor.execute(SinkExecuteProcessor.java:133)
        at org.apache.seatunnel.core.starter.spark.execution.SparkExecution.execute(SparkExecution.java:74)
        at org.apache.seatunnel.core.starter.spark.command.SparkTaskExecuteCommand.execute(SparkTaskExecuteCommand.java:60)
        ... 14 more
Caused by: java.lang.NoSuchMethodError: org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(Lorg/apache/hadoop/conf/Configuration;)V
        at org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy.initializeClient(HiveMetaStoreProxy.java:97)
        at org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy.getClient(HiveMetaStoreProxy.java:83)
        at org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy.getTable(HiveMetaStoreProxy.java:153)
        at org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveTableUtils.getTableInfo(HiveTableUtils.java:42)
        at org.apache.seatunnel.connectors.seatunnel.hive.sink.HiveSink.getTableInformation(HiveSink.java:234)
        at org.apache.seatunnel.connectors.seatunnel.hive.sink.HiveSink.<init>(HiveSink.java:85)
        at org.apache.seatunnel.connectors.seatunnel.hive.sink.HiveSinkFactory.lambda$createSink$0(HiveSinkFactory.java:62)
        at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSink(FactoryUtil.java:247)
        ... 20 more
25/06/11 18:11:28 INFO SparkContext: Invoking stop() from shutdown hook
25/06/11 18:11:28 INFO SparkUI: Stopped Spark web UI at https://xxxx136:4440
25/06/11 18:11:28 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
25/06/11 18:11:28 INFO MemoryStore: MemoryStore cleared
25/06/11 18:11:28 INFO BlockManager: BlockManager stopped
25/06/11 18:11:28 INFO BlockManagerMaster: BlockManagerMaster stopped
25/06/11 18:11:28 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
25/06/11 18:11:28 INFO SparkContext: Successfully stopped SparkContext
25/06/11 18:11:28 INFO ShutdownHookManager: Shutdown hook called
25/06/11 18:11:28 INFO ShutdownHookManager: Deleting directory /tmp/spark-fd513ef0-ed30-4728-b086-64f6d9aca800
25/06/11 18:11:28 INFO ShutdownHookManager: Deleting directory /tmp/spark-02102603-c812-4049-8c3f-40d75e18a321
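
Note on the NoSuchMethodError above: the connector invokes the HiveMetaStoreClient(org.apache.hadoop.conf.Configuration) constructor, which was added in Hive 3.x; older Hive client jars (including the Hive 2.3.x client that Spark 3.3 bundles by default) only expose HiveMetaStoreClient(HiveConf). So the class loaded at runtime very likely comes from an older Hive than the one connector-hive was built against. A minimal shell sketch for checking and aligning the jars, assuming a Hive 3.1.x metastore; the paths and version numbers below are examples, not confirmed values:

    # Put a hive-exec jar that matches the deployed metastore on SeaTunnel's classpath
    cp /opt/hive/lib/hive-exec-3.1.3.jar /data/seatunnel/seatunnel-2.3.11/lib/
    cp /opt/hive/lib/libfb303-0.9.3.jar  /data/seatunnel/seatunnel-2.3.11/lib/

    # List every jar that actually ships HiveMetaStoreClient, to spot conflicting copies
    for j in /data/seatunnel/seatunnel-2.3.11/lib/*.jar /opt/spark/jars/*.jar; do
      unzip -l "$j" 2>/dev/null | grep -q 'hive/metastore/HiveMetaStoreClient.class' && echo "$j"
    done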

The main problem is with /data/seatunnel/seatunnel-2.3.11/connectors/connector-hive-2.3.11.jar

  • MySQL to Hive

    • Same problem as above
  • MySQL to Hive: numeric precision problem

Numeric precision problem: by default, tables are created with the PARQUET storage format, and newer versions raise errors on decimal precision, which is hard to resolve.
As a workaround, using the ORC format avoids the problem, though it may come with a performance difference; decimal precision can then be applied again at the next layer to avoid calculation discrepancies, as in the DDL below:
-- drop table IF exists xxx.ods_nocobase_af_zsold_manage_tar_mm_test
CREATE TABLE IF NOT EXISTS xxx.ods_nocobase_af_zsold_manage_tar_mm_test
(
	monthid             string COMMENT 'year-month',
	af_pic_sold         string COMMENT 'customer ship-to party code',
	af_pic_sold_n       string COMMENT 'customer ship-to party name',
	af_pic_sold_short_n string COMMENT 'customer ship-to party short name',
	af_sold_head_name   string COMMENT 'customer account manager',
	af_zold_type        string COMMENT 'customer industry',
	af_pic_head_name    string COMMENT 'industry manager',
	net_manage_money    decimal(18, 6) COMMENT 'AF management target net amount',
	manage_money        decimal(18, 6) COMMENT 'AF management target sales amount',
	manage_weight       decimal(18, 6) COMMENT 'AF management target weight'
)
	COMMENT 'maintenance - test'
	PARTITIONED BY (dt_mon string)
	STORED AS ORC; -- PARQUET or ORC
  • Oracle to Hive

    • Same problem as above
  • Hive to ClickHouse

    • Specifying the matching partition and columns on the Hive source works OK, but when listing columns in read_columns you cannot include the partition column dt, otherwise an error is thrown:
      source {
          Hive {
              table_name = "xxx.ods_webcsmsgl_ecsms_theyt_dd"
              metastore_uri = "thrift://xxxx135.com.cn:9083,thrift://xxxx136.com.cn:9083,thrift://xxxx137.com.cn:9083"
              plugin_output = ["ods_webcsmsgl_ecsms_theyt_dd_source"]
              read_partitions = ["dt=2025-06-12"]
              read_columns = ["theyt","theytname","vtweg","en","areaid","remark"]
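              # note: adding the partition column "dt" to this list triggers the error described above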
          }
      }
    
    • The transform plugin does not support plugin_input and plugin_out yet; they are simply not recognized.
    • The docs say the query should use the same fields as plugin_input, but the demo examples all use dual-table fields; in practice they do not run and are not recognized.
    transform {
            Sql {
                    plugin_input = ["ods_webcsmsgl_ecsms_theyt_dd_source"]
                    plugin_out = ["ods_webcsmsgl_ecsms_theyt_dd_out"]
                    query = """
                    SELECT theyt, theytname, vtweg, en, areaid, remark
                    FROM ods_webcsmsgl_ecsms_theyt_dd_source
                    WHERE areaid='86'
                            """
            }
          }
    

    Error message:

    Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: Run SeaTunnel on spark failed
    at org.apache.seatunnel.core.starter.spark.command.SparkTaskExecuteCommand.execute(SparkTaskExecuteCommand.java:62)
    at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
    at org.apache.seatunnel.core.starter.spark.SeaTunnelSpark.main(SeaTunnelSpark.java:35)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:958)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    Caused by: org.apache.spark.sql.AnalysisException: Invalid view name: ["ods_webcsmsgl_ecsms_theyt_dd_source"]
    at org.apache.spark.sql.errors.QueryCompilationErrors$.invalidViewNameError(QueryCompilationErrors.scala:2282)
    at org.apache.spark.sql.Dataset.createTempViewCommand(Dataset.scala:3502)
    at org.apache.spark.sql.Dataset.createOrReplaceTempView(Dataset.scala:3455)
    at org.apache.seatunnel.core.starter.spark.execution.SparkAbstractPluginExecuteProcessor.registerTempView(SparkAbstractPluginExecuteProcessor.java:126)
    at org.apache.seatunnel.core.starter.spark.execution.SparkAbstractPluginExecuteProcessor.registerInputTempView(SparkAbstractPluginExecuteProcessor.java:74)
    at org.apache.seatunnel.core.starter.spark.execution.SourceExecuteProcessor.execute(SourceExecuteProcessor.java:111)
    at org.apache.seatunnel.core.starter.spark.execution.SparkExecution.execute(SparkExecution.java:72)
    at org.apache.seatunnel.core.starter.spark.command.SparkTaskExecuteCommand.execute(SparkTaskExecuteCommand.java:60)
    ... 14 more
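
    Judging from the AnalysisException, the view name Spark receives is the literal string ["ods_webcsmsgl_ecsms_theyt_dd_source"], brackets and quotes included, which suggests the engine treated plugin_output/plugin_input here as plain strings rather than lists. A minimal sketch of a variant worth trying (assuming string-valued keys and that the Sql transform expects plugin_output rather than plugin_out; both are assumptions, not confirmed behavior for this version):

    source {
        Hive {
            table_name = "xxx.ods_webcsmsgl_ecsms_theyt_dd"
            metastore_uri = "thrift://xxxx135.com.cn:9083,thrift://xxxx136.com.cn:9083,thrift://xxxx137.com.cn:9083"
            plugin_output = "ods_webcsmsgl_ecsms_theyt_dd_source"   # plain string, no list brackets
            read_partitions = ["dt=2025-06-12"]
            read_columns = ["theyt","theytname","vtweg","en","areaid","remark"]
        }
    }

    transform {
        Sql {
            plugin_input  = "ods_webcsmsgl_ecsms_theyt_dd_source"   # assumed string-valued
            plugin_output = "ods_webcsmsgl_ecsms_theyt_dd_out"      # plugin_output, not plugin_out
            query = """
                SELECT theyt, theytname, vtweg, en, areaid, remark
                FROM ods_webcsmsgl_ecsms_theyt_dd_source
                WHERE areaid = '86'
            """
        }
    }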
    
    

SeaTunnel Version

2.3.11

SeaTunnel Config

env {
# e.g. Kerberos authentication settings
  execution.parallelism = 2
  job.mode = "BATCH"
  spark.app.name = "h807-hivetockh-ads_instock_store_mi.conf"
  spark.executor.instances = 2
  spark.executor.cores = 2
  spark.executor.memory = "4g"
  spark.sql.catalogImplementation = "hive"
  spark.kerberos.principal = "hive/xxxx@xxxx"
  spark.kerberos.keytab = "/etc/security/keytab/xxxx.service.keytab"
}

source {
 Hive {
    table_name = "xxxx.ods_webcsmsgl_ecsms_theyt_dd"
    metastore_uri = "thrift://xxxx135.com.cn:9083,thrift://xxxx136.com.cn:9083,thrift://xxxx137.com.cn:9083"
    plugin_output = ["ods_webcsmsgl_ecsms_theyt_dd_source"]
    read_partitions = ["dt=2025-06-12"]
    read_columns = ["theyt","theytname","vtweg","en","areaid","remark"]
  }

}

transform {
 Sql {
     plugin_input = ["ods_webcsmsgl_ecsms_theyt_dd_source"]
     plugin_out = ["ods_webcsmsgl_ecsms_theyt_dd_out"]
     query = """
           SELECT theyt, theytname, vtweg, en, areaid, remark
           FROM ods_webcsmsgl_ecsms_theyt_dd_source
           WHERE areaid='86'
            """
    }
}

sink {
  Console {
    plugin_input = ["ods_webcsmsgl_ecsms_theyt_dd_out"]
  }
#  Clickhouse {
#    plugin_input = ads_instock_store_mi_out
#    host = "xxxxx:8123"
#    database = "xxxx"
#    table = "ads_instock_store_mi"
#    username = "xxxxx"
#    password = "xxxxx"
#  }

}

Running Command

bash /data/seatunnel/seatunnel-2.3.11/bin/start-seatunnel-spark-3-connector-v2.sh \
  --master local \
  --deploy-mode client \
  --queue hive \
  -i rfc_date=${date} \
  -i rfc_year=${year} \
  -i rfc_mon=${mon} \
  --config /data/seatunnel/seatunnel-2.3.11/config/061-hive2clickhouse.config
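
For reference, the -i flags define variables that SeaTunnel substitutes into the config file. A hypothetical use of one of them inside the source block (rfc_date comes from the command above; the exact ${...} quoting may vary by version):

    read_partitions = ["dt=${rfc_date}"]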

Error Exception

Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: Run SeaTunnel on spark failed
  at org.apache.seatunnel.core.starter.spark.command.SparkTaskExecuteCommand.execute(SparkTaskExecuteCommand.java:62)
  at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
  at org.apache.seatunnel.core.starter.spark.SeaTunnelSpark.main(SeaTunnelSpark.java:35)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
  at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:958)
  at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
  at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
  at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
  at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
  Caused by: org.apache.spark.sql.AnalysisException: Invalid view name: ["ods_webcsmsgl_ecsms_theyt_dd_source"]
  at org.apache.spark.sql.errors.QueryCompilationErrors$.invalidViewNameError(QueryCompilationErrors.scala:2282)
  at org.apache.spark.sql.Dataset.createTempViewCommand(Dataset.scala:3502)
  at org.apache.spark.sql.Dataset.createOrReplaceTempView(Dataset.scala:3455)
  at org.apache.seatunnel.core.starter.spark.execution.SparkAbstractPluginExecuteProcessor.registerTempView(SparkAbstractPluginExecuteProcessor.java:126)
  at org.apache.seatunnel.core.starter.spark.execution.SparkAbstractPluginExecuteProcessor.registerInputTempView(SparkAbstractPluginExecuteProcessor.java:74)
  at org.apache.seatunnel.core.starter.spark.execution.SourceExecuteProcessor.execute(SourceExecuteProcessor.java:111)
  at org.apache.seatunnel.core.starter.spark.execution.SparkExecution.execute(SparkExecution.java:72)
  at org.apache.seatunnel.core.starter.spark.command.SparkTaskExecuteCommand.execute(SparkTaskExecuteCommand.java:60)
  ... 14 more

Zeta or Flink or Spark Version

Spark 3.3

Java or Scala Version

JDK 1.8

Screenshots

No response

Are you willing to submit PR?

  • [x] Yes I am willing to submit a PR!

Code of Conduct

AdkinsHan avatar Jun 13 '25 08:06 AdkinsHan

I have tested almost every version from 2.3.4 to 2.3.11, but each version has some problems, especially when it comes to connector-hive and Spark. So sad.

AdkinsHan avatar Jun 13 '25 08:06 AdkinsHan

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.

github-actions[bot] avatar Jul 14 '25 00:07 github-actions[bot]

This issue has been closed because it has not received response for too long time. You could reopen it if you encountered similar problems in the future.

github-actions[bot] avatar Jul 21 '25 00:07 github-actions[bot]