#Question Reactive background processing inside grain
Hello!
How can I implement background processing inside a grain? For example, I have a grain with an internal queue, and I enqueue items into it.
public class QueueGrain : Grain
{
    private readonly Queue<string> _queue = new();

    public Task Enqueue(string item)
    {
        _queue.Enqueue(item);
        return Task.CompletedTask;
    }

    private async Task Dequeue()
    {
        if (_queue.TryDequeue(out var item))
            await DoWork(item);
        await Task.Yield();
    }

    private Task DoWork(string item)
    {
        // Do some work
        return Task.CompletedTask;
    }
}
I want the Dequeue method to be called reactively (and I can't use standard grain timers because they put too much pressure on the CPU).
I see two possible options:
- Use Task.Factory.StartNew
public override Task OnActivateAsync()
{
    Task.Factory.StartNew(StartBackgroundProcessing);
    return base.OnActivateAsync();
}

private async Task StartBackgroundProcessing()
{
    // _cts is a CancellationTokenSource field on the grain.
    // Loop until cancellation is requested.
    while (!_cts.IsCancellationRequested)
    {
        await Dequeue();
    }
}
But this code may interleave with message processing, so a grain method is no longer atomic. For example:
public async Task Enqueue(string item)
{
    await Enqueue(item, _cts.Token);
    // interleaved by StartBackgroundProcessing
    await Smt();
}
It will also still be CPU-intensive, because the loop keeps spinning while the queue is empty.
The alternative to a plain queue is to use the Channel class, where I can await the read. But I don't see any difference as long as I start the loop with Task.Factory.StartNew(StartBackgroundProcessing) on activation, because I still have the interleaving problem.
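To make the Channel point concrete, here is a minimal sketch outside of Orleans. It is a plain console program, and the names ChannelWaitSketch and RunAsync are made up for illustration. It only shows that a reader awaiting the channel is suspended while the channel is empty instead of looping with Task.Yield. It says nothing about grain scheduling or interleaving.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical standalone sketch, not Orleans code.
public static class ChannelWaitSketch
{
    public static async Task<int> RunAsync()
    {
        var channel = Channel.CreateUnbounded<string>();
        var processed = 0;

        // Consumer: ReadAllAsync awaits until an item is available,
        // so nothing spins while the channel is empty.
        var consumer = Task.Run(async () =>
        {
            await foreach (var item in channel.Reader.ReadAllAsync())
            {
                processed++;
            }
        });

        await channel.Writer.WriteAsync("a");
        await channel.Writer.WriteAsync("b");

        // Completing the writer ends the consumer's loop once the channel is drained.
        channel.Writer.Complete();
        await consumer;

        return processed;
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunAsync());
    }
}
```

The consumer here runs on the thread pool, which is exactly what I can't do blindly in a grain, hence the question below.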
- Use the thread pool via Task.Run in OnActivateAsync to have the grain call itself through AsReference, and use a Channel.
public interface IHaveDequeue
{
    Task Dequeue();
}

public class QueueGrain : Grain, IHaveDequeue
{
    private readonly Channel<string> _queue = Channel.CreateUnbounded<string>(
        new UnboundedChannelOptions
        {
            SingleReader = true,
            SingleWriter = true,
            AllowSynchronousContinuations = false
        }
    );

    private readonly CancellationTokenSource _cts = new();

    public async Task Enqueue(string item)
    {
        await _queue.Writer.WriteAsync(item);
    }

    public async Task Dequeue()
    {
        var item = await _queue.Reader.ReadAsync(_cts.Token);
        await DoWork(item);
    }

    private Task DoWork(string item)
    {
        // Do some work
        return Task.CompletedTask;
    }

    public override Task OnActivateAsync()
    {
        Task.Run(
            async () =>
            {
                while (!_cts.IsCancellationRequested)
                {
                    await this.AsReference<IHaveDequeue>().Dequeue();
                }
            }
        );
        return base.OnActivateAsync();
    }
}
Is this code correct, in the sense that there is no interleaving (every public method call is 'atomic') and the proper task scheduler is used (everything that touches grain data runs in the grain's activation context, without race conditions)?