[BUG] A topic created by the pulsar-flink connector is always created as a partitioned topic with 1 partition
Describe the bug
Since #369, when the flink-connector creates a topic at stream start-up (because the topic doesn't already exist), the topic is always created as a partitioned topic with 1 partition.
To Reproduce
- create a Flink stream with a Pulsar source/sink linked to a non-existent topic
- start the stream
- the topic is created, but always as a partitioned topic with 1 partition
Expected behavior
Ideally, the created topic should respect the Pulsar configuration for auto topic creation (as defined by the Pulsar parameters allowAutoTopicCreationType and defaultNumPartitions). If that's not possible, the source/sink should be configurable to define both points: whether the topic is partitioned and, if so, the default number of partitions.
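A minimal Java sketch of the configurable fallback described above. It assumes two connector options that do not exist today: the keys `topic.auto-create.type` and `topic.auto-create.partitions` are hypothetical, and so is the `TopicCreationOptions` class. With no settings it defaults to non-partitioned, which matches the default value of allowAutoTopicCreationType shown later in this thread. It uses 0 to mean non-partitioned, following the `partitions` field of Pulsar's partitioned-topic metadata.

```java
import java.util.Map;

/** Hypothetical connector-side settings for creating a missing topic. */
final class TopicCreationOptions {
    // Hypothetical option keys (assumption): the connector does not define these today.
    static final String TYPE_KEY = "topic.auto-create.type";
    static final String PARTITIONS_KEY = "topic.auto-create.partitions";

    final boolean partitioned;
    // 0 means non-partitioned, mirroring the partitions field of Pulsar's topic metadata.
    final int partitions;

    private TopicCreationOptions(boolean partitioned, int partitions) {
        this.partitioned = partitioned;
        this.partitions = partitions;
    }

    /** Parses both settings; with no settings, the topic is created non-partitioned. */
    static TopicCreationOptions fromMap(Map<String, String> props) {
        String type = props.getOrDefault(TYPE_KEY, "non-partitioned").trim();
        switch (type) {
            case "non-partitioned":
                return new TopicCreationOptions(false, 0);
            case "partitioned":
                int n = Integer.parseInt(props.getOrDefault(PARTITIONS_KEY, "1").trim());
                if (n < 1) {
                    throw new IllegalArgumentException(PARTITIONS_KEY + " must be >= 1, got " + n);
                }
                return new TopicCreationOptions(true, n);
            default:
                throw new IllegalArgumentException("Unknown " + TYPE_KEY + ": " + type);
        }
    }
}
```

For example, setting the type to partitioned and the count to 4 yields a 4-partition topic. An empty map yields a non-partitioned topic. A partition count below 1 is rejected rather than silently coerced.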
Additional context
Tested with:
- pulsar 2.8.0
- pulsar-flink 1.13.1.2
- flink 1.13.1
@mathieudruart
Ideally the topic created should respect Pulsar configuration for auto topic creation (as defined by the Pulsar parameters : allowAutoTopicCreationType and defaultNumPartitions).
When the Pulsar broker is configured to create topics automatically, flink-pulsar doesn't create the topic with 1 partition.
If the switch is on (allowAutoTopicCreationType = partitioned/non-partitioned), this call will trigger Pulsar to create the topic automatically:
admin.topics().getPartitionedTopicMetadata(topic.getTopic()).partitions;
However, Flink needs to ensure that all configured topics exist. So when auto topic creation is not enabled in Pulsar, it creates the topic with the default of 1 partition.
@shibd
Even if allowAutoTopicCreation is set to true, this call will not trigger Pulsar to create the topic automatically:
admin.topics().getPartitionedTopicMetadata(topic.getTopic()).partitions;
The admin API only returns "Topic not exist", without creating the topic.
You can easily reproduce this with a Pulsar cluster deployed with the default configuration:
# Enable topic auto creation if new producer or consumer connected (disable auto creation with value false)
allowAutoTopicCreation=true
# The type of topic that is allowed to be automatically created.(partitioned/non-partitioned)
allowAutoTopicCreationType=non-partitioned
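To make the expected behavior concrete, the sketch below derives what the broker would auto-create from these settings. `BrokerAutoCreation` is a hypothetical helper. When a key is absent, it falls back to values assumed to be the broker.conf defaults: allowAutoTopicCreation=true, allowAutoTopicCreationType=non-partitioned and defaultNumPartitions=1. Per this thread, the connector cannot read these values through the admin API, so they would have to be supplied some other way.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical helper: what the broker's auto-creation settings imply for a missing topic. */
final class BrokerAutoCreation {
    /** Loads broker.conf-style text; lines starting with '#' are treated as comments. */
    static Properties parse(String brokerConf) {
        Properties conf = new Properties();
        try {
            conf.load(new StringReader(brokerConf));
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for an in-memory reader
        }
        return conf;
    }

    /**
     * Returns -1 if auto-creation is disabled, 0 for a non-partitioned topic,
     * or the partition count for a partitioned topic.
     * Missing keys fall back to the assumed broker.conf defaults.
     */
    static int expectedPartitions(Properties conf) {
        boolean allowed = Boolean.parseBoolean(conf.getProperty("allowAutoTopicCreation", "true").trim());
        if (!allowed) {
            return -1;
        }
        String type = conf.getProperty("allowAutoTopicCreationType", "non-partitioned").trim();
        if ("partitioned".equals(type)) {
            return Integer.parseInt(conf.getProperty("defaultNumPartitions", "1").trim());
        }
        return 0;
    }
}
```

Fed the snippet above, `expectedPartitions` returns 0. That is a non-partitioned topic, not the 1-partition partitioned topic the connector currently creates.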
Try this with pulsar-admin:
pulsar-admin topics get-partitioned-topic-metadata persistent://public/default/test-topic-creation
It only answers "Topic not exist", and no topic is created.
So this code will always create a partitioned topic with 1 partition if the topic doesn't exist (ignoring the Pulsar broker configuration):
public Set<TopicRange> getTopicPartitionsAll() throws PulsarAdminException {
    List<TopicRange> topics = getTopics();
    HashSet<TopicRange> allTopics = new HashSet<>();
    for (TopicRange topic : topics) {
        int partNum = 1;
        try {
            partNum = admin.topics().getPartitionedTopicMetadata(topic.getTopic()).partitions;
        } catch (PulsarAdminException.NotFoundException e) {
            log.info("topic<{}> is not exit, auto create <{}> partition to <{}>", topic.getTopic(), partNum, topic.getTopic());
            try {
                createTopic(topic.getTopic(), partNum);
                // ... (rest of the method omitted in this excerpt)
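One possible shape for a connector-side workaround, sketched in Java. It creates a topic only when the topic is missing. It creates the topic as non-partitioned or partitioned according to a configured count, instead of always passing 1. `TopicAdmin`, `EnsureTopic` and `FakeTopicAdmin` are hypothetical. In the connector, `partitionsIfExists` would wrap `admin.topics().getPartitionedTopicMetadata(...)` and catch `PulsarAdminException.NotFoundException`. The two create methods would map to `admin.topics().createNonPartitionedTopic(topic)` and `admin.topics().createPartitionedTopic(topic, n)`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalInt;

/** Hypothetical minimal view of the admin calls the connector needs. */
interface TopicAdmin {
    /** Empty if the topic does not exist; otherwise 0 (non-partitioned) or the partition count. */
    OptionalInt partitionsIfExists(String topic);
    void createNonPartitioned(String topic);
    void createPartitioned(String topic, int partitions);
}

final class EnsureTopic {
    /**
     * Ensures the topic exists and returns its partition count.
     * configuredPartitions == 0 creates a non-partitioned topic; > 0 creates a
     * partitioned topic with that many partitions. Existing topics are left as-is.
     */
    static int ensureExists(TopicAdmin admin, String topic, int configuredPartitions) {
        if (configuredPartitions < 0) {
            throw new IllegalArgumentException("partitions must be >= 0, got " + configuredPartitions);
        }
        OptionalInt existing = admin.partitionsIfExists(topic);
        if (existing.isPresent()) {
            return existing.getAsInt(); // never reshape a topic that already exists
        }
        if (configuredPartitions == 0) {
            admin.createNonPartitioned(topic);
        } else {
            admin.createPartitioned(topic, configuredPartitions);
        }
        return configuredPartitions;
    }
}

/** In-memory fake for trying the logic without a broker. */
final class FakeTopicAdmin implements TopicAdmin {
    final Map<String, Integer> topics = new HashMap<>();

    public OptionalInt partitionsIfExists(String topic) {
        Integer n = topics.get(topic);
        return n == null ? OptionalInt.empty() : OptionalInt.of(n);
    }

    public void createNonPartitioned(String topic) {
        topics.put(topic, 0);
    }

    public void createPartitioned(String topic, int partitions) {
        topics.put(topic, partitions);
    }
}
```

Because an existing topic is never modified, a topic created earlier (by the broker's auto-creation or by hand) keeps its shape. Only a missing topic picks up the configured count.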
Sorry, I checked the Pulsar implementation: getPartitionedTopicMetadata does not read the broker configuration, so we need Pulsar to support this.
The switch to automatically create topics is exposed in the Pulsar interface, but not in the admin API. We can open an issue with Pulsar.
@jianyun8023 @syhily Can you help me review this approach to solving the problem?
@shibd @jianyun8023 Since the MR for Pulsar is taking too long to be merged, I think we may need a workaround on the connector side.
I think the connector needs to support configuring the default number of partitions (as an interim measure).