Hangfire
Hangfire copied to clipboard
Hangfire in single application on multiple physical servers
I am running hangfire in a single web application, my application is being run on 2 physical servers but hangfire is in 1 database.
At the moment, i am generating a server for each queue, because each queue i need to run 1 worker at a time and they must be in order. I set them up like this
// core
services.AddHangfire(options =>
{
options.SetDataCompatibilityLevel(CompatibilityLevel.Version_170);
options.UseSimpleAssemblyNameTypeSerializer();
options.UseRecommendedSerializerSettings();
options.UseSqlServerStorage(appSettings.Data.DefaultConnection.ConnectionString, storageOptions);
});
// add multiple servers, this way we get to control how many workers are in each queue
services.AddHangfireServer(options =>
{
options.ServerName = "workflow-queue";
options.WorkerCount = 1;
options.Queues = new string[] { "workflow-queue" };
options.SchedulePollingInterval = TimeSpan.FromSeconds(10);
});
services.AddHangfireServer(options =>
{
options.ServerName = "alert-schedule";
options.WorkerCount = 1;
options.Queues = new string[] { "alert-schedule" };
options.SchedulePollingInterval = TimeSpan.FromMinutes(1);
});
services.AddHangfireServer(options =>
{
options.ServerName = string.Format("trigger-schedule");
options.WorkerCount = 1;
options.Queues = new string[] { "trigger-schedule" };
options.SchedulePollingInterval = TimeSpan.FromMinutes(1);
});
services.AddHangfireServer(options =>
{
options.ServerName = "report-schedule";
options.WorkerCount = 1;
options.Queues = new string[] { "report-schedule" };
options.SchedulePollingInterval = TimeSpan.FromMinutes(1);
});
services.AddHangfireServer(options =>
{
options.ServerName = "maintenance";
options.WorkerCount = 5;
options.Queues = new string[] { "maintenance" };
options.SchedulePollingInterval = TimeSpan.FromMinutes(10);
});
My problem is that it is generating multiple queues on the servers, with different ports.
In my code i am then trying to stop jobs from running if they are queued/retrying, but if the job is being run on a different physical server, it is not found and queued again.
Here is the code to check if its running already
public async Task<bool> IsAlreadyQueuedAsync(PerformContext context)
{
var disableJob = false;
var monitoringApi = JobStorage.Current.GetMonitoringApi();
// get the jobId, method and queue using performContext
var jobId = context.BackgroundJob.Id;
var methodInfo = context.BackgroundJob.Job.Method;
var queueAttribute = (QueueAttribute)Attribute.GetCustomAttribute(context.BackgroundJob.Job.Method, typeof(QueueAttribute));
// enqueuedJobs
var enqueuedjobStatesToCheck = new[] { "Processing" };
var enqueuedJobs = monitoringApi.EnqueuedJobs(queueAttribute.Queue, 0, 1000);
var enqueuedJobsAlready = enqueuedJobs.Count(e => e.Key != jobId && e.Value != null && e.Value.Job != null && e.Value.Job.Method.Equals(methodInfo) && enqueuedjobStatesToCheck.Contains(e.Value.State));
if (enqueuedJobsAlready > 0)
disableJob = true;
// scheduledJobs
if (!disableJob)
{
// check if there are any scheduledJobs that are processing
var scheduledJobs = monitoringApi.ScheduledJobs(0, 1000);
var scheduledJobsAlready = scheduledJobs.Count(e => e.Key != jobId && e.Value != null && e.Value.Job != null && e.Value.Job.Method.Equals(methodInfo));
if (scheduledJobsAlready > 0)
disableJob = true;
}
// failedJobs
if (!disableJob)
{
var failedJobs = monitoringApi.FailedJobs(0, 1000);
var failedJobsAlready = failedJobs.Count(e => e.Key != jobId && e.Value != null && e.Value.Job != null && e.Value.Job.Method.Equals(methodInfo));
if (failedJobsAlready > 0)
disableJob = true;
}
// if runBefore is true, then lets remove the current job running, else it will write a "successful" message in the logs
if (disableJob)
{
// use hangfire delete, for cleanup
BackgroundJob.Delete(jobId);
// create our sqlBuilder to remove the entries altogether including the count
var sqlBuilder = new SqlBuilder()
.DELETE_FROM("Hangfire.[Job]")
.WHERE("[Id] = {0};", jobId);
sqlBuilder.Append("DELETE TOP(1) FROM Hangfire.[Counter] WHERE [Key] = 'stats:deleted' AND [Value] = 1;");
using (var cmd = _context.CreateCommand(sqlBuilder))
await cmd.ExecuteNonQueryAsync();
return true;
}
return false;
}
Each method has something like the following attributes as well
public interface IAlertScheduleService
{
[Hangfire.Queue("alert-schedule")]
[Hangfire.DisableConcurrentExecution(60 * 60 * 5)]
Task RunAllAsync(PerformContext context);
}
Why does this happen? How can i stop it happening?
In my application I have a similar one, but with the problem of many applications (different versions) with multiple machines (azure autoscale)
You must create a queue name for all machines and a queue name for each machine. For this, I have created 2 variables `
public static string MachineQueueId { get { return HostingEnvironment.SiteName.Replace(".", "_").ToLowerInvariant(); } }
public static string QueueVersionName { get { return ("version" + ConfigurationManager.AppSettings["AppVersion"]).Replace(".", "_"); } }
`
When you start the machine:
_backgroundJobServer = new BackgroundJobServer(new BackgroundJobServerOptions { Queues = new string[] { MachineQueueId, QueueVersionName, "default" }, });
And the used this attribute in order to maintain the correct queue https://discuss.hangfire.io/t/requeue-on-the-same-queue-or-possibility-to-have-server-groups/5245/2
To enqueue job correctly I have done this script
public static string TryEnqueueJob(Expression<Action> expr, bool enqueueCurrentMachineOnly = false, bool enqueueToAllServers = false, string specificQueue = null, bool currentVersionOnly = true, TimeSpan? delayTimeSpan = null, bool excludeCurrentServer = false) { if (IsAlive && JobStorage.Current != null) { if (enqueueToAllServers) { var lstJobs = new List<string>(); foreach (var svr in JobStorage.Current.GetMonitoringApi().Servers()) { if (currentVersionOnly && !svr.Queues.Contains(HangfireBootstrapper.QueueVersionName)) continue; if (excludeCurrentServer && svr.Queues.Contains(HangfireBootstrapper.MachineQueueId)) continue; IBackgroundJobClient hangFireClient = new BackgroundJobClient(); if (delayTimeSpan.HasValue) { lstJobs.Add(hangFireClient.Schedule(svr.Queues.FirstOrDefault(), expr, delayTimeSpan.Value)); } else { EnqueuedState myQueueState = new Hangfire.States.EnqueuedState(svr.Queues.FirstOrDefault()); lstJobs.Add(hangFireClient.Create(expr, myQueueState)); } //lstJobs.Add(BackgroundJob.Enqueue(expr, )) } return string.Join(",", lstJobs); } else if (!string.IsNullOrWhiteSpace(specificQueue)) { IBackgroundJobClient hangFireClient = new BackgroundJobClient(); if (delayTimeSpan != null) { return hangFireClient.Schedule(specificQueue, expr, delayTimeSpan.Value); } else { EnqueuedState myQueueState = new Hangfire.States.EnqueuedState(specificQueue); return hangFireClient.Create(expr, myQueueState); } } else if (enqueueCurrentMachineOnly) { IBackgroundJobClient hangFireClient = new BackgroundJobClient(); if (delayTimeSpan != null) { return hangFireClient.Schedule(HangfireBootstrapper.MachineQueueId, expr, delayTimeSpan.Value); } else { EnqueuedState myQueueState = new Hangfire.States.EnqueuedState(HangfireBootstrapper.MachineQueueId); return hangFireClient.Create(expr, myQueueState); } } else if (currentVersionOnly) { IBackgroundJobClient hangFireClient = new BackgroundJobClient(); if (delayTimeSpan != null) { return hangFireClient.Schedule(HangfireBootstrapper.QueueVersionName, expr, delayTimeSpan.Value); } else { EnqueuedState myQueueState = new Hangfire.States.EnqueuedState(HangfireBootstrapper.QueueVersionName); return hangFireClient.Create(expr, myQueueState); } } else { return BackgroundJob.Enqueue(expr); } } var act = expr.Compile(); act(); return null; }
IsAlive is an attribute that checks if the HangFire server is running. If not, it processes the action immediately
Usage:
HangFireHelper.TryEnqueueJob(() => staticclass.staticmethod(parameters), enqueueToAllServers: true);