tiled
How to access content in resource documents?
How can a tiled client access content stored in a stream's resource (and datum) documents? For example, this is the content of a certain BlueskyRun:
In [10]: list(run.primary._resources)
Out[10]:
[Resource({'path_semantics': 'posix',
'resource_kwargs': {'frame_per_point': 1},
'resource_path': 'clhome/BDP/voyager/adsimdet/2022/08/30/d585f272-dd9b-4ac0-b521_000.h5',
'root': '/',
'run_start': '43044b6e-f6ba-48cb-a975-90d236dcbaaa',
'spec': 'AD_HDF5',
'uid': '506944a6-7632-4db8-9448-82b258211ed4'})]
This is stream metadata.
And then:
In [20]: list(run.primary._get_datum_pages("506944a6-7632-4db8-9448-82b258211ed4"))
Out[20]:
[{'resource': '506944a6-7632-4db8-9448-82b258211ed4',
'datum_id': ['506944a6-7632-4db8-9448-82b258211ed4/0'],
'datum_kwargs': {'point_number': [0]}}]
Our motivation is to tie the Bluesky metadata together with our Data Management system. Somehow. This resource content is missing in the tiled interface at this time.
Those internal methods (_resources, _get_datum_pages) were not propagated into the new API because there is no efficient way to get them. For older BlueskyRuns, you have to go RunStart -> Descriptors -> Events -> Datums -> Resources, which is not great!
Here's how to get the resources reliably, shown on some example data:
In [12]: from tiled.client import from_uri
In [13]: c = from_uri("https://tiled-demo.blueskyproject.io")
In [14]: run = c['fxi']['raw'].values().first()
In [15]: [doc for name, doc in run.documents() if name == 'resource']
Out[15]:
[Resource({'path_semantics': 'posix',
'resource_kwargs': {'frame_per_point': 20},
'resource_path': '2018/04/19/de714974-a43a-4000-882a_000000.h5',
'root': '60599b64cd89ec0f67f434c2339b8665',
'spec': 'AD_HDF5',
'uid': '70bffef4-c092-4b57-b955-ed57218faf22'})]
In [3]: run = c["bdp2022"]["43044b6e-f6ba-48cb-a975-90d236dcbaaa"].values().first()
In [7]: run
Out[7]: <BlueskyEventStream {'config', 'data', 'config_timestamps', 'timestamps'} stream_name='primary'>
In [8]: [doc for name, doc in run.documents() if name == 'resource']
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
Cell In[8], line 1
----> 1 [doc for name, doc in run.documents() if name == 'resource']
File ~/micromamba/envs/tiled/lib/python3.10/site-packages/databroker/client.py:172, in BlueskyEventStream.__getattr__(self, key)
170 if key in self:
171 return self[key]
--> 172 raise AttributeError(key)
AttributeError: documents
The documents() method is a method on BlueskyRun, not BlueskyEventStream. Above you accessed a BlueskyRun by its uid
c["bdp2022"]["43044b6e-f6ba-48cb-a975-90d236dcbaaa"]
and then dug inside it to access the first event stream.
c["bdp2022"]["43044b6e-f6ba-48cb-a975-90d236dcbaaa"].values().first()
You want either
run = c["bdp2022"]["43044b6e-f6ba-48cb-a975-90d236dcbaaa"]
or
run = c["bdp2022"].values().first()
and then call .documents() on that.
Za!
In [9]: run = c["bdp2022"]["43044b6e-f6ba-48cb-a975-90d236dcbaaa"]
In [10]: [doc for name, doc in run.documents() if name == 'resource']
Out[10]:
[Resource({'path_semantics': 'posix',
'resource_kwargs': {'frame_per_point': 1},
'resource_path': 'clhome/BDP/voyager/adsimdet/2022/08/30/d585f272-dd9b-4ac0-b521_000.h5',
'root': '/',
'run_start': '43044b6e-f6ba-48cb-a975-90d236dcbaaa',
'spec': 'AD_HDF5',
'uid': '506944a6-7632-4db8-9448-82b258211ed4'})]
or even more detail:
In [14]: [doc for name, doc in run.documents() if name in ('resource', 'datum_page')]
Out[14]:
[Resource({'path_semantics': 'posix',
'resource_kwargs': {'frame_per_point': 1},
'resource_path': 'clhome/BDP/voyager/adsimdet/2022/08/30/d585f272-dd9b-4ac0-b521_000.h5',
'root': '/',
'run_start': '43044b6e-f6ba-48cb-a975-90d236dcbaaa',
'spec': 'AD_HDF5',
'uid': '506944a6-7632-4db8-9448-82b258211ed4'}),
DatumPage({'datum_id': ['506944a6-7632-4db8-9448-82b258211ed4/0'],
'datum_kwargs': {'point_number': [0]},
'resource': '506944a6-7632-4db8-9448-82b258211ed4'})]
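For tying these documents to an external data management system, it may help to group the datum IDs under the resource they point to and to compute a file path for each resource. The following is only a minimal sketch: `index_resources` is a hypothetical helper (not part of tiled or databroker), it assumes the `(name, doc)` pairs shown above, and it assumes the file path is simply `root` joined to `resource_path`, which may not hold if the server rewrites `root`.

```python
import posixpath


def index_resources(documents):
    """Group datum IDs by resource from an iterable of (name, doc) pairs.

    Hypothetical helper, not part of tiled or databroker.
    """
    index = {}
    for name, doc in documents:
        if name == "resource":
            entry = index.setdefault(doc["uid"], {"datum_ids": []})
            # Assumption: the file lives at root + resource_path (POSIX semantics).
            entry["path"] = posixpath.join(doc["root"], doc["resource_path"])
            entry["spec"] = doc["spec"]
        elif name == "datum_page":
            # A datum page may arrive before or after its resource document.
            entry = index.setdefault(doc["resource"], {"datum_ids": []})
            entry["datum_ids"].extend(doc["datum_id"])
    return index


# Sample documents copied from the output above.
sample = [
    ("resource", {
        "path_semantics": "posix",
        "resource_kwargs": {"frame_per_point": 1},
        "resource_path": "clhome/BDP/voyager/adsimdet/2022/08/30/d585f272-dd9b-4ac0-b521_000.h5",
        "root": "/",
        "run_start": "43044b6e-f6ba-48cb-a975-90d236dcbaaa",
        "spec": "AD_HDF5",
        "uid": "506944a6-7632-4db8-9448-82b258211ed4",
    }),
    ("datum_page", {
        "datum_id": ["506944a6-7632-4db8-9448-82b258211ed4/0"],
        "datum_kwargs": {"point_number": [0]},
        "resource": "506944a6-7632-4db8-9448-82b258211ed4",
    }),
]
index = index_resources(sample)
```

With a live client, something like `index_resources(run.documents())` should work as long as the yielded documents behave like dicts. Runs that carry individual `datum` documents instead of `datum_page` documents would need an extra branch.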
Thanks, @danielballan!
Now, trying that same request using requests.get(uri):
In [11]: r = requests.get(uri)
In [12]: import json
In [13]: json.loads(r.text)
---------------------------------------------------------------------------
JSONDecodeError Traceback (most recent call last)
Cell In[13], line 1
----> 1 json.loads(r.text)
File ~/micromamba/envs/tiled/lib/python3.10/json/__init__.py:346, in loads(s, cls, object_hook, parse_float, parse_int, parse_constant, object_pairs_hook, **kw)
341 s = s.decode(detect_encoding(s), 'surrogatepass')
343 if (cls is None and object_hook is None and
344 parse_int is None and parse_float is None and
345 parse_constant is None and object_pairs_hook is None and not kw):
--> 346 return _default_decoder.decode(s)
347 if cls is None:
348 cls = JSONDecoder
File ~/micromamba/envs/tiled/lib/python3.10/json/decoder.py:340, in JSONDecoder.decode(self, s, _w)
338 end = _w(s, end).end()
339 if end != len(s):
--> 340 raise JSONDecodeError("Extra data", s, end)
341 return obj
JSONDecodeError: Extra data: line 2 column 1 (char 1509)
Same error as with r.json()
But this works:
In [28]: for line in r.text.splitlines():
...: key, doc = json.loads(line)
...: if key in ("resource", "datum_page"):
...: print(f"{key}: {doc}")
...:
resource: {'spec': 'AD_HDF5', 'root': '/', 'resource_path': 'clhome/BDP/voyager/adsimdet/2022/08/30/d585f272-dd9b-4ac0-b521_000.h5', 'resource_kwargs': {'frame_per_point': 1}, 'path_semantics': 'posix', 'uid': '506944a6-7632-4db8-9448-82b258211ed4', 'run_start': '43044b6e-f6ba-48cb-a975-90d236dcbaaa'}
datum_page: {'resource': '506944a6-7632-4db8-9448-82b258211ed4', 'datum_id': ['506944a6-7632-4db8-9448-82b258211ed4/0'], 'datum_kwargs': {'point_number': [0]}}
Something about this request:
In [30]: requests.get("http://localhost:8000/api/v1/documents/bdp2022/43044b6e-f6ba-48cb-a975-90d236dcbaaa?fill=false")
Out[30]: <Response [200]>
that returns results.text which cannot be parsed as a whole as JSON.
This works:
yaml.load(f"[{requests.get(uri).text.strip()}]".replace("\n", ","), yaml.Loader)
So does:
json.loads(f"[{requests.get(uri).text.strip()}]".replace("\n", ","))
Here is concise code that works around it:
# special handling here because the response body as a whole is not a single JSON document
import json
import requests
r = requests.get(uri)  # note: no r.json() call here; it fails on this response
docs = json.loads(f"[{r.text.strip()}]".replace("\n", ","))  # wrap the lines in [...] and join them with commas
[(k, doc) for k, doc in docs if k in ("datum_page", "resource")]  # a list of pairs, so repeated names are all kept
So the text response for this URI is not proper JSON. Here is a fragment:
mdet": ["adsimdet_image"]}}]\n["resource", {"spec": "AD
See the \n delimiter between the two documents? It should be a , instead. Also, the entire response with multiple documents must be enclosed in [response].
The format of the response is newline-delimited JSON, a.k.a. "ndjson". This format is not itself valid JSON. Rather, it is lines of text where each individual line is valid JSON.
Why not just use a JSON list? A JSON list would require the entire response to be received before it could be parsed, whereas ndjson can be transmitted and parsed one item at a time. It is a streaming-friendly JSON alternative, well suited to transmitting Bluesky document streams.
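To illustrate the "one item at a time" point, here is a minimal sketch of an incremental decoder that accepts text in arbitrary chunks and hands back each item as soon as its line is complete. `NDJSONDecoder` is a hypothetical name for this illustration, not something provided by tiled or databroker, and the sample documents are made up.

```python
import json


class NDJSONDecoder:
    """Incrementally decode newline-delimited JSON from arbitrary text chunks."""

    def __init__(self):
        self._pending = ""  # text received so far that is not yet terminated by a newline

    def feed(self, chunk):
        """Accept a chunk of text and return the items completed by it."""
        self._pending += chunk
        # Everything before the last newline is complete; the remainder waits for more data.
        *complete, self._pending = self._pending.split("\n")
        return [json.loads(line) for line in complete if line.strip()]

    def close(self):
        """Return the final item if the stream did not end with a newline."""
        rest, self._pending = self._pending, ""
        return [json.loads(rest)] if rest.strip() else []


decoder = NDJSONDecoder()
# Chunk boundaries fall in arbitrary places, as they would on a network connection.
first = decoder.feed('["start", {"uid": "a"}]\n["reso')
second = decoder.feed('urce", {"uid": "b"}]\n["stop", {"uid"')
third = decoder.feed(': "c"}]')
last = decoder.close()
```

The first item is available after the first chunk, long before the rest of the body has arrived. A JSON list could not be handed to `json.loads` until its closing bracket had been received.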
How can one tell what format this is without knowing a priori? The HTTP content-type header declares the format. Take this example:
In [16]: response = requests.get('https://tiled-demo.blueskyproject.io/api/v1/documents/fxi/raw/d106586f-44e6-4045-8bf6-985cfdef3574?fill=false')
In [17]: response.headers['content-type']
Out[17]: 'application/x-ndjson'
It can be parsed in a streaming fashion like this:
import json
[json.loads(line) for line in response.iter_lines()]
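Putting the content-type check and the line-by-line parsing together, a client could choose its parser from the header instead of assuming a format. This is a minimal sketch: `parse_documents` is a hypothetical helper name, and it handles only the two media types discussed in this thread.

```python
import json


def parse_documents(content_type, text):
    """Parse a response body according to its declared content type."""
    # Drop parameters such as "; charset=utf-8" and normalize case.
    media_type = content_type.split(";")[0].strip().lower()
    if media_type == "application/x-ndjson":
        # One JSON value per line; skip blank lines such as a trailing newline.
        return [json.loads(line) for line in text.split("\n") if line.strip()]
    if media_type == "application/json":
        return json.loads(text)
    raise ValueError(f"Unsupported content type: {content_type!r}")
```

With the response from the example above, this would be called as `parse_documents(response.headers['content-type'], response.text)`. For large runs, passing `stream=True` to `requests.get` and looping over `response.iter_lines()` avoids holding the whole body in memory.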
@danielballan You have put a lot of thought into this tool. I learn more each day.
Thanks!
@prjemian Be advised that the next beta release of databroker, which will be databroker v2.0.0b14, will change the structure of the documents from GET /documents. Previously, it sent JSON like [name, doc]. Going forward it will send JSON like {"name": name, "doc": doc}. This is preferred by our friends at Diamond and more "idiomatic" for web data payloads.
Everything about the above conversation is unchanged otherwise.
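Client code that has to work on both sides of that change could normalize each parsed line before using it. A minimal sketch, assuming only the two shapes described above; `unpack_document` is a hypothetical helper name, not part of databroker.

```python
def unpack_document(item):
    """Return (name, doc) from either shape sent by GET /documents."""
    if isinstance(item, dict):
        # Newer shape: {"name": name, "doc": doc}
        return item["name"], item["doc"]
    # Older shape: [name, doc]
    name, doc = item
    return name, doc
```

Each line parsed with `json.loads` can be passed through `unpack_document` first, so the filtering on "resource" and "datum_page" shown earlier stays the same.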