starrocks-connector-for-apache-spark

Data type conversion exception

JialeHe opened this issue 2 years ago · 10 comments

I ran into a data type conversion problem while reading StarRocks data with the Spark connector. Versions in use: StarRocks 2.5.1, Spark 3.2.0, StarRocks Spark connector 1.1.0.

Background: [screenshot]

The table has a dt column of type date. Reading this table from StarRocks with a Spark SQL DataFrame fails with:

if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, DateType, fromJavaDate, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 37, dt), DateType), true, false) AS dt#1051
	at org.apache.spark.sql.errors.QueryExecutionErrors$.expressionEncodingError(QueryExecutionErrors.scala:1052)
	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:210)
	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:193)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:286)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$16(FileFormatWriter.scala:229)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1469)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of date
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.StaticInvoke_16$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_12$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:207)
	... 15 more

My initial suspicion is that the dt values the connector reads from StarRocks are Strings, while the schema declares DateType, which triggers the error. [screenshot]

A minimal reproduction:

import org.apache.spark.sql.types.{DateType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object Test2 {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("Spark 3 Demo1")
      .getOrCreate()
    // dt is populated with plain Strings here, mimicking what the connector
    // appears to hand back at runtime.
    val data = Seq(
      Row("Kim", "Seoul", "2023-08-01"),
      Row("Lee", "Busan", "2023-08-01"),
      Row("Park", "Jeju", "2023-08-01"),
      Row("Jeong", "Daejon", "2023-08-01")
    )
    // The schema, however, declares dt as DateType, so show() fails with
    // "java.lang.String is not a valid external type for schema of date".
    val schema = List(
      StructField("name", StringType, nullable = true),
      StructField("city", StringType, nullable = true),
      StructField("dt", DateType, nullable = true)
    )
    spark
      .createDataFrame(spark.sparkContext.parallelize(data), StructType(schema))
      .show(false)
  }
}
[screenshot: the same "not a valid external type" error reproduced locally]
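For contrast, a minimal sketch (not from the thread) of the same program with dt carried as java.sql.Date, the external type Spark's row encoder expects for DateType; this variant runs cleanly:

import java.sql.Date
import org.apache.spark.sql.types.{DateType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object Test2Works {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("Spark 3 Demo1 - date variant")
      .getOrCreate()
    // Same rows as above, but dt now holds java.sql.Date values,
    // matching the DateType declared in the schema below.
    val data = Seq(
      Row("Kim", "Seoul", Date.valueOf("2023-08-01")),
      Row("Lee", "Busan", Date.valueOf("2023-08-01")),
      Row("Park", "Jeju", Date.valueOf("2023-08-01")),
      Row("Jeong", "Daejon", Date.valueOf("2023-08-01"))
    )
    val schema = List(
      StructField("name", StringType, nullable = true),
      StructField("city", StringType, nullable = true),
      StructField("dt", DateType, nullable = true)
    )
    spark
      .createDataFrame(spark.sparkContext.parallelize(data), StructType(schema))
      .show(false)
  }
}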

The official docs state that connector 1.1.0 changed the DATE type mapping from StringType to DateType, yet the error still occurs. It looks as if only the schema mapping was changed, while the data actually read is still StringType, which would cause this exception.
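One way to separate the two layers (schema mapping vs. runtime values) is to load the table and run the smallest possible action on dt alone; a hedged sketch, reusing the connector options shown later in this thread (FE_HTTP, FE_JDBC, USER, PASSWORD are placeholders for the real connection settings):

val df = spark.read.format("starrocks")
  .option("starrocks.table.identifier", "edw_dm.dm_crowd_push_wide_acc_d")
  .option("starrocks.fenodes", FE_HTTP)     // placeholder
  .option("starrocks.fe.jdbc.url", FE_JDBC) // placeholder
  .option("user", USER)
  .option("password", PASSWORD)
  .load()

// Under connector 1.1.0 this should already report `dt: date`.
df.printSchema()

// If the connector hands Spark plain Strings for dt at runtime, even this
// minimal action fails with "not a valid external type for schema of date";
// if it prints a row, the runtime values really are dates.
df.select("dt").show(1)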

So I switched the connector back to version 1.0.0; with dt's schema as StringType the error no longer occurs. [screenshot]
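As a stopgap on 1.0.0, the StringType column can be parsed back into a real date inside Spark; a small sketch, assuming readDf is the DataFrame loaded through the connector and the values arrive as "yyyy-MM-dd" strings as the screenshots suggest:

import org.apache.spark.sql.functions.{col, to_date}

// dt comes back as StringType under connector 1.0.0; to_date rebuilds a
// DateType column so downstream sinks keep the original date semantics.
val withDate = readDf.withColumn("dt", to_date(col("dt"), "yyyy-MM-dd"))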

JialeHe · Aug 11 '23

In your code, does the problem still occur if you only run dataSet.show()? And what does finalSql look like?

banmoy · Aug 15 '23

@banmoy

starrocks table: edw_dm.dm_crowd_push_wide_acc_d, view_table: dm_crowd_push_wide_acc_d, sql: select DT,SOURCE_UID,PRODUCT,PROJECT,CATEGORY_TYPE,UNACTIVE_DAYS,GROUP_ID,REGION_LABEL,USE_TIME_LABEL,INTERACTIVE_USER_LABEL,COIN_LABEL,PUSH_TUNNEL,PUSH_ID,BRAND,DEVICE_MODEL,VERSION_CODE,OS_VERSION,PUSH_CLICK_RATE,ACCOUNT_ID,MEMBER,ACC_NO_PAY_DAYS,MAX_PAY_ANT,LAST_PAY_ANT,LATEST_PACKAGE_CHANNEL,FIRST_FOLLOW_DAYS,READ_TYPE,READ_DAY_BOOK_NAME_STR,FIRST_ACTIVATION_DAYS,IS_PUSH_INVALID,AVG_READ_DUR_1D,AVG_READ_DUR_3D,AVG_READ_DUR_7D,AVG_READ_DUR_14D,AVG_BOOK_CURRENCY_ANT_1D,AVG_BOOK_CURRENCY_ANT_3D,AVG_BOOK_CURRENCY_ANT_7D,AVG_BOOK_CURRENCY_ANT_14D,IS_CHARGE,SIGNIN_DAYS_7D,ACTIVE_DAYS_14D,CHARGE_ANT_7D,PROMOTION_TYPE,CREATE_TIME,UPDATE_TIME from dm_crowd_push_wide_acc_d

I never ran show(); the downstream code writes to HDFS. Both are actions, so the mechanism is the same.
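For context, a minimal sketch of that pipeline, assuming a Parquet sink (the actual output format and path are not shown in the thread):

readDf.createOrReplaceTempView("dm_crowd_push_wide_acc_d")
val dataSet = spark.sql(finalSql)
// Any action (show, count, write) forces the same scan, so the encoder
// error surfaces here just as it would with show().
dataSet.write.mode("overwrite").parquet("hdfs:///path/to/output") // hypothetical path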

JialeHe · Aug 15 '23

@JialeHe Could you share the DDL of the StarRocks table? The connector tests do cover reading date-typed data, so I'd like to see what the difference is.

Test case: https://github.com/StarRocks/starrocks-connector-for-apache-spark/blob/main/src/test/java/com/starrocks/connector/spark/sql/ReadWriteITTest.java#L362C11-L362C11

banmoy · Aug 15 '23

@banmoy

CREATE TABLE `dm_crowd_push_wide_acc_d` (
  `dt` date,
  `source_uid` varchar(65533),
  `product` varchar(65533),
  `project` varchar(65533),
  `category_type` int(11),
  `unactive_days` int(11),
  `group_id` int(11),
  `region_label` varchar(65533),
  `use_time_label` varchar(65533),
  `interactive_user_label` varchar(65533),
  `coin_label` varchar(65533),
  `push_tunnel` varchar(65533),
  `push_id` varchar(65533),
  `brand` varchar(65533),
  `device_model` varchar(65533),
  `version_code` varchar(65533),
  `os_version` varchar(65533),
  `push_click_rate` varchar(65533),
  `account_id` bigint(20),
  `member` varchar(65533),
  `acc_no_pay_days` bigint(20),
  `max_pay_ant` bigint(20),
  `last_pay_ant` bigint(20),
  `latest_package_channel` varchar(65533),
  `first_follow_days` bigint(20),
  `read_type` int(11),
  `read_day_book_name_str` varchar(65533),
  `first_activation_days` bigint(20),
  `is_push_invalid` int(11),
  `avg_read_dur_1d` double,
  `avg_read_dur_3d` double,
  `avg_read_dur_7d` double,
  `avg_read_dur_14d` double,
  `avg_book_currency_ant_1d` double,
  `avg_book_currency_ant_3d` double,
  `avg_book_currency_ant_7d` double,
  `avg_book_currency_ant_14d` double,
  `is_charge` int(11),
  `signin_days_7d` bigint(20),
  `active_days_14d` int(11),
  `charge_ant_7d` double,
  `promotion_type` varchar(65533),
  `create_time` varchar(65533),
  `update_time` varchar(65533)
) ENGINE = OLAP
DUPLICATE KEY(`dt`, `source_uid`, `product`, `project`, `category_type`, `unactive_days`, `group_id`)
COMMENT "OLAP"
PARTITION BY RANGE(`dt`) (
  PARTITION p20230804 VALUES [("2023-08-04"), ("2023-08-05")),
  PARTITION p20230805 VALUES [("2023-08-05"), ("2023-08-06")),
  PARTITION p20230806 VALUES [("2023-08-06"), ("2023-08-07")),
  PARTITION p20230807 VALUES [("2023-08-07"), ("2023-08-08")),
  PARTITION p20230808 VALUES [("2023-08-08"), ("2023-08-09")),
  PARTITION p20230809 VALUES [("2023-08-09"), ("2023-08-10"))
)
DISTRIBUTED BY HASH(`source_uid`) BUCKETS 120 
PROPERTIES (
"replication_num" = "2",
"dynamic_partition.enable" = "false",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.time_zone" = "Asia/Shanghai",
"dynamic_partition.start" = "-3",
"dynamic_partition.end" = "3",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "120",
"in_memory" = "false",
"storage_format" = "DEFAULT",
"enable_persistent_index" = "true",
"compression" = "ZSTD"
);

JialeHe · Aug 15 '23

@banmoy Following the logic in that UT, I modified my reproduction code and it does run successfully. So it really looks like the DT field the connector reads from StarRocks is a String rather than a Date. [screenshot]

JialeHe · Aug 15 '23

@JialeHe I tried to verify with your DDL and Spark query but still could not reproduce it. Could you confirm whether the following matches your scenario?

Insert data into the StarRocks table; for simplicity, only the duplicate key columns (which include the date) are populated:

insert into edw_dm.dm_crowd_push_wide_acc_d (dt, source_uid, product, project, category_type, unactive_days, group_id) values ("2023-08-09", "a", "b", "c", 1, 2, 3);

Spark code:

SparkSession spark = SparkSession
    .builder()
    .master("local[1]")
    .appName("test")
    .getOrCreate();

Dataset<Row> readDf = spark.read().format("starrocks")
    .option("starrocks.table.identifier", "edw_dm.dm_crowd_push_wide_acc_d")
    .option("starrocks.fenodes", FE_HTTP)
    .option("starrocks.fe.jdbc.url", FE_JDBC)
    .option("user", USER)
    .option("password", PASSWORD)
    .load();
readDf.createOrReplaceTempView("dm_crowd_push_wide_acc_d");
String finalSql = "select DT,SOURCE_UID,PRODUCT,PROJECT,CATEGORY_TYPE,UNACTIVE_DAYS,GROUP_ID,REGION_LABEL,USE_TIME_LABEL,INTERACTIVE_USER_LABEL,COIN_LABEL,PUSH_TUNNEL,PUSH_ID,BRAND,DEVICE_MODEL,VERSION_CODE,OS_VERSION,PUSH_CLICK_RATE,ACCOUNT_ID,MEMBER,ACC_NO_PAY_DAYS,MAX_PAY_ANT,LAST_PAY_ANT,LATEST_PACKAGE_CHANNEL,FIRST_FOLLOW_DAYS,READ_TYPE,READ_DAY_BOOK_NAME_STR,FIRST_ACTIVATION_DAYS,IS_PUSH_INVALID,AVG_READ_DUR_1D,AVG_READ_DUR_3D,AVG_READ_DUR_7D,AVG_READ_DUR_14D,AVG_BOOK_CURRENCY_ANT_1D,AVG_BOOK_CURRENCY_ANT_3D,AVG_BOOK_CURRENCY_ANT_7D,AVG_BOOK_CURRENCY_ANT_14D,IS_CHARGE,SIGNIN_DAYS_7D,ACTIVE_DAYS_14D,CHARGE_ANT_7D,PROMOTION_TYPE,CREATE_TIME,UPDATE_TIME from dm_crowd_push_wide_acc_d";
Dataset<Row> result = spark.sql(finalSql);
result.show();
spark.stop();

Spark output:

+----------+----------+-------+-------+-------------+-------------+--------+------------+--------------+----------------------+----------+-----------+-------+-----+------------+------------+----------+---------------+----------+------+---------------+-----------+------------+----------------------+-----------------+---------+----------------------+---------------------+---------------+---------------+---------------+---------------+----------------+------------------------+------------------------+------------------------+-------------------------+---------+--------------+---------------+-------------+--------------+-----------+-----------+
|        DT|SOURCE_UID|PRODUCT|PROJECT|CATEGORY_TYPE|UNACTIVE_DAYS|GROUP_ID|REGION_LABEL|USE_TIME_LABEL|INTERACTIVE_USER_LABEL|COIN_LABEL|PUSH_TUNNEL|PUSH_ID|BRAND|DEVICE_MODEL|VERSION_CODE|OS_VERSION|PUSH_CLICK_RATE|ACCOUNT_ID|MEMBER|ACC_NO_PAY_DAYS|MAX_PAY_ANT|LAST_PAY_ANT|LATEST_PACKAGE_CHANNEL|FIRST_FOLLOW_DAYS|READ_TYPE|READ_DAY_BOOK_NAME_STR|FIRST_ACTIVATION_DAYS|IS_PUSH_INVALID|AVG_READ_DUR_1D|AVG_READ_DUR_3D|AVG_READ_DUR_7D|AVG_READ_DUR_14D|AVG_BOOK_CURRENCY_ANT_1D|AVG_BOOK_CURRENCY_ANT_3D|AVG_BOOK_CURRENCY_ANT_7D|AVG_BOOK_CURRENCY_ANT_14D|IS_CHARGE|SIGNIN_DAYS_7D|ACTIVE_DAYS_14D|CHARGE_ANT_7D|PROMOTION_TYPE|CREATE_TIME|UPDATE_TIME|
+----------+----------+-------+-------+-------------+-------------+--------+------------+--------------+----------------------+----------+-----------+-------+-----+------------+------------+----------+---------------+----------+------+---------------+-----------+------------+----------------------+-----------------+---------+----------------------+---------------------+---------------+---------------+---------------+---------------+----------------+------------------------+------------------------+------------------------+-------------------------+---------+--------------+---------------+-------------+--------------+-----------+-----------+
|2023-08-09|         a|      b|      c|            1|            2|       3|        null|          null|                  null|      null|       null|   null| null|        null|        null|      null|           null|      null|  null|           null|       null|        null|                  null|             null|     null|                  null|                 null|           null|           null|           null|           null|            null|                    null|                    null|                    null|                     null|     null|          null|           null|         null|          null|       null|       null|
+----------+----------+-------+-------+-------------+-------------+--------+------------+--------------+----------------------+----------+-----------+-------+-----+------------+------------+----------+---------------+----------+------+---------------+-----------+------------+----------------------+-----------------+---------+----------------------+---------------------+---------------+---------------+---------------+---------------+----------------+------------------------+------------------------+------------------------+-------------------------+---------+--------------+---------------+-------------+--------------+-----------+-----------+

banmoy · Aug 15 '23

@banmoy Is this code using connector version 1.1.0?

JialeHe · Aug 15 '23

Yes.

<dependency>
    <groupId>com.starrocks</groupId>
    <artifactId>starrocks-spark-connector-3.2_2.12</artifactId>
    <version>1.1.0</version>
</dependency>

banmoy · Aug 15 '23

Do you still see the problem when running my code example?

banmoy · Aug 15 '23

I'll try it today.

JialeHe · Aug 16 '23