
Crash "thunk for @escaping @callee_guaranteed (@in_guaranteed"

Open mdstage opened this issue 3 months ago • 8 comments

We have some crashes on iOS. They seem to occur when a publisher is deallocated on the iOS side while a new value is being published from KMP. We could not reproduce the crash ourselves; we only know of it from crash reports.

It's happening on iOS 16 through 26, with the CocoaPod 'KMPNativeCoroutinesCombine', '1.0.0-ALPHA-47'.

KMP library is created with:

kotlin = "2.2.20"
kotlinx-coroutines = "1.10.2"
java-version = 21

The error is:

thunk for @escaping @callee_guaranteed (@in_guaranteed OurKMPSdkState, @guaranteed @escaping @callee_guaranteed @substituted <A> () -> (@out A) for <OurKMPSdkKotlinUnit>, @in_guaranteed OurKMPSdkKotlinUnit) -> (@out OurKMPSdkKotlinUnit)

Here is the stack trace from Crashlytics:

          Crashed: com.apple.root.default-qos
0  Combine                        0xb0620 Publishers.TryMap.Inner.receive(_:) + 44
1  Combine                        0xb0d7c protocol witness for Subscriber.receive(_:) in conformance Publishers.TryMap<A, B>.Inner<A1> + 20
2  KMPNativeCoroutinesCombine     0x6708 $s26KMPNativeCoroutinesCombine22NativeFlowSubscriptionC7requestyy0C011SubscribersO6DemandVFq0_x_q0_ycq0_tcfU_ + 332
3  OurApp                         0x185468 thunk for @escaping @callee_guaranteed (@in_guaranteed OurKMPSdkState, @guaranteed @escaping @callee_guaranteed @substituted <A> () -> (@out A) for <OurKMPSdkKotlinUnit>, @in_guaranteed OurKMPSdkKotlinUnit) -> (@out OurKMPSdkKotlinUnit) + 4380202088 (<compiler-generated>:4380202088)
4  OurApp                         0x185264 thunk for @escaping @callee_guaranteed (@guaranteed OurKMPSdkState, @guaranteed @escaping @callee_guaranteed () -> (@owned OurKMPSdkKotlinUnit), @guaranteed OurKMPSdkKotlinUnit) -> (@owned OurKMPSdkKotlinUnit) + 4380201572 (<compiler-generated>:4380201572)
5  KmpSdk                         0xe5e4b0 invokeFunction3 + 824
6  KmpSdk                         0xe5e388 invokeFunction3 + 528
7  KmpSdk                         0x169620 kfun:com.rickclephas.kmp.nativecoroutines.asNativeFlow$1.asNativeFlow$1$invoke$job$1.asNativeFlow$1$invoke$job$1$invoke$2.emit#internal + 756
8  KmpSdk                         0xddd6a0 kfun:net.ourKmp.usecases.OurUseCaseImpl.OurUseCaseImpl$data$$inlined$map$1.OurUseCaseImpl$data$$inlined$map$1$collect$2.emit#internal + 932
9  KmpSdk                         0x139acc kfun:kotlinx.coroutines.flow.DistinctFlowImpl.DistinctFlowImpl$collect$2.emit#internal + 752
10 KmpSdk                         0x13f7b0 kfun:kotlinx.coroutines.flow.onEach$$inlined$unsafeTransform$1.onEach$$inlined$unsafeTransform$1$collect$2.$emitCOROUTINE$1.invokeSuspend#internal + 776
11 KmpSdk                         0x13fa64 kfun:kotlinx.coroutines.flow.onEach$$inlined$unsafeTransform$1.onEach$$inlined$unsafeTransform$1$collect$2.emit#internal + 240
12 KmpSdk                         0x139acc kfun:kotlinx.coroutines.flow.DistinctFlowImpl.DistinctFlowImpl$collect$2.emit#internal + 752
13 KmpSdk                         0x1400d8 kfun:kotlinx.coroutines.flow.combine$$inlined$combineUnsafe$1.combine$$inlined$combineUnsafe$1$collect$2.$invokeCOROUTINE$0.invokeSuspend#internal + 780
14 KmpSdk                         0x1403a0 kfun:kotlinx.coroutines.flow.combine$$inlined$combineUnsafe$1.combine$$inlined$combineUnsafe$1$collect$2.invoke#internal + 252
15 KmpSdk                         0x134708 kfun:kotlinx.coroutines.flow.internal.combineInternal$2.$invokeCOROUTINE$2.invokeSuspend#internal + 2860
16 KmpSdk                         0x15128 kfun:kotlin.coroutines.native.internal.BaseContinuationImpl#resumeWith(kotlin.Result<kotlin.Any?>){} + 176
17 KmpSdk                         0x1466bc kfun:kotlinx.coroutines.DispatchedTask#run(){} + 1552
18 KmpSdk                         0x1666e8 kfun:kotlinx.coroutines.DarwinGlobalQueueDispatcher.DarwinGlobalQueueDispatcher$dispatch$$inlined$autoreleasepool$1.$<bridge-DN>invoke(){}#internal + 184
19 KmpSdk                         0x14205c4 ___6f72672e6a6574627261696e732e6b6f746c696e783a6b6f746c696e782d636f726f7574696e65732d636f72652f6f70742f6275696c644167656e742f776f726b2f343465633665383530643563363366302f6b6f746c696e782d636f726f7574696e65732d636f72652f6e617469766544617277696e2f7372632f44697370617463686572732e6b74_knbridge2_block_invoke + 420
20 libdispatch.dylib              0x61f68 <redacted> + 24
21 libdispatch.dylib              0x62f60 <redacted> + 16
22 libdispatch.dylib              0x13df8 <redacted> + 840
23 libdispatch.dylib              0x143f0 <redacted> + 164
24 libsystem_pthread.dylib        0x1a34 _pthread_wqthread + 224
25 libsystem_pthread.dylib        0x1b10 start_wqthread + 8
        

Here is how we are creating the publisher on iOS:

    public var useCasePublisher: AnyPublisher<OurState, Never> {
        createPublisher(for: ourUseCase.data())
            .tryMap {
                // mapping
            }
            .replaceError(with: .undefined)
            .eraseToAnyPublisher()
    }

I could not find any other issues or reports mentioning "thunk for @escaping @callee_guaranteed" in relation to KMP, except for SKIE, which we are not using. This crash appeared recently, along with some big refactoring on our side, so we cannot say whether it comes from our changes or from a new version of KMPNativeCoroutinesCombine.

mdstage avatar Oct 08 '25 11:10 mdstage

Hi. You mentioned a big refactor. What version of KMP-NativeCoroutines were you previously using?

rickclephas avatar Oct 08 '25 18:10 rickclephas

Hi, we moved from 1.0.0-ALPHA-24 to 1.0.0-ALPHA-47. These crashes didn't appear with ALPHA-24.

mdstage avatar Oct 09 '25 06:10 mdstage

Thanks. There haven't been any changes to the Publisher implementation between those versions, so it's unlikely that the version bump is causing this crash. Based on the stacktrace it looks like the crash occurs inside the tryMap. Is your mapping logic written in Swift or Kotlin?

rickclephas avatar Oct 12 '25 15:10 rickclephas

The tryMap is done in Swift; it maps KMP entities to Swift entities. I'm confident that the mapping itself is not the cause of the crash, as it's a straightforward one-to-one mapping from KMP to Swift, and it was also there before the refactoring. But could the crash happen while the publisher created by createPublisher is being deallocated?

mdstage avatar Oct 13 '25 07:10 mdstage

Alright, well if the publisher/cancellable gets deallocated then it will also cancel the Flow collection in Kotlin. Based on my tests this won't result in a crash. Even if for some reason the Flow collection weren't cancelled immediately, it would either drop the values if the publisher was deallocated, or simply continue to process the values in Swift until the collection is actually cancelled.

Maybe the crash is caused by the Swift code that processes the values? Could it be that after/during the cancellation it fails to process new values?
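
For reference, here is a minimal Combine-only sketch of the expected behaviour described above: once the AnyCancellable is released, later values are simply dropped. The PassthroughSubject is a hypothetical stand-in for the publisher returned by createPublisher(for:), and everything runs on a single thread, so this does not exercise the cross-thread timing suspected in this issue.

```swift
import Combine

// Hypothetical helper: returns the values that reached the sink.
func receivedValuesAfterRelease() -> [String] {
    // Stand-in for the publisher created from the Kotlin Flow (assumption).
    let subject = PassthroughSubject<Int, Error>()
    var received: [String] = []

    // Same operator chain as in the report: tryMap, replaceError, sink.
    var cancellable: AnyCancellable? = subject
        .tryMap { value -> String in "value \(value)" }
        .replaceError(with: "undefined")
        .sink { received.append($0) }

    subject.send(1)      // delivered while the subscription is alive
    cancellable = nil    // releasing the AnyCancellable cancels the subscription
    subject.send(2)      // dropped, no subscriber is attached any more

    return received
}

print(receivedValuesAfterRelease())  // prints ["value 1"]
```

If the production crash depends on a value arriving while the cancellation is in flight on another thread, a single-threaded check like this will pass and the problem would only show up under a concurrent stress test.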

rickclephas avatar Oct 13 '25 18:10 rickclephas

Hi, sorry for the late answer.

We created a KMP publisher that constantly publishes values, and some code that randomly creates and drops references to many publishers created with KMP-NativeCoroutines on different threads.

I was able to get one crash there:

Image

mdstage avatar Oct 22 '25 14:10 mdstage

> We created a KMP publisher that constantly publishes values, and some code that randomly creates and drops references to many publishers created with KMP-NativeCoroutines on different threads.

Could you possibly share the code for this?

rickclephas avatar Oct 22 '25 16:10 rickclephas

Sure, here is the KMP part:

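import com.rickclephas.kmp.nativecoroutines.NativeCoroutines
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.launch

abstract class IosTestingUseCase {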
    @NativeCoroutines
    abstract fun data(): Flow<String>

    abstract fun initialise(delayInMilliSeconds: Long)

    abstract fun clear()
}

internal class IosTestingUseCaseImpl(
    private val scope: CoroutineScope,
) : IosTestingUseCase() {
    private val stateFlow = MutableStateFlow("")
    private var timerJob: Job? = null
    private var counter = 0

    override fun data(): Flow<String> = stateFlow.asStateFlow()

    override fun initialise(delayInMilliSeconds: Long) {
        // Cancel any existing timer
        timerJob?.cancel()

        // Start a new timer
        timerJob =
            scope.launch {
                try {
                    while (true) {
                        delay(delayInMilliSeconds)
                        counter++
                        stateFlow.value = counter.toString()
                    }
                } catch (ex: Exception) {
                    ex.printStackTrace()
                }
            }
    }

    override fun clear() {
        // Stop the timer
        timerJob?.cancel()
        timerJob = null

        // Reset counter and flow value
        counter = 0
        stateFlow.value = ""
    }
}

Here is the Swift part that creates the publisher:

import Combine
import KMPNativeCoroutinesCombine

class IosTestingUseCaseSwift {

    // ...

    public var valuePublisher: AnyPublisher<String, Never> {
        createPublisher(for: iosTestingUseCase.data())
            .tryMap { string in
                // The mock mapper's signature is map(from:with:), so pass the empty context.
                StringToStringMapper().map(from: string, with: ())
            }
            .replaceError(with: "error")
            .eraseToAnyPublisher()
    }
    public func initialise(delayInMilliSeconds: Int) {
        iosTestingUseCase.initialise(delayInMilliSeconds: Int64(delayInMilliSeconds))
    }
}

// mock mapping
struct StringToStringMapper {
    func map(from entity: String, with context: ()) -> String {
        switch entity.first {
        case "0"?, "1"?, "2"?, "4"?, "5"?, "6"?, "7"?, "8"?, "9"?:
            entity
        default:
            entity
        }
    }
}

And here is the part that creates and destroys publishers very quickly on many threads. Please note that it's not production code; it was written quickly to try to reproduce the crash.


import Combine
import OSLog
import SwiftUI

// MARK: - Heavy object to allocate/deallocate
final class Blob {
    let id: UUID = UUID()
    let createdAt: Date = .now
    var payload: AnyCancellable // keeps the subscription alive until the Blob is released

    init(payload: AnyCancellable) {
        self.payload = payload
    }
}

// MARK: - Thread-safe registry using an Actor
actor ObjectStore {
    private var storage: [UUID: Blob] = [:]

    func insert(_ blob: Blob) {
        storage[blob.id] = blob
    }

    func insertMany(_ blobs: [Blob]) {
        for b in blobs { storage[b.id] = b }
    }

    @discardableResult
    func remove(_ id: UUID) -> Blob? { storage.removeValue(forKey: id) }

    func removeRandom(_ n: Int) {
        guard !storage.isEmpty else { return }
        let keys = Array(storage.keys)
        for k in keys.shuffled().prefix(n) { storage.removeValue(forKey: k) }
    }

    func count() -> Int { storage.count }

    func allIDs() -> [UUID] { Array(storage.keys) }
}

// MARK: - Stressor that schedules work across different threads
final class Stressor {
    private let log = Logger(subsystem: "demo.concurrent.stress", category: "Stressor")

    let store: ObjectStore
    private(set) var isRunning: Bool = false

    // Multiple distinct queues to demonstrate true cross-thread behavior
    private let queues: [DispatchQueue] = [
        DispatchQueue(label: "worker.queue.1", qos: .userInitiated, attributes: .concurrent),
        DispatchQueue(label: "worker.queue.2", qos: .utility, attributes: .concurrent),
        DispatchQueue(label: "worker.queue.3", qos: .background, attributes: .concurrent),
        DispatchQueue.main
    ]

    // Task handles for Swift concurrency loops
    private var tasks: [Task<Void, Never>] = []
    private let iosTestingUseCaseSwift: IosTestingUseCaseSwift
    init(
        store: ObjectStore,
        iosTestingUseCaseSwift: IosTestingUseCaseSwift
    ) {
        self.store = store
        self.iosTestingUseCaseSwift = iosTestingUseCaseSwift
    }

    func create() -> AnyCancellable {
        iosTestingUseCaseSwift
            .valuePublisher
            .sink { value in
                print("##> received: \(value)")
            }
    }

    func start(
        objectSizeKB: Int = 64,
        batchRange: ClosedRange<Int> = 50...150,
        deleteRatio: Double = 0.4
    ) {
        guard !isRunning else { return }
        isRunning = true
        log.info("Stressor started")

        // 1) GCD-based recurring work on multiple queues
        for (idx, q) in queues.enumerated() {
            scheduleOnQueue(q, label: "GCD-\(idx+1)", objectSizeKB: objectSizeKB, batchRange: batchRange, deleteRatio: deleteRatio)
        }

        // 2) Swift Concurrency detached tasks (different executors/threads over time)
        for i in 0..<2 {
            let t = Task.detached { [weak self] in
                guard let self else { return }
                let label = "Task-\(i+1)"
                while self.isRunning {
                    // Random sleep between 1 and 6 ms (the value is in nanoseconds)
                    try? await Task.sleep(nanoseconds: UInt64(Int.random(in: 1_000_000...6_000_000)))
                    await self.randomOp(label: label, objectSizeKB: objectSizeKB, batchRange: batchRange, deleteRatio: deleteRatio)
                }
            }
            tasks.append(t)
        }
    }

    func stop() {
        guard isRunning else { return }
        isRunning = false
        tasks.forEach { $0.cancel() }
        tasks.removeAll()
        log.info("Stressor stopped")
    }

    private func scheduleOnQueue(_ queue: DispatchQueue, label: String, objectSizeKB: Int, batchRange: ClosedRange<Int>, deleteRatio: Double) {
        @Sendable
        func scheduleNext() {
            // Recursively schedule work with jitter to keep activity going
            queue.asyncAfter(deadline: .now() + .milliseconds(Int.random(in: 10...50))) {
                Task { [weak self] in
                    guard let self else { return }
                    if self.isRunning {
                        await self.randomOp(label: label, objectSizeKB: objectSizeKB, batchRange: batchRange, deleteRatio: deleteRatio)
                        scheduleNext()
                    }
                }
            }
        }
        scheduleNext()
    }

    // Randomly create or delete in varying batch sizes
    private func randomOp(label: String, objectSizeKB: Int, batchRange: ClosedRange<Int>, deleteRatio: Double) async {
        if Double.random(in: 0...1) < deleteRatio {
            print("##> delete")
            let delCount = Int.random(in: batchRange)
            await store.removeRandom(delCount)
        } else {
            let createCount = Int.random(in: batchRange)
            // Build blobs off-actor to avoid holding the actor during heavy work
            var newBlobs: [Blob] = []
            newBlobs.reserveCapacity(createCount)
            for _ in 0..<createCount { newBlobs.append(Blob(payload: create())) }
            await store.insertMany(newBlobs)
        }

        // Occasionally log status
        if Int.random(in: 0..<100) == 0 {
            let c = await store.count()
            print("##> [\(label)] store.count=\(c)")
        }
    }
}

// MARK: - SwiftUI UI to control the stressor and observe live counts
final class TestViewModel {
    private var count: Int = 0
    private var running: Bool = false

    private let store = ObjectStore()
    private lazy var stressor = Stressor(store: store, iosTestingUseCaseSwift: iosTestingUseCaseSwift)

    private var timer: Timer?
    private let iosTestingUseCaseSwift: IosTestingUseCaseSwift

    init(iosTestingUseCaseSwift: IosTestingUseCaseSwift) {
        self.iosTestingUseCaseSwift = iosTestingUseCaseSwift
    }

    func start() {
        guard !running else { return }
        running = true
        stressor.start(objectSizeKB: 128, batchRange: 100...400, deleteRatio: 0.7)

        timer = Timer.scheduledTimer(withTimeInterval: 0.1, repeats: true) { [weak self] _ in
            guard let self else { return }
            Task { self.count = await self.store.count() }
        }
    }

    func stop() {
        guard running else { return }
        running = false
        stressor.stop()
        timer?.invalidate()
        timer = nil
    }
}

And then

        iosTestingUseCaseSwift.initialise(delayInMilliSeconds: 20)
        testViewModel = TestViewModel(iosTestingUseCaseSwift: iosTestingUseCaseSwift)
        testViewModel?.start()

Please note that it doesn't happen every time. It takes many cycles of starting the app, waiting, and killing it before the crash occurs, so it seems to depend on very specific timing.

mdstage avatar Oct 23 '25 08:10 mdstage