withLatestFrom doesn't seem to be thread safe

Open fbarbat opened this issue 2 years ago • 0 comments

withLatestFrom doesn't seem to be thread safe when it subscribes to two different publishers that emit elements from different threads. The snippet below is part of the current implementation of withLatestFrom; see the comments in uppercase.

    private func trackLatestFromSecond(onInitialValue: @escaping () -> Void) {
      var gotInitialValue = false

      let subscriber = AnySubscriber<Other.Output, Other.Failure>(
        receiveSubscription: { [weak self] subscription in
            self?.otherSubscription = subscription
            subscription.request(.unlimited)
        },
        receiveValue: { [weak self] value in
            guard let self = self else { return .none }

            // THIS IS CALLED FROM THREAD B
            self.latestValue = value

            if !gotInitialValue {
                // When getting initial value, start pulling values
                // from upstream in the main sink
                self.sink = Sink(upstream: self.upstream,
                                 downstream: self.downstream,
                                 transformOutput: { [weak self] value in
                                    guard let self = self,
                                          // THIS IS CALLED FROM THREAD A, ACCESSING THE SAME VAR BUT IT IS NOT SYNCHRONIZED
                                          let other = self.latestValue else { return nil }

                                    return self.resultSelector(value, other)
                                 },
                                 transformFailure: { $0 })

                // Signal initial value to start fulfilling downstream demand
                gotInitialValue = true
                onInitialValue()
            }

            return .unlimited
        },
        receiveCompletion: nil)

      self.second.subscribe(subscriber)
    }

Should a lock over latestValue be added to ensure thread safety? Are there reasons not to do it (for example, performance)? Should CombineExt at least update the Swift docs saying it is not supposed to be used from multiple threads?
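
To make the question concrete, here is a minimal sketch of what locking latestValue could look like. `LockedBox` and `demo` are hypothetical names used only for illustration and are not part of CombineExt; the sketch assumes latestValue would be replaced by such a box, with `set` called from the second publisher's callback (thread B) and `get` called from `transformOutput` (thread A).

```swift
import Foundation

// Hypothetical helper, NOT part of CombineExt: guards an optional value with NSLock.
final class LockedBox<Value>: @unchecked Sendable {
    private let lock = NSLock()
    private var value: Value?

    /// Stores a new value. In withLatestFrom this would be the write
    /// performed when the second publisher emits (thread B).
    func set(_ newValue: Value) {
        lock.lock()
        defer { lock.unlock() }
        value = newValue
    }

    /// Returns the most recently stored value, or nil if none was stored yet.
    /// In withLatestFrom this would be the read in transformOutput (thread A).
    func get() -> Value? {
        lock.lock()
        defer { lock.unlock() }
        return value
    }
}

// Illustrative usage: concurrent writers and readers sharing one box.
func demo() -> Int? {
    let latest = LockedBox<Int>()
    DispatchQueue.concurrentPerform(iterations: 1_000) { i in
        if i % 2 == 0 {
            latest.set(i)      // simulated emission from the second publisher
        } else {
            _ = latest.get()   // simulated read from the upstream sink
        }
    }
    return latest.get()
}
```

With this approach every read and write pays for one lock/unlock pair, which is the performance cost the question above refers to.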

Thank you.

fbarbat, Oct 23 '23 19:10