workflow-core
workflow-core copied to clipboard
EnableIndexes is set to false but Index-related work are still being queued
Describe the bug When EnableIndexes is set to false during set up, we are making IndexConsumer inactive. One issue is that QueueProvider.QueueWork(WorkflowId, QueueType.Index) is still being called in different places in the code, causing the queue for Index to build up.
To Reproduce Steps to reproduce the behavior: Set EnableIndexes to false and use Workflow Core. In Persistence of your choice (in my case Redis), you will see an entry for {prefix}-index. This persisted data will grow as there is no process dequeuing the entries
Expected behavior When EnableIndexes is set to false, no queuing for QueueType.Index should happen.
Additional context This behavior caused storage to increaase. In the case of Redis, Key with name {prefix}-index will keep growing in size as there is no process to dequeue the Index items.
We meet a similar issue. {prefix}-index queue increased due to cannot consume in time and caused high server load of redis. Actually, I don't want to enable indexes. So, We meet the same problem now.
@danielgerlag how about getting workflowoptions and disable queue and dequeue in redis queue provider?
` public static WorkflowOptions UseRedisQueues(this WorkflowOptions options, string connectionString, string prefix) { options.UseQueueProvider(sp => new RedisQueueProvider(connectionString, prefix, sp.GetService<WorkflowOptions>(), sp.GetService<ILoggerFactory>())); return options; }
private readonly Dictionary<QueueType, bool> _enabledQueues = new Dictionary<QueueType, bool>
{
};
public RedisQueueProvider(string connectionString, string prefix, WorkflowOptions options, ILoggerFactory logFactory)
{
_connectionString = connectionString;
_prefix = prefix;
_logger = logFactory.CreateLogger(GetType());
_enabledQueues[QueueType.Index] = options.EnableIndexes;
_enabledQueues[QueueType.Event] = options.EnableEvents;
_enabledQueues[QueueType.Workflow] = options.EnableWorkflows;
}
public async Task QueueWork(string id, QueueType queue)
{
if (!_enabledQueues[queue])
{
return;
}
if (_redis == null)
throw new InvalidOperationException();
var queueName = GetQueueName(queue);
var insertResult = await _redis.ListInsertBeforeAsync(queueName, id, id);
if (insertResult == -1 || insertResult == 0)
await _redis.ListRightPushAsync(queueName, id, When.Always);
else
await _redis.ListRemoveAsync(queueName, id, 1);
}
public async Task<string> DequeueWork(QueueType queue, CancellationToken cancellationToken)
{
if (!_enabledQueues[queue])
{
return null;
}
if (_redis == null)
throw new InvalidOperationException();
var result = await _redis.ListLeftPopAsync(GetQueueName(queue));
if (result.IsNull)
return null;
return result;
}
`
https://github.com/danielgerlag/workflow-core/pull/1154