gcs-pubsub PR
A work-in-progress MVP for https://github.com/benthosdev/benthos/issues/1034
Pardon my mediocre diagram, but this is the scenario I am using to test the MVP. Read it from the bottom up: logs are generated and periodically flushed as NDJSON files to a GCS bucket. Each upload triggers a bucket event notification to a Pub/Sub topic, which is then delivered to a subscription Benthos watches. Benthos reads the bucket and object name from the event notification message, and the input then downloads the referenced file.
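If it helps to picture that last step: a GCS notification delivered over Pub/Sub identifies the object through message attributes (bucketId, objectId, and eventType are the documented attribute names). Here is a minimal Go sketch of extracting the target; the struct and function names are invented for illustration, not taken from the PR:

package main

import (
	"errors"
	"fmt"
)

// objectTarget is a hypothetical stand-in for the PR's gcpCloudStorageObjectTarget.
type objectTarget struct {
	bucket, name string
}

// targetFromAttributes pulls the target object out of the standard GCS
// notification attributes.
func targetFromAttributes(attrs map[string]string) (*objectTarget, error) {
	bucket, ok := attrs["bucketId"]
	if !ok {
		return nil, errors.New("notification missing bucketId attribute")
	}
	name, ok := attrs["objectId"]
	if !ok {
		return nil, errors.New("notification missing objectId attribute")
	}
	return &objectTarget{bucket: bucket, name: name}, nil
}

func main() {
	attrs := map[string]string{
		"eventType": "OBJECT_FINALIZE",
		"bucketId":  "my-log-bucket",
		"objectId":  "logs/2022-06-01.ndjson",
	}
	t, err := targetFromAttributes(attrs)
	if err != nil {
		panic(err)
	}
	fmt.Printf("download gs://%s/%s\n", t.bucket, t.name)
}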

See this repository for Pulumi code which builds the aforementioned scenario:
https://github.com/mhite/benthos-gcp-dev/tree/main/input_cloud_storage
Thank you for working on this. I have exactly this use case and was thinking about an implementation; glad you got to it first.
@mhite is this already functional? What parts are still in progress, is it only the TODO items in the comments?
For the gcsfuse 0-byte object thing, IMO it's probably OK to just immediately acknowledge the Pub/Sub message if the message attribute size == "0" and move on.
I'm testing on a local fork on my machine and I think you can do the above like this:
func (ps *pubsubTargetReader) parseObjectPath(pubsubMsgAttributes map[string]string) (*gcpCloudStorageObjectTarget, error) {
	// Reject gcsfuse's initial 0-byte object creation up front.
	objectSize, ok := pubsubMsgAttributes["size"]
	if !ok {
		return nil, errors.New("pub/sub message missing size attribute")
	}
	if objectSize == "0" {
		return nil, errors.New("object size is 0 bytes")
	}
	// ... rest of the object path parsing elided ...
}
> @mhite is this already functional? What parts are still in progress, is it only the TODO items in the comments?
Yes, I've noted stuff that I don't understand/want to discuss/need to fix in the TODO items.
It does run and "works", but it is not merge-worthy yet due to some of the outstanding issues I've noted.
> For the gcsfuse 0-byte object thing, IMO it's probably OK to just immediately acknowledge the Pub/Sub message if the message attribute size == "0" and move on. I'm testing on a local fork on my machine and I think you can do the above like this: […]
Yes, I would take a similar approach, although you won't find size in pubsubMsgAttributes but rather in the message body as a JSON key. Also, if it was a notification for a 0-byte file, I think we'd also want to "ack" the message to delete it so we don't get it redelivered to us.
Maybe it makes sense to check for size == "0" even before checking the event type?
What do you think about a config option to point to a cache resource to de-duplicate on gcs_key? At work we built an event-based log ingestion pipeline, and one of the pain points was handling cases where a production system uploads the same log file twice. I think it would make sense to de-duplicate based on the Pub/Sub message content before the object is even retrieved.
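For what it's worth, a rough sketch of that idea in Go, using a hypothetical Cache interface as a stand-in for a Benthos cache resource (none of these names come from the PR):

package main

import "fmt"

// Cache is a hypothetical stand-in for a cache resource; Add is assumed to
// return false when the key has already been seen.
type Cache interface {
	Add(key string) bool
}

type memCache map[string]struct{}

func (m memCache) Add(key string) bool {
	if _, seen := m[key]; seen {
		return false
	}
	m[key] = struct{}{}
	return true
}

// shouldDownload de-duplicates on notification content alone, before any
// object bytes are fetched. The key here is just bucket + object name; one
// could also fold in the md5Hash from the notification body to let re-uploads
// with changed content through while still dropping identical duplicates.
func shouldDownload(c Cache, bucket, name string) bool {
	return c.Add(bucket + "/" + name)
}

func main() {
	c := memCache{}
	fmt.Println(shouldDownload(c, "logs", "a.ndjson")) // true: first sighting
	fmt.Println(shouldDownload(c, "logs", "a.ndjson")) // false: duplicate upload
}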
> Maybe it makes sense to check for size == "0" even before checking the event type?
Well, an eventType attribute of OBJECT_FINALIZE in the attributes/metadata tells us the Pub/Sub message is a valid notification message, and therefore we know the format/schema of the message body. In other words, at that point we know with some certainty that the message will contain a JSON payload with a size key. So my thinking is that the check goes right after we know it is an OBJECT_FINALIZE event (i.e. basically where the TODO is right now).
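To make the ordering concrete, here is a sketch of how that sequencing might look (not the PR's actual code; the attribute name and JSON keys are the ones GCS documents for notifications, where size is a decimal string in the object resource):

package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// objectNotification models the subset of the JSON_API_V1 notification body
// we care about.
type objectNotification struct {
	Bucket string `json:"bucket"`
	Name   string `json:"name"`
	Size   string `json:"size"`
}

func parseNotification(attrs map[string]string, body []byte) (*objectNotification, error) {
	// 1. Only OBJECT_FINALIZE events guarantee the body schema we expect.
	if attrs["eventType"] != "OBJECT_FINALIZE" {
		return nil, errors.New("unsupported event type")
	}
	// 2. Now we can safely decode the JSON object resource from the body.
	var n objectNotification
	if err := json.Unmarshal(body, &n); err != nil {
		return nil, fmt.Errorf("decoding notification body: %w", err)
	}
	// 3. Reject gcsfuse's empty placeholder objects before any download.
	if n.Size == "0" {
		return nil, errors.New("object size is 0 bytes")
	}
	return &n, nil
}

func main() {
	attrs := map[string]string{"eventType": "OBJECT_FINALIZE"}
	body := []byte(`{"bucket":"logs","name":"a.ndjson","size":"123"}`)
	n, err := parseNotification(attrs, body)
	fmt.Println(n, err)
}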
> What do you think about a config option to point to a cache resource to de-duplicate on gcs_key? […]
Oh, that's an interesting idea. But it should probably be captured in a different issue/PR for tracking/simplicity's sake.
I think I have an approach that addresses the problematic gcsfuse behavior[1]. By checking the object generation before download, we can avoid the race condition of potentially consuming the non-zero-byte version of a file twice. Additionally, we will refuse to process 0-byte files [if the user has the JSON object payload enabled for the notification config].
I'll do a bit of testing this weekend and update the PR.
[1] - https://github.com/GoogleCloudPlatform/gcsfuse/blob/master/docs/semantics.md#pubsub-notifications-on-file-creation
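For context, generation-pinned reads are directly supported by the official Go client (Generation is the real cloud.google.com/go/storage API; the surrounding wiring below is my own simplification). Pinning the reader to the generation named in the notification means a later rewrite of the same object cannot be consumed under this notification:

package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"strconv"

	"cloud.google.com/go/storage"
)

// readAtGeneration downloads exactly the object version the notification
// described; if that generation is gone, the read fails rather than silently
// returning a newer version.
func readAtGeneration(ctx context.Context, client *storage.Client, bucket, name, generation string) ([]byte, error) {
	gen, err := strconv.ParseInt(generation, 10, 64)
	if err != nil {
		return nil, fmt.Errorf("bad generation %q: %w", generation, err)
	}
	r, err := client.Bucket(bucket).Object(name).Generation(gen).NewReader(ctx)
	if err != nil {
		return nil, err
	}
	defer r.Close()
	return io.ReadAll(r)
}

func main() {
	ctx := context.Background()
	client, err := storage.NewClient(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()
	// These values would come from the notification's objectGeneration
	// attribute (or the generation key in the JSON body).
	data, err := readAtGeneration(ctx, client, "my-log-bucket", "logs/a.ndjson", "1653000000000000")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("read %d bytes\n", len(data))
}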
@loicalleyne - want to give this latest version a test?
@mhite ran a short smoke test; everything appeared to work OK.
@mhite is this PR still a WIP?
@loicalleyne - Should be ready for a review. I will update PR title.
At some point I want to add functionality that allows a user to choose (by configuration) to acknowledge (i.e. delete) bad/unsupported Pub/Sub messages. For now, you very much need to have a dead-letter queue set up.
This looks like a good start. I did a bit of searching, and it looks like there's an official Pub/Sub emulator that can be run via gcloud for integration tests. Annoyingly, in typical Google fashion, they don't offer a Docker container for it, but I see there's a maintained third-party one over at https://hub.docker.com/r/thekevjames/gcloud-pubsub-emulator. I'll defer to someone else for a proper review, since I'm not very familiar with Pub/Sub.
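Pointing the Go client at an emulator only takes one environment variable; the library honors PUBSUB_EMULATOR_HOST out of the box. A minimal sketch of test fixture setup against it (project, topic, and subscription names are placeholders, and the port depends on how the emulator container is run):

package main

import (
	"context"
	"fmt"
	"log"
	"os"

	"cloud.google.com/go/pubsub"
)

func main() {
	// With PUBSUB_EMULATOR_HOST set, the client skips real credentials and
	// talks to the emulator instead. Adjust to wherever the emulator listens.
	os.Setenv("PUBSUB_EMULATOR_HOST", "localhost:8681")

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "test-project")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	topic, err := client.CreateTopic(ctx, "gcs-notifications")
	if err != nil {
		log.Fatal(err)
	}
	sub, err := client.CreateSubscription(ctx, "benthos-input", pubsub.SubscriptionConfig{Topic: topic})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("emulator fixtures ready:", topic.ID(), sub.ID())
}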
@mihaitodor - There is unfortunately no official Cloud Storage emulator, which also makes things tricky. I did come across this, though: https://github.com/fsouza/fake-gcs-server
I think it might also support publishing Cloud Storage trigger notifications to Pub/Sub, so perhaps it could all be wired up to simulate an end-to-end environment.
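For the curious, seeding fake-gcs-server from Go looks roughly like this (the field layout follows the project's README, but it has shifted between versions, so treat this as a sketch rather than a stable API):

package main

import (
	"context"
	"fmt"
	"io"
	"log"

	"github.com/fsouza/fake-gcs-server/fakestorage"
)

func main() {
	// Start an in-process fake GCS server pre-seeded with one object.
	server, err := fakestorage.NewServerWithOptions(fakestorage.Options{
		InitialObjects: []fakestorage.Object{
			{
				ObjectAttrs: fakestorage.ObjectAttrs{
					BucketName: "my-log-bucket",
					Name:       "logs/a.ndjson",
				},
				Content: []byte(`{"msg":"hello"}` + "\n"),
			},
		},
	})
	if err != nil {
		log.Fatal(err)
	}
	defer server.Stop()

	// server.Client() returns a *storage.Client already wired to the fake,
	// so the code under test needs no GCP credentials.
	client := server.Client()
	r, err := client.Bucket("my-log-bucket").Object("logs/a.ndjson").NewReader(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	defer r.Close()
	data, _ := io.ReadAll(r)
	fmt.Printf("read back: %s", data)
}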
@mhite Yeah, Benthos is using that 3rd party one already. It has worked OK and the maintainer is friendly if anything new needs to be added to it
@mhite did you ever make this as a plugin?
@loicalleyne - The PR should work, but it needs its merge conflicts resolved and integration tests written.