How to configure the XTRIM or XADD

Open zhuangzibin opened this issue 2 years ago • 3 comments

Hi, I am using spring-integration with Redis Streams, but the stream entries are kept in memory indefinitely, so memory usage keeps growing. How do I configure XTRIM, or XADD with MAXLEN? For example:

XADD mystream MAXLEN ~ 10 * value
XTRIM mystream MAXLEN ~ 10

1. producerConfig.java

@Slf4j
@Configuration
public class ProducerConfig {

    @Bean
    public FluxMessageChannel fluxMessageChannel() {
        return new FluxMessageChannel();
    }

    /**
     * Producer: sends messages to the Redis stream.
     *
     * @param reactiveRedisConnectionFactory reactiveRedisConnectionFactory
     * @return ReactiveRedisStreamMessageHandler
     */
    @Bean
    @ServiceActivator(inputChannel = "fluxMessageChannel", reactive = @Reactive)
    public ReactiveRedisStreamMessageHandler reactiveValidatorMessageHandler(
            ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
        RedisSerializationContext<String, Object> serializationContext = RedisSerializationContext.<String, Object>newSerializationContext()
                .key(new StringRedisSerializer())
                .value(new GenericJackson2JsonRedisSerializer())
                .hashKey(new StringRedisSerializer())
                .hashValue(new StringRedisSerializer())
                .build();
        ReactiveRedisStreamMessageHandler reactiveStreamMessageHandler =
                new ReactiveRedisStreamMessageHandler(reactiveRedisConnectionFactory, "myStreamKey");
        reactiveStreamMessageHandler.setSerializationContext(serializationContext);
        reactiveStreamMessageHandler.setExtractPayload(true);
        return reactiveStreamMessageHandler;
    }
}

2. producer.java

@Component
@RequiredArgsConstructor
public class RedisStreamProducer {

    private final FluxMessageChannel fluxMessageChannel;

    public Mono<Boolean> send(BaseMessage downMessage) {
        var message = MessageBuilder.withPayload(downMessage).build();
        return Mono.fromCallable(() -> fluxMessageChannel.send(message));
    }
}

zhuangzibin avatar Apr 08 '23 03:04 zhuangzibin

The mentioned XTRIM command can be used via ReactiveStreamOperations:

	/**
	 * Trims the stream to {@code count} elements.
	 *
	 * @param key the stream key.
	 * @param count length of the stream.
	 * @return number of removed entries.
	 * @see <a href="https://redis.io/commands/xtrim">Redis Documentation: XTRIM</a>
	 */
	Mono<Long> trim(K key, long count);
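
As a rough illustration of the trim() operation quoted above, here is a minimal sketch that wraps it in a small helper. The StreamTrimmer class and its method names are hypothetical and not part of Spring Integration or Spring Data Redis; only ReactiveStringRedisTemplate, opsForStream() and trim(key, count) are existing API. Note that this performs an exact trim, not the approximate "~" variant from the question.

```java
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import reactor.core.publisher.Mono;

// Hypothetical helper class, shown only to illustrate ReactiveStreamOperations.trim().
public class StreamTrimmer {

    private final ReactiveStringRedisTemplate redisTemplate;

    public StreamTrimmer(ReactiveRedisConnectionFactory connectionFactory) {
        this.redisTemplate = new ReactiveStringRedisTemplate(connectionFactory);
    }

    /**
     * Trims the stream to at most maxLength entries (XTRIM with MAXLEN)
     * and emits the number of removed entries.
     */
    public Mono<Long> trim(String streamKey, long maxLength) {
        return this.redisTemplate.<String, String>opsForStream().trim(streamKey, maxLength);
    }
}
```

Nothing happens until the returned Mono is subscribed, so the call could be chained after a send or triggered on a schedule, for example new StreamTrimmer(reactiveRedisConnectionFactory).trim("myStreamKey", 10).subscribe(). Whether an overload with an approximate-trimming flag is available depends on the Spring Data Redis version in use.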

The ReactiveRedisStreamMessageHandler issues XADD via ReactiveStreamOperations.add(), but it looks like Spring Data Redis does not expose maxlen as a property on the Record abstraction.

So you probably have to use the Lettuce API directly in your code to perform an XADD with MAXLEN.

Please consider raising a GH issue at https://github.com/spring-projects/spring-data-redis/issues to expose maxlen (and approximateTrimming) on the org.springframework.data.redis.connection.stream.Record abstraction. Then we can expose that option on the ReactiveRedisStreamMessageHandler to let you control stream trimming at the Spring Integration level.

artembilan avatar Apr 10 '23 14:04 artembilan

Thank you for your answer. I have another question: producerConfig.java configures the topic, i.e. the stream key name. What if I want to send to another topic? All I can think of at the moment is to configure another ReactiveRedisStreamMessageHandler and FluxMessageChannel @Bean.

Is there a similar method, such as send(String topic, Message message)?

zhuangzibin avatar Apr 11 '23 02:04 zhuangzibin

I'm not sure what you see wrong with the ReactiveRedisStreamMessageHandler and its streamKeyExpression property:

	/**
	 * Create an instance based on provided {@link ReactiveRedisConnectionFactory} and expression for stream key.
	 * @param connectionFactory the {@link ReactiveRedisConnectionFactory} to use
	 * @param streamKeyExpression the SpEL expression to evaluate a key for stream
	 */
	public ReactiveRedisStreamMessageHandler(ReactiveRedisConnectionFactory connectionFactory,
			Expression streamKeyExpression) {

So, let's imagine the message you send to the input channel of this channel adapter has a header like my_stream. You can configure the streamKeyExpression like this: new FunctionExpression<Message<?>>(message -> message.getHeaders().get("my_stream")).

The ReactiveRedisStreamMessageHandler then applies logic like this:

	protected Mono<Void> handleMessageInternal(Message<?> message) {
		return Mono
				.fromSupplier(() -> {
					String streamKey = this.streamKeyExpression.getValue(this.evaluationContext, message, String.class);
					Assert.notNull(streamKey, "'streamKey' must not be null");
					return streamKey;
				})
				.flatMap((streamKey) -> {
					Object value = message;
					if (this.extractPayload) {
						value = message.getPayload();
					}

					Record<String, ?> record =
							StreamRecords.objectBacked(value)
									.withStreamKey(streamKey);

					return this.reactiveStreamOperations.add(record);
				})
				.then();
	}
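
To tie the streamKeyExpression suggestion to the producer configuration from the question, here is a minimal sketch of a handler bean that resolves the stream key from a message header. The class name DynamicStreamKeyConfig, the bean name and the STREAM_KEY_HEADER constant are illustrative assumptions; the constructor and FunctionExpression are the ones quoted above. The custom serialization context from the original configuration is left out for brevity, and this bean is meant to replace the original handler on that channel rather than sit next to it.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.integration.annotation.Reactive;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.expression.FunctionExpression;
import org.springframework.integration.redis.outbound.ReactiveRedisStreamMessageHandler;
import org.springframework.messaging.Message;

@Configuration
public class DynamicStreamKeyConfig {

    // Assumed header name; any header carrying the target stream key would work.
    public static final String STREAM_KEY_HEADER = "my_stream";

    @Bean
    @ServiceActivator(inputChannel = "fluxMessageChannel", reactive = @Reactive)
    public ReactiveRedisStreamMessageHandler dynamicStreamMessageHandler(
            ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
        // The stream key is evaluated per message from its header, not fixed at startup.
        ReactiveRedisStreamMessageHandler handler =
                new ReactiveRedisStreamMessageHandler(reactiveRedisConnectionFactory,
                        new FunctionExpression<Message<?>>(
                                message -> message.getHeaders().get(STREAM_KEY_HEADER)));
        handler.setExtractPayload(true);
        return handler;
    }
}
```

On the sending side, the producer would then pick the target stream per message, for example MessageBuilder.withPayload(downMessage).setHeader("my_stream", "anotherStreamKey").build(). A message without that header fails the "'streamKey' must not be null" assertion shown above.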

I'm not sure where you found that send(String topic, Message message), but I believe it is not relevant.

Please note that GH issues are really for issues and feature requests. To ask questions, it is better to go to StackOverflow or the Discussions tab here in the GitHub project.

artembilan avatar Apr 11 '23 13:04 artembilan