
[Umbrella][Feature][Connector-V2] Support creating tables automatically

Open TyrantLucifer opened this issue 2 years ago • 5 comments

Search before asking

  • [X] I have searched the feature requests and found no similar requirement.

Description

Currently, many sink connectors cannot create the target table if it does not exist. This feature is very important for many users.

Usage Scenario

No response

Related issues

No response

Are you willing to submit a PR?

  • [X] Yes I am willing to submit a PR!

Code of Conduct

TyrantLucifer avatar Nov 02 '22 10:11 TyrantLucifer

You can do this by adding a method to the Catalog interface.

// org.apache.seatunnel.api.table.catalog.Catalog
public interface Catalog {
    void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists);
}

CatalogTable can represent partitioning keys, primary keys, comments, and more.

However, the factory class is not yet enabled for the entire process: #2490
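To make the intended `ignoreIfExists` behaviour concrete, here is a minimal in-memory sketch. It is not SeaTunnel code: `SimpleCatalog`, `InMemoryCatalog`, and `schemaOf` are hypothetical names, and table paths and schemas are reduced to plain strings instead of `TablePath` and `CatalogTable`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the Catalog interface quoted above.
// Paths and schemas are plain strings to keep the sketch self-contained.
interface SimpleCatalog {
    boolean tableExists(String tablePath);

    void createTable(String tablePath, String schema, boolean ignoreIfExists);
}

class InMemoryCatalog implements SimpleCatalog {
    private final Map<String, String> tables = new LinkedHashMap<>();

    @Override
    public boolean tableExists(String tablePath) {
        return tables.containsKey(tablePath);
    }

    @Override
    public void createTable(String tablePath, String schema, boolean ignoreIfExists) {
        if (tableExists(tablePath)) {
            if (ignoreIfExists) {
                return; // leave the existing table untouched
            }
            throw new IllegalStateException("Table already exists: " + tablePath);
        }
        tables.put(tablePath, schema);
    }

    // Helper for inspection only (hypothetical, not part of the quoted interface).
    String schemaOf(String tablePath) {
        return tables.get(tablePath);
    }
}

class CatalogDemo {
    public static void main(String[] args) {
        InMemoryCatalog catalog = new InMemoryCatalog();
        catalog.createTable("db.users", "id BIGINT, name STRING", false);
        // Second call is a no-op because ignoreIfExists is true.
        catalog.createTable("db.users", "id BIGINT", true);
        System.out.println(catalog.schemaOf("db.users"));
    }
}
```

With `ignoreIfExists = true` a sink can call `createTable` on every job start without checking first; with `false` the caller is told explicitly that the table is already there.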

ashulin avatar Nov 08 '22 12:11 ashulin

To solve this problem, we should first define a default policy for how SeaTunnel maps data types between Source and Sink, and SeaTunnel should allow users to customize that mapping, because a data type on the Source side is sometimes not supported on the Sink side. For example, I recently ran into the GEOM type, which is not supported in Hive.
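A rough sketch of what such a policy could look like, assuming a simple lookup keyed by type name. `TypeMappingPolicy`, `addOverride`, and `resolve` are hypothetical names, and the default entries are illustrative only, not SeaTunnel's actual mapping table.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

// Hypothetical policy: built-in defaults plus user-supplied overrides.
class TypeMappingPolicy {
    private final Map<String, String> defaults = new HashMap<>();
    private final Map<String, String> userOverrides = new HashMap<>();

    TypeMappingPolicy() {
        // Illustrative defaults only.
        defaults.put("INT", "INT");
        defaults.put("VARCHAR", "STRING");
        defaults.put("TIMESTAMP", "TIMESTAMP");
    }

    // Lets the user decide how an otherwise unsupported source type is written.
    void addOverride(String sourceType, String sinkType) {
        userOverrides.put(normalize(sourceType), sinkType);
    }

    // User overrides win over defaults; an empty result means "no mapping known".
    Optional<String> resolve(String sourceType) {
        String key = normalize(sourceType);
        String mapped = userOverrides.containsKey(key)
                ? userOverrides.get(key)
                : defaults.get(key);
        return Optional.ofNullable(mapped);
    }

    private static String normalize(String typeName) {
        return typeName.trim().toUpperCase(Locale.ROOT);
    }
}

class TypeMappingDemo {
    public static void main(String[] args) {
        TypeMappingPolicy policy = new TypeMappingPolicy();
        // GEOM has no default mapping in this sketch.
        System.out.println(policy.resolve("geom").isPresent());
        // The user chooses to store it as a string on the sink side.
        policy.addOverride("GEOM", "STRING");
        System.out.println(policy.resolve("geom").get());
    }
}
```

Returning an empty `Optional` for an unmapped type lets the caller fail early with a clear message rather than generating an invalid table definition.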

cason0126 avatar Nov 14 '22 17:11 cason0126

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.

github-actions[bot] avatar Dec 15 '22 00:12 github-actions[bot]

This issue has been closed because it has not received response for too long time. You could reopen it if you encountered similar problems in the future.

github-actions[bot] avatar Dec 23 '22 00:12 github-actions[bot]

Overview

As we know, the ability to create the target table automatically before writing out data is important to many users. Automatic table creation usually requires type mapping; fortunately, this is one of our strengths, so I am proposing this feature to the community.

Design

In a sink connector, the data type is available once SeaTunnelSink#setTypeInfo has been called, so we can add a new lifecycle method to SeaTunnelSink that creates the table:

    default void createTable(SeaTunnelRowType seaTunnelRowType) {
        // do nothing
    }

Each connector overrides this method with its own implementation, and the starter module calls it:

    public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams) throws TaskExecuteException {
        DataStream<Row> input = upstreamDataStreams.get(0);
        for (int i = 0; i < plugins.size(); i++) {
            Config sinkConfig = pluginConfigs.get(i);
            SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink = plugins.get(i);
            DataStream<Row> stream = fromSourceTable(sinkConfig).orElse(input);
            seaTunnelSink.setTypeInfo((SeaTunnelRowType) TypeConverterUtils.convert(stream.getType()));
            seaTunnelSink.createTable((SeaTunnelRowType) TypeConverterUtils.convert(stream.getType()));
            DataStreamSink<Row> dataStreamSink = stream.sinkTo(new FlinkSink<>(seaTunnelSink)).name(seaTunnelSink.getPluginName());
            if (sinkConfig.hasPath(SourceCommonOptions.PARALLELISM.key())) {
                int parallelism = sinkConfig.getInt(SourceCommonOptions.PARALLELISM.key());
                dataStreamSink.setParallelism(parallelism);
            }
        }
        // the sink is the last stream
        return null;
    }
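As an illustration of what a connector-side override might look like, here is a self-contained sketch. `SimpleRowType`, `SimpleSink`, and `DdlRecordingSink` are hypothetical stand-ins for `SeaTunnelRowType`, `SeaTunnelSink`, and a real connector. The sketch only records the generated DDL; a real connector would presumably execute it against the target system.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for SeaTunnelRowType.
class SimpleRowType {
    final String[] fieldNames;
    final String[] fieldTypes;

    SimpleRowType(String[] fieldNames, String[] fieldTypes) {
        if (fieldNames.length != fieldTypes.length) {
            throw new IllegalArgumentException("names and types must have the same length");
        }
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
    }
}

// Hypothetical stand-in for SeaTunnelSink with the proposed lifecycle method.
interface SimpleSink {
    default void createTable(SimpleRowType rowType) {
        // do nothing by default, as in the proposal
    }
}

// Example connector that builds a CREATE TABLE statement from the row type.
class DdlRecordingSink implements SimpleSink {
    private final String tableName;
    final List<String> executedStatements = new ArrayList<>();

    DdlRecordingSink(String tableName) {
        this.tableName = tableName;
    }

    @Override
    public void createTable(SimpleRowType rowType) {
        StringBuilder ddl = new StringBuilder("CREATE TABLE IF NOT EXISTS ")
                .append(tableName)
                .append(" (");
        for (int i = 0; i < rowType.fieldNames.length; i++) {
            if (i > 0) {
                ddl.append(", ");
            }
            ddl.append(rowType.fieldNames[i]).append(' ').append(rowType.fieldTypes[i]);
        }
        ddl.append(')');
        // Recorded instead of executed to keep the sketch free of external systems.
        executedStatements.add(ddl.toString());
    }
}

class CreateTableDemo {
    public static void main(String[] args) {
        SimpleRowType rowType = new SimpleRowType(
                new String[] {"id", "amount"},
                new String[] {"BIGINT", "DOUBLE"});
        DdlRecordingSink sink = new DdlRecordingSink("orders");
        sink.createTable(rowType);
        System.out.println(sink.executedStatements.get(0));
    }
}
```

Because the method has a do-nothing default, sinks that have not implemented table creation keep working unchanged.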

TyrantLucifer avatar Jan 03 '23 09:01 TyrantLucifer

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.

github-actions[bot] avatar Feb 07 '23 00:02 github-actions[bot]

This issue has been closed because it has not received response for too long time. You could reopen it if you encountered similar problems in the future.

github-actions[bot] avatar Feb 15 '23 00:02 github-actions[bot]