
ROBUSTNESS: Protect main-worker communication using suspendInterrupts()?

Open HenrikBengtsson opened this issue 3 years ago • 14 comments

Issue

If the user hits Ctrl-C (signals a user interrupt) while the main R session and a worker are communicating data, the communication ends up in an unrecoverable, corrupt state. The only solution is to restart with a new cluster while waiting for the old cluster node to time out (30 days?).

Suggestion

In R (>= 3.5.0), we have suspendInterrupts(expr), which suspends interrupts while evaluating an expression.

Could we wrap all communication calls, i.e. all serialize()/unserialize() calls in suspendInterrupts()?
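A minimal sketch of what such wrapping could look like on the main-process side. The helper names send_protected() and recv_protected() are hypothetical and not part of parallelly, and a temporary file stands in for the socket connection to a worker:

```r
## Hypothetical helpers (not parallelly API): run each serialize() /
## unserialize() call with user interrupts suspended
send_protected <- function(con, value) {
  suspendInterrupts(serialize(value, connection = con))
  invisible(con)
}

recv_protected <- function(con) {
  suspendInterrupts(unserialize(connection = con))
}

## Round trip through a temporary file, standing in for a worker socket
path <- tempfile()
con <- file(path, open = "wb")
send_protected(con, list(task = 1L, data = c(0.5, 1.5)))
close(con)

con <- file(path, open = "rb")
value <- recv_protected(con)
close(con)
unlink(path)
```

Whether this is enough in practice depends on which interrupt signals suspendInterrupts() actually covers, which is one of the open actions in this issue.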

There should be no need to do this on workers. Also, this way the worker can be terminated by the operating system or a job scheduler by signaling a nicer interrupt signal.

It should probably also be sufficient to protect interactive R sessions. When running R in batch mode, hitting Ctrl-C often means we want the whole R process to terminate. OTOH, with proper interrupt handling (e.g. protecting communication as above and then capturing user interrupts outside), our R process could terminate nicely, which here means calling stopCluster() etc.
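As a rough sketch of the "capture user interrupts outside" part, an exiting handler can run cleanup code when an interrupt reaches the outer level. The helper with_cleanup_on_interrupt() is hypothetical; in real use, the cleanup function would be something like function() parallel::stopCluster(cl). The demo signals a synthetic condition of class "interrupt" rather than relying on a real Ctrl-C:

```r
## Hypothetical helper: evaluate 'expr'; if an interrupt condition reaches
## this level, run 'cleanup' and return NULL instead of the value of 'expr'
with_cleanup_on_interrupt <- function(expr, cleanup) {
  tryCatch(expr, interrupt = function(cond) {
    cleanup()
    invisible(NULL)
  })
}

## Demo with a synthetic interrupt condition (stands in for Ctrl-C)
fake_interrupt <- structure(
  class = c("interrupt", "condition"),
  list(message = "synthetic interrupt", call = NULL)
)

cleaned_up <- FALSE
result <- with_cleanup_on_interrupt({
  signalCondition(fake_interrupt)
  "finished"
}, cleanup = function() cleaned_up <<- TRUE)
```

Because tryCatch() unwinds the stack, the interrupted expression is abandoned; this only makes sense outside the protected communication calls, not around them.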

Actions

Investigate exactly which type of interrupt signals are suspended.

Protect what can be protected in the existing parallelly code.

Document that Ctrl-\ can be used to kill R if the above gets stuck. (What happens in RStudio?)

HenrikBengtsson avatar Oct 25 '20 15:10 HenrikBengtsson

One observation: the "suspend" part in suspendInterrupts() appears to mean "disable and drop all interrupts until resumed". Interrupts are not queued and re-signaled afterward. Here is some code I used on R 4.0.3 on Linux to test this:

## Slow will self terminate after 'duration' seconds
slow <- function(duration = 3.0) {
  tmax <- Sys.time() + duration
  kk <- 0
  repeat {
    kk <- kk + 1
    cat(sprintf("\r%d", kk))
    if (Sys.time() > tmax) break
  }
  cat("\n")
}

test <- function(...) {
  conditions <- list()
  res <- withCallingHandlers({
    suspendInterrupts(slow(...))
  }, condition = function(cond) {
    conditions <<- c(conditions, list(cond))
    utils::str(cond)
    cond
  })
  conditions
}

which is then called as:

> test()
68646^C
> 

HenrikBengtsson avatar Oct 25 '20 21:10 HenrikBengtsson

Another observation, which I'm pretty sure I've already posted about on R-devel or somewhere: At least on Linux, it is not possible to suspend interrupts during Sys.sleep(), e.g.

> system.time({ withCallingHandlers(suspendInterrupts(Sys.sleep(10.0)), interrupt = identity) })
^C
Timing stopped at: 0 0 1.012

HenrikBengtsson avatar Oct 25 '20 21:10 HenrikBengtsson

At least on Linux, it looks like both serialize() and unserialize() cannot be interrupted. The following simple, interactive example illustrates this:

write_test <- function(x) {
  file <- tempfile()
  message("File name: ", file)
  if (utils::file_test("-f", file)) file.remove(file)
  con <- gzfile(file, open = "wb")
  on.exit(close(con))
  serialize(x, connection=con)
  file
}

read_test <- function(file) {
  con <- gzfile(file, open = "rb")
  on.exit(close(con))
  unserialize(connection=con)
}

## Test data
set.seed(42)
x <- rnorm(1e7)

serialize() cannot be interrupted

system.time(file <- write_test(x))
File name: /tmp/hb/RtmpwbTfWb/file1c835830a0e9
   user  system elapsed 
  3.232   0.012   3.245 
file.size(file)
## [1] 76783182

and while signaling user interrupts:

system.time(file <- write_test(x))
## ^C^C^C^C^C^C^C^C
## Timing stopped at: 3.252 0.084 5.479
file.size(file)
## [1] 76783182

unserialize() cannot be interrupted

unserialize() is much faster than serialize(), so we have to hit Ctrl-C very quickly:

system.time(y <- read_test(file))
## ^C   user  system elapsed 
##   0.445   0.027   0.472

> identical(y,x)
[1] TRUE

This is on Linux. This needs to be verified on other platforms too.

HenrikBengtsson avatar Oct 25 '20 21:10 HenrikBengtsson

You could try also .tryResumeInterrupt, even though it is meant for internal use...

withCallingHandlers({Sys.sleep(10)}, interrupt = \(cond) {.tryResumeInterrupt()})

This could be used to record that interrupts have been received, while still being able to finish important work.

king-of-poppk avatar Jul 28 '23 14:07 king-of-poppk

A better illustration maybe:

> withCallingHandlers({for (i in 1:10) {Sys.sleep(1); print(i)}}, interrupt = \(cond) {print("INTERRUPTED")})
[1] 1
[1] 2
^C[1] "INTERRUPTED"

versus

> withCallingHandlers({for (i in 1:10) {Sys.sleep(1); print(i)}}, interrupt = \(cond) {print("INTERRUPTED"); .tryResumeInterrupt(); print("HU")})
[1] 1
^C[1] "INTERRUPTED"
[1] 2
^C[1] "INTERRUPTED"
[1] 3
^C[1] "INTERRUPTED"
[1] 4
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
[1] 5
[1] 6
^C[1] "INTERRUPTED"
[1] 7
^C[1] "INTERRUPTED"
[1] 8
[1] 9
[1] 10

king-of-poppk avatar Jul 29 '23 16:07 king-of-poppk

Also, there is some significant overhead, but it really seems to continue from where it left off (somewhere in the middle of a sleep call):

> withCallingHandlers({for (i in 1:3) {x <- Sys.time(); Sys.sleep(3); print(Sys.time() - x)}}, interrupt = \(cond) {print("INTERRUPTED"); .tryResumeInterrupt(); print("UH")})
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
Time difference of 10.33775 secs
Time difference of 3.002125 secs
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
^C[1] "INTERRUPTED"
Time difference of 11.1719 secs

king-of-poppk avatar Jul 29 '23 16:07 king-of-poppk

Another option would be to have a dedicated channel for cluster clients to request that a specific future be "cancelled"/"unscheduled". It seems to me that interpreting an interrupt as "interrupt the currently executing future on that worker" is too vague, because "the currently executing future on that worker" is itself not well defined. And if you try to distinguish between those interrupts and ones that mean "interrupt the worker", I think you are in trouble. You can only rely on SIGINT and SIGTERM, and I guess that is not enough to squeeze in more than what those are meant to be used for (graceful vs non-graceful kill of the worker?).

I assume you already have some sort of communication thread running on the worker?! So that those messages could be received while the future is executing.

king-of-poppk avatar Jul 29 '23 16:07 king-of-poppk

Maybe an easier first attempt is to simply drop in the following (or the C equivalent) at the very beginning of the worker:

interruptCount <- 0L
globalCallingHandlers(
  interrupt = \(.) {
    interruptCount <<- interruptCount + 1L
    .tryResumeInterrupt()
  }
)

and check/reset the count where relevant. Can also have different handling depending on whether job or management code is running. Or leave it to the user to explicitly handle interrupts within a job to abort the job early for example.
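One way "check/reset the count where relevant" could look, as a sketch only. The names interrupt_counter and run_counted() are hypothetical, and withCallingHandlers() is used instead of globalCallingHandlers() so the example can be run anywhere. The demo uses synthetic conditions of class "interrupt", for which .tryResumeInterrupt() has no "resume" restart to invoke and is expected to simply return:

```r
## Hypothetical counter: records interrupts instead of letting them abort work
interrupt_counter <- local({
  n <- 0L
  list(
    handler = function(cond) {
      n <<- n + 1L
      .tryResumeInterrupt()  # resume interrupted code if a "resume" restart exists
    },
    take = function() {      # return the count so far and reset it to zero
      k <- n
      n <<- 0L
      k
    }
  )
})

## Management code runs a job under the handler, then inspects the count
run_counted <- function(expr) {
  withCallingHandlers(expr, interrupt = interrupt_counter$handler)
}

## Demo with synthetic interrupt conditions (stand-ins for real SIGINTs)
fake_interrupt <- structure(
  class = c("interrupt", "condition"),
  list(message = "synthetic interrupt", call = NULL)
)
value <- run_counted({
  signalCondition(fake_interrupt)
  signalCondition(fake_interrupt)
  "job done"
})
n_seen <- interrupt_counter$take()
n_after_reset <- interrupt_counter$take()
```

This sketch says nothing about interrupts that arrive while the handler itself is running.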

king-of-poppk avatar Jul 31 '23 12:07 king-of-poppk

I just tried

future::plan(
  future::multisession,
  rscript_startup = "globalCallingHandlers(interrupt = \\(.) .tryResumeInterrupt())"
)

and ~it does make things more robust, although not perfect.~

PS: Upon further inspection, it does not seem to make much of a difference...

king-of-poppk avatar Jul 31 '23 14:07 king-of-poppk

Maybe an easier first attempt is to simply drop in the following (or the C equivalent) at the very beginning of the worker:

interruptCount <- 0L
globalCallingHandlers(
  interrupt = \(.) {
    interruptCount <<- interruptCount + 1L
    .tryResumeInterrupt()
  }
)

and check/reset the count where relevant. Can also have different handling depending on whether job or management code is running. Or leave it to the user to explicitly handle interrupts within a job to abort the job early for example.

Thanks for looking into this and thinking about it. My gut feeling says that one cannot assume a calling handler can handle the case when there is a burst of interrupts coming in. For instance, what happens if there is another interrupt signalled when we're in the middle of the:

  interrupt = \(.) {
    interruptCount <<- interruptCount + 1L
    .tryResumeInterrupt()
  }

handler? Will that interrupt the handler? AFAIK, suspendInterrupts() should handle such cases.

Note that this issue is specifically about interrupts occurring while the main process and a worker process are communicating (and it has nothing to do with interrupting futures). If such a communication is interrupted, further communication attempts will be out of sync and most likely result in rough failures. To handle that gracefully, one can either improve the communication protocol or ignore interrupts. This issue aims for the simpler solution, i.e. the latter.
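To illustrate why an interrupted transfer is hard to recover from, the sketch below cuts a serialized message in half and tries to read it back. It only simulates a receiver seeing an incomplete message; it does not reproduce an actual interrupted socket transfer, and the message fields type and data are made up for the example:

```r
## A message as it might be sent between main process and worker
## (field names are hypothetical)
msg <- list(type = "EXEC", data = seq(0.5, 100, by = 0.5))
payload <- serialize(msg, connection = NULL)

## Simulate a transfer cut short: only the first half of the bytes arrive
partial <- payload[seq_len(length(payload) %/% 2L)]

## Reading the incomplete message fails with an error ...
outcome <- tryCatch(unserialize(partial), error = function(e) e)

## ... whereas the complete message round-trips
complete <- unserialize(payload)
```

Since the receiver cannot tell where the next valid message starts, ignoring interrupts during the transfer avoids the problem rather than repairing it.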

FWIW, in R devel ("4.4.0"), serialize()/unserialize() can now be interrupted, i.e. they are no longer atomic, cf. https://github.com/HenrikBengtsson/parallelly/issues/29#issuecomment-716216232.

Also, there is some significant overhead, but it really seems to continue from where it left off (somewhere in the middle of a sleep call)

Note that Sys.sleep() cannot be interrupted, e.g. https://github.com/HenrikBengtsson/parallelly/issues/29#issuecomment-716212988

HenrikBengtsson avatar Jul 31 '23 20:07 HenrikBengtsson

Note that Sys.sleep() cannot be interrupted, e.g. #29 (comment)

OK. I am on macOS Ventura 13.5 with R 4.2, and I observe the following, which seems contradictory:

y <- Sys.time()

globalCallingHandlers(
  interrupt = \(.) {
    print("INTERRUPT")
    y <<- Sys.time()
    .tryResumeInterrupt()
  }
)

for (i in 1:10) {
  x <- Sys.time()
  y <- x
  Sys.sleep(3)
  if (y != x) {
    print("Time elapsed since last interrupt:")
    print(Sys.time() - y)
  }
  print("Time elapsed in this loop iteration:")
  print(Sys.time() - x)
}

which logs

[1] "Time elapsed in this loop iteration:"
Time difference of 3.004497 secs
[1] "Time elapsed in this loop iteration:"
Time difference of 3.0034 secs
[1] "INTERRUPT"
[1] "INTERRUPT"
[1] "INTERRUPT"
[1] "INTERRUPT"
[1] "Time elapsed since last interrupt:"
Time difference of 0.7835841 secs
[1] "Time elapsed in this loop iteration:"
Time difference of 3.005537 secs
[1] "INTERRUPT"
[1] "Time elapsed since last interrupt:"
Time difference of 2.884698 secs
[1] "Time elapsed in this loop iteration:"
Time difference of 3.003882 secs
[1] "INTERRUPT"
[1] "INTERRUPT"
[1] "INTERRUPT"
[1] "Time elapsed since last interrupt:"
Time difference of 0.01319003 secs
[1] "Time elapsed in this loop iteration:"
Time difference of 3.002887 secs
[1] "Time elapsed in this loop iteration:"
Time difference of 3.003268 secs
[1] "Time elapsed in this loop iteration:"
Time difference of 3.00247 secs
[1] "Time elapsed in this loop iteration:"
Time difference of 3.00239 secs
[1] "Time elapsed in this loop iteration:"
Time difference of 3.003064 secs
[1] "Time elapsed in this loop iteration:"
Time difference of 3.002458 secs

PS: OK, now I understand. You meant you cannot SUSPEND interrupts that occur within Sys.sleep.

king-of-poppk avatar Aug 01 '23 07:08 king-of-poppk

My gut feeling says that one cannot assume a calling handler can handle the case when there is a burst of interrupts coming in.

Indeed:

globalCallingHandlers(
  interrupt = \(.) {
    print("INTERRUPT")
    Sys.sleep(2)
    print("RESUME")
    .tryResumeInterrupt()
  }
)

Sys.sleep(10)

which logs

[1] "INTERRUPT"
[1] "RESUME"
[1] "INTERRUPT"
[1] "RESUME"
[1] "INTERRUPT"
Called from: Sys.sleep(2)

if you ^C too fast.

king-of-poppk avatar Aug 01 '23 07:08 king-of-poppk

AFAIK, suspendInterrupts() should handle such cases.

Here is what I tried, without success:

future::plan(
  future::multisession,
  rscript_args = c(
    "*",
    "-e",
    shQuote(
      "suspendInterrupts(parallel:::.workRSOCK())",
      type = "sh"
    )
  )
)

and the following, in case there is a Sys.sleep statement somewhere:

future::plan(
  future::multisession,
  rscript_args = c(
    "*",
    "-e",
    shQuote(
      "suspendInterrupts(withCallingHandlers(parallel:::.workRSOCK(), interrupt = \\(.) .tryResumeInterrupt()))",
      type = "sh"
    )
  )
)

king-of-poppk avatar Aug 01 '23 08:08 king-of-poppk

For instance, what happens if there is another interrupt signalled when we're in the middle

I understand this can happen in general; however, I was hoping that this would never happen in my use case, since I send at most one SIGINT per instantiated future (after it has been instantiated, via killNode(f$workers[[f$node]], signal = tools::SIGINT)).

king-of-poppk avatar Aug 01 '23 09:08 king-of-poppk