elsa-core
OnResume activity method never called after server shutdown
Hello, I'm trying to implement a long-running activity. It works well until the server shuts down.
Idea:
- OnExecute() starts the long-running job (which can take from a few hours up to 24h) and stores the jobId as activity state.
- A background job sync then polls the broker for progress and status, stores the progress as activity state, and saves the activity in the store.
- The activity returns Done() or throws an exception, depending on the outcome of the job sync.
I need the workflow's global status to be Running, not Suspended.
When I shut down the application and restart it, the startup logs show the engine trying to resume the running activity. The strange part is that the workflow moves to Finished and the OnResume() method of my custom activity is never called!
If it were called, I could simply continue my job sync in OnResume() and the work would complete successfully.
Do you have any idea why OnResume() is never called?
Thanks in advance.
Below is the custom activity code:
using Elsa.ActivityResults;
using Elsa.Attributes;
using Elsa.Expressions;
using Elsa.Services;
using Elsa.Services.Models;
using Serilog;
using System;
using System.Reflection;
using System.Threading.Tasks;
using Sdk.Core.Database.Types;
using WorkflowEngineService.Server.Manager;
namespace WorkflowEngineService.Server.ActivityStencils
{
/// <summary>
/// Workflow Activity: Job Rule apply and execute by Name.
/// </summary>
[Activity(
Category = "Job Management",
DisplayName = "Apply and Execute Job Rule by Name",
Description = "Apply and Execute Job Rule by Name",
Traits = Elsa.Metadata.ActivityTraits.Action,
Type = WorkflowActivityInternalType.JobRuleExecByName
)]
public class WorkflowActivityJobRuleExecutionByName : WorkflowActivityBase
{
static readonly ILogger log = Log.ForContext(MethodBase.GetCurrentMethod().DeclaringType);
/// <summary>
/// Input object Id
/// Liquid syntax is {{Variables.myObject.myprop}}
/// </summary>
[ActivityInput(Hint = "Object Id (optional)", DefaultSyntax = SyntaxNames.Liquid, SupportedSyntaxes = new[] { SyntaxNames.Literal, SyntaxNames.Variable, SyntaxNames.Liquid, SyntaxNames.JavaScript })]
public long? objectId { get; set; }
/// <summary>
/// Input platform Id
/// Liquid syntax is {{Variables.myObject.myprop}}
/// </summary>
[ActivityInput(Hint = "Platform Id (optional)", DefaultSyntax = SyntaxNames.Liquid, SupportedSyntaxes = new[] { SyntaxNames.Literal, SyntaxNames.Variable, SyntaxNames.Liquid, SyntaxNames.JavaScript })]
public long? platformId { get; set; }
/// <summary>
/// Job Rule Name
/// Liquid syntax is {{Variables.myObject.myprop}}
/// </summary>
[ActivityInput(Hint = "Rule Name", DefaultSyntax = SyntaxNames.Liquid, SupportedSyntaxes = new[] { SyntaxNames.Literal, SyntaxNames.Variable, SyntaxNames.Liquid, SyntaxNames.JavaScript })]
public string ruleName { get; set; }
/// <summary>
/// Output
/// </summary>
[ActivityOutput]
public DbJobInfos Output { get; set; }
DbJobInfos job;
/// <summary>
/// Execute first time
/// [AC] Note: async is important here so the job sync awaits without blocking a thread.
/// </summary>
/// <param name="context"></param>
/// <returns></returns>
protected override async ValueTask<IActivityExecutionResult>
OnExecuteAsync(ActivityExecutionContext context)
{
long jobId = context.GetState<long>(WorkflowActivityState.jobId);
long? contextPlatformId = GetContextPlatform(context);
long? contextObjectId = GetContextObjectId(context);
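// Prefer the activity inputs, fall back to the workflow context, and fail with a clear message if neither is set
long pid = platformId ?? contextPlatformId ?? throw new Exception("Platform Id not set on activity or workflow context");
long oid = objectId ?? contextObjectId ?? throw new Exception("Object Id not set on activity or workflow context");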
// Start and apply job rule for first time
if (jobId == 0) {
DbJobRuleInfos rule = DbJobRuleInfos.Get(pid, ruleName);
if (rule == null) {
log.Warning($"Exception() Rule not found for Name = [{ruleName}] and platformId = [{pid}] about activityId = [{context.ActivityId}]");
throw new Exception($"Job rule {ruleName} not found");
}
DbAssetObjectInfos obj;
try {
obj = DbAssetObjectInfos.GetUnAuth(pid, oid);
}
catch (Exception e) {
log.Warning($"Exception() Object not found for Id = [{oid}] and platformId = [{pid}] about activityId = [{context.ActivityId}]. Exception = [{e.Message}]");
throw new Exception($"Object #{oid} not found", e);
}
DbJobRuleInfos.JobRuleCreationResponse response = rule.Apply(obj);
if (!response.success) {
log.Warning($"Exception() Job rule apply Id = [{ruleName}] on objectId = [{oid}] failed about activityId = [{context.ActivityId}]. Message = [{response.message}]");
// Throw exception and not return fault => allow to get details on UI
throw new Exception(response.message);
//return Fault(new Exception(response.message));
}
else {
// Set correct latest job from jobs hierarchy (allow to enable tracking of real job graph)
job = response.jobSecondary ?? response.job;
// Set output job data
context.Output = job;
// Set state attachment
context.SetState(WorkflowActivityState.jobId, job.id);
context.SetState(WorkflowActivityState.progress, 0);
log.Information($"Processing started, set jobId = [{job.id}] about activityId = [{context.ActivityId}]");
// Entry means new "state" history
context.AddEntry("Processing", "Processing", null);
WorkflowEngineApi.UpdateInstance(context.WorkflowInstance);
}
}
return await ProcessSync(context);
}
/// <summary>
/// Execute resume
/// </summary>
/// <param name="context"></param>
/// <returns></returns>
protected override async ValueTask<IActivityExecutionResult>
OnResumeAsync(ActivityExecutionContext context)
{
// Retry or resume sync job
long jobId = context.GetState<long>(WorkflowActivityState.jobId);
if (jobId == 0)
throw new Exception("Job Id not set on activity. Can't resume.");
job = DbJobInfos.Get(jobId);
if (job == null)
throw new Exception("Job is null or not set on activity. Can't resume.");
try {
job.Retry(false);
}
catch (Exception e) {
log.Error($"Exception() Unable to retry job Id = [{jobId}] about activityId = [{context.ActivityId}]. Exception = [{e.Message}] -> return activity fault.");
throw new Exception(e.Message, e); // keep the original exception as InnerException
}
return await ProcessSync(context);
}
/// <summary>
/// Processing sync
/// </summary>
/// <param name="context"></param>
/// <returns></returns>
async ValueTask<IActivityExecutionResult>
ProcessSync(ActivityExecutionContext context)
{
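// Update and sync job: poll until it finishes, persisting progress on each iteration
while (true) {
job = DbJobInfos.Get(context.GetState<long>(WorkflowActivityState.jobId));
if (job == null)
throw new Exception("Job not found while syncing. Can't continue.");
context.SetState(WorkflowActivityState.progress, job.progress);
WorkflowEngineApi.UpdateInstance(context.WorkflowInstance);
// Stop before waiting again, so a finished job does not cost one extra delay
if (job.HasFinished())
break;
await Task.Delay(30000); // Hardcoded for tests
}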
// When finished returns response
if (job.HasFailed()) {
log.Information($"Exception() because HasFailed() for jobId = [{job.id}] about activityId = [{context.ActivityId}]");
throw new Exception(job.GetErrorLogs());
}
else {
log.Information($"Done() for jobId = [{job.id}] about activityId = [{context.ActivityId}]");
return Done();
}
}
}
}
All the best, Adrien
From my understanding, OnResume is used for activities that suspend, not for recovering an activity that was running during a shutdown or crash.
So, does anyone have an answer? How should long-running activities be handled when the server crashes or stops? How can the workflow engine be started so that "running" workflows keep running?
Thanks, Adrien
@adriencorbin
Maybe I'm misinterpreting your question, but this is how I see the role of the Elsa engine:
- The engine is the orchestrator of work.
- It should focus on activating activities and less on the execution itself.
- Small, short-running tasks can be handled by the engine, but long-running ones should preferably be offloaded.
- The offloaded work then signals back to the engine to activate the next step.
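To make the "start, then signal back" idea concrete, here is a minimal in-memory sketch. `ToyOrchestrator`, `StartStep` and `Signal` are hypothetical names, not part of the Elsa API: the step only records which workflow instance is waiting on which job and returns immediately, and whatever runs the job calls `Signal` when it ends. In a real engine the waiting map would be persisted (for example as a bookmark or a database row), which is what would let it survive a restart.

```csharp
using System.Collections.Generic;

// Hypothetical toy orchestrator, NOT the Elsa API. It only illustrates
// "start the work, suspend, and let the worker signal back".
public sealed class ToyOrchestrator
{
    // "Bookmarks": jobId -> workflow instance waiting on that job.
    // In a real engine this would live in the database so it survives a restart.
    private readonly Dictionary<long, string> _waiting = new Dictionary<long, string>();

    public List<string> CompletedInstances { get; } = new List<string>();
    public List<string> FaultedInstances { get; } = new List<string>();
    public int WaitingCount => _waiting.Count;

    // Short step: record that the instance waits on the external job, then return.
    public void StartStep(string instanceId, long jobId) => _waiting[jobId] = instanceId;

    // Called by the external worker (or a broker callback) when the job ends.
    public bool Signal(long jobId, bool succeeded)
    {
        if (!_waiting.TryGetValue(jobId, out var instanceId))
            return false; // unknown or already handled signal: ignore it

        _waiting.Remove(jobId);
        if (succeeded)
            CompletedInstances.Add(instanceId); // activate the next step
        else
            FaultedInstances.Add(instanceId);   // route to error handling
        return true;
    }
}
```

A duplicate signal is ignored rather than resuming the workflow twice, which matters if the broker delivers notifications at least once.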
The reason is that the engine does not know the activity's intent, so it cannot tell whether the activity can safely be started again (for example, whether it is idempotent).
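One way for an activity to make itself safe to restart is to check persisted state before starting any external work, which is roughly what the `jobId == 0` check in the activity above does. The sketch below isolates that idea under stated assumptions: `ActivityState` and `FakeBroker` are hypothetical stand-ins for the activity's persisted state and the job broker, not real Elsa types.

```csharp
using System.Collections.Generic;

// Hypothetical persisted activity state (stand-in for GetState/SetState).
public sealed class ActivityState
{
    private readonly Dictionary<string, long> _values = new Dictionary<string, long>();
    public long Get(string key) => _values.TryGetValue(key, out var v) ? v : 0;
    public void Set(string key, long value) => _values[key] = value;
}

// Hypothetical job broker that counts how many jobs were actually created.
public sealed class FakeBroker
{
    private long _nextId = 100;
    public int JobsCreated { get; private set; }

    public long CreateJob()
    {
        JobsCreated++;
        return _nextId++;
    }
}

public static class IdempotentStart
{
    // Re-running this (e.g. after a restart) does not create a second job:
    // if a jobId was already persisted, it reattaches to that job instead.
    public static long EnsureJobStarted(ActivityState state, FakeBroker broker)
    {
        long jobId = state.Get("jobId");
        if (jobId != 0)
            return jobId; // already started on a previous run

        jobId = broker.CreateJob();
        state.Set("jobId", jobId); // must be persisted before the step ends
        return jobId;
    }
}
```

This is only idempotent once `state.Set` has been persisted: a crash between `CreateJob` and saving the state can still create a duplicate job, so a broker that accepts a client-supplied idempotency key would close that gap.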
Given your question, I would design it with a fork: one branch executes the job, and the other regularly checks its status and decides whether the workflow can continue or must go through another round of waiting.
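The checking branch of such a fork can be reduced to a short, non-blocking decision that runs once per tick. The sketch below assumes hypothetical `JobStatus` and `CheckOutcome` types; `Run` only simulates successive ticks by walking a sequence of observed statuses. In the workflow itself, the wait between checks would presumably come from a timer or delay activity rather than a `Task.Delay` loop inside a single activity.

```csharp
using System.Collections.Generic;

// Hypothetical job status as reported by the broker.
public enum JobStatus { Running, Succeeded, Failed }

// Outcome of one short "check status" step in the polling branch.
public enum CheckOutcome { Continue, WaitAgain, Fault }

public static class StatusCheck
{
    // One iteration of the polling branch: it never blocks, it only decides.
    public static CheckOutcome Decide(JobStatus status) => status switch
    {
        JobStatus.Succeeded => CheckOutcome.Continue,
        JobStatus.Failed => CheckOutcome.Fault,
        _ => CheckOutcome.WaitAgain,
    };

    // Simulates the branch looping "wait -> check" until a terminal outcome.
    // Returns the outcome and how many checks were needed.
    public static (CheckOutcome Outcome, int Rounds) Run(IEnumerator<JobStatus> statuses)
    {
        int rounds = 0;
        while (statuses.MoveNext())
        {
            rounds++;
            var outcome = Decide(statuses.Current);
            if (outcome != CheckOutcome.WaitAgain)
                return (outcome, rounds);
        }
        return (CheckOutcome.WaitAgain, rounds); // still waiting when input ran out
    }
}
```

Because each check is a separate short step, a restart between checks should cost at most one tick, assuming the engine persists the pending wait: the next check simply reads the job status from the broker again.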
You still will not see it as Running ("running" means the engine is actively working on the workflow), but you could record the process state in the name or in the metadata to see and track progress.
Is there an automated system that checks on the state, or is it more about identifying the running ones quickly in the dashboard?
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.