Stack overflows caused by MergeLimitedSink operators

Status: Open. geoffmacd opened this issue 7 months ago • 8 comments

Short description of the issue:

At Dropbox, we use RxSwift heavily in a serial queue that must be FIFO to process something that sometimes requires us to wait (for reasons I'm not going to go into). We have one main observable that represents our input, and it uses concatMap in conjunction with .just() and .delay() to achieve this. The delay is rare and always under 3 seconds. The input generally arrives all at once (tens of thousands of elements in a short period of time). Since at least 2021, our top crash has been a stack overflow in Rx code that we've never been able to address. It affects a minority of users on launch, and until now we had rarely been able to reproduce it.

We found a reproduction case (see the sample code) that can cause stack overflows when using the standard RxSwift concatMap() operator (or concat / merge(maxConcurrent:), which share the same implementation in Merge.swift) in combination with randomly delayed sequences. This example function produces extremely deep stack traces (or crashes directly with a stack overflow if you are lucky). What seems to matter is not whether we use .delay at all, but that there is a random mix of delayed and non-delayed .just() elements.

If you run this code and it doesn't crash, you can still see an extremely deep stack inside MergeLimitedSinkIter.on using the Thread API (just print Thread.callStackReturnAddresses.count there). This is the source of the stack overflow crash some of our users are experiencing.
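A minimal, Foundation-only sketch of the depth measurement mentioned above. `currentStackDepth()` and `depthAfterNesting(_:)` are hypothetical helpers, not RxSwift API, and plain recursion stands in for a chain of nested `MergeLimitedSinkIter.on` calls. On Linux, swift-corelibs-foundation may cap the number of returned addresses, so treat the count as a lower bound for very deep stacks.

```swift
import Foundation

// Hypothetical helper: the number the issue suggests printing.
func currentStackDepth() -> Int {
    return Thread.callStackReturnAddresses.count
}

// Recurse `remaining` levels and report the stack depth at the deepest call.
func depthAfterNesting(_ remaining: Int) -> Int {
    if remaining == 0 {
        return currentStackDepth()
    }
    let deepest = depthAfterNesting(remaining - 1)
    // Doing work after the recursive call keeps it out of tail position,
    // so the optimizer is unlikely to turn the recursion into a loop.
    return max(deepest, currentStackDepth())
}

let baseline = currentStackDepth()
let nested = depthAfterNesting(50)
print("baseline: \(baseline), after 50 nested calls: \(nested)")
```

Each extra level of synchronous nesting adds at least one frame, which is what makes the count inside `MergeLimitedSinkIter.on` a useful signal.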

It appears to be a concurrency issue: a .just() that emits immediately on the current queue seems to break every internal use of MergeLimitedSinkIter (which backs concat, concatMap and merge(maxConcurrent:)).
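To illustrate why the mix of delayed and non-delayed elements matters, here is a toy model (not RxSwift's code) of a concat-style sink with one active inner sequence. It assumes, as the deep MergeLimitedSinkIter.on frames suggest, that the next queued inner is subscribed from inside the previous inner's completion callback. `Inner`, `RecursiveConcatModel` and `submit` are hypothetical names, and the model ignores locking and errors.

```swift
import Foundation

// An inner "sequence" receives a completion callback and may call it
// synchronously (like .just()) or later (like .delay()).
typealias Inner = (@escaping () -> Void) -> Void

final class RecursiveConcatModel {
    private var queue: [Inner] = []
    private var active = false
    private var depth = 0
    private(set) var maxDepth = 0

    func submit(_ inner: @escaping Inner) {
        if active {
            queue.append(inner)          // wait behind the active inner
        } else {
            subscribe(inner)
        }
    }

    private func subscribe(_ inner: @escaping Inner) {
        active = true
        depth += 1
        maxDepth = max(maxDepth, depth)
        inner { [unowned self] in self.innerCompleted() }
        depth -= 1
    }

    // Assumed behavior: the next queued inner is subscribed from inside
    // the previous inner's completion callback, one call level deeper.
    private func innerCompleted() {
        if queue.isEmpty {
            active = false
        } else {
            subscribe(queue.removeFirst())
        }
    }
}

// Only synchronous inners: nothing is ever queued.
let allJust = RecursiveConcatModel()
for _ in 0..<1_000 { allJust.submit { $0() } }

// One delayed inner with 1,000 synchronous inners queued behind it.
let mixed = RecursiveConcatModel()
var pendingDelay: (() -> Void)?
mixed.submit { completion in pendingDelay = completion }
for _ in 0..<1_000 { mixed.submit { completion in completion() } }
pendingDelay?()   // the delay fires and the queue drains recursively

print("all just:", allJust.maxDepth, "mixed:", mixed.maxDepth)
```

In this model the all-synchronous case never nests past depth 1, while the single delayed inner turns the drain of everything queued behind it into one recursive chain, with nesting proportional to the queue length.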

Expected outcome:

The sample code should be protected against stack overflows: the operator should schedule the next inner subscription intelligently instead of letting the stack grow with every synchronously completing inner sequence.
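One possible reading of "intelligently scheduling the next inner subscribe" is a trampoline: completions that happen while a drain loop is already running only mark the slot as free, and the loop picks up the next inner. This is a hypothetical sketch (`Inner`, `TrampolinedConcatModel` and `submit` are made-up names), not RxSwift's implementation or a proposed patch, and it ignores locking and errors.

```swift
import Foundation

// An inner "sequence" receives a completion callback and may call it
// synchronously (like .just()) or later (like .delay()).
typealias Inner = (@escaping () -> Void) -> Void

final class TrampolinedConcatModel {
    private var queue: [Inner] = []
    private var active = false
    private var draining = false
    private var depth = 0
    private(set) var maxDepth = 0

    func submit(_ inner: @escaping Inner) {
        queue.append(inner)
        drain()
    }

    private func innerCompleted() {
        active = false
        drain()   // returns immediately if a drain loop is already running
    }

    // A single loop subscribes inners one after another, so synchronous
    // completions never nest a new subscription inside the old one.
    private func drain() {
        if draining { return }
        draining = true
        while !active, !queue.isEmpty {
            active = true
            let inner = queue.removeFirst()
            depth += 1
            maxDepth = max(maxDepth, depth)
            inner { [unowned self] in self.innerCompleted() }
            depth -= 1
        }
        draining = false
    }
}

let model = TrampolinedConcatModel()
var pendingDelay: (() -> Void)?
var justCount = 0

model.submit { completion in pendingDelay = completion }   // the rare delayed inner
for _ in 0..<1_000 {
    model.submit { completion in
        justCount += 1
        completion()                                        // completes synchronously
    }
}
pendingDelay?()   // the delay fires; the queue drains in a loop, FIFO order kept
print("max nesting depth:", model.maxDepth, "justs run:", justCount)
```

The cost is one flag check per completion; the benefit is that stack depth no longer depends on how many synchronous inners are queued behind a delayed one.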

What actually happens:

A stack overflow that looks like this:

[Screenshot of the stack overflow crash (image not included)]

Self-contained code example that reproduces the issue:

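```swift
import Foundation
import RxSwift

final class StackOverflowRepro {
    // Must outlive the subscription (in our app it is a property of the owning object).
    private let disposeBag = DisposeBag()

    func generateStackOverflow() {
        print("starting rx concatMap/just subscribe")

        let scheduler = SerialDispatchQueueScheduler(qos: .userInteractive, internalSerialQueueName: "1")
        // stack overflow
        Observable.from(Array(repeating: 1, count: 100_000))
            .observe(on: scheduler)
            .concatMap { value -> Single<Int> in
                // Produces super large stack traces when mixing concatMap + just + delay:
                // roughly 1 in 101 inner sequences is delayed, the rest emit synchronously.
                if Int.random(in: 0 ... 100) != 0 {
                    return Single.just(value)
                } else {
                    return Single.just(value).delay(.nanoseconds(Int.random(in: 1 ... 7)), scheduler: scheduler)
                }
            }
            .subscribe(onCompleted: {
                print("finished rx concatMap")
            })
            .disposed(by: disposeBag)
    }
}
```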

RxSwift/RxCocoa/RxBlocking/RxTest version/commit

We are on 6.6.0, but this issue has not been addressed or even noted.

Platform/Environment

  • [X] iOS
  • [X] macOS
  • [ ] tvOS
  • [ ] watchOS
  • [ ] playgrounds

How easy is it to reproduce? (chances of a successful reproduction after running the self-contained code)

  • [ ] easy, 100% repro
  • [X] sometimes, 10%-100%
  • [ ] hard, 2% - 10%
  • [ ] extremely hard, 0% - 2%

Xcode version:

15.4

Level of RxSwift knowledge: (this is so we can understand your level of knowledge and formulate the response in an appropriate manner)

  • [ ] just starting
  • [ ] I have a small code base
  • [X] I have a significant code base

geoffmacd, Jul 26 '24 23:07