ReactiveObjC icon indicating copy to clipboard operation
ReactiveObjC copied to clipboard

RACSubject flattenMap to long running signal break subscribing.

Open Pitometsu opened this issue 7 years ago • 5 comments

   RACSubject *subject = [RACSubject subject];

   RACSignal *resultSignal = [[[subject filter:^BOOL(NSNumber *item) {

        return [item isKindOfClass:[NSNumber class]];
    }] flattenMap:^RACSignal *(NSNumber *item) {

        return [RACSignal createSignal:
                ^RACDisposable *(id<RACSubscriber> subscriber) {
                    __block TheTask *veryLongRunningTask =
                    [self doVeryLongRunningTaskForItem:item
                                        withCompletion:
                     ^(id result, NSError *error) {

                         if (!error) {
                             [subscriber sendNext:result];
                             [subscriber sendCompleted];
                         } else {
                             [subscriber sendError:error];
                         }
                     }];

                    return [RACDisposable disposableWithBlock:^{
                        [veryLongRunningTask cancel];
                    }];
        }];
   [subject sendNext:@1];
   [subject sendNext:@2];

I need to receive both results for @1 and @2 values in resultSignal, but sending second time next signal to subject very soon after first one, until veryLongRunningTask not completed yet, break expected behaviour: resultSignal do not return neither values, and will ignore all next values after that.

Is it bug, or what exactly did I do wrong, and how should so?


version:

 $ grep ReactiveObjC Podfile.lock
    - ReactiveObjC (~> 2.1)
  - ReactiveObjC (2.1.0)
    - ReactiveObjC (~> 2.1)
  - ReactiveObjC (~> 2.1)
  ReactiveObjC: 2edae120982d8e6d5407503a10edcfdd87eb8639

Pitometsu avatar Nov 29 '16 18:11 Pitometsu

Backtrace:

    frame #1: 0x0000000100ffb614 ReactiveObjC`-[RACDisposable dispose](self=0x000060800021bc80, _cmd="dispose") + 180 at RACDisposable.m:81
    frame #2: 0x0000000100ffa948 ReactiveObjC`-[RACCompoundDisposable dispose](self=0x00006080004c26f0, _cmd="dispose") + 328 at RACCompoundDisposable.m:239
    frame #3: 0x0000000100ffa948 ReactiveObjC`-[RACCompoundDisposable dispose](self=0x00006080016d24b0, _cmd="dispose") + 328 at RACCompoundDisposable.m:239
    frame #4: 0x0000000101045336 ReactiveObjC`-[RACSubscriber dealloc](self=0x0000608001e42310, _cmd="dealloc") + 70 at RACSubscriber.m:61
    frame #5: 0x0000000101b09b12 libobjc.A.dylib`objc_object::sidetable_release(bool) + 212
    frame #6: 0x0000000101039b0c ReactiveObjC`-[RACSignal(self=0x0000608000823c40, _cmd="subscribeNext:error:completed:", nextBlock=0x0000000101035960, errorBlock=0x0000000101035a30, completedBlock=0x0000000101035b80) subscribeNext:error:completed:] + 1500 at RACSignal.m:294
    frame #7: 0x00000001010358a4 ReactiveObjC`__29-[RACSignal((null)=0x000061000007b2c0, signal=0x0000608000823c40) bind:]_block_invoke.87 + 644 at RACSignal.m:130
    frame #8: 0x0000000101035e75 ReactiveObjC`__29-[RACSignal((null)=0x0000610000283160, x=(long)3) bind:]_block_invoke.122 + 197 at RACSignal.m:156
    frame #9: 0x000000010104545c ReactiveObjC`-[RACSubscriber sendNext:](self=0x0000610000055000, _cmd="sendNext:", value=(long)3) + 236 at RACSubscriber.m:71
    frame #10: 0x0000000101007db9 ReactiveObjC`-[RACPassthroughSubscriber sendNext:](self=0x000061000002dda0, _cmd="sendNext:", value=(long)3) + 441 at RACPassthroughSubscriber.m:77
    frame #11: 0x00000001010359ab ReactiveObjC`__29-[RACSignal((null)=0x0000608001e41380, x=(long)3) bind:]_block_invoke_2.95 + 75 at RACSignal.m:131
    frame #12: 0x000000010104545c ReactiveObjC`-[RACSubscriber sendNext:](self=0x0000608001e412c0, _cmd="sendNext:", value=(long)3) + 236 at RACSubscriber.m:71
    frame #13: 0x000000010100b901 ReactiveObjC`__29-[RACReturnSignal subscribe:]_block_invoke((null)=0x00007fff5ef62fc8) + 81 at RACReturnSignal.m:85
    frame #14: 0x000000010104631b ReactiveObjC`-[RACSubscriptionScheduler schedule:](self=0x000060000002d6c0, _cmd="schedule:", block=0x000000010100b8b0) + 587 at RACSubscriptionScheduler.m:39
    frame #15: 0x000000010100b82c ReactiveObjC`-[RACReturnSignal subscribe:](self=0x0000608000823c60, _cmd="subscribe:", subscriber=0x0000608001e412c0) + 620 at RACReturnSignal.m:84
    frame #16: 0x0000000101039ae4 ReactiveObjC`-[RACSignal(self=0x0000608000823c60, _cmd="subscribeNext:error:completed:", nextBlock=0x0000000101035960, errorBlock=0x0000000101035a30, completedBlock=0x0000000101035b80) subscribeNext:error:completed:] + 1460 at RACSignal.m:293
    frame #17: 0x00000001010358a4 ReactiveObjC`__29-[RACSignal((null)=0x000061000007b340, signal=0x0000608000823c60) bind:]_block_invoke.87 + 644 at RACSignal.m:130
    frame #18: 0x0000000101035e75 ReactiveObjC`__29-[RACSignal((null)=0x00006100002831b0, x=(long)3) bind:]_block_invoke.122 + 197 at RACSignal.m:156
    frame #19: 0x000000010104545c ReactiveObjC`-[RACSubscriber sendNext:](self=0x0000610000055120, _cmd="sendNext:", value=(long)3) + 236 at RACSubscriber.m:71
    frame #20: 0x0000000101007db9 ReactiveObjC`-[RACPassthroughSubscriber sendNext:](self=0x000061000002dde0, _cmd="sendNext:", value=(long)3) + 441 at RACPassthroughSubscriber.m:77
    frame #21: 0x0000000101014d9b ReactiveObjC`__37-[RACSignal((null)=0x0000610000055270, x=(long)3) doCompleted:]_block_invoke_2 + 75 at RACSignal+Operations.m:123
    frame #22: 0x000000010104545c ReactiveObjC`-[RACSubscriber sendNext:](self=0x0000610000055210, _cmd="sendNext:", value=(long)3) + 236 at RACSubscriber.m:71
    frame #23: 0x0000000101007db9 ReactiveObjC`-[RACPassthroughSubscriber sendNext:](self=0x000061000002de00, _cmd="sendNext:", value=(long)3) + 441 at RACPassthroughSubscriber.m:77
    frame #24: 0x00000001010144bb ReactiveObjC`__33-[RACSignal((null)=0x0000610000055390, x=(long)3) doError:]_block_invoke_2 + 75 at RACSignal+Operations.m:108
    frame #25: 0x000000010104545c ReactiveObjC`-[RACSubscriber sendNext:](self=0x0000610000055330, _cmd="sendNext:", value=(long)3) + 236 at RACSubscriber.m:71
    frame #26: 0x0000000101007db9 ReactiveObjC`-[RACPassthroughSubscriber sendNext:](self=0x000061000002de20, _cmd="sendNext:", value=(long)3) + 441 at RACPassthroughSubscriber.m:77
    frame #27: 0x0000000101013be8 ReactiveObjC`__32-[RACSignal((null)=0x00006100000554b0, x=(long)3) doNext:]_block_invoke_2 + 104 at RACSignal+Operations.m:94
    frame #28: 0x000000010104545c ReactiveObjC`-[RACSubscriber sendNext:](self=0x0000610000055450, _cmd="sendNext:", value=(long)3) + 236 at RACSubscriber.m:71
    frame #29: 0x0000000101007db9 ReactiveObjC`-[RACPassthroughSubscriber sendNext:](self=0x000061000002de40, _cmd="sendNext:", value=(long)3) + 441 at RACPassthroughSubscriber.m:77
    frame #30: 0x000000010104473b ReactiveObjC`__23-[RACSubject sendNext:]_block_invoke((null)=0x00007fff5ef63a40, subscriber=0x000061000002de40) + 75 at RACSubject.m:95
    frame #31: 0x000000010104452e ReactiveObjC`-[RACSubject enumerateSubscribersUsingBlock:](self=0x000061000002db60, _cmd="enumerateSubscribersUsingBlock:", block=0x00000001010446f0) + 590 at RACSubject.m:87
  * frame #32: 0x00000001010446c4 ReactiveObjC`-[RACSubject sendNext:](self=0x000061000002db60, _cmd="sendNext:", value=(long)3) + 148 at RACSubject.m:94

Pitometsu avatar Nov 29 '16 19:11 Pitometsu

Why when I do

[subject sendNext:@2];

RACSubscriber do

- (void)dealloc {
	[self.disposable dispose];
}

so my task being cancelled.

Pitometsu avatar Nov 29 '16 21:11 Pitometsu

Problem flow:

subject next => disposable dispose => task cancel => task error => subscriber error => break subscription.

Pitometsu avatar Nov 29 '16 21:11 Pitometsu

Ok, fixed by replacing flattenMap: by map: plus switchToLatest.

Leave issue here because not sure is it right behaviour or bug. Please, check.

Pitometsu avatar Nov 29 '16 23:11 Pitometsu

That looks correct to me. Are you sure the disposable is what's canceling the task? Does doVeryLongRunningTaskForItem:withCompletion: handle concurrent requests properly?

mdiep avatar Dec 04 '16 02:12 mdiep