incubator-streampark
Metadata Center: real-time data warehouse metadata and resource metadata (such as parsed-out UDFs, Connectors, and other key information, and what attributes they have)
Flink metadata management
Metadata definition (StreamPark RoadMap)
Step 1
I commit to implementing the `catalog` function (except for partial-column support); source management will participate in the design. (Partial-column support depends on syntax analysis, which would make the project complicated, so it is split out as follows.)
reference:
https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/Schema.html
definition:
- First, we must satisfy the simplest metadata management, including: a version-controlled `catalog`, source management (`kafka`, `es`, and so on), and simple auth control.
Requires implementation
- Flink catalog: includes `connector information` and schema management based on physical columns. A `schema` either has a `meta` source behind it or not; for example, `kafka` has no `meta`, while `mysql`/`es` have `meta`. (A rough sketch of such a table record follows the sub-items below.)
  - No-`meta` management, `kafka` for example: a `Topic` carries `json`, so we store the JSON schema definition ourselves.
  - With-`meta` management, `mysql` for example: we only record the `meta` information obtained from `mysql`.
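To make the distinction above concrete, here is a rough sketch of what a per-table metadata record could look like: a stored JSON schema for no-`meta` sources and source-provided columns for `meta` sources. All class and field names are illustrative assumptions, not existing StreamPark code.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch: one catalog entry per table, physical columns only. */
public final class TableMetadata {

    /** Whether the source exposes its own metadata (mysql/es) or not (kafka). */
    public enum MetaMode { SOURCE_PROVIDED, USER_DEFINED }

    public static final class PhysicalColumn {
        public final String name;
        public final String sourceType; // type as reported or declared by the source

        public PhysicalColumn(String name, String sourceType) {
            this.name = name;
            this.sourceType = sourceType;
        }
    }

    public final String database;
    public final String table;
    public final MetaMode metaMode;
    /** For no-meta sources (e.g. a kafka topic): the JSON schema text we store ourselves. */
    public final Optional<String> storedJsonSchema;
    /** Physical columns; for meta sources these are refreshed from the source's own meta. */
    public final List<PhysicalColumn> columns;
    /** Connector options, e.g. bootstrap servers or a JDBC URL. */
    public final Map<String, String> connectorOptions;

    public TableMetadata(String database, String table, MetaMode metaMode,
                         Optional<String> storedJsonSchema, List<PhysicalColumn> columns,
                         Map<String, String> connectorOptions) {
        this.database = database;
        this.table = table;
        this.metaMode = metaMode;
        this.storedJsonSchema = storedJsonSchema;
        this.columns = columns;
        this.connectorOptions = connectorOptions;
    }
}
```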
- Source management: simple `catalog` auth, i.e., check whether a user has permission to use a table at all, without distinguishing read from write permissions. (A plain `catalog` cannot tell whether an access is a read or a write.)
- `Catalog` version control: a `catalog` table has the concept of a version. Unless the user specifies one, the time of the task's first initialization is selected as the version by default. (A minimal sketch of this resolution rule follows the syntax examples below.)
- `Flink` feature support (not only physical columns). **Note that `schema` management does not include the definition of time columns, primary-key columns, or computed columns, just physical columns.**
Flink feature syntax is as follows.

Supported syntax 1 (define a new table):
```sql
create table as tablea
```
Supported syntax 2 (redefine the table structure within the session, e.g. `a bigint` is redefined as `timestamp`):
```sql
create tablea (a bigint, ts as xxx)
```
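For the catalog version control item above, here is a minimal, hypothetical sketch of the resolution rule in plain Java: each table keeps timestamped schema versions, and a task that does not pin a version resolves to the newest version that existed at its first initialization. Class and method names are assumptions for illustration, not existing StreamPark code.

```java
import java.time.Instant;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical sketch of per-table schema versioning keyed by creation time. */
public class VersionedTableRegistry {

    /** version timestamp -> serialized schema (e.g. JSON) for one catalog table. */
    private final NavigableMap<Instant, String> versions = new TreeMap<>();

    public void addVersion(Instant createdAt, String schemaJson) {
        versions.put(createdAt, schemaJson);
    }

    /** Explicitly pinned version: an exact match is required. */
    public String resolveExact(Instant version) {
        String schema = versions.get(version);
        if (schema == null) {
            throw new IllegalArgumentException("No schema version at " + version);
        }
        return schema;
    }

    /**
     * Default behaviour described above: if the user did not pin a version,
     * use the newest version that existed when the task was first initialized.
     */
    public String resolveForTask(Instant taskFirstInitTime) {
        Map.Entry<Instant, String> entry = versions.floorEntry(taskFirstInitTime);
        if (entry == null) {
            throw new IllegalStateException("Table had no schema before " + taskFirstInitTime);
        }
        return entry.getValue();
    }
}
```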
- `Schema` management: map `source schema type` --> `datatype` --> `Flink sql type`, instead of `source type` --> `Flink sql type` (hard to extend, no uniform definition). A minimal mapping sketch follows the references below.
reference: (reference design documents)
https://atlas.apache.org/2.0.0/TypeSystem.html
https://datahubproject.io/docs/advanced/field-path-spec-v2
https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/sql/sql_types.py
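A minimal sketch of the two-stage mapping, assuming a small hand-rolled intermediate type enum (a real design would follow the Atlas/DataHub type systems linked above). The only external API used is Flink's public `DataTypes` factory; everything else is illustrative.

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

import java.util.Locale;

/** Hypothetical two-stage mapping: source type string -> intermediate datatype -> Flink SQL type. */
public final class TypeMapping {

    /** Source-agnostic intermediate datatype (illustrative subset). */
    public enum IntermediateType { STRING, INT32, INT64, FLOAT64, BOOLEAN, TIMESTAMP_MILLIS }

    /** Stage 1: per-source mapping into the intermediate model (MySQL shown as an example). */
    public static IntermediateType fromMysql(String mysqlType) {
        switch (mysqlType.toLowerCase(Locale.ROOT)) {
            case "char": case "varchar": case "text":  return IntermediateType.STRING;
            case "int": case "integer":                return IntermediateType.INT32;
            case "bigint":                             return IntermediateType.INT64;
            case "double":                             return IntermediateType.FLOAT64;
            case "boolean": case "tinyint(1)":         return IntermediateType.BOOLEAN;
            case "datetime": case "timestamp":         return IntermediateType.TIMESTAMP_MILLIS;
            default: throw new IllegalArgumentException("Unmapped mysql type: " + mysqlType);
        }
    }

    /** Stage 2: a single, shared mapping from the intermediate model to Flink SQL types. */
    public static DataType toFlinkType(IntermediateType type) {
        switch (type) {
            case STRING:           return DataTypes.STRING();
            case INT32:            return DataTypes.INT();
            case INT64:            return DataTypes.BIGINT();
            case FLOAT64:          return DataTypes.DOUBLE();
            case BOOLEAN:          return DataTypes.BOOLEAN();
            case TIMESTAMP_MILLIS: return DataTypes.TIMESTAMP(3);
            default: throw new IllegalArgumentException("Unmapped intermediate type: " + type);
        }
    }
}
```

Adding a new source then only requires another stage-1 mapper; the stage-2 mapping to Flink SQL types stays shared, which is the extensibility the two-stage approach is after.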
- Resolve support for partial insert columns in `flink sql`. For example, a `kafka` table defines 10 fields but the `insert` uses only 6 of them, which is not supported yet. (The `catalog` declares all columns, but we only need some of them.)
solution: SQL parsing rebuilds the SQL as follows (a rough Java sketch of the padding follows the example):

```sql
-- before:
insert into temp select a,b,c from source
-- after:
insert into temp select a,b,c, null as `d` from source
```
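As a rough illustration of the rewrite (string manipulation only; a real implementation would operate on the parsed SQL tree), missing target columns are appended as `CAST(NULL AS <type>)` expressions. The class and method names below are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical sketch: pad an INSERT's select list with NULLs for unused target columns. */
public final class PartialInsertRewriter {

    /**
     * @param targetColumns   full column list of the sink table (from the catalog), name -> SQL type
     * @param selectedColumns columns the user actually selected, in order
     * @return the padded select list, e.g. "a, b, c, CAST(NULL AS STRING) AS `d`"
     */
    public static String padSelectList(Map<String, String> targetColumns, List<String> selectedColumns) {
        StringJoiner select = new StringJoiner(", ");
        selectedColumns.forEach(select::add);
        targetColumns.forEach((name, type) -> {
            if (!selectedColumns.contains(name)) {
                select.add("CAST(NULL AS " + type + ") AS `" + name + "`");
            }
        });
        return select.toString();
    }

    public static void main(String[] args) {
        Map<String, String> target = new LinkedHashMap<>();
        target.put("a", "BIGINT");
        target.put("b", "STRING");
        target.put("c", "STRING");
        target.put("d", "STRING");
        String padded = padSelectList(target, List.of("a", "b", "c"));
        // prints: insert into temp select a, b, c, CAST(NULL AS STRING) AS `d` from source
        System.out.println("insert into temp select " + padded + " from source");
    }
}
```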
Notice:
- Do not consider complex permission issues.
- Function definitions such as `udx` are not considered.
- Principle of simplification: do not attempt full `metadata` management the way `datahub` does.
- Do not consider `schema evolution` or `schema` compatibility problems. (Compatibility is a big issue in `Flink sql`; in particular, logical changes that accompany `schema` changes lead to `state` incompatibility, so it is meaningless for `Flink sql` to consider this issue.)
Step 2
Partially committed to implement
Requires implementation
- Meet step 1.
- Introduce field-level permission management. Since the `catalog` cannot distinguish `source`/`sink` tables, there are two ways to implement permission control: via the `streamgraph` (our implementation) or via syntax parsing.
- udx management support (automatically loaded via the classloader; see the sketch after this list).
- Format enhancement. For example, row-type flattening to avoid a `Flink` bug ([FLINK-18027](https://issues.apache.org/jira/browse/FLINK-18027)).
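A minimal sketch of what "automatically loaded via the classloader" could mean: jars in a udx directory are opened with a `URLClassLoader`, and every concrete class extending Flink's `UserDefinedFunction` is registered under its simple name. This is an assumption about the mechanism for illustration, not StreamPark's actual implementation.

```java
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.UserDefinedFunction;

import java.io.File;
import java.lang.reflect.Modifier;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Enumeration;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

/** Hypothetical sketch: scan a jar of user udx classes and register them by simple class name. */
public final class UdxLoader {

    public static void registerAll(TableEnvironment tEnv, File udxJar) throws Exception {
        URLClassLoader loader = new URLClassLoader(
                new URL[]{udxJar.toURI().toURL()}, UdxLoader.class.getClassLoader());
        try (JarFile jar = new JarFile(udxJar)) {
            Enumeration<JarEntry> entries = jar.entries();
            while (entries.hasMoreElements()) {
                JarEntry entry = entries.nextElement();
                if (!entry.getName().endsWith(".class")) {
                    continue;
                }
                String className = entry.getName()
                        .substring(0, entry.getName().length() - ".class".length())
                        .replace('/', '.');
                Class<?> clazz = Class.forName(className, false, loader);
                // Register every concrete Flink UDF found in the jar under its simple name.
                if (UserDefinedFunction.class.isAssignableFrom(clazz)
                        && !Modifier.isAbstract(clazz.getModifiers())) {
                    tEnv.createTemporarySystemFunction(
                            clazz.getSimpleName(), clazz.asSubclass(UserDefinedFunction.class));
                }
            }
        }
    }
}
```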
Step 3
No commitment to implement
Requires implementation
- Meet step 2.
- Field-source lineage analysis.
- Cross-task lineage analysis; open up external `metadata` tools such as `atlas`, `datahub`, etc.
- `schema evolution` & `state evolution`. This problem is complicated: we must solve not only the `schema` problem but also the compatibility of Flink state; schema compatibility can be achieved through `avro` compatibility, referring to `hudi`, etc. (a minimal sketch of the Avro check follows).
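For the `avro` compatibility route just mentioned, a minimal sketch using Avro's built-in `SchemaCompatibility` check. Note this only gates the schema side; Flink state compatibility remains the separate, harder problem described above.

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;

/** Hypothetical sketch: use Avro's compatibility check as a schema-evolution gate. */
public final class SchemaEvolutionCheck {

    /** Returns true if data written with {@code oldSchemaJson} can still be read with {@code newSchemaJson}. */
    public static boolean canRead(String newSchemaJson, String oldSchemaJson) {
        Schema reader = new Schema.Parser().parse(newSchemaJson);
        Schema writer = new Schema.Parser().parse(oldSchemaJson);
        SchemaCompatibility.SchemaPairCompatibility result =
                SchemaCompatibility.checkReaderWriterCompatibility(reader, writer);
        return result.getType() == SchemaCompatibilityType.COMPATIBLE;
    }

    public static void main(String[] args) {
        String v1 = "{\"type\":\"record\",\"name\":\"T\",\"fields\":"
                + "[{\"name\":\"a\",\"type\":\"long\"}]}";
        // v2 adds a field with a default, which Avro treats as a compatible change.
        String v2 = "{\"type\":\"record\",\"name\":\"T\",\"fields\":"
                + "[{\"name\":\"a\",\"type\":\"long\"},"
                + "{\"name\":\"b\",\"type\":\"string\",\"default\":\"\"}]}";
        System.out.println(canRead(v2, v1)); // true
    }
}
```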
Flink metadata management
Metadata definition (StreamPark RoadMap)
Step 1
Committed to implement, except for partial-column support: the `catalog` function will be implemented, with source management participating in the design. (Partial-column support depends on syntax parsing, which would make the project complicated.)
reference:
https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/Schema.html
Definition:
- The first layer satisfies the plainest metadata management: a version-controlled `catalog` implementation for `flink sql`, plus the most basic permission management.
Requires implementation
- Flink catalog: includes `connector information` and schema management based on physical columns. `schema` management covers sources without `meta` and sources with `meta`; for example, `kafka` has no `meta`, while `mysql`/`es` have `meta`.
  - No-`meta` management, `kafka` for example: a `Topic` is in `json` format, and we manage that `schema` definition ourselves.
  - With-`meta` management, `mysql` for example: only record the `meta` information obtained from `mysql`.
- Source management: simple `catalog` permission control, i.e., whether a user has permission to use a table, without distinguishing read from write permissions. (A plain `catalog` cannot distinguish the two; this can be implemented in phase two.)
- `catalog` version control: a `catalog` table has the concept of a version; unless explicitly declared, the time of the task's first initialization is used as the default version.
- `flink` feature support (the above only covers physical columns). Note that `schema` management does not include the definition of time columns, primary-key columns, or computed columns; it is the plainest physical-column definition. `Flink` feature support is effective only within a single task.
Flink feature declaration syntax (the following syntax requires no changes to Flink internals and only concerns the Flink catalog).

Supported syntax 1 (define a new table):
```sql
create table as tablea
```
Supported syntax 2 (redefine the table structure within the session, e.g. `a bigint` is redefined as `timestamp`):
```sql
create tablea (a bigint, ts as xxx)
```
- `schema` management: for each source, prefer `source schema type` --> `datatype` --> `Flink sql type`, rather than the `source schema type` --> `Flink sql type` approach (poor extensibility, no uniform definition).
reference: (extend based on the following rules, or use `avro schema` as a reference design)
https://atlas.apache.org/2.0.0/TypeSystem.html
https://datahubproject.io/docs/advanced/field-path-spec-v2
https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/sql/sql_types.py
- Solve partial-column support in `flink sql`. For example, a `kafka` table defines 10 fields but the `insert` declares only 6, which is not supported. (The `catalog` forces every insert to declare all columns, so partial columns cannot be used.)
Solution: rewrite the SQL and pad the missing columns with `null` automatically:

```sql
insert a,b,c, null as `d`, null as `e`
```
Notice:
- Do not consider complex permission issues.
- Do not consider function definitions such as `udx`.
- Principle of maximum simplification: do not consider lineage or other higher-layer definitions. (The core of the `streamPark` implementation is step 1.)
- Do not consider `schema evolution` or `schema` compatibility problems. (Compatibility is a big issue in `Flink sql`; in particular, logical changes that come with `schema` changes lead to `state` incompatibility, so it is meaningless for `Flink sql` to consider this issue.)
Step 2
Partially committed to implement
Requires implementation
- Meet the first layer (step 1).
- Introduce field-level permission management. Since the `catalog` cannot distinguish `source`/`sink` tables, permission control can be implemented in two ways: via the `streamgraph` (our internal approach) or via syntax parsing.
- udx management support (automatically loaded via the classloader).
- Non-metadata work: format enhancement. For example, row-type flattening (a `Flink` bug, [FLINK-18027](https://issues.apache.org/jira/browse/FLINK-18027), and other potential `row` problems), and using formats to mix `etl` code with `sql`. (In our internal implementation, every `etl` is mapped to a `pojo` and supported through a `pojo format`; this is practical for migrating `spark stream etl`/`flink stream etl` jobs to `flink sql`. A rough sketch of the idea follows this list.)
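The `pojo format` mentioned above is internal and not described here in detail. As a rough sketch of the idea (ETL code producing a POJO that Flink SQL then consumes), here is a hypothetical format built on Flink's public `DeserializationSchema` interface; the class names and the `RawEvent` shape are assumptions, not the internal implementation.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;
import java.io.Serializable;
import java.util.function.Function;

/** Hypothetical sketch: deserialize raw bytes, run user ETL code, and emit a flat POJO for SQL. */
public class PojoEtlDeserializationSchema<T> implements DeserializationSchema<T> {

    /** User-supplied ETL step mapping a parsed raw record to the target POJO. */
    public interface EtlFunction<T> extends Function<RawEvent, T>, Serializable {}

    /** Minimal raw-event shape; a real format would expose the full decoded payload. */
    public static class RawEvent {
        public String payload;
    }

    private final Class<T> pojoClass;
    private final EtlFunction<T> etl;
    private transient ObjectMapper mapper;

    public PojoEtlDeserializationSchema(Class<T> pojoClass, EtlFunction<T> etl) {
        this.pojoClass = pojoClass;
        this.etl = etl;
    }

    @Override
    public T deserialize(byte[] message) throws IOException {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        RawEvent raw = mapper.readValue(message, RawEvent.class);
        return etl.apply(raw); // the ETL code runs here; the table only ever sees the flat POJO
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeInformation.of(pojoClass);
    }
}
```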
Step 3
No commitment to implement
Requires implementation
- Meet the second layer (step 2).
- Field-source lineage analysis.
- Cross-task lineage analysis; open up external `metadata` tools such as `atlas`, `datahub`, etc.
- `schema evolution` & `state evolution`. This problem is somewhat complicated: in practice we must solve not only the `schema` problem but also Flink state compatibility; schema compatibility can be achieved through `avro` compatibility, referring to `hudi` and similar implementations.
#609 #602
Currently, there are no plans for this.