[Bug]: Prism panic when TestStream is followed by a stateful dofn

Open • shunping opened this issue 4 weeks ago • 0 comments

What happened?

This is observed in some of our test workflows. Although it does not cause the tests to fail, I believe that is only luck: our tests do not cover this case.

Thanks @aIbrahiim for bringing this up in PR #36927.

To reproduce, we can use the following code:

import logging
import apache_beam as beam
from apache_beam.coders import VarIntCoder
from apache_beam.transforms.userstate import ReadModifyWriteStateSpec
from apache_beam.utils.timestamp import Timestamp
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.testing.test_stream import TestStream

logging.basicConfig(level=logging.INFO)
#logging.basicConfig(level=logging.WARNING)

class MyDoFn(beam.DoFn):
  # State cell holding a running count, scoped per key (and window).
  COUNT = ReadModifyWriteStateSpec('count', VarIntCoder())

  def process(
      self,
      element,
      count_state=beam.DoFn.StateParam(COUNT),
  ):
    # element is a (key, value) tuple; the state is scoped to its key.
    print(element)
    count = count_state.read()  # None until a count has been written
    if not count:
      count = 1
    else:
      count += 1

    count_state.write(count)
    yield count

options = PipelineOptions([
    "--streaming",
    "--environment_type=LOOPBACK",
    "--runner=PrismRunner",
    "--prism_log_kind=dev",
    #"--prism_beam_version_override=v2.66.0"
    # "--runner=PortableRunner",
    # "--job_endpoint=localhost:8073",
])

with beam.Pipeline(options=options) as p:
  now = Timestamp.now()
  _ = (
      p | TestStream().add_elements([(1, 2), (3, 4)])
      | 'MyDoFn' >> beam.ParDo(MyDoFn())
      | beam.LogElements(
          prefix="result=",
          level=logging.WARNING,
          with_timestamp=True,
          with_window=True,
          use_epoch_time=True))
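For reference, the counting logic in `MyDoFn` can be modeled without Beam to show what a correct run should emit. This is a plain-Python sketch, not Beam's implementation: the helper `simulate_counting` and its dict-based state are hypothetical, and it assumes one state cell per key. (Beam scopes state per key and window, but the repro applies no windowing, so all elements share the global window.) Since the keys `1` and `3` are distinct, each element should produce a count of `1`.

```python
from typing import Dict, Iterable, List, Optional, Tuple


def simulate_counting(elements: Iterable[Tuple[int, int]]) -> List[int]:
  """Plain-Python model of MyDoFn's counting logic (no Beam involved).

  Assumption: one state cell per key, standing in for COUNT. In Beam the
  state is scoped per key and window; here everything is one window.
  """
  state: Dict[int, Optional[int]] = {}  # hypothetical stand-in for COUNT
  outputs: List[int] = []
  for key, _value in elements:
    count = state.get(key)  # like count_state.read(): None when unset
    count = 1 if not count else count + 1
    state[key] = count  # like count_state.write(count)
    outputs.append(count)  # like `yield count`
  return outputs


print(simulate_counting([(1, 2), (3, 4)]))  # [1, 1]
```

A repeated key would increment its own counter, e.g. `[(1, 2), (1, 4)]` yields `[1, 2]`.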

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • [ ] Component: Python SDK
  • [ ] Component: Java SDK
  • [ ] Component: Go SDK
  • [ ] Component: Typescript SDK
  • [ ] Component: IO connector
  • [ ] Component: Beam YAML
  • [ ] Component: Beam examples
  • [ ] Component: Beam playground
  • [ ] Component: Beam katas
  • [ ] Component: Website
  • [ ] Component: Infrastructure
  • [ ] Component: Spark Runner
  • [ ] Component: Flink Runner
  • [ ] Component: Samza Runner
  • [ ] Component: Twister2 Runner
  • [ ] Component: Hazelcast Jet Runner
  • [ ] Component: Google Cloud Dataflow Runner

shunping, Dec 02 '25 20:12