stac-server
stac-server copied to clipboard
add publish SNS for newly ingested Items
There should be an SNS topic deployed with stac-server.
And then there should be an option (controlled via an envvar) that will publish newly ingested items to the topic.
This allows users to subscribe and monitor new Items added to the server.
Furthermore, the topic should be published with attributes for:
- bounding box
- datetime
- collection
which allows a subscriber to filter on these attributes and only get messages which meet the criteria, e.g., message me all new Items within this bounding box for 2019 and for collection sentinel-s2-l2a.
This is a bit more complicated, since the record would have to be published, then the new record received and published. Probably a better way to do this, perhaps trigger publishing when new items added to Elasticsearch.
This is a bit more complicated, since the record would have to be published, then the new record received and published. Probably a better way to do this, perhaps trigger publishing when new items added to Elasticsearch.
@matthewhanson Can you help me understand this a bit? What is the reasoning behind pulling the new record out of ES rather than simply publishing what was pushed into ES? Does ES modify the records in some way? I realize the records are somewhat modified within stac-server during the ingest process, but that should not differ from what is in ES, right?
Regarding triggering "publishing when new items added to Elasticsearch": do you have a method in mind for this that would be simpler than ingest publishing the messages it has pushed into ES?
Another bit of relevant context from someone trying to implement ingest publishing to an SNS: any errors returned from ES (like invalid records) are swallowed up here: https://github.com/stac-utils/stac-server/blob/main/libs/esStream.js#L73, with no hope of returning that promise to a caller context to handle in the face of a problem, at least from what I can see.
This issue has popped up a couple times in recent conversation, and I wanted to post my responses here for reference.
Notably, I did implement this feature on a private fork, which I attempted to upstream in https://github.com/stac-utils/stac-server/pull/131. That was a very invasive set of changes, however, and was deemed too disruptive at the time to move forward.
To summarize why, I found the use of the stream for processing discreet ingest items was confusing and error prone. In particular, I remember individual items index to ES were not checked for errors, and batch index responses were not validated to determine which if any items failed to be indexed, so my solution was to strip all that out and always use a batch request to ES and process the response appropriately. This is has been working well on on the fork.
It would be great that any implementation here would retain compatibility with the message format I landed on. You can see that here: https://github.com/stac-utils/stac-server/pull/131/files#diff-4f94407f2f00b48b6512a71cb49db470b630a060255893a722a45a1de78fda53.
It is essentially
{
record: stac_object,
error: error_object | undefined,
}
And has the following message attributes which can be used for filtering:
{
recordType: 'collection' | 'item',
ingestStatus: 'failed' | 'successful',
collection: {string},
}
Note that one of the goals of this SNS topic is not just to respond to successfully ingested items, but to also be able to respond/react to or track failures. Thus the ingestStatus attribute and the inclusion of the optional error object in the message. Regardless, that ingest status must be known before publishing a message, else how can subscribers know that a message they receive reflects an item that was successfully ingested?
The additional proposed message attributes are a great idea, to allow subscribers more message filtering controls without having to process every message.
It does look like the batch write functionality on the ingest stream is still in place (see https://github.com/stac-utils/stac-server/blob/main/src/lib/databaseStream.js#L80 and https://github.com/stac-utils/stac-server/blob/main/src/lib/ingest.js#L29). It may now be pulling errors out from the ES batch response but it appears to be swallowing those in a log message and does not associate the given error(s) with the respective item(s). In other words, this needs to be reworked to allow associating an specific error with the triggering item to know whether the ingest for the item was successful or not, which regardless of the message format/attributes is pretty critical for this feature, as we should not be publishing items as ingested if ES rejected them.
Another note, I should probably have considered batching the SNS messages and using the batch publish function in my PR. This could be a critical optimization for some high-load pipelines, so if we did want to pick my PR back up we'd want to ensure this change was made to it.
I plan to add the four individual bbox
values as Number-type attributes to the published messages. This will allow for writing an SNS filter policy that implements a simple contains filter but since, at the time of writing, SNS filter policies do not allow multiple and or or predicates, mote complex logic that would support filtering bounds that cross the anti-meridian or filtering based on intersection rather than contains are not possible. Those more advanced filters would need to be done in in a Lamba or similar which could in turn pass along only the matching messages to another topic/queue.
SNS message attributes do not support a native timestamp type. SNS filter policies do not support filtering string attributes within a range. This means that we will need to encode the datetime attribute as a number.
The STAC spec says that a null value is allowed for datetime
if both start_datetime
and end_datetime
are provided
https://github.com/radiantearth/stac-spec/blob/ea23b295933a2e4c18f0fb5d5638c24ff2682e50/item-spec/common-metadata.md#date-and-time-range
Given all this, I plan to add 2 attributes to the SNS messages
-
startUnixEpochMsOffset
-
endUnixEpochMsOffset
The values of these attributes will be the number of seconds since (or before) the UNIX epoch
If both start_datetime
and end_datetime
are provided then they will be used to set startUnixEpochMsOffset
and endUnixEpochMsOffset
respectively.
If only datetime
is provided on the STAC item, both startUnixEpochMsOffset
and endUnixEpochMsOffset
will be set to the same value.
You can use string prefix matching to effectively select on a time range.
We have these same attributes on messages published to the cirrus publish SNS topic, so I would recommend looking at what we do there and doing the same here, for consistency.
Thanks so much for the link, @jkeifer. I added string datetime attributes to my implementation with keys matching those used in Cirrus.