# startWith

startWith는 가변 파라미터를 받는다. 전달된 요소들을 이벤트 앞에 삽입한다.

let numbers = [1, 2, 3, 4, 5]

Observable.from(numbers)
    .startWith(0)
    .startWith(-2, -1)
    .subscribe{ print($0) } // -2 -1 0 1 2 3 4 5
    .disposed(by: bag)

# concat

let fruits = Observable.from(["🍏", "🍎", "🥝", "🍑", "🍋", "🍉"])
let animals = Observable.from(["🐶", "🐱", "🐹", "🐼", "🐯", "🐵"])

Observable.concat([fruits, animals])
    .subscribe{ print($0) }
    .disposed(by: bag)

concat 파라미터의 배열 내에 옵저버블을 넣으면 순서대로 합친 뒤 구독자에게 데이터를 전달한다.

위의 형태는 ObservableType의 타입 메서드를 호출한 것이고, 옵저버블 인스턴스 내에도 concat 메서드가 존재한다.

fruits.concat(animals) // fruit 옵저버블 뒤에 animals 옵저버블을 concat
    .subscribe{ print($0) }
    .disposed(by: bag)

concat은 앞에 붙인 옵저버블이 이벤트 방출이 모두 끝나 onCompleted가 호출되면 나머지 뒤의 옵저버블 이벤트들을 방출하는 방식으로 동작한다.

# merge

concat과 다르게 merge 연산자는 요소들을 묶어 한 옵저버블로 리턴한다.

병합 후 부분 옵저버블에 next 이벤트를 전달하면 병합 이후의 순서에 삽입된다.

let oddNumbers = BehaviorSubject(value: 1)
let evenNumbers = BehaviorSubject(value: 2)
let negativeNumbers = BehaviorSubject(value: -1)

let source = Observable.of(oddNumbers, evenNumbers)

source
    .merge()
    .subscribe{ print($0) } // 1 2가 먼저 next로 전달
    .disposed(by: bag)

oddNumbers.onNext(3) // 병합된 1 2 이후 3이 삽입

병합되는 두 옵저버블 모두 onCompleted 되어야 병합된 전체 옵저버블에서 onCompleted가 호출된다.

병합대상 중 하나라도 에러가 방출되면 그 즉시 전체 옵저버블도 에러를 구독자에게 방출하며 종료된다.

병합 최대 갯수는 merge(maxConcurrent: Int) 파라미터를 전달하여 지정 가능하다.

# combineLatest

rx-4

옵저버블을 병합하여 result Observable에 값을 담아 이벤트를 방출하는데, 두 옵저버블이 모두 이벤트를 방출하기 시작한 시점부터 각 옵저버블의 가장 최근 이벤트와 결합하여 리절트 옵저버블에 요소를 담는다.

let greetings = PublishSubject<String>()
let languages = PublishSubject<String>()

Observable.combineLatest(greetings, languages){ lhs, rhs -> String in
    return "\(lhs) \(rhs)"
}
.subscribe{ print($0) } // Hi America , 안녕 America, 하이 America, 하이 코리아
.disposed(by: bag)

greetings.onNext("HI")
languages.onNext("America")

greetings.onNext("안녕")
greetings.onNext("하이")
languages.onNext("코리아")

combineLatest도 부분 옵저버블 둘다 onCompleted가 전달되어야 구독자에게 onCompleted가 방출된다.

만약 부분 옵저버블이 종료되었다면 마지막에 방출된 이벤트 요소를 계속해서 사용하게 된다.

부분 옵저버블에서 에러를 방출하는 경우 그 즉시 구독자에게 에러를 전달한다.

# zip

combineLatest는 가장 최근에 전달된 이벤트를 유지한다면, zip은 인덱스가 일치하는 짝끼리 결합하여 result 옵저버블에 전달한다. 결합할 짝이 없는 대상들은 이벤트로 방출되지 않는다.

이렇게 소스 옵저버블들이 방출하는 요소들의 인덱스를 맞춰 이벤트를 방출하는 것을 Indexed Sequencing이라고 한다.

result 옵저버블에 결합된 이들을 방출하는 시점은 둘이 결합되는 시점이므로, 더 늦게 등장한 상대 짝 이벤트의 시점이 된다.

Observable.zip(numbers, strings){ lhs, rhs -> String in
    return "\(lhs) \(rhs)"
}
.subscribe{ print($0) } // next(1 A)만 방출되고 종료된다.
.disposed(by: bag)

numbers.onNext(1)
strings.onNext("A")

numbers.onNext(2)
numbers.onNext(3)

zip 연산자도 마찬가지로 소스 옵저버블 모두 onCompleted 이벤트를 전달해야 구독자에게 onCompleted가 방출되고, 하나라도 에러가 발생하면 구독자에게 즉시 에러를 전달한다.

# withLatestFrom

triggerObservable.withLatestFrom(dataObservable) 형태로 호출한다. withLatestFrom 연산자를 호출하는 옵저버블을 트리거 옵저버블, 파라미터에 전달되는 옵저버블을 데이터 옵저버블이라고 한다.

트리거 옵저버블에서 next 이벤트를 방출하면 데이터 옵저버블의 가장 최신 이벤트를 방출한다.


let trigger = PublishSubject<Void>()
let data = PublishSubject<String>()

trigger.withLatestFrom(data)
    .subscribe{ print($0) }
    .disposed(by: bag)

data.onNext("HI")  // 여기까지 실행하면 이벤트 방출하지 않음

trigger.onNext(()) // 여기까지 실행하면 데이터 옵저버블의 HI를 방출
trigger.onNext(()) // 데이터 옵저버블의 가장 최신 이벤트인 HI를 다시 방출

데이터 옵저버블이 onCompleted되어도 트리거 옵저버블의 이벤트 방출에 따라 데이터 옵저버블의 마지막 이벤트는 계속해서 방출된다.

반면 데이터 옵저버블에서 에러를 방출하는 경우 트리거 옵저버블의 이벤트 방출시 구독자에게 에러를 방출한다.

트리거 옵저버블이 onCompleted되면 데이터 옵저버블의 완료 여부와 상관없이 onCompleted가 구독자에게 방출된다.

# sample

dataObservable.sample(triggerObservable)형태로 사용한다. 트리거 옵저버블에 next이벤트 전달시 데이터 옵저버블의 최신 이벤트를 방출하는 것은 동일하다.

하지만 한번 방출한 이벤트를 재사용하지 않는다.

let trigger = PublishSubject<Void>()
let data = PublishSubject<String>()

data.sample(trigger)
    .subscribe{ print($0) }
    .disposed(by: bag)

data.onNext("HI")
trigger.onNext(()) // next(HI)
trigger.onNext(()) // 아무 이벤트도 방출되지 않음

data.onNext("HI 2")
trigger.onNext(()) // next(HI 2)

data.onCompleted()
trigger.onNext() // completed

샘플 연산자의 데이터 옵저버블이 onCompleted()를 방출한 뒤 트리거 옵저버블에서 next 이벤트를 방출하면 구독자에게 바로 onCompleted를 방출한다.

withLatestFrom에서는 데이터 옵저버블이 완료되었어도 트리거 옵저버블의 이벤트를 방출하면 데이터 옵저버블의 마지막 이벤트를 전달한다.

데이터 옵저버블에서 에러를 방출하면 트리거 옵저버블의 이벤트 방출 여부와 상관없이 구독자에게 에러를 방출한다.

# switchLatest

껐다 켜는 스위치를 생각하면 된다. 주로 옵저버블을 리턴하는 옵저버블에 대해 특정 옵저버블을 선택하여 대상에 대해서만 최신 이벤트를 받고자 할때 사용한다.

let a = PublishSubject<String>()
let b = PublishSubject<String>()

let source = PublishSubject<Observable<String>>()

source
    .subscribe{ print($0) }
    .disposed(by: bag)

위의 코드를 보면 Observable<String>을 이벤트 타입으로 받는 퍼블리시 서브젝트가 있다.

이때 switchLatest를 source 서브젝트 연산에 추가하면 다음과 같다.

source
    .switchLatest()
    .subscribe{ print($0) }
    .disposed(by: bag)

source.onNext(a)

a.onNext("A event") // next(A event)

소스의 넥스트 이벤트로 최신 이벤트를 받을 옵저버블을 전달한다. 이후 전달된 옵저버블에 이벤트를 전달하면 소스 옵저버블에서 구독자에게 해당 이벤트를 방출한다.

소스의 넥스트 이벤트 내에 최신 옵저버블을 파라미터로 전달하여 지정하면 된다.

지정된 최신 옵저버블의 onCompleted여부에 상관 없이 소스 옵저버블에서 onCompleted를 방출해야 소스 옵저버블이 종료된다.

에러의 경우 최신 옵저버블에서 에러를 방출해도 소스 옵저버블이 에러를 구독자에게 방출한다. 최신 옵저버블로 지정되지 않은 옵저버블에서 에러를 방출해도 소스는 이를 무시한다.

# reduce

scan 연산자는 accumulator파라미터에 전달된 클로저의 연산 과정 모두를 이벤트로 방출한다.

그에 반해 reduce 연산자는 최종 결과 하나만 방출한다.

let o = Observable.range(start: 1, count: 5)

o.reduce(0, accumulator: +)
    .subscribe{ print($0) }
    .disposed(by: bag)

위의 코드는 1부터 5까지의 누적합 최종 15값을 이벤트로 방출하는 코드이다.

# Conditional Operator - amb

amb 연산자는 비교 대상 옵저버블 중 먼저 이벤트가 방출되는 옵저버블의 이벤트만 구독자에게 방출한다. 방출이 더 늦어진 옵저버블은 넥스트 이벤트 뿐만 아니라 onCompleted, onError도 무시된다.

let a = PublishSubject<String>()
let b = PublishSubject<String>()

a.amb(b)
    .subscribe{ print($0) }
    .disposed(by: bag)

a.onNext("A") // a 옵저버블에 이벤트가 먼저 방출
b.onNext("B") // 무시
b.onCompleted() // 무시

여러 옵저버블들을 한번에 비교하기 위해서는 Observable.amb(Sequence)형태의 타입 메서드를 사용한다.

let a = PublishSubject<String>()
let b = PublishSubject<String>()
let c = PublishSubject<String>()

Observable.amb([a,b,c])
    .subscribe{ print($0) }
    .disposed(by: bag)

b.onNext("B") // B 옵저버블에서 가장 먼저 이벤트 방출
a.onNext("A")
c.onNext("C")

b.onCompleted() // completed 구독자에게 방출