pulsar-flink
pulsar-flink copied to clipboard
[ENHANCEMENT] enhancement for setting up auth parameters
Motivations
When I tried to create FlinkPulsarSource in the following way,
props.setProperty(PulsarOptions.AUTH_PLUGIN_CLASSNAME_KEY, "org.apache.pulsar.client.impl.auth.AuthenticationToken");
props.setProperty(PulsarOptions.AUTH_PARAMS_KEY, "token:abcdefghijklmn");
FlinkPulsarSource(adminUrl, clientConf, pulsarDeserializationSchema, props);
I found my flink job was failed and I can see some warnning logs in my broker like:
Failed to authenticate HTTP request: Authentication required
The reason for this problem is that only setting auth params in Properties will not take effect in the final initialization of PulsarClient.
Modifications
Add some extra checking for this issue.
OK, LGTM. But this is only a hotfix we may perform a code cleanup and refactor these complex logic.