spring-cloud-gateway
spring-cloud-gateway copied to clipboard
How to modify a request body with content type "multipart/form-data" that contains a file part?
spring boot version: 2.7.2 spring cloud gateway version: 3.1.3
There is an API like the one below:
POST /api/saveTemplate
sign: xxxxxxx
timestamp: xxxxxx
name: test_template
type:1
template_file: test.xml #a file
On the gateway side, we check the sign and then re-sign with another secret key to replace the old one. I wrote a filter to handle this logic, shown below.
// Read the incoming body as a multipart map (field name -> list of parts) and run the
// sign check / re-sign step on it; the map is passed on unchanged in this snippet.
Mono<MultiValueMap<String, Part>> modifiedBody = serverRequest.bodyToMono(
new ParameterizedTypeReference<MultiValueMap<String, Part>>() {
}).flatMap(originalBody -> {
// check sign and re-sign logic......
return Mono.just(originalBody);
});
// Wrap the multipart map in a publisher-backed body inserter.
// NOTE(review): the map values are still Part instances here; this appears to be what the
// reported "No suitable writer found for part" CodecException is about - confirm.
BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody,
new ParameterizedTypeReference<MultiValueMap<String, Part>>() {
});
// Copy the original request headers and drop Content-Length
// (presumably because the rewritten body can have a different size).
HttpHeaders headers = new HttpHeaders();
headers.putAll(exchange.getRequest().getHeaders());
headers.remove(HttpHeaders.CONTENT_LENGTH);
// Write the new body into a cached output message, then continue the filter chain with a
// request decorator that exposes the copied headers and the cached body.
CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers);
return bodyInserter.insert(outputMessage, new BodyInserterContext()).then(Mono.defer(() -> {
ServerHttpRequest decorator = new ServerHttpRequestDecorator(exchange.getRequest()) {
@Override
public HttpHeaders getHeaders() {
// Build a fresh copy of the headers on every call; set Content-Length when the copied
// headers carry a positive one, otherwise mark the request as chunked.
long contentLength = headers.getContentLength();
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.putAll(headers);
if (contentLength > 0) {
httpHeaders.setContentLength(contentLength);
} else {
httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked");
}
return httpHeaders;
}
@Override
public Flux<DataBuffer> getBody() {
// Serve the body captured by the cached output message instead of the original one.
return outputMessage.getBody();
}
};
return chain.filter(exchange.mutate().request(decorator).build());
}));
But the gateway gets the error below: there is no encoder for the Part type.
org.springframework.core.codec.CodecException: No suitable writer found for part: file
at org.springframework.http.codec.multipart.MultipartHttpMessageWriter.encodePart(MultipartHttpMessageWriter.java:260)
at org.springframework.http.codec.multipart.MultipartHttpMessageWriter.lambda$encodePartValues$4(MultipartHttpMessageWriter.java:213)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:376)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:219)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:165)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:87)
at reactor.core.publisher.Flux.subscribe(Flux.java:8466)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:451)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:219)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:165)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:87)
at reactor.core.publisher.Flux.subscribe(Flux.java:8466)
at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:258)
at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:78)
at reactor.core.publisher.Flux.subscribe(Flux.java:8466)
at reactor.netty.channel.MonoSendMany.subscribe(MonoSendMany.java:102)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:240)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203)
at reactor.core.publisher.Operators.complete(Operators.java:137)
at reactor.netty.FutureMono.doSubscribe(FutureMono.java:122)
at reactor.netty.FutureMono$DeferredFutureMono.subscribe(FutureMono.java:114)
at reactor.core.publisher.Mono.subscribe(Mono.java:4397)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:263)
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
at reactor.core.publisher.Mono.subscribe(Mono.java:4397)
at reactor.netty.NettyOutbound.subscribe(NettyOutbound.java:336)
at reactor.core.publisher.MonoSource.subscribe(MonoSource.java:69)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.netty.http.client.HttpClientConnect$HttpIOHandlerObserver.onStateChange(HttpClientConnect.java:442)
at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:677)
at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onStateChange(DefaultPooledConnectionProvider.java:187)
at reactor.netty.resources.DefaultPooledConnectionProvider$PooledConnection.onStateChange(DefaultPooledConnectionProvider.java:444)
at reactor.netty.channel.ChannelOperationsHandler.channelActive(ChannelOperationsHandler.java:62)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:230)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:216)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:209)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelActive(CombinedChannelDuplexHandler.java:412)
at io.netty.channel.ChannelInboundHandlerAdapter.channelActive(ChannelInboundHandlerAdapter.java:69)
at io.netty.channel.CombinedChannelDuplexHandler.channelActive(CombinedChannelDuplexHandler.java:211)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:230)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:216)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:209)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelActive(DefaultChannelPipeline.java:1398)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:230)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:216)
at io.netty.channel.DefaultChannelPipeline.fireChannelActive(DefaultChannelPipeline.java:895)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:305)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:335)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:710)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
**I tried to convert the Part to a DataBuffer, as shown below:**
// Second attempt: read the multipart map and replace every Part with a single DataBuffer
// holding that part's joined content.
Mono<MultiValueMap<String, DataBuffer>> modifiedBody = serverRequest.bodyToMono(
new ParameterizedTypeReference<MultiValueMap<String, Part>>() {
}).flatMap(originalBody -> {
// field name -> joined content buffers, in the same order as the original parts
Map<String, List<DataBuffer>> modifiedMap = new HashMap<>();
for (String key : originalBody.keySet()) {
List<Part> parts = originalBody.get(key);
if (!modifiedMap.containsKey(key)) {
modifiedMap.put(key, new ArrayList<>(parts.size()));
}
for (Part part : parts) {
// NOTE(review): block() is called inside the reactive flatMap lambda, and only
// part.content() is copied - part.headers() (where a file part's filename and content
// type would normally live) are not carried over. That may be why the backend no longer
// sees the file field - confirm.
modifiedMap.get(key).add(DataBufferUtils.join(part.content()).block());
}
}
return Mono.just(new MultiValueMapAdapter<>(modifiedMap));
});
**After that, there is no error in the gateway, but the backend server cannot get the 'template_file' field! I have no idea how to solve this problem. Is there a good way to make this modification?**
I have the same problem. How can I solve it? Thank you!
But gateway get the error as below, There is no encoder for Part type. org.springframework.core.codec.CodecException: No suitable writer found for part: file
Use BodyInserters.fromMultipartData
instead of BodyInserters.fromPublisher.
If you do this, you need to make a few additional workaround changes:
serverRequest.bodyToMono(new ParameterizedTypeReference<MultiValueMap<String, Part>>() {})
.map(multipartData -> {
//modify the request body like this
return BodyInserters.fromMultipartData(multipartData).with("age", 18);
})
You also need to implement a special CachedBodyOutputMessage that implements ClientHttpRequest,
because that is the output message type a MultipartInserter
(a FormInserter) writes to:
public interface FormInserter<T> extends BodyInserter<MultiValueMap<String, T>, ClientHttpRequest> {
public class CachedBodyOutputMessage implements ClientHttpRequest{
}
But gateway get the error as below, There is no encoder for Part type. org.springframework.core.codec.CodecException: No suitable writer found for part: file
Use
BodyInserters.fromMultipartData
instead of BodyInserters.fromPublisher.
If you do this, you need to make a few additional workaround changes:
serverRequest.bodyToMono(new ParameterizedTypeReference<MultiValueMap<String, Part>>() {}) .map(multipartData -> { //modify the request body like this return BodyInserters.fromMultipartData(multipartData).with("age", 18); })
You also need to implement a special
CachedBodyOutputMessage
(MultipartInserter needs an output message that implements ClientHttpRequest):
public interface FormInserter<T> extends BodyInserter<MultiValueMap<String, T>, ClientHttpRequest> {
public class CachedBodyOutputMessage implements ClientHttpRequest{ }
Do you have more detailed code? I have the same problem. Thank you.
@gengxiaoxiaoxin According to your tips, I have solved this problem. Thank you. This is my complete code.
/**
 * Gateway filter that rewrites a {@code multipart/form-data} request body before the
 * request is forwarded downstream.
 *
 * <p>The incoming parts are handed to {@link FormDataDecryptor}, the returned parts are
 * rebuilt with a {@link MultipartBodyBuilder}, and the re-encoded body is captured in a
 * {@link CachedBodyOutputMessage} so it can be served to the rest of the filter chain
 * through a request decorator.
 *
 * <p>Requests whose content type is not compatible with {@code multipart/form-data} are
 * passed through untouched; without that guard their body would be replaced by an empty
 * multipart body.
 */
public class FormDataFilter implements GatewayFilter {

    private final FormDataDecryptor formDataDecryptor;

    public FormDataFilter() {
        this.formDataDecryptor = new FormDataDecryptor();
    }

    /**
     * Decrypts and re-encodes the multipart body, then continues the chain with the
     * rewritten request.
     *
     * @param exchange the current server exchange
     * @param chain the remaining gateway filter chain
     * @return completion signal of the downstream chain, or the original error after any
     *         cached body buffers have been released
     */
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        // getMultipartData() yields an empty map for non-multipart requests; rewriting those
        // would silently drop their real body, so leave them alone.
        if (!org.springframework.http.MediaType.MULTIPART_FORM_DATA
                .isCompatibleWith(exchange.getRequest().getHeaders().getContentType())) {
            return chain.filter(exchange);
        }
        return exchange.getMultipartData()
                .map(multipartData -> {
                    // Here we do data decryption and reconstruct the form data.
                    MultiValueMap<String, FormDataDecryptor.FormPart> decryptedMap =
                            formDataDecryptor.decryptFromData(multipartData);
                    MultipartBodyBuilder multipartBodyBuilder = new MultipartBodyBuilder();
                    for (Map.Entry<String, List<FormDataDecryptor.FormPart>> entry : decryptedMap.entrySet()) {
                        for (FormDataDecryptor.FormPart formPart : entry.getValue()) {
                            MultipartBodyBuilder.PartBuilder partBuilder =
                                    multipartBodyBuilder.part(entry.getKey(), formPart.getContent());
                            // Carry every header of the decrypted part over to the rebuilt part.
                            formPart.getHeaders().entrySet().forEach(header ->
                                    partBuilder.header(header.getKey(), header.getValue().toArray(new String[0])));
                        }
                    }
                    return BodyInserters.fromMultipartData(multipartBodyBuilder.build());
                })
                .flatMap(bodyInserter -> forward(exchange, chain, bodyInserter));
    }

    /**
     * Writes the rebuilt body into a cached output message and continues the chain with a
     * request decorator backed by that message.
     */
    private Mono<Void> forward(ServerWebExchange exchange, GatewayFilterChain chain,
            BodyInserter<?, ClientHttpRequest> bodyInserter) {
        HttpHeaders headers = new HttpHeaders();
        headers.putAll(exchange.getRequest().getHeaders());
        // the new content type will be computed by bodyInserter
        // and then set in the request decorator
        headers.remove(HttpHeaders.CONTENT_LENGTH);
        CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers);
        return bodyInserter.insert(outputMessage, new BodyInserterContext())
                .log("modify_request", Level.INFO)
                .then(Mono.defer(() -> {
                    ServerHttpRequest decorator = decorate(exchange, headers, outputMessage);
                    return chain.filter(exchange.mutate().request(decorator).build());
                }))
                .onErrorResume(throwable -> release(exchange, outputMessage, throwable));
    }

    /**
     * Releases the buffers of an already cached body and re-signals the error.
     *
     * @param exchange the current server exchange (not used by this implementation)
     * @param outputMessage the message that may hold a cached body
     * @param throwable the error to propagate
     * @return a {@link Mono} that always terminates with {@code throwable}
     */
    protected Mono<Void> release(ServerWebExchange exchange, CachedBodyOutputMessage outputMessage,
            Throwable throwable) {
        if (outputMessage.isCached()) {
            return outputMessage.getBody().map(DataBufferUtils::release).then(Mono.error(throwable));
        }
        return Mono.error(throwable);
    }

    /**
     * Creates a request decorator that exposes the rewritten headers and the cached body.
     *
     * @param exchange the current server exchange whose request is decorated
     * @param headers the headers the body inserter wrote to
     * @param outputMessage the message holding the rewritten body
     * @return the decorated request
     */
    ServerHttpRequestDecorator decorate(ServerWebExchange exchange, HttpHeaders headers,
            CachedBodyOutputMessage outputMessage) {
        return new ServerHttpRequestDecorator(exchange.getRequest()) {
            @Override
            public HttpHeaders getHeaders() {
                long contentLength = headers.getContentLength();
                HttpHeaders httpHeaders = new HttpHeaders();
                httpHeaders.putAll(headers);
                if (contentLength > 0) {
                    httpHeaders.setContentLength(contentLength);
                }
                else {
                    // TODO: this causes a 'HTTP/1.1 411 Length Required' // on
                    // httpbin.org
                    httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked");
                }
                return httpHeaders;
            }

            @Override
            public Flux<DataBuffer> getBody() {
                return outputMessage.getBody();
            }
        };
    }

    /**
     * {@link ClientHttpRequest} that does not send anything: it only remembers the body
     * publisher written to it so the filter can replay that body on the decorated request.
     * Method, URI and cookies are taken from the wrapped server request.
     */
    static class CachedBodyOutputMessage implements ClientHttpRequest {

        private final ServerWebExchange serverWebExchange;

        private final ServerHttpRequest serverRequest;

        private final HttpHeaders httpHeaders;

        private boolean cached = false;

        private Flux<DataBuffer> body = Flux
                .error(new IllegalStateException("The body is not set. " + "Did handling complete with success?"));

        public CachedBodyOutputMessage(ServerWebExchange exchange, HttpHeaders httpHeaders) {
            this.serverWebExchange = exchange;
            this.serverRequest = exchange.getRequest();
            this.httpHeaders = httpHeaders;
        }

        @Override
        public HttpMethod getMethod() {
            return serverRequest.getMethod();
        }

        @Override
        public URI getURI() {
            return serverRequest.getURI();
        }

        @Override
        public MultiValueMap<String, HttpCookie> getCookies() {
            return serverRequest.getCookies();
        }

        @Override
        @SuppressWarnings("unchecked") // the caller chooses T; the wrapped server request is all we can offer
        public <T> T getNativeRequest() {
            return (T) serverRequest;
        }

        @Override
        public DataBufferFactory bufferFactory() {
            return serverWebExchange.getResponse().bufferFactory();
        }

        @Override
        public void beforeCommit(Supplier<? extends Mono<Void>> action) {
            // nothing is ever committed by this message, so commit actions are ignored
        }

        @Override
        public boolean isCommitted() {
            return false;
        }

        @Override
        public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
            this.body = Flux.from(body);
            this.cached = true;
            return Mono.empty();
        }

        @Override
        public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
            // concatMap (not flatMap) so the inner publishers are drained one after another
            // and the body chunks keep their original order.
            return writeWith(Flux.from(body).concatMap(p -> p));
        }

        @Override
        public Mono<Void> setComplete() {
            return writeWith(Flux.empty());
        }

        @Override
        public HttpHeaders getHeaders() {
            return this.httpHeaders;
        }

        boolean isCached() {
            return this.cached;
        }

        /**
         * Return the request body, or an error stream if the body was never set.
         * @return body as {@link Flux}
         */
        public Flux<DataBuffer> getBody() {
            return this.body;
        }
    }
}
@gengxiaoxiaoxin According to your tips, I have solved this problem. Thank you. This is my complete code.
public class FormDataFilter implements GatewayFilter { private FormDataDecryptor formDataDecryptor; public FormDataFilter() { formDataDecryptor = new FormDataDecryptor(); } @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { return exchange.getMultipartData().map(multiValueMap -> { // Here we do data decryption and reconstruct the form data. MultiValueMap<String, FormDataDecryptor.FormPart> decryptedMap = formDataDecryptor.decryptFromData(multiValueMap); MultipartBodyBuilder multipartBodyBuilder = new MultipartBodyBuilder(); for (Map.Entry<String, List<FormDataDecryptor.FormPart>> entry : decryptedMap.entrySet()){ for(FormDataDecryptor.FormPart formPart : entry.getValue()) { MultipartBodyBuilder.PartBuilder partBuilder = multipartBodyBuilder.part(entry.getKey(), formPart.getContent()); formPart.getHeaders().entrySet() .forEach(stringListEntry -> { partBuilder.header(stringListEntry.getKey(), stringListEntry.getValue().toArray(new String[stringListEntry.getValue().size()])); }); } } BodyInserter bodyInserter = BodyInserters.fromMultipartData(multipartBodyBuilder.build()); return bodyInserter; }) .flatMap(bodyInserter -> { HttpHeaders headers = new HttpHeaders(); headers.putAll(exchange.getRequest().getHeaders()); // the new content type will be computed by bodyInserter // and then set in the request decorator headers.remove(HttpHeaders.CONTENT_LENGTH); CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers); return bodyInserter.insert(outputMessage, new BodyInserterContext()) .log("modify_request", Level.INFO) .then(Mono.defer(() -> { ServerHttpRequest decorator = decorate(exchange, headers, outputMessage); return chain.filter(exchange.mutate().request(decorator).build()); })).onErrorResume((Function<Throwable, Mono<Void>>) throwable -> release(exchange, outputMessage, throwable)); }); } protected Mono<Void> release(ServerWebExchange exchange, CachedBodyOutputMessage outputMessage, Throwable 
throwable) { if (outputMessage.isCached()) { return outputMessage.getBody().map(DataBufferUtils::release).then(Mono.error(throwable)); } return Mono.error(throwable); } ServerHttpRequestDecorator decorate(ServerWebExchange exchange, HttpHeaders headers, CachedBodyOutputMessage outputMessage) { return new ServerHttpRequestDecorator(exchange.getRequest()) { @Override public HttpHeaders getHeaders() { long contentLength = headers.getContentLength(); HttpHeaders httpHeaders = new HttpHeaders(); httpHeaders.putAll(headers); if (contentLength > 0) { httpHeaders.setContentLength(contentLength); } else { // TODO: this causes a 'HTTP/1.1 411 Length Required' // on // httpbin.org httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked"); } return httpHeaders; } @Override public Flux<DataBuffer> getBody() { return outputMessage.getBody(); } }; } static class CachedBodyOutputMessage implements ClientHttpRequest { private ServerWebExchange serverWebExchange; private ServerHttpRequest serverRequest; private HttpHeaders httpHeaders; private boolean cached = false; // private Flux<DataBuffer> body = Flux .error(new IllegalStateException("The body is not set. " + "Did handling complete with success?")); public CachedBodyOutputMessage(ServerWebExchange exchange, HttpHeaders httpHeaders) { this.serverWebExchange = exchange; this.serverRequest = exchange.getRequest(); this.httpHeaders = httpHeaders; } @Override public HttpMethod getMethod() { return serverRequest.getMethod(); } @Override public URI getURI() { return serverRequest.getURI(); } @Override public MultiValueMap<String, HttpCookie> getCookies() { return serverRequest.getCookies(); } @Override public <T> T getNativeRequest() { return (T) serverRequest; } @Override public DataBufferFactory bufferFactory() { return serverWebExchange.getResponse().bufferFactory(); } @Override public void beforeCommit(Supplier<? 
extends Mono<Void>> action) { } @Override public boolean isCommitted() { return false; } @Override public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) { this.body = Flux.from(body); this.cached = true; return Mono.empty(); } @Override public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) { return writeWith(Flux.from(body).flatMap(p -> p)); } @Override public Mono<Void> setComplete() { return writeWith(Flux.empty()); } @Override public HttpHeaders getHeaders() { return this.httpHeaders; } boolean isCached() { return this.cached; } /** * Return the request body, or an error stream if the body was never set or when. * @return body as {@link Flux} */ public Flux<DataBuffer> getBody() { return this.body; } } }
Can you give me a simple sample GitHub repo that implements this?
I created this https://github.com/hendisantika/spring-boot-logging-filter2
Thanks