elsa-core icon indicating copy to clipboard operation
elsa-core copied to clipboard

[BUG]In 3.2.1 ParallelForEach didn't work as expected

Open Forfires opened this issue 1 year ago • 0 comments

Description

I use ParallelForEach in my workflow and I should get 3 lines of start Hello from resource ID, but only one line was output。

MyCode Like this:

var services = new ServiceCollection();
services.AddElsa();

var serviceProvider = services.BuildServiceProvider();
var runner = serviceProvider.GetRequiredService<IWorkflowRunner>();

await runner.RunAsync(new VideoStreamProcessor());

class VideoStreamProcessor : WorkflowBase
{
    protected override void Build(IWorkflowBuilder builder)
    {
        builder.Name = "Video Stream Processor";
        var resourceIds = builder.WithVariable<IAsyncEnumerable<string>>().WithMemoryStorage();
        var outputData = builder.WithVariable<string>().WithMemoryStorage();
        builder.Root = new Sequence
        {
            Activities =
            {
                new WriteLine("Fetching video stream addresses..."),
                new FetchVideoStreams
                {
                    Result = new(resourceIds)
                },
                new ParallelForEach
                {
                    Items = new(resourceIds),
                    Body = new Sequence
                    {
                        Activities =
                        {
                            new WriteLine(context => $"Processing video stream for resource ID: {context.GetVariable<string>("CurrentValue")!}"),
                            new HelloWorld
                            {
                                ResourceId = new(context => context.GetVariable<string>("CurrentValue")!)
                            },
                            new WriteLine(context => $"Finished video stream for resource ID: {context.GetVariable<string>("CurrentValue")!}"),

                        }
                    }
                },
                new WriteLine("All video streams processed!")
            }
        };
    }
}

/// <summary>
/// Fetches video stream addresses from a data source.
/// </summary>
lass FetchVideoStreams : CodeActivity<IAsyncEnumerable<string>>
{
    protected override void Execute(ActivityExecutionContext context)
    {
        var streamAddresses = GetVideoStreamAddresses().ToAsyncEnumerable();
        Result.Set(context, streamAddresses);
    }

    private IEnumerable<string> GetVideoStreamAddresses()
    {
        return new List<string>
        {
            "http://example.com/stream1",
            "http://example.com/stream2",
            "http://example.com/stream3"
        };
    }
}
class HelloWorld : CodeActivity
{
    [Input]
    public Input<string> ResourceId { get; set; } = default!;

    protected override async ValueTask ExecuteAsync(ActivityExecutionContext context)
    {
       
        var resourceId = ResourceId.Get(context);
        Console.WriteLine($"start Hello from resource ID: {resourceId}");
        // 模拟异步操作
        await Task.Delay(1000000);
        Console.WriteLine($"end Hello from resource ID: {resourceId}");
       
    }
}


Help Me!

Forfires avatar Oct 20 '24 15:10 Forfires