Cancelling long running jobs
We’re running fairly long tasks via MBrace, hours per job. It’s working really well for getting all the needed dependencies spread onto workers.
We have had some difficulties around cancellation. From observation, it appears that MBrace waits for active jobs to complete before a cancellation takes effect; is that correct? To cancel, we've resorted to force-resetting the cluster, which is unfortunate when other jobs are running.
We’re triggering jobs with the Cloud.Parallel syntax:
[n1 .. n2]
|> List.map (fun i -> cloud { return i, run_simulation prog })
|> Cloud.Parallel
|> cluster.CreateProcess
Are there other options if pre-emption on cancellation is needed?
Hi Colin,
Cancellation works in pretty much the same way as async, i.e. it is cooperative. Cloud and Async workflows check for cancellation automatically at each composition point in the computation expression. However, inside ordinary method calls (and your run_simulation method looks like one) cancellation is essentially ignored. You can verify this by running the same code using async:
[n1 .. n2]
|> List.map (fun i -> async { return i, run_simulation prog })
|> Async.Parallel
|> Async.RunSynchronously
You could either pass the current cancellation token as a parameter to your own method and implement cooperative cancellation yourself, or you could chop up your input data and process it sequentially using a computation-expression for loop:
Option 1:
[n1 .. n2]
|> List.map (fun i -> cloud { let! ct = Cloud.CancellationToken in return i, run_simulation ct prog })
|> Cloud.Parallel
|> cluster.CreateProcess
Option 2 (cancellation will be automatically checked in each iteration of the for loop):
[n1 .. n2]
|> List.map (fun i -> cloud { for j in 1 .. 100 do run_simulation_Partial j prog })
|> Cloud.Parallel
|> cluster.CreateProcess
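For Option 1 to actually stop a job early, run_simulation itself has to poll the token it receives. A rough sketch of what that could look like, where the "simulation" body is just a dummy loop and the real point is checking IsCancellationRequested between steps (type and namespace names per MBrace.Core; adjust for your MBrace version):

open System.Threading
open MBrace.Core

// Hypothetical cooperative variant of run_simulation: 'prog' and the
// per-step work are placeholders; what matters is polling the MBrace
// cancellation token between chunks of work.
let run_simulation (ct : ICloudCancellationToken) (prog : string) =
    let totalSteps = 1000
    let mutable step = 0
    while step < totalSteps && not ct.IsCancellationRequested do
        Thread.Sleep 100   // stands in for one chunk of real simulation work
        step <- step + 1
    sprintf "%s: completed %d of %d steps" prog step totalSteps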
Thanks, that’s useful. We actually have our own cancellation tokens inside run_simulation that we can map onto in this case.
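Concretely, the mapping we have in mind is to trip our own System.Threading cancellation token when the MBrace one fires, by polling IsCancellationRequested from a background async. A sketch of the idea rather than our actual code:

open System.Threading
open MBrace.Core

// Bridge the MBrace cancellation token to a local CancellationToken by
// polling; the 250 ms interval is arbitrary, and in real code the poller
// should also be stopped once the job completes normally.
let linkToLocalToken (ct : ICloudCancellationToken) =
    let cts = new CancellationTokenSource()
    async {
        while not ct.IsCancellationRequested do
            do! Async.Sleep 250
        cts.Cancel()
    } |> Async.Start
    cts.Token

// usage inside the cloud block, assuming run_simulation takes a
// System.Threading.CancellationToken:
// cloud { let! ct = Cloud.CancellationToken
//         return i, run_simulation (linkToLocalToken ct) prog }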
Are there any options for preemptive termination within MBrace? For cases where say the code has hung in a worker. So far we’re resorting to rebooting individual workers via the Azure portal.
Another, similar approach that is more "preemptive" in nature is to encapsulate the CPU/memory-heavy kernels of computation in a separate Process.
// assumes: open System.Diagnostics
let task =
    [1 .. 10]
    |> List.map (fun i -> cloud {
        let! ct = Cloud.CancellationToken
        let _ =
            // start the heavy kernel as an external OS process
            let proc : Process = Process.Start(...)
            // poll until the kernel exits or cancellation is requested
            while not proc.HasExited && not ct.IsCancellationRequested do
                System.Threading.Thread.Sleep(10)
            // kill the external process if it is still running (i.e. we were cancelled)
            if not proc.HasExited then proc.Kill()
        return i })
    |> Cloud.Parallel
    |> cluster.CreateTask
The benefit of this approach is that the computation-heavy kernel doesn't need to check the CancellationToken at all.
@cgravill There is no termination mechanism baked in as such, but you can take advantage of the automatic resetting of Azure workers using the following hack:
// force the specific Azure worker process to exit; Azure will then restart it
let resetAzureWorker (worker : IWorkerRef) =
    try cluster.Run(cloud { exit 1 }, faultPolicy = FaultPolicy.NoRetry, target = worker, taskName = "killer Process")
    with :? FaultException -> ()
We can verify that this works as expected using the following implementation:
let getCurrentPid (worker : IWorkerRef) =
    cluster.Run(cloud { return System.Diagnostics.Process.GetCurrentProcess().Id }, target = worker)
Put together:
let badWorker = cluster.Workers.[2]
getCurrentPid badWorker // 3424
resetAzureWorker badWorker // this call will take some time for the Azure fault tolerance mechanisms to kick in
getCurrentPid badWorker // 1948