Stream job events from flux
I don't know if existing functionality can reasonably satisfy this or not, so I will describe the use case that we have in mind:
When flux is used as the cluster-level scheduler ("native flux"?), we would like to generate messages in real time for job-scheduling-related events. Those events would include, but not be limited to:
- job submission
- job changes while queued
- job start
- job end
- events related to job prologs/epilogs
We would like the job information in the events to include all relevant information about the job, for instance:
- user name
- requested resources (maybe include the jobspec?)
- allocated resources
- event timestamp
- queue name (or the flux equivalent)
- bank name
It is open for discussion whether all data is included in each event, or perhaps just the data that has changed since the previous event for the same job.
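For concreteness, a single message (say, for job start) might carry something like the following; the field names here are purely illustrative, not an existing flux or Kafka format:

```python
# Purely illustrative: one possible shape for a "job start" message.
# None of these field names come from an existing flux interface.
job_start_message = {
    "event": "start",
    "timestamp": 1667340000.0,
    "jobid": "f2g8XQSGqubP",      # hypothetical job id
    "user": "alice",              # hypothetical user
    "queue": "pbatch",            # hypothetical queue
    "bank": "projectA",           # hypothetical bank
    "requested_resources": {"nnodes": 4, "ncores_per_node": 32},
    "allocated_resources": {"nodelist": "corona[101-104]"},
}
```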
The goal of the job event interface in flux would be to allow us to publish the data to a topic in Kafka. In turn, there would be one or more consumers of the job events topic. One of the first consumers would be the Elasticsearch database that will back the operational monitoring dashboard implemented in Grafana.
Can we cleanly achieve this with current APIs in flux? Will we need more work in flux-core (or somewhere else in flux?) to meet this need?
I think if we sent you jobspec, R, and the primary job eventlog for each job, you would have everything on your list (or if things are missing we can fill them in).
If you're curious you can examine these on a flux system like corona with
$ flux mini submit ...
<prints JOBID>
$ flux job eventlog --format=json JOBID
$ flux job info JOBID jobspec
$ flux job info JOBID R
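For reference, roughly the same lookups from the Python bindings might look like this (an untested sketch; the jobid is a placeholder and the exact `job-info.lookup` payload format is my assumption):

```python
# Untested sketch: fetch the primary eventlog, jobspec, and R for one job
# using the flux Python bindings. The jobid below is a placeholder, and the
# job-info.lookup payload format is an assumption -- verify against your
# flux-core version.
import json
import flux
import flux.job

h = flux.Flux()
jobid = flux.job.JobID("f2g8XQSGqubP")  # placeholder job id

# Primary job eventlog: event_watch() yields entries as they are appended.
for event in flux.job.event_watch(h, jobid):
    print(event.name, event.timestamp, event.context)
    if event.name == "clean":  # "clean" is the final event for a job
        break

# jobspec and R, via the job-info service (assumed payload format).
resp = h.rpc(
    "job-info.lookup", {"id": int(jobid), "keys": ["jobspec", "R"], "flags": 0}
).get()
jobspec = json.loads(resp["jobspec"])
R = json.loads(resp["R"])
```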
Although you could write something now using flux APIs to grab this stuff in real time, when we heard your talk in the S3G meeting, we were wondering about a new service in flux that could efficiently provide this as well as serve some other monitoring requirements we have.
Other questions we might want to discuss:
- any data on your list that is not provided by those three things (bank comes to mind)
- do you require the raw data to be reduced into something simpler, or could we provide say RFCs describing the raw data and tools for manipulating it instead?
- edge cases: catching up after disconnecting, is duplicate data OK, etc.
- does the data need to be anonymized or otherwise sanitized?
- hmm, what else can we do with the entire center's flux job data in a nosql database???
Those are good questions.
I don't think the data necessarily needs to be simpler as long as it is present. The plugin's job would be to convert the data into whatever destination format is needed (my immediate use case being something like an Avro schema for use in Kafka). But if the format is especially difficult to parse and use, maybe I would change my mind about that. :)
Yes, the edge cases are good to consider. I don't fully know the interfaces yet, but it sounds like the current job eventlog messages do not contain all of the information we would need. The event just gives us the poke to go and look up the other information (jobspec, R, etc.). I would want to know the semantics around that. For instance, if a job is purged, does that data get deleted from the KVS? Are races possible, such as a job being submitted and purged before the plugin has a chance to look up the data? Maybe we need the relevant data to be contained in the event so the event messages are more atomic?
Whether or not we can handle duplicate data probably depends on the schema format that we describe, the format in the database(s), and maybe other factors. It wouldn't hurt to try to make the messages idempotent. If we can maintain the same ordering and content of messages in the job event stream, then it might be possible to have Kafka help us throw out the duplicate messages. But don't hold me to that.
Even if a duplicate message makes it through Kafka, probably the databases wouldn't mind if we write the data twice.
Probably most of the data would not need to be anonymized, because the data will be protected behind authentication and authorization through most of the pipeline. But I wouldn't completely rule that out. Anonymization could happen at different levels in the pipeline: flux-core, the plugin that consumes job events, Kafka (Kafka Streams?), or even field-level authorization in the database if that is supported. It is probably too early to decide where we want to do that.
Is it possible to create a wildcard watch on the kvs or something to get job events for all jobs? It might make it more difficult to have a reproducible ordering of events (for recovery situations) using that approach?
I brought this up with @garlick on a call. It was just brainstorming, and I'm not sure if it's a good idea or not, but I'll throw it out there.
I'm wondering if the `job-list` module should provide a streaming service similar to other streaming services in flux. It could stream all job data changes that occur over time. On restart, it could re-stream all recovered job info. Some positives of this approach:
- `job-list` already parses all the events and their nuances
- `job-list` already gets and parses jobspec, R, etc.
- if changes/additions happen underneath the covers in events, jobspec, R, etc., all "job data readers" would be oblivious to them; `job-list` will hide it all (off the top of my head, `jobspec-update` events may happen sometime in the near future)

The bad:
- readers are limited to whatever `job-list` decides to send
- probably more that I can't remember right now; maybe limited access to some data, b/c `job-list` only sends data that is deemed "ok" to be retrieved by any user.
Sorry for my ignorance, but is Kafka running on the management node such that flux can more or less assume that it's always running and ready to accept data? Just wondering if maybe flux could get away with "open loop" publication of the required data to Kafka for a start.
Kafka is not necessarily on the management node. Our current thinking in LC is to use the gateway nodes for Kafka. FYI, Kafka is a group of daemons on different nodes working together, not a single node (assuming the cluster is large enough to have more than one gateway node).
The libraries for Kafka are pretty darn smart from what I have seen. librdkafka is the C/C++ client library, and it employs threading and in-memory caching to make Kafka producer calls asynchronous.
So at one level, the Kafka plugin in flux could pretend that Kafka is always available, even if it isn't. Hopefully everything is asynchronous, even the initial creation of the Kafka producer handle. At some point the in-memory cache would fill, and I'm not sure what would happen then. The library is designed to be reliable, so additional calls might block at that point. If it is configurable, maybe we allow it to throw away older data. It all depends on our posture towards losing flux job data.
And data would be lost if flux were restarted before the Kafka producer code could publish all of the data.
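To illustrate the "open loop" idea, a minimal producer with confluent-kafka (the Python wrapper around librdkafka) might look roughly like this; the broker addresses and topic name are placeholders:

```python
# Sketch of "open loop" publication via confluent-kafka / librdkafka.
# Broker list and topic name are placeholders.
import json
from confluent_kafka import Producer

producer = Producer({
    "bootstrap.servers": "gateway1:9092,gateway2:9092",  # placeholder brokers
    "queue.buffering.max.messages": 1000000,             # in-memory queue size
})

def on_delivery(err, msg):
    # Invoked from poll()/flush() once the broker acks (or the send fails).
    if err is not None:
        print(f"delivery failed: {err}")

def publish(job_event):
    try:
        # produce() only enqueues to librdkafka's local queue and returns.
        producer.produce("flux-job-events", value=json.dumps(job_event),
                         callback=on_delivery)
    except BufferError:
        # The local queue is full: this is where we'd have to decide whether
        # to block, drop the event, or spill it somewhere.
        pass
    producer.poll(0)  # serve delivery callbacks without blocking

# On shutdown, flush() blocks until queued messages are delivered (or time out),
# which is exactly the window where a flux restart could lose data.
# producer.flush(30)
```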
The job-list idea sounds interesting. We would need it to allow all job data to be retrieved by the plugin running in flux.
What other streaming services are there in flux that I can look at for comparison?
> What other streaming services are there in flux that I can look at for comparison?

Nothing at the level of what I'm thinking of that would go into `job-list`. `kvs-watch` is the one that comes to mind, as it tracks changes in a kvs key. For example:
>flux kvs get --watch a > foo.out &
>flux kvs put a=2
>flux kvs put a=3
>flux kvs put a=4
>flux kvs put a=5
>cat foo.out
1
2
3
4
5
The job-manager event journal?
Yeah the job manager event journal is what I was thinking, with fetching of R and the jobspec, producing one stream for kafka.
Could we just store the raw jobspec and R (these are JSON) for each job along with all of the events, and have Kafka or some other data processing extract the necessary data from these objects? It would seem that limiting the stored data to what is available in `job-list`, for instance, would possibly prevent other interesting queries (the base job environment is in jobspec, for instance).
Sorry if that is a naive suggestion, but even modern SQLite can query stored JSON objects directly.
I am thinking some `job-list` streaming service may not be the way to go, as it would limit what data is available.
> Yeah the job manager event journal is what I was thinking, with fetching of R and the jobspec, producing one stream for kafka.
Perhaps a requestor of the stream, via a flag, could say "hey, send me jobspec and R after they are available?" And they would be interjected into the event stream? Perhaps a new event called `job-data` with context `{"type":"jobspec", "data":<the jobspec>}`?
Hmmm. I wonder if such a stream would make the `job-list` module easier, as it doesn't have to bother to get `jobspec` and `R` on its own. There would be trickiness in making sure the published job state in `job-list` is not "transitioned" before we have read / parsed `jobspec`/`R`.
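To make that concrete, the interjected entries in a per-job stream might look something like this (purely illustrative; only `job-data` is the proposed new event, the rest are ordinary eventlog entries with contexts trimmed):

```python
# Illustrative only: how proposed "job-data" events could interleave with
# ordinary eventlog entries in the stream (contexts trimmed for brevity).
stream = [
    {"timestamp": 1.0, "name": "submit", "context": {"userid": 1234}},
    {"timestamp": 1.1, "name": "job-data", "context": {"type": "jobspec", "data": {"...": "..."}}},
    {"timestamp": 2.0, "name": "alloc", "context": {}},
    {"timestamp": 2.1, "name": "job-data", "context": {"type": "R", "data": {"...": "..."}}},
    {"timestamp": 3.0, "name": "start", "context": {}},
]
```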
Well, I was thinking at first this could perhaps be prototyped as a Python script that consumes the job-manager journal. That would allow us to quickly see how it goes without spending a lot of time building a new streaming service into flux-core.
There is this: https://docs.confluent.io/kafka-clients/python/current/overview.html
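A rough sketch of what that prototype could look like (untested; the journal RPC topic, request payload, and response shape below are assumptions to check against the flux-core version in use):

```python
#!/usr/bin/env python3
# Untested sketch: consume the job-manager journal and hand each event to a
# callback (which would call Producer.produce() from the earlier sketch).
# Assumptions: the streaming RPC topic is "job-manager.events-journal",
# the request accepts {"full": True}, and each response looks like
# {"id": <jobid>, "events": [{"name": ..., "timestamp": ..., "context": ...}]}.
import json
import flux
import flux.constants


def handle_event(jobid, event):
    # Placeholder: in the real prototype, publish to Kafka here.
    print(json.dumps({"id": jobid, "event": event}))


def journal_cb(future):
    resp = future.get()
    for event in resp.get("events", []):
        handle_event(resp["id"], event)
    future.reset()  # re-arm the streaming RPC so the next response fires the callback


h = flux.Flux()
rpc = h.rpc(
    "job-manager.events-journal",
    {"full": True},  # assumed: also replay events for jobs that already exist
    flags=flux.constants.FLUX_RPC_STREAMING,
)
rpc.then(journal_cb)
h.reactor_run()
```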