
histbook/Spark questions

Open · PerilousApricot opened this issue Aug 25 '18 · 20 comments

Hey Jim (writing here since I think you prefer it over Slack; lemme know otherwise) - I have a couple of questions:

1. Is it possible to take the following code and fill via Spark with a single fill() call?

wjet_met = Hist(bin("MET_pt", 100, 0, 200))
wjet_met.fill(wjet_df.where('HLT_IsoMu24 == True'))

dyjet_met = Hist(bin("MET_pt", 100, 0, 200))
dyjet_met.fill(dyjet_df.where('HLT_IsoMu24 == True'))
overlay(wjet_met.step(), dyjet_met.step()).to(canvas)

Filling 'at once' would let histbook perform the collects in parallel, which increases parallelism (if I'm reading the code right).

2. I would like to normalize some histograms to 1. Is there a native way to do this, or should I export to Pandas, perform the manipulation, then convert back into histbook?

3. Finally, is there some type of "subobject" operator in the expression syntax? I.e., to do something like the following pseudocode:

h = Hist(bin("sqrt( (x.MET_x - y.MET_x) ** 2 + (z.MET_y - y.MET_y) ** 2)"))
h.fill(x = object_1, y = object_2)

PerilousApricot · Aug 25 '18 06:08

One thing you can work with is a cut axis, which has two bins and requires a boolean expression. Although this wasn't your original question, you could make a histogram of events that pass HLT_IsoMu24 and those that fail like this:

wj_met = Hist(cut("HLT_IsoMu24"), bin("MET_pt", 100, 0, 200))
wj_met.fill(wjet_df)

which is nice because it puts all of the physics logic in the histogram definition. If you don't want the events that fail (you're not going to do an overlay or efficiency plot), you can put a "keep only" cut in the Hist definition like this:

wjet_met = Hist(bin("MET_pt", 100, 0, 200), filter="HLT_IsoMu24")

(the filter parameter simply multiplies the weight parameter, after converting booleans into 0 and 1).
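A sketch of that equivalence, assuming histbook's NumExpr-like expression language supports where() (an assumption; see the syntax discussion below). Both Hists should accumulate identical contents:

h1 = Hist(bin("MET_pt", 100, 0, 200), filter="HLT_IsoMu24")
h1.fill(wjet_df)

# filter acts like an extra 0/1 weight, so an explicit weight should do the same
h2 = Hist(bin("MET_pt", 100, 0, 200), weight="where(HLT_IsoMu24, 1, 0)")
h2.fill(wjet_df)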

Getting back to your question, you can use cut to distinguish between data samples in a single fill if the data samples were already combined into a single data frame. For instance:

mega_df = wjet_df.union(dyjet_df)   # not sure of the syntax or how to preserve identities
h = Hist(cut("identity"), bin("MET_pt", 100, 0, 200))
h.fill(mega_df)

Or if you have several physics types identified by a categorical (string) "identity" field,

h = Hist(groupby("identity"), bin("MET_pt", 100, 0, 200))
h.fill(mega_df)

This groupby bin has the same meaning as Spark's "group by" and the same function is called when passing it to Spark. It's handled less efficiently with numpy.unique and dicts when dealing with Numpy arrays.

However, if you're not starting with a combined DataFrame, I can't see how mixing the sources and then doing a groupby to separate them out again can possibly be more efficient than filling them with separate queries (in parallel). Joins and group by (shuffle/reduce) are particularly bad for distributed datasets— I don't know why you'd want to invoke that pain. Can't you launch non-blocking jobs in PySpark? If so, wouldn't it be much better to launch a simple map-only job for each dataset in parallel?

If histbook needs to be taught how to work with non-blocking PySpark jobs, and/or how to interpret PySpark future objects, let's do it. Although this issue was originally a question, we can convert it into a feature request.

Second: normalizing histograms to 1— you can do it with normalized=True passed to step, line, area, or marker. These, in turn, pass it to table, which is used to convert histbook's internal format (which is purely additive, so no normalization allowed) into values to plot. It's also what turns histbook's internal format into Pandas DataFrames, so I guess I duplicate Pandas's functionality (you can normalize when making the Pandas DataFrame or in the Pandas DataFrame). But you definitely don't have to go through a Pandas DataFrame just to get normalization.
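For example, a minimal sketch with the wjet_met and canvas from the earlier snippets:

wjet_met.step(normalized=True).to(canvas)   # normalized at plot time; the Hist itself stays additive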

Third question: no, there isn't an "object" syntax. Although compatibility isn't 100% yet, I'm making histbook's syntax identical to NumExpr's syntax, which is what you use to pass query/eval strings to Pandas. I don't think that has an object type. It wouldn't make sense for Numpy arrays.

However, NumExpr is expanding in 3.0 and I'm developing awkward-arrays on top of Numpy, which have features like this. They have a natural analogy with Spark's structs, so we might see this in the future.

I'm leaving this issue open until you say whether you want to try to add non-blocking PySpark fills and if so, until that happens.

jpivarski · Aug 25 '18 12:08

Thanks for the help!

RE spark and blocking, I think we might be talking about different things.

The snippet above calls out to Spark two separate times - once to fill() the wjets histogram, and another to fill() the dyjets histogram (according to the Spark UI, this occurs here [1]). This is necessarily a blocking operation -- each call to collect() returns a Python object and not a future, ~~but this also means that each value in the query is evaluated sequentially~~ (EDIT: I noticed that the iterator is over each row in the query, not multiple queries in a row).

The above is obviously about an intra-expression optimization, but the same serialization concern applies to multiple independent expressions as well. The following pseudo-code...

hist_1.fill(bg1_df)
hist_2.fill(bg2_df)
...
hist_n.fill(bgn_df)

... performs n collects in serial.

My question is: Instead of the above snippet, is there a way to do the equivalent of

something(a book??).fill(bg1=bg1_df, bg2=bg2_df, ..., bgn=bgn_df)

...to fill several histograms in a single call? That call could then (later) be optimized similarly to [2] to allow Spark-level parallelism.

[1] https://github.com/scikit-hep/histbook/blob/7fa396b0be978f6e2a627799ffe31d198475bd9e/histbook/calc/spark.py#L299-L301 [2] https://github.com/scikit-hep/histbook/blob/fd09a5713c7ebd8d1d6de9c7e7a206e676f6a593/histbook/book.py#L557-L565

PerilousApricot · Aug 25 '18 21:08

You can fill many histograms at once with a Book (it creates a single query for Spark to optimize however it wants to), but they have to be filled from the same DataFrame (naturally). One call to fill accesses one DataFrame. However, your data starts in multiple DataFrames, and I don't see how it could possibly be more performant to join the DataFrames into a single DataFrame (expensive join) and then use the query to separate them out (expensive group-by).

That's why I suggested non-blocking calls, because what you want to do is several disconnected queries, starting from different DataFrames, going to different histograms, at once.

If Spark has some sort of "multi-query," that could be another way.

jpivarski · Aug 25 '18 22:08

Right -- I have multiple dataframes (one per sample), and I'd like to fill multiple histograms. I'm not wanting (or suggesting!) to combine them into a single dataframe, then separate them back out*.

You say:

they have to be filled from the same DataFrame (naturally)

... and I'm asking if it's instead possible to fill from multiple dataframes. Perhaps each histogram has two axes -- one that is the regular Bin() axis, with the 2nd being some type of GroupBin() or GroupBy().

I get that you're suggesting non-blocking calls, but that doesn't exist in the PySpark API (and I'm not sure what an async API would look like). My (theoretical) implementation at the bottom would be to re-use the Book code which runs a thread per calculation, allowing multiple (blocking) PySpark queries to run in parallel; see the sketch after the footnote below. (FWIW, PySpark is thread-safe, and this is the recommended concurrency idiom.)

* That being said, Spark's support for partition discovery (ref) could be interesting, since the join/separation would happen on a per-partition basis, which should be performant. Again, I'm not suggesting or trying to do that.
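A minimal sketch of that thread-per-query idiom, using the names from the original snippet (note that a later comment in this thread cautions that Hists and Books are not yet thread-safe, so each thread fills its own Hist):

import threading

# pairs of independent (histogram, DataFrame) fills, as in the snippets above
pairs = [(wjet_met, wjet_df), (dyjet_met, dyjet_df)]

def fill_one(hist, df):
    hist.fill(df)   # blocking PySpark query; the GIL is released across the JVM bridge

threads = [threading.Thread(target=fill_one, args=(h, df)) for h, df in pairs]
for t in threads:
    t.start()
for t in threads:
    t.join()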

PerilousApricot · Aug 25 '18 22:08

This reminded me— Spark has a non-blocking interface, but PySpark doesn't, so Book runs independent queries in Python threads.

https://github.com/scikit-hep/histbook/blob/master/histbook/book.py#L557

But maybe that's what you're telling me.

I don't understand what it would mean to fill separate axes of a single histogram with different sources/DataFrames. Those axes are dimensions of a single space.

If the end-goal is to have a histogram with a categorical axis where each category represents data from a different source, you can fill them separately (in parallel or not) and combine them with Hist.group.

https://github.com/scikit-hep/histbook/blob/master/histbook/hist.py#L610

This is to support plotting options like stacking histograms that come from different sources using the single-histogram stack method.

Sorry if I seem scattered, missing your point— I'm being tugged by a 5-year-old, yet thought I could answer your question anyway.

jpivarski · Aug 25 '18 23:08

Spark has a non-blocking interface, but PySpark doesn't, so Book runs independent queries in Python threads.

https://github.com/scikit-hep/histbook/blob/master/histbook/book.py#L557

Exactly! That's what I was trying to (inarticulately) say above. Since PySpark's API is blocking, the user has to spawn multiple threads if they want to execute multiple queries simultaneously. Since "the work" is happening on the other side of the JVM bridge, the GIL gets released, and Spark can scatter tasks to all the various executors.

I'm wanting to do the same for multiple DataFrames. The "typical" HEP use case (if that exists) will have O(10) input samples, which will all be histogrammed with identical parameters. Like the Book case of executing multiple queries over a single dataset, I'm wanting to execute N queries over multiple datasets.

I don't understand what it would mean to fill separate axes of a single histogram with different sources/DataFrames.

Well, from a physics perspective, it would be a single histogram of some kinematic variable with all of the different samples. You could hack something similar by joining all the DataFrames together with a "category" column, then splitting them apart in histbook. But, like you mention, that's not going to be very performant.

you can fill them separately (in parallel or not) and combine them with Hist.group.

Another option would be to create a histogram with Hist.group and then fill them in parallel (perhaps something like fillGrouped() if the existing fill() API doesn't match).

PerilousApricot · Aug 26 '18 01:08

Not your problem— I'm distracted and not only forgot what I had implemented, but also didn't realize you were telling me about it.

It sounds like we want to add non-blockingness to fills in general. uproot has non-blockingness implemented throughout— reading baskets, branches, and whole trees have an optional "blocking" parameter to block or not block, and getting non-blocking at a higher level of composition is a matter of setting blocking = False when calling into the inner levels.

There are a lot of futures frameworks out there, but for uproot, I opted for a simple one: when blocking = False, you get a zero-argument function that returns what you really want. Calling the function means joining the thread— waiting for it to finish and returning the result. The difference with uproot is that uproot was doing computational work in the asynchronous tasks, so they're added to an executor (no more than N run at a time). When offloading work to Spark, there's no limit to how many you can run at once; when computing locally with Numpy, you'd want an executor like uproot.
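In outline, that minimal future looks something like this (a sketch, not uproot's actual code):

import threading

def submit(work, *args):
    """Run work(*args) in a background thread and return a zero-argument
    function that joins the thread and returns the result."""
    result = {}
    thread = threading.Thread(target=lambda: result.update(value=work(*args)))
    thread.start()
    def wait():
        thread.join()                # block until the worker finishes
        return result["value"]
    return wait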

This applies to your case with multiple sources because they're embarrassingly independent fills that want to run at once.

Actually, fill has no return value, so even my minimalistic future that's just a zero-argument function to join the thread is overkill. With no return value, a threading.Event would be sufficient.
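For instance (a sketch; fill_async and its names are hypothetical, not histbook API):

import threading

def fill_async(hist, df):
    done = threading.Event()
    def run():
        hist.fill(df)                # fill has no return value...
        done.set()                   # ...so completion is just a signal
    threading.Thread(target=run).start()
    return done                      # caller calls done.wait() before plotting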

Oh, and there's one more relevant ingredient: if you have a big, complex Book, you can select a subtree with book.view("glob*pattern") and call fill on that. That allows you to fill part of a Book with one source and another part of a Book with another source. Combining that with a general non-blocking feature would let you fill up patches of the Book any way you wish, maximizing the use of time.
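For instance (a sketch; the glob patterns and Book layout are assumptions):

# fill disjoint subtrees of one Book from different sources
book.view("wjets/*").fill(wjet_df)
book.view("dyjets/*").fill(dyjet_df)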

The one thing you can't do is fill different parts of a single Hist from different sources. Apart from different categories of a categorical axis, I don't know what that would mean. For different categories of a categorical axis, Hist.group is the way to do it— likely more efficient than any actual filling with a groupby axis.

jpivarski · Aug 26 '18 01:08

Actually, i just remembered that Hists and Books are not thread-safe... yet. That's a planned improvement that should probably go in tandem with a composable "blocking" parameter.

Along with views, I should point out that there's a SamplesBook with a constructor that copy-pastes all the histograms it is given, labeling each for a different physics sample. The point of this is to make a hundred histograms representing a lot of physics quantities for one sample, then getting the same for all your samples. Then you can view only the subtree representing a single sample and fill that with the appropriate source, and repeat for all sources. However, each fill blocks— that would need to change. Also, we need plotting functions that cut across Hists in a Book to stack the elements of a SamplesBook— that would really help. They're guaranteed to have compatible bins.
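Something like this sketch (the exact SamplesBook constructor signature here is an assumption, not verified against histbook's API):

# one copy of each histogram per sample, then fill each subtree from its own source
samples = SamplesBook(["wjets", "dyjets"],
                      met=Hist(bin("MET_pt", 100, 0, 200)))
samples.view("wjets*").fill(wjet_df)
samples.view("dyjets*").fill(dyjet_df)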

jpivarski · Aug 26 '18 02:08

I'll look into the SamplesBook; I guess the fill function could be overridden there to provide multiple dataframes at some point.

RE needing different plotting routines -- I guess that's another reason to treat different samples as an extra axis in the existing histogram classes -- in that case, the plotting would come "for free" without having to special-case how to plot SamplesBooks? Or am I reading the code wrong?

How would I hack the current plotting functions to do something that combines overlaying a step...

overlay(wjet_met.step(normalized=True), dyjet_met.step(normalized=True)).to(canvas)

[visualization]

...with providing the legend:

Hist.group(WJets=wjet_met, DYJets=dyjet_met).stack("source").step("MET_pt").to(canvas)

[visualization]

I'm having trouble grokking the "building blocks" to the point that I can synthesize new solutions on top, so I'm still stuck at the stage of looking at examples in the README and trying to modify them :(

Thanks!

PerilousApricot · Aug 26 '18 03:08

But I guess we're getting close to the use case... I have 10 samples and 100 different histograms I want to generate/plot. How do I do that in histbook efficiently, in terms of both CPU resources and development time? This thread gets my understanding a lot closer.

BTW, it probably won't hit for Spark 2.4, but it looks like the limitations of the pyspark<-->pandas integration are being worked on, so it might be possible to sensibly have the executors run NumExpr/numpy/pandas over subblocks of the whole dataset in an intelligent way.

PerilousApricot · Aug 26 '18 03:08

The code you showed that made the two plots is exactly how those functions were intended to be used— there's nothing "hacky" about it. (It is both the intended syntax and the intended use-case.)

In the second case, though, you must have said .stack("source").area("MET_pt"), right?

My comment about SamplesBook having plotting routines would be for doing this in bulk, to make it more convenient.

Doing separate fills in parallel would make more effective use of the CPU because if we launch all queries at the same time, Spark can fill in the cracks and keep processors busy. That's why I'm thinking you're asking about a non-blocking interface.

With a non-blocking interface to a single fill, it would become possible to write a high-level "multi-source fill" function that blocks, though the fills that it calls internally don't block. However, I'm having trouble thinking of a syntax that would express the user's intention of which sub-histograms should get filled by which source. Maybe this?

book.multifill({"view*glob": df1, "other*view": df2})

A high-level function like this could ensure that none of the fills overlap, which you can't guarantee in separate calls.
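A minimal sketch of what such a blocking multifill could do internally, built from the Python-thread idiom discussed above (multifill is the hypothetical API from this comment, not an existing histbook method):

import threading

def multifill(book, sources):
    """Fill each glob-selected view of the Book from its own DataFrame,
    with all queries in flight at once; block until every fill completes."""
    threads = [threading.Thread(target=book.view(pattern).fill, args=(df,))
               for pattern, df in sources.items()]
    for t in threads:
        t.start()
    for t in threads:
        t.join()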

jpivarski · Aug 26 '18 11:08

At present (without adding new code), the appropriate way to make stacked histograms is exactly what you did: fill them separately and use Hist.group to combine them so that the normal plotting functions apply.

To fill multiple histograms in parallel, you'd have to put the calls in separate threads manually. histbook doesn't yet have a story for parallel processing.

If you have a lot of them, these operations can be automated in a loop.
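For instance, a sketch under the assumption that sample_dfs maps sample names to Spark DataFrames:

# fill one Hist per sample (serially here; wrap each fill in a thread to parallelize)
hists = {}
for name, df in sample_dfs.items():
    h = Hist(bin("MET_pt", 100, 0, 200))
    h.fill(df)
    hists[name] = h

# combine into a single Hist with a categorical "source" axis
combined = Hist.group(**hists)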

The user interface for histbook is very much a work in progress— I feel that the basic building blocks are right, but there's a lot that can be done at a higher level. But deciding what shape that should take should come from real use-cases like yours.

jpivarski · Aug 26 '18 11:08

I guess it's a specific combination I'm trying to do that's currently disallowed.

Hist.group(WJets=wjet_met, DYJets=dyjet_met).stack("source").step("MET_pt").to(canvas)

yields

TypeError: only area and bar can be stacked

If I drop the stack, it appears that only one member of the group is rendered.

PerilousApricot · Aug 26 '18 13:08

stack only works with bar and area because you need to color in the area to see that the data are cumulative. Also, I think that Vega Lite required it, and I didn't want it to be possible to generate Vega Lite JSON that can't render.

If you drop the stack, you'll see the sum of all categories (which might be hard to notice when normalized = True). These plotting commands project the N-dimensional space onto however many dimensions you're plotting. With only area, that's one dimension. With stack and area, that's two dimensions. With beside, stack, and area, that's three dimensions. That's how it works— you fill as many dimensions as you want to study and then decide later how you want to plot it.

If you want to slice the space (eliminating bins), rather than projecting it (combining bins), use select upstream of the plotting.
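A sketch with the grouped histogram from earlier (the select expression syntax here is an assumption based on the thread's other examples):

combined = Hist.group(WJets=wjet_met, DYJets=dyjet_met)

# project: sum over the "source" axis (all samples combined into one curve)
combined.step("MET_pt").to(canvas)

# slice: keep only one category's bins before plotting
combined.select("source == 'WJets'").step("MET_pt").to(canvas)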

jpivarski · Aug 26 '18 14:08

I'm still trying to grok the "building blocks", unfortunately. Can I use overlay as another dimension?

I'd like to overlay the step of two different samples on each other (this is good to, for instance, compare the kinematic distributions of multiple signal samples) and have each sample be colored/labeled separately. How would I implement that?

PerilousApricot · Aug 26 '18 14:08

There's an overlay method on Hist, which presents a dimension of the data as a graphical attribute, just like stack (and beside and below).

There's also a standalone overlay function, which overlays any two graphics. That might be what you need.

jpivarski · Aug 26 '18 14:08

The following is close, but it unfortunately doesn't label/colorize the samples

overlay(wjet_met.step(normalized=True), dyjet_met.step(normalized=True)).to(canvas)

[visualization]

PerilousApricot · Aug 26 '18 14:08

No, by combining separate graphics, it no longer knows what they mean and therefore can't create a legend. It also doesn't make them different colors by default, but you can pass Vega Lite JSON through the step method to manually set the color.

If you're going a manual route, there's another option: install Altair and output histbook graphics as Altair objects (call .altair() instead of .to(...)). Then Altair has methods to combine things and add custom legends.

If you find that kind of interaction easier, then perhaps that should be the development direction: use histbook for filling and Altair for graphics manipulation.

jpivarski · Aug 26 '18 15:08

Would that limitation be a reason to put the multiple samples into a single histogram, so the "meaning" is still known to histbook?

There's a handful of "standard" plots that are common across all analyses (for instance, a stack plot of MC with data as points). Would it be helpful to enumerate the desired plots, to help define the scope of what "we'd" be looking for?

PerilousApricot · Aug 26 '18 15:08

Yes, that would definitely be useful. I started development based on what I remember being important— a stack plot chief among them. The standard stack plot is particularly challenging because the different parts come from different sources, and that breaks the chained workflow model of Spark.

This is no problem for a traditional HBOOK/ROOT style analysis, because histograms are all free-floating objects that can be filled or not filled by placing them in different parts of multiple event loops. But this disregards all relationships between the unbinned data and the binned data— the user must maintain everything by hand, from booking hundreds of separate histograms to filling them in the appropriate places to combining them in the plot. Of course, you can do that with histbook with one-dimensional Hist objects that have trivial expressions (just "x") and the overlay function, but I wanted to fix this in Histogrammar and later histbook.

Chained functions in Spark give us the ability to express relationships once and let the framework keep track of the bookkeeping. The interface in histbook now is a start toward that, but I think there should be higher-level interfaces on top of that to do standard kinds of plots.

jpivarski · Aug 26 '18 16:08