
[Feature Request]: Expose message publish time in Beam YAML ReadFromPubSub

Open • jonathaningram opened this issue 6 months ago • 9 comments

What would you like to happen?

As far as I can tell, there's no way to access the Pub/Sub message publish time / publish_time in ReadFromPubSub for the purposes of passing the time down to output messages/records.

I'd see this as a new config field named publish_time_field, used as follows, so that my_publish_time becomes a new field on the output record.

type: ReadFromPubSub
config:
  topic: "topic"
  subscription: "subscription"
  format: "format"
  schema: schema
  attributes:
  - "attribute"
  - "attribute"
  - ...
  attributes_map: "attributes_map"
  id_attribute: "id_attribute"
  publish_time_field: "my_publish_time"
  timestamp_attribute: "timestamp_attribute"
  error_handling:
    output: "output"

With docs:

publish_time_field string (Optional) : Field to add on the output message with the message publish time. If None, no such field is added.
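To make the proposed semantics concrete, here is a minimal Python sketch of what the option would do to each record. `to_output_record` is a hypothetical helper, not Beam code. It assumes a `format: string` read that yields a `payload` field, and it represents the record as a plain dict rather than a Beam Row. Rejecting a name collision is also an assumption, not part of the proposal.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def to_output_record(
    payload: Any,
    publish_time: datetime,
    publish_time_field: Optional[str] = None,
) -> Dict[str, Any]:
    """Hypothetical helper illustrating the proposed publish_time_field option."""
    record: Dict[str, Any] = {"payload": payload}
    if publish_time_field is None:
        # Option unset: no extra field is added, as the proposed docs state.
        return record
    if publish_time_field in record:
        # Assumption: refuse to silently overwrite an existing field.
        raise ValueError(f"field {publish_time_field!r} already exists")
    record[publish_time_field] = publish_time
    return record


# Example: a message published at 2025-06-03 02:06 UTC.
published = datetime(2025, 6, 3, 2, 6, tzinfo=timezone.utc)
print(to_output_record("hello", published))
print(to_output_record("hello", published, "my_publish_time"))
```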

The use case is to store the publish time in the final record for diagnostics, e.g., compare the time a record was written into BigQuery vs the time it was originally published to Pub/Sub.

A possible workaround is to have the message publisher add an attribute containing the current time at the moment it publishes to Pub/Sub.
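A minimal publisher-side sketch of that workaround follows. It assumes `publisher` is a `google.cloud.pubsub_v1.PublisherClient`, whose `publish(topic, data, **attrs)` takes message attributes as string keyword arguments. The attribute name `published_at` and the helper name are hypothetical. The client clock only approximates the server-assigned publish_time, so treat the value as close to, but not identical to, the real publish time. On the Beam side the attribute could then be read through the existing `attributes` config.

```python
from datetime import datetime, timezone


def publish_with_time_attribute(publisher, topic_path: str, data: bytes,
                                attribute_name: str = "published_at"):
    """Publishes `data` with a client-side timestamp attribute (hypothetical helper).

    `publisher` is assumed to expose publish(topic, data, **attrs), like
    google.cloud.pubsub_v1.PublisherClient; the return value is passed through.
    """
    # Pub/Sub attribute values must be strings; an ISO 8601 string with an
    # explicit +00:00 offset is also valid RFC 3339.
    now = datetime.now(timezone.utc).isoformat(timespec="milliseconds")
    return publisher.publish(topic_path, data, **{attribute_name: now})
```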

Issue Priority

Priority: 2 (default / most feature requests should be filed as P2)

Issue Components

  • [ ] Component: Python SDK
  • [ ] Component: Java SDK
  • [ ] Component: Go SDK
  • [ ] Component: Typescript SDK
  • [ ] Component: IO connector
  • [x] Component: Beam YAML
  • [ ] Component: Beam examples
  • [ ] Component: Beam playground
  • [ ] Component: Beam katas
  • [ ] Component: Website
  • [ ] Component: Infrastructure
  • [ ] Component: Spark Runner
  • [ ] Component: Flink Runner
  • [ ] Component: Samza Runner
  • [ ] Component: Twister2 Runner
  • [ ] Component: Hazelcast Jet Runner
  • [ ] Component: Google Cloud Dataflow Runner

jonathaningram, Jun 03 '25

You can use timestamp_attribute: None; then it uses the message publishing time as the timestamp. Is that not sufficient for your use case?

TanuSharma2511, Jun 03 '25

According to the docs for that config, it doesn't seem right:

timestamp_attribute string (Optional) : Message value to use as element timestamp. If None, uses message publishing time as the timestamp.

I read this to say that there must be a field inside my incoming Pub/Sub message attributes map that will be used as the element timestamp.

I also omitted timestamp_attribute entirely (compared to using timestamp_attribute: None) and new field was written into my record.

jonathaningram, Jun 03 '25

As I understand it, rather than "there must be a field inside my incoming Pub/Sub message attributes map that will be used as the element timestamp", the docs mean "there is a field with that name in the Pub/Sub message's attributes map whose value will be used as the element timestamp."

If timestamp_attribute is not set, or is None, the element timestamp will be the publish_time itself.
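A small Python sketch of that rule may help. It is not Beam's implementation. It assumes the attribute value is either milliseconds since the Unix epoch or an RFC 3339 string (the two forms the Python `ReadFromPubSub` docstring describes), and raising `KeyError` on a missing attribute is an assumption. The key point is that neither branch adds a field to the record. Only the element timestamp changes.

```python
from datetime import datetime, timezone
from typing import Mapping, Optional


def element_timestamp(attributes: Mapping[str, str], publish_time: datetime,
                      timestamp_attribute: Optional[str] = None) -> datetime:
    """Sketch of how the element timestamp is chosen (hypothetical helper)."""
    if timestamp_attribute is None:
        # Unset / None: fall back to the Pub/Sub publish time.
        return publish_time
    # Assumption: a missing attribute is an error (KeyError here).
    value = attributes[timestamp_attribute]
    if value.isdigit():
        # Milliseconds since the Unix epoch.
        return datetime.fromtimestamp(int(value) / 1000, tz=timezone.utc)
    # RFC 3339; fromisoformat before Python 3.11 rejects a trailing "Z".
    return datetime.fromisoformat(value.replace("Z", "+00:00"))
```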

In my opinion, a new field should not be added when timestamp_attribute is omitted. @liferoad, can you please help clarify the behavior?

TanuSharma2511, Jun 03 '25

Removed my previous comment since I am not sure my reading is correct.

liferoad, Jun 03 '25

Here is a workaround that could work:

pipeline:
  type: chain
  transforms:
    - name: ReadTaxiRides
      type: ReadFromPubSub
      config:
        topic: projects/pubsub-public-data/topics/taxirides-realtime
        format: string
    - name: ExtractWindowingInfo
      type: ExtractWindowingInfo
      config:
        fields: [timestamp, window_type]
    - name: ConvertTimestampToString
      type: MapToFields
      config:
        language: "python"
        fields:
          element: payload
          window_type: window_type
          timestamp: timestamp.to_utc_datetime().isoformat()
    - name: PrintMessages
      type: LogForTesting
      config:
        level: INFO

ExtractWindowingInfo extracts the windowing information, including the element timestamp, which should be the publish time when timestamp_attribute is not set.

python -m apache_beam.yaml.main --yaml_pipeline_file=taxi.yaml --streaming true --project your-project
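For reference, here is a plain-Python sketch of what the `timestamp.to_utc_datetime().isoformat()` expression in ConvertTimestampToString should produce. It assumes, to our understanding of `apache_beam.utils.timestamp`, that a Beam Timestamp stores microseconds since the epoch and that `to_utc_datetime()` returns a naive UTC datetime by default, so the string carries no offset. `timestamp_to_iso` is a hypothetical helper, not a Beam API.

```python
from datetime import datetime, timedelta, timezone


def timestamp_to_iso(micros_since_epoch: int) -> str:
    """Mimics timestamp.to_utc_datetime().isoformat() for a Beam Timestamp."""
    epoch = datetime.fromtimestamp(0, tz=timezone.utc)
    # Integer microseconds avoid float rounding, like Beam's own arithmetic.
    dt = epoch + timedelta(microseconds=micros_since_epoch)
    # Drop tzinfo to match the naive UTC datetime returned by default.
    return dt.replace(tzinfo=None).isoformat()
```

For example, an element published at 2025-06-03 02:06:00 UTC (1748916360000000 microseconds) would get a timestamp field of "2025-06-03T02:06:00".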

liferoad, Jun 03 '25

I quickly tried the workaround and didn't get it to work, but I need to try again more systematically. Thanks for the workaround. Ideally, though, ReadFromPubSub itself would support this, if we want to keep this feature request open.

jonathaningram, Jun 06 '25

.take-issue

TanuSharma2511, Jun 20 '25

This issue has been marked as stale due to 150 days of inactivity. It will be closed in 30 days if no further activity occurs. If you think that’s incorrect or this issue still needs to be addressed, please simply write any comment. If closed, you can reopen the issue at any time. Thank you for your contributions.

github-actions[bot], Nov 17 '25

Hi! I’d like to work on this issue.

I plan to add support for a new configuration field publish_time_field in ReadFromPubSub to expose the Pub/Sub message publish_time in the output record, as proposed in the description.

I’m still new to open-source contributions and to the Beam codebase, so if there are any design preferences, constraints, or best practices I should follow, I would appreciate any guidance. If anyone is willing to help point me in the right direction, that would be great!

chaitanya-h1007, Dec 09 '25