Add Support for Remote Resources
First of all, sweet project! I have been dreaming about writing something exactly like this in Go, but my master's degree has kept me too busy to give it a shot.
One super critical piece of functionality for me is the ability to run jobs on remote resources (mainly over SSH). Something just like the functionality in GNU Parallel. I have access to a few powerful machines, and it is nice to be able to distribute jobs to all of them automatically.
I read the documentation on your website (very nice by the way), and I think your design could nicely accommodate this functionality. What I'm envisioning is creating multiple types of Resource objects (LocalResource, SSHResource, SLURMResource, etc) that all expose a consistent Resource interface and handle all the details of executing a Task on that Resource.
You would then attach any number of these resources to a Process object. The Process object is responsible for sending Tasks to it's constituent Resources, and each Resource object is responsible for copying over any inputs to a Task, running the Task, and copying back any outputs.
Thoughts? On a high level this seems doable, but I'm not sure how painful this would be to actually implement in the guts of your code. Are you taking pull requests? I'd love to give this a shot as soon as I get the chance.
Hi @kwrobert and thanks for the input and kind words!
This request is timely, as I'm looking at refactoring the IP (possibly corresponding to what you call "resource"?) into a more composable core, which different types of implementations can extend, via struct embedding.
I actually started this, which can be seen in the naming of the IP interface, the BaseIP struct and the FileIP one, in https://github.com/scipipe/scipipe/blob/master/ip.go ... but this is unfinished work, and not really cleaned up in that particular file (be warned!).
I have successfully created a re-usable core for Process and Tasks, but the IP one requires a bit more thinking to get right.
There is some really tricky things to think through to get this right. I have some notes spread out over issues and my journal etc. Will see if I can try to collect them in one place, perhaps in this issue.
For what you mention about remote task execution, that sounds vaguely a little bit more like Task Execution Schemes and the one implementation of a TES-supporting task-running server I know (Funnel)?
Important to note is that there is a one-to-many relationship between tasks and data, so it's important not to tie the concepts of data and tasks too closely together. This is not always becoming visible in data science workflows in industry where people are often just processing one big table-like dataset at a time, but it shows up in biology where multiple inputs and outputs per task is very common (I have outlined this realization in an old blog post: http://bionics.it/posts/workflows-dataflow-not-task-deps ).
So, reg. pull requests, I have to warn that I already have some thoughts about this. OTOH, I will always be interested to compare notes, and approaches, but I will also push really hard to get a really generic solution, according to a few patterns that have emerged. It will for sure help a lot to have more eyes and brain cells look at this though! :)
@kwrobert So, I realize I should perhaps write up a few short notes on the design thinking for the current plans, which could be a starting point for new contributors (to review and discuss, before starting to code).
Notes would be great! It would help keep everybody on the same page.
I was wondering what this IP stuff meant (Information Packet, right?). So I can kind of imagine where this is going (we have various types of IPs flowing through a data pipeline).
So when I use the word Resource, I'm referring to a machine where Tasks can be executed. You have your current local machine (LocalResource) from which you start the pipeline, but maybe you also have SSH access to a powerful workstation somewhere or a virtual machine in the cloud (SSHResource). Finally, maybe you're an academic that has access to a big cluster running the SLURM job scheduler (SLURMResource). Each of these Resources would be able do the following things:
-
Consume a Task given to it by a Process
-
Copy any input IPs for the Task to the remote location. This might get tricky if you're using pipes and not files, but you could still stream input data up to the remote resource. It's also fine for there to be many input IPs for a single Task, we just need to copy them all.
-
Execute the Task on the remote resource
-
Copy any output IP's back to the local machine you started the pipeline from.
-
Notify the Process object that the Task has completed. I'm not sure how this is currently handled in the code. Do you check for the presence of specified outputs to signify a task is done?
In my mind, it's really important to avoid any sort of server-based architecture, so the implementation I'm envisioning isn't similar to the links you posted. Setting up and running servers is usually a bit of a pain, especially if you have a bunch of different resources you need to run jobs on. It also feels like overkill for our academic kind of use case where the number of tasks is small compared to some sort of industry use case that has a massive number of tasks to execute.
I also envision SSH as the primary communication protocol to remote Resources. It's simple, secure, and the Go standard library has an implementation of it.
Finally, I think this would all require moving func (t *Task) Execute() from Task objects into Resource objects. So we would have something like func (r *Resource) Execute(t Task) which takes a task and completes all the steps listed above.
Does that clarify my thinking? How do feel about this? Do you think it would work in the code base?
I was wondering what this IP stuff meant (Information Packet, right?).
Yes, a slightly awkward bit of flow-based programming lingo.
So I can kind of imagine where this is going (we have various types of IPs flowing through a data pipeline).
Exactly. Although, in batch workflows, the IPs are not really long-lived as in the original FBP ideas, but rather short lived, between two processes only, and typically immutable.
So when I use the word Resource, I'm referring to a machine where Tasks can be executed.
Ah, that makes sense!
In my mind, it's really important to avoid any sort of server-based architecture, so the implementation I'm envisioning isn't similar to the links you posted. Setting up and running servers is usually a bit of a pain, especially if you have a bunch of different resources you need to run jobs on. It also feels like overkill for our academic kind of use case where the number of tasks is small compared to some sort of industry use case that has a massive number of tasks to execute.
Actually very much second this, and I'm happy you're saying it! :) I've been looking at TES / funnel with a kind of worry, since it would make the scipipe code base which is currently only just over 1k LOC, a lot larger and more complex.
Finally, I think this would all require moving func (t *Task) Execute() from Task objects into Resource objects. So we would have something like func (r *Resource) Execute(t Task) which takes a task and completes all the steps listed above.
I see what you mean.
These are all really interesting ideas I think. I like your focus on simplicity!
Some thoughts coming to mind off arm: "Resource" might be a slightly vague term, as it is often used to refer to other things like the data items (which is why I thought you were referring to the IPs (which also might not have the optimal name :D)). Could it be "Executor" or similar? A clear name might help think more clearly about how it would fit into the code base as well.
I'll need to think more about how it can fit into the code base, but I like the general direction, and would love it if we could find a simple yet robust way of "light-weight task distribution"! It's been on the wish list for a long time :) (We used the light weight distributor in luigi a bit, but I think it is a bit more focused on a shared file system (this is what we used) or other data store).
Absolutely, I'm totally open to changing the name from "Resource" to "Executor", and I agree the term "Resource" feels somewhat ambiguous. Now that we're on the same page about what we mean here, choose a (reasonable) name you like best and I'll adapt :)
Absolutely, I'm totally open to changing the name from "Resource" to "Executor", and I agree the term "Resource" feels somewhat ambiguous. Now that we're on the same page about what we mean here, choose a (reasonable) name you like best and I'll adapt :)
:+1: :slightly_smiling_face: To me, Executor is fine. At least it would follow the existing naming in scipipe somehow, where I've made a distinction between processes' Run() methods, which are "long-running", and tasks, which are more distinct (in "space and time" :)) and are "executed" once. Maybe subjective :) but it seemed reasonable to use separate naming for these at least, because of their differing behavior.
@kwrobert And, to clarify, refactoring to enable custom executors, should not clash too much with my work on refactoring the IPs, as we now figured these are different concepts :slightly_smiling_face: , so I don't see too many blockers, off arm for a PR (so in other words, go ahead! :slightly_smiling_face: )
JFYI: Just found in my notebook now I have some sketches on an API for this, (from January or so I think :P):
Something like:
helloWorld := myWorkflow.NewProc("hello_world", "echo 'hello world' > {o:out}")
helloWorld.AddExecutor("slurm", scipipe.NewSlurmExecutor(
"1:00:00",
"HelloWorld",
"myProject_20180131",
"core",
"8"
))
... which would then allow switching between multiple configured executors on a whim:
helloWorld.SetActiveExecutor("slurm")
... or:
helloWorld.SetActiveExecutor("local")
... etc.
We found this (being able to easily switch) very helpful when switching between debugging locally, and executing large jobs remotely.
proc.AddExecutor() would of course take an Executor interface rather than a concrete struct type, so that different executors, even completely custom 3rd party ones, can easily be plugged in.
Cool! This is exactly the kind of interface I was envisioning. I do have a few thoughts about this.
I think the ability to attach multiple Executors to a Process would be really cool, and I think the notion of an "ActiveExecutor" might limit this capability. Instead, one should be able to add many Executors to a Process, and the Process becomes responsible for distributing Tasks among its attached Executors. This would require some sort of scheduling algorithm to be included in the Run method of Process, but this could be made a simple as you like (just a simple round robin algorithm to start, and we could get fancy with it later). So to sketch what I'm thinking, an Executor should have channels for incoming and completed Tasks:
type Executor interface {
// This function just receives a Task, and returns an error if
/// the Executor can't take any more tasks at the moment
ReceiveTask(t *Task) Error
// This should be a loop that runs continuously and executes
// tasks as they are received. All completed tasks should
// has the Task.Done signal set
ExecuteIncomingTasks()
// Stop receiving incoming tasks, finish all currently running tasks,
// cleanup and exit
Close()
}
and a Process should have a slice of Executors as an additional member
type Process struct {
// other stuff ...
Executors []Executor
}
And I think the Run method of Process would have something like this:
EDIT: I just realized this is not round robin scheduling, and you end up always filling up the first Executor before moving on to the others, but hopefully this illustrates the general idea.
// Start up ExecuteIncomingTasks for each Executor
for executor := range p.Executors {
go executor.ExecuteIncomingTasks()
defer executor.Close()
}
tasks := []*Task{}
for t := range p.createTasks() {
tasks = append(tasks, t)
taskReceived := false
for taskReceived {
for executor := range p.Executors {
// Pass in a pointer to a Task so we can set Task.Done
err := executor.ReceiveTask(t)
// if err == nil, the Task has been received!
// If we got an error back, the executor can't accept any more tasks
// at the moment and we need to try giving this task to another executor
if err == nil {
taskReceived = true
break
}
}
}
}
// make sure all tasks are finished, this logic shouldn't change from the existing loop at all
And each special Executor type would implement ExecuteIncomingTasks differently depending on the type of Executor it is (Local, SSH, SLURM, etc).
What have I missed here? Because each Task contains its InIPs and OutIPs it should be possible to do all data transferring within ExecuteIncomingTasks using just the information contained in the Task itself, right?
BTW, the fact that this should be straightforward to implement is a testament to your design. Well done! Also sorry for the super long comment!
Many thanks for the detail input @kwrobert ! Being able to distribute work among multiple executors would indeed be cool!
I still think one needs the ability to quickly switch between running e.g. only locally, and distributing the work to everything available, but I'm sure both of these goals can be combined. In the simplest form, one could just allow SetActiveExecutor("*"), and that would activate all configured executors ... or something similar.
Regarding the implementation of the executors, overall looks legit!
About taking tasks via channels though, I'm still not sure (but still thinking!). Right now, the scheduling logic is guaranteed by each process keeping an idle go-routine per task, waiting for it to return from (blocking) shell commands before proceeding. Handing off a task to an executor might change this logic considerably. Perhaps this is needed to get true scalability (will need to think more), if so be it, but just raising a flag that it might complicate things a bit. Will easily be convinced by well-working code though :)