[Feature] JSON data is dynamically written to the Doris table
Search before asking
- [X] I had searched in the issues and found no similar issues.
Description
The Flink connector automatically writes data into the corresponding Doris table by parsing the database name and table name contained in the JSON data.
Use case
Data is collected in real time and consumed from Kafka through Flink. A single topic contains data for multiple tables in JSON format, and each record includes the database name, table name, field names, and field values.
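For illustration, a record in such a topic might carry roughly the following shape (the database, table, and field names here are made up; the exact layout depends on how the upstream producer serializes changes):

```json
{
  "database": "order_db",
  "table": "orders",
  "data": {
    "order_id": "10001",
    "user_id": "42",
    "amount": "99.90"
  }
}
```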
Related issues
No response
Are you willing to submit PR?
- [x] Yes I am willing to submit a PR!
Code of Conduct
- [x] I agree to follow this project's Code of Conduct
Can you provide a brief description of your design?
Customize a sink class that extends RichSinkFunction and implements CheckpointedFunction. In invoke(), sink dynamically based on the properties of a custom entity object; its properties are databaseName, tableName, and a List&lt;String&gt; of row data. If there is a need to dynamically insert data for multiple tables, the user only needs to wrap the data in this entity object. (I'm not good at English, so I'm not sure whether I have expressed this clearly.)
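A minimal sketch of that idea (the class and field names below are illustrative, not an existing connector API):

```java
import java.io.Serializable;
import java.util.List;

import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/** Entity carrying the target database/table and the rows to write. */
class DorisRecord implements Serializable {
    String databaseName;
    String tableName;
    List<String> rows; // each row as a JSON string

    String getDatabaseName() { return databaseName; }
    String getTableName() { return tableName; }
}

/** Sketch of a sink that routes each record to the table named inside it. */
public class DynamicDorisSink extends RichSinkFunction<DorisRecord>
        implements CheckpointedFunction {

    @Override
    public void invoke(DorisRecord record, Context context) throws Exception {
        // Build the stream load URL from record.databaseName / record.tableName
        // and write record.rows to that table (HTTP details omitted).
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Flush any pending rows so nothing is lost across checkpoints.
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // Restore buffered rows from state if needed.
    }
}
```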
So, by encapsulating a data structure along the lines of &lt;db, table, data&gt;, the corresponding URL is substituted at stream load time, and the data of each table is then stream loaded in turn on the sink side?
Yes: encapsulate such a data structure, load dynamically by building the URL on the sink side, and add a keyBy operator before the sink (see the sketch below).
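Roughly, the wiring could look like this (reusing the hypothetical DorisRecord / DynamicDorisSink names from the sketch above; parseKafkaJson is a made-up helper standing in for the Kafka source and JSON parsing):

```java
import org.apache.flink.streaming.api.datastream.DataStream;

// Partition by database + table so each sink subtask handles a stable subset
// of tables, then hand the records to the dynamic sink.
DataStream<DorisRecord> records = parseKafkaJson(env); // hypothetical helper
records
    .keyBy(r -> r.getDatabaseName() + "." + r.getTableName())
    .addSink(new DynamicDorisSink());
```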
At present, flink-doris-connector initiates the stream load for the table when the Flink task starts, rather than when upstream data is received. How would this dynamic stream load be done? Please describe your design in detail.
Many users put the Canal logs of all tables in a business database into one topic, which has to be split before doris-flink-connector can be used. The idea is to write a single task that synchronizes the entire database. At present, doris-flink-connector uses an HTTP input stream: each checkpoint opens one stream, and that stream is strongly bound to one stream load URL. The current architecture is therefore not suitable for whole-database synchronization, because it would involve too many HTTP long connections. In this case we can only use the old stream load batch mode: the Flink side caches data, each table gets its own buffer bound to that table's stream load URL, and a threshold such as row count or batch size triggers submission, just like doris-datax-writer.
Generation flow:
canal json ---> serialize to RowData --> TableBufferMap&lt;String, Buffer&lt;String&gt;&gt;, where the key is {db}_{table} and the value is a buffer of rows such as {"column_a":"value_a","column_b":"value_b", ...}. When a buffer exceeds its size threshold we can submit it to http://xxx:xx/api/{db}/{table}/_stream_load, just like doris-datax-writer does.
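A rough sketch of that buffering scheme (the class is illustrative; doStreamLoad stands in for the HTTP PUT against the per-table stream load URL, and fe_host:8030 is a placeholder):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** One buffer per {db}_{table}; flush via stream load once a threshold is hit. */
public class TableBufferManager {
    private final Map<String, List<String>> tableBufferMap = new HashMap<>();
    private final String feHostPort;    // e.g. "fe_host:8030" (placeholder)
    private final int maxRowsPerBuffer; // threshold, like doris-datax-writer's batch size

    public TableBufferManager(String feHostPort, int maxRowsPerBuffer) {
        this.feHostPort = feHostPort;
        this.maxRowsPerBuffer = maxRowsPerBuffer;
    }

    public void add(String db, String table, String jsonRow) {
        String key = db + "_" + table;
        List<String> buffer = tableBufferMap.computeIfAbsent(key, k -> new ArrayList<>());
        buffer.add(jsonRow);
        if (buffer.size() >= maxRowsPerBuffer) {
            flush(db, table, buffer);
            buffer.clear();
        }
    }

    private void flush(String db, String table, List<String> rows) {
        String url = "http://" + feHostPort + "/api/" + db + "/" + table + "/_stream_load";
        // doStreamLoad(url, rows): hypothetical helper that issues the HTTP PUT
        // with the buffered JSON rows as the request body.
    }
}
```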
What you said is quite right; that is exactly my idea. My previous answer may not have described it clearly.
Yes, that's it. A single Flink task can synchronize the data of the entire database.
However, the old batch-style stream load writing may have several problems:
- A series of problems caused by an unreasonable batch size setting: if it is too small, frequent imports will trigger the -235 error; if it is too large, Flink memory will be under pressure.
- Exactly-once semantics are not guaranteed.
After the feature is developed, a reference threshold can be given to users, and users can set the sink parallelism and checkpoint interval according to their data volume, timeliness requirements, and similar factors.
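For example (parallelism and checkpointing are standard Flink settings; the batch property names are illustrative placeholders, not confirmed connector options):

```java
import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TuningExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);           // sink concurrency, tuned to data volume
        env.enableCheckpointing(10_000); // checkpoint interval in ms, tuned to timeliness needs

        // Illustrative per-table batching thresholds (names are placeholders):
        Properties sinkProps = new Properties();
        sinkProps.setProperty("sink.batch.rows", "50000");
        sinkProps.setProperty("sink.batch.bytes", "104857600"); // ~100 MB
    }
}
```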
So, will this function of dynamically writing to Doris tables be added in a future version of flink-connector-doris? If so, in which version?
That's right, we've done that for now
Perhaps Doris could provide another HTTP interface for database-level synchronization that uses a special header such as -H 'table:xxx' to write into different tables of the same database. That way the stream load URL could also be reused.
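If such an interface existed, a request might look roughly like the following (the database-level endpoint and the table header are part of the proposal, not an existing Doris API; host, port, and names are placeholders):

```java
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;

public class ProposedDbLevelLoad {
    public static void main(String[] args) throws Exception {
        // Proposed database-level endpoint: only the 'table' header changes per
        // batch, so the same URL can be reused for every table in the database.
        HttpPut put = new HttpPut("http://fe_host:8030/api/example_db/_stream_load");
        put.setHeader("table", "orders"); // proposed header selecting the target table
        put.setHeader("format", "json");
        put.setEntity(new StringEntity("{\"order_id\":\"10001\",\"amount\":\"99.90\"}"));

        try (CloseableHttpClient client = HttpClients.createDefault()) {
            client.execute(put);
        }
    }
}
```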