[Feature Request]: Expose message publish time in Beam YAML ReadFromPubSub
What would you like to happen?
As far as I can tell, there's no way to access the Pub/Sub message publish time / publish_time in ReadFromPubSub for the purposes of passing the time down to output messages/records.
I'd see this as a new config field named publish_time_field, used as follows so that my_publish_time becomes a new field on the output record.
type: ReadFromPubSub
config:
  topic: "topic"
  subscription: "subscription"
  format: "format"
  schema: schema
  attributes:
    - "attribute"
    - "attribute"
    - ...
  attributes_map: "attributes_map"
  id_attribute: "id_attribute"
  publish_time_field: "my_publish_time"
  timestamp_attribute: "timestamp_attribute"
  error_handling:
    output: "output"
With docs:
publish_time_field string (Optional) : Field to add on the output message with the message publish time. If None, no such field is added.
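To make the proposed semantics concrete, here is a minimal Python sketch of how such an option might behave. It is not Beam's implementation: `to_record` and the flat message shape are hypothetical stand-ins, and only the `publish_time_field` name comes from the proposal above.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def to_record(
    payload: str,
    publish_time: datetime,
    publish_time_field: Optional[str] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: build an output record from one Pub/Sub message.

    Mirrors the proposed docs: if publish_time_field is None, no extra
    field is added; otherwise the publish time is stored under that name.
    """
    record: Dict[str, Any] = {"payload": payload}
    if publish_time_field is not None:
        record[publish_time_field] = publish_time
    return record


published = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
# Option set: the record gains a "my_publish_time" field.
with_field = to_record("hello", published, publish_time_field="my_publish_time")
# Option omitted: the record is unchanged.
without_field = to_record("hello", published)
```

The point of the sketch is that the option would be purely additive: leaving it unset keeps today's output schema.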
The use case is to store the publish time in the final record for diagnostics, e.g., compare the time a record was written into BigQuery vs the time it was originally published to Pub/Sub.
A possible workaround is to have the message publisher add an attribute holding the current time at the moment it publishes to Pub/Sub.
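A rough sketch of that workaround on the publisher side, in Python. The helper name `with_publish_time` and the attribute name `publish_time_ms` are made up for illustration, and the actual publish call (for example through a Pub/Sub client library) is left out. The value is written as milliseconds since the Unix epoch and encoded as a string, since Pub/Sub attribute values are strings.

```python
import time
from typing import Dict, Optional


def with_publish_time(
    attributes: Dict[str, str],
    now_ms: Optional[int] = None,
    attribute_name: str = "publish_time_ms",  # hypothetical attribute name
) -> Dict[str, str]:
    """Return a copy of attributes with the client-side send time added."""
    if now_ms is None:
        # Publisher's wall clock, in milliseconds since the Unix epoch.
        now_ms = int(time.time() * 1000)
    stamped = dict(attributes)  # do not mutate the caller's dict
    stamped[attribute_name] = str(now_ms)
    return stamped


# Fixed time used here only to make the example deterministic.
attrs = with_publish_time({"source": "sensor-1"}, now_ms=1704110400000)
```

Listing that attribute under `attributes` in ReadFromPubSub should then surface it as a field on the record. Note that this records the publisher's clock, not the publish time assigned by the Pub/Sub service, so the two can differ slightly.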
Issue Priority
Priority: 2 (default / most feature requests should be filed as P2)
Issue Components
- [ ] Component: Python SDK
- [ ] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [x] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Infrastructure
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner
You can use timestamp_attribute: None; the message publishing time is then used as the timestamp. Is that not sufficient for your use case?
According to the docs for that config, it doesn't seem right:
timestamp_attribute string (Optional) : Message value to use as element timestamp. If None, uses message publishing time as the timestamp.
I read this to say that there must be a field inside my incoming Pub/Sub message attributes map that will be used as the element timestamp.
I also omitted timestamp_attribute entirely (compared to using timestamp_attribute: None) and new field was written into my record.
As I understand it, "I read this to say that there must be a field inside my incoming Pub/Sub message attributes map that will be used as the element timestamp." should instead read: "there is a field with that name in the Pub/Sub message's attributes map whose value will be used as the element timestamp."
If timestamp_attribute is not set, or is None, then the element timestamp will be the publish_time itself.
In my opinion, a new field should not be added if we omit timestamp_attribute. @liferoad, can you please help us understand the behavior?
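The reading described in this comment can be summarized in a small Python sketch. It models the documented behavior as discussed in this thread, not Beam's actual code: `element_timestamp` is a hypothetical function, and the attribute value is assumed to be milliseconds since the Unix epoch.

```python
from datetime import datetime, timezone
from typing import Dict, Optional


def element_timestamp(
    attributes: Dict[str, str],
    publish_time: datetime,
    timestamp_attribute: Optional[str] = None,
) -> datetime:
    """Hypothetical model of how the element timestamp is chosen."""
    if timestamp_attribute is None:
        # No attribute configured: fall back to the message publish time.
        return publish_time
    # Attribute configured: its value (assumed epoch millis) is the timestamp.
    millis = int(attributes[timestamp_attribute])
    return datetime.fromtimestamp(millis / 1000, tz=timezone.utc)


published = datetime(2024, 1, 1, 12, 0, 5, tzinfo=timezone.utc)
default_ts = element_timestamp({}, published)
custom_ts = element_timestamp({"ts": "1704110400000"}, published, "ts")
```

In both branches only the element's timestamp changes; nothing is added to the record itself, which is why a separate option for exposing the publish time as a field is being requested.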
Removed my previous comment since I am not sure my reading is correct.
Here is the workaround that could work:
pipeline:
  type: chain
  transforms:
    - name: ReadTaxiRides
      type: ReadFromPubSub
      config:
        topic: projects/pubsub-public-data/topics/taxirides-realtime
        format: string
    - name: ExtractWindowingInfo
      type: ExtractWindowingInfo
      config:
        fields: [timestamp, window_type]
    - name: ConvertTimestampToString
      type: MapToFields
      config:
        language: "python"
        fields:
          element: payload
          window_type: window_type
          timestamp: timestamp.to_utc_datetime().isoformat()
    - name: PrintMessages
      type: LogForTesting
      config:
        level: INFO
ExtractWindowingInfo extracts the windowing information, whose timestamp field should contain the publish time.
python -m apache_beam.yaml.main --yaml_pipeline_file=taxi.yaml --streaming true --project your-project
Thanks for the workaround. I quickly tried it and didn't get it to work, but I need to try again systematically. Ideally, though, this would be supported in the main transform itself, if we want to keep this feature request open.
.take-issue
This issue has been marked as stale due to 150 days of inactivity. It will be closed in 30 days if no further activity occurs. If you think that’s incorrect or this issue still needs to be addressed, please simply write any comment. If closed, you can reopen the issue at any time. Thank you for your contributions.
Hi! I’d like to work on this issue.
I plan to add support for a new configuration field publish_time_field in ReadFromPubSub to expose the Pub/Sub message publish_time in the output record, as proposed in the description.
I’m still new to open-source contributions and to the Beam codebase, so if there are any design preferences, constraints, or best practices I should follow, I would appreciate any guidance. If anyone is willing to help point me in the right direction, that would be great!