[FLINK-36805][cdc-common] Add ConfigShade interface to support encryption of sensitive configuration items and provide a base64 encoding implementation
Introduction
When a Flink CDC pipeline job is submitted, its configuration items are written in plaintext in the definition file. These include sensitive values such as the passwords used to connect to source and sink components (e.g., MySQL, Doris), which is a security risk. To avoid plaintext passwords, this PR adds an interface, ConfigShade, that developers can implement to supply their own decryption method.
It also provides a base64 encoding implementation, which serves both as an example implementation of the interface and as an immediate way to keep passwords out of the definition file in plaintext.
How to use
Using the base64 implementation as an example, the following shows how to use a configuration file with sensitive items encrypted:
- Add two new options, shade.identifier and shade.sensitive.keywords, to the pipeline part in the definition YAML file to specify the encryption algorithm and the encrypted sensitive keywords.
- Replace the plaintext of the sensitive items specified in shade.sensitive.keywords with the encrypted ciphertext.
- Submit a pipeline job with the new definition file.
Example definition file:
source:
  type: mysql
  name: source-database
  hostname: localhost
  port: 3306
  username: YWRtaW4=
  password: cGFzc3dvcmQx
  tables: replication.cluster
  server-id: 5400-5404
  server-time-zone: Asia/Shanghai

route:
  - source-table: replication.cluster
    sink-table: test.cluster
    description: sync table to one destination table

sink:
  type: doris
  name: sink-queue
  fenodes: localhost:8035
  username: cm9vdA==
  password: cGFzc3dvcmQy
  table.create.properties.light_schema_change: true
  table.create.properties.replication_num: 1

pipeline:
  name: Sync MySQL Database to Doris
  parallelism: 2
  shade.identifier: base64
  shade.sensitive.keywords: password;username
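
The encoded values in the example definition file can be produced with the JDK's java.util.Base64. The following is a minimal sketch, not part of this PR: the class name ShadeValueEncoder is hypothetical, and it assumes the base64 implementation decodes values as UTF-8 text.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Hypothetical helper (not part of Flink CDC) for preparing base64 values. */
class ShadeValueEncoder {

    /** Encodes the UTF-8 bytes of a plaintext value with standard base64. */
    static String encode(String plaintext) {
        return Base64.getEncoder().encodeToString(plaintext.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // Plaintext values that correspond to the example definition file.
        System.out.println(encode("admin"));     // YWRtaW4=
        System.out.println(encode("password1")); // cGFzc3dvcmQx
        System.out.println(encode("root"));      // cm9vdA==
        System.out.println(encode("password2")); // cGFzc3dvcmQy
    }
}
```

The printed strings are what would be pasted into the username and password options of the source and sink blocks.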
How to customize the encryption algorithm
To use a user-defined encryption algorithm, we expect the developer to provide a dependency package that implements the ConfigShade interface.
/**
 * The interface that provides the ability to decrypt {@link
 * org.apache.flink.cdc.composer.definition}.
 */
public interface ConfigShade {
    /**
     * Initializes the custom instance using the pipeline configuration.
     *
     * <p>This method can be useful when decryption requires an external file (e.g. a key file)
     * defined in the pipeline configs.
     */
    default void initialize(Configuration pipelineConfig) throws Exception {}

    /**
     * The unique identifier of the current interface, used to select the correct {@link
     * ConfigShade}.
     */
    String getIdentifier();

    /**
     * Decrypt the content.
     *
     * @param content The content to decrypt
     */
    String decrypt(String content);
}
Here, getIdentifier() returns the unique identifier of the algorithm, which is the value to set in shade.identifier, and decrypt(String content) decrypts the given ciphertext.
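
To make the contract concrete, here is a rough, self-contained sketch of a custom implementation and of how decryption could be applied to the sensitive options. It is illustrative only and is not the code in this PR. The ConfigShade declared here is a local stand-in that omits initialize(Configuration) so the example compiles without Flink CDC; DemoBase64ConfigShade, ConfigShadeDemo, and the identifier demo-base64 are hypothetical names; and the rule that an option is sensitive when its key exactly equals a listed keyword is an assumption, since the actual matching logic is not described above.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Local stand-in mirroring two methods of ConfigShade, so the sketch compiles
// without Flink CDC on the classpath. initialize(Configuration) is omitted.
interface ConfigShade {
    String getIdentifier();

    String decrypt(String content);
}

// Hypothetical implementation for illustration; not the class shipped in the PR.
class DemoBase64ConfigShade implements ConfigShade {
    @Override
    public String getIdentifier() {
        // The value a user would set in shade.identifier to select this class.
        return "demo-base64";
    }

    @Override
    public String decrypt(String content) {
        // Assumption: the ciphertext is standard base64 over UTF-8 text.
        return new String(Base64.getDecoder().decode(content), StandardCharsets.UTF_8);
    }
}

class ConfigShadeDemo {
    // Assumption: an option is sensitive when its key exactly equals a keyword.
    static Map<String, String> decryptSensitive(
            Map<String, String> options, List<String> keywords, ConfigShade shade) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : options.entrySet()) {
            boolean sensitive = keywords.contains(entry.getKey());
            result.put(
                    entry.getKey(),
                    sensitive ? shade.decrypt(entry.getValue()) : entry.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> sink = new LinkedHashMap<>();
        sink.put("type", "doris");
        sink.put("username", "cm9vdA==");
        sink.put("password", "cGFzc3dvcmQy");

        // Same keyword list format as shade.sensitive.keywords in the example.
        List<String> keywords = Arrays.asList("password;username".split(";"));
        System.out.println(decryptSensitive(sink, keywords, new DemoBase64ConfigShade()));
    }
}
```

Only the username and password entries are passed through decrypt; the type entry is left as written.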
Nice contribution. Could you please add a test case where the configuration value contains sensitive words?
@joyCurry30 OK, I've added a test to verify that the YAML can be parsed and decrypted correctly when a configuration value contains sensitive keywords. PTAL