async-http-client
async-http-client copied to clipboard
Thread Safety issues when io threads set to a low number
When using non-blocking IO it's often useful to have only a small number of threads handling the actual IO. Reduced resource use is one of the purposes, after all. AIUI, AHC defaults to using 2*numCoresOnSystem for io threads, which is quite a lot.
Unfortunately when using a small number of threads for io, I run into problems. I've written a test with various thread setups:
/*
* Copyright (c) 2010-2012 Sonatype, Inc. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package org.asynchttpclient;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import org.asynchttpclient.testserver.HttpServer;
import org.asynchttpclient.testserver.HttpTest;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.asynchttpclient.test.TestUtils.TIMEOUT;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
/**
 * Reproduction test: issues many concurrent 1 MiB PUT requests from a pool of caller threads
 * through a client configured with a given number of IO threads.
 *
 * <p>Each test method name encodes {@code <ioThreads>_<requestThreads>}. The combinations where
 * the request threads outnumber the IO threads are the ones reported as failing.
 */
public class MultithreadedClientTest extends HttpTest {

    private static HttpServer server;

    /** Starts a fresh test server before every test method. */
    @BeforeMethod
    public static void start() throws Throwable {
        server = new HttpServer();
        server.start();
    }

    /** Closes the test server after every test method. */
    @AfterMethod
    public static void stop() throws Throwable {
        server.close();
    }

    @Test
    public void testMultithreadedGet_1_1() throws Throwable {
        testMultithreadedGet_inner(1, 1);
    }

    @Test
    public void testMultithreadedGet_5_5() throws Throwable {
        testMultithreadedGet_inner(5, 5);
    }

    @Test
    public void testMultithreadedGet_20_20() throws Throwable {
        testMultithreadedGet_inner(20, 20);
    }

    // This fails: 20 request threads against a single IO thread.
    @Test
    public void testMultithreadedGet_1_20() throws Throwable {
        testMultithreadedGet_inner(1, 20);
    }

    // This fails: 20 request threads against two IO threads.
    @Test
    public void testMultithreadedGet_2_20() throws Throwable {
        testMultithreadedGet_inner(2, 20);
    }

    /**
     * Submits 100 PUT requests with a 1 MiB body to the test server and waits for all of them.
     *
     * @param ioThreads      number of IO threads the client is configured with
     * @param requestThreads size of the thread pool that issues the requests
     * @throws Throwable if any request fails or does not complete within {@code TIMEOUT} seconds
     */
    private void testMultithreadedGet_inner(int ioThreads, int requestThreads) throws Throwable {
        final int contentLength = 1024 * 1024;
        final byte[] payload = new byte[contentLength];

        DefaultAsyncHttpClientConfig.Builder builder = new DefaultAsyncHttpClientConfig.Builder();
        builder.setIoThreadsCount(ioThreads);

        withClient(builder).run(client ->
            withServer(server).run(server -> {
                String url = server.getHttpUrl();
                final int numParallelRequests = 100;
                ExecutorService executorService = Executors.newFixedThreadPool(requestThreads);
                try {
                    List<Future<?>> responses = new ArrayList<>(numParallelRequests);
                    for (int i = 0; i < numParallelRequests; i++) {
                        Future<?> requestResponse = executorService.submit(() -> {
                            try {
                                server.enqueueOk();
                                Request request = client.preparePut(url)
                                    .setHeader("Content-Length", String.valueOf(contentLength))
                                    .setHeader("Content-Type", "application/octet-stream")
                                    .setBody(ByteBuffer.wrap(payload))
                                    .build();
                                Response response = client.executeRequest(request)
                                    .get(TIMEOUT, SECONDS);
                                assertEquals(response.getUri().toUrl(), url);
                            } catch (InterruptedException e) {
                                // Restore the flag so the pool thread still observes the interruption.
                                Thread.currentThread().interrupt();
                                fail("Failed: " + e, e);
                            } catch (ExecutionException | TimeoutException e) {
                                // Keep the cause attached so the underlying stack trace is reported.
                                fail("Failed: " + e, e);
                            }
                        });
                        responses.add(requestResponse);
                    }
                    // An AssertionError raised inside a task surfaces here as an ExecutionException.
                    for (Future<?> response : responses) {
                        response.get(TIMEOUT, SECONDS);
                    }
                } finally {
                    // Always release the pool; on failure this also cancels still-queued requests
                    // so the non-daemon worker threads do not outlive the test.
                    executorService.shutdownNow();
                }
            }));
    }
}
When the number of threads making requests outnumbers the number of io threads given to AHC, I hit exceptions with Channel handling. e.g.:
java.io.IOException: Connection reset by peer
at java.base/sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at java.base/sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at java.base/sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:276)
at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:233)
at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223)
at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:358)
at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:347)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:677)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:612)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:529)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:491)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
I have also seen the following in the wild (which is what I was trying to reproduce here):
java.lang.IllegalArgumentException: Duplicate handler name: request-body-streamer
at io.netty.channel.DefaultChannelPipeline.checkDuplicateName(DefaultChannelPipeline.java:1101)
at io.netty.channel.DefaultChannelPipeline.filterName(DefaultChannelPipeline.java:302)
at io.netty.channel.DefaultChannelPipeline.addLast(DefaultChannelPipeline.java:210)
at io.netty.channel.DefaultChannelPipeline.addLast(DefaultChannelPipeline.java:201)
at org.asynchttpclient.netty.request.body.NettyReactiveStreamsBody.write(NettyReactiveStreamsBody.java:59)
at org.asynchttpclient.netty.request.NettyRequestSender.writeRequest(NettyRequestSender.java:426)
at org.asynchttpclient.netty.request.NettyRequestSender.sendRequestWithOpenChannel(NettyRequestSender.java:259)
at org.asynchttpclient.netty.request.NettyRequestSender.sendRequestWithCertainForceConnect(NettyRequestSender.java:141)
at org.asynchttpclient.netty.request.NettyRequestSender.sendRequest(NettyRequestSender.java:113)
at org.asynchttpclient.DefaultAsyncHttpClient.execute(DefaultAsyncHttpClient.java:241)
at org.asynchttpclient.DefaultAsyncHttpClient.executeRequest(DefaultAsyncHttpClient.java:210)
at org.asynchttpclient.DefaultAsyncHttpClient.executeRequest(DefaultAsyncHttpClient.java:231)
Expected: The expectation is that AHC should be able to run on 1 io thread and still be able to handle multiple threads performing requests.
Note: the 1MB payload size appears to be important. If the payload is smaller then everything seems to work. I guess this might be because of some internal buffer size in some Channel and when we go to 1MB, it spills into multiple buffers causing the bad Channel/thread interaction.
Hi. Is anyone able to reproduce this?
I can see this error in the http4s-async-http-client binding under load. `complete` is not reliably called, which causes `removeFromPipeline` to be missed.
I made it run clean by removing the NAME_IN_CHANNEL_PIPELINE argument in NettyReactiveStreamsBody.write, and verified that the results are uncorrupted. But this leaves cruft in the pipeline, so it's not a good solution.
Reading your issue again, the focus is on the Connection reset by peer. I've been chasing (and reliably reproducing) the request-body-streamer error. I'll open a new issue for that. Sorry for the pollution.
Sorry for the pollution.
No bother at all. Glad that someone else is able to see this!
I'm able to reproduce. I slightly changed your test:
/**
 * Fires 100 asynchronous 1 MiB PUT requests from a pool of caller threads and fails if any of
 * them completes exceptionally.
 *
 * @param ioThreads      number of IO threads the client is configured with
 * @param requestThreads size of the thread pool that issues the requests
 * @throws Throwable if a request fails or the requests do not all complete within
 *                   {@code TIMEOUT} seconds
 */
private void testMultithreadedGet_inner(int ioThreads, int requestThreads) throws Throwable {
    final int contentLength = 1024 * 1024;
    final byte[] payload = new byte[contentLength];

    withClient(config().setIoThreadsCount(ioThreads)).run(client ->
        withServer(server).run(server -> {
            String url = server.getHttpUrl();
            ExecutorService executorService = Executors.newFixedThreadPool(requestThreads);
            final int numParallelRequests = 100;
            CountDownLatch countDownLatch = new CountDownLatch(numParallelRequests);
            AtomicReference<Throwable> t = new AtomicReference<>();
            try {
                for (int i = 0; i < numParallelRequests; i++) {
                    executorService.submit(() -> {
                        try {
                            server.enqueueOk();
                            Request request = client.preparePut(url)
                                .setHeader("Content-Length", String.valueOf(contentLength))
                                .setHeader("Content-Type", "application/octet-stream")
                                .setBody(ByteBuffer.wrap(payload))
                                .build();
                            client.executeRequest(request)
                                .toCompletableFuture()
                                .whenComplete((response, throwable) -> {
                                    if (throwable != null) {
                                        throwable.printStackTrace();
                                        t.set(throwable);
                                    }
                                    countDownLatch.countDown();
                                });
                        } catch (RuntimeException | Error e) {
                            // A failure thrown before the completion callback is registered would
                            // otherwise never count down, leaving the latch stuck above zero.
                            e.printStackTrace();
                            t.set(e);
                            countDownLatch.countDown();
                        }
                    });
                }
                // Bounded wait: a lost completion must fail the test instead of hanging it.
                if (!countDownLatch.await(TIMEOUT, SECONDS)) {
                    fail("Timed out with " + countDownLatch.getCount() + " of "
                        + numParallelRequests + " requests still pending", t.get());
                }
            } finally {
                // Release the pool on every path, including failures.
                executorService.shutdownNow();
            }
            if (t.get() != null) {
                fail("Fail", t.get());
            }
        }));
}
I sometimes get nonsensical errors, both with Java 8 and Java 11:
Caused by: java.io.IOException: Protocol wrong type for socket
at java.base/sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at java.base/sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at java.base/sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:113)
at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:58)
at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:50)
at java.base/sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:466)
at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:405)
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:928)
No idea what to make of those. I suspect there's not much to be done in AHC.
I sometimes get nonsensical errors, both with Java 8 and Java 11:
That usually means there is a race condition.