1331 lines
41 KiB
Objective-C
1331 lines
41 KiB
Objective-C
//
|
|
// RACSignal+Operations.m
|
|
// ReactiveObjC
|
|
//
|
|
// Created by Justin Spahr-Summers on 2012-09-06.
|
|
// Copyright (c) 2012 GitHub, Inc. All rights reserved.
|
|
//
|
|
|
|
#import "RACSignal+Operations.h"
|
|
#import "NSObject+RACDeallocating.h"
|
|
#import "NSObject+RACDescription.h"
|
|
#import "RACBlockTrampoline.h"
|
|
#import "RACCommand.h"
|
|
#import "RACCompoundDisposable.h"
|
|
#import "RACDisposable.h"
|
|
#import "RACEvent.h"
|
|
#import "RACGroupedSignal.h"
|
|
#import "RACMulticastConnection+Private.h"
|
|
#import "RACReplaySubject.h"
|
|
#import "RACScheduler.h"
|
|
#import "RACSerialDisposable.h"
|
|
#import "RACSignalSequence.h"
|
|
#import "RACStream+Private.h"
|
|
#import "RACSubject.h"
|
|
#import "RACSubscriber+Private.h"
|
|
#import "RACSubscriber.h"
|
|
#import "RACTuple.h"
|
|
#import "RACUnit.h"
|
|
#import <libkern/OSAtomic.h>
|
|
#import <objc/runtime.h>
|
|
|
|
NSErrorDomain const RACSignalErrorDomain = @"RACSignalErrorDomain";
|
|
|
|
// Subscribes to the given signal with the given blocks.
|
|
//
|
|
// If the signal errors or completes, the corresponding block is invoked. If the
|
|
// disposable passed to the block is _not_ disposed, then the signal is
|
|
// subscribed to again.
|
|
static RACDisposable *subscribeForever (RACSignal *signal, void (^next)(id), void (^error)(NSError *, RACDisposable *), void (^completed)(RACDisposable *)) {
|
|
next = [next copy];
|
|
error = [error copy];
|
|
completed = [completed copy];
|
|
|
|
RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];
|
|
|
|
RACSchedulerRecursiveBlock recursiveBlock = ^(void (^recurse)(void)) {
|
|
RACCompoundDisposable *selfDisposable = [RACCompoundDisposable compoundDisposable];
|
|
[compoundDisposable addDisposable:selfDisposable];
|
|
|
|
__weak RACDisposable *weakSelfDisposable = selfDisposable;
|
|
|
|
RACDisposable *subscriptionDisposable = [signal subscribeNext:next error:^(NSError *e) {
|
|
@autoreleasepool {
|
|
error(e, compoundDisposable);
|
|
[compoundDisposable removeDisposable:weakSelfDisposable];
|
|
}
|
|
|
|
recurse();
|
|
} completed:^{
|
|
@autoreleasepool {
|
|
completed(compoundDisposable);
|
|
[compoundDisposable removeDisposable:weakSelfDisposable];
|
|
}
|
|
|
|
recurse();
|
|
}];
|
|
|
|
[selfDisposable addDisposable:subscriptionDisposable];
|
|
};
|
|
|
|
// Subscribe once immediately, and then use recursive scheduling for any
|
|
// further resubscriptions.
|
|
recursiveBlock(^{
|
|
RACScheduler *recursiveScheduler = RACScheduler.currentScheduler ?: [RACScheduler scheduler];
|
|
|
|
RACDisposable *schedulingDisposable = [recursiveScheduler scheduleRecursiveBlock:recursiveBlock];
|
|
[compoundDisposable addDisposable:schedulingDisposable];
|
|
});
|
|
|
|
return compoundDisposable;
|
|
}
|
|
|
|
@implementation RACSignal (Operations)
|
|
|
|
- (RACSignal *)doNext:(void (^)(id x))block {
|
|
NSCParameterAssert(block != NULL);
|
|
|
|
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
|
|
return [self subscribeNext:^(id x) {
|
|
block(x);
|
|
[subscriber sendNext:x];
|
|
} error:^(NSError *error) {
|
|
[subscriber sendError:error];
|
|
} completed:^{
|
|
[subscriber sendCompleted];
|
|
}];
|
|
}] setNameWithFormat:@"[%@] -doNext:", self.name];
|
|
}
|
|
|
|
- (RACSignal *)doError:(void (^)(NSError *error))block {
|
|
NSCParameterAssert(block != NULL);
|
|
|
|
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
|
|
return [self subscribeNext:^(id x) {
|
|
[subscriber sendNext:x];
|
|
} error:^(NSError *error) {
|
|
block(error);
|
|
[subscriber sendError:error];
|
|
} completed:^{
|
|
[subscriber sendCompleted];
|
|
}];
|
|
}] setNameWithFormat:@"[%@] -doError:", self.name];
|
|
}
|
|
|
|
- (RACSignal *)doCompleted:(void (^)(void))block {
|
|
NSCParameterAssert(block != NULL);
|
|
|
|
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
|
|
return [self subscribeNext:^(id x) {
|
|
[subscriber sendNext:x];
|
|
} error:^(NSError *error) {
|
|
[subscriber sendError:error];
|
|
} completed:^{
|
|
block();
|
|
[subscriber sendCompleted];
|
|
}];
|
|
}] setNameWithFormat:@"[%@] -doCompleted:", self.name];
|
|
}
|
|
|
|
- (RACSignal *)throttle:(NSTimeInterval)interval {
|
|
return [[self throttle:interval valuesPassingTest:^(id _) {
|
|
return YES;
|
|
}] setNameWithFormat:@"[%@] -throttle: %f", self.name, (double)interval];
|
|
}
|
|
|
|
- (RACSignal *)throttle:(NSTimeInterval)interval valuesPassingTest:(BOOL (^)(id next))predicate {
|
|
NSCParameterAssert(interval >= 0);
|
|
NSCParameterAssert(predicate != nil);
|
|
|
|
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
|
|
RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];
|
|
|
|
// We may never use this scheduler, but we need to set it up ahead of
|
|
// time so that our scheduled blocks are run serially if we do.
|
|
RACScheduler *scheduler = [RACScheduler scheduler];
|
|
|
|
// Information about any currently-buffered `next` event.
|
|
__block id nextValue = nil;
|
|
__block BOOL hasNextValue = NO;
|
|
RACSerialDisposable *nextDisposable = [[RACSerialDisposable alloc] init];
|
|
|
|
void (^flushNext)(BOOL send) = ^(BOOL send) {
|
|
@synchronized (compoundDisposable) {
|
|
[nextDisposable.disposable dispose];
|
|
|
|
if (!hasNextValue) return;
|
|
if (send) [subscriber sendNext:nextValue];
|
|
|
|
nextValue = nil;
|
|
hasNextValue = NO;
|
|
}
|
|
};
|
|
|
|
RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
|
|
RACScheduler *delayScheduler = RACScheduler.currentScheduler ?: scheduler;
|
|
BOOL shouldThrottle = predicate(x);
|
|
|
|
@synchronized (compoundDisposable) {
|
|
flushNext(NO);
|
|
if (!shouldThrottle) {
|
|
[subscriber sendNext:x];
|
|
return;
|
|
}
|
|
|
|
nextValue = x;
|
|
hasNextValue = YES;
|
|
nextDisposable.disposable = [delayScheduler afterDelay:interval schedule:^{
|
|
flushNext(YES);
|
|
}];
|
|
}
|
|
} error:^(NSError *error) {
|
|
[compoundDisposable dispose];
|
|
[subscriber sendError:error];
|
|
} completed:^{
|
|
flushNext(YES);
|
|
[subscriber sendCompleted];
|
|
}];
|
|
|
|
[compoundDisposable addDisposable:subscriptionDisposable];
|
|
return compoundDisposable;
|
|
}] setNameWithFormat:@"[%@] -throttle: %f valuesPassingTest:", self.name, (double)interval];
|
|
}
|
|
|
|
- (RACSignal *)delay:(NSTimeInterval)interval {
|
|
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
|
|
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
|
|
|
|
// We may never use this scheduler, but we need to set it up ahead of
|
|
// time so that our scheduled blocks are run serially if we do.
|
|
RACScheduler *scheduler = [RACScheduler scheduler];
|
|
|
|
void (^schedule)(dispatch_block_t) = ^(dispatch_block_t block) {
|
|
RACScheduler *delayScheduler = RACScheduler.currentScheduler ?: scheduler;
|
|
RACDisposable *schedulerDisposable = [delayScheduler afterDelay:interval schedule:block];
|
|
[disposable addDisposable:schedulerDisposable];
|
|
};
|
|
|
|
RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
|
|
schedule(^{
|
|
[subscriber sendNext:x];
|
|
});
|
|
} error:^(NSError *error) {
|
|
[subscriber sendError:error];
|
|
} completed:^{
|
|
schedule(^{
|
|
[subscriber sendCompleted];
|
|
});
|
|
}];
|
|
|
|
[disposable addDisposable:subscriptionDisposable];
|
|
return disposable;
|
|
}] setNameWithFormat:@"[%@] -delay: %f", self.name, (double)interval];
|
|
}
|
|
|
|
- (RACSignal *)repeat {
|
|
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
|
|
return subscribeForever(self,
|
|
^(id x) {
|
|
[subscriber sendNext:x];
|
|
},
|
|
^(NSError *error, RACDisposable *disposable) {
|
|
[disposable dispose];
|
|
[subscriber sendError:error];
|
|
},
|
|
^(RACDisposable *disposable) {
|
|
// Resubscribe.
|
|
});
|
|
}] setNameWithFormat:@"[%@] -repeat", self.name];
|
|
}
|
|
|
|
- (RACSignal *)catch:(RACSignal * (^)(NSError *error))catchBlock {
|
|
NSCParameterAssert(catchBlock != NULL);
|
|
|
|
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
|
|
RACSerialDisposable *catchDisposable = [[RACSerialDisposable alloc] init];
|
|
|
|
RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
|
|
[subscriber sendNext:x];
|
|
} error:^(NSError *error) {
|
|
RACSignal *signal = catchBlock(error);
|
|
NSCAssert(signal != nil, @"Expected non-nil signal from catch block on %@", self);
|
|
catchDisposable.disposable = [signal subscribe:subscriber];
|
|
} completed:^{
|
|
[subscriber sendCompleted];
|
|
}];
|
|
|
|
return [RACDisposable disposableWithBlock:^{
|
|
[catchDisposable dispose];
|
|
[subscriptionDisposable dispose];
|
|
}];
|
|
}] setNameWithFormat:@"[%@] -catch:", self.name];
|
|
}
|
|
|
|
- (RACSignal *)catchTo:(RACSignal *)signal {
|
|
return [[self catch:^(NSError *error) {
|
|
return signal;
|
|
}] setNameWithFormat:@"[%@] -catchTo: %@", self.name, signal];
|
|
}
|
|
|
|
+ (RACSignal *)try:(id (^)(NSError **errorPtr))tryBlock {
|
|
NSCParameterAssert(tryBlock != NULL);
|
|
|
|
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
|
|
NSError *error;
|
|
id value = tryBlock(&error);
|
|
RACSignal *signal = (value == nil ? [RACSignal error:error] : [RACSignal return:value]);
|
|
return [signal subscribe:subscriber];
|
|
}] setNameWithFormat:@"+try:"];
|
|
}
|
|
|
|
- (RACSignal *)try:(BOOL (^)(id value, NSError **errorPtr))tryBlock {
|
|
NSCParameterAssert(tryBlock != NULL);
|
|
|
|
return [[self flattenMap:^(id value) {
|
|
NSError *error = nil;
|
|
BOOL passed = tryBlock(value, &error);
|
|
return (passed ? [RACSignal return:value] : [RACSignal error:error]);
|
|
}] setNameWithFormat:@"[%@] -try:", self.name];
|
|
}
|
|
|
|
- (RACSignal *)tryMap:(id (^)(id value, NSError **errorPtr))mapBlock {
|
|
NSCParameterAssert(mapBlock != NULL);
|
|
|
|
return [[self flattenMap:^(id value) {
|
|
NSError *error = nil;
|
|
id mappedValue = mapBlock(value, &error);
|
|
return (mappedValue == nil ? [RACSignal error:error] : [RACSignal return:mappedValue]);
|
|
}] setNameWithFormat:@"[%@] -tryMap:", self.name];
|
|
}
|
|
|
|
- (RACSignal *)initially:(void (^)(void))block {
|
|
NSCParameterAssert(block != NULL);
|
|
|
|
return [[RACSignal defer:^{
|
|
block();
|
|
return self;
|
|
}] setNameWithFormat:@"[%@] -initially:", self.name];
|
|
}
|
|
|
|
- (RACSignal *)finally:(void (^)(void))block {
|
|
NSCParameterAssert(block != NULL);
|
|
|
|
return [[[self
|
|
doError:^(NSError *error) {
|
|
block();
|
|
}]
|
|
doCompleted:^{
|
|
block();
|
|
}]
|
|
setNameWithFormat:@"[%@] -finally:", self.name];
|
|
}
|
|
|
|
- (RACSignal *)bufferWithTime:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler {
|
|
NSCParameterAssert(scheduler != nil);
|
|
NSCParameterAssert(scheduler != RACScheduler.immediateScheduler);
|
|
|
|
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
|
|
RACSerialDisposable *timerDisposable = [[RACSerialDisposable alloc] init];
|
|
NSMutableArray *values = [NSMutableArray array];
|
|
|
|
void (^flushValues)(void) = ^{
|
|
@synchronized (values) {
|
|
[timerDisposable.disposable dispose];
|
|
|
|
if (values.count == 0) return;
|
|
|
|
RACTuple *tuple = [RACTuple tupleWithObjectsFromArray:values];
|
|
[values removeAllObjects];
|
|
[subscriber sendNext:tuple];
|
|
}
|
|
};
|
|
|
|
RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
|
|
@synchronized (values) {
|
|
if (values.count == 0) {
|
|
timerDisposable.disposable = [scheduler afterDelay:interval schedule:flushValues];
|
|
}
|
|
|
|
[values addObject:x ?: RACTupleNil.tupleNil];
|
|
}
|
|
} error:^(NSError *error) {
|
|
[subscriber sendError:error];
|
|
} completed:^{
|
|
flushValues();
|
|
[subscriber sendCompleted];
|
|
}];
|
|
|
|
return [RACDisposable disposableWithBlock:^{
|
|
[selfDisposable dispose];
|
|
[timerDisposable dispose];
|
|
}];
|
|
}] setNameWithFormat:@"[%@] -bufferWithTime: %f onScheduler: %@", self.name, (double)interval, scheduler];
|
|
}
|
|
|
|
- (RACSignal *)collect {
|
|
return [[self aggregateWithStartFactory:^{
|
|
return [[NSMutableArray alloc] init];
|
|
} reduce:^(NSMutableArray *collectedValues, id x) {
|
|
[collectedValues addObject:(x ?: NSNull.null)];
|
|
return collectedValues;
|
|
}] setNameWithFormat:@"[%@] -collect", self.name];
|
|
}
|
|
|
|
- (RACSignal *)takeLast:(NSUInteger)count {
|
|
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
|
|
NSMutableArray *valuesTaken = [NSMutableArray arrayWithCapacity:count];
|
|
return [self subscribeNext:^(id x) {
|
|
[valuesTaken addObject:x ? : RACTupleNil.tupleNil];
|
|
|
|
while (valuesTaken.count > count) {
|
|
[valuesTaken removeObjectAtIndex:0];
|
|
}
|
|
} error:^(NSError *error) {
|
|
[subscriber sendError:error];
|
|
} completed:^{
|
|
for (id value in valuesTaken) {
|
|
[subscriber sendNext:value == RACTupleNil.tupleNil ? nil : value];
|
|
}
|
|
|
|
[subscriber sendCompleted];
|
|
}];
|
|
}] setNameWithFormat:@"[%@] -takeLast: %lu", self.name, (unsigned long)count];
|
|
}
|
|
|
|
- (RACSignal *)combineLatestWith:(RACSignal *)signal {
|
|
NSCParameterAssert(signal != nil);
|
|
|
|
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
|
|
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
|
|
|
|
__block id lastSelfValue = nil;
|
|
__block BOOL selfCompleted = NO;
|
|
|
|
__block id lastOtherValue = nil;
|
|
__block BOOL otherCompleted = NO;
|
|
|
|
void (^sendNext)(void) = ^{
|
|
@synchronized (disposable) {
|
|
if (lastSelfValue == nil || lastOtherValue == nil) return;
|
|
[subscriber sendNext:RACTuplePack(lastSelfValue, lastOtherValue)];
|
|
}
|
|
};
|
|
|
|
RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
|
|
@synchronized (disposable) {
|
|
lastSelfValue = x ?: RACTupleNil.tupleNil;
|
|
sendNext();
|
|
}
|
|
} error:^(NSError *error) {
|
|
[subscriber sendError:error];
|
|
} completed:^{
|
|
@synchronized (disposable) {
|
|
selfCompleted = YES;
|
|
if (otherCompleted) [subscriber sendCompleted];
|
|
}
|
|
}];
|
|
|
|
[disposable addDisposable:selfDisposable];
|
|
|
|
RACDisposable *otherDisposable = [signal subscribeNext:^(id x) {
|
|
@synchronized (disposable) {
|
|
lastOtherValue = x ?: RACTupleNil.tupleNil;
|
|
sendNext();
|
|
}
|
|
} error:^(NSError *error) {
|
|
[subscriber sendError:error];
|
|
} completed:^{
|
|
@synchronized (disposable) {
|
|
otherCompleted = YES;
|
|
if (selfCompleted) [subscriber sendCompleted];
|
|
}
|
|
}];
|
|
|
|
[disposable addDisposable:otherDisposable];
|
|
|
|
return disposable;
|
|
}] setNameWithFormat:@"[%@] -combineLatestWith: %@", self.name, signal];
|
|
}
|
|
|
|
+ (RACSignal *)combineLatest:(id<NSFastEnumeration>)signals {
|
|
return [[self join:signals block:^(RACSignal *left, RACSignal *right) {
|
|
return [left combineLatestWith:right];
|
|
}] setNameWithFormat:@"+combineLatest: %@", signals];
|
|
}
|
|
|
|
+ (RACSignal *)combineLatest:(id<NSFastEnumeration>)signals reduce:(RACGenericReduceBlock)reduceBlock {
|
|
NSCParameterAssert(reduceBlock != nil);
|
|
|
|
RACSignal *result = [self combineLatest:signals];
|
|
|
|
// Although we assert this condition above, older versions of this method
|
|
// supported this argument being nil. Avoid crashing Release builds of
|
|
// apps that depended on that.
|
|
if (reduceBlock != nil) result = [result reduceEach:reduceBlock];
|
|
|
|
return [result setNameWithFormat:@"+combineLatest: %@ reduce:", signals];
|
|
}
|
|
|
|
- (RACSignal *)merge:(RACSignal *)signal {
|
|
return [[RACSignal
|
|
merge:@[ self, signal ]]
|
|
setNameWithFormat:@"[%@] -merge: %@", self.name, signal];
|
|
}
|
|
|
|
+ (RACSignal *)merge:(id<NSFastEnumeration>)signals {
|
|
NSMutableArray *copiedSignals = [[NSMutableArray alloc] init];
|
|
for (RACSignal *signal in signals) {
|
|
[copiedSignals addObject:signal];
|
|
}
|
|
|
|
return [[[RACSignal
|
|
createSignal:^ RACDisposable * (id<RACSubscriber> subscriber) {
|
|
for (RACSignal *signal in copiedSignals) {
|
|
[subscriber sendNext:signal];
|
|
}
|
|
|
|
[subscriber sendCompleted];
|
|
return nil;
|
|
}]
|
|
flatten]
|
|
setNameWithFormat:@"+merge: %@", copiedSignals];
|
|
}
|
|
|
|
- (RACSignal *)flatten:(NSUInteger)maxConcurrent {
|
|
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
|
|
RACCompoundDisposable *compoundDisposable = [[RACCompoundDisposable alloc] init];
|
|
|
|
// Contains disposables for the currently active subscriptions.
|
|
//
|
|
// This should only be used while synchronized on `subscriber`.
|
|
NSMutableArray *activeDisposables = [[NSMutableArray alloc] initWithCapacity:maxConcurrent];
|
|
|
|
// Whether the signal-of-signals has completed yet.
|
|
//
|
|
// This should only be used while synchronized on `subscriber`.
|
|
__block BOOL selfCompleted = NO;
|
|
|
|
// Subscribes to the given signal.
|
|
__block void (^subscribeToSignal)(RACSignal *);
|
|
|
|
// Weak reference to the above, to avoid a leak.
|
|
__weak __block void (^recur)(RACSignal *);
|
|
|
|
// Sends completed to the subscriber if all signals are finished.
|
|
//
|
|
// This should only be used while synchronized on `subscriber`.
|
|
void (^completeIfAllowed)(void) = ^{
|
|
if (selfCompleted && activeDisposables.count == 0) {
|
|
[subscriber sendCompleted];
|
|
}
|
|
};
|
|
|
|
// The signals waiting to be started.
|
|
//
|
|
// This array should only be used while synchronized on `subscriber`.
|
|
NSMutableArray *queuedSignals = [NSMutableArray array];
|
|
|
|
recur = subscribeToSignal = ^(RACSignal *signal) {
|
|
RACSerialDisposable *serialDisposable = [[RACSerialDisposable alloc] init];
|
|
|
|
@synchronized (subscriber) {
|
|
[compoundDisposable addDisposable:serialDisposable];
|
|
[activeDisposables addObject:serialDisposable];
|
|
}
|
|
|
|
serialDisposable.disposable = [signal subscribeNext:^(id x) {
|
|
[subscriber sendNext:x];
|
|
} error:^(NSError *error) {
|
|
[subscriber sendError:error];
|
|
} completed:^{
|
|
__strong void (^subscribeToSignal)(RACSignal *) = recur;
|
|
RACSignal *nextSignal;
|
|
|
|
@synchronized (subscriber) {
|
|
[compoundDisposable removeDisposable:serialDisposable];
|
|
[activeDisposables removeObjectIdenticalTo:serialDisposable];
|
|
|
|
if (queuedSignals.count == 0) {
|
|
completeIfAllowed();
|
|
return;
|
|
}
|
|
|
|
nextSignal = queuedSignals[0];
|
|
[queuedSignals removeObjectAtIndex:0];
|
|
}
|
|
|
|
subscribeToSignal(nextSignal);
|
|
}];
|
|
};
|
|
|
|
[compoundDisposable addDisposable:[self subscribeNext:^(RACSignal *signal) {
|
|
if (signal == nil) return;
|
|
|
|
NSCAssert([signal isKindOfClass:RACSignal.class], @"Expected a RACSignal, got %@", signal);
|
|
|
|
@synchronized (subscriber) {
|
|
if (maxConcurrent > 0 && activeDisposables.count >= maxConcurrent) {
|
|
[queuedSignals addObject:signal];
|
|
|
|
// If we need to wait, skip subscribing to this
|
|
// signal.
|
|
return;
|
|
}
|
|
}
|
|
|
|
subscribeToSignal(signal);
|
|
} error:^(NSError *error) {
|
|
[subscriber sendError:error];
|
|
} completed:^{
|
|
@synchronized (subscriber) {
|
|
selfCompleted = YES;
|
|
completeIfAllowed();
|
|
}
|
|
}]];
|
|
|
|
[compoundDisposable addDisposable:[RACDisposable disposableWithBlock:^{
|
|
// A strong reference is held to `subscribeToSignal` until we're
|
|
// done, preventing it from deallocating early.
|
|
subscribeToSignal = nil;
|
|
}]];
|
|
|
|
return compoundDisposable;
|
|
}] setNameWithFormat:@"[%@] -flatten: %lu", self.name, (unsigned long)maxConcurrent];
|
|
}
|
|
|
|
- (RACSignal *)then:(RACSignal * (^)(void))block {
|
|
NSCParameterAssert(block != nil);
|
|
|
|
return [[[self
|
|
ignoreValues]
|
|
concat:[RACSignal defer:block]]
|
|
setNameWithFormat:@"[%@] -then:", self.name];
|
|
}
|
|
|
|
- (RACSignal *)concat {
|
|
return [[self flatten:1] setNameWithFormat:@"[%@] -concat", self.name];
|
|
}
|
|
|
|
- (RACSignal *)aggregateWithStartFactory:(id (^)(void))startFactory reduce:(id (^)(id running, id next))reduceBlock {
|
|
NSCParameterAssert(startFactory != NULL);
|
|
NSCParameterAssert(reduceBlock != NULL);
|
|
|
|
return [[RACSignal defer:^{
|
|
return [self aggregateWithStart:startFactory() reduce:reduceBlock];
|
|
}] setNameWithFormat:@"[%@] -aggregateWithStartFactory:reduce:", self.name];
|
|
}
|
|
|
|
- (RACSignal *)aggregateWithStart:(id)start reduce:(id (^)(id running, id next))reduceBlock {
|
|
return [[self
|
|
aggregateWithStart:start
|
|
reduceWithIndex:^(id running, id next, NSUInteger index) {
|
|
return reduceBlock(running, next);
|
|
}]
|
|
setNameWithFormat:@"[%@] -aggregateWithStart: %@ reduce:", self.name, RACDescription(start)];
|
|
}
|
|
|
|
- (RACSignal *)aggregateWithStart:(id)start reduceWithIndex:(id (^)(id, id, NSUInteger))reduceBlock {
|
|
return [[[[self
|
|
scanWithStart:start reduceWithIndex:reduceBlock]
|
|
startWith:start]
|
|
takeLast:1]
|
|
setNameWithFormat:@"[%@] -aggregateWithStart: %@ reduceWithIndex:", self.name, RACDescription(start)];
|
|
}
|
|
|
|
- (RACDisposable *)setKeyPath:(NSString *)keyPath onObject:(NSObject *)object {
|
|
return [self setKeyPath:keyPath onObject:object nilValue:nil];
|
|
}
|
|
|
|
- (RACDisposable *)setKeyPath:(NSString *)keyPath onObject:(NSObject *)object nilValue:(id)nilValue {
|
|
NSCParameterAssert(keyPath != nil);
|
|
NSCParameterAssert(object != nil);
|
|
|
|
keyPath = [keyPath copy];
|
|
|
|
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
|
|
|
|
// Purposely not retaining 'object', since we want to tear down the binding
|
|
// when it deallocates normally.
|
|
__block void * volatile objectPtr = (__bridge void *)object;
|
|
|
|
RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
|
|
// Possibly spec, possibly compiler bug, but this __bridge cast does not
|
|
// result in a retain here, effectively an invisible __unsafe_unretained
|
|
// qualifier. Using objc_precise_lifetime gives the __strong reference
|
|
// desired. The explicit use of __strong is strictly defensive.
|
|
__strong NSObject *object __attribute__((objc_precise_lifetime)) = (__bridge __strong id)objectPtr;
|
|
[object setValue:x ?: nilValue forKeyPath:keyPath];
|
|
} error:^(NSError *error) {
|
|
__strong NSObject *object __attribute__((objc_precise_lifetime)) = (__bridge __strong id)objectPtr;
|
|
|
|
NSCAssert(NO, @"Received error from %@ in binding for key path \"%@\" on %@: %@", self, keyPath, object, error);
|
|
|
|
// Log the error if we're running with assertions disabled.
|
|
NSLog(@"Received error from %@ in binding for key path \"%@\" on %@: %@", self, keyPath, object, error);
|
|
|
|
[disposable dispose];
|
|
} completed:^{
|
|
[disposable dispose];
|
|
}];
|
|
|
|
[disposable addDisposable:subscriptionDisposable];
|
|
|
|
#if DEBUG
|
|
static void *bindingsKey = &bindingsKey;
|
|
NSMutableDictionary *bindings;
|
|
|
|
@synchronized (object) {
|
|
bindings = objc_getAssociatedObject(object, bindingsKey);
|
|
if (bindings == nil) {
|
|
bindings = [NSMutableDictionary dictionary];
|
|
objc_setAssociatedObject(object, bindingsKey, bindings, OBJC_ASSOCIATION_RETAIN_NONATOMIC);
|
|
}
|
|
}
|
|
|
|
@synchronized (bindings) {
|
|
NSCAssert(bindings[keyPath] == nil, @"Signal %@ is already bound to key path \"%@\" on object %@, adding signal %@ is undefined behavior", [bindings[keyPath] nonretainedObjectValue], keyPath, object, self);
|
|
|
|
bindings[keyPath] = [NSValue valueWithNonretainedObject:self];
|
|
}
|
|
#endif
|
|
|
|
RACDisposable *clearPointerDisposable = [RACDisposable disposableWithBlock:^{
|
|
#if DEBUG
|
|
@synchronized (bindings) {
|
|
[bindings removeObjectForKey:keyPath];
|
|
}
|
|
#endif
|
|
|
|
while (YES) {
|
|
void *ptr = objectPtr;
|
|
if (OSAtomicCompareAndSwapPtrBarrier(ptr, NULL, &objectPtr)) {
|
|
break;
|
|
}
|
|
}
|
|
}];
|
|
|
|
[disposable addDisposable:clearPointerDisposable];
|
|
|
|
[object.rac_deallocDisposable addDisposable:disposable];
|
|
|
|
RACCompoundDisposable *objectDisposable = object.rac_deallocDisposable;
|
|
return [RACDisposable disposableWithBlock:^{
|
|
[objectDisposable removeDisposable:disposable];
|
|
[disposable dispose];
|
|
}];
|
|
}
|
|
|
|
+ (RACSignal *)interval:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler {
|
|
return [[RACSignal interval:interval onScheduler:scheduler withLeeway:0.0] setNameWithFormat:@"+interval: %f onScheduler: %@", (double)interval, scheduler];
|
|
}
|
|
|
|
+ (RACSignal *)interval:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler withLeeway:(NSTimeInterval)leeway {
|
|
NSCParameterAssert(scheduler != nil);
|
|
NSCParameterAssert(scheduler != RACScheduler.immediateScheduler);
|
|
|
|
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
|
|
return [scheduler after:[NSDate dateWithTimeIntervalSinceNow:interval] repeatingEvery:interval withLeeway:leeway schedule:^{
|
|
[subscriber sendNext:[NSDate date]];
|
|
}];
|
|
}] setNameWithFormat:@"+interval: %f onScheduler: %@ withLeeway: %f", (double)interval, scheduler, (double)leeway];
|
|
}
|
|
|
|
- (RACSignal *)takeUntil:(RACSignal *)signalTrigger {
|
|
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
|
|
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
|
|
void (^triggerCompletion)(void) = ^{
|
|
[disposable dispose];
|
|
[subscriber sendCompleted];
|
|
};
|
|
|
|
RACDisposable *triggerDisposable = [signalTrigger subscribeNext:^(id _) {
|
|
triggerCompletion();
|
|
} completed:^{
|
|
triggerCompletion();
|
|
}];
|
|
|
|
[disposable addDisposable:triggerDisposable];
|
|
|
|
if (!disposable.disposed) {
|
|
RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
|
|
[subscriber sendNext:x];
|
|
} error:^(NSError *error) {
|
|
[subscriber sendError:error];
|
|
} completed:^{
|
|
[disposable dispose];
|
|
[subscriber sendCompleted];
|
|
}];
|
|
|
|
[disposable addDisposable:selfDisposable];
|
|
}
|
|
|
|
return disposable;
|
|
}] setNameWithFormat:@"[%@] -takeUntil: %@", self.name, signalTrigger];
|
|
}
|
|
|
|
- (RACSignal *)takeUntilReplacement:(RACSignal *)replacement {
|
|
return [RACSignal createSignal:^(id<RACSubscriber> subscriber) {
|
|
RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];
|
|
|
|
RACDisposable *replacementDisposable = [replacement subscribeNext:^(id x) {
|
|
[selfDisposable dispose];
|
|
[subscriber sendNext:x];
|
|
} error:^(NSError *error) {
|
|
[selfDisposable dispose];
|
|
[subscriber sendError:error];
|
|
} completed:^{
|
|
[selfDisposable dispose];
|
|
[subscriber sendCompleted];
|
|
}];
|
|
|
|
if (!selfDisposable.disposed) {
|
|
selfDisposable.disposable = [[self
|
|
concat:[RACSignal never]]
|
|
subscribe:subscriber];
|
|
}
|
|
|
|
return [RACDisposable disposableWithBlock:^{
|
|
[selfDisposable dispose];
|
|
[replacementDisposable dispose];
|
|
}];
|
|
}];
|
|
}
|
|
|
|
- (RACSignal *)switchToLatest {
|
|
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
|
|
RACMulticastConnection *connection = [self publish];
|
|
|
|
RACDisposable *subscriptionDisposable = [[connection.signal
|
|
flattenMap:^(RACSignal *x) {
|
|
NSCAssert(x == nil || [x isKindOfClass:RACSignal.class], @"-switchToLatest requires that the source signal (%@) send signals. Instead we got: %@", self, x);
|
|
|
|
// -concat:[RACSignal never] prevents completion of the receiver from
|
|
// prematurely terminating the inner signal.
|
|
return [x takeUntil:[connection.signal concat:[RACSignal never]]];
|
|
}]
|
|
subscribe:subscriber];
|
|
|
|
RACDisposable *connectionDisposable = [connection connect];
|
|
return [RACDisposable disposableWithBlock:^{
|
|
[subscriptionDisposable dispose];
|
|
[connectionDisposable dispose];
|
|
}];
|
|
}] setNameWithFormat:@"[%@] -switchToLatest", self.name];
|
|
}
|
|
|
|
+ (RACSignal *)switch:(RACSignal *)signal cases:(NSDictionary *)cases default:(RACSignal *)defaultSignal {
|
|
NSCParameterAssert(signal != nil);
|
|
NSCParameterAssert(cases != nil);
|
|
|
|
for (id key in cases) {
|
|
id value __attribute__((unused)) = cases[key];
|
|
NSCAssert([value isKindOfClass:RACSignal.class], @"Expected all cases to be RACSignals, %@ isn't", value);
|
|
}
|
|
|
|
NSDictionary *copy = [cases copy];
|
|
|
|
return [[[signal
|
|
map:^(id key) {
|
|
if (key == nil) key = RACTupleNil.tupleNil;
|
|
|
|
RACSignal *signal = copy[key] ?: defaultSignal;
|
|
if (signal == nil) {
|
|
NSString *description = [NSString stringWithFormat:NSLocalizedString(@"No matching signal found for value %@", @""), key];
|
|
return [RACSignal error:[NSError errorWithDomain:RACSignalErrorDomain code:RACSignalErrorNoMatchingCase userInfo:@{ NSLocalizedDescriptionKey: description }]];
|
|
}
|
|
|
|
return signal;
|
|
}]
|
|
switchToLatest]
|
|
setNameWithFormat:@"+switch: %@ cases: %@ default: %@", signal, cases, defaultSignal];
|
|
}
|
|
|
|
+ (RACSignal *)if:(RACSignal *)boolSignal then:(RACSignal *)trueSignal else:(RACSignal *)falseSignal {
|
|
NSCParameterAssert(boolSignal != nil);
|
|
NSCParameterAssert(trueSignal != nil);
|
|
NSCParameterAssert(falseSignal != nil);
|
|
|
|
return [[[boolSignal
|
|
map:^(NSNumber *value) {
|
|
NSCAssert([value isKindOfClass:NSNumber.class], @"Expected %@ to send BOOLs, not %@", boolSignal, value);
|
|
|
|
return (value.boolValue ? trueSignal : falseSignal);
|
|
}]
|
|
switchToLatest]
|
|
setNameWithFormat:@"+if: %@ then: %@ else: %@", boolSignal, trueSignal, falseSignal];
|
|
}
|
|
|
|
- (id)first {
|
|
return [self firstOrDefault:nil];
|
|
}
|
|
|
|
- (id)firstOrDefault:(id)defaultValue {
|
|
return [self firstOrDefault:defaultValue success:NULL error:NULL];
|
|
}
|
|
|
|
- (id)firstOrDefault:(id)defaultValue success:(BOOL *)success error:(NSError **)error {
|
|
NSCondition *condition = [[NSCondition alloc] init];
|
|
condition.name = [NSString stringWithFormat:@"[%@] -firstOrDefault: %@ success:error:", self.name, defaultValue];
|
|
|
|
__block id value = defaultValue;
|
|
__block BOOL done = NO;
|
|
|
|
// Ensures that we don't pass values across thread boundaries by reference.
|
|
__block NSError *localError;
|
|
__block BOOL localSuccess;
|
|
|
|
[[self take:1] subscribeNext:^(id x) {
|
|
[condition lock];
|
|
|
|
value = x;
|
|
localSuccess = YES;
|
|
|
|
done = YES;
|
|
[condition broadcast];
|
|
[condition unlock];
|
|
} error:^(NSError *e) {
|
|
[condition lock];
|
|
|
|
if (!done) {
|
|
localSuccess = NO;
|
|
localError = e;
|
|
|
|
done = YES;
|
|
[condition broadcast];
|
|
}
|
|
|
|
[condition unlock];
|
|
} completed:^{
|
|
[condition lock];
|
|
|
|
localSuccess = YES;
|
|
|
|
done = YES;
|
|
[condition broadcast];
|
|
[condition unlock];
|
|
}];
|
|
|
|
[condition lock];
|
|
while (!done) {
|
|
[condition wait];
|
|
}
|
|
|
|
if (success != NULL) *success = localSuccess;
|
|
if (error != NULL) *error = localError;
|
|
|
|
[condition unlock];
|
|
return value;
|
|
}
|
|
|
|
- (BOOL)waitUntilCompleted:(NSError **)error {
|
|
BOOL success = NO;
|
|
|
|
[[[self
|
|
ignoreValues]
|
|
setNameWithFormat:@"[%@] -waitUntilCompleted:", self.name]
|
|
firstOrDefault:nil success:&success error:error];
|
|
|
|
return success;
|
|
}
|
|
|
|
+ (RACSignal *)defer:(RACSignal<id> * (^)(void))block {
|
|
NSCParameterAssert(block != NULL);
|
|
|
|
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
|
|
return [block() subscribe:subscriber];
|
|
}] setNameWithFormat:@"+defer:"];
|
|
}
|
|
|
|
- (NSArray *)toArray {
|
|
return [[[self collect] first] copy];
|
|
}
|
|
|
|
- (RACSequence *)sequence {
|
|
return [[RACSignalSequence sequenceWithSignal:self] setNameWithFormat:@"[%@] -sequence", self.name];
|
|
}
|
|
|
|
- (RACMulticastConnection *)publish {
|
|
RACSubject *subject = [[RACSubject subject] setNameWithFormat:@"[%@] -publish", self.name];
|
|
RACMulticastConnection *connection = [self multicast:subject];
|
|
return connection;
|
|
}
|
|
|
|
- (RACMulticastConnection *)multicast:(RACSubject *)subject {
|
|
[subject setNameWithFormat:@"[%@] -multicast: %@", self.name, subject.name];
|
|
RACMulticastConnection *connection = [[RACMulticastConnection alloc] initWithSourceSignal:self subject:subject];
|
|
return connection;
|
|
}
|
|
|
|
- (RACSignal *)replay {
|
|
RACReplaySubject *subject = [[RACReplaySubject subject] setNameWithFormat:@"[%@] -replay", self.name];
|
|
|
|
RACMulticastConnection *connection = [self multicast:subject];
|
|
[connection connect];
|
|
|
|
return connection.signal;
|
|
}
|
|
|
|
- (RACSignal *)replayLast {
|
|
RACReplaySubject *subject = [[RACReplaySubject replaySubjectWithCapacity:1] setNameWithFormat:@"[%@] -replayLast", self.name];
|
|
|
|
RACMulticastConnection *connection = [self multicast:subject];
|
|
[connection connect];
|
|
|
|
return connection.signal;
|
|
}
|
|
|
|
- (RACSignal *)replayLazily {
|
|
RACMulticastConnection *connection = [self multicast:[RACReplaySubject subject]];
|
|
return [[RACSignal
|
|
defer:^{
|
|
[connection connect];
|
|
return connection.signal;
|
|
}]
|
|
setNameWithFormat:@"[%@] -replayLazily", self.name];
|
|
}
|
|
|
|
- (RACSignal *)timeout:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler {
|
|
NSCParameterAssert(scheduler != nil);
|
|
NSCParameterAssert(scheduler != RACScheduler.immediateScheduler);
|
|
|
|
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
|
|
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
|
|
|
|
RACDisposable *timeoutDisposable = [scheduler afterDelay:interval schedule:^{
|
|
[disposable dispose];
|
|
[subscriber sendError:[NSError errorWithDomain:RACSignalErrorDomain code:RACSignalErrorTimedOut userInfo:nil]];
|
|
}];
|
|
|
|
[disposable addDisposable:timeoutDisposable];
|
|
|
|
RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
|
|
[subscriber sendNext:x];
|
|
} error:^(NSError *error) {
|
|
[disposable dispose];
|
|
[subscriber sendError:error];
|
|
} completed:^{
|
|
[disposable dispose];
|
|
[subscriber sendCompleted];
|
|
}];
|
|
|
|
[disposable addDisposable:subscriptionDisposable];
|
|
return disposable;
|
|
}] setNameWithFormat:@"[%@] -timeout: %f onScheduler: %@", self.name, (double)interval, scheduler];
|
|
}
|
|
|
|
- (RACSignal *)deliverOn:(RACScheduler *)scheduler {
|
|
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
|
|
return [self subscribeNext:^(id x) {
|
|
[scheduler schedule:^{
|
|
[subscriber sendNext:x];
|
|
}];
|
|
} error:^(NSError *error) {
|
|
[scheduler schedule:^{
|
|
[subscriber sendError:error];
|
|
}];
|
|
} completed:^{
|
|
[scheduler schedule:^{
|
|
[subscriber sendCompleted];
|
|
}];
|
|
}];
|
|
}] setNameWithFormat:@"[%@] -deliverOn: %@", self.name, scheduler];
|
|
}
|
|
|
|
- (RACSignal *)subscribeOn:(RACScheduler *)scheduler {
|
|
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
|
|
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
|
|
|
|
RACDisposable *schedulingDisposable = [scheduler schedule:^{
|
|
RACDisposable *subscriptionDisposable = [self subscribe:subscriber];
|
|
|
|
[disposable addDisposable:subscriptionDisposable];
|
|
}];
|
|
|
|
[disposable addDisposable:schedulingDisposable];
|
|
return disposable;
|
|
}] setNameWithFormat:@"[%@] -subscribeOn: %@", self.name, scheduler];
|
|
}
|
|
|
|
- (RACSignal *)deliverOnMainThread {
|
|
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
|
|
__block volatile int32_t queueLength = 0;
|
|
|
|
void (^performOnMainThread)(dispatch_block_t) = ^(dispatch_block_t block) {
|
|
int32_t queued = OSAtomicIncrement32(&queueLength);
|
|
if (NSThread.isMainThread && queued == 1) {
|
|
block();
|
|
OSAtomicDecrement32(&queueLength);
|
|
} else {
|
|
dispatch_async(dispatch_get_main_queue(), ^{
|
|
block();
|
|
OSAtomicDecrement32(&queueLength);
|
|
});
|
|
}
|
|
};
|
|
|
|
return [self subscribeNext:^(id x) {
|
|
performOnMainThread(^{
|
|
[subscriber sendNext:x];
|
|
});
|
|
} error:^(NSError *error) {
|
|
performOnMainThread(^{
|
|
[subscriber sendError:error];
|
|
});
|
|
} completed:^{
|
|
performOnMainThread(^{
|
|
[subscriber sendCompleted];
|
|
});
|
|
}];
|
|
}] setNameWithFormat:@"[%@] -deliverOnMainThread", self.name];
|
|
}
|
|
|
|
- (RACSignal *)groupBy:(id<NSCopying> (^)(id object))keyBlock transform:(id (^)(id object))transformBlock {
|
|
NSCParameterAssert(keyBlock != NULL);
|
|
|
|
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
|
|
NSMutableDictionary *groups = [NSMutableDictionary dictionary];
|
|
NSMutableArray *orderedGroups = [NSMutableArray array];
|
|
|
|
return [self subscribeNext:^(id x) {
|
|
id<NSCopying> key = keyBlock(x);
|
|
RACGroupedSignal *groupSubject = nil;
|
|
@synchronized(groups) {
|
|
groupSubject = groups[key];
|
|
if (groupSubject == nil) {
|
|
groupSubject = [RACGroupedSignal signalWithKey:key];
|
|
groups[key] = groupSubject;
|
|
[orderedGroups addObject:groupSubject];
|
|
[subscriber sendNext:groupSubject];
|
|
}
|
|
}
|
|
|
|
[groupSubject sendNext:transformBlock != NULL ? transformBlock(x) : x];
|
|
} error:^(NSError *error) {
|
|
[subscriber sendError:error];
|
|
|
|
[orderedGroups makeObjectsPerformSelector:@selector(sendError:) withObject:error];
|
|
} completed:^{
|
|
[subscriber sendCompleted];
|
|
|
|
[orderedGroups makeObjectsPerformSelector:@selector(sendCompleted)];
|
|
}];
|
|
}] setNameWithFormat:@"[%@] -groupBy:transform:", self.name];
|
|
}
|
|
|
|
- (RACSignal *)groupBy:(id<NSCopying> (^)(id object))keyBlock {
|
|
return [[self groupBy:keyBlock transform:nil] setNameWithFormat:@"[%@] -groupBy:", self.name];
|
|
}
|
|
|
|
- (RACSignal *)any {
|
|
return [[self any:^(id x) {
|
|
return YES;
|
|
}] setNameWithFormat:@"[%@] -any", self.name];
|
|
}
|
|
|
|
- (RACSignal *)any:(BOOL (^)(id object))predicateBlock {
|
|
NSCParameterAssert(predicateBlock != NULL);
|
|
|
|
return [[[self materialize] bind:^{
|
|
return ^(RACEvent *event, BOOL *stop) {
|
|
if (event.finished) {
|
|
*stop = YES;
|
|
return [RACSignal return:@NO];
|
|
}
|
|
|
|
if (predicateBlock(event.value)) {
|
|
*stop = YES;
|
|
return [RACSignal return:@YES];
|
|
}
|
|
|
|
return [RACSignal empty];
|
|
};
|
|
}] setNameWithFormat:@"[%@] -any:", self.name];
|
|
}
|
|
|
|
- (RACSignal *)all:(BOOL (^)(id object))predicateBlock {
|
|
NSCParameterAssert(predicateBlock != NULL);
|
|
|
|
return [[[self materialize] bind:^{
|
|
return ^(RACEvent *event, BOOL *stop) {
|
|
if (event.eventType == RACEventTypeCompleted) {
|
|
*stop = YES;
|
|
return [RACSignal return:@YES];
|
|
}
|
|
|
|
if (event.eventType == RACEventTypeError || !predicateBlock(event.value)) {
|
|
*stop = YES;
|
|
return [RACSignal return:@NO];
|
|
}
|
|
|
|
return [RACSignal empty];
|
|
};
|
|
}] setNameWithFormat:@"[%@] -all:", self.name];
|
|
}
|
|
|
|
- (RACSignal *)retry:(NSInteger)retryCount {
|
|
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
|
|
__block NSInteger currentRetryCount = 0;
|
|
return subscribeForever(self,
|
|
^(id x) {
|
|
[subscriber sendNext:x];
|
|
},
|
|
^(NSError *error, RACDisposable *disposable) {
|
|
if (retryCount == 0 || currentRetryCount < retryCount) {
|
|
// Resubscribe.
|
|
currentRetryCount++;
|
|
return;
|
|
}
|
|
|
|
[disposable dispose];
|
|
[subscriber sendError:error];
|
|
},
|
|
^(RACDisposable *disposable) {
|
|
[disposable dispose];
|
|
[subscriber sendCompleted];
|
|
});
|
|
}] setNameWithFormat:@"[%@] -retry: %lu", self.name, (unsigned long)retryCount];
|
|
}
|
|
|
|
- (RACSignal *)retry {
|
|
return [[self retry:0] setNameWithFormat:@"[%@] -retry", self.name];
|
|
}
|
|
|
|
- (RACSignal *)sample:(RACSignal *)sampler {
|
|
NSCParameterAssert(sampler != nil);
|
|
|
|
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
|
|
NSLock *lock = [[NSLock alloc] init];
|
|
__block id lastValue;
|
|
__block BOOL hasValue = NO;
|
|
|
|
RACSerialDisposable *samplerDisposable = [[RACSerialDisposable alloc] init];
|
|
RACDisposable *sourceDisposable = [self subscribeNext:^(id x) {
|
|
[lock lock];
|
|
hasValue = YES;
|
|
lastValue = x;
|
|
[lock unlock];
|
|
} error:^(NSError *error) {
|
|
[samplerDisposable dispose];
|
|
[subscriber sendError:error];
|
|
} completed:^{
|
|
[samplerDisposable dispose];
|
|
[subscriber sendCompleted];
|
|
}];
|
|
|
|
samplerDisposable.disposable = [sampler subscribeNext:^(id _) {
|
|
BOOL shouldSend = NO;
|
|
id value;
|
|
[lock lock];
|
|
shouldSend = hasValue;
|
|
value = lastValue;
|
|
[lock unlock];
|
|
|
|
if (shouldSend) {
|
|
[subscriber sendNext:value];
|
|
}
|
|
} error:^(NSError *error) {
|
|
[sourceDisposable dispose];
|
|
[subscriber sendError:error];
|
|
} completed:^{
|
|
[sourceDisposable dispose];
|
|
[subscriber sendCompleted];
|
|
}];
|
|
|
|
return [RACDisposable disposableWithBlock:^{
|
|
[samplerDisposable dispose];
|
|
[sourceDisposable dispose];
|
|
}];
|
|
}] setNameWithFormat:@"[%@] -sample: %@", self.name, sampler];
|
|
}
|
|
|
|
- (RACSignal *)ignoreValues {
|
|
return [[self filter:^(id _) {
|
|
return NO;
|
|
}] setNameWithFormat:@"[%@] -ignoreValues", self.name];
|
|
}
|
|
|
|
- (RACSignal *)materialize {
|
|
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
|
|
return [self subscribeNext:^(id x) {
|
|
[subscriber sendNext:[RACEvent eventWithValue:x]];
|
|
} error:^(NSError *error) {
|
|
[subscriber sendNext:[RACEvent eventWithError:error]];
|
|
[subscriber sendCompleted];
|
|
} completed:^{
|
|
[subscriber sendNext:RACEvent.completedEvent];
|
|
[subscriber sendCompleted];
|
|
}];
|
|
}] setNameWithFormat:@"[%@] -materialize", self.name];
|
|
}
|
|
|
|
- (RACSignal *)dematerialize {
|
|
return [[self bind:^{
|
|
return ^(RACEvent *event, BOOL *stop) {
|
|
switch (event.eventType) {
|
|
case RACEventTypeCompleted:
|
|
*stop = YES;
|
|
return [RACSignal empty];
|
|
|
|
case RACEventTypeError:
|
|
*stop = YES;
|
|
return [RACSignal error:event.error];
|
|
|
|
case RACEventTypeNext:
|
|
return [RACSignal return:event.value];
|
|
}
|
|
};
|
|
}] setNameWithFormat:@"[%@] -dematerialize", self.name];
|
|
}
|
|
|
|
- (RACSignal *)not {
|
|
return [[self map:^(NSNumber *value) {
|
|
NSCAssert([value isKindOfClass:NSNumber.class], @"-not must only be used on a signal of NSNumbers. Instead, got: %@", value);
|
|
|
|
return @(!value.boolValue);
|
|
}] setNameWithFormat:@"[%@] -not", self.name];
|
|
}
|
|
|
|
- (RACSignal *)and {
|
|
return [[self map:^(RACTuple *tuple) {
|
|
NSCAssert([tuple isKindOfClass:RACTuple.class], @"-and must only be used on a signal of RACTuples of NSNumbers. Instead, received: %@", tuple);
|
|
NSCAssert(tuple.count > 0, @"-and must only be used on a signal of RACTuples of NSNumbers, with at least 1 value in the tuple");
|
|
|
|
return @([tuple.rac_sequence all:^(NSNumber *number) {
|
|
NSCAssert([number isKindOfClass:NSNumber.class], @"-and must only be used on a signal of RACTuples of NSNumbers. Instead, tuple contains a non-NSNumber value: %@", tuple);
|
|
|
|
return number.boolValue;
|
|
}]);
|
|
}] setNameWithFormat:@"[%@] -and", self.name];
|
|
}
|
|
|
|
- (RACSignal *)or {
|
|
return [[self map:^(RACTuple *tuple) {
|
|
NSCAssert([tuple isKindOfClass:RACTuple.class], @"-or must only be used on a signal of RACTuples of NSNumbers. Instead, received: %@", tuple);
|
|
NSCAssert(tuple.count > 0, @"-or must only be used on a signal of RACTuples of NSNumbers, with at least 1 value in the tuple");
|
|
|
|
return @([tuple.rac_sequence any:^(NSNumber *number) {
|
|
NSCAssert([number isKindOfClass:NSNumber.class], @"-or must only be used on a signal of RACTuples of NSNumbers. Instead, tuple contains a non-NSNumber value: %@", tuple);
|
|
|
|
return number.boolValue;
|
|
}]);
|
|
}] setNameWithFormat:@"[%@] -or", self.name];
|
|
}
|
|
|
|
- (RACSignal *)reduceApply {
|
|
return [[self map:^(RACTuple *tuple) {
|
|
NSCAssert([tuple isKindOfClass:RACTuple.class], @"-reduceApply must only be used on a signal of RACTuples. Instead, received: %@", tuple);
|
|
NSCAssert(tuple.count > 1, @"-reduceApply must only be used on a signal of RACTuples, with at least a block in tuple[0] and its first argument in tuple[1]");
|
|
|
|
// We can't use -array, because we need to preserve RACTupleNil
|
|
NSMutableArray *tupleArray = [NSMutableArray arrayWithCapacity:tuple.count];
|
|
for (id val in tuple) {
|
|
[tupleArray addObject:val];
|
|
}
|
|
RACTuple *arguments = [RACTuple tupleWithObjectsFromArray:[tupleArray subarrayWithRange:NSMakeRange(1, tupleArray.count - 1)]];
|
|
|
|
return [RACBlockTrampoline invokeBlock:tuple[0] withArguments:arguments];
|
|
}] setNameWithFormat:@"[%@] -reduceApply", self.name];
|
|
}
|
|
|
|
@end
|