[BUG] A topic created by the pulsar-flink connector is always created as a partitioned topic with 1 partition

Open mathieudruart opened this issue 4 years ago • 6 comments

Describe the bug Since #369, when the flink-connector creates a topic at stream start-up (because the topic doesn't already exist), the topic is always created as a partitioned topic with 1 partition.

To Reproduce

  1. create a Flink Stream with a Pulsar source/sink linked to a non-existent topic
  2. start the stream
  3. the topic is created, but always as a partitioned topic with 1 partition

Expected behavior Ideally, the created topic should respect the Pulsar configuration for auto topic creation (as defined by the Pulsar parameters allowAutoTopicCreationType and defaultNumPartitions). If that's not possible, the source/sink should be configurable on both points: whether the topic is partitioned and, if so, the default number of partitions.
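As a rough illustration of the behavior requested above, here is a minimal Java sketch that maps the broker settings to a creation decision. It assumes the connector could read those settings at all, which is not established in this issue. The class name `AutoTopicCreationPolicy`, the `Action` enum, the fallback action, and the default values used when a key is missing are all hypothetical and are not part of pulsar-flink.

```java
import java.util.Properties;

// Hypothetical helper, not part of pulsar-flink: decides how a missing topic
// should be created from broker-style settings.
final class AutoTopicCreationPolicy {
    enum Action { CREATE_PARTITIONED, CREATE_NON_PARTITIONED, CONNECTOR_DEFAULT }

    final Action action;
    final int partitions; // only meaningful for CREATE_PARTITIONED

    AutoTopicCreationPolicy(Action action, int partitions) {
        this.action = action;
        this.partitions = partitions;
    }

    static AutoTopicCreationPolicy fromBrokerSettings(Properties conf) {
        // Defaults below are assumptions used when a key is absent.
        boolean allowed =
                Boolean.parseBoolean(conf.getProperty("allowAutoTopicCreation", "true"));
        if (!allowed) {
            // Assumption: with auto creation disabled, the connector falls back
            // to its own default.
            return new AutoTopicCreationPolicy(Action.CONNECTOR_DEFAULT, 0);
        }
        String type = conf.getProperty("allowAutoTopicCreationType", "non-partitioned");
        if ("partitioned".equals(type)) {
            int n = Integer.parseInt(conf.getProperty("defaultNumPartitions", "1"));
            return new AutoTopicCreationPolicy(Action.CREATE_PARTITIONED, n);
        }
        return new AutoTopicCreationPolicy(Action.CREATE_NON_PARTITIONED, 0);
    }
}
```

With allowAutoTopicCreationType=non-partitioned, this sketch selects a non-partitioned topic rather than a partitioned topic with 1 partition, which is the difference this issue is about.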

Additional context Tested with :

  • pulsar 2.8.0
  • pulsar-flink 1.13.1.2
  • flink 1.13.1

mathieudruart avatar Aug 23 '21 21:08 mathieudruart

@mathieudruart

Ideally the topic created should respect Pulsar configuration for auto topic creation (as defined by the Pulsar parameters : allowAutoTopicCreationType and defaultNumPartitions).

When the Pulsar broker is configured to create topics automatically, pulsar-flink does not create the topic with 1 partition.

If the switch is on (allowAutoTopicCreationType = partitioned/non-partitioned), this call will trigger Pulsar to create the topic automatically:

admin.topics().getPartitionedTopicMetadata(topic.getTopic()).partitions;

However, Flink needs to ensure that all configured topics exist, so it creates the topic with a default of 1 partition when auto topic creation is not enabled in Pulsar.

shibd avatar Sep 03 '21 08:09 shibd

@shibd

Even if allowAutoTopicCreation is set to true, this call will not trigger Pulsar to create the topic automatically:

admin.topics().getPartitionedTopicMetadata(topic.getTopic()).partitions;

The admin API only returns "Topic not exist", without creating the topic.

You can easily reproduce this with a Pulsar instance deployed with the default configuration:

# Enable topic auto creation if new producer or consumer connected (disable auto creation with value false)
allowAutoTopicCreation=true

# The type of topic that is allowed to be automatically created.(partitioned/non-partitioned)
allowAutoTopicCreationType=non-partitioned

Try this with pulsar-admin:

pulsar-admin topics get-partitioned-topic-metadata persistent://public/default/test-topic-creation

It only answers "Topic not exist", and no topic is created.

So this code will always create a partitioned topic with 1 partition if the topic doesn't exist (ignoring the Pulsar broker configuration):

    public Set<TopicRange> getTopicPartitionsAll() throws PulsarAdminException {
        List<TopicRange> topics = getTopics();
        HashSet<TopicRange> allTopics = new HashSet<>();
        for (TopicRange topic : topics) {
            int partNum = 1;
            try {
                partNum = admin.topics().getPartitionedTopicMetadata(topic.getTopic()).partitions;
            } catch (PulsarAdminException.NotFoundException e) {
                log.info("topic<{}> is not exit, auto create <{}> partition to <{}>", topic.getTopic(), partNum, topic.getTopic());
                try {
                    createTopic(topic.getTopic(), partNum);

mathieudruart avatar Sep 03 '21 21:09 mathieudruart

Sorry, I checked the Pulsar implementation: getPartitionedTopicMetadata does not read the broker configuration. We need Pulsar to support this.

The switch for automatic topic creation is exposed in the Pulsar interface, but not through the admin API. We can open an issue with Pulsar.

shibd avatar Sep 04 '21 03:09 shibd

@jianyun8023 @syhily Can you help me review the way to solve the problem?

shibd avatar Sep 04 '21 03:09 shibd

@shibd @jianyun8023 Since the MR for Pulsar is taking too long to be merged, I think we may need a workaround on the connector side.

syhily avatar Nov 04 '21 15:11 syhily

@shibd @jianyun8023 Since the MR for Pulsar is taking too long to be merged, I think we may need a workaround on the connector side.

I think the connector needs to support configuring the default number of partitions (as an interim measure).
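A minimal Java sketch of what such an interim measure could look like, under stated assumptions: `TopicAdmin` is a hypothetical stand-in for the handful of admin operations involved (in the real Pulsar admin client the corresponding calls would be `admin.topics().createNonPartitionedTopic(topic)` and `admin.topics().createPartitionedTopic(topic, n)`), and the `defaultPartitions` option, with 0 meaning "non-partitioned", is an illustrative convention rather than an existing connector setting.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not the connector's actual code.
final class TopicEnsurer {
    /** Hypothetical stand-in for the admin operations the connector needs. */
    interface TopicAdmin {
        boolean exists(String topic);
        void createNonPartitionedTopic(String topic);
        void createPartitionedTopic(String topic, int numPartitions);
    }

    /** In-memory fake for illustration: topic name -> partitions (0 = non-partitioned). */
    static final class InMemoryAdmin implements TopicAdmin {
        final Map<String, Integer> topics = new HashMap<>();

        public boolean exists(String topic) {
            return topics.containsKey(topic);
        }

        public void createNonPartitionedTopic(String topic) {
            topics.put(topic, 0);
        }

        public void createPartitionedTopic(String topic, int numPartitions) {
            topics.put(topic, numPartitions);
        }
    }

    /**
     * Creates the topic only if it is missing, using the configured default:
     * 0 creates a non-partitioned topic, n > 0 a partitioned topic with n partitions.
     */
    static void ensureTopic(TopicAdmin admin, String topic, int defaultPartitions) {
        if (defaultPartitions < 0) {
            throw new IllegalArgumentException("defaultPartitions must be >= 0");
        }
        if (admin.exists(topic)) {
            return; // never alter an existing topic
        }
        if (defaultPartitions == 0) {
            admin.createNonPartitionedTopic(topic);
        } else {
            admin.createPartitionedTopic(topic, defaultPartitions);
        }
    }
}
```

Setting the option to 1 would reproduce the current behavior, while 0 would match a broker configured with allowAutoTopicCreationType=non-partitioned.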

shibd avatar Nov 05 '21 06:11 shibd