starrocks-connector-for-apache-spark
Data type conversion exception
I ran into a data type conversion problem when reading StarRocks data with the Spark connector. Versions in use: StarRocks 2.5.1, Spark 3.2.0, StarRocks Spark Connector 1.1.0
Background:
The table has a field dt of type date. When reading this table's data from StarRocks with a Spark SQL DataFrame, it fails with the following error:
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, DateType, fromJavaDate, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 37, dt), DateType), true, false) AS dt#1051
at org.apache.spark.sql.errors.QueryExecutionErrors$.expressionEncodingError(QueryExecutionErrors.scala:1052)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:210)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:193)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:286)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$16(FileFormatWriter.scala:229)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1469)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of date
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.StaticInvoke_16$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_12$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:207)
... 15 more
My initial suspicion is that the connector reads the dt field back from StarRocks as a String, while its schema declares it as DateType, hence the error.
A simple reproduction:
import org.apache.spark.sql.types.{DateType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object Test2 {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("Spark 3 Demo1")
      .getOrCreate()

    // dt values are plain strings while the schema declares dt as DateType, which
    // reproduces "java.lang.String is not a valid external type for schema of date".
    val data = Seq(
      Row("Kim", "Seoul", "2023-08-01"),
      Row("Lee", "Busan", "2023-08-01"),
      Row("Park", "Jeju", "2023-08-01"),
      Row("Jeong", "Daejon", "2023-08-01")
    )
    val schema = List(
      StructField("name", StringType, nullable = true),
      StructField("city", StringType, nullable = true),
      StructField("dt", DateType, nullable = true)
    )

    spark
      .createDataFrame(spark.sparkContext.parallelize(data), StructType(schema))
      .show(false)
  }
}
The official docs say Connector 1.1.0 changed the DATE type mapping from StringType to DateType, but this error still occurs. It looks like only the schema mapping was changed, while the data actually read back is still a String, which causes the exception.
So I switched the connector back to version 1.0.0; with dt's schema being StringType, the error no longer occurs.
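For anyone who stays on 1.0.0, where dt comes back as a plain string, an explicit conversion is then needed if a real date is wanted downstream. A minimal sketch, assuming df is the DataFrame loaded through the connector:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.to_date

// With connector 1.0.0 the dt column is exposed as StringType, so convert it
// explicitly to DateType before any downstream logic that expects a date.
def withDateColumn(df: DataFrame): DataFrame =
  df.withColumn("dt", to_date(df("dt"), "yyyy-MM-dd"))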
If your code only runs dataSet.show(), does the problem still occur? And what does finalSql look like?
@banmoy
starrocks table: edw_dm.dm_crowd_push_wide_acc_d, view_table: dm_crowd_push_wide_acc_d, sql: select DT,SOURCE_UID,PRODUCT,PROJECT,CATEGORY_TYPE,UNACTIVE_DAYS,GROUP_ID,REGION_LABEL,USE_TIME_LABEL,INTERACTIVE_USER_LABEL,COIN_LABEL,PUSH_TUNNEL,PUSH_ID,BRAND,DEVICE_MODEL,VERSION_CODE,OS_VERSION,PUSH_CLICK_RATE,ACCOUNT_ID,MEMBER,ACC_NO_PAY_DAYS,MAX_PAY_ANT,LAST_PAY_ANT,LATEST_PACKAGE_CHANNEL,FIRST_FOLLOW_DAYS,READ_TYPE,READ_DAY_BOOK_NAME_STR,FIRST_ACTIVATION_DAYS,IS_PUSH_INVALID,AVG_READ_DUR_1D,AVG_READ_DUR_3D,AVG_READ_DUR_7D,AVG_READ_DUR_14D,AVG_BOOK_CURRENCY_ANT_1D,AVG_BOOK_CURRENCY_ANT_3D,AVG_BOOK_CURRENCY_ANT_7D,AVG_BOOK_CURRENCY_ANT_14D,IS_CHARGE,SIGNIN_DAYS_7D,ACTIVE_DAYS_14D,CHARGE_ANT_7D,PROMOTION_TYPE,CREATE_TIME,UPDATE_TIME from dm_crowd_push_wide_acc_d
I never ran show(); the downstream code writes to HDFS. Both are actions, so the underlying mechanism is the same.
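For context, that write is the action where the failure surfaces: rows are serialized through the ExpressionEncoder inside FileFormatWriter, which matches the stack trace above. A minimal sketch, assuming result is the DataFrame produced by spark.sql(finalSql) and using a placeholder output path:

// result: DataFrame produced by spark.sql(finalSql)
// The write action serializes each Row via the ExpressionEncoder; if the connector hands
// back a String for a DateType column, this is where the conversion error is thrown.
result.write
  .mode("overwrite")
  .parquet("hdfs:///tmp/dm_crowd_push_wide_acc_d_out") // placeholder path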
@JialeHe Could you share the DDL of the StarRocks table? The connector tests do verify reading DATE-typed data, so let me check what the difference is.
Test case: https://github.com/StarRocks/starrocks-connector-for-apache-spark/blob/main/src/test/java/com/starrocks/connector/spark/sql/ReadWriteITTest.java#L362C11-L362C11
@banmoy
CREATE TABLE `dm_crowd_push_wide_acc_d` (
`dt` date,
`source_uid` varchar(65533),
`product` varchar(65533),
`project` varchar(65533),
`category_type` int(11),
`unactive_days` int(11),
`group_id` int(11),
`region_label` varchar(65533),
`use_time_label` varchar(65533),
`interactive_user_label` varchar(65533),
`coin_label` varchar(65533),
`push_tunnel` varchar(65533),
`push_id` varchar(65533),
`brand` varchar(65533),
`device_model` varchar(65533),
`version_code` varchar(65533),
`os_version` varchar(65533),
`push_click_rate` varchar(65533),
`account_id` bigint(20),
`member` varchar(65533),
`acc_no_pay_days` bigint(20),
`max_pay_ant` bigint(20),
`last_pay_ant` bigint(20),
`latest_package_channel` varchar(65533),
`first_follow_days` bigint(20),
`read_type` int(11),
`read_day_book_name_str` varchar(65533),
`first_activation_days` bigint(20),
`is_push_invalid` int(11),
`avg_read_dur_1d` double,
`avg_read_dur_3d` double,
`avg_read_dur_7d` double,
`avg_read_dur_14d` double,
`avg_book_currency_ant_1d` double,
`avg_book_currency_ant_3d` double,
`avg_book_currency_ant_7d` double,
`avg_book_currency_ant_14d` double,
`is_charge` int(11),
`signin_days_7d` bigint(20),
`active_days_14d` int(11),
`charge_ant_7d` double,
`promotion_type` varchar(65533),
`create_time` varchar(65533),
`update_time` varchar(65533)
) ENGINE = OLAP DUPLICATE KEY(
`dt`,
`source_uid`,
`product`,
`project`,
`category_type`,
`unactive_days`,
`group_id`
) COMMENT "OLAP" PARTITION BY RANGE(`dt`) (
PARTITION p20230804 VALUES [("2023-08-04"), ("2023-08-05")),
PARTITION p20230805 VALUES [("2023-08-05"), ("2023-08-06")),
PARTITION p20230806 VALUES [("2023-08-06"), ("2023-08-07")),
PARTITION p20230807 VALUES [("2023-08-07"), ("2023-08-08")),
PARTITION p20230808 VALUES [("2023-08-08"), ("2023-08-09")),
PARTITION p20230809 VALUES [("2023-08-09"), ("2023-08-10")))
DISTRIBUTED BY HASH(`source_uid`) BUCKETS 120
PROPERTIES (
"replication_num" = "2",
"dynamic_partition.enable" = "false",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.time_zone" = "Asia/Shanghai",
"dynamic_partition.start" = "-3",
"dynamic_partition.end" = "3",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "120",
"in_memory" = "false",
"storage_format" = "DEFAULT",
"enable_persistent_index" = "true",
"compression" = "ZSTD"
);
@banmoy Following the logic in that UT,
I adjusted my reproduction code and it does run through.
So it still looks like the DT field the connector reads back from StarRocks is a String rather than a Date.
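Roughly, a version of the repro along those lines would pass if the dt values are real java.sql.Date objects instead of strings (a sketch; the exact adjusted code isn't shown in this thread):

import java.sql.Date
import org.apache.spark.sql.types.{DateType, StringType, StructField, StructType}
import org.apache.spark.sql.Row

// Same schema as before, but dt is now a java.sql.Date, which matches DateType,
// so the ExpressionEncoder no longer throws.
val data = Seq(
  Row("Kim", "Seoul", Date.valueOf("2023-08-01")),
  Row("Lee", "Busan", Date.valueOf("2023-08-01"))
)
val schema = StructType(List(
  StructField("name", StringType, nullable = true),
  StructField("city", StringType, nullable = true),
  StructField("dt", DateType, nullable = true)
))
spark.createDataFrame(spark.sparkContext.parallelize(data), schema).show(false)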
@JialeHe I verified with your CREATE TABLE statement and Spark query, but still could not reproduce it. Could you confirm whether the following matches your scenario?
Insert data into the StarRocks table; to keep it simple, only the duplicate key columns (which include the date column) are populated:
insert into edw_dm.dm_crowd_push_wide_acc_d (dt, source_uid, product, project, category_type, unactive_days, group_id) values ("2023-08-09", "a", "b", "c", 1, 2, 3);
Spark code:
SparkSession spark = SparkSession
        .builder()
        .master("local[1]")
        .appName("test")
        .getOrCreate();

Dataset<Row> readDf = spark.read().format("starrocks")
        .option("starrocks.table.identifier", "edw_dm.dm_crowd_push_wide_acc_d")
        .option("starrocks.fenodes", FE_HTTP)
        .option("starrocks.fe.jdbc.url", FE_JDBC)
        .option("user", USER)
        .option("password", PASSWORD)
        .load();

readDf.createOrReplaceTempView("dm_crowd_push_wide_acc_d");
String finalSql = "select DT,SOURCE_UID,PRODUCT,PROJECT,CATEGORY_TYPE,UNACTIVE_DAYS,GROUP_ID,REGION_LABEL,USE_TIME_LABEL,INTERACTIVE_USER_LABEL,COIN_LABEL,PUSH_TUNNEL,PUSH_ID,BRAND,DEVICE_MODEL,VERSION_CODE,OS_VERSION,PUSH_CLICK_RATE,ACCOUNT_ID,MEMBER,ACC_NO_PAY_DAYS,MAX_PAY_ANT,LAST_PAY_ANT,LATEST_PACKAGE_CHANNEL,FIRST_FOLLOW_DAYS,READ_TYPE,READ_DAY_BOOK_NAME_STR,FIRST_ACTIVATION_DAYS,IS_PUSH_INVALID,AVG_READ_DUR_1D,AVG_READ_DUR_3D,AVG_READ_DUR_7D,AVG_READ_DUR_14D,AVG_BOOK_CURRENCY_ANT_1D,AVG_BOOK_CURRENCY_ANT_3D,AVG_BOOK_CURRENCY_ANT_7D,AVG_BOOK_CURRENCY_ANT_14D,IS_CHARGE,SIGNIN_DAYS_7D,ACTIVE_DAYS_14D,CHARGE_ANT_7D,PROMOTION_TYPE,CREATE_TIME,UPDATE_TIME from dm_crowd_push_wide_acc_d";
Dataset<Row> result = spark.sql(finalSql);
result.show();
spark.stop();
Output of the Spark run:
+----------+----------+-------+-------+-------------+-------------+--------+------------+--------------+----------------------+----------+-----------+-------+-----+------------+------------+----------+---------------+----------+------+---------------+-----------+------------+----------------------+-----------------+---------+----------------------+---------------------+---------------+---------------+---------------+---------------+----------------+------------------------+------------------------+------------------------+-------------------------+---------+--------------+---------------+-------------+--------------+-----------+-----------+
| DT|SOURCE_UID|PRODUCT|PROJECT|CATEGORY_TYPE|UNACTIVE_DAYS|GROUP_ID|REGION_LABEL|USE_TIME_LABEL|INTERACTIVE_USER_LABEL|COIN_LABEL|PUSH_TUNNEL|PUSH_ID|BRAND|DEVICE_MODEL|VERSION_CODE|OS_VERSION|PUSH_CLICK_RATE|ACCOUNT_ID|MEMBER|ACC_NO_PAY_DAYS|MAX_PAY_ANT|LAST_PAY_ANT|LATEST_PACKAGE_CHANNEL|FIRST_FOLLOW_DAYS|READ_TYPE|READ_DAY_BOOK_NAME_STR|FIRST_ACTIVATION_DAYS|IS_PUSH_INVALID|AVG_READ_DUR_1D|AVG_READ_DUR_3D|AVG_READ_DUR_7D|AVG_READ_DUR_14D|AVG_BOOK_CURRENCY_ANT_1D|AVG_BOOK_CURRENCY_ANT_3D|AVG_BOOK_CURRENCY_ANT_7D|AVG_BOOK_CURRENCY_ANT_14D|IS_CHARGE|SIGNIN_DAYS_7D|ACTIVE_DAYS_14D|CHARGE_ANT_7D|PROMOTION_TYPE|CREATE_TIME|UPDATE_TIME|
+----------+----------+-------+-------+-------------+-------------+--------+------------+--------------+----------------------+----------+-----------+-------+-----+------------+------------+----------+---------------+----------+------+---------------+-----------+------------+----------------------+-----------------+---------+----------------------+---------------------+---------------+---------------+---------------+---------------+----------------+------------------------+------------------------+------------------------+-------------------------+---------+--------------+---------------+-------------+--------------+-----------+-----------+
|2023-08-09| a| b| c| 1| 2| 3| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null|
+----------+----------+-------+-------+-------------+-------------+--------+------------+--------------+----------------------+----------+-----------+-------+-----+------------+------------+----------+---------------+----------+------+---------------+-----------+------------+----------------------+-----------------+---------+----------------------+---------------------+---------------+---------------+---------------+---------------+----------------+------------------------+------------------------+------------------------+-------------------------+---------+--------------+---------------+-------------+--------------+-----------+-----------+
@banmoy Was this code run with Connector version 1.1.0?
Yes.
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>starrocks-spark-connector-3.2_2.12</artifactId>
<version>1.1.0</version>
</dependency>
Do you still see the problem when running my code example?
I'll try it today.