Hangfire in single application on multiple physical servers

Open • Gillardo opened this issue 3 years ago • 1 comment

I am running Hangfire in a single web application. The application runs on 2 physical servers, but Hangfire uses a single database.

At the moment I register one server per queue, because each queue must run with 1 worker at a time so that its jobs are processed in order. I set them up like this:

// core
services.AddHangfire(options =>
{
	options.SetDataCompatibilityLevel(CompatibilityLevel.Version_170);
	options.UseSimpleAssemblyNameTypeSerializer();
	options.UseRecommendedSerializerSettings();
	options.UseSqlServerStorage(appSettings.Data.DefaultConnection.ConnectionString, storageOptions);
});

// add multiple servers, this way we get to control how many workers are in each queue
services.AddHangfireServer(options =>
{
	options.ServerName = "workflow-queue";
	options.WorkerCount = 1;
	options.Queues = new string[] { "workflow-queue" };
	options.SchedulePollingInterval = TimeSpan.FromSeconds(10);
});

services.AddHangfireServer(options =>
{
	options.ServerName = "alert-schedule";
	options.WorkerCount = 1;
	options.Queues = new string[] { "alert-schedule" };
	options.SchedulePollingInterval = TimeSpan.FromMinutes(1);
});

services.AddHangfireServer(options =>
{
	options.ServerName = "trigger-schedule";
	options.WorkerCount = 1;
	options.Queues = new string[] { "trigger-schedule" };
	options.SchedulePollingInterval = TimeSpan.FromMinutes(1);
});

services.AddHangfireServer(options =>
{
	options.ServerName = "report-schedule";
	options.WorkerCount = 1;
	options.Queues = new string[] { "report-schedule" };
	options.SchedulePollingInterval = TimeSpan.FromMinutes(1);
});

services.AddHangfireServer(options =>
{
	options.ServerName = "maintenance";
	options.WorkerCount = 5;
	options.Queues = new string[] { "maintenance" };
	options.SchedulePollingInterval = TimeSpan.FromMinutes(10);
});
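
The five registrations above differ only in queue name, worker count, and polling interval, so they could be driven from a small table. The following is a minimal sketch rather than the poster's code: the `HangfireQueueServers` class, the `AddQueueServers` extension method, and the `Definitions` table are hypothetical names, and it assumes the same `AddHangfireServer` overload used above. Note that this registration runs in every process that hosts the application, and `WorkerCount` applies to each server instance separately, so with 2 physical machines each queue is served by 2 server instances.

```csharp
using System;
using Hangfire;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical helper: registers one Hangfire server per queue from a table.
public static class HangfireQueueServers
{
    // Queue name, worker count, and schedule polling interval,
    // copied from the registrations above.
    private static readonly (string Queue, int Workers, TimeSpan Polling)[] Definitions =
    {
        ("workflow-queue", 1, TimeSpan.FromSeconds(10)),
        ("alert-schedule", 1, TimeSpan.FromMinutes(1)),
        ("trigger-schedule", 1, TimeSpan.FromMinutes(1)),
        ("report-schedule", 1, TimeSpan.FromMinutes(1)),
        ("maintenance", 5, TimeSpan.FromMinutes(10)),
    };

    public static IServiceCollection AddQueueServers(this IServiceCollection services)
    {
        foreach (var (queue, workers, polling) in Definitions)
        {
            // One server instance per queue, in the current process only.
            services.AddHangfireServer(options =>
            {
                options.ServerName = queue;
                options.WorkerCount = workers;
                options.Queues = new[] { queue };
                options.SchedulePollingInterval = polling;
            });
        }

        return services;
    }
}
```

With this in place, the startup code would call `services.AddQueueServers();` after `services.AddHangfire(...)`.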

My problem is that this creates multiple queues on the servers, each with a different port.

In my code I then try to stop a job from running if the same job is already queued or retrying, but if that job is running on a different physical server, it is not found and the job is queued again.

Here is the code that checks whether it is already running:

public async Task<bool> IsAlreadyQueuedAsync(PerformContext context)
{
	var disableJob = false;
	var monitoringApi = JobStorage.Current.GetMonitoringApi();

	// get the jobId, method and queue using performContext
	var jobId = context.BackgroundJob.Id;
	var methodInfo = context.BackgroundJob.Job.Method;
	var queueAttribute = (QueueAttribute)Attribute.GetCustomAttribute(context.BackgroundJob.Job.Method, typeof(QueueAttribute));
	
	// enqueuedJobs
	var enqueuedjobStatesToCheck = new[] { "Processing" };
	var enqueuedJobs = monitoringApi.EnqueuedJobs(queueAttribute.Queue, 0, 1000);
	var enqueuedJobsAlready = enqueuedJobs.Count(e => e.Key != jobId && e.Value != null && e.Value.Job != null && e.Value.Job.Method.Equals(methodInfo) && enqueuedjobStatesToCheck.Contains(e.Value.State));

	if (enqueuedJobsAlready > 0)
		disableJob = true;

	// scheduledJobs
	if (!disableJob)
	{
		// check if there are any scheduledJobs that are processing
		var scheduledJobs = monitoringApi.ScheduledJobs(0, 1000);
		var scheduledJobsAlready = scheduledJobs.Count(e => e.Key != jobId && e.Value != null && e.Value.Job != null && e.Value.Job.Method.Equals(methodInfo));

		if (scheduledJobsAlready > 0)
			disableJob = true;
	}

	// failedJobs
	if (!disableJob)
	{
		var failedJobs = monitoringApi.FailedJobs(0, 1000);
		var failedJobsAlready = failedJobs.Count(e => e.Key != jobId && e.Value != null && e.Value.Job != null && e.Value.Job.Method.Equals(methodInfo));

		if (failedJobsAlready > 0)
			disableJob = true;
	}

	// if disableJob is true, remove the current job; otherwise it would write a "successful" message in the logs
	if (disableJob)
	{
		// use hangfire delete, for cleanup
		BackgroundJob.Delete(jobId);

		// create our sqlBuilder to remove the entries altogether including the count
		var sqlBuilder = new SqlBuilder()
			.DELETE_FROM("Hangfire.[Job]")
			.WHERE("[Id] = {0};", jobId);

		sqlBuilder.Append("DELETE TOP(1) FROM Hangfire.[Counter] WHERE [Key] = 'stats:deleted' AND [Value] = 1;");

		using (var cmd = _context.CreateCommand(sqlBuilder))
			await cmd.ExecuteNonQueryAsync();
		
		return true;
	}

	return false;
}
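
The check above filters the result of `EnqueuedJobs` by the state name "Processing". `EnqueuedJobs` lists jobs that are still waiting in a queue, so that filter will normally match nothing. Jobs that a worker has already picked up are listed by `ProcessingJobs`, which reads from the shared storage and therefore includes jobs running on any server. A minimal sketch under that assumption follows. The class and method names are hypothetical, the page size of 1000 is carried over from the code above, and a check of this kind is inherently racy (two jobs can start at the same moment and each see no other), so treat it as an illustration rather than a guarantee.

```csharp
using System.Linq;
using Hangfire;
using Hangfire.Server;

// Hypothetical helper, not part of Hangfire.
public static class ProcessingJobCheck
{
    // Returns true when another job for the same method is currently in the
    // Processing state, regardless of which server is running it.
    public static bool IsSameMethodProcessing(PerformContext context)
    {
        var monitoringApi = JobStorage.Current.GetMonitoringApi();

        var jobId = context.BackgroundJob.Id;
        var methodInfo = context.BackgroundJob.Job.Method;

        // First page only (up to 1000 entries), as in the original check.
        var processingJobs = monitoringApi.ProcessingJobs(0, 1000);

        return processingJobs.Any(e =>
            e.Key != jobId &&
            e.Value != null &&
            e.Value.Job != null &&
            e.Value.Job.Method.Equals(methodInfo));
    }
}
```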

Each method also has attributes like the following:

public interface IAlertScheduleService
{
	[Hangfire.Queue("alert-schedule")]
	[Hangfire.DisableConcurrentExecution(60 * 60 * 5)]
	Task RunAllAsync(PerformContext context);
}
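
For reference, the argument of `DisableConcurrentExecution` is a timeout in seconds: `60 * 60 * 5` is 18000 seconds, or 5 hours. The attribute takes a distributed lock in the shared storage, and a second job for the same method waits up to that long for the lock instead of being skipped, occupying its worker while it waits. If skipping is the desired behaviour, one option is a custom server filter that tries to take the lock without waiting and cancels the job when the lock is already held. The sketch below is an illustration under stated assumptions, not a Hangfire built-in: `SkipWhenRunningAttribute` and the lock resource format are hypothetical, and it relies on `AcquireDistributedLock` throwing `DistributedLockTimeoutException` on timeout, as the SQL Server storage does.

```csharp
using System;
using Hangfire.Common;
using Hangfire.Server;
using Hangfire.Storage;

// Hypothetical filter: skips a job instead of waiting when the same
// method is already running somewhere in the cluster.
public sealed class SkipWhenRunningAttribute : JobFilterAttribute, IServerFilter
{
    private const string LockKey = "SkipWhenRunning.Lock";

    public void OnPerforming(PerformingContext filterContext)
    {
        var job = filterContext.BackgroundJob.Job;

        // Assumed resource naming: one lock per declaring type and method name.
        var resource = $"skip-when-running:{job.Type.FullName}.{job.Method.Name}";

        try
        {
            // Zero timeout: fail immediately if the lock is held elsewhere.
            var handle = filterContext.Connection.AcquireDistributedLock(resource, TimeSpan.Zero);
            filterContext.Items[LockKey] = handle;
        }
        catch (DistributedLockTimeoutException)
        {
            // Another job holds the lock, so do not run the job body.
            filterContext.Canceled = true;
        }
    }

    public void OnPerformed(PerformedContext filterContext)
    {
        // Release the lock if this invocation acquired it.
        if (filterContext.Items.TryGetValue(LockKey, out var handle) && handle is IDisposable disposable)
        {
            disposable.Dispose();
        }
    }
}
```

It would be applied in place of `[Hangfire.DisableConcurrentExecution(...)]`, for example as `[SkipWhenRunning]` on `RunAllAsync`. Which state a cancelled job ends up in is worth checking in the dashboard before relying on this.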

Why does this happen? How can I stop it from happening?

Gillardo, Jan 24 '22 11:01

I have a similar setup in my application, but with the added problem of many applications (different versions) running on multiple machines (Azure autoscale).

You must create one queue name shared by all machines and one queue name for each machine. For this, I created 2 properties:

    public static string MachineQueueId { get { return HostingEnvironment.SiteName.Replace(".", "_").ToLowerInvariant(); } }

    public static string QueueVersionName { get { return ("version" + ConfigurationManager.AppSettings["AppVersion"]).Replace(".", "_"); } }
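
The `Replace(".", "_")` and `ToLowerInvariant()` calls matter because Hangfire validates queue names: only lowercase letters, digits, and underscores are accepted (recent versions also accept dashes). A minimal sketch of a general-purpose sanitizer follows. `QueueNames.Sanitize` is a hypothetical helper, not a Hangfire API, and it conservatively maps dashes to underscores as well.

```csharp
using System.Text.RegularExpressions;

// Hypothetical helper, not part of Hangfire.
public static class QueueNames
{
    // Lowercases the input and replaces every character
    // outside [a-z0-9_] with an underscore.
    public static string Sanitize(string raw)
    {
        return Regex.Replace(raw.ToLowerInvariant(), "[^a-z0-9_]", "_");
    }
}
```

For example, `QueueNames.Sanitize("My.Site-01")` yields `my_site_01`.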

When the machine starts:

    _backgroundJobServer = new BackgroundJobServer(new BackgroundJobServerOptions
    {
        Queues = new string[] { MachineQueueId, QueueVersionName, "default" },
    });

I then used this attribute to keep requeued jobs on the correct queue: https://discuss.hangfire.io/t/requeue-on-the-same-queue-or-possibility-to-have-server-groups/5245/2

To enqueue jobs correctly, I wrote this helper:

    public static string TryEnqueueJob(
        Expression<Action> expr,
        bool enqueueCurrentMachineOnly = false,
        bool enqueueToAllServers = false,
        string specificQueue = null,
        bool currentVersionOnly = true,
        TimeSpan? delayTimeSpan = null,
        bool excludeCurrentServer = false)
    {
        if (IsAlive && JobStorage.Current != null)
        {
            if (enqueueToAllServers)
            {
                var lstJobs = new List<string>();
                foreach (var svr in JobStorage.Current.GetMonitoringApi().Servers())
                {
                    if (currentVersionOnly && !svr.Queues.Contains(HangfireBootstrapper.QueueVersionName))
                        continue;
                    if (excludeCurrentServer && svr.Queues.Contains(HangfireBootstrapper.MachineQueueId))
                        continue;

                    IBackgroundJobClient hangFireClient = new BackgroundJobClient();
                    if (delayTimeSpan.HasValue)
                    {
                        lstJobs.Add(hangFireClient.Schedule(svr.Queues.FirstOrDefault(), expr, delayTimeSpan.Value));
                    }
                    else
                    {
                        EnqueuedState myQueueState = new Hangfire.States.EnqueuedState(svr.Queues.FirstOrDefault());
                        lstJobs.Add(hangFireClient.Create(expr, myQueueState));
                    }
                    //lstJobs.Add(BackgroundJob.Enqueue(expr, ))
                }
                return string.Join(",", lstJobs);
            }
            else if (!string.IsNullOrWhiteSpace(specificQueue))
            {
                IBackgroundJobClient hangFireClient = new BackgroundJobClient();
                if (delayTimeSpan != null)
                {
                    return hangFireClient.Schedule(specificQueue, expr, delayTimeSpan.Value);
                }
                else
                {
                    EnqueuedState myQueueState = new Hangfire.States.EnqueuedState(specificQueue);
                    return hangFireClient.Create(expr, myQueueState);
                }
            }
            else if (enqueueCurrentMachineOnly)
            {
                IBackgroundJobClient hangFireClient = new BackgroundJobClient();
                if (delayTimeSpan != null)
                {
                    return hangFireClient.Schedule(HangfireBootstrapper.MachineQueueId, expr, delayTimeSpan.Value);
                }
                else
                {
                    EnqueuedState myQueueState = new Hangfire.States.EnqueuedState(HangfireBootstrapper.MachineQueueId);
                    return hangFireClient.Create(expr, myQueueState);
                }
            }
            else if (currentVersionOnly)
            {
                IBackgroundJobClient hangFireClient = new BackgroundJobClient();
                if (delayTimeSpan != null)
                {
                    return hangFireClient.Schedule(HangfireBootstrapper.QueueVersionName, expr, delayTimeSpan.Value);
                }
                else
                {
                    EnqueuedState myQueueState = new Hangfire.States.EnqueuedState(HangfireBootstrapper.QueueVersionName);
                    return hangFireClient.Create(expr, myQueueState);
                }
            }
            else
            {
                return BackgroundJob.Enqueue(expr);
            }
        }

        var act = expr.Compile();
        act();
        return null;
    }

IsAlive is a property that checks whether the Hangfire server is running. If it is not, the action is executed immediately.

Usage:

HangFireHelper.TryEnqueueJob(() => staticclass.staticmethod(parameters), enqueueToAllServers: true);

ms92ita, May 27 '22 12:05