tiled icon indicating copy to clipboard operation
tiled copied to clipboard

Add `composite` structure family

Open danielballan opened this issue 1 year ago • 27 comments

~This builds on commits from #661 and should be merged after it.~ [Update: #661 is in, and this has been rebased.]

Problem statement

This PR is designed to solve the same problem that the pandas BlockManager^1 solves: presenting data in a flat namespace to the user, but enabling groups of items in that namespace to be transparently backed by shared data structures, for better performance.

For example, data coming from Bluesky includes data stored directly in the Event documents and large data written externally by detectors as arrays. The data in the Event documents is a great fit for tabular storage and transfer formats (e.g. Feather, Parquet, even CSV...). The externally-written data is not; it is better stored and transferred in large N-dimensional array formats like Zarr, HDF5, or a simple C-ordered buffer.

Users focused on science would like to smooth over these details. That is, we want to store and (often) move the data like this:

data
├── table
│   ├── motor_readback
│   ├── motor_setpoint
├── image_array

But offer a way to model it to the user in a flat namespace:

data
├── motor_readback
├── motor_setpoint
├── image_array

When writing (especially appending) the client will want to use the former view, so both views need to be available.

Solution

This PR adds a new structure family, union. The name is inspired by AwkwardArray UnionForm. It holds a heterogenous mixture of structures (e.g. tables and arrays). It enables the columns of the table and the arrays to be explored from a flat namespace. Name collisions are forbidden. But it also describes the underlying structures individually, enabling them to be read or written separately.

To the user, this behaves much like a Container structure, would:

In [2]: c['x']
Out[2]: <UnionClient {'A', 'B', 'C'}>

I can, for example, access fields by key and download data:

In [3]: c['x']['A']
Out[3]: <ArrayClient shape=(3,) chunks=((3,),) dtype=int64>

In [4]: c['x']['A'][:]
Out[4]: array([1, 2, 3])

In [5]: c['x']['C']
Out[5]: <ArrayClient shape=(5, 5) chunks=((5,), (5,)) dtype=float64>

In [6]: c['x']['C'][:]
Out[6]: 
array([[1., 1., 1., 1., 1.],
       [1., 1., 1., 1., 1.],
       [1., 1., 1., 1., 1.],
       [1., 1., 1., 1., 1.],
       [1., 1., 1., 1., 1.]])

Digging a little deeper, we can see a difference from Containers. The union shows that A and B are backed by a table (coincidentally named "table", could be anything) while C is standalone array.

In [8]: c['x'].contents  # The name `contents` is up for discussion...
Out[8]: <UnionContents {'table', 'C'}>

In [9]: c['x'].contents['table']
Out[9]: <DataFrameClient ['A', 'B']>

In [10]: c['x'].contents['C']
Out[10]: <ArrayClient shape=(5, 5) chunks=((5,), (5,)) dtype=float64>

In [11]: c['x'].contents['table'].read()
Out[11]: 
   A  B
0  1  4
1  2  5
2  3  6

In [12]: c['x'].contents['C'].read()
Out[12]: 
array([[1., 1., 1., 1., 1.],
       [1., 1., 1., 1., 1.],
       [1., 1., 1., 1., 1.],
       [1., 1., 1., 1., 1.],
       [1., 1., 1., 1., 1.]])

The structure of the union node reveals more detail; expand to view:

In [16]: from dataclasses import asdict

In [17]: asdict(c['x'].structure())
Out[17]: 
{'contents': [{'data_source_id': 1,
   'structure_family': 'table',
   'structure': {'arrow_schema': 'data:application/vnd.apache.arrow.file;base64,/////+gCAAAQAAAAAAAKAA4ABgAFAAgACgAAAAABBAAQAAAAAAAKAAwAAAAEAAgACgAAAEACAAAEAAAAAQAAAAwAAAAIAAwABAAIAAgAAAAIAAAAEAAAAAYAAABwYW5kYXMAAAkCAAB7ImluZGV4X2NvbHVtbnMiOiBbeyJraW5kIjogInJhbmdlIiwgIm5hbWUiOiBudWxsLCAic3RhcnQiOiAwLCAic3RvcCI6IDMsICJzdGVwIjogMX1dLCAiY29sdW1uX2luZGV4ZXMiOiBbeyJuYW1lIjogbnVsbCwgImZpZWxkX25hbWUiOiBudWxsLCAicGFuZGFzX3R5cGUiOiAidW5pY29kZSIsICJudW1weV90eXBlIjogIm9iamVjdCIsICJtZXRhZGF0YSI6IHsiZW5jb2RpbmciOiAiVVRGLTgifX1dLCAiY29sdW1ucyI6IFt7Im5hbWUiOiAiQSIsICJmaWVsZF9uYW1lIjogIkEiLCAicGFuZGFzX3R5cGUiOiAiaW50NjQiLCAibnVtcHlfdHlwZSI6ICJpbnQ2NCIsICJtZXRhZGF0YSI6IG51bGx9LCB7Im5hbWUiOiAiQiIsICJmaWVsZF9uYW1lIjogIkIiLCAicGFuZGFzX3R5cGUiOiAiaW50NjQiLCAibnVtcHlfdHlwZSI6ICJpbnQ2NCIsICJtZXRhZGF0YSI6IG51bGx9XSwgImNyZWF0b3IiOiB7ImxpYnJhcnkiOiAicHlhcnJvdyIsICJ2ZXJzaW9uIjogIjE0LjAuMiJ9LCAicGFuZGFzX3ZlcnNpb24iOiAiMi4wLjMifQAAAAIAAABEAAAABAAAANT///8AAAECEAAAABQAAAAEAAAAAAAAAAEAAABCAAAAxP///wAAAAFAAAAAEAAUAAgABgAHAAwAAAAQABAAAAAAAAECEAAAABwAAAAEAAAAAAAAAAEAAABBAAAACAAMAAgABwAIAAAAAAAAAUAAAAA=',
    'npartitions': 1,
    'columns': ['A', 'B'],
    'resizable': False},
   'name': 'table'},
  {'data_source_id': 2,
   'structure_family': 'array',
   'structure': {'data_type': {'endianness': 'little',
     'kind': 'f',
     'itemsize': 8},
    'chunks': [[5], [5]],
    'shape': [5, 5],
    'dims': None,
    'resizable': False},
   'name': 'C'}],
 'all_keys': ['A', 'B', 'C']}

Unlike container, the union structure always describes its full contents inline. It does not support paginating through its contents. It is not designed to scale beyond ~1000 fields.

This script shows how the union was constructed. Code like this will rarely be user-facing; envision it wrapped in a utility that consumes Bluesky documents and writes and registers the relevant data into Tiled.

import numpy
import pandas

from tiled.client import from_profile
from tiled.structures.array import ArrayStructure
from tiled.structures.data_source import DataSource
from tiled.structures.table import TableStructure

c = from_profile("local", api_key="secret")

df = pandas.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
arr = numpy.ones((5, 5))

s1 = TableStructure.from_pandas(df)
s2 = ArrayStructure.from_array(arr)
x = c.create_union(
    [
        DataSource(structure_family="table", structure=s1, name="table"),
        DataSource(structure_family="array", structure=s2, name="C"),
    ],
    key="x",
)
x.contents["table"].write(df)
x.contents["C"].write(arr)

The requests look like:

INFO:     127.0.0.1:59404 - "POST /api/v1/metadata/ HTTP/1.1" 200 OK
INFO:     127.0.0.1:59404 - "PUT /api/v1/table/full/x?data_source=table HTTP/1.1" 200 OK
INFO:     127.0.0.1:59404 - "PUT /api/v1/array/full/x?data_source=C HTTP/1.1" 200 OK

The query parameter ?data_source={name} is used to address a specific component backing the node.

Review of abstraction levels

  1. Everything is just a node and we blissfully ignore anything about data sources.
c['x']['A']
  1. We look at data source names denoting how fields are grouped in the underlying storage, but we still ignore everything about storage formats and other storage details.
c['x'].contents
c['x'].structure()
  1. We look at low-level storage details, encoded in DataSource and Asset.
c['x'].data_sources()

To Do

  • Implement read() on UnionClient (i.e. c['x']) itself, which could pull each data source in turn and return an xarray.Dataset.
  • Implement GET /union/full/{path} to enable bulk download. This will work similar to container export.

danielballan avatar Feb 26 '24 13:02 danielballan

I like the features of this PR.

I wonder whether we can consolidate some naming consistency around the behaviors of:

  • contents: c['x'].contents returns the names of the Union's data_sources
  • data_sources(): c['x'].data_sources() returns file-level details of the data_sources backing the Union's members
  • data_source: URL parameter containing the name of a Union member data_source, as in PUT /api/v1/table/full/x?data_source=table

On the other hand, this might already be as simple as it gets, and I just need a minute to get comfortable with the usage. :)

padraic-shafer avatar Feb 26 '24 14:02 padraic-shafer

  • contents: c['x'].contents returns the names of the Union's data_sources

This is perhaps a bit too reductive a statement, as really this contains structure and named sources and can iterate through the members.

padraic-shafer avatar Feb 26 '24 14:02 padraic-shafer

Even though the contents map 1:1 to a data_source, it might be cleaner to not use them interchangeably. I will refer to the contents here as parts for brevity and to avoid possible confusion with Container. I'm not attached to the name parts.

Then the above could be used something like this...maybe?

client['x'].parts   # <UnionParts {'table', 'C'}>
client['x'].data_sources()   # low-level storage details, encoded in DataSource and Asset
client['x']['C'].data_sources()   # one-element list, or would data_source() be better?

PUT /api/v1/array/full/x?part=C ...BODY  # Refer to the union member, rather than its data source

padraic-shafer avatar Feb 26 '24 15:02 padraic-shafer

I like that suggestion very well, and I like the name part.

In the future (months away) I hazily foresee enabling Tiled to track multiple versions of data:

  • replicas stored "on prem" or in the cloud
  • copies with different file formats and/or chunking to make a range of use cases fast

This is why data_sources() is a one-element list, in anticipation of there being more than one someday, and wanting to leave room for that.

But, this also underlines why separate part and data_source could be important: they happen to be 1:1 today but may not always be.

danielballan avatar Feb 26 '24 16:02 danielballan

I also like that giving a distinct name to this concept helps clarify which abstraction level you are operating at. Referring to a part moves you from (1) to (2) but until you mention a data_source you have not crossed into (3).

danielballan avatar Feb 26 '24 17:02 danielballan

Rebased on main after merging #661. The renaming of data_source to part, in the places discussed has been done.

The to-dos...

Implement read() on UnionClient (i.e. c['x']) itself, which could pull each data source in turn and return an xarray.Dataset. Implement GET /union/full/{path} to enable bulk download. This will work similar to container export.

I would at least like the validate this branch by connecting it to be a Bluesky document stream before merging.

are, I believe, strictly additive and could be done in separate PRs or in this PR.

danielballan avatar Feb 27 '24 18:02 danielballan

Rethinking this in terms of a "view" is very interesting. Off the top of my head, I like that:

  • A "view" is a widely-recognized concept and may require less explanation.
  • This seems like it might address a separate issue noticed by @genematx, that in the current data model data and timestamps are allowed to be uneven in length. (And, in fact, in the middle of an update, they always will be.) If there were one table with data and timestamps "views" into subsets of it, that would fix the problem.

danielballan avatar Mar 03 '24 20:03 danielballan

For example, maybe this is what a event stream could look like. Notice that:

  • The layout and URLs are backward compatible with what we have been doing. It simply adds a new key, __name_me__ (needs a good name...).
  • It ensures that data and timestamps are the same length because they are views on the same table.
  • It exposes the "real" structures to the client, but gives a flattened view of it too.
primary
├── data  # view
│   ├── time
│   ├── motor_readback
│   ├── motor_setpoint
│   └── image_array
├── timestamps  # view
│   ├── time
│   ├── motor_readback
│   ├── motor_setpoint
│   └── image_array
├── __name_me__
│   ├── event_table  # values stream inline in Event documents
│   │   ├── time
│   │   ├── data_motor_readback
│   │   ├── data_motor_setpoint
│   │   ├── timestamps_motor_readback
│   │   └──  timestamps_motor_setpoint
│   └── image_array  # externally-written array data

danielballan avatar Mar 03 '24 20:03 danielballan

Either path we take, union or view, would be speculative. We have to add this and really try it to understand how it flies. There is some risk either way.

View, in addition to being a more widely recognized concept, could solve a broader category of problems for us. BlueskyRun is very nested, and this can mean a lot of clicks in the UI. I can imagine a view (or views) of the good parts, flat. Views could be combined with containers to create a nested-but-not-THAT-nested structure. (Dare I call it a "projection"?)

I think we should keep the specification light and focused on current requirements, but I can see a lot of useful scope in this direction. Maybe I’ll start by opening a parallel PR for comparison that builds on this branch but refactors union into view.

danielballan avatar Mar 03 '24 21:03 danielballan

This is why we keep @dylanmcreynolds around. :-)

danielballan avatar Mar 03 '24 21:03 danielballan

Could views be a better place to put Databroker's projections and projectors? To sum up, projection are a way add a mapping to the start document. projectors are a python function that take a run, its projection and returns an xarray with datafields mapped as specified in the projection. The major idea was to have a simple way to create multiple views from the same run. One projection could be customized for a user interface, another could be customized for a particular ontology (like nexus).

If we took this view idea even further, the definition of the view could also include information about mapping to a desired ontology.

dylanmcreynolds avatar Mar 03 '24 21:03 dylanmcreynolds

I think we should keep the specification light and focused on current requirements,

That sounds prudent.

... but I can see a lot of useful scope in this direction.

Do you envision that views might evolve to include keys from other nodes (ancestors, siblings, 2nd-cousin-once-removed), or is that something that should be firmly disallowed? I can imagine complications arising from access policy as well as latency/timeouts from trying to include too many keys.

padraic-shafer avatar Mar 03 '24 22:03 padraic-shafer

I think there will be significant pressure to enable views that reach anywhere across the tree:

  • Mix raw data and analyzed
  • Experiment with alternative views outside of the main tree without making it "noisy"
  • Probably more….

The specification of a union node involves listing data sources directly. If the specification of a view involves instead referencing other nodes, I think that access control is manageable, and the scaling can be managed if we enforce reasonable limits on the number of nodes allowed in one view.

danielballan avatar Mar 03 '24 22:03 danielballan

It ensures that data and timestamps are the same length because they are views on the same table.

More generally, would merged views only work if all parts have the same length (number of rows)?...and if so would it enforce that by:

  • rejecting data sources that don't meet that condition? --OR--
  • returning a table with number of rows equals to the shortest part (like python's zip())? --OR--
  • returning a table with number of rows equals to the longest part (like python's itertools.zip_longest())? Filler value could be None, numpy.nan, "", or probably a user-supplied value --OR--
  • any of the above, depending on a query parameter passed by the caller?

Or should views require a key to join upon, using merge behavior such as LEFT OUTER JOIN, RIGHT OUTER JOIN, FULL OUTER JOIN, INNER JOIN?

padraic-shafer avatar Mar 03 '24 22:03 padraic-shafer

Could views be a better place to put Databroker's projections and projectors?

So rather than injecting that info into the run documents, you're suggesting to instead let tiled handle that when the data gets accessed? That makes a lot of sense.

Run documents would be more "pure"--less coupled to how someone thought they should be viewed when they were recorded. When new views get dreamed up, they could be added to the tiled server config--restarting the server (or maybe registering the new view with the catalog) would allow that view to be applied to all data new and old.

padraic-shafer avatar Mar 03 '24 23:03 padraic-shafer

So rather than injecting that info into the run documents, you're suggesting to instead let tiled handle that when the data gets accessed? That makes a lot of sense.

Maybe. The projections schema was added to the run start document so that they could be the default projection for a particular run. If a newer version were available, the projector code could use it if asked to.

But I only know of one case where projection/projectors were used since they were developed four years ago. Maybe that's a sign? Perhaps the issue is they weren't needed much, perhaps they weren't advertised well, or perhaps the mechanism was too complicated.

I think there will be significant pressure to enable views that reach anywhere across the tree.

That's an interesting thought. I feel like that if it kept the scope in check, I'd be happy to say that a view was limited to objects of the same row/timestamp. We could call it RowView and if we decided we need something more flexible in the future, come up with a SuperView with extra powers?

dylanmcreynolds avatar Mar 03 '24 23:03 dylanmcreynolds

To fit our present use case, we would need a view to look a lot like union, mixing tables and arrays in one namespace. There would be [edit: NO] special guarantees about the relation between the items in the namespace, nothing about their length or how to join them. (It goes without saying that the constitutive tables would each internally have the normal length guarantee that it is made of whole rows.)

I think the change from union is, the parts in a view would have their own canonical locations in the tree, as normal nodes. A view becomes an additional place to get (mixtures of…) nodes. Each views look a lot like a union would have, but their parts are pointers to first-class nodes, not to captive data sources. This enables us to separately build multiple views on the same data. And it avoids placing the canonical data in a weird structure that would require explanation (union).


As far as "projections" goes, I like that this presents clients with the result rather than instructions (in a novel descriptive language…) for rearranging a structure on the client side.

Yes, one can imagine constructing views dynamically through special adapters added to the server config—I think this is what @padraic-shafer’s last message envisions. For our present requirements I would start, though, by building them as static entities. The client specifies, via a HTTP request, "Add a view node to the database that combines array node X and columns A, B from table node Y into one namespace and present it to clients.

danielballan avatar Mar 04 '24 00:03 danielballan

The client specifies, via a HTTP request, "Add a view node to the database that combines array node X and columns A, B from table node Y into one namespace and present it to clients.

I think this Union PR didn't yet add the capability to read the combined result at once, right? So what do we think should happen when the outer dimension of array X differs from the length of table Y? (or equivalently when event_table has more rows than image_array has images in the earlier example?)

I might be quibbling about edge cases. But I wonder about how 'edge'y these cases are. We could of course enforce this when the view node (meaning the outer node, not the projection node) is created, and then wait to see if it runs into issues during testing.

padraic-shafer avatar Mar 04 '24 00:03 padraic-shafer

That's an interesting thought. I feel like that if it kept the scope in check, I'd be happy to say that a view was limited to objects of the same row/timestamp. We could call it RowView and if we decided we need something more flexible in the future, come up with a SuperView with extra powers?

For our present requirements we can keep this pretty limited, and spin out further discussion on whether and how to expand it once we have something to play with.

danielballan avatar Mar 04 '24 00:03 danielballan