Stack overflows caused by MergeLimitedSink operators

Open · geoffmacd opened this issue 1 year ago · 8 comments

Short description of the issue:

At Dropbox, we use RxSwift heavily in a serial queue that must be FIFO to process something that sometimes requires us to wait (for reasons I'm not going to go into). We have one main observable that represents our input, and it uses concatMap in conjunction with .just() and .delay() to achieve this. The delay, which is rare, is less than 3 seconds. The input generally arrives all at once (tens of thousands of elements in a short period of time). Since at least 2021, our top crash has been a stack overflow in Rx code that we've never been able to address. This crash affects a minority of users on launch and had rarely been reproduced until now.

We found a reproduction case (see the sample code below) that can cause stack overflows when using the standard RxSwift concatMap() operator (or concat / merge(maxConcurrent:) in Merge.swift) in combination with randomly delayed sequences. This example function will cause super deep stack traces (or a stack overflow crash directly, if you are lucky). What seems to matter is not whether we always or never use .delay, but that there is a random mix of delayed and non-delayed "just" elements.

If you run this code and it doesn't crash, you can still see a super deep stack inside MergeLimitedSinkIter.on with the Thread API (just print Thread.callStackReturnAddresses.count). This is the source of the stack overflow crash we are experiencing for some users.

It is a concurrency issue where a .just() emitting immediately on the current queue seems to break all internal uses of MergeLimitedSinkIter (which backs concat, concatMap, and merge(maxConcurrent:)).
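To make the suspected mechanism concrete, here is a minimal, self-contained Swift model. It is not RxSwift code; RecursiveConcat, Source, and runFirstDelayedScenario are hypothetical names. It assumes, as the deep MergeLimitedSinkIter.on frames suggest, that when an inner sequence completes, the operator subscribes the next queued inner sequence directly from inside that completion callback. While one delayed inner is pending, later inners pile up in the queue; once it fires, each synchronous inner (like a .just()) completes inside the previous one's callback, so nesting depth grows with the queue length.

```swift
// Hypothetical toy model of a "concat"-style operator; not RxSwift's implementation.
// A Source is subscribed by handing it a completion callback.
typealias Source = (@escaping () -> Void) -> Void

final class RecursiveConcat {
    private var queue: [Source] = []
    private var active = false
    private var depth = 0              // current nesting of subscribe calls
    private(set) var maxDepth = 0      // deepest nesting observed (stand-in for stack depth)
    private(set) var completed = 0

    func enqueue(_ source: @escaping Source) {
        // Only one inner source runs at a time; the rest wait in FIFO order.
        if active { queue.append(source) } else { subscribe(source) }
    }

    private func subscribe(_ source: @escaping Source) {
        active = true
        depth += 1
        maxDepth = max(maxDepth, depth)
        source { [self] in
            completed += 1
            // The next inner source is subscribed from inside this callback,
            // so a synchronous source nests one level deeper each time.
            if queue.isEmpty { active = false } else { subscribe(queue.removeFirst()) }
        }
        depth -= 1
    }
}

/// Enqueues one "delayed" source, then `count` synchronous ones, then fires the delay.
func runFirstDelayedScenario(count: Int) -> RecursiveConcat {
    let concat = RecursiveConcat()
    var fireDelayed: (() -> Void)?
    concat.enqueue { done in fireDelayed = done }            // completes later, like .delay()
    for _ in 0..<count { concat.enqueue { done in done() } } // completes at once, like .just()
    fireDelayed?()
    return concat
}

let model = runFirstDelayedScenario(count: 1_000)
print("max depth:", model.maxDepth) // prints "max depth: 1000"
```

If every source completes synchronously instead, each enqueue finds the operator idle and the depth stays at 1, which fits the observation in this thread that all-just (or all-delay) closures keep the call stack small.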

Expected outcome:

The code sample below should be protected against stack overflows by intelligently scheduling the next inner subscribe.
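One way to read "intelligently scheduling the next inner subscribe" is a trampoline: if an inner source completes synchronously while the operator is still inside its subscribe loop, the loop picks up the next source instead of recursing. The sketch below shows that idea in a small self-contained toy model. TrampolinedConcat, Source, and runFirstDelayedScenario are hypothetical names; this is neither RxSwift code nor the fix discussed in this thread, and it ignores the locking and disposal a real operator needs.

```swift
// Hypothetical trampolined "concat"-style operator; not RxSwift's implementation.
typealias Source = (@escaping () -> Void) -> Void

final class TrampolinedConcat {
    private var queue: [Source] = []
    private var active = false    // an inner source is currently running
    private var draining = false  // we are inside drain()'s loop
    private var depth = 0
    private(set) var maxDepth = 0

    func enqueue(_ source: @escaping Source) {
        queue.append(source)
        if !active && !draining { drain() }
    }

    private func innerCompleted() {
        active = false
        // Synchronous completion: the running loop picks up the next source.
        // Asynchronous completion: start a fresh loop on the current (shallow) stack.
        if !draining { drain() }
    }

    private func drain() {
        draining = true
        while !active, !queue.isEmpty {
            active = true
            let next = queue.removeFirst()   // strict FIFO, one source at a time
            depth += 1
            maxDepth = max(maxDepth, depth)
            next { [self] in innerCompleted() }
            depth -= 1
        }
        draining = false
    }
}

/// One "delayed" source followed by `count` synchronous ones; records completion order.
func runFirstDelayedScenario(count: Int) -> (concat: TrampolinedConcat, order: [Int]) {
    let concat = TrampolinedConcat()
    var order: [Int] = []
    var fireDelayed: (() -> Void)?
    concat.enqueue { done in fireDelayed = { order.append(0); done() } }       // like .delay()
    for i in 1...count { concat.enqueue { done in order.append(i); done() } } // like .just()
    fireDelayed?()
    return (concat, order)
}

let result = runFirstDelayedScenario(count: 1_000)
print("max depth:", result.concat.maxDepth, "elements:", result.order.count)
// prints "max depth: 1 elements: 1001"
```

The cost is one extra flag check per inner source; ordering is unchanged because sources are still dequeued FIFO and only one runs at a time.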

What actually happens:

A stack overflow that looks like this:

[Screenshot of the stack overflow]

Self contained code example that reproduces the issue:

import RxSwift

// Minimal scaffolding so the sample compiles on its own; keep the instance alive until the sequence completes.
final class StackOverflowRepro {
    private let disposeBag = DisposeBag()

    func generateStackOverflow() {
        print("starting rx concatMap/just subscribe")

        let scheduler = SerialDispatchQueueScheduler(qos: .userInteractive, internalSerialQueueName: "1")
        // stack overflow
        Observable.from(Array(repeating: 1, count: 100_000))
            .observe(on: scheduler)
            .concatMap {
                // produces super large stack traces when mixing ConcatMap+Just+Delay
                // roughly 1 in 101 inner sequences is delayed; the rest complete synchronously
                if Int.random(in: 0 ... 100) != 0 {
                    return Single.just($0)
                } else {
                    return Single.just($0).delay(.nanoseconds(Int.random(in: 1 ... 7)), scheduler: scheduler)
                }
            }
            .subscribe(onCompleted: {
                print("finished rx concatMap")
            })
            .disposed(by: disposeBag)
    }
}

RxSwift/RxCocoa/RxBlocking/RxTest version/commit

We are on 6.6.0, but this has not been addressed or even noted.

Platform/Environment

  • [X] iOS
  • [X] macOS
  • [ ] tvOS
  • [ ] watchOS
  • [ ] playgrounds

How easy is it to reproduce? (chances of a successful reproduction after running the self-contained code)

  • [ ] easy, 100% repro
  • [X] sometimes, 10%-100%
  • [ ] hard, 2% - 10%
  • [ ] extremely hard, 0% - 2%

Xcode version:

15.4

Level of RxSwift knowledge: (this is so we can understand your level of knowledge and formulate the response in an appropriate manner)

  • [ ] just starting
  • [ ] I have a small code base
  • [X] I have a significant code base

geoffmacd · Jul 26 '24 23:07

The simplest solution is to add .delay(.seconds(0), scheduler: scheduler) to the just... Yes?

danielt1263 · Jul 26 '24 23:07

To both sides of the if statement? Yes, I suppose that would work, but it was a surprise that using the API like this could cause this in the first place. It's weird to have to know to balance it like this.

geoffmacd · Jul 26 '24 23:07

Very weird indeed. Especially given that if the body of the closure is only { Single.just($0) } or if it's only { Single.just($0).delay(.nanoseconds(Int.random(in: 1 ... 7)), scheduler: scheduler) }, then the call stack is fine.

danielt1263 · Jul 27 '24 00:07

Yes, my point is that devs should feel free to return any Observable from the concatMap() closure without fearing stack overflows. I believe my fix should protect against that.

geoffmacd · Jul 27 '24 00:07

What is the performance hit? I see that if I use return Observable.just($0).delay(.nanoseconds(0), scheduler: scheduler) in the if block, it takes 14 seconds to run through the 100_000 elements, whereas if I use return Observable.generate(initialState: $0, condition: { _ in false }, iterate: { $0 }) it takes only 6 seconds. (Both have a stack size of 31.)

danielt1263 · Jul 27 '24 00:07

I'm seeing that if I take your fix and use your initial code, the stack size goes up to 42, but the runtime is only 3 seconds... Looks like a good fix to me. (I assume all the tests that use MergeLimitedSinkIter still pass?)

danielt1263 · Jul 27 '24 00:07

Yes, tests pass both in this repo (they didn't run automatically in CI, though) and in the roughly 3,000 unit tests in our own codebase that rely heavily on this function.

When I tested this function on a device (iPhone XS):

        let t1 = Date()
        let scheduler = SerialDispatchQueueScheduler(qos: .userInteractive, internalSerialQueueName: "1")
        // stack overflow
        Observable.from(Array(repeating: 1, count: 100_000))
            .observe(on: scheduler)
            .concatMap {
                return Single.just($0).delay(.nanoseconds(0), scheduler: scheduler)
            }
            .subscribe(
                onCompleted: {
                    print("finished rx concatMap in \(Date().timeIntervalSince(t1)) s")
                }
            )
            .disposed(by: disposeBag)

I got 6.6 seconds runtime with my fix and about the same on master.

geoffmacd · Jul 27 '24 00:07

I found an even simpler repro function for this issue: only the first inner subscription needs to be delayed (or use anything else that queues); the DelaySink then emits all elements (Justs) at once.

    func generateStackOverflow() {
        print("starting rx concatMap + just + delay")

        let scheduler = SerialDispatchQueueScheduler(qos: .userInteractive, internalSerialQueueName: "1")
        // stack overflow
        Observable.from(Array(0 ..< 100_000))
            .observe(on: scheduler)
            .concatMap {
                // produces super large stack traces when mixing ConcatMap+Just+Delay
                if $0 == 0 {
                    // only on the first element is it delayed, the `DelaySink` will then emit all elements emitted from during this delay
                    return Single.just($0).delay(.seconds(1), scheduler: scheduler)
                } else {
                    return Single.just($0)
                }
            }
            .subscribe(
                onCompleted: {
                    print("finished rx concatMap + just + delay")
                }
            )
            .disposed(by: disposeBag)
    }

geoffmacd · Aug 22 '24 22:08

Let's move the discussion over to #2616

freak4pc · Oct 03 '24 12:10