spring-cloud-stream
spring-cloud-stream copied to clipboard
PartitionAwareFunctionWrapper can't work in concurrency >1
Spring Boot 2.6.6 and Spring Cloud 2021.0.1
sending a partitioned message with a function input=message output=message
there are some messages miss the partition assignment as other threads may set the Enhancer to null. Thanks!
public Object apply(Object input) {
this.setEnhancerIfNecessary();
return this.function.apply(input);
Object result = this.function.apply(input);
if (!((FunctionInvocationWrapper) this.function).isInputTypePublisher()) {
**((FunctionInvocationWrapper) this.function).setEnhancer(null);**
}
return result;
}
Spring Boot 2.6.6 and Spring Cloud 2021.0.1
I meet the same issue related to PartitionAwareFunctionWrapper , when using streamBridge to send partition and non-partition message in multiple threads.
Thread 1 try to send non-partition message
Thread 2 try to send partition message
Thread 2 set the enhancer to partitionAwareFunctionWrapper
When Thread 1 try to execute the function, it checks the enhancer is not null and try get the partition key will get the error
StreamBridge.send(...)

SimpleFunctionRegistry.convertOutputIfNecessary(...)

Error

it is fixed in spring cloud stream version 3.2.4 or <spring-cloud.version>2021.0.3</spring-cloud.version>
Can you please provide a small sample that would reproduce the issue. Basically a small project that we can clone and test
found this fixed in StreamBridge but didn't cover sending message by function.
https://github.com/spring-cloud/spring-cloud-stream/commit/f58dcf371db7cadb9ddf9de9ef018fa575ec10fd
I have the same problem.
Here is a test project using function.
@olegz
Just execute
mvn spring-boot:run
Messages(MyEvent) with same partition key would not be in same partition of test-response queue.
| Partition | Size |
|---|---|
| 0 | 59 |
| 1 | 73 |
| 2 | 809 |
| 3 | 59 |
My quick fix
public Object apply(Object input) {
Object result;
synchronized (this.function)
{
this.setEnhancerIfNecessary();
result = this.function.apply(input);
if (!((FunctionInvocationWrapper) this.function).isInputTypePublisher()) {
((FunctionInvocationWrapper) this.function).setEnhancer(null);
}
}
return result;
}
@pongpongcn Is this still an issue for you? There have been a lot of code changes in this area since the last time you updated on the issue. If it works, can we close the issue? If you are still facing the issue, please ping us here, and we will look at this soon.
Closing the issue due to no activity. Feel free to re-open if there is a need.