Help debug LEAK: ByteBuf.release() in custom filter

Open Maplejw opened this issue 7 months ago • 4 comments

Describe the bug: I only added a custom filter that checks the request body, plus a custom exception handler.

I also asked ChatGPT for help, but it did not find anything wrong with my code.

Please help me find out why memory is leaking.

Java version: 21
Spring Boot version: 3.4.0
Spring Cloud version: 2024.0.1
JVM option: -Dio.netty.leakDetection.level=PARANOID

2025-05-14 12:07:58.969 ERROR , [ reactor-http-epoll-11] i.n.util.ResourceLeakDetector: LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
Created at:
    io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:410)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:179)
    io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:116)
    org.springframework.core.io.buffer.NettyDataBufferFactory.allocateBuffer(NettyDataBufferFactory.java:72)
    org.springframework.core.io.buffer.NettyDataBufferFactory.allocateBuffer(NettyDataBufferFactory.java:39)
    org.springframework.core.codec.CharSequenceEncoder.encodeValue(CharSequenceEncoder.java:90)
    org.springframework.core.codec.CharSequenceEncoder.lambda$encode$0(CharSequenceEncoder.java:74)
    reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)
    reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:158)
    reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
    reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
    reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:299)
    reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
    reactor.core.publisher.Operators$BaseFluxToMonoOperator.completePossiblyEmpty(Operators.java:2097)
    reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:145)
    reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
    reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onComplete(FluxContextWriteRestoringThreadLocals.java:149)
    reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onComplete(FluxPeekFuseable.java:940)
    reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onComplete(FluxContextWriteRestoringThreadLocals.java:149)
    reactor.core.publisher.FluxMap$MapConditionalSubscriber.onComplete(FluxMap.java:275)
    reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onComplete(FluxContextWriteRestoringThreadLocals.java:149)
    reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:413)
    reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:455)
    reactor.netty.http.server.HttpServerOperations.handleLastHttpContent(HttpServerOperations.java:864)
    reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:786)
    reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:115)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:316)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
    io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
    io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
    io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868)
    io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799)
    io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:501)
    io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:399)
    io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    java.base/java.lang.Thread.run(Thread.java:1583)
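
For readers unfamiliar with the message in the log above: it means a pooled buffer was garbage-collected while its reference count was still above zero, so some code path never called release(). The following is a minimal sketch of that rule using a toy model in plain Java. `LeakDemo`, `ToyBuffer`, `validate`, `leaky` and `safe` are hypothetical names invented for this illustration; this is not Netty's or Spring's API and not the code from this issue.

```java
import java.util.concurrent.atomic.AtomicInteger;

class LeakDemo {
    /** Toy stand-in for a reference-counted buffer; NOT Netty's ByteBuf. */
    static final class ToyBuffer {
        // Number of toy buffers that were created but not yet deallocated.
        static final AtomicInteger LIVE = new AtomicInteger();
        // A freshly created buffer starts with a reference count of 1.
        private final AtomicInteger refCnt = new AtomicInteger(1);

        ToyBuffer() {
            LIVE.incrementAndGet();
        }

        /** Drops one reference; returns true when the count reaches 0. */
        boolean release() {
            int now = refCnt.decrementAndGet();
            if (now < 0) {
                throw new IllegalStateException("released more often than retained");
            }
            if (now == 0) {
                LIVE.decrementAndGet();
                return true;
            }
            return false;
        }
    }

    /** Hypothetical validation step that may throw. */
    static void validate(String body) {
        if (body.isEmpty()) {
            throw new IllegalArgumentException("empty body");
        }
    }

    /** Leaky: if validate throws, nobody releases the buffer. */
    static void leaky(String body) {
        ToyBuffer buf = new ToyBuffer();
        validate(body);
        buf.release();
    }

    /** Safe: the buffer is released on the success path and the error path. */
    static void safe(String body) {
        ToyBuffer buf = new ToyBuffer();
        try {
            validate(body);
        } finally {
            buf.release();
        }
    }

    public static void main(String[] args) {
        try { leaky(""); } catch (IllegalArgumentException expected) { }
        System.out.println("live after leaky: " + ToyBuffer.LIVE.get());
        try { safe(""); } catch (IllegalArgumentException expected) { }
        System.out.println("live after safe: " + ToyBuffer.LIVE.get());
    }
}
```

In this toy run the live counter is 1 after both calls: the leaky call leaves one buffer behind, and the safe call adds nothing to it. In a reactive pipeline the same idea applies to every terminal signal (completion, error and cancellation), which is why a release on only one of those paths can still leak.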

Sample: my custom filter checks the request body. This is my code:

package igg.report.gateway.filter;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.igg.boot.framework.exception.HttpSystemException;
import com.igg.boot.framework.exception.HttpSystemExceptionCode;
import igg.report.gateway.cache.AppInfo;
import igg.report.gateway.constant.EncryptEnum;
import igg.report.gateway.constant.GatewayConstant;
import igg.report.gateway.constant.MessageDigestProvider;
import igg.report.gateway.exception.GatewayException;
import igg.report.gateway.exception.GatewayExceptionCode;
import io.micrometer.tracing.Tracer;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.cloud.gateway.filter.RouteToRequestUrlFilter;
import org.springframework.cloud.gateway.filter.factory.rewrite.CachedBodyOutputMessage;
import org.springframework.cloud.gateway.support.BodyInserterContext;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.codec.ServerCodecConfigurer;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.util.StringUtils;
import org.springframework.web.reactive.function.BodyInserter;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.server.HandlerStrategies;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

@Slf4j
public class ApiAuthFilter implements GlobalFilter, Ordered {
    @Autowired
    private ServerCodecConfigurer codecConfigurer;
    @Autowired
    private Tracer tracer;
    private ObjectMapper objectMapper;
    private final List<HttpMessageReader<?>> messageReaders;
    private static final String APP_ID = "app_id";
    private static final String SIGN = "sign";
    private static final String TIMESTAMP = "timestamp";
    private static final String SIGN_TYPE = "sign_type";
    private static final String SOURCE = "source";
    private static final String DATA = "data";
    private static final String NONCE_STR = "nonce_str";
    private static final String PROPERTY = "property";
    private static final String EQUAL = "=";
    private static final String[] NECESSARY_PARAM = { APP_ID, SIGN, TIMESTAMP, SIGN_TYPE, DATA, SOURCE, NONCE_STR };

    public ApiAuthFilter(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
        this.messageReaders = HandlerStrategies.withDefaults().messageReaders();
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        ServerRequest serverRequest = ServerRequest.create(exchange,
                codecConfigurer.getReaders());
        Mono<String> modifiedBody = serverRequest.bodyToMono(String.class)
                .flatMap(o ->{
                    checkParam(o);
                    return Mono.just(o);
                });

        BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody,
                String.class);
        HttpHeaders headers = new HttpHeaders();
        headers.putAll(exchange.getRequest().getHeaders());
        // the new content type will be computed by bodyInserter
        // and then set in the request decorator
        headers.remove(HttpHeaders.CONTENT_LENGTH);
        // if the body is changing content types, set it here, so the bodyInserter
        // will know about it
        CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(
                exchange, headers);
        return bodyInserter.insert(outputMessage, new BodyInserterContext())
                .then(Mono.defer(() -> {
                    ServerHttpRequest decorator = decorate(exchange, headers,
                            outputMessage);
                    return chain
                            .filter(exchange.mutate().request(decorator).build());
                })).onErrorResume(throwable->
                        outputMessage.getBody()
                                .map(DataBufferUtils::release)
                                .then(Mono.error((Throwable) throwable)));

    }


    private void checkParam(String bodyStr) {
        try {
            TreeMap<String, Object> param = objectMapper.readValue(bodyStr, TreeMap.class);
            checkNecessary(param);

        } catch (IOException e) {
            log.warn(e.getMessage(),e);
            throw new GatewayException(GatewayExceptionCode.JSON_ERROR,e.getMessage());
        }
    }

    private void checkNecessary(Map<String, Object> param) throws JsonProcessingException {
        if (Integer.parseInt(param.get(SOURCE) + "") != GatewayConstant.SOURCE_WEB) {
            for (String parm : NECESSARY_PARAM) {
                if (!param.containsKey(parm)) {
                    log.error(objectMapper.writeValueAsString(param));
                    // the message suffix "不能为空" means "must not be empty"
                    throw new HttpSystemException(HttpSystemExceptionCode.METHOD_PARAM_ERROR, parm + " 不能为空;");
                }
            }
        }
        if (Integer.parseInt(param.get(SOURCE) + "") == GatewayConstant.SOURCE_APP) {
            if (!param.containsKey(PROPERTY)) {
                log.error(objectMapper.writeValueAsString(param));
                // the message suffix "不能为空" means "must not be empty"
                throw new HttpSystemException(HttpSystemExceptionCode.METHOD_PARAM_ERROR, PROPERTY + " 不能为空;");
            }
            checkData(param);
        }
    }

    private void checkData(Map<String,Object> param){
        Object data = param.get(DATA);
        if(!(data instanceof ArrayList)){
            log.info(param.toString());
            throw new GatewayException(GatewayExceptionCode.DATA_EMPTY);
        }else{
            ArrayList<Object> d = (ArrayList)data;
            d.forEach(o -> {
                if(o instanceof String){
                    log.info(param.toString());
                    throw new GatewayException(GatewayExceptionCode.DATA_EMPTY);
                }
            });
        }
    }

    ServerHttpRequestDecorator decorate(ServerWebExchange exchange, HttpHeaders headers,
                                        CachedBodyOutputMessage outputMessage) {
        return new ServerHttpRequestDecorator(exchange.getRequest()) {
            @Override
            public HttpHeaders getHeaders() {
                long contentLength = headers.getContentLength();
                HttpHeaders httpHeaders = new HttpHeaders();
                httpHeaders.putAll(headers);
             
                if (contentLength > 0) {
                    httpHeaders.setContentLength(contentLength);
                }
                else {
                    // TODO: this causes a 'HTTP/1.1 411 Length Required' on httpbin.org
                    httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked");
                }
                httpHeaders.add("IGG-PROXY-IP", getIpAddr(exchange.getRequest()));
                httpHeaders.add("IGG-REQUEST-ID", tracer.currentSpan().context().traceId());
//                httpHeaders.add("IGG-REQUEST-ID", MDC.get(LogbackFilter.REQUEST_ID));
                List<String> list = new ArrayList<>(2);
                list.add("application/json;charset=UTF-8");
                httpHeaders.put("Content-Type", list);
                return httpHeaders;
            }

            @Override
            public Flux<DataBuffer> getBody() {
                return outputMessage.getBody();
            }
        };
    }


    private String getIpAddr(ServerHttpRequest request) {
        String ip = request.getHeaders().getFirst("x-forwarded-for");
        if (StringUtils.isEmpty(ip)) {
            ip = request.getHeaders().getFirst("Proxy-Client-IP");
        }

        if (StringUtils.isEmpty(ip)) {
            ip = request.getHeaders().getFirst("WL-Proxy-Client-IP");
        }

        if (StringUtils.isEmpty(ip)) {
            ip = request.getHeaders().getFirst("HTTP_CLIENT_IP");
        }

        if (StringUtils.isEmpty(ip)) {
            ip = request.getHeaders().getFirst("HTTP_X_FORWARDED_FOR");
        }

        if (StringUtils.isEmpty(ip)) {
            ip = request.getRemoteAddress().getAddress().getHostAddress();
        }

        if (ip != null && ip.indexOf(",") != -1) {
            String[] ipWithMultiProxy = ip.split(",");
            for (int i = 0; i < ipWithMultiProxy.length; ++i) {
                String eachIpSegement = ipWithMultiProxy[i];
                if (!"unknown".equalsIgnoreCase(eachIpSegement)) {
                    ip = eachIpSegement;
                    break;
                }
            }
        }

        return ip;
    }

    @Override
    public int getOrder() {
        return RouteToRequestUrlFilter.ROUTE_TO_URL_FILTER_ORDER + 2;
    }
}

my exception handler code:

package igg.report.gateway.exception;

import com.igg.boot.framework.exception.CommonExceptionCode;
import com.igg.boot.framework.exception.FrameworkSystemException;
import igg.report.gateway.model.CodeResponse;
import io.micrometer.tracing.Tracer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.web.ErrorProperties;
import org.springframework.boot.autoconfigure.web.WebProperties.Resources;
import org.springframework.boot.autoconfigure.web.reactive.error.DefaultErrorWebExceptionHandler;
import org.springframework.boot.web.error.ErrorAttributeOptions;
import org.springframework.boot.web.reactive.error.ErrorAttributes;
import org.springframework.context.ApplicationContext;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.web.bind.support.WebExchangeBindException;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.server.RequestPredicates;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;

import java.util.Map;

import static org.springframework.web.reactive.function.server.RouterFunctions.route;


@Slf4j
public class IggExceptionHandler extends DefaultErrorWebExceptionHandler {
    private static final String TEMPLATE_STR_CODE = "code";
    private static final String TEMPLATE_STR_MESSAGE = "message";
    private static final String TEMPLATE_STR_CODE_TPL = new StringBuilder(16).append("${").append(TEMPLATE_STR_CODE)
            .append("}").toString();
    private static final String TEMPLATE_STR_MESSAGE_TPL = new StringBuilder(16).append("${")
            .append(TEMPLATE_STR_MESSAGE).append("}").toString();

    private static final String TEMPLATE_STR = "<!DOCTYPE html><html><head><meta http-equiv='Content-Type' content='text/html; charset=utf-8'/>"
            + "<title>error message</title><style>body{margin:0;margin:auto;"
            + "width:50pc;color:#666;text-align:center;font-size:.875rem;font-family:Helvetica Neue,Helvetica,Arial,sans-serif}"
            + "h1{color:#456;font-weight:400;font-size:3.5rem;line-height:75pt}h2{color:#666;font-size:1.5rem;line-height:1.5em}"
            + "h3{color:#456;font-weight:400;font-size:1.25rem;line-height:1.75rem}"
            + "hr{margin:18px 0;border:0;border-top:1px solid #eee;border-bottom:1px solid " +
            "#fff}</style></head><body><h1>"
            + TEMPLATE_STR_CODE_TPL + "</h1><h3>" + TEMPLATE_STR_MESSAGE_TPL + "</h3></body></html>";
    private final ErrorProperties errorProperties;
    private ApplicationContext applicationContext;

    @Override
    protected RouterFunction<ServerResponse> getRoutingFunction(ErrorAttributes errorAttributes) {
        return route(acceptsTextHtml(), this::renderErrorView).andRoute(RequestPredicates.all(),
                this::renderErrorResponse);
    }


    public IggExceptionHandler(ErrorAttributes errorAttributes, Resources resources,
            ErrorProperties errorProperties, ApplicationContext applicationContext) {
        super(errorAttributes, resources, errorProperties,applicationContext);
        this.errorProperties = errorProperties;
        this.applicationContext = applicationContext;
    }

    /**
     * Render the error information as an HTML view.
     *
     * @param request the current request
     * @return a {@code Publisher} of the HTTP response
     */
    protected Mono<ServerResponse> renderErrorView(ServerRequest request) {
        ErrorAttributeOptions options = ErrorAttributeOptions.defaults();
        Map<String, Object> error = getErrorAttributes(request, options);
        int errorStatus = this.getHttpStatus(error);
        ServerResponse.BodyBuilder responseBody = ServerResponse.status(errorStatus).contentType(MediaType.TEXT_HTML);
        logError(request, errorStatus);
        return renderDefaultErrorView(responseBody, error);
    }

    /**
     * Render a default HTML "Whitelabel Error Page".
     * <p>
     * Useful when no other error view is available in the application.
     * 
     * @param responseBody the error response being built
     * @param error        the error data as a map
     * @return a Publisher of the {@link ServerResponse}
     */
    protected Mono<ServerResponse> renderDefaultErrorView(ServerResponse.BodyBuilder responseBody,
            Map<String, Object> error) {
        String code = error.containsKey(TEMPLATE_STR_CODE) ? error.get(TEMPLATE_STR_CODE).toString()
                : CommonExceptionCode.SYSTEM_ERROR.getCode() + "";
        // check the message key itself so a missing message cannot cause a NullPointerException
        String message = error.containsKey(TEMPLATE_STR_MESSAGE) ? error.get(TEMPLATE_STR_MESSAGE).toString()
                : getCommonErrorMsg(error);
        String content = TEMPLATE_STR.replace(TEMPLATE_STR_CODE_TPL, code).replace(TEMPLATE_STR_MESSAGE_TPL, message);

        return responseBody.bodyValue(content);
    }

    private String getCommonErrorMsg(Map<String, ?> model) {
        return CommonExceptionCode.SYSTEM_ERROR.getMessage() + ":status = " + model.get("status") + ",error = "
                + model.get("error") + ", message = " + model.get("message");
    }

    /**
     * Render the error information as a JSON payload.
     * 
     * @param request the current request
     * @return a {@code Publisher} of the HTTP response
     */
    protected Mono<ServerResponse> renderErrorResponse(ServerRequest request) {
        Throwable ex = getError(request);
        CodeResponse codeResponse = new CodeResponse();
        int errorStatus = 200;
        if (ex != null && ex instanceof FrameworkSystemException) {
            FrameworkSystemException e = (FrameworkSystemException) ex;
            codeResponse.setCode(e.getExceptionCode().getCode());
            codeResponse.setMsg(e.getExceptionCode().getMessage());
        } else if(ex instanceof WebExchangeBindException){
            WebExchangeBindException be = (WebExchangeBindException) ex;
            codeResponse.setCode(CommonExceptionCode.SYSTEM_ERROR.getCode());
            codeResponse.setMsg(be.getBindingResult().getFieldError().getDefaultMessage());
        } else {
            codeResponse.setCode(CommonExceptionCode.SYSTEM_ERROR.getCode());
            codeResponse.setMsg(CommonExceptionCode.SYSTEM_ERROR.getMessage());
            errorStatus = 500;
        }
        Tracer tracer = this.applicationContext.getBean(Tracer.class);
        codeResponse.setRId(tracer.currentSpan().context().traceId());
        logError(request,errorStatus);
        return ServerResponse.status(errorStatus).contentType(MediaType.APPLICATION_JSON_UTF8)
                .body(BodyInserters.fromValue(codeResponse));
    }

    /**
     * Determine if the stacktrace attribute should be included.
     * 
     * @param request  the source request
     * @param produces the media type produced (or {@code MediaType.ALL})
     * @return if the stacktrace attribute should be included
     */
    protected boolean isIncludeStackTrace(ServerRequest request, MediaType produces) {
        switch(this.errorProperties.getIncludeStacktrace()) {
            case ALWAYS:
                return true;
            case ON_PARAM:
                return this.isTraceEnabled(request);
            default:
                return false;
        }
    }

    /**
     * Log the original exception if handling it results in a Server Error or a Bad
     * Request (Client Error with 400 status code) one.
     * 
     * @param request     the source request
     * @param httpStatus the HTTP error status
     */
    protected void logError(ServerRequest request, int httpStatus) {
        Throwable ex = getError(request);
        if (httpStatus == HttpStatus.INTERNAL_SERVER_ERROR.value()) {
            log.error(ex.getMessage(), ex);
        } else {
            // every non-500 status is logged at WARN level, whatever the exception type
            log.warn(ex.getMessage(), ex);
        }
    }

    /**
     * Get the HTTP error status information from the error map.
     * 
     * @param errorAttributes the current error information
     * @return the error HTTP status
     */
    protected int getHttpStatus(Map<String, Object> errorAttributes) {
        return (Integer)errorAttributes.get("status");
    }

}

Maplejw avatar May 15 '25 07:05 Maplejw

I'm going to leave this open as a question as maybe the community can help.

spencergibb avatar May 15 '25 13:05 spencergibb

@Maplejw, I believe you also need to release the CachedBodyOutputMessage buffers in your ServerHttpRequestDecorator's getBody() method.

@Override
public Flux<DataBuffer> getBody() {
    return outputMessage.getBody()
        // the body field is not accessible from outside the class, so go through getBody()
        .doFinally(s -> outputMessage.getBody().subscribe(DataBufferUtils::release));
}

If you read the body multiple times in the filter chain, you may also have to call DataBufferUtils.retain(DataBuffer) in a map operator before the doFinally.
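
To illustrate the retain/release balance this suggestion relies on, here is a minimal sketch using a toy model in plain Java. `RetainDemo`, `CountedBody` and `readAndRelease` are hypothetical names invented for this example; it is not Spring's DataBuffer API and makes no claim about how the gateway caches bodies. The assumed rule is that each extra reader receives its own retained reference and releases it exactly once, and the owner releases the original reference last.

```java
import java.util.concurrent.atomic.AtomicInteger;

class RetainDemo {
    /** Toy reference-counted body; a stand-in for a pooled buffer, not a real DataBuffer. */
    static final class CountedBody {
        private final AtomicInteger refCnt = new AtomicInteger(1); // owner's reference
        private final String content;

        CountedBody(String content) {
            this.content = content;
        }

        int refCnt() {
            return refCnt.get();
        }

        /** Adds one reference for an additional reader. (Toy code: not thread-safe.) */
        CountedBody retain() {
            if (refCnt.get() == 0) {
                throw new IllegalStateException("already deallocated");
            }
            refCnt.incrementAndGet();
            return this;
        }

        /** Drops one reference; returns true when the count reaches 0. */
        boolean release() {
            if (refCnt.get() == 0) {
                throw new IllegalStateException("already deallocated");
            }
            return refCnt.decrementAndGet() == 0;
        }

        String read() {
            if (refCnt.get() == 0) {
                throw new IllegalStateException("read after release");
            }
            return content;
        }
    }

    /** A reader that consumes the body and always releases its own reference. */
    static int readAndRelease(CountedBody body) {
        try {
            return body.read().length();
        } finally {
            body.release();
        }
    }

    public static void main(String[] args) {
        CountedBody cached = new CountedBody("{\"source\":1}");
        // Each reader gets a retained reference, so the cached copy survives the read.
        int first = readAndRelease(cached.retain());
        int second = readAndRelease(cached.retain());
        System.out.println(first + " " + second + " refCnt=" + cached.refCnt());
        // The owner drops the last reference once no more reads are expected.
        System.out.println("deallocated: " + cached.release());
    }
}
```

Without the retain() calls, the first reader's release would drop the count to zero and the second read would fail. With a retain but no matching release, the count never reaches zero, which is the situation the leak detector reports.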

SinhaAmit avatar Jun 06 '25 10:06 SinhaAmit

I have the same problem. How do I fix it?

z6165039 avatar Sep 24 '25 05:09 z6165039

Thanks for your suggestion, I will try again.

Maplejw avatar Sep 28 '25 05:09 Maplejw