[Feature][API] Let connectors return parameter list and parameter verification rules

Open EricJoy2048 opened this issue 2 years ago • 2 comments

Search before asking

  • [X] I had searched in the feature and found no similar feature requirement.

Description

Background

We plan to support defining SeaTunnel jobs in a web UI. The web server needs to know the options and option rules of each Source/Transform/Sink plugin, so the plugins need to return their options and rules.

Design

1. How to define an option

In seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration we already have the Option class for defining an option.

2. How to define an option rule

In seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util there are OptionRule and Expression. OptionRule defines the basic rules, and Expression defines complex conditional checks.

3. How to let the web server know the plugin options and option rules

seatunnel-plugin-discovery is a module that loads connectors from a directory. Using AbstractPluginDiscovery, we can get all connectors in the plugin dir.

seatunnel-plugin-discovery depends only on seatunnel-api and seatunnel-common, so the SeaTunnel web server can depend on seatunnel-plugin-discovery. At startup, the web server can then use AbstractPluginDiscovery to get all connectors together with their options and option rules, and store them in memory.
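As a rough illustration of the "store them in memory" step, the web server could keep the discovered rules in a map keyed by plugin type and name. This is only a sketch under assumptions: OptionRuleRegistry and RuleStub are hypothetical names that do not exist in SeaTunnel, and RuleStub stands in for the real OptionRule so the example compiles on its own.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: OptionRuleRegistry and RuleStub are illustrative names,
// not SeaTunnel classes. RuleStub stands in for the real OptionRule.
class OptionRuleRegistry {

    /** Simplified stand-in for OptionRule: only a list of required option keys. */
    static final class RuleStub {
        private final List<String> requiredKeys;

        RuleStub(List<String> requiredKeys) {
            this.requiredKeys = Collections.unmodifiableList(new ArrayList<>(requiredKeys));
        }

        List<String> getRequiredKeys() {
            return requiredKeys;
        }
    }

    // Filled once at startup, read by request threads afterwards.
    private final Map<String, RuleStub> rulesByPlugin = new ConcurrentHashMap<>();

    /** Called once per discovered connector while the server starts. */
    void register(String pluginType, String pluginName, RuleStub rule) {
        rulesByPlugin.put(key(pluginType, pluginName), rule);
    }

    /** Returns an empty Optional for connectors that have no rule registered. */
    Optional<RuleStub> lookup(String pluginType, String pluginName) {
        return Optional.ofNullable(rulesByPlugin.get(key(pluginType, pluginName)));
    }

    // Case-insensitive key so "Kafka" and "kafka" resolve to the same entry.
    private static String key(String pluginType, String pluginName) {
        return pluginType.toLowerCase(Locale.ROOT) + ":" + pluginName.toLowerCase(Locale.ROOT);
    }
}
```

Returning an Optional from lookup lets the web layer distinguish "connector has no rule yet" from "connector unknown" without passing null around.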

4. What needs to be done now

I need to add a method to PluginIdentifierInterface. All connectors need to implement this interface and method.


OptionRule getOptionRule();

We already have 60+ connectors. So that the existing connectors can be migrated step by step, I will provide a default implementation that returns null.


default OptionRule getOptionRule() {
    return null;
}
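To show why a default implementation allows a gradual migration, here is a self-contained sketch. Everything in it is a stand-in: the nested OptionRule and PluginIdentifier types only mimic the shape of the real OptionRule and PluginIdentifierInterface, and LegacyConnector, MigratedConnector, and ruleOf are hypothetical names. A connector that has not been updated inherits the null default, so callers have to treat null as "no rule declared yet".

```java
import java.util.Optional;

// Hypothetical sketch: all types here are stand-ins, not SeaTunnel classes.
class DefaultRuleDemo {

    /** Stand-in for the real OptionRule. */
    static final class OptionRule {
        final String description;

        OptionRule(String description) {
            this.description = description;
        }
    }

    /** Stand-in for PluginIdentifierInterface with the proposed default method. */
    interface PluginIdentifier {
        String getPluginName();

        // Existing connectors compile unchanged and inherit this null default.
        default OptionRule getOptionRule() {
            return null;
        }
    }

    /** A connector that has not been migrated yet. */
    static final class LegacyConnector implements PluginIdentifier {
        @Override
        public String getPluginName() {
            return "legacy";
        }
    }

    /** A connector that already declares its rule. */
    static final class MigratedConnector implements PluginIdentifier {
        @Override
        public String getPluginName() {
            return "migrated";
        }

        @Override
        public OptionRule getOptionRule() {
            return new OptionRule("rule for " + getPluginName());
        }
    }

    /** Wraps the nullable result so callers cannot forget the null case. */
    static Optional<OptionRule> ruleOf(PluginIdentifier plugin) {
        return Optional.ofNullable(plugin.getOptionRule());
    }
}
```

With this shape, the web server could skip validation for connectors whose rule is absent and enable it connector by connector as each one is updated.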

To-do list

  • [ ] Update PluginIdentifierInterface and add the getOptionRule() method.
  • [ ] Update all connectors to implement this method.

Usage Scenario

No response

Related issues

No response

Are you willing to submit a PR?

  • [X] Yes I am willing to submit a PR!

EricJoy2048 avatar Nov 02 '22 11:11 EricJoy2048

Should default OptionRule getOptionRule() return a list?

hk-lrzy avatar Nov 02 '22 11:11 hk-lrzy

org.apache.seatunnel.api.table.factory.Factory can be used directly. It also supports the options of catalogs and formats (see https://github.com/apache/incubator-seatunnel/issues/2490), which I think is necessary.

Discover the existing source, sink, and catalog

// org.apache.seatunnel.api.table.factory.FactoryUtil#discoverFactories
public static <T extends Factory> List<T> discoverFactories(ClassLoader classLoader, Class<T> factoryClass) {
    final List<Factory> factories = discoverFactories(classLoader);
    return factories.stream()
            .filter(f -> factoryClass.isAssignableFrom(f.getClass()))
            .map(f -> (T) f)
            .collect(Collectors.toList());
}

Get the option rule for a source

// org.apache.seatunnel.api.table.factory.FactoryUtil#discoverFactory
// example for kafka source
OptionRule rule = FactoryUtil.discoverFactory(Thread.currentThread().getContextClassLoader(), TableSourceFactory.class, "kafka")
    .optionRule();

Option rule example

@hk-lrzy The option rule contains a list of options

 // option example
 Option<String> TEST_TOPIC_PATTERN = Options.key("option.topic-pattern")
        .stringType()
        .noDefaultValue()
        .withDescription("test string type");

 // simple rule
 OptionRule simpleRule = OptionRule.builder()
     .optional(POLL_TIMEOUT, POLL_INTERVAL)
     .required(CLIENT_SERVICE_URL)
     .build();
 
 // basic full rule
 OptionRule fullRule = OptionRule.builder()
     .optional(POLL_TIMEOUT, POLL_INTERVAL, CURSOR_STARTUP_MODE)
     .required(CLIENT_SERVICE_URL, ADMIN_SERVICE_URL)
     .exclusive(TOPIC_PATTERN, TOPIC)
     .conditional(CURSOR_STARTUP_MODE, StartMode.TIMESTAMP, CURSOR_STARTUP_TIMESTAMP)
     .build();
 
 // complex conditional rule
 // moot expression
 Expression expression = Expression.of(TOPIC_DISCOVERY_INTERVAL, 200)
     .and(Expression.of(Condition.of(CURSOR_STARTUP_MODE, StartMode.EARLIEST)
         .or(CURSOR_STARTUP_MODE, StartMode.LATEST)))
     .or(Expression.of(Condition.of(TOPIC_DISCOVERY_INTERVAL, 100)));
 
 OptionRule complexRule = OptionRule.builder()
     .optional(POLL_TIMEOUT, POLL_INTERVAL, CURSOR_STARTUP_MODE)
     .required(CLIENT_SERVICE_URL, ADMIN_SERVICE_URL)
     .exclusive(TOPIC_PATTERN, TOPIC)
     .conditional(expression, CURSOR_RESET_MODE)
     .build();
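For the web use case, a rule like the ones above would eventually be checked against the options a user submits. The sketch below shows one possible check. It does not use the real OptionRule API: RuleCheckSketch and check are hypothetical names, options are reduced to plain string keys, and the semantics (every required key must be present, exactly one key of an exclusive group must be present) are my assumed reading of required and exclusive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: RuleCheckSketch is not part of SeaTunnel, and the
// required/exclusive semantics below are an assumption, not the library's contract.
class RuleCheckSketch {

    /**
     * Returns human-readable errors for a submitted config.
     * An empty list means the config satisfies both checks.
     */
    static List<String> check(Map<String, String> config,
                              List<String> required,
                              List<String> exclusive) {
        List<String> errors = new ArrayList<>();

        // Assumed meaning of required: every listed key must be set.
        for (String key : required) {
            if (!config.containsKey(key)) {
                errors.add("missing required option: " + key);
            }
        }

        // Assumed meaning of exclusive: exactly one key of the group must be set.
        int present = 0;
        for (String key : exclusive) {
            if (config.containsKey(key)) {
                present++;
            }
        }
        if (!exclusive.isEmpty() && present != 1) {
            errors.add("exactly one of " + exclusive + " must be set, found " + present);
        }
        return errors;
    }
}
```

Collecting all errors instead of failing on the first one would let a web form highlight every invalid field in a single round trip.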

ashulin avatar Nov 02 '22 12:11 ashulin

Yes, I see. I will use Factory.

EricJoy2048 avatar Nov 03 '22 01:11 EricJoy2048

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.

github-actions[bot] avatar Dec 04 '22 00:12 github-actions[bot]

This issue has been closed because it has not received response for too long time. You could reopen it if you encountered similar problems in the future.

github-actions[bot] avatar Dec 11 '22 00:12 github-actions[bot]