
[fix][broker] Fix ArrayIndexOutOfBoundsException due to race condition

Open leizhiyuan opened this issue 3 years ago • 14 comments

Motivation

WARN ] 2022-03-10 17:27:26.976 [pulsar-io-36-20] ServerCnx - [/127.0.0.1:55561] Got exception java.lang.ArrayIndexOutOfBoundsException: 1
	at java.util.concurrent.CopyOnWriteArrayList.get(CopyOnWriteArrayList.java:388)
	at java.util.concurrent.CopyOnWriteArrayList.get(CopyOnWriteArrayList.java:397)
	at org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers.getNextConsumer(AbstractDispatcherMultipleConsumers.java:121)
	at org.apache.pulsar.broker.service.nonpersistent.NonPersistentDispatcherMultipleConsumers.sendMessages(NonPersistentDispatcherMultipleConsumers.java:192)
	at org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic.lambda$publishMessage$1(NonPersistentTopic.java:182)
	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:387)
	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
	at org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic.publishMessage(NonPersistentTopic.java:176)
	at org.apache.pulsar.broker.service.Producer.publishMessageToTopic(Producer.java:210)
	at org.apache.pulsar.broker.service.Producer.publishMessage(Producer.java:149)
	at org.apache.pulsar.broker.service.ServerCnx.handleSend(ServerCnx.java:1321)
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:232)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)

Modifications

Assume consumerList contains one consumer. With a non-persistent topic and two producers publishing messages, two pulsar-io threads can invoke getNextConsumer() concurrently. If one thread changes currentConsumerRoundRobinIndex to 1 after the other thread has already checked it, the other thread fails.

At that point currentConsumerRoundRobinIndex is 1, so

        int currentRoundRobinConsumerPriority = consumerList.get(currentConsumerRoundRobinIndex).getPriorityLevel();

throws ArrayIndexOutOfBoundsException.
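The race can be illustrated with a minimal sketch. It assumes a simplified model: `RoundRobinSelector`, `nextUnsafe` and `nextSafe` are hypothetical names, not Pulsar's `AbstractDispatcherMultipleConsumers`. The sketch only mirrors the check-then-act pattern on a shared round-robin index over a `CopyOnWriteArrayList`. It then shows how guarding the whole selection with `synchronized` closes the window. The later review comment about `synchronized` suggests that is the approach this PR takes.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical, simplified model of round-robin consumer selection.
// NOT Pulsar's dispatcher code; it only reproduces the shared-index pattern.
class RoundRobinSelector<T> {
    private final List<T> consumers = new CopyOnWriteArrayList<>();
    // Shared cursor, playing the role of currentConsumerRoundRobinIndex.
    private int index = 0;

    void add(T consumer) {
        consumers.add(consumer);
    }

    // Racy: with one consumer, threads A and B can interleave like this:
    //   A: index (0) < size (1), so no reset
    //   B: get(0) succeeds, then B sets index = 1
    //   A: get(index) now reads 1 -> ArrayIndexOutOfBoundsException
    T nextUnsafe() {
        if (consumers.isEmpty()) {
            return null;
        }
        if (index >= consumers.size()) {
            index = 0;
        }
        T next = consumers.get(index);
        index++;
        return next;
    }

    // Guarded: the bounds check, the read and the increment run under one
    // monitor, so no thread sees an index advanced by another thread mid-way.
    synchronized T nextSafe() {
        if (consumers.isEmpty()) {
            return null;
        }
        if (index >= consumers.size()) {
            index = 0;
        }
        T next = consumers.get(index);
        index++;
        return next;
    }
}
```

With a single consumer, calling `nextSafe()` from several threads always returns that consumer. Calling `nextUnsafe()` the same way can intermittently throw, as in the broker log above. The trade-off is that every selection now acquires a monitor, which is the performance question raised in review.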

To reproduce, run the following test:

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.pulsar.broker.service.nonpersistent;

import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.SneakyThrows;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = "broker")
public class NonPersistentTopicConcurrentTest extends BrokerTestBase {

    @BeforeMethod(alwaysRun = true)
    @Override
    protected void setup() throws Exception {
        super.baseSetup();
    }

    @AfterMethod(alwaysRun = true)
    @Override
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @SneakyThrows
    @Test
    public void test() {
        final String topicName = "non-persistent://prop/ns-abc/topic" + UUID.randomUUID();
        final String subName = "non-persistent";
        @Cleanup
        PulsarClient pulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);
        @Cleanup
        Consumer<byte[]> consumer = pulsarClient1.newConsumer().topic(topicName)
                .subscriptionName(subName).subscriptionType(SubscriptionType.Shared).subscribe();
        @Cleanup
        PulsarClient pulsarClient2 = newPulsarClient(lookupUrl.toString(), 0);
        @Cleanup
        Producer<byte[]> producer1 = pulsarClient2.newProducer().topic(topicName).create();

        @Cleanup
        PulsarClient pulsarClient3 = newPulsarClient(lookupUrl.toString(), 0);
        @Cleanup
        Producer<byte[]> producer2 = pulsarClient3.newProducer().topic(topicName).create();

        // Two producers publish concurrently on separate connections, so the broker
        // may handle the sends on different pulsar-io threads that dispatch in parallel.
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                producer1.sendAsync("msg".getBytes(StandardCharsets.UTF_8));
            }
        });

        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                producer2.sendAsync("msg".getBytes(StandardCharsets.UTF_8));
            }
        });
        t1.start();
        t2.start();

        // Drain messages until none arrive for one second.
        while (true) {
            Message<byte[]> message = consumer.receive(1, TimeUnit.SECONDS);
            if (message == null) {
                break;
            }
            System.out.println(message);
        }

        // Wait for both publishers before @Cleanup closes the producers.
        t1.join();
        t2.join();
    }
}


Run the test several times; intermittently, the exception above appears in the console output.

  • [x] doc-not-needed

leizhiyuan • Mar 10 '22 11:03

@leizhiyuan: Thanks for your contribution. For this PR, do we need to update the docs? (The PR template contains info about docs, which helps others understand the changes. Can you provide doc-related info in this and future PR descriptions? Thanks)

github-actions[bot] • Mar 10 '22 11:03

@leizhiyuan: Thanks for providing doc info!

github-actions[bot] • Mar 10 '22 11:03

The PR had no activity for 30 days; marking it with the Stale label.

github-actions[bot] • May 06 '22 02:05

/pulsarbot run-failure-checks

wolfstudy • May 06 '22 02:05

/pulsarbot run-failure-checks

leizhiyuan • May 06 '22 02:05

ping @eolivelli PTAL again, thanks

wolfstudy • May 06 '22 03:05

The PR had no activity for 30 days; marking it with the Stale label.

github-actions[bot] • Jun 06 '22 02:06

@eolivelli Please help review this PR again

codelipenghui • Jun 09 '22 04:06

The PR had no activity for 30 days; marking it with the Stale label.

github-actions[bot] • Jul 10 '22 02:07

LGTM, but I'm not sure whether adding synchronized will affect performance.

We have load-tested it in our production environment.

leizhiyuan • Jul 10 '22 03:07

@eolivelli PTAL. This PR is blocked for a long time.

Jason918 • Aug 27 '22 08:08

Hi @leizhiyuan, I moved this PR to release/2.9.5. If you have any questions, please ping me. Thanks.

congbobo184 • Nov 17 '22 12:11

/pulsarbot run-failure-checks

tisonkun • Dec 10 '22 10:12

As discussed on the mailing list https://lists.apache.org/thread/w4jzk27qhtosgsz7l9bmhf1t7o9mxjhp, there is no plan to release 2.9.6, so I am going to remove the release/2.9.6 label.

michaeljmarshall • Jun 27 '23 21:06

Since it takes a long time and the code differences are too large, I will close this PR.

leizhiyuan • Oct 15 '24 02:10