Alink
Alink copied to clipboard
throw java.lang.ClassCastException: java.util.Date cannot be cast to java.sql.Timestamp when use odps catalog
1. 问题原因分析:
在odps中有一张表中有两列的数据类型为:
expired_time DATETIME,
create_time DATETIME
上述类型在odps的com.aliyun.odps.commons.proto.ProtobufRecordStreamReader#readField方法中会将datatime类型的字段处理成java.util.Date类型,而在Alink的com.alibaba.alink.common.io.catalog.odps.OdpsTableUtil类中有一段静态代码块,具体如下:
static {
ODPS_TYPE_TO_FLINK_MAP.put(OdpsType.BOOLEAN, Types.BOOLEAN());
FLINK_TYPE_TO_ODPS_MAP.put(Types.BOOLEAN(), OdpsType.BOOLEAN);
ODPS_TYPE_TO_FLINK_MAP.put(OdpsType.BIGINT, Types.LONG());
FLINK_TYPE_TO_ODPS_MAP.put(Types.LONG(), OdpsType.BIGINT);
ODPS_TYPE_TO_FLINK_MAP.put(OdpsType.DOUBLE, Types.DOUBLE());
FLINK_TYPE_TO_ODPS_MAP.put(Types.DOUBLE(), OdpsType.DOUBLE);
ODPS_TYPE_TO_FLINK_MAP.put(OdpsType.STRING, Types.STRING());
FLINK_TYPE_TO_ODPS_MAP.put(Types.STRING(), OdpsType.STRING);
ODPS_TYPE_TO_FLINK_MAP.put(OdpsType.DATETIME, Types.SQL_TIMESTAMP());
FLINK_TYPE_TO_ODPS_MAP.put(Types.SQL_TIMESTAMP(), OdpsType.DATETIME);
ODPS_TYPE_TO_FLINK_MAP.put(OdpsType.TINYINT, Types.BYTE());
ODPS_TYPE_TO_FLINK_MAP.put(OdpsType.SMALLINT, Types.SHORT());
ODPS_TYPE_TO_FLINK_MAP.put(OdpsType.INT, Types.INT());
ODPS_TYPE_TO_FLINK_MAP.put(OdpsType.FLOAT, Types.FLOAT());
}
在这里会将odpsType为OdpsType.DATETIME类型的转换成flink的Types.SQL_TIMESTAMP()类型。而由于上面datetime类型的已经被转换成java.util.Date,于是便出现了上面的类强转异常。
2. 问题尝试解决方案一:
将ODPS_TYPE_TO_FLINK_MAP.put(OdpsType.DATETIME, Types.SQL_TIMESTAMP());修改为ODPS_TYPE_TO_FLINK_MAP.put(OdpsType.DATETIME, BasicTypeInfo.DATE_TYPE_INFO); 运行时发现,报了新的错误:Type is not supported:Date。 报错源码位于org.apache.flink.table.calcite.FlinkTypeFactory#typeInfoToSqlTypeName方法,在里面是找不到java.util.Date的,它支持的time类型主要有java.sql.Date和Timestamp等。 于是这种解决方案宣告失败。
3. 解决方案二:
将ODPS_TYPE_TO_FLINK_MAP.put(OdpsType.DATETIME, Types.SQL_TIMESTAMP());修改为ODPS_TYPE_TO_FLINK_MAP.put(OdpsType.DATETIME, Types.SQL_DATE());同时用一个新的ProtobufRecordStreamReader类覆盖原有的代码,并将java.util.Date改为java.sql.Date。
请问还有更好的解决办法吗?
注:用的是当前的master分支的代码,发现使用的是flink old planner,在blink planner中是没有org.apache.flink.table.planner.calcite.FlinkTypeFactory中typeInfoToSqlTypeName方法的。