InstrumentedEE10Handler not recording metrics on Jetty 12 with Jersey CompletableFuture
Hi,
I upgraded from Jetty 11 to 12 and now have to use InstrumentedEE10Handler for recording request metrics. I'm running into a problem with this setup: the start of an active request is recorded, but its completion never is, and thus most metrics aren't collected.
Could you please take a look at the minimal reproducer below to check if I'm doing something wrong? Please note that this used to work on Jetty 11 with InstrumentedHttpChannelListener, but that class is no longer available for Jetty 12, so I have to use InstrumentedEE10Handler, which I cannot figure out how to configure properly.
The reproducer consists of two classes, a Jersey resource and a JUnit 5 test class (I am using JDK 21 locally on Linux - Fedora 38):
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import java.util.concurrent.CompletableFuture;

@Path("")
public class PingResource {

    @GET
    public CompletableFuture<String> ping() {
        return CompletableFuture.completedFuture("pong");
    }
}
import static com.codahale.metrics.MetricRegistry.name;
import static org.junit.jupiter.api.Assertions.assertEquals;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import io.dropwizard.metrics.jetty12.ee10.InstrumentedEE10Handler;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import org.eclipse.jetty.ee10.servlet.ServletContextHandler;
import org.eclipse.jetty.ee10.servlet.ServletHandler;
import org.eclipse.jetty.server.Server;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.servlet.ServletContainer;
import org.junit.jupiter.api.Test;

class Jetty12DropwizardTest {

    @Test
    void test() throws Exception {
        // Jetty Server on random free port
        var server = new Server(0);
        // Setup Jersey with our simple async resource returning CompletableFuture
        var jerseyConfig = new ResourceConfig()
                .registerClasses(PingResource.class);
        var jersey = new ServletContainer(jerseyConfig);
        // Configure the handler
        var contextHandler = new ServletContextHandler();
        contextHandler.setContextPath("/");
        contextHandler.addServlet(jersey, "/*");
        // Add handler instrumentation
        var metrics = new MetricRegistry();
        var instrumentedHandler = new InstrumentedEE10Handler(metrics);
        contextHandler.insertHandler(instrumentedHandler);
        // Tell the server to use our handler and start it
        server.setHandler(contextHandler);
        server.start();
        try (var client = HttpClient.newHttpClient()) {
            // Ping the server and wait for the response
            var response = client.send(
                    HttpRequest.newBuilder().uri(server.getURI()).GET().build(),
                    HttpResponse.BodyHandlers.ofString());
            assertEquals(200, response.statusCode(), "response code");
            assertEquals("pong", response.body(), "response body");
            // Print metric counts
            metrics.getMetrics().forEach((name, metric) -> System.out.println(name + ": " + printCount(metric)));
            // No active requests after the request succeeded
            var activeRequestsCounter = metrics.counter(name(ServletHandler.class, "active-requests"));
            assertEquals(0, activeRequestsCounter.getCount(), "active requests");
            // request recorded
            var requestsTimer = metrics.timer(name(ServletHandler.class, "requests"));
            assertEquals(1, requestsTimer.getCount(), "requests");
            // 200 response recorded
            var response200Meter = metrics.meter(name(ServletHandler.class, "2xx-responses"));
            assertEquals(1, response200Meter.getCount(), "2xx responses");
        } finally {
            server.stop();
        }
    }

    String printCount(Metric metric) {
        return switch (metric) {
            case Counter c -> "count=" + c.getCount();
            case Meter m -> "meter=" + m.getCount();
            case Timer t -> "timer=" + t.getCount();
            case Histogram h -> "histogram=" + h.getCount();
            case Gauge<?> g -> "gauge=" + g.getValue();
            default -> metric.toString();
        };
    }
}
I expect to see 2xx-responses and all the other stats like requests recorded, just like with InstrumentedHttpChannelListener on Jetty 11. Instead, no metrics are recorded except for an ever-increasing active-requests counter, which should be 0 when there are no active requests.
From a brief debugging session I can see that AbstractInstrumentedHandler#updateResponses, the method responsible for collecting metrics after each request, is never called. In InstrumentedEE10Handler there are two places that invoke this method. One runs after a synchronous request when state.isInitial() is true, but that branch is not taken here because the state is already completed at that point. The other is the InstrumentedAsyncListener#onComplete callback, which is also not invoked for some reason.
I'm not sure if I am missing something or the new Dropwizard instrumentation is just not covering this case of async processing with CompletableFutures returned from Jersey resources.
@zUniQueX Would you have a chance to look at this, please? 😃
@mihalyr Thanks for reporting this! I have a workaround for this problem. Fixing it properly will probably take some time.
As you've correctly pointed out, the finally block would reset the state (e.g. active requests) if the request isn't handled by the current handler chain. The 'real' response update is performed by the asynchronous listener.
Prior to Jetty 12, Jersey put requests into asynchronous processing, e.g. in the JettyHttpContainer. But the servlet classes aren't available in this container in 3.1.x (Jetty 12) anymore, so the logic for asynchronous processing was removed there. I'm not sure how the normal ServletContainer handles this, but while debugging I never saw the request being put into asynchronous mode.
If the request isn't processed asynchronously, Jetty won't invoke the async listeners. So our InstrumentedAsyncListener cannot record the metrics because it simply isn't called.
To work around this problem, one can manually set a request to asynchronous processing. This will allow the async listener to be called, and the test works fine. Just add the following code to your definition of ServletContextHandler:
// Requires imports: jakarta.servlet.AsyncContext, jakarta.servlet.DispatcherType, java.util.EnumSet
contextHandler.addFilter((request, response, chain) -> {
    AsyncContext asyncContext = request.startAsync();
    chain.doFilter(request, response);
    asyncContext.complete();
}, "/*", EnumSet.allOf(DispatcherType.class));
@mihalyr @joschi After further investigation I've pushed a first fix for this problem: https://github.com/zUniQueX/metrics/commit/4548985dfb26d24bcb3881abedd8fd858b336f2d.
I'm not happy with this yet and will run some additional tests over the next few days. In particular, the expression Callback.from(metricUpdater, callback) isn't great: it prevents the response from being sent before the metrics are collected. When changing it to the preferable Callback.from(callback, metricUpdater), the test case isn't stable, because the metrics are only collected once the response has been processed. Additionally, I'm not sure if Jetty will resume suspended requests at the handler level, which would make the async flag redundant.
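For reference, here is a minimal sketch of the ordering difference between the two compositions, as I understand the Jetty Callback API (assuming metricUpdater is a Runnable, as in the linked commit; the class and variable names are illustrative only):

import org.eclipse.jetty.util.Callback;

public class CallbackOrderDemo {
    public static void main(String[] args) {
        Runnable metricUpdater = () -> System.out.println("metrics updated");
        Callback responseCallback = Callback.from(() -> System.out.println("response completed"));

        // from(Runnable, Callback): the Runnable runs BEFORE the wrapped callback
        // completes, so response processing waits for the metrics update.
        Callback metricsFirst = Callback.from(metricUpdater, responseCallback);
        metricsFirst.succeeded(); // prints "metrics updated", then "response completed"

        // from(Callback, Runnable): the Runnable runs AFTER the wrapped callback
        // completes, so the response proceeds first and the metrics lag behind it.
        Callback responseFirst = Callback.from(responseCallback, metricUpdater);
        responseFirst.succeeded(); // prints "response completed", then "metrics updated"
    }
}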
What do you think about the time of metrics collection? Should we focus on fast response processing or on accurate metrics?
Let me add a bit more detail, which I think makes a difference here.
This problem seems to only affect the case when the Jersey resource returns an already completed future like in my example in the issue description:
@GET
public CompletableFuture<String> ping() {
    return CompletableFuture.completedFuture("pong");
}
However, if I change this to a future that completes with a delay - as in most real-world scenarios - the metrics seem to work:
@GET
public CompletableFuture<String> ping() {
    var future = new CompletableFuture<String>();
    CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS).execute(() -> future.complete("pong"));
    return future;
}
I still don't think this is sufficient, because a resource method may legitimately return an already completed future when the asynchronous work has already finished or isn't necessary at all. The metrics library should handle both cases, IMO.
To work around this problem, one can manually set a request to asynchronous processing.
@zUniQueX Unfortunately the suggested workaround with the filter only works if the future is already completed; it breaks async processing when the future completes with a delay.
@zUniQueX I quickly tried the fix from your branch and it seems to work in both cases, completed and delayed. However, I'm not that familiar with the implementation details. I agree that running the metric updater callback after the request handling would be the better choice, as you suggested. I do have a concern about the async flag and race conditions, though: while the request is handled on one thread, you check isSuspended and then set async.set(true), but another thread might invoke the metric updater callback at any point in between and read the async flag before it is set to true - is this a valid concern?
I'm afraid my concern about the race condition was valid. I modified my test slightly so it can run repeatedly, used a resource method that completes the response future with a 1 ms delay, ran it 100k times, and got 32 failures due to unrecorded metrics. That sounds small, but it depends on the test setup; the point is that there is a race condition that makes metric collection flaky.
@mihalyr After diving into the Jetty implementation over the last few days, I'm now pretty sure we cannot provide accurate data for the activeSuspended and asyncDispatches metrics.
As you've correctly pointed out, the async flag might cause race conditions. Since processing of the request is asynchronous, the request can already be completed by the time the call to handle(Request, Response, Callback) returns. So setting the flag in the finally block may have no effect, and updating activeSuspended would be wrong. Additionally, a handler only gets called once, so the current check for the initial processing can be removed from the AbstractInstrumentedHandler, and the else block with the check for the suspension state is redundant.
The only way to get correct data would be to track the _state and _requestState variables of the ServletChannelState. Those variables are updated without notifying any listener, hence we cannot track the asynchronous states.
That said, I'd like to remove the aforementioned metrics from the handler entirely. Combined with the other proposed change of updating the remaining metrics after the Callback completes, all metrics should then contain accurate values.
@joschi When integrating this change, we wouldn't need any servlet-related classes in the handler anymore and could remove the metrics-jetty12-ee10 module. That's a very intrusive change, but I also don't want the last stable release to ship a broken implementation. Since InstrumentedEE10Handler apparently has never worked correctly, I'd propose a fix for the next patch release if you're fine with that.
@mihalyr I've pushed a fix for this one in #3928. If you have additional tests for this issue, I'd appreciate feedback on my current solution.
Hi @zUniQueX, sorry for the late response here, I got a bit busy with work and other things.
I ran my tests and got 78 failures out of 100k runs (about two and a half minutes on my laptop), which is not that bad, but this is only a synthetic case; I'm not sure how it would behave in the wild.
I was also wondering whether I'm doing something wrong in my test case here. Could you please take a look?
import static com.codahale.metrics.MetricRegistry.name;
import static org.junit.jupiter.api.Assertions.assertEquals;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import io.dropwizard.metrics.jetty12.ee10.InstrumentedEE10Handler;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.ee10.servlet.ServletContextHandler;
import org.eclipse.jetty.ee10.servlet.ServletHandler;
import org.eclipse.jetty.server.Server;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.servlet.ServletContainer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;

class Jetty12DropwizardTest {

    static Server server;
    static MetricRegistry metrics;
    static HttpClient client;
    static AtomicInteger counter = new AtomicInteger();

    @BeforeAll
    static void beforeAll() throws Exception {
        // Jetty Server on random free port
        server = new Server(0);
        // Setup Jersey with our simple async resource returning CompletableFuture
        var jerseyConfig = new ResourceConfig()
                .registerClasses(PingResource.class);
        var jersey = new ServletContainer(jerseyConfig);
        // Configure the handler
        var contextHandler = new ServletContextHandler();
        contextHandler.setContextPath("/");
        contextHandler.addServlet(jersey, "/*");
        // Add handler instrumentation
        metrics = new MetricRegistry();
        var instrumentedHandler = new InstrumentedEE10Handler(metrics);
        contextHandler.insertHandler(instrumentedHandler);
        // Tell the server to use our handler and start it
        server.setHandler(contextHandler);
        server.start();
        client = HttpClient.newHttpClient();
    }

    @AfterAll
    static void afterAll() throws Exception {
        client.close();
        server.stop();
    }

    @RepeatedTest(100_000)
    void test() throws Exception {
        counter.incrementAndGet();
        // Ping the server and wait for the response
        var response = client.send(
                HttpRequest.newBuilder().uri(server.getURI()).GET().build(),
                HttpResponse.BodyHandlers.ofString());
        assertEquals(200, response.statusCode(), "response code");
        assertEquals("pong", response.body(), "response body");
        // Print metric counts
        // metrics.getMetrics().forEach((name, metric) -> System.out.println(name + ": " + printCount(metric)));
        // No active requests after the request succeeded
        var activeRequestsCounter = metrics.counter(name(ServletHandler.class, "active-requests"));
        assertEquals(0, activeRequestsCounter.getCount(), "active requests");
        // request recorded
        var requestsTimer = metrics.timer(name(ServletHandler.class, "requests"));
        assertEquals(counter.get(), requestsTimer.getCount(), "requests");
        // 200 response recorded
        var response200Meter = metrics.meter(name(ServletHandler.class, "2xx-responses"));
        assertEquals(counter.get(), response200Meter.getCount(), "2xx responses");
    }

    String printCount(Metric metric) {
        return switch (metric) {
            case Counter c -> "count=" + c.getCount();
            case Meter m -> "meter=" + m.getCount();
            case Timer t -> "timer=" + t.getCount();
            case Histogram h -> "histogram=" + h.getCount();
            case Gauge<?> g -> "gauge=" + g.getValue();
            default -> metric.toString();
        };
    }
}
And the resource is the same class as before, now using the delayed variant:
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

@Path("")
public class PingResource {

    @GET
    public CompletableFuture<String> ping() {
        // return CompletableFuture.completedFuture("pong");
        // return CompletableFuture.supplyAsync(() -> "pong");
        var future = new CompletableFuture<String>();
        CompletableFuture.delayedExecutor(1, TimeUnit.MILLISECONDS).execute(() -> future.complete("pong"));
        return future;
    }
}
One thing I can think of regarding my test case: if the metricUpdater runs on a different thread in the background, the request could return to the client before the callback has finished. If that's the case, my tests aren't testing the right thing.
I think the problem was indeed with my test; the metric updater callback now completes some time after the request is processed, which can be after the client has received the response. I tried different tests where I submit everything first and then check the totals at the end, instead of after each request, like this:
@Test
void testAggregate() throws Exception {
    final int iterations = 100_000;
    for (int i = 0; i < iterations; i++) {
        // Ping the server and wait for the response
        var response = client.send(
                HttpRequest.newBuilder().uri(server.getURI()).GET().build(),
                HttpResponse.BodyHandlers.ofString());
        assertEquals(200, response.statusCode(), "response code");
        assertEquals("pong", response.body(), "response body");
    }
    // Give plenty of time for any background thread to finish
    // (requires import java.time.Duration)
    Thread.sleep(Duration.ofSeconds(5));
    // Print metric counts
    metrics.getMetrics().forEach((name, metric) -> System.out.println(name + ": " + printCount(metric)));
    // No active requests after the request succeeded
    var activeRequestsCounter = metrics.counter(name(ServletHandler.class, "active-requests"));
    assertEquals(0, activeRequestsCounter.getCount(), "active requests");
    // request recorded
    var requestsTimer = metrics.timer(name(ServletHandler.class, "requests"));
    assertEquals(iterations, requestsTimer.getCount(), "requests");
    // 200 response recorded
    var response200Meter = metrics.meter(name(ServletHandler.class, "2xx-responses"));
    assertEquals(iterations, response200Meter.getCount(), "2xx responses");
}
This works fine. I was also curious about the concurrent case, so I modified the test a little:
@Test
void testAggregateAsync() throws Exception {
    // (requires imports: java.io.IOException, java.time.Duration, java.util.ArrayList,
    //  java.util.concurrent.Executors, java.util.concurrent.Future)
    final int iterations = 100_000;
    final int concurrency = Runtime.getRuntime().availableProcessors();
    var futures = new ArrayList<Future<?>>(iterations);
    try (var executor = Executors.newFixedThreadPool(concurrency)) {
        for (int i = 0; i < iterations; i++) {
            final int iteration = i;
            var future = executor.submit(() -> {
                try {
                    var response = client.send(
                            HttpRequest.newBuilder().uri(server.getURI()).GET().build(),
                            HttpResponse.BodyHandlers.ofString());
                    assertEquals(200, response.statusCode(), "response code");
                    assertEquals("pong", response.body(), "response body");
                } catch (InterruptedException e) {
                    System.err.println("Interrupted request " + iteration + ": " + e);
                    Thread.currentThread().interrupt();
                } catch (IOException e) {
                    System.err.println("Failed request " + iteration + ": " + e);
                    throw new RuntimeException(e);
                } finally {
                    if (iteration % 1_000 == 0) {
                        System.out.println("Completed " + iteration + " requests");
                    }
                }
            });
            futures.add(future);
        }
    }
    // Ensure responses are complete and successful
    for (var future : futures) {
        future.get();
    }
    System.out.println("All responses received successfully");
    // Give plenty of time for any background thread to finish
    Thread.sleep(Duration.ofSeconds(5));
    // Print metric counts
    metrics.getMetrics().forEach((name, metric) -> System.out.println(name + ": " + printCount(metric)));
    // No active requests after the request succeeded
    var activeRequestsCounter = metrics.counter(name(ServletHandler.class, "active-requests"));
    assertEquals(0, activeRequestsCounter.getCount(), "active requests");
    // request recorded
    var requestsTimer = metrics.timer(name(ServletHandler.class, "requests"));
    assertEquals(iterations, requestsTimer.getCount(), "requests");
    // 200 response recorded
    var response200Meter = metrics.meter(name(ServletHandler.class, "2xx-responses"));
    assertEquals(iterations, response200Meter.getCount(), "2xx responses");
}
And this also completes successfully - took only 20 secs on my laptop.
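As an aside, the fixed five-second sleep could be replaced by polling the registry until the counts converge. A small sketch under that assumption; awaitCount is a hypothetical helper, not part of the tests above:

import java.time.Duration;
import java.util.function.LongSupplier;

// Poll until the metric reaches the expected count or the deadline passes.
static void awaitCount(LongSupplier actual, long expected, Duration timeout) throws InterruptedException {
    long deadline = System.nanoTime() + timeout.toNanos();
    while (actual.getAsLong() != expected) {
        if (System.nanoTime() >= deadline) {
            throw new AssertionError("expected " + expected + " but was " + actual.getAsLong());
        }
        Thread.sleep(10);
    }
}

// Usage instead of Thread.sleep(Duration.ofSeconds(5)):
// awaitCount(() -> metrics.timer(name(ServletHandler.class, "requests")).getCount(), iterations, Duration.ofSeconds(5));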
@zUniQueX From my POV your patch works for the case I had problems with originally, thanks a lot for your efforts :+1: