How to modify a request body with content type "multipart/form-data" that contains a file part?

Open xuyao178 opened this issue 2 years ago • 5 comments

Spring Boot version: 2.7.2, Spring Cloud Gateway version: 3.1.3

There is an API like the one below:

POST /api/saveTemplate
sign: xxxxxxx
timestamp: xxxxxx
name: test_template
type:1
template_file: test.xml   #a file

On the gateway side, we verify the sign and then re-sign the request with another secret key, replacing the old sign. I wrote a filter to handle this logic, like below.

Mono<MultiValueMap<String, Part>> modifiedBody = serverRequest.bodyToMono(
        new ParameterizedTypeReference<MultiValueMap<String, Part>>() {
        }).flatMap(originalBody -> {
            // check sign and re-sign logic......
            return Mono.just(originalBody);
        });
BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody,
        new ParameterizedTypeReference<MultiValueMap<String, Part>>() {
        });
HttpHeaders headers = new HttpHeaders();
headers.putAll(exchange.getRequest().getHeaders());
headers.remove(HttpHeaders.CONTENT_LENGTH);
CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers);
return bodyInserter.insert(outputMessage, new BodyInserterContext()).then(Mono.defer(() -> {
    ServerHttpRequest decorator = new ServerHttpRequestDecorator(exchange.getRequest()) {
        @Override
        public HttpHeaders getHeaders() {
            long contentLength = headers.getContentLength();
            HttpHeaders httpHeaders = new HttpHeaders();
            httpHeaders.putAll(headers);
            if (contentLength > 0) {
                httpHeaders.setContentLength(contentLength);
            } else {
                httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked");
            }
            return httpHeaders;
        }

        @Override
        public Flux<DataBuffer> getBody() {
            return outputMessage.getBody();
        }
    };
    return chain.filter(exchange.mutate().request(decorator).build());
}));
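
For readers wondering what the "check sign and re-sign logic" placeholder above might contain, here is a minimal sketch in plain Java. The signing scheme is not described in this issue, so everything here is an assumption made for illustration: HMAC-SHA256, the "key=value&key=value" canonical string over the sorted text fields, and the names ResignSketch, sign and resign are all hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.util.Map;
import java.util.TreeMap;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

// Hypothetical scheme (assumption): HMAC-SHA256 over the sorted text fields.
final class ResignSketch {

    /** Signs every field except "sign" itself, in key order, and returns lowercase hex. */
    static String sign(Map<String, String> fields, String secret) {
        StringBuilder canonical = new StringBuilder();
        for (Map.Entry<String, String> e : new TreeMap<>(fields).entrySet()) {
            if (e.getKey().equals("sign")) {
                continue; // the signature never covers itself
            }
            if (canonical.length() > 0) {
                canonical.append('&');
            }
            canonical.append(e.getKey()).append('=').append(e.getValue());
        }
        try {
            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), "HmacSHA256"));
            byte[] digest = mac.doFinal(canonical.toString().getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (GeneralSecurityException ex) {
            throw new IllegalStateException("HmacSHA256 unavailable", ex);
        }
    }

    /** Verifies the incoming sign with the client secret, then replaces it using the backend secret. */
    static Map<String, String> resign(Map<String, String> fields, String clientSecret, String backendSecret) {
        byte[] expected = sign(fields, clientSecret).getBytes(StandardCharsets.UTF_8);
        byte[] actual = fields.getOrDefault("sign", "").getBytes(StandardCharsets.UTF_8);
        if (!MessageDigest.isEqual(expected, actual)) { // constant-time comparison
            throw new SecurityException("invalid sign");
        }
        Map<String, String> out = new TreeMap<>(fields);
        out.put("sign", sign(fields, backendSecret));
        return out;
    }
}
```

In the filter, the text fields would first have to be read from the parts (for example from FormFieldPart values) before a helper like this could be applied; the file part itself is left out of the signature in this sketch, which may or may not match the real scheme.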

But the gateway fails with the error below: there is no encoder for the Part type.

org.springframework.core.codec.CodecException: No suitable writer found for part: file
	at org.springframework.http.codec.multipart.MultipartHttpMessageWriter.encodePart(MultipartHttpMessageWriter.java:260)
	at org.springframework.http.codec.multipart.MultipartHttpMessageWriter.lambda$encodePartValues$4(MultipartHttpMessageWriter.java:213)
	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:376)
	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:219)
	at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:165)
	at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:87)
	at reactor.core.publisher.Flux.subscribe(Flux.java:8466)
	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:451)
	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:219)
	at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:165)
	at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:87)
	at reactor.core.publisher.Flux.subscribe(Flux.java:8466)
	at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:258)
	at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:78)
	at reactor.core.publisher.Flux.subscribe(Flux.java:8466)
	at reactor.netty.channel.MonoSendMany.subscribe(MonoSendMany.java:102)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:240)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203)
	at reactor.core.publisher.Operators.complete(Operators.java:137)
	at reactor.netty.FutureMono.doSubscribe(FutureMono.java:122)
	at reactor.netty.FutureMono$DeferredFutureMono.subscribe(FutureMono.java:114)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4397)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:263)
	at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4397)
	at reactor.netty.NettyOutbound.subscribe(NettyOutbound.java:336)
	at reactor.core.publisher.MonoSource.subscribe(MonoSource.java:69)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at reactor.netty.http.client.HttpClientConnect$HttpIOHandlerObserver.onStateChange(HttpClientConnect.java:442)
	at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:677)
	at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onStateChange(DefaultPooledConnectionProvider.java:187)
	at reactor.netty.resources.DefaultPooledConnectionProvider$PooledConnection.onStateChange(DefaultPooledConnectionProvider.java:444)
	at reactor.netty.channel.ChannelOperationsHandler.channelActive(ChannelOperationsHandler.java:62)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:230)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:216)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:209)
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelActive(CombinedChannelDuplexHandler.java:412)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelActive(ChannelInboundHandlerAdapter.java:69)
	at io.netty.channel.CombinedChannelDuplexHandler.channelActive(CombinedChannelDuplexHandler.java:211)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:230)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:216)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:209)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelActive(DefaultChannelPipeline.java:1398)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:230)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:216)
	at io.netty.channel.DefaultChannelPipeline.fireChannelActive(DefaultChannelPipeline.java:895)
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:305)
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:335)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:710)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)

I then tried converting each Part to a DataBuffer, like below:

Mono<MultiValueMap<String, DataBuffer>> modifiedBody = serverRequest.bodyToMono(
                new ParameterizedTypeReference<MultiValueMap<String, Part>>() {
                }).flatMap(originalBody -> {
            Map<String, List<DataBuffer>> modifiedMap = new HashMap<>();
            for (String key : originalBody.keySet()) {
                List<Part> parts = originalBody.get(key);
                if (!modifiedMap.containsKey(key)) {
                    modifiedMap.put(key, new ArrayList<>(parts.size()));
                }
                for (Part part : parts) {
                    modifiedMap.get(key).add(DataBufferUtils.join(part.content()).block());
                }
            }
            return Mono.just(new MultiValueMapAdapter<>(modifiedMap));
        });

After that, the gateway reports no error, but the backend server cannot get the 'template_file' field! I have no idea how to solve this problem. Is there a good way to make this modification?
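
One possible explanation for the missing 'template_file' field, not confirmed anywhere in this thread: once a Part is flattened to a bare DataBuffer, the part's own headers (including the filename in Content-Disposition) are no longer attached to the value, and many server-side multipart parsers only treat a part as a file upload when a filename is present. The plain-Java sketch below just renders the two wire formats side by side to show the difference; the class name MultipartWireSketch, the boundary and the XML content are made up for illustration.

```java
// Illustration only: hand-rendered multipart parts, no Spring involved.
final class MultipartWireSketch {

    /** Renders one part; pass filename == null and contentType == null for a plain form field. */
    static String part(String boundary, String name, String filename, String contentType, String content) {
        StringBuilder sb = new StringBuilder();
        sb.append("--").append(boundary).append("\r\n");
        sb.append("Content-Disposition: form-data; name=\"").append(name).append('"');
        if (filename != null) {
            // This attribute is what marks the part as a file upload for many parsers.
            sb.append("; filename=\"").append(filename).append('"');
        }
        sb.append("\r\n");
        if (contentType != null) {
            sb.append("Content-Type: ").append(contentType).append("\r\n");
        }
        sb.append("\r\n").append(content).append("\r\n");
        return sb.toString();
    }

    public static void main(String[] args) {
        String boundary = "boundary123"; // made-up boundary
        String asFile = part(boundary, "template_file", "test.xml", "application/xml", "<template/>");
        String asField = part(boundary, "template_file", null, null, "<template/>");
        System.out.print(asFile + asField + "--" + boundary + "--\r\n");
    }
}
```

If this is indeed the cause, the fix is to keep each part's headers when rebuilding the body rather than passing only the raw bytes.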

xuyao178 avatar Aug 08 '22 07:08 xuyao178

I have the same problem. How can it be solved? Thank you!

Layfolk-zcy avatar Sep 05 '22 06:09 Layfolk-zcy

But gateway get the error as below, There is no encoder for Part type. org.springframework.core.codec.CodecException: No suitable writer found for part: file

Use BodyInserters.fromMultipartData instead of BodyInserters.fromPublisher

If you do this, you need to make some additional hack modifications:

serverRequest.bodyToMono(new ParameterizedTypeReference<MultiValueMap<String, Part>>() {})
        .map(multipartData -> {
            // modify the request body like this
            return BodyInserters.fromMultipartData(multipartData).with("age", 18);
        })

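You also need to implement a special CachedBodyOutputMessage, because the MultipartInserter writes to a ClientHttpRequest:

// BodyInserters.FormInserter targets ClientHttpRequest:
public interface FormInserter<T> extends BodyInserter<MultiValueMap<String, T>, ClientHttpRequest> {
}

// so the cached output message must implement ClientHttpRequest:
public class CachedBodyOutputMessage implements ClientHttpRequest {
}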

xinkeng0 avatar Sep 05 '22 11:09 xinkeng0

Do you have more detailed code? I have the same problem. Thank you.

qinchunabng avatar Aug 31 '23 07:08 qinchunabng

@gengxiaoxiaoxin Following your tips, I have solved this problem. Thank you. Here is my complete code.

public class FormDataFilter implements GatewayFilter {

    private FormDataDecryptor formDataDecryptor;

    public FormDataFilter() {
        formDataDecryptor = new FormDataDecryptor();
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        return exchange.getMultipartData().map(multiValueMap -> {
                    // Here we do data decryption and reconstruct the form data.
                    MultiValueMap<String, FormDataDecryptor.FormPart> decryptedMap = formDataDecryptor.decryptFromData(multiValueMap);
                    MultipartBodyBuilder multipartBodyBuilder = new MultipartBodyBuilder();
                    for (Map.Entry<String, List<FormDataDecryptor.FormPart>> entry : decryptedMap.entrySet()){
                        for(FormDataDecryptor.FormPart formPart : entry.getValue()) {
                            MultipartBodyBuilder.PartBuilder partBuilder = multipartBodyBuilder.part(entry.getKey(), formPart.getContent());
                            // copy each original part header onto the rebuilt part
                            formPart.getHeaders()
                                    .forEach((name, values) -> partBuilder.header(name, values.toArray(new String[0])));
                        }
                    }
                    BodyInserter bodyInserter = BodyInserters.fromMultipartData(multipartBodyBuilder.build());
                    return bodyInserter;
                })
                .flatMap(bodyInserter -> {
                    HttpHeaders headers = new HttpHeaders();
                    headers.putAll(exchange.getRequest().getHeaders());
                    // the new content type will be computed by bodyInserter
                    // and then set in the request decorator
                    headers.remove(HttpHeaders.CONTENT_LENGTH);

                    CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers);
                    return bodyInserter.insert(outputMessage, new BodyInserterContext())
                            .log("modify_request", Level.INFO)
                            .then(Mono.defer(() -> {
                                ServerHttpRequest decorator = decorate(exchange, headers, outputMessage);
                                return chain.filter(exchange.mutate().request(decorator).build());
                            })).onErrorResume((Function<Throwable, Mono<Void>>) throwable -> release(exchange,
                                    outputMessage, throwable));

                });

    }



    protected Mono<Void> release(ServerWebExchange exchange, CachedBodyOutputMessage outputMessage,
                                 Throwable throwable) {
        if (outputMessage.isCached()) {
            return outputMessage.getBody().map(DataBufferUtils::release).then(Mono.error(throwable));
        }
        return Mono.error(throwable);
    }

    ServerHttpRequestDecorator decorate(ServerWebExchange exchange, HttpHeaders headers,
                                        CachedBodyOutputMessage outputMessage) {
        return new ServerHttpRequestDecorator(exchange.getRequest()) {
            @Override
            public HttpHeaders getHeaders() {
                long contentLength = headers.getContentLength();
                HttpHeaders httpHeaders = new HttpHeaders();
                httpHeaders.putAll(headers);
                if (contentLength > 0) {
                    httpHeaders.setContentLength(contentLength);
                }
                else {
                    // TODO: this causes a 'HTTP/1.1 411 Length Required' on
                    // httpbin.org
                    httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked");
                }
                return httpHeaders;
            }

            @Override
            public Flux<DataBuffer> getBody() {
                return outputMessage.getBody();
            }
        };
    }

    static class CachedBodyOutputMessage implements ClientHttpRequest {

        private ServerWebExchange serverWebExchange;

        private ServerHttpRequest serverRequest;

        private HttpHeaders httpHeaders;

        private boolean cached = false;
        //
        private Flux<DataBuffer> body = Flux
                .error(new IllegalStateException("The body is not set. " + "Did handling complete with success?"));

        public CachedBodyOutputMessage(ServerWebExchange exchange, HttpHeaders httpHeaders) {
            this.serverWebExchange = exchange;
            this.serverRequest = exchange.getRequest();
            this.httpHeaders = httpHeaders;
        }

        @Override
        public HttpMethod getMethod() {
            return serverRequest.getMethod();
        }

        @Override
        public URI getURI() {
            return serverRequest.getURI();
        }

        @Override
        public MultiValueMap<String, HttpCookie> getCookies() {
            return serverRequest.getCookies();
        }

        @Override
        public <T> T getNativeRequest() {
            return (T) serverRequest;
        }

        @Override
        public DataBufferFactory bufferFactory() {
            return serverWebExchange.getResponse().bufferFactory();
        }

        @Override
        public void beforeCommit(Supplier<? extends Mono<Void>> action) {

        }

        @Override
        public boolean isCommitted() {
            return false;
        }

        @Override
        public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
            this.body = Flux.from(body);
            this.cached = true;
            return Mono.empty();
        }

        @Override
        public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
            return writeWith(Flux.from(body).flatMap(p -> p));
        }

        @Override
        public Mono<Void> setComplete() {
            return writeWith(Flux.empty());
        }

        @Override
        public HttpHeaders getHeaders() {
            return this.httpHeaders;
        }

        boolean isCached() {
            return this.cached;
        }

        /**
         * Return the request body, or an error stream if the body was never set.
         * @return body as {@link Flux}
         */
        public Flux<DataBuffer> getBody() {
            return this.body;
        }

    }
}
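
The filter above relies on FormDataDecryptor and FormDataDecryptor.FormPart, which are not shown in this thread. Judging only from how FormPart is used (getContent() is passed to MultipartBodyBuilder.part, and getHeaders() is iterated as header name to list of values), a hypothetical minimal holder could look like the sketch below. The class name FormPartSketch, the byte[] content type and the header(...) helper are assumptions, not the author's actual implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for FormDataDecryptor.FormPart (assumption, not from the thread).
final class FormPartSketch {

    private final byte[] content;
    private final Map<String, List<String>> headers = new LinkedHashMap<>();

    FormPartSketch(byte[] content) {
        this.content = content.clone(); // defensive copy of the decrypted bytes
    }

    /** Adds one header value, keeping earlier values for the same name. */
    FormPartSketch header(String name, String value) {
        headers.computeIfAbsent(name, k -> new ArrayList<>()).add(value);
        return this;
    }

    byte[] getContent() {
        return content.clone();
    }

    /** Read-only view, in insertion order, of the original part headers. */
    Map<String, List<String>> getHeaders() {
        return Collections.unmodifiableMap(headers);
    }
}
```

Carrying the original headers along with the content is what lets the filter restore Content-Disposition (and with it the filename) on each rebuilt part.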

qinchunabng avatar Sep 01 '23 08:09 qinchunabng

Can you give me a simple sample GitHub repo that implements this?

I created this https://github.com/hendisantika/spring-boot-logging-filter2

Thanks

hendisantika avatar Dec 29 '23 10:12 hendisantika