seatunnel
[Feature][API] Let connectors return parameter list and parameter verification rules
Search before asking
- [X] I have searched the existing feature requests and found no similar one.
Description
Background
We plan to let users define SeaTunnel jobs in a web UI. The web server needs to know the options and option rules of each Source/Transform/Sink plugin, so the plugins must be able to return their options and rules.
Design
1. How to define an option
In seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration we already have the Option class for defining an option.
2. How to define an option rule
In seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/util there are OptionRule and Expression. OptionRule defines the basic rules, and Expression defines complex conditional checks.
3. How to let the web server know the plugin options and option rules
seatunnel-plugin-discovery is a module that loads connectors from the plugin dir. With AbstractPluginDiscovery we can get all connectors in the plugin dir.
seatunnel-plugin-discovery depends only on seatunnel-api and seatunnel-common, so the SeaTunnel web server can depend on seatunnel-plugin-discovery. The web server can then use AbstractPluginDiscovery at startup to get all connectors with their options and option rules, and store them in memory.
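As a rough illustration of the "store them in memory" step, the sketch below caches per-connector option data in a map keyed by plugin type and name. PluginOptionInfo and PluginOptionRegistry are hypothetical names invented for this example, not SeaTunnel classes, and the case-insensitive key is an assumption. The call that fills the registry from AbstractPluginDiscovery is deliberately left out.

```java
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical snapshot of what the web server keeps per connector. */
final class PluginOptionInfo {
    final String pluginType;   // e.g. "source", "transform", "sink"
    final String pluginName;   // e.g. "kafka"
    final List<String> optionKeys;

    PluginOptionInfo(String pluginType, String pluginName, List<String> optionKeys) {
        this.pluginType = pluginType;
        this.pluginName = pluginName;
        this.optionKeys = optionKeys;
    }
}

/** Hypothetical in-memory cache, filled once when the web server starts. */
final class PluginOptionRegistry {
    private final Map<String, PluginOptionInfo> byKey = new ConcurrentHashMap<>();

    void register(PluginOptionInfo info) {
        byKey.put(key(info.pluginType, info.pluginName), info);
    }

    Optional<PluginOptionInfo> lookup(String pluginType, String pluginName) {
        return Optional.ofNullable(byKey.get(key(pluginType, pluginName)));
    }

    // Assumption: plugin type and name are matched case-insensitively.
    private static String key(String pluginType, String pluginName) {
        return pluginType.toLowerCase(Locale.ROOT) + ":" + pluginName.toLowerCase(Locale.ROOT);
    }
}
```

The web layer would then answer "which options does this connector accept" from the registry without touching the plugin dir again.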
4. What needs to be done now
I need to add a method to PluginIdentifierInterface. All connectors need to implement this interface and method.
OptionRule getOptionRule();
We already have 60+ connectors. So that the existing connectors can be updated step by step, I will provide a default implementation that returns null.
default OptionRule getOptionRule() {
    return null;
}
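A minimal sketch of how this default could behave during the migration, and how a caller might guard against the null. The OptionRule and PluginIdentifierInterface types here are simplified stand-ins written for this example, not the real SeaTunnel classes. LegacyConnector, MigratedConnector and OptionRuleLookup are hypothetical.

```java
import java.util.Optional;

/** Simplified stand-in for the real OptionRule class. */
final class OptionRule {
}

/** Simplified stand-in for the real interface, showing only the proposed method. */
interface PluginIdentifierInterface {
    String getPluginName();

    // Proposed default: connectors that are not migrated yet return null.
    default OptionRule getOptionRule() {
        return null;
    }
}

/** Hypothetical connector that has not been updated. */
final class LegacyConnector implements PluginIdentifierInterface {
    @Override
    public String getPluginName() {
        return "legacy";
    }
}

/** Hypothetical connector that already declares its rule. */
final class MigratedConnector implements PluginIdentifierInterface {
    @Override
    public String getPluginName() {
        return "migrated";
    }

    @Override
    public OptionRule getOptionRule() {
        return new OptionRule();
    }
}

final class OptionRuleLookup {
    // Wrap the nullable result so callers cannot forget the "not migrated" case.
    static Optional<OptionRule> ruleOf(PluginIdentifierInterface plugin) {
        return Optional.ofNullable(plugin.getOptionRule());
    }
}
```

With this shape, the web server could show a plain free-form editor for connectors whose rule is absent and a generated form for the rest.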
To-do list
- [ ] Update PluginIdentifierInterface and add the getOptionRule() method.
- [ ] Update all connectors to implement this method.
Usage Scenario
No response
Related issues
No response
Are you willing to submit a PR?
- [X] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Should default OptionRule getOptionRule() return a list?
org.apache.seatunnel.api.table.factory.Factory can be used directly; it can also support the options of catalog and format (see https://github.com/apache/incubator-seatunnel/issues/2490). I think this is necessary.
Discover the existing source, sink, and catalog
// org.apache.seatunnel.api.table.factory.FactoryUtil#discoverFactories
public static <T extends Factory> List<T> discoverFactories(ClassLoader classLoader, Class<T> factoryClass) {
    final List<Factory> factories = discoverFactories(classLoader);
    return factories.stream()
            .filter(f -> factoryClass.isAssignableFrom(f.getClass()))
            .map(f -> (T) f)
            .collect(Collectors.toList());
}
Gets a list of options for a source
// org.apache.seatunnel.api.table.factory.FactoryUtil#discoverFactory
// example for kafka source
OptionRule rule = FactoryUtil.discoverFactory(Thread.currentThread().getContextClassLoader(), TableSourceFactory.class, "kafka")
        .optionRule();
Option rule example
@hk-lrzy The option rule contains a list of options
// option example
Option<String> TEST_TOPIC_PATTERN = Options.key("option.topic-pattern")
        .stringType()
        .noDefaultValue()
        .withDescription("test string type");

// simple rule
OptionRule simpleRule = OptionRule.builder()
        .optional(POLL_TIMEOUT, POLL_INTERVAL)
        .required(CLIENT_SERVICE_URL)
        .build();

// basic full rule
OptionRule fullRule = OptionRule.builder()
        .optional(POLL_TIMEOUT, POLL_INTERVAL, CURSOR_STARTUP_MODE)
        .required(CLIENT_SERVICE_URL, ADMIN_SERVICE_URL)
        .exclusive(TOPIC_PATTERN, TOPIC)
        .conditional(CURSOR_STARTUP_MODE, StartMode.TIMESTAMP, CURSOR_STARTUP_TIMESTAMP)
        .build();

// complex conditional rule
// moot expression
Expression expression = Expression.of(TOPIC_DISCOVERY_INTERVAL, 200)
        .and(Expression.of(Condition.of(CURSOR_STARTUP_MODE, StartMode.EARLIEST)
                .or(CURSOR_STARTUP_MODE, StartMode.LATEST)))
        .or(Expression.of(Condition.of(TOPIC_DISCOVERY_INTERVAL, 100)));
OptionRule complexRule = OptionRule.builder()
        .optional(POLL_TIMEOUT, POLL_INTERVAL, CURSOR_STARTUP_MODE)
        .required(CLIENT_SERVICE_URL, ADMIN_SERVICE_URL)
        .exclusive(TOPIC_PATTERN, TOPIC)
        .conditional(expression, CURSOR_RESET_MODE)
        .build();
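To make the intent of these rules concrete, here is a small self-contained sketch of how a web server might check a user-supplied config against them. It does not use the SeaTunnel API. RuleCheckSketch, its parameters and the option keys in the usage note are hypothetical. The semantics are assumptions: required means every key must be present, exclusive means exactly one of the keys is set, and conditional means the listed keys become required when the condition key has the given value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical checker; the rule semantics are assumptions, not SeaTunnel's. */
final class RuleCheckSketch {

    static List<String> validate(Map<String, Object> config,
                                 List<String> required,
                                 List<String> exclusive,
                                 String conditionKey,
                                 Object conditionValue,
                                 List<String> requiredWhenMatched) {
        List<String> errors = new ArrayList<>();

        // required: every listed key must be present.
        for (String key : required) {
            if (!config.containsKey(key)) {
                errors.add("missing required option: " + key);
            }
        }

        // exclusive (assumed): exactly one of the listed keys is set.
        long present = exclusive.stream().filter(config::containsKey).count();
        if (!exclusive.isEmpty() && present != 1) {
            errors.add("exactly one of " + exclusive + " must be set, found " + present);
        }

        // conditional (assumed): extra keys are required only when the condition holds.
        if (conditionValue != null && conditionValue.equals(config.get(conditionKey))) {
            for (String key : requiredWhenMatched) {
                if (!config.containsKey(key)) {
                    errors.add("option " + key + " is required when "
                            + conditionKey + " = " + conditionValue);
                }
            }
        }
        return errors;
    }
}
```

For example, a config that sets a hypothetical cursor.startup.mode key to TIMESTAMP but omits the timestamp key would produce one error, and adding that key would clear it.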
Yes, I see. I will use Factory.
This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.
This issue has been closed because it has not received response for too long time. You could reopen it if you encountered similar problems in the future.