
EnableIndexes is set to false but Index-related work is still being queued

Open cubitus81 opened this issue 2 years ago • 3 comments

Describe the bug: When EnableIndexes is set to false during setup, IndexConsumer is made inactive. However, QueueProvider.QueueWork(WorkflowId, QueueType.Index) is still called in several places in the code, so the Index queue keeps building up.

To Reproduce: Set EnableIndexes to false and use Workflow Core. In the persistence provider of your choice (in my case Redis), you will see an entry for {prefix}-index. This persisted data keeps growing because no process dequeues the entries.
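A minimal sketch of the setup described in the reproduction steps, for anyone who wants to try it. It assumes the WorkflowCore and WorkflowCore.Providers.Redis packages are referenced; the connection string `localhost:6379` and the prefix `myprefix` are placeholders, not values from this report. Registering a workflow and starting the host are omitted.

```csharp
using Microsoft.Extensions.DependencyInjection;

public static class Program
{
    public static void Main()
    {
        var services = new ServiceCollection();
        services.AddLogging();

        services.AddWorkflow(options =>
        {
            // Placeholder connection string and prefix (assumptions).
            options.UseRedisQueues("localhost:6379", "myprefix");

            // Disable index processing, as in the report above.
            options.EnableIndexes = false;
        });

        // Build the container; a real repro would also start the
        // workflow host and run at least one workflow.
        var provider = services.BuildServiceProvider();
    }
}
```

After running workflows with this configuration, inspecting the length of the {prefix}-index key in Redis over time should show whether entries are accumulating.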

Expected behavior: When EnableIndexes is set to false, no queuing for QueueType.Index should happen.

Additional context: This behavior causes storage usage to increase. In the case of Redis, the key named {prefix}-index keeps growing in size because no process dequeues the Index items.

cubitus81 avatar Apr 11 '22 17:04 cubitus81

We hit a similar issue. The {prefix}-index queue kept growing because it could not be consumed in time, which caused high server load on Redis. I don't actually want indexes enabled at all, so we are facing the same problem now.

brucezlata avatar Mar 17 '23 10:03 brucezlata

@danielgerlag how about passing WorkflowOptions into the Redis queue provider and skipping both queue and dequeue for disabled queue types?

```csharp
// Extension method: pass the registered WorkflowOptions into the provider.
public static WorkflowOptions UseRedisQueues(this WorkflowOptions options, string connectionString, string prefix)
{
    options.UseQueueProvider(sp => new RedisQueueProvider(connectionString, prefix, sp.GetService<WorkflowOptions>(), sp.GetService<ILoggerFactory>()));
    return options;
}

// RedisQueueProvider members: track which queue types are enabled.
private readonly Dictionary<QueueType, bool> _enabledQueues = new Dictionary<QueueType, bool>();

public RedisQueueProvider(string connectionString, string prefix, WorkflowOptions options, ILoggerFactory logFactory)
{
    _connectionString = connectionString;
    _prefix = prefix;
    _logger = logFactory.CreateLogger(GetType());
    _enabledQueues[QueueType.Index] = options.EnableIndexes;
    _enabledQueues[QueueType.Event] = options.EnableEvents;
    _enabledQueues[QueueType.Workflow] = options.EnableWorkflows;
}

public async Task QueueWork(string id, QueueType queue)
{
    // Skip queuing entirely when this queue type is disabled.
    if (!_enabledQueues[queue])
    {
        return;
    }

    if (_redis == null)
        throw new InvalidOperationException();

    var queueName = GetQueueName(queue);

    // Push the id only if it is not already in the list.
    var insertResult = await _redis.ListInsertBeforeAsync(queueName, id, id);
    if (insertResult == -1 || insertResult == 0)
        await _redis.ListRightPushAsync(queueName, id, When.Always);
    else
        await _redis.ListRemoveAsync(queueName, id, 1);
}

public async Task<string> DequeueWork(QueueType queue, CancellationToken cancellationToken)
{
    // Nothing to dequeue when this queue type is disabled.
    if (!_enabledQueues[queue])
    {
        return null;
    }

    if (_redis == null)
        throw new InvalidOperationException();

    var result = await _redis.ListLeftPopAsync(GetQueueName(queue));

    if (result.IsNull)
        return null;

    return result;
}
```

brucezlata avatar Mar 17 '23 13:03 brucezlata

https://github.com/danielgerlag/workflow-core/pull/1154

brucezlata avatar Mar 17 '23 14:03 brucezlata