Alink icon indicating copy to clipboard operation
Alink copied to clipboard

throw java.lang.ClassCastException: java.util.Date cannot be cast to java.sql.Timestamp when use odps catalog

Open wysstartgo opened this issue 3 years ago • 1 comments

1. 问题原因分析:

在odps中有一张表中有两列的数据类型为:

 expired_time              DATETIME,
    create_time               DATETIME

上述类型在odps的com.aliyun.odps.commons.proto.ProtobufRecordStreamReader#readField方法中会将datatime类型的字段处理成java.util.Date类型,而在Alink的com.alibaba.alink.common.io.catalog.odps.OdpsTableUtil类中有一段静态代码块,具体如下:

static {
		ODPS_TYPE_TO_FLINK_MAP.put(OdpsType.BOOLEAN, Types.BOOLEAN());
		FLINK_TYPE_TO_ODPS_MAP.put(Types.BOOLEAN(), OdpsType.BOOLEAN);

		ODPS_TYPE_TO_FLINK_MAP.put(OdpsType.BIGINT, Types.LONG());
		FLINK_TYPE_TO_ODPS_MAP.put(Types.LONG(), OdpsType.BIGINT);

		ODPS_TYPE_TO_FLINK_MAP.put(OdpsType.DOUBLE, Types.DOUBLE());
		FLINK_TYPE_TO_ODPS_MAP.put(Types.DOUBLE(), OdpsType.DOUBLE);

		ODPS_TYPE_TO_FLINK_MAP.put(OdpsType.STRING, Types.STRING());
		FLINK_TYPE_TO_ODPS_MAP.put(Types.STRING(), OdpsType.STRING);

		ODPS_TYPE_TO_FLINK_MAP.put(OdpsType.DATETIME, Types.SQL_TIMESTAMP());
		FLINK_TYPE_TO_ODPS_MAP.put(Types.SQL_TIMESTAMP(), OdpsType.DATETIME);

		ODPS_TYPE_TO_FLINK_MAP.put(OdpsType.TINYINT, Types.BYTE());
		ODPS_TYPE_TO_FLINK_MAP.put(OdpsType.SMALLINT, Types.SHORT());
		ODPS_TYPE_TO_FLINK_MAP.put(OdpsType.INT, Types.INT());
		ODPS_TYPE_TO_FLINK_MAP.put(OdpsType.FLOAT, Types.FLOAT());
	}

在这里会将odpsType为OdpsType.DATETIME类型的转换成flink的Types.SQL_TIMESTAMP()类型。而由于上面datetime类型的已经被转换成java.util.Date,于是便出现了上面的类强转异常。

2. 问题尝试解决方案一:

将ODPS_TYPE_TO_FLINK_MAP.put(OdpsType.DATETIME, Types.SQL_TIMESTAMP());修改为ODPS_TYPE_TO_FLINK_MAP.put(OdpsType.DATETIME, BasicTypeInfo.DATE_TYPE_INFO); 运行时发现,报了新的错误:Type is not supported:Date。 报错源码位于org.apache.flink.table.calcite.FlinkTypeFactory#typeInfoToSqlTypeName方法,在里面是找不到java.util.Date的,它支持的time类型主要有java.sql.Date和Timestamp等。 于是这种解决方案宣告失败。

3. 解决方案二:

将ODPS_TYPE_TO_FLINK_MAP.put(OdpsType.DATETIME, Types.SQL_TIMESTAMP());修改为ODPS_TYPE_TO_FLINK_MAP.put(OdpsType.DATETIME, Types.SQL_DATE());同时用一个新的ProtobufRecordStreamReader类覆盖原有的代码,并将java.util.Date改为java.sql.Date。

请问还有更好的解决办法吗?

wysstartgo avatar Sep 26 '21 11:09 wysstartgo

注:用的是当前的master分支的代码,发现使用的是flink old planner,在blink planner中是没有org.apache.flink.table.planner.calcite.FlinkTypeFactory中typeInfoToSqlTypeName方法的。

wysstartgo avatar Sep 26 '21 12:09 wysstartgo