seatunnel
[Feature][Connector V2] Add SaveMode for All Sink Connector
Background
Currently, SeaTunnel's sink connectors do not support a SaveMode function. I created this issue to discuss how to add a SaveMode feature to SeaTunnel.
Unified SaveMode Type
I have checked all Sink connectors. At present, SaveMode can be divided into two categories.
PathSaveMode
The SaveMode for Sink connectors that use a path to organize data, for example the File connectors.
public enum PathSaveMode {
DROP_DATA,
KEEP_DATA,
ERROR
}
TableSaveMode
The SaveMode for Sink connectors that use tables or other table-like structures to organize data.
public enum TableSaveMode {
DROP_TABLE,
KEEP_TABLE_DROP_DATA,
KEEP_TABLE_AND_DATA,
ERROR_WHEN_EXISTS
}
Add interface for SaveMode
I will add two interfaces for SaveMode.
SupportPathSaveMode
The Sink Connectors which support PathSaveMode should implement this interface
/**
* The Sink Connectors which support PathSaveMode should implement this interface
*/
public interface SupportPathSaveMode {
/**
* We hope every sink connector uses the same option name to configure SaveMode, so the checkOptions method is added to this interface.
* checkOptions has a default implementation that checks whether the `save_mode` parameter is in the config.
*
* @param config config of Sink Connector
* @return the PathSaveMode parsed from the config
*/
default PathSaveMode checkOptions(Config config) {
if (config.hasPath(SinkCommonOptions.PATH_SAVE_MODE.key())) {
String pathSaveMode = config.getString(SinkCommonOptions.PATH_SAVE_MODE.key());
return PathSaveMode.valueOf(pathSaveMode.toUpperCase(Locale.ROOT));
} else {
throw new SeaTunnelRuntimeException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
SinkCommonOptions.PATH_SAVE_MODE.key() + " must be in config");
}
}
void handleSaveMode(PathSaveMode pathSaveMode);
}
SupportTableSaveMode
The Sink Connectors which support TableSaveMode should implement this interface.
/**
* The Sink Connectors which support TableSaveMode should implement this interface
*/
public interface SupportTableSaveMode {
/**
* We hope every sink connector uses the same option name to configure SaveMode, so the checkOptions method is added to this interface.
* checkOptions has a default implementation that checks whether the `save_mode` parameter is in the config.
*
* @param config config of Sink Connector
* @return the TableSaveMode parsed from the config
*/
default TableSaveMode checkOptions(Config config) {
if (config.hasPath(SinkCommonOptions.TABLE_SAVE_MODE.key())) {
String tableSaveMode = config.getString(SinkCommonOptions.TABLE_SAVE_MODE.key());
return TableSaveMode.valueOf(tableSaveMode.toUpperCase(Locale.ROOT));
} else {
throw new SeaTunnelRuntimeException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
SinkCommonOptions.TABLE_SAVE_MODE.key() + " must be in config");
}
}
void handleSaveMode(TableSaveMode tableSaveMode);
}
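The intended call pattern (read the option, then hand the parsed mode to the connector) can be sketched without any SeaTunnel dependency. This is only an illustration under assumptions: a plain Map stands in for Config, IllegalArgumentException stands in for SeaTunnelRuntimeException, and RecordingSink and run are hypothetical names that are not part of the proposal.

```java
import java.util.Collections;
import java.util.Locale;
import java.util.Map;

class TableSaveModeSketch {
    // Same constants as the TableSaveMode enum proposed above.
    enum TableSaveMode { DROP_TABLE, KEEP_TABLE_DROP_DATA, KEEP_TABLE_AND_DATA, ERROR_WHEN_EXISTS }

    // Simplified stand-in for SupportTableSaveMode: a Map replaces Config.
    interface SupportTableSaveMode {
        default TableSaveMode checkOptions(Map<String, String> config) {
            String raw = config.get("save_mode");
            if (raw == null) {
                throw new IllegalArgumentException("save_mode must be in config");
            }
            // Case-insensitive lookup, as in the proposed default method.
            return TableSaveMode.valueOf(raw.toUpperCase(Locale.ROOT));
        }

        void handleSaveMode(TableSaveMode mode);
    }

    // Hypothetical sink that only records which mode it was asked to handle.
    static class RecordingSink implements SupportTableSaveMode {
        TableSaveMode handled;

        @Override
        public void handleSaveMode(TableSaveMode mode) {
            handled = mode;
        }
    }

    // Validate the config first, then let the sink act on the parsed mode.
    static TableSaveMode run(Map<String, String> config) {
        RecordingSink sink = new RecordingSink();
        sink.handleSaveMode(sink.checkOptions(config));
        return sink.handled;
    }

    public static void main(String[] args) {
        System.out.println(run(Collections.singletonMap("save_mode", "drop_table")));
    }
}
```

A real connector would replace RecordingSink#handleSaveMode with the actual drop, truncate, or existence check against its data source.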
Add SaveMode Option to Sink Connector
In order to unify the parameter names of each Sink connector that supports SaveMode, I added two SaveMode options. Both use the same key, save_mode.
public static final Option<TableSaveMode> TABLE_SAVE_MODE =
Options.key("save_mode")
.enumType(TableSaveMode.class)
.noDefaultValue()
.withDescription("The table save mode");
public static final Option<PathSaveMode> PATH_SAVE_MODE =
Options.key("save_mode")
.enumType(PathSaveMode.class)
.noDefaultValue()
.withDescription("The path save mode");
Automatically add SaveMode Option to the OptionRule of each connector that supports SaveMode
To do this, I add a check for the SupportPathSaveMode and SupportTableSaveMode interfaces in FactoryUtil.sinkFullOptionRule.
/**
* This method is called by SeaTunnel Web to get the full option rule of a sink.
* @return
*/
public static OptionRule sinkFullOptionRule(@NonNull TableSinkFactory factory) {
OptionRule sinkOptionRule = factory.optionRule();
if (sinkOptionRule == null) {
throw new FactoryException("sinkOptionRule can not be null");
}
Class<? extends SeaTunnelSink> sinkClass = factory.getSinkClass();
if (SupportTableSaveMode.class.isAssignableFrom(sinkClass)) {
OptionRule sinkCommonOptionRule =
OptionRule.builder().required(SinkCommonOptions.TABLE_SAVE_MODE).build();
sinkOptionRule.getOptionalOptions().addAll(sinkCommonOptionRule.getOptionalOptions());
}
if (SupportPathSaveMode.class.isAssignableFrom(sinkClass)) {
OptionRule sinkCommonOptionRule =
OptionRule.builder().required(SinkCommonOptions.PATH_SAVE_MODE).build();
sinkOptionRule.getOptionalOptions().addAll(sinkCommonOptionRule.getOptionalOptions());
}
return sinkOptionRule;
}
Because the interface check needs to know the Sink class, I add Class<? extends SeaTunnelSink> getSinkClass(); to TableSinkFactory. Every Sink Connector needs to implement the getSinkClass method.
public interface TableSinkFactory<IN, StateT, CommitInfoT, AggregatedCommitInfoT> extends Factory {
/**
* This method is not used for now, so the default implementation throws UnsupportedOperationException.
*
* @param context TableFactoryContext
* @return
*/
default TableSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> createSink(TableFactoryContext context) {
throw new UnsupportedOperationException("unsupported now");
}
Class<? extends SeaTunnelSink> getSinkClass();
}
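The direction of Class#isAssignableFrom matters for this kind of check: the interface class is the receiver and the sink class is the argument. The minimal sketch below uses hypothetical stand-in types (Sink, JdbcLikeSink, ConsoleLikeSink) instead of the real SeaTunnel classes.

```java
class AssignableCheckSketch {
    // Stand-ins for the marker interface and the sink base type.
    interface SupportTableSaveMode {}
    interface Sink {}

    // Hypothetical sinks: one opts in to save-mode support, one does not.
    static class JdbcLikeSink implements Sink, SupportTableSaveMode {}
    static class ConsoleLikeSink implements Sink {}

    // True when instances of sinkClass can be used as SupportTableSaveMode.
    static boolean supportsTableSaveMode(Class<? extends Sink> sinkClass) {
        return SupportTableSaveMode.class.isAssignableFrom(sinkClass);
    }

    public static void main(String[] args) {
        System.out.println(supportsTableSaveMode(JdbcLikeSink.class));    // true
        System.out.println(supportsTableSaveMode(ConsoleLikeSink.class)); // false
        // Reversed receiver and argument: false even though JdbcLikeSink implements the interface.
        System.out.println(JdbcLikeSink.class.isAssignableFrom(SupportTableSaveMode.class)); // false
    }
}
```

When an instance is already available, as in the starter code, sink instanceof SupportTableSaveMode expresses the same test more directly.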
At what stage should SaveMode be processed
I think the void handleSaveMode(PathSaveMode pathSaveMode); and void handleSaveMode(TableSaveMode tableSaveMode); methods should be called after prepare().
So I updated the starter code.
org.apache.seatunnel.core.starter.flink.execution.SinkExecuteProcessor#initializePlugins
@Override
protected List<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>> initializePlugins(List<URL> jarPaths, List<? extends Config> pluginConfigs) {
SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery(addUrlToClassloader);
List<URL> pluginJars = new ArrayList<>();
List<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>> sinks = pluginConfigs.stream().map(sinkConfig -> {
PluginIdentifier pluginIdentifier = PluginIdentifier.of(ENGINE_TYPE, PLUGIN_TYPE, sinkConfig.getString(PLUGIN_NAME));
pluginJars.addAll(sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink =
sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
seaTunnelSink.prepare(sinkConfig);
seaTunnelSink.setJobContext(jobContext);
if (seaTunnelSink instanceof SupportTableSaveMode) {
SupportTableSaveMode saveModeSink = (SupportTableSaveMode) seaTunnelSink;
TableSaveMode tableSaveMode = saveModeSink.checkOptions(sinkConfig);
saveModeSink.handleSaveMode(tableSaveMode);
}
if (seaTunnelSink instanceof SupportPathSaveMode) {
SupportPathSaveMode saveModeSink = (SupportPathSaveMode) seaTunnelSink;
PathSaveMode pathSaveMode = saveModeSink.checkOptions(sinkConfig);
saveModeSink.handleSaveMode(pathSaveMode);
}
return seaTunnelSink;
}).distinct().collect(Collectors.toList());
jarPaths.addAll(pluginJars);
return sinks;
}
About automatic table creation
I think if we support automatic table creation, we can do it after handleSaveMode in org.apache.seatunnel.core.starter.flink.execution.SinkExecuteProcessor#initializePlugins (or in the other starters).
After the discussion in https://github.com/apache/incubator-seatunnel/issues/3851, I am updating the design here.
Background
Currently, SeaTunnel's sink connectors do not support a SaveMode function. I created this issue to discuss how to add a SaveMode feature to SeaTunnel.
Unified SaveMode Type
I have checked all Sink connectors. At present, SaveMode can be unified into four modes.
/**
* The SaveMode for the Sink connectors that use table or other table structures to organize data
*/
public enum DataSaveMode {
// Drops the table for MySQL. Drops the path for the File Connector.
DROP_SCHEMA,
// Keeps the table but drops its data for MySQL. Keeps the path but drops the files in it for the File Connector.
KEEP_SCHEMA_DROP_DATA,
// Keeps the table and its data and continues writing to the existing table for MySQL. Keeps the path and its files and creates new files in the path for the File Connector.
KEEP_SCHEMA_AND_DATA,
// Throws an error when the table already exists for MySQL. Throws an error when the path already exists for the File Connector.
ERROR_WHEN_EXISTS
}
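To make the four modes concrete, here is a small sketch of how a handleSaveMode implementation might branch on them. It is an assumption-laden illustration, not the connector code: a hypothetical in-memory map (table name to rows) stands in for MySQL or a file path, and whether the table is re-created after DROP_SCHEMA is left out because the proposal does not specify it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DataSaveModeSketch {
    // Same constants as the DataSaveMode enum proposed above.
    enum DataSaveMode { DROP_SCHEMA, KEEP_SCHEMA_DROP_DATA, KEEP_SCHEMA_AND_DATA, ERROR_WHEN_EXISTS }

    // Hypothetical in-memory "database": table name -> rows.
    final Map<String, List<String>> tables = new HashMap<>();

    void handleSaveMode(String table, DataSaveMode mode) {
        boolean exists = tables.containsKey(table);
        switch (mode) {
            case DROP_SCHEMA:
                // Remove the table itself together with its rows.
                tables.remove(table);
                break;
            case KEEP_SCHEMA_DROP_DATA:
                // Keep the table, delete only its rows.
                if (exists) {
                    tables.get(table).clear();
                }
                break;
            case KEEP_SCHEMA_AND_DATA:
                // Nothing to do: new rows are appended to what is already there.
                break;
            case ERROR_WHEN_EXISTS:
                if (exists) {
                    throw new IllegalStateException("Table already exists: " + table);
                }
                break;
        }
    }

    public static void main(String[] args) {
        DataSaveModeSketch db = new DataSaveModeSketch();
        db.tables.put("orders", new ArrayList<>(Arrays.asList("row1", "row2")));
        db.handleSaveMode("orders", DataSaveMode.KEEP_SCHEMA_DROP_DATA);
        System.out.println(db.tables); // {orders=[]}
    }
}
```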
Add interface for SaveMode
I will add an interface for SaveMode.
SupportDataSaveMode
The Sink Connectors which support DataSaveMode should implement this interface
/**
* The Sink Connectors which support data SaveMode should implement this interface
*/
public interface SupportDataSaveMode {
/**
* We hope every sink connector uses the same option name to configure SaveMode, so the checkOptions method is added to this interface.
* checkOptions has a default implementation that checks whether the `save_mode` parameter is in the config
* and whether its value is one of the modes this connector supports.
*
* @param config config of Sink Connector
*/
default void checkOptions(Config config) {
if (config.hasPath(SinkCommonOptions.DATA_SAVE_MODE)) {
String tableSaveMode = config.getString(SinkCommonOptions.DATA_SAVE_MODE);
DataSaveMode dataSaveMode = DataSaveMode.valueOf(tableSaveMode.toUpperCase(Locale.ROOT));
if (!supportedDataSaveModeValues().contains(dataSaveMode)) {
throw new SeaTunnelRuntimeException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
"This connector doesn't support save mode: " + dataSaveMode);
}
} else {
throw new SeaTunnelRuntimeException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
SinkCommonOptions.DATA_SAVE_MODE + " must be in config");
}
}
DataSaveMode getDataSaveModeUsed();
/**
* Return the DataSaveMode list supported by this connector
* @return the DataSaveMode values this connector supports
*/
List<DataSaveMode> supportedDataSaveModeValues();
void handleSaveMode(DataSaveMode dataSaveMode);
}
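The extra step in this version of checkOptions is the comparison against the connector's own supported list. A dependency-free sketch of that validation follows. It makes several assumptions: a Map replaces Config, IllegalArgumentException replaces SeaTunnelRuntimeException, and the method returns the parsed mode for easier demonstration, whereas the proposed method returns void.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;

class SupportedModesSketch {
    enum DataSaveMode { DROP_SCHEMA, KEEP_SCHEMA_DROP_DATA, KEEP_SCHEMA_AND_DATA, ERROR_WHEN_EXISTS }

    static final String DATA_SAVE_MODE = "save_mode";

    // Parse the configured mode and reject values the connector does not support.
    static DataSaveMode checkOptions(Map<String, String> config, List<DataSaveMode> supported) {
        String raw = config.get(DATA_SAVE_MODE);
        if (raw == null) {
            throw new IllegalArgumentException(DATA_SAVE_MODE + " must be in config");
        }
        DataSaveMode mode = DataSaveMode.valueOf(raw.toUpperCase(Locale.ROOT));
        if (!supported.contains(mode)) {
            throw new IllegalArgumentException("This connector doesn't support save mode: " + mode);
        }
        return mode;
    }

    public static void main(String[] args) {
        // Hypothetical connector that can only append or fail, never drop.
        List<DataSaveMode> supported =
                Arrays.asList(DataSaveMode.KEEP_SCHEMA_AND_DATA, DataSaveMode.ERROR_WHEN_EXISTS);
        Map<String, String> config = Collections.singletonMap(DATA_SAVE_MODE, "keep_schema_and_data");
        System.out.println(checkOptions(config, supported)); // KEEP_SCHEMA_AND_DATA
    }
}
```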
Add SaveMode Option to Sink Connector
In order to unify the parameter names of each Sink connector that supports SaveMode, I added a parameter for SaveMode.
public static final String DATA_SAVE_MODE = "save_mode";
Add a SingleChoiceOption to let each connector declare the list of DataSaveMode values it supports. The methods used to construct this Option are not described in detail here.
public class SingleChoiceOption<T> extends Option<T> {
@Getter
private final List<T> optionValues;
public SingleChoiceOption(String key,
TypeReference<T> typeReference,
List<T> optionValues,
T defaultValue) {
super(key, typeReference, defaultValue);
this.optionValues = optionValues;
}
}
Automatically add DataSaveMode Option to the OptionRule of each connector that supports DataSaveMode
To do this, I add a check for the SupportDataSaveMode interface in FactoryUtil.sinkFullOptionRule.
Please note that the createSink method of the TableSinkFactory interface is now used here, so a connector that implements SupportDataSaveMode must also implement TableSinkFactory#createSink.
/**
* This method is called by SeaTunnel Web to get the full option rule of a sink.
* @return
*/
public static OptionRule sinkFullOptionRule(@NonNull TableSinkFactory factory) {
OptionRule sinkOptionRule = factory.optionRule();
if (sinkOptionRule == null) {
throw new FactoryException("sinkOptionRule can not be null");
}
try {
TableSink sink = factory.createSink(null);
if (SupportDataSaveMode.class.isAssignableFrom(sink.getClass())) {
SupportDataSaveMode supportDataSaveModeSink = (SupportDataSaveMode) sink;
Option<DataSaveMode> saveMode =
Options.key(SinkCommonOptions.DATA_SAVE_MODE)
.singleChoice(DataSaveMode.class, supportDataSaveModeSink.supportedDataSaveModeValues())
.noDefaultValue()
.withDescription("data save mode");
OptionRule sinkCommonOptionRule =
OptionRule.builder().required(saveMode).build();
sinkOptionRule.getOptionalOptions().addAll(sinkCommonOptionRule.getOptionalOptions());
}
} catch (UnsupportedOperationException e) {
LOG.warn("Add save mode option need sink connector support create sink by TableSinkFactory");
}
return sinkOptionRule;
}
At what stage should DataSaveMode be processed
We will call checkOptions in SinkExecuteProcessor to check the config.
So I updated the starter code.
org.apache.seatunnel.core.starter.flink.execution.SinkExecuteProcessor#initializePlugins
@Override
protected List<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>> initializePlugins(List<URL> jarPaths, List<? extends Config> pluginConfigs) {
SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery(addUrlToClassloader);
List<URL> pluginJars = new ArrayList<>();
List<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>> sinks = pluginConfigs.stream().map(sinkConfig -> {
PluginIdentifier pluginIdentifier = PluginIdentifier.of(ENGINE_TYPE, PLUGIN_TYPE, sinkConfig.getString(PLUGIN_NAME));
pluginJars.addAll(sinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink =
sinkPluginDiscovery.createPluginInstance(pluginIdentifier);
seaTunnelSink.prepare(sinkConfig);
seaTunnelSink.setJobContext(jobContext);
if (seaTunnelSink instanceof SupportDataSaveMode) {
SupportDataSaveMode saveModeSink = (SupportDataSaveMode) seaTunnelSink;
saveModeSink.checkOptions(sinkConfig);
}
return seaTunnelSink;
}).distinct().collect(Collectors.toList());
jarPaths.addAll(pluginJars);
return sinks;
}
About the handleSaveMode method and automatic table creation
The lifecycle of a data save mode already includes the semantics of creating the schema, so the automatic table creation logic can run when handleSaveMode is invoked.
But when is the best time to invoke this logic? To create a table automatically we need to know the actual schema of the sink connector, so invoking it after SeaTunnelSink#setTypeInfo is the best choice, as shown in the following:
seaTunnelSink.setTypeInfo((SeaTunnelRowType) TypeConverterUtils.convert(stream.getType()));
if (seaTunnelSink instanceof SupportDataSaveMode) {
SupportDataSaveMode saveModeSink = (SupportDataSaveMode) seaTunnelSink;
DataSaveMode dataSaveMode = saveModeSink.getDataSaveModeUsed();
saveModeSink.handleSaveMode(dataSaveMode);
}
About how to get or modify metadata
The api module has a Catalog interface, which is used to communicate with the data source in connectors, so building on it for this feature is the best choice.
Connectors can implement their own Catalog and use it to carry out schema management.
void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists);
related issue #3271
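As a rough illustration of what the ignoreIfExists flag usually means for a createTable call, here is a hypothetical in-memory catalog. It is not the SeaTunnel Catalog interface: table paths and schemas are reduced to strings, IllegalStateException is a stand-in exception, and the boolean return value exists only to make the behaviour observable.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class InMemoryCatalogSketch {
    // Hypothetical storage: table path -> column names.
    final Map<String, List<String>> tables = new LinkedHashMap<>();

    // Returns true when a table was created, false when an existing table was left untouched.
    boolean createTable(String tablePath, List<String> columns, boolean ignoreIfExists) {
        if (tables.containsKey(tablePath)) {
            if (ignoreIfExists) {
                return false; // existing table is kept as it is
            }
            throw new IllegalStateException("Table already exists: " + tablePath);
        }
        tables.put(tablePath, columns);
        return true;
    }

    public static void main(String[] args) {
        InMemoryCatalogSketch catalog = new InMemoryCatalogSketch();
        System.out.println(catalog.createTable("db.orders", Arrays.asList("id", "amount"), false)); // true
        System.out.println(catalog.createTable("db.orders", Arrays.asList("id"), true));            // false
    }
}
```

Under this reading, a KEEP_SCHEMA_AND_DATA handler could call createTable with ignoreIfExists set to true, while ERROR_WHEN_EXISTS could pass false and let the exception surface. That mapping is an assumption, not something the design above states.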
Does createTable need to be used in handleSavePoint()?
This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.
This issue has been closed because it has not received response for too long time. You could reopen it if you encountered similar problems in the future.