spring-integration
spring-integration copied to clipboard
Provide reaper for (JDBC) channel message store
There is currently a reaper for a message group store, but not for a channel message store.
At least for a jdbc channel message store the same message removal would be supportable and I wanted to suggest to include its support in the current reaper implementation.
I can offer to implement and contribute this feature if you'd consider it.
Would be great if you elaborate more what exactly such a functionality has to do.
The reaper really cleans up the group store for those group of messages which have not been released normally by a correlation endpoint.
The ChannelMessageStore implements a queue pattern, where every message is independent as it would be in JMS queue or Kafka partition.
Therefore reaping the whole group on this channel is like purging the content of the queue in the end. A global expiration for the whole queue just does not make sense in this case.
In a modular application where many modules use Spring Integration but with their own groups, the messages of an abandoned module (version) will stay forever. It would be nice if there was a convenient way to clean those up, especially if the nature of the application determines that messages of a certain age will never be of interest. For this use case, it would be nice to have a convenient way to run this. Currently, I use some SQL to do it, but in a way I regard the SQL schema as an implementation detail, so I dislike relying on it.
Well, still doesn’t sound as reasonable. This store type is exactly for the QueueChannel - the group is a queue,- which may have a consumer or not. That’s exactly the point of persistent queues: store events for future consumers and in case if failure.
What you are saying is still looks like some custom artificial requirement which point to some flaw in the design, so you have some abandoned data somehow.
There is a removeMessageGroup() on this store for your consideration, but I would like to stay from some reaper out-of-the-box solution since there is no logical connections between messages in that queue.
I do not find it so strange, most dedicated message queues offer some form of TTL for messages. And it's not so strange to retire modules of applications that might have sent messages that are never consumed, be it because of a bug or something else.
The purpose of an API like this is to offer some means for cleaning out a database which will otherwise store data for an undefinite amount of time. With GDPR, worst case storing this data accidentally might even get you in trouble, but I can of course just stick to my custom SQL.
So, you talk now about "time to live per message", which really has nothing to do with the message group expiration as a part of MessageGroupStoreReaper logic.
That's exactly my point: removal per message from the queue, even if it is some kind of automatic feature based on the configured TTL is not exactly the same as removal for the whole group.
Although it sounds more like you definitely would like to remove the whole group from that JdbcChannelMessageStore.
That's why I suggest to look into that removeMessageGroup() API.
We have an issue like this https://github.com/spring-projects/spring-integration/issues/3429, which may improve your experience for the mentioned TTL (if that the case). And then we probably can take this issue into account and implement TTL feature on this store. Many messaging middwares have a TTL (or expiration) feature on their destinations.
But that's only if we are on the same page and you don't mix TTL per message with the whole group expiration.
For my use case, a group is "orphaned" when its no longer read from or written to, but a TTL per message would have the same effect, so I'd be just as happy with that. I would use removeMessageGroup, but the JDBC channel message store does not offer a way to iterate over those that exist. If there was a way to iterate over those, as with JdbcMessageStore, this would solve my problem, too.
Good. Together with messages streaming for a group in this store abstraction I will look into an iterator implementation similar to the AbstractMessageGroupStore.
Remember: this ChannelMessageStore is not only for JDBC, so we nee to think twice what an API to expose for every single store impl.
By the way, the QueueChannel (where you use that JdbcChannelMessageStore) has a clear() operation, so you may consider to not worry about the store, but rather iterate over this type of channel and clear those you are not interested any more.
In the future this issue we may treat as a holder for the TTL feature. We have an IntegrationMessageHeaderAccessor.EXPIRATION_DATE header in the message, so it can be extracted as persistent entity property to be able to query messages from the reaper.