async-http-client

Thread-safety issues when the IO thread count is set to a low number

Open ehiggs opened this issue 5 years ago • 7 comments

When using non-blocking IO, it's often useful to have only a small number of threads handling the actual IO. Reduced resource use is one of the purposes, after all. AIUI, AHC defaults to using 2 * numCoresOnSystem IO threads, which is quite a lot.

Unfortunately, when using a small number of IO threads, I run into problems. I've written a test with various thread setups:

/*
 * Copyright (c) 2010-2012 Sonatype, Inc. All rights reserved.
 *
 * This program is licensed to you under the Apache License Version 2.0,
 * and you may not use this file except in compliance with the Apache License Version 2.0.
 * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the Apache License Version 2.0 is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
 */
package org.asynchttpclient;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import org.asynchttpclient.testserver.HttpServer;
import org.asynchttpclient.testserver.HttpTest;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.asynchttpclient.test.TestUtils.TIMEOUT;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;

/**
 * Reproduces failures seen when many caller threads share a client configured with only a few
 * IO threads.
 *
 * <p>Each test submits 100 PUT requests carrying a 1 MiB body from a fixed pool of
 * {@code requestThreads} caller threads, using a client whose IO thread count is set to
 * {@code ioThreads}. The test names say "Get" but the requests are PUTs.
 */
public class MultithreadedClientTest extends HttpTest {
  // Static and shared across test methods; recreated before and closed after each test method.
  private static HttpServer server;

  @BeforeMethod
  public static void start() throws Throwable {
    server = new HttpServer();
    server.start();
  }

  @AfterMethod
  public static void stop() throws Throwable {
    server.close();
  }

  @Test
  public void testMultithreadedGet_1_1() throws Throwable {
    testMultithreadedGet_inner(1, 1);
  }

  @Test
  public void testMultithreadedGet_5_5() throws Throwable {
    testMultithreadedGet_inner(5, 5);
  }

  @Test
  public void testMultithreadedGet_20_20() throws Throwable {
    testMultithreadedGet_inner(20, 20);
  }

  // Reported to fail: 20 caller threads sharing 1 IO thread.
  @Test
  public void testMultithreadedGet_1_20() throws Throwable {
    testMultithreadedGet_inner(1, 20);
  }

  // Reported to fail: 20 caller threads sharing 2 IO threads.
  @Test
  public void testMultithreadedGet_2_20() throws Throwable {
    testMultithreadedGet_inner(2, 20);
  }

  /**
   * Sends 100 concurrent PUT requests with a 1 MiB zero-filled body and checks each response URI.
   *
   * @param ioThreads value passed to {@code setIoThreadsCount} on the client config
   * @param requestThreads size of the fixed thread pool that issues the requests
   * @throws Throwable if any request fails, times out, or reports an unexpected URI
   */
  private void testMultithreadedGet_inner(int ioThreads, int requestThreads) throws Throwable {
    final int contentLength = 1024 * 1024;
    final byte[] payload = new byte[contentLength];
    final int numParallelRequests = 100;
    DefaultAsyncHttpClientConfig.Builder builder = new DefaultAsyncHttpClientConfig.Builder();
    builder.setIoThreadsCount(ioThreads);
    withClient(builder).run(client ->
        withServer(server).run(testServer -> {
          String url = testServer.getHttpUrl();
          ExecutorService executorService = Executors.newFixedThreadPool(requestThreads);
          try {
            List<Future<?>> responses = new ArrayList<>(numParallelRequests);
            for (int i = 0; i < numParallelRequests; i++) {
              responses.add(executorService.submit(() -> {
                try {
                  testServer.enqueueOk();
                  Request request = client.preparePut(url)
                      .setHeader("Content-Length", String.valueOf(contentLength))
                      .setHeader("Content-Type", "application/octet-stream")
                      .setBody(ByteBuffer.wrap(payload))
                      .build();
                  Response response = client.executeRequest(request).get(TIMEOUT, SECONDS);
                  assertEquals(response.getUri().toUrl(), url);
                } catch (InterruptedException e) {
                  // Restore the flag so the pool thread sees the interruption.
                  Thread.currentThread().interrupt();
                  fail("Failed: " + e, e);
                } catch (ExecutionException | TimeoutException e) {
                  // Keep the cause so the underlying client error's stack trace is reported.
                  fail("Failed: " + e, e);
                }
              }));
            }
            // A failed task surfaces here as an ExecutionException wrapping its AssertionError.
            for (Future<?> response : responses) {
              response.get(TIMEOUT, SECONDS);
            }
          } finally {
            // Runs on failure too, so pool threads are never leaked; queued tasks are dropped.
            executorService.shutdownNow();
          }
        }));
  }
}

When the threads making requests outnumber the IO threads given to AHC, I hit exceptions in channel handling, e.g.:

java.io.IOException: Connection reset by peer
	at java.base/sun.nio.ch.FileDispatcherImpl.read0(Native Method)
	at java.base/sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
	at java.base/sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:276)
	at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:233)
	at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223)
	at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:358)
	at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
	at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
	at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:347)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:677)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:612)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:529)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:491)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:834)

I have also seen the following in the wild (which is what I was trying to reproduce here):

java.lang.IllegalArgumentException: Duplicate handler name: request-body-streamer
        at io.netty.channel.DefaultChannelPipeline.checkDuplicateName(DefaultChannelPipeline.java:1101)
        at io.netty.channel.DefaultChannelPipeline.filterName(DefaultChannelPipeline.java:302)
        at io.netty.channel.DefaultChannelPipeline.addLast(DefaultChannelPipeline.java:210)
        at io.netty.channel.DefaultChannelPipeline.addLast(DefaultChannelPipeline.java:201)
        at org.asynchttpclient.netty.request.body.NettyReactiveStreamsBody.write(NettyReactiveStreamsBody.java:59)
        at org.asynchttpclient.netty.request.NettyRequestSender.writeRequest(NettyRequestSender.java:426)
        at org.asynchttpclient.netty.request.NettyRequestSender.sendRequestWithOpenChannel(NettyRequestSender.java:259)
        at org.asynchttpclient.netty.request.NettyRequestSender.sendRequestWithCertainForceConnect(NettyRequestSender.java:141)
        at org.asynchttpclient.netty.request.NettyRequestSender.sendRequest(NettyRequestSender.java:113)
        at org.asynchttpclient.DefaultAsyncHttpClient.execute(DefaultAsyncHttpClient.java:241)
        at org.asynchttpclient.DefaultAsyncHttpClient.executeRequest(DefaultAsyncHttpClient.java:210)
        at org.asynchttpclient.DefaultAsyncHttpClient.executeRequest(DefaultAsyncHttpClient.java:231)

Expected: AHC should be able to run on a single IO thread and still handle multiple threads performing requests.

ehiggs avatar Mar 21 '19 17:03 ehiggs

Note: the 1 MB payload size appears to be important; with smaller payloads, everything seems to work. I suspect this is due to some internal buffer size in a Channel. At 1 MB, the body spills into multiple buffers, which triggers the bad Channel/thread interaction.

ehiggs avatar Mar 21 '19 17:03 ehiggs

Hi. Is anyone able to reproduce this?

ehiggs avatar Apr 23 '19 08:04 ehiggs

I can see this error in the http4s-async-http-client binding under load. `complete` is not reliably called, which causes `removeFromPipeline` to be missed.

I made it run clean by removing the NAME_IN_CHANNEL_PIPELINE argument in NettyReactiveStreamsBody.write, and verified that the results are uncorrupted. But this leaves cruft in the pipeline, so it's not a good solution.

rossabaker avatar Aug 09 '19 13:08 rossabaker

Reading your issue again, the focus is on the Connection reset by peer. I've been chasing (and reliably reproducing) the request-body-streamer error. I'll open a new issue for that. Sorry for the pollution.

rossabaker avatar Aug 12 '19 02:08 rossabaker

Sorry for the pollution.

No bother at all. Glad that someone else is able to see this!

ehiggs avatar Aug 12 '19 08:08 ehiggs

I'm able to reproduce. I slightly changed your test:

  /**
   * Fires 100 concurrent PUT requests with a 1 MiB body and fails with the first error observed.
   *
   * <p>Requests are issued from a fixed pool of {@code requestThreads} caller threads, using a
   * client whose IO thread count is set to {@code ioThreads}.
   *
   * @param ioThreads value passed to {@code setIoThreadsCount} on the client config
   * @param requestThreads size of the fixed thread pool that issues the requests
   * @throws Throwable if any request fails or not all requests complete within the wait bound
   */
  private void testMultithreadedGet_inner(int ioThreads, int requestThreads) throws Throwable {
    final int contentLength = 1024 * 1024;
    final byte[] payload = new byte[contentLength];
    // Bound the wait so a request whose completion never fires fails the test instead of hanging it.
    final long awaitTimeoutSeconds = 60;
    withClient(config().setIoThreadsCount(ioThreads)).run(client ->
      withServer(server).run(server -> {
        String url = server.getHttpUrl();
        ExecutorService executorService = Executors.newFixedThreadPool(requestThreads);
        final int numParallelRequests = 100;
        CountDownLatch countDownLatch = new CountDownLatch(numParallelRequests);
        // Holds the first failure; later failures are only printed.
        AtomicReference<Throwable> firstFailure = new AtomicReference<>();
        boolean allCompleted;
        try {
          for (int i = 0; i < numParallelRequests; i++) {
            executorService.submit(() -> {
              try {
                server.enqueueOk();
                Request request = client.preparePut(url)
                  .setHeader("Content-Length", String.valueOf(contentLength))
                  .setHeader("Content-Type", "application/octet-stream")
                  .setBody(ByteBuffer.wrap(payload))
                  .build();
                client.executeRequest(request)
                  .toCompletableFuture()
                  .whenComplete((response, throwable) -> {
                    if (throwable != null) {
                      throwable.printStackTrace();
                      firstFailure.compareAndSet(null, throwable);
                    }
                    countDownLatch.countDown();
                  });
              } catch (Throwable throwable) {
                // A synchronous failure means no completion callback was registered. Record it
                // and count down here; otherwise the executor's discarded Future would swallow it
                // and the latch would never reach zero.
                throwable.printStackTrace();
                firstFailure.compareAndSet(null, throwable);
                countDownLatch.countDown();
              }
            });
          }
          allCompleted = countDownLatch.await(awaitTimeoutSeconds, SECONDS);
        } finally {
          // Runs even if waiting is interrupted, so pool threads are not leaked.
          executorService.shutdown();
        }
        if (firstFailure.get() != null) {
          fail("Fail", firstFailure.get());
        }
        if (!allCompleted) {
          fail("Timed out after " + awaitTimeoutSeconds + "s with " + countDownLatch.getCount()
            + " of " + numParallelRequests + " requests still pending");
        }
      }));
  }

I sometimes get nonsensical errors, with both Java 8 and Java 11:

Caused by: java.io.IOException: Protocol wrong type for socket
	at java.base/sun.nio.ch.FileDispatcherImpl.write0(Native Method)
	at java.base/sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
	at java.base/sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:113)
	at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:58)
	at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:50)
	at java.base/sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:466)
	at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:405)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:928)

No idea what to make of those. I suspect there's not much to be done in AHC.

slandelle avatar Aug 12 '19 14:08 slandelle

I sometimes get nonsensical errors, with both Java 8 and Java 11:

That usually means there is a race condition.

ehiggs avatar Aug 21 '19 13:08 ehiggs