spring-cloud-stream icon indicating copy to clipboard operation
spring-cloud-stream copied to clipboard

PartitionAwareFunctionWrapper can't work in concurrency >1

Open nicolas-zheng opened this issue 3 years ago • 5 comments
trafficstars

Spring Boot 2.6.6 and Spring Cloud 2021.0.1

sending a partitioned message with a function input=message output=message

there are some messages miss the partition assignment as other threads may set the Enhancer to null. Thanks!

public Object apply(Object input) {
	this.setEnhancerIfNecessary();
	return this.function.apply(input);
	Object result = this.function.apply(input);
	if (!((FunctionInvocationWrapper) this.function).isInputTypePublisher()) {
		**((FunctionInvocationWrapper) this.function).setEnhancer(null);**
	}
	return result;
}

nicolas-zheng avatar May 30 '22 09:05 nicolas-zheng

Spring Boot 2.6.6 and Spring Cloud 2021.0.1 I meet the same issue related to PartitionAwareFunctionWrapper , when using streamBridge to send partition and non-partition message in multiple threads. Thread 1 try to send non-partition message Thread 2 try to send partition message Thread 2 set the enhancer to partitionAwareFunctionWrapper When Thread 1 try to execute the function, it checks the enhancer is not null and try get the partition key will get the error StreamBridge.send(...) image

SimpleFunctionRegistry.convertOutputIfNecessary(...) image

Error image

XIAOANAndy avatar Jun 06 '22 05:06 XIAOANAndy

it is fixed in spring cloud stream version 3.2.4 or <spring-cloud.version>2021.0.3</spring-cloud.version>

nicolas-zheng avatar Jun 07 '22 03:06 nicolas-zheng

Can you please provide a small sample that would reproduce the issue. Basically a small project that we can clone and test

olegz avatar Jun 09 '22 13:06 olegz

found this fixed in StreamBridge but didn't cover sending message by function.

https://github.com/spring-cloud/spring-cloud-stream/commit/f58dcf371db7cadb9ddf9de9ef018fa575ec10fd

nicolas-zheng avatar Jun 22 '22 05:06 nicolas-zheng

I have the same problem.

Here is a test project using function.

testconsumerconcurrency.zip

@olegz

Just execute

mvn spring-boot:run

Messages(MyEvent) with same partition key would not be in same partition of test-response queue.

Partition Size
0 59
1 73
2 809
3 59

My quick fix

public Object apply(Object input) {
    Object result;
    synchronized (this.function)
    {
        this.setEnhancerIfNecessary();
        result = this.function.apply(input);
        if (!((FunctionInvocationWrapper) this.function).isInputTypePublisher()) {
            ((FunctionInvocationWrapper) this.function).setEnhancer(null);
        }
    }
    return result;
}

pongpongcn avatar Sep 07 '22 10:09 pongpongcn

@pongpongcn Is this still an issue for you? There have been a lot of code changes in this area since the last time you updated on the issue. If it works, can we close the issue? If you are still facing the issue, please ping us here, and we will look at this soon.

sobychacko avatar Sep 13 '23 21:09 sobychacko

Closing the issue due to no activity. Feel free to re-open if there is a need.

sobychacko avatar Oct 19 '23 13:10 sobychacko