[Feature] Real-Time Wide Table Solutions
Search before asking
- [X] I had searched in the feature and found no similar feature requirement.
Description
Real-time large wide table will bring: big state problem of dual stream association, FlinkSQL's dual stream association will keep the historical data of left and right streams to associate with each other, the longer the time interval needs to be associated, the more historical data will be saved and the bigger the state will be. For example, to correlate the order placement event and refund event of an order and to ensure the correctness of the calculation result, you need to consider the interval between these two events, which may be a month or even longer.
A two-stream correlated stateful SQL job, Mem and Disk form the TaskManager node of the SQL job, the SQL job state backend uses RocksDB, and the state is persisted on the HDFS file system. At first, we tried to set the state of SQL jobs to be retained for one month, but the SQL jobs would become unstable, with problems such as memory overrun and degraded state reading performance, and we had to keep increasing the number of TMs and memory size of the jobs to alleviate them.
Even so, there are still two pain points in the business. First, it is difficult to initialize the associated data. Currently, the company's Kafka data source has restrictions on historical backtracking, so the business cannot build a complete historical state, and even if Kafka supports longer backtracking, the efficiency of state initialization is still a problem. Secondly, the memory resource overhead is large, especially when multiple SQL jobs are associated with the same data source, the corresponding memory resource needs to be allocated for each SQL job, the state between different SQL jobs is isolated, and the same associated data between jobs cannot be reused.
There are two solutions on the market at present.
- [快手] The solution of hot and cold association separation. Assuming that the data associated with two days ago is relatively low frequency and the state rollback will not exceed two days, then you can define the data before two days as cold data and the data within two days as hot data.
The SQL job on the left retains only the hot data for the two days T+0 and T+1 by setting the length of state retention, while the cold data for T+2 and beyond is synchronized daily from Hive to the external KV via a batch task. When correlating, if the hot data in the state does not exist, the cold data is then correlated by accessing the external KV. On the right is another SQL job that needs to associate the same data source, which shares the cold data in the outer KV with the SQL job on the left.
For the first pain point, because the state was controlled within two days, the amount of data initialized by the SQL job when it went live was controlled. For the second pain point, because most of the data before two days are saved in the outer KV, different SQL jobs can query the outer KV, which can save a lot of memory resources. 2. [ziroom] abstract a layer of datastream flink program + flink sql temporal join to solve the problem. In order to solve the low memory consumption we chose flink sql temporal join, bringing the core disadvantage is that only one table change can be monitored, and the associated dimensional table data can not guarantee real-time can not meet business needs. So our first layer of datastream flink program The core problem to solve is to make up for these two shortcomings of flink sql temporal join.
In order to solve the problem of "monitoring only one table's changes", the datastream flink program has to get all the table changes and then transform all the table changes into a virtual master table change.
To get the changes of all the dependent tables we need to filter the changes of the tables we need from the messages of the canal topic of multiple libraries. Then analyze the logic of the hive sql above. We define the main_key of the virtual master table generated by the changes in table a, table b and table c as the col1 field, so that once the data in table a and table b has changed, we can directly take out the value in the col1 field and write it to the main_key of the virtual master table.
The problem is that there is no col1 field in table c, how should we get it? We can get it by select b.col1 from b join c on b.col2= c.col2 where c.col2 = ? Put the value of col2 of table c in the change to "?" Find out the value of col1 field that should be changed by sql and write it to the main_key of the virtual master table.
In order to solve the problem of "dimensional data cannot be guaranteed in real time", we write to mysql based on all the changes in the dependent tables, but this does not guarantee that the results obtained by the flink sql temporal join program must be in real time.
For example, if there is an insert operation data change information written to the virtual master table toipc, then the flink sql temporal join program triggered to go to the mysql dimension table to find this check data, the reason is that the mirror write program may not be finished writing.
This will cause data loss. To solve this problem, we must ensure that the data in the mirror table is written successfully and then write the information of this table change to the virtual main table topic.
After solving the above two core problems considering the business often appear to add indexes to brush the library, we will not need to care about the changes in the field column data intercepted to avoid bringing unnecessary pressure.
实时大宽表会带来:双流关联的大状态问题,FlinkSQL 的双流关联会保留左右流的历史数据来互相关联,需要关联的时间间隔越长,保存的历史数据就会越多,状态也就会越大。比如,要关联订单的下单事件和退款事件,并保证计算结果的正确性,需要考虑这两个事件发生的间隔,可能是一个月甚至更久。
一个双流关联的有状态 SQL 作业,Mem 和 Disk 组成了 SQL 作业的 TaskManager 节点,SQL 作业状态后端使用 RocksDB,状态持久化在 HDFS 文件系统上。一开始我们尝试把 SQL 作业的状态设置为保留一个月,但 SQL 作业会变得不稳定,出现内存超限、状态读取性能下降等问题,只能不断增加作业的 TM 数和内存大小来缓解。
即使这样,业务上仍然存在两个痛点。首先是关联数据初始化难,目前公司 Kafka 数据源对历史回溯有限制,因此业务不能构建出完整的历史状态,即使 Kafka 支持了更久的回溯,状态初始化的效率也依然是一个问题。其次,内存资源开销大,特别是当多个 SQL 作业关联相同的数据源时,需要为每个 SQL 作业都分配相应的内存资源,不同 SQL 作业间的状态是隔离的,作业间相同的关联数据不能复用。
目前市面上有两种解决方案: 1.【快手】冷热关联分离的解决方案。假设关联两天前的数据是相对低频的且状态回滚不会超过两天,那么可以定义两天前的数据为冷数据,两天之内的数据为热数据。
左侧的 SQL 作业通过设置状态保留时长,只保留 T+0 和 T+1 这两天的热数据,而 T+2 及更久以前的冷数据则通过批任务每天从 Hive 同步到外存 KV 中。关联时,若状态中的热数据不存在,则再通过访问外存 KV 来关联冷数据。右侧是另外一个 SQL 作业需要关联相同的数据源,它与左侧的 SQL 作业共享外层 KV 中的冷数据。
对于第一个痛点,因为状态控制在了两天内,SQL 作业上线时,关联数据初始化的数据量得到了控制。对于第二个痛点,因为两天前的大部分数据都保存在外层KV中,不同的 SQL 作业都可以查询外存 KV,从而可以节省大量内存资源。 2.【自如】抽象出一层datastream flink 程序 + flink sql temporal join的方式来解决问题。 为了解决低内存占用我们选择了flink sql temporal join,带来核心的缺点是只能监测一个表的变化,而且关联的维表数据不能保证实时性不能满足业务需求。所以我们第一层datastream flink 程序 核心要解决的问题就是弥补flink sql temporal join 的这两个缺点。
为了解决“只能监测一个表的变化”的问题,datastream flink 程序要拿到所有表的变化,然后将所有表的变化转化到一张虚拟主表的变化。
要想拿到所有依赖表的数据的变化我们需要从多个库的canal topic的消息中过滤出我们需要的表的数据变化。然后分析一下上面hive sql的逻辑。我们定义a表、b表、c表变化最终生成的虚拟主表的main_key为col1 字段,这样 一旦a表和b表中数据发生变化,直接就可以取出col1字段中的值写到虚拟主表的main_key中。
问题来了c表中没有col1字段,应该怎么获取呢?我们可以通过select b.col1 from b join c on b.col2= c.col2 where c.col2 = ?将变化中c表的col2值放到“?”的位置通过sql 找出应该发生变化的col1字段的值,然后写到虚拟主表的main_key中。
为了解决“维表数据不能保证实时性”的问题,我们根据所有依赖表的变化镜像写到mysql中,但是这不并能保证flink sql temporal join程序拿到的结果一定是实时。
比如有1条insert操作数据的变化信息写到虚拟主表toipc中,这时flink sql temporal join程序触发后去mysql 维表查找不到这条查数据,原因是镜像写入程序可能还没写完。
这样就会造成数据丢失。要解决这个问题就必须保证是镜像表中的数据确认写入成功后再把这条表变化的信息写到虚拟主表的topic中。
解决了上面两个核心问题后考虑到业务经常会出现加索引刷库的情况,我们将不需要关心的变化的字段列数据拦截掉,避免带来不必要的压力。
Usage Scenario
No response
Related issues
No response
Are you willing to submit a PR?
- [X] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
This is a issue at the business logic side, how does streampark solve this issue?
这是业务逻辑方面的问题,streampark是如何解决这个问题的?
I'm not quite sure what to do, we'll have some discussion about it in the team and I'll feed it back to the current issues.