orleans icon indicating copy to clipboard operation
orleans copied to clipboard

#Question Reactive background processing inside grain

Open AndrewP-GH opened this issue 2 years ago • 4 comments

Hello!

How to implement background processing inside grain? For example, I have a grain that has an inner queue, and I enqueue items into it.

public class QueueGrain : Grain
{
    private readonly Queue<string> _queue = new();

    public Task Enqueue(string item)
    {
        _queue.Enqueue(item);
        return Task.CompletedTask;
    }

    private async Task Dequeue()
    {
        if (_queue.TryDequeue(out var item))
            await DoWork(item);
        await Task.Yield();
    }

    private Task DoWork(string item)
    {
        // Do some work
        return Task.CompletedTask;
    }
}

I want the method Dequeue to be called reactively (and I can't use standard grain timers cause of big CPU pressure).

I see two possible options:

  1. Use Task.Factory.StartNew
    public override Task OnActivateAsync()
    {
        Task.Factory.StartNew(StartBackgroundProcessing);
        return base.OnActivateAsync();
    }
    
    private async Task StartBackgroundProcessing()
    {
        while (_cts.IsCancellationRequested)
        {
            await Dequeue();
        }
    }

But this code may interleave some message processing and it won't be atomic, for example

    public async Task Enqueue(string item)
    {
       await Enqueue(item, _cts.Token);
       // interleaved by StartBackgroundProcessing
       await Smt();
    }

And also it will still be CPU intensive.

The alternative to the pure queue is to use the Channel class, where I can await on Dequeue, but I don't see any difference while I'm using Task.Factory.StartNew(StartBackgroundProcessing) on activating because I still have interleaving problems.

  1. Use thread pool via Task.Run in OnActivateAsync to call itself through AsReference and use Channel.
public interface IHaveDequeue
{
    Task Dequeue();
}

public class QueueGrain : Grain, IHaveDequeue
{
    private readonly Channel<string> _queue = Channel.CreateUnbounded<string>(
        new UnboundedChannelOptions
        {
            SingleReader = true,
            SingleWriter = true,
            AllowSynchronousContinuations = false
        }
    );
    private readonly CancellationTokenSource _cts = new();

    public async Task Enqueue(string item)
    {
        await _queue.Writer.WriteAsync(item);
    }

    public async Task Dequeue()
    {
        var item = await _queue.Reader.ReadAsync(_cts.Token);
        await DoWork(item);
    }

    private Task DoWork(string item)
    {
        // Do some work
        return Task.CompletedTask;
    }

    public override Task OnActivateAsync()
    {
        Task.Run(
            async () =>
            {
                while (!_cts.IsCancellationRequested)
                {
                    await this.AsReference<IHaveDequeue>().Dequeue();
                }
            }
        );
        return base.OnActivateAsync();
    }
}

Is this code correct in terms of no interrupts (all public methods calls are 'atomic') and a proper task scheduler (all actions with grain data take place in grain's activation context without race condition)?

AndrewP-GH avatar Nov 13 '22 09:11 AndrewP-GH