AsyncEnumerable icon indicating copy to clipboard operation
AsyncEnumerable copied to clipboard

Chaining async's

Open JeffN825 opened this issue 5 years ago • 2 comments

Sorry if this is a question that's already been answered, but I'm trying to figure out if this library is applicable to a big of fairly simple code. Today I have (converted to a more verbose form for clarity)

IEnumerable<MyEntity> entities = await GetEntitiesAsync();
IEnumerable<Task<MyProcessedEntity>> tasks = entities.Select(ProcessEntityAsync);
IEnumerable<MyProcessedEntity> processedEntities = await Task.WhenAll(tasks); // I realize this could introduce throttling issues
var tasksToGetNames = processedEntities.Select(async e => new { Name = await GetNameAsync(e); Entity = e });
var withNames = await Task.WhenAll(tasksToGetNames);
var byName = withNames.ToDictionary(i => i.Name);
return byName

public static async Task<MyProcessedEntity> ProcessEntityAsync(MyEntity e) ....

public static async Task<string> GetNameAsync(MyProcessedEntity e) ....

Obviously this leaves a lot to be desired in terms of verbosity, especially since this kind of code is repeated dozens of times throughout my codebase.

Ideally, I'm looking for the means to write something like this:


Dictionary<string, MyProcessedEntity> byName = 
    await GetEntitiesAsync().SelectAsync(ProcessEntityAsync)
        .ToDictionaryAsync(e => GetNameAsync(e))

I can easily write SelectAsync and ToDictionaryAsync myself as extension methods, but trying to understand if this library provides that kind of functionality already.

JeffN825 avatar Mar 03 '19 21:03 JeffN825

@JeffN825 , sorry for late reply as I was on my vacation.

This library definitely has SelectAsync and ToDictionaryAsync, however, those built-in extension methods don't take in async delegates - only synchronous methods.

The solution that is available out of the box may look like this:

IAsyncEnumerble<MyProcessedEntity> ProcessEntities(this IEnumerable<MyEntity> entities) =>
new AsyncEnumerble<MyProcessedEntity>(yield =>
{
  foreach (var entity in entities)
    await yield.ReturnAsync(await ProcessEntityAsync(entity));
};

// ........

IEnumerable<MyEntity> entities = await GetEntitiesAsync();
var dictionary = entities.ProcessEntities().ToDictionaryAsync(e => e.Name);

Let me know if this helps.

kind-serge avatar Apr 06 '19 23:04 kind-serge

[Bing Translate] I also had problems with asynchronous streaming chains, I tangled for days, I wasn't familiar with Rx, so I gave up Rx. I've used AsyncEnumerable, but it doesn't seem to have a real yield.

If C#8 (IAsyncEnumerable) is officially launched, my Code should be simple:

try
{
    IEnumerable<LocalEntity> items = ReadItems(/* ... */);
    IAsyncEnumerable<WebEntity> results = await QueryAsync(items);
    IAsyncEnumerable<MyEntity> myEntities = await DoWorkAsync(results);
    await Done(myEntities);
}
catch
{
    // Log
}
finally
{
    // DoSomething
}

The asynchronous flow process I want: ReadItems:~~~~~~[a-b-c-d-e-f-g-h-i-j-k-l-m-n-o-p-q-r-s-t-u-v-w-x-y-z] QueryAsync:~~~~~[A-B-C-D-E-F-G-H-I-J-K-(may be throw exception)-...] DoWorkAsync:~~~~[A-B-C-D-E-F-G-H-(may be throw exception)-...............] Done:~~~~~~~~~[A-B-C-D-E-F-G-H] To describe it in words is: ReadItems yield "a" => QueryAsync "a" yield "A" => DoWorkAsync "A" yield "A" => Doen "A" yield "A", then ReadItems yield "b" => And so on...

However, it appears that after using Dasync/AsyncEnumerable, if an exception occurs, the asynchronous stream is interrupted and the yield data is not continued to be processed.

And, i need support CancellationToken, When I click the Cancel button of the UI, the asynchronous stream should be able to interrupt.

How should I implement this asynchronous flow?

CodingOctocat avatar May 19 '19 02:05 CodingOctocat