
[Bug]: SolaceIO seems to lose messages during Dataflow scaling/rebalancing events

Open ppawel opened this issue 3 weeks ago • 14 comments

What happened?

For a few months now we've been dealing with an issue where messages "disappear" during scaling events. By that I mean they are simply not processed by the pipeline steps downstream of SolaceIO.

The loss always coincides with logs from the Dataflow service containing various warnings. These warnings claim this is generally "expected" during autoscaling events, but for us the outcome is data loss...

Example logs:

Work is no longer active on the backend, it already succeeded or will be retried. sharding_key=1251872cc9e8c6a8 status: INTERNAL: Windmill failed to commit the work item. CommitStatus: NOT_FOUND
=== Source Location Trace: === 
dist_proc/dax/workflow/worker/streaming/streaming_worker_client.cc:888
Error while processing a work item: INTERNAL: Windmill failed to commit the work item. CommitStatus: NOT_FOUND
=== Source Location Trace: === 
dist_proc/dax/workflow/worker/streaming/streaming_worker_client.cc:888
Finalize id rejected rather than committed
1 Not able to request finalization at SDK because the work item 52522 was not committed successfully. This may occur due to autoscaling or backend rebalancing.

I can provide more details on the logs if needed. Note that this is not necessarily related to #35304, because we don't see those "closed consumer" errors around the times when messages are lost. The only correlation we've found so far is these logs from Windmill/workers.

What we have tried so far:

  1. Locked max_workers to 1 so that horizontal autoscaling cannot happen (a pipeline options sketch follows this list); with horizontal autoscaling enabled, the problem is much worse and it is pretty much guaranteed that some messages are lost when the pipeline scales up/down. Thankfully this pipeline doesn't need many resources, but this doesn't solve the problem (see below).
  2. Disabled vertical autoscaling (memory adjustment), which with respect to this issue seems to affect the pipeline in a similar way to horizontal scaling.

Those two changes reduced the problem a lot, but it still occurs, I guess because Dataflow does some kind of "rebalancing" internally. We also tried reducing the number of parallel keys, but that had no effect.
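For reference, a minimal sketch of how the worker cap can be pinned via Dataflow pipeline options. This is illustrative only and not our exact job configuration; the class name PinnedWorkersPipeline is hypothetical, while the option and runner classes are the standard Beam/Dataflow ones.

```java
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Hypothetical driver class, shown only to illustrate how the worker cap is set.
public class PinnedWorkersPipeline {
  public static void main(String[] args) {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);

    options.setRunner(DataflowRunner.class);
    options.setStreaming(true);
    // Cap the worker pool at a single worker so horizontal autoscaling cannot kick in;
    // equivalent to passing --maxNumWorkers=1 on the command line.
    options.setMaxNumWorkers(1);

    Pipeline pipeline = Pipeline.create(options);
    // ... the actual SolaceIO.read() pipeline would be built here ...
    pipeline.run();
  }
}
```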

Could there be something in SolaceIO's handling of the general Beam I/O contract, such as finalization of bundles or restarting/splitting the readers, that could have such an impact? @bzablocki @stankiewicz

(Note: I picked P1 priority for this bug as it is indeed data loss, but feel free to adjust it in case you have a different understanding of P1.)

Issue Priority

Priority: 1 (data loss / total loss of function)

Issue Components

  • [ ] Component: Python SDK
  • [x] Component: Java SDK
  • [ ] Component: Go SDK
  • [ ] Component: Typescript SDK
  • [x] Component: IO connector
  • [ ] Component: Beam YAML
  • [ ] Component: Beam examples
  • [ ] Component: Beam playground
  • [ ] Component: Beam katas
  • [ ] Component: Website
  • [ ] Component: Infrastructure
  • [ ] Component: Spark Runner
  • [ ] Component: Flink Runner
  • [ ] Component: Samza Runner
  • [ ] Component: Twister2 Runner
  • [ ] Component: Hazelcast Jet Runner
  • [x] Component: Google Cloud Dataflow Runner

ppawel avatar Dec 04 '25 14:12 ppawel

@iht thoughts?

stankiewicz avatar Dec 04 '25 14:12 stankiewicz

I've looked at when we acknowledge messages.

theory:

  1. the split was rebalanced
  2. the checkpoint was rejected, so finalization wasn't invoked and msg.ack wasn't invoked (see the sketch below)
  3. the split reader started on a different worker; Solace hasn't resent the messages that weren't acknowledged -> data loss
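For readers less familiar with the Beam side of this, a minimal illustrative sketch (hypothetical names, not the actual SolaceIO code) of where acknowledgement sits in that lifecycle: the ack is deferred to CheckpointMark.finalizeCheckpoint(), which the runner only calls after the work item commits, so a rejected commit means the ack never happens.

```java
import java.io.IOException;
import java.util.List;
import org.apache.beam.sdk.io.UnboundedSource;

// Illustrative only: a CheckpointMark that defers broker acknowledgement until
// the runner confirms the bundle's output was durably committed.
class SketchSolaceCheckpointMark implements UnboundedSource.CheckpointMark {

  interface ReceivedMessage {
    void ack(); // stands in for the Solace client acknowledgement (msg.ack)
  }

  private final List<ReceivedMessage> safeToAck;

  SketchSolaceCheckpointMark(List<ReceivedMessage> safeToAck) {
    this.safeToAck = safeToAck;
  }

  @Override
  public void finalizeCheckpoint() throws IOException {
    // If the commit is rejected (e.g. the split moved to another worker during
    // rebalancing), the runner never calls this, the messages are never acked,
    // and avoiding data loss then depends on Solace redelivering unacked messages.
    for (ReceivedMessage message : safeToAck) {
      message.ack();
    }
  }
}
```

A real CheckpointMark also needs a Coder (via UnboundedSource.getCheckpointMarkCoder) so the runner can persist it; that is omitted here.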

There were some issues with messages that were stuck and never retried by Solace - @bzablocki do you remember?

stankiewicz avatar Dec 04 '25 15:12 stankiewicz

probably related: https://community.solace.com/t/reprocess-messages-which-are-not-ack-back-by-consumer/307

stankiewicz avatar Dec 04 '25 17:12 stankiewicz

@ppawel, how quickly do you learn about the data loss? If you restart the workers (Compute Engine restart), are you able to see those messages processed?

stankiewicz avatar Dec 04 '25 17:12 stankiewicz

@ppawel, how quickly do you learn about the data loss? If you restart the workers (Compute Engine restart), are you able to see those messages processed?

We see data loss pretty quickly because the logic in the downstream stages is expecting those messages.

They are not processed after restarting because they no longer exist in the Solace queue. Solace does not redeliver them: the messages are acked, so from Solace's perspective everything is fine and they are removed from the queue as usual. What we have seen is that those messages do go through the SolaceIO parsing function (we have a logging statement there) but are gone afterwards; downstream transforms do not see them after those Dataflow worker warnings appear.

I am not an expert on Beam/Dataflow internals, but I would expect the work containing the missing messages to be retried, and that is not happening. Between SolaceIO and the Redistribute transform in our pipeline there are only a few trivial steps that cannot filter out messages by themselves, so the current theory is that SolaceIO is not interacting correctly/fully with some part of the Beam/Dataflow rebalancing lifecycle. Maybe that theory is wrong and SolaceIO is behaving correctly, but honestly we are grasping at straws with this issue at this point, so I wanted to check whether you have some ideas from the SolaceIO side.

If you think it is unlikely that something inside SolaceIO would cause this behavior, then this bug can be closed; obviously I don't want to abuse this issue tracker for unrelated issues, and we would instead pursue a support ticket with Google Cloud.

ppawel avatar Dec 04 '25 17:12 ppawel

OK, so the previous comment is invalid if the message that was lost was acked.

stankiewicz avatar Dec 04 '25 18:12 stankiewicz

It would be great if you could raise the case and add me on cc ([email protected]). I would love to see the logs where you see the SolaceIO statements, the missing downstream logs, and the Dataflow warnings.

stankiewicz avatar Dec 04 '25 18:12 stankiewicz

Which runner are you using? Do you see the same data loss across both Dataflow runners?

The issue may be in the checkpointing logic. During checkpoint creation, the receivedMessages message references are inserted into safeToAckMessages, which is passed by reference into the CheckpointMark. The CheckpointMark is finalized (or not) asynchronously.

As part of caching/reusing readers, we reuse the UnboundedReader together with its safeToAckMessages. Maybe there is some late-arriving finalization that references the same safeToAckMessages as the next CheckpointMark, leading to early acks of messages.

scenario:

  1. UnboundedReader is created for split 0; safeToAckMessages is empty, receivedMessages is empty
  2. UnboundedReader advances, user ParDos process messages, 10k messages or 10s pass, receivedMessages fills up with message references
  3. getCheckpointMark is invoked; the 10k references are moved from receivedMessages (which is emptied) into safeToAckMessages, and checkpointMark T1 is created holding a reference to safeToAckMessages
  4. the output of the stage is flushed and checkpointed; callback c1 to finalize the checkpoint is registered but not yet invoked
  5. UnboundedReader advances, user ParDos process messages, another 10k messages or 10s pass, receivedMessages fills up with message references
  6. getCheckpointMark is invoked; checkpointMark T2 is created holding a reference to the same safeToAckMessages, which now contains 20k messages
  7. now imagine finalize for T1 is invoked, acking all 20k messages, since T1 and T2 share the same list reference
  8. the flush fails because another worker has been assigned this split; the checkpoint is rejected and finalize for T2 is never invoked
  9. the second 10k/10s worth of messages are lost for this split (already acked in Solace, never committed downstream)

problem: safeToAckMessages should not be shared between T1 and T2; each should have its own list.

solution:

  • The CheckpointMark should have its own list of safeToAckMessages, cleaned up similarly to receivedMessages (see the sketch below).
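A minimal sketch of that hazard and of the proposed fix, using hypothetical names rather than the actual SolaceIO implementation: the fixed variant hands each CheckpointMark its own snapshot of safeToAckMessages, so a late finalize of T1 can no longer ack messages that only T2 covers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the reader/checkpoint interaction described
// in the scenario above; not the actual SolaceIO classes.
class SketchUnboundedReader {

  interface ReceivedMessage {
    void ack(); // stands in for the Solace client acknowledgement
  }

  // Messages already emitted downstream, safe to ack once their checkpoint finalizes.
  private List<ReceivedMessage> safeToAckMessages = new ArrayList<>();

  void markSafeToAck(ReceivedMessage message) {
    safeToAckMessages.add(message);
  }

  // Buggy variant: the live list is shared by reference, so finalizing T1 late
  // also acks everything added for T2, even if T2's commit is later rejected.
  SketchCheckpointMark getCheckpointMarkShared() {
    return new SketchCheckpointMark(safeToAckMessages);
  }

  // Fixed variant: each CheckpointMark owns a snapshot; the reader continues
  // with a fresh list, so T1 can only ever ack the messages it actually covers.
  SketchCheckpointMark getCheckpointMarkCopied() {
    List<ReceivedMessage> snapshot = safeToAckMessages;
    safeToAckMessages = new ArrayList<>();
    return new SketchCheckpointMark(snapshot);
  }

  static class SketchCheckpointMark {
    private final List<ReceivedMessage> toAck;

    SketchCheckpointMark(List<ReceivedMessage> toAck) {
      this.toAck = toAck;
    }

    // Invoked asynchronously by the runner once the work item commits successfully;
    // never invoked if the commit is rejected during rebalancing.
    void finalizeCheckpoint() {
      for (ReceivedMessage message : toAck) {
        message.ack();
      }
    }
  }
}
```

The key point is the ownership handoff in getCheckpointMarkCopied: the reader gives up the old list and starts a new one, which is what the "cleaned similarly to receivedMessages" bullet describes.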

stankiewicz avatar Dec 04 '25 19:12 stankiewicz

Which runner are you using? Do you see the same data loss across both Dataflow runners?

Dataflow runner with Streaming Engine enabled, Dataflow Prime enabled, Runner v2 enabled, streaming mode is "Exactly once".

If you think one of those could have an impact, I can try testing different configurations; however, we've already learned in the past (unrelated to this topic) that it's best to keep at least Streaming Engine (= Runner v2) enabled.

ppawel avatar Dec 05 '25 07:12 ppawel

@ppawel are you able to test with a snapshot version? The fix in https://github.com/apache/beam/pull/37007 is ready for review; once it's merged you will be able to validate quickly.

stankiewicz avatar Dec 05 '25 10:12 stankiewicz

@ppawel are you able to test with a snapshot version? The fix in #37007 is ready for review; once it's merged you will be able to validate quickly.

Yes, I will build it and try to test it today or on Monday at the latest, thanks.

ppawel avatar Dec 05 '25 10:12 ppawel

@stankiewicz I see that @scwhittle closed this bug already but I would still like to test it and give feedback here... does your branch from the PR depend on anything in the 2.71.0 release or can it be tested with 2.70.0 as well?

ppawel avatar Dec 09 '25 14:12 ppawel

It was closed automatically because the PR was annotated as fixing it. Reopening for verification.

scwhittle avatar Dec 09 '25 14:12 scwhittle

Paweł, I think you can run it with 2.70.0 and only use the 2.71.0 snapshot for the Solace IO module: https://repository.apache.org/content/groups/snapshots/org/apache/beam/beam-sdks-java-io-solace/2.71.0-SNAPSHOT/

stankiewicz avatar Dec 09 '25 14:12 stankiewicz

@stankiewicz I just tested the snapshot and it looks like a regression was introduced: the backlog of Solace messages is being redelivered but no longer acked. Only new incoming messages get consumed and acknowledged.

[two screenshots attached]

We had this issue a year ago which was fixed with one of these PRs:

  • https://github.com/apache/beam/pull/33269
  • https://github.com/apache/beam/pull/32962

Robbllle avatar Dec 15 '25 16:12 Robbllle

Yeah, unfortunately this is a pretty bad regression for us - if we can help somehow with diagnosing it, let us know...

ppawel avatar Dec 16 '25 08:12 ppawel

I created a Google support case (#65971204) and added you as a subscriber @stankiewicz.

Robbllle avatar Dec 16 '25 14:12 Robbllle