elsa-core
elsa-core copied to clipboard
[BUG] In 3.2.1, ParallelForEach doesn't work as expected
Description
I use ParallelForEach in my workflow and expect three lines of "start Hello from resource ID: ..." output (one per item), but only one line is printed.
My code looks like this:
// Register Elsa in a fresh service collection and build the provider from it.
var serviceCollection = new ServiceCollection();
serviceCollection.AddElsa();
var provider = serviceCollection.BuildServiceProvider();

// Resolve the workflow runner and run the sample workflow.
var workflowRunner = provider.GetRequiredService<IWorkflowRunner>();
var workflow = new VideoStreamProcessor();
await workflowRunner.RunAsync(workflow);
/// <summary>
/// Workflow that fetches a set of video stream addresses and then runs a short
/// sequence of activities for each of them through <c>ParallelForEach</c>.
/// </summary>
class VideoStreamProcessor : WorkflowBase
{
    // Name of the variable the per-item activities read to get the item being processed.
    private const string CurrentValueVariableName = "CurrentValue";

    /// <summary>
    /// Declares the workflow variables and assembles the root sequence.
    /// </summary>
    protected override void Build(IWorkflowBuilder builder)
    {
        builder.Name = "Video Stream Processor";

        // Holds the addresses produced by FetchVideoStreams and consumed by ParallelForEach.
        var streamAddresses = builder.WithVariable<IAsyncEnumerable<string>>().WithMemoryStorage();

        // Declared on the workflow, but no activity below reads or writes it.
        _ = builder.WithVariable<string>().WithMemoryStorage();

        builder.Root = new Sequence
        {
            Activities =
            {
                new WriteLine("Fetching video stream addresses..."),
                new FetchVideoStreams
                {
                    Result = new(streamAddresses)
                },
                new ParallelForEach
                {
                    Items = new(streamAddresses),
                    Body = CreatePerStreamBody()
                },
                new WriteLine("All video streams processed!")
            }
        };
    }

    // Builds the sequence that runs for each item: a "processing" line,
    // the HelloWorld activity, then a "finished" line.
    private static Sequence CreatePerStreamBody()
    {
        return new Sequence
        {
            Activities =
            {
                new WriteLine(context => $"Processing video stream for resource ID: {context.GetVariable<string>(CurrentValueVariableName)!}"),
                new HelloWorld
                {
                    ResourceId = new(context => context.GetVariable<string>(CurrentValueVariableName)!)
                },
                new WriteLine(context => $"Finished video stream for resource ID: {context.GetVariable<string>(CurrentValueVariableName)!}"),
            }
        };
    }
}
/// <summary>
/// Fetches video stream addresses from a data source.
/// </summary>
/// <remarks>
/// The addresses currently come from a fixed in-memory list. They are converted to an
/// <see cref="IAsyncEnumerable{T}"/> and written to this activity's <c>Result</c> output.
/// </remarks>
class FetchVideoStreams : CodeActivity<IAsyncEnumerable<string>>
{
    /// <summary>
    /// Sets the activity result to the asynchronous sequence of stream addresses.
    /// </summary>
    /// <param name="context">The execution context the result is written against.</param>
    protected override void Execute(ActivityExecutionContext context)
    {
        var streamAddresses = GetVideoStreamAddresses().ToAsyncEnumerable();
        Result.Set(context, streamAddresses);
    }

    /// <summary>
    /// Returns the hard-coded list of stream addresses used by this sample.
    /// </summary>
    private static IEnumerable<string> GetVideoStreamAddresses()
    {
        return new List<string>
        {
            "http://example.com/stream1",
            "http://example.com/stream2",
            "http://example.com/stream3"
        };
    }
}
/// <summary>
/// Activity that writes a "start" line for a resource ID to the console, waits for a
/// simulated long-running operation, and then writes an "end" line.
/// </summary>
class HelloWorld : CodeActivity
{
    // Duration of the simulated work in milliseconds (1,000,000 ms is roughly 16.7 minutes).
    private const int SimulatedWorkDelayMilliseconds = 1000000;

    /// <summary>
    /// The resource ID included in the console messages.
    /// </summary>
    [Input]
    public Input<string> ResourceId { get; set; } = default!;

    /// <summary>
    /// Writes the start message, awaits the simulated delay, then writes the end message.
    /// </summary>
    /// <param name="context">The execution context used to read <see cref="ResourceId"/>.</param>
    protected override async ValueTask ExecuteAsync(ActivityExecutionContext context)
    {
        var resourceId = ResourceId.Get(context);
        Console.WriteLine($"start Hello from resource ID: {resourceId}");

        // Simulate an asynchronous operation. The context's cancellation token is passed
        // so the wait can end early instead of always running the full duration.
        // NOTE(review): this await does not complete for ~16.7 minutes. If the runner
        // executes scheduled activities one at a time, sibling ParallelForEach branches
        // would not start until it returns — confirm against the Elsa scheduler.
        await Task.Delay(SimulatedWorkDelayMilliseconds, context.CancellationToken);

        Console.WriteLine($"end Hello from resource ID: {resourceId}");
    }
}
Help Me!