Help debug LEAK: ByteBuf.release() in custom filter
Describe the bug I only add a custom filter about check request body and a custom exception handler
I also ask chatgpt for help,but it seems not wrong about my code.
please help me to find why leak memory
Java version: 21 Spring boot version: 3.4.0 Spring cloud version: 2024.0.1 JVM -Dio.netty.leakDetection.level=PARANOID
2025-05-14 12:07:58.969 ERROR , [ reactor-http-epoll-11] i.n.util.ResourceLeakDetector: LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information. Recent access records: Created at: io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:410) io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188) io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:179) io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:116) org.springframework.core.io.buffer.NettyDataBufferFactory.allocateBuffer(NettyDataBufferFactory.java:72) org.springframework.core.io.buffer.NettyDataBufferFactory.allocateBuffer(NettyDataBufferFactory.java:39) org.springframework.core.codec.CharSequenceEncoder.encodeValue(CharSequenceEncoder.java:90) org.springframework.core.codec.CharSequenceEncoder.lambda$encode$0(CharSequenceEncoder.java:74) reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113) reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:158) reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79) reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79) reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129) reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107) reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:299) reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337) reactor.core.publisher.Operators$BaseFluxToMonoOperator.completePossiblyEmpty(Operators.java:2097) reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:145) reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144) reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onComplete(FluxContextWriteRestoringThreadLocals.java:149) reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onComplete(FluxPeekFuseable.java:940) reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onComplete(FluxContextWriteRestoringThreadLocals.java:149) reactor.core.publisher.FluxMap$MapConditionalSubscriber.onComplete(FluxMap.java:275) reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onComplete(FluxContextWriteRestoringThreadLocals.java:149) reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:413) reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:455) reactor.netty.http.server.HttpServerOperations.handleLastHttpContent(HttpServerOperations.java:864) reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:786) reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:115) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:316) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868) io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799) io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:501) io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:399) io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) java.base/java.lang.Thread.run(Thread.java:1583)
Sample My custom Filter is about check request Body. This is My code:
package igg.report.gateway.filter;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.igg.boot.framework.exception.HttpSystemException;
import com.igg.boot.framework.exception.HttpSystemExceptionCode;
import igg.report.gateway.cache.AppInfo;
import igg.report.gateway.constant.EncryptEnum;
import igg.report.gateway.constant.GatewayConstant;
import igg.report.gateway.constant.MessageDigestProvider;
import igg.report.gateway.exception.GatewayException;
import igg.report.gateway.exception.GatewayExceptionCode;
import io.micrometer.tracing.Tracer;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.cloud.gateway.filter.RouteToRequestUrlFilter;
import org.springframework.cloud.gateway.filter.factory.rewrite.CachedBodyOutputMessage;
import org.springframework.cloud.gateway.support.BodyInserterContext;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.codec.ServerCodecConfigurer;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.util.StringUtils;
import org.springframework.web.reactive.function.BodyInserter;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.server.HandlerStrategies;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@Slf4j
public class ApiAuthFilter implements GlobalFilter, Ordered {
@Autowired
private ServerCodecConfigurer codecConfigurer;
@Autowired
private Tracer tracer;
private ObjectMapper objectMapper;
private final List<HttpMessageReader<?>> messageReaders;
private static final String APP_ID = "app_id";
private static final String SIGN = "sign";
private static final String TIMESTAMP = "timestamp";
private static final String SIGN_TYPE = "sign_type";
private static final String SOURCE = "source";
private static final String DATA = "data";
private static final String NONCE_STR = "nonce_str";
private static final String PROPERTY = "property";
private static final String EQUAL = "=";
private static final String[] NECESSARY_PARAM = { APP_ID, SIGN, TIMESTAMP, SIGN_TYPE, DATA, SOURCE, NONCE_STR };
public ApiAuthFilter(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
this.messageReaders = HandlerStrategies.withDefaults().messageReaders();
}
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerRequest serverRequest = ServerRequest.create(exchange,
codecConfigurer.getReaders());
Mono<String> modifiedBody = serverRequest.bodyToMono(String.class)
.flatMap(o ->{
checkParam(o);
return Mono.just(o);
});
BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody,
String.class);
HttpHeaders headers = new HttpHeaders();
headers.putAll(exchange.getRequest().getHeaders());
// the new content type will be computed by bodyInserter
// and then set in the request decorator
headers.remove(HttpHeaders.CONTENT_LENGTH);
// if the body is changing content types, set it here, to the bodyInserter
// will know about it
CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(
exchange, headers);
return bodyInserter.insert(outputMessage, new BodyInserterContext())
.then(Mono.defer(() -> {
ServerHttpRequest decorator = decorate(exchange, headers,
outputMessage);
return chain
.filter(exchange.mutate().request(decorator).build());
})).onErrorResume(throwable->
outputMessage.getBody()
.map(DataBufferUtils::release)
.then(Mono.error((Throwable) throwable)));
}
private void checkParam(String bodyStr) {
try {
TreeMap<String, Object> param = objectMapper.readValue(bodyStr, TreeMap.class);
checkNecessary(param);
} catch (IOException e) {
log.warn(e.getMessage(),e);
throw new GatewayException(GatewayExceptionCode.JSON_ERROR,e.getMessage());
}
}
private void checkNecessary(Map<String, Object> param) throws JsonProcessingException {
if (Integer.parseInt(param.get(SOURCE) + "") != GatewayConstant.SOURCE_WEB) {
for (String parm : NECESSARY_PARAM) {
if (!param.containsKey(parm)) {
log.error(objectMapper.writeValueAsString(param));
throw new HttpSystemException(HttpSystemExceptionCode.METHOD_PARAM_ERROR, parm + " 不能为空;");
}
}
}
if (Integer.parseInt(param.get(SOURCE) + "") == GatewayConstant.SOURCE_APP) {
if (!param.containsKey(PROPERTY)) {
log.error(objectMapper.writeValueAsString(param));
throw new HttpSystemException(HttpSystemExceptionCode.METHOD_PARAM_ERROR, PROPERTY + " 不能为空;");
}
checkData(param);
}
}
private void checkData(Map<String,Object> param){
Object data = param.get(DATA);
if(!(data instanceof ArrayList)){
log.info(param.toString());
throw new GatewayException(GatewayExceptionCode.DATA_EMPTY);
}else{
ArrayList<Object> d = (ArrayList)data;
d.forEach(o -> {
if(o instanceof String){
log.info(param.toString());
throw new GatewayException(GatewayExceptionCode.DATA_EMPTY);
}
});
}
}
ServerHttpRequestDecorator decorate(ServerWebExchange exchange, HttpHeaders headers,
CachedBodyOutputMessage outputMessage) {
return new ServerHttpRequestDecorator(exchange.getRequest()) {
@Override
public HttpHeaders getHeaders() {
long contentLength = headers.getContentLength();
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.putAll(headers);
if (contentLength > 0) {
httpHeaders.setContentLength(contentLength);
}
else {
// TODO: this causes a 'HTTP/1.1 411 Length Required' // on
// httpbin.org
httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked");
}
httpHeaders.add("IGG-PROXY-IP", getIpAddr(exchange.getRequest()));
httpHeaders.add("IGG-REQUEST-ID", tracer.currentSpan().context().traceId());
// httpHeaders.add("IGG-REQUEST-ID", MDC.get(LogbackFilter.REQUEST_ID));
List<String> list = new ArrayList<>(2);
list.add("application/json;charset=UTF-8");
httpHeaders.put("Content-Type", list);
return httpHeaders;
}
@Override
public Flux<DataBuffer> getBody() {
return outputMessage.getBody();
}
};
}
private String getIpAddr(ServerHttpRequest request) {
String ip = request.getHeaders().getFirst("x-forwarded-for");
if (StringUtils.isEmpty(ip)) {
ip = request.getHeaders().getFirst("Proxy-Client-IP");
}
if (StringUtils.isEmpty(ip)) {
ip = request.getHeaders().getFirst("WL-Proxy-Client-IP");
}
if (StringUtils.isEmpty(ip)) {
ip = request.getHeaders().getFirst("HTTP_CLIENT_IP");
}
if (StringUtils.isEmpty(ip)) {
ip = request.getHeaders().getFirst("HTTP_X_FORWARDED_FOR");
}
if (StringUtils.isEmpty(ip)) {
ip = request.getRemoteAddress().getAddress().getHostAddress();
}
if (ip != null && ip.indexOf(",") != -1) {
String[] ipWithMultiProxy = ip.split(",");
for (int i = 0; i < ipWithMultiProxy.length; ++i) {
String eachIpSegement = ipWithMultiProxy[i];
if (!"unknown".equalsIgnoreCase(eachIpSegement)) {
ip = eachIpSegement;
break;
}
}
}
return ip;
}
@Override
public int getOrder() {
return RouteToRequestUrlFilter.ROUTE_TO_URL_FILTER_ORDER + 2;
}
}
my exception handler code:
package igg.report.gateway.exception;
import com.igg.boot.framework.exception.CommonExceptionCode;
import com.igg.boot.framework.exception.FrameworkSystemException;
import igg.report.gateway.model.CodeResponse;
import io.micrometer.tracing.Tracer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.web.ErrorProperties;
import org.springframework.boot.autoconfigure.web.WebProperties.Resources;
import org.springframework.boot.autoconfigure.web.reactive.error.DefaultErrorWebExceptionHandler;
import org.springframework.boot.web.error.ErrorAttributeOptions;
import org.springframework.boot.web.reactive.error.ErrorAttributes;
import org.springframework.context.ApplicationContext;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.web.bind.support.WebExchangeBindException;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.server.RequestPredicates;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;
import java.util.Map;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
@Slf4j
public class IggExceptionHandler extends DefaultErrorWebExceptionHandler {
private static final String TEMPLATE_STR_CODE = "code";
private static final String TEMPLATE_STR_MESSAGE = "message";
private static final String TEMPLATE_STR_CODE_TPL = new StringBuilder(16).append("${").append(TEMPLATE_STR_CODE)
.append("}").toString();
private static final String TEMPLATE_STR_MESSAGE_TPL = new StringBuilder(16).append("${")
.append(TEMPLATE_STR_MESSAGE).append("}").toString();
private static final String TEMPLATE_STR = "<!DOCTYPE html><html><head><meta http-equiv='Content-Type' content='text/html; charset=utf-8'/>"
+ "<title>error message</title><style>body{margin:0;margin:auto;"
+ "width:50pc;color:#666;text-align:center;font-size:.875rem;font-family:Helvetica Neue,Helvetica,Arial,sans-serif}"
+ "h1{color:#456;font-weight:400;font-size:3.5rem;line-height:75pt}h2{color:#666;font-size:1.5rem;line-height:1.5em}"
+ "h3{color:#456;font-weight:400;font-size:1.25rem;line-height:1.75rem}"
+ "hr{margin:18px 0;border:0;border-top:1px solid #eee;border-bottom:1px solid " +
"#fff}</style></head><body><h1>"
+ TEMPLATE_STR_CODE_TPL + "</h1><h3>" + TEMPLATE_STR_MESSAGE_TPL + "</h3></body></html>";
private final ErrorProperties errorProperties;
private ApplicationContext applicationContext;
@Override
protected RouterFunction<ServerResponse> getRoutingFunction(ErrorAttributes errorAttributes) {
return route(acceptsTextHtml(), this::renderErrorView).andRoute(RequestPredicates.all(),
this::renderErrorResponse);
}
public IggExceptionHandler(ErrorAttributes errorAttributes, Resources resources,
ErrorProperties errorProperties, ApplicationContext applicationContext) {
super(errorAttributes, resources, errorProperties,applicationContext);
this.errorProperties = errorProperties;
this.applicationContext = applicationContext;
}
/**
* Render the error information as an HTML view.
*
* @param request the current request
* @return a {@code Publisher} of the HTTP response
*/
protected Mono<ServerResponse> renderErrorView(ServerRequest request) {
ErrorAttributeOptions options = ErrorAttributeOptions.defaults();
Map<String, Object> error = getErrorAttributes(request, options);
int errorStatus = this.getHttpStatus(error);
ServerResponse.BodyBuilder responseBody = ServerResponse.status(errorStatus).contentType(MediaType.TEXT_HTML);
logError(request, errorStatus);
return renderDefaultErrorView(responseBody, error);
}
/**
* Render a default HTML "Whitelabel Error Page".
* <p>
* Useful when no other error view is available in the application.
*
* @param responseBody the error response being built
* @param error the error data as a map
* @return a Publisher of the {@link ServerResponse}
*/
protected Mono<ServerResponse> renderDefaultErrorView(ServerResponse.BodyBuilder responseBody,
Map<String, Object> error) {
String code = error.containsKey(TEMPLATE_STR_CODE) ? error.get(TEMPLATE_STR_CODE).toString()
: CommonExceptionCode.SYSTEM_ERROR.getCode() + "";
String message = error.containsKey(TEMPLATE_STR_CODE) ? error.get(TEMPLATE_STR_MESSAGE).toString()
: getCommonErrorMsg(error);
String content = TEMPLATE_STR.replace(TEMPLATE_STR_CODE_TPL, code).replace(TEMPLATE_STR_MESSAGE_TPL, message);
return responseBody.bodyValue(content);
}
private String getCommonErrorMsg(Map<String, ?> model) {
return CommonExceptionCode.SYSTEM_ERROR.getMessage() + ":status = " + model.get("status") + ",error = "
+ model.get("error") + ", message = " + model.get("message");
}
/**
* Render the error information as a JSON payload.
*
* @param request the current request
* @return a {@code Publisher} of the HTTP response
*/
protected Mono<ServerResponse> renderErrorResponse(ServerRequest request) {
Throwable ex = getError(request);
CodeResponse codeResponse = new CodeResponse();
int errorStatus = 200;
if (ex != null && ex instanceof FrameworkSystemException) {
FrameworkSystemException e = (FrameworkSystemException) ex;
codeResponse.setCode(e.getExceptionCode().getCode());
codeResponse.setMsg(e.getExceptionCode().getMessage());
} else if(ex instanceof WebExchangeBindException){
WebExchangeBindException be = (WebExchangeBindException) ex;
codeResponse.setCode(CommonExceptionCode.SYSTEM_ERROR.getCode());
codeResponse.setMsg(be.getBindingResult().getFieldError().getDefaultMessage());
} else {
codeResponse.setCode(CommonExceptionCode.SYSTEM_ERROR.getCode());
codeResponse.setMsg(CommonExceptionCode.SYSTEM_ERROR.getMessage());
errorStatus = 500;
}
Tracer tracer = this.applicationContext.getBean(Tracer.class);
codeResponse.setRId(tracer.currentSpan().context().traceId());
logError(request,errorStatus);
return ServerResponse.status(errorStatus).contentType(MediaType.APPLICATION_JSON_UTF8)
.body(BodyInserters.fromValue(codeResponse));
}
/**
* Determine if the stacktrace attribute should be included.
*
* @param request the source request
* @param produces the media type produced (or {@code MediaType.ALL})
* @return if the stacktrace attribute should be included
*/
protected boolean isIncludeStackTrace(ServerRequest request, MediaType produces) {
switch(this.errorProperties.getIncludeStacktrace()) {
case ALWAYS:
return true;
case ON_PARAM:
return this.isTraceEnabled(request);
default:
return false;
}
}
/**
* Log the original exception if handling it results in a Server Error or a Bad
* Request (Client Error with 400 status code) one.
*
* @param request the source request
* @param httpStatus the HTTP error status
*/
protected void logError(ServerRequest request, int httpStatus) {
Throwable ex = getError(request);
if (httpStatus == HttpStatus.INTERNAL_SERVER_ERROR.value()) {
log.error(ex.getMessage(), ex);
} else {
if (ex != null && ex instanceof FrameworkSystemException) {
FrameworkSystemException e = (FrameworkSystemException) ex;
log.warn(ex.getMessage(), ex);
}else{
log.warn(ex.getMessage(), ex);
}
}
}
/**
* Get the HTTP error status information from the error map.
*
* @param errorAttributes the current error information
* @return the error HTTP status
*/
protected int getHttpStatus(Map<String, Object> errorAttributes) {
return (Integer)errorAttributes.get("status");
}
}
I'm going to leave this open as a question as maybe the community can help.
@Maplejw, I believe you need to release CachedBodyOutputMessage in your' ServerHttpRequestDecorator's getBody () method as well.
@Override
public Flux<DataBuffer> getBody() {
return outputMessage.getBody()
.doFinally(s -> outputMessage.body.subscribe(DataBufferUtils::release));
}
If you are reading the body many times under the filter chain, you may also have to use DataBufferUtils.retain(DataBuffer) before finally in a map operator.
我和你一样,咋弄吗?
@Maplejw, I believe you need to release
CachedBodyOutputMessagein your' ServerHttpRequestDecorator's getBody () method as well.@Override public Flux<DataBuffer> getBody() { return outputMessage.getBody() .doFinally(s -> outputMessage.body.subscribe(DataBufferUtils::release)); } If you are reading the body many times under the filter chain, you may also have to use
DataBufferUtils.retain(DataBuffer)before finally in a map operator.
Thanks for you suggestion,I will try again