# toArray

옵저버블의 방출되는 이벤트들을 배열에 담아 onComplete가 호출되는 시점에 해당 배열을 한번에 방출한다.

에러가 전달되면 배열에 담겼던 데이터는 무시된다.

onCompleted가 전달되기 전까지 전달되던 요소들은 이벤트로 방출되지 않고 배열내에 적재되기만 한다.

let subject = PublishSubject<Int>()
subject
    .toArray()
    .subscribe{ print($0) }
    .disposed(by: disposeBag)

subject.onNext(0)
subject.onNext(1)
subject.onCompleted() // [0, 1]

# map

map 클로저에 전달되는 동작에 맞춰 요소를 가공하여 구독자에게 전달한다.

let skills = ["Swift", "SwiftUI", "RxSwift"]

Observable.from(skills)
    .map{ $0.count }
    .subscribe{ print($0) } // next(5) next(7) next(7)
    .disposed(by: disposeBag)

요소가 가공된 데이터 타입은 소스 옵저버블에 전달되는 요소 타입을 따라갈 필요가 없다. 위에서 skills라는 변수 내에는 String으로 이루어져 있지만 map으로 리턴하는 데이터는 Int형이다.

# compactMap

기본 스위프트에서도 제공하는 compactMap과 동작이 동일하다. 옵셔널 데이터가 혼재된 배열을 순회할때 옵셔널타입은 옵셔널 바인딩을 진행하여 원본 데이터를 추출해주고, nil값은 동시에 필터링 해주는 역할을 해준다.

RxSwift에서의 compactMap은 이와 동일하게 이벤트 방출 대상에 대해 옵셔널 바인딩을 진행하며, nil값은 이벤트로 방출하지 않고 무시한다.

let subject = PublishSubject<String?>()

subject
    .compactMap{ $0 }
    .subscribe { print($0) }
    .disposed(by: disposeBag)

Observable<Int>.interval(.milliseconds(300), scheduler: MainScheduler.instance)
    .take(10)
    .map { _ in Bool.random() ? "⭐️" : nil }
    .subscribe(onNext: { subject.onNext($0) })
    .disposed(by: disposeBag)
  1. 0.3초마다 Optional(⭐️) 또는 nil을 이벤트로 방출한다.
  2. 서브젝트의 compactMap을 호출하여 최종 구독자에게 데이터를 전달할때 옵셔널 바인딩 및 nil 필터링을 진행한다.
  3. 맨 아래 옵저버블에서 옵셔널 문자열들을 자신의 구독자인 서브젝트로 이벤트를 방출하고, 이를 다시 print($0)을 진행하는 구독자에게 데이터를 전달하는 방식이다.

# flatMap

출처: https://reactivex.io/documentation/operators/flatmap.html

flatMap은 방출 이벤트 요소를 정의한 로직에 따라 여러개의 옵저버블로 쪼갠다. 이후 최종적으로 모두를 한 옵저버블로 묶어 방출한다.

  1. 여러개로 쪼개지는 옵저버블을 이너 옵저버블(Inner Observable)이라고 한다.
  2. 최종적으로 방출되는 옵저버블을 리절트 옵저버블(Result Observable)이라고 한다.
  3. 이러한 일련의 과정을 평탄화(Flattening)라고 한다.
let redCircle = "🔴"
let greenCircle = "🟢"
let blueCircle = "🔵"

let redHeart = "❤️"
let greenHeart = "💚"
let blueHeart = "💙"

Observable.from([redCircle, greenCircle, blueCircle])
    .flatMap{ circle -> Observable<String> in
        switch circle {
        case redCircle:
            return Observable.repeatElement(redHeart)
                .take(5)
        case greenCircle:
            return Observable.repeatElement(greenHeart)
                .take(5)
        case blueCircle:
            return Observable.repeatElement(blueHeart)
                .take(5)
        default:
            return Observable.just("")
        }
    }
    .subscribe{ print($0) }
    .disposed(by: disposeBag)

리절트 옵저버블을 구독하여 방출된 이벤트를 출력해보면 그들 순서가 뒤죽박죽인 것을 볼 수 있다.

위의 예시에서 하트가 빨간색이 먼저 5개가 출력되고 그 나머지가 출력되는 방식이 아닌, 순서 상관없이 처리됨에 따라 그때그때 최종 옵저버블에 푸시되는 형태이다.

이러한 현상을 인터리빙(Interleaving)이라고 한다.

# flatMapFirst

위의 flatMap과 동일한 예시코드에서 flatMapFirst로만 코드를 수정한다.

Observable.from([redCircle, greenCircle, blueCircle])
    .flatMapFirst{ circle -> Observable<String> in // FlatMapFirst로 수정!
        switch circle {
        case redCircle:
            return Observable.repeatElement(redHeart)
                .take(5)
        case greenCircle:
            return Observable.repeatElement(greenHeart)
                .take(5)
        case blueCircle:
            return Observable.repeatElement(blueHeart)
                .take(5)
        default:
            return Observable.just("")
        }
    }
    // ... 나머지 동일
  1. from 연산자를 통해 맨 처음 redCircle문자열이 참조되며 flatMap으로 이너 옵저버블이 만들어진다.
  2. 가장 처음 만들어진 이너서클이 redCircle에서 분기처리된 옵저버블이므로, 해당 옵저버블이 아니면 나머지 이벤트들은 방출하지 않고 무시된다.

flatMapFirst주기가 겹치면 다른 이벤트들은 무시된다. 주기가 겹치지 않는 이벤트가 새롭게 방출되는 시점에 처음 방출된 이벤트를 가지고 다시 평탄화를 진행한다.

let sourceObservable = PublishSubject<String>()

sourceObservable
    .flatMapFirst { circle -> Observable<String> in
        switch circle {
        case redCircle:
            return Observable<Int>.interval(.milliseconds(200), scheduler: MainScheduler.instance)
                .map { _ in redHeart}
                .take(10)
        case greenCircle:
            return Observable<Int>.interval(.milliseconds(200), scheduler: MainScheduler.instance)
                .map { _ in greenHeart}
                .take(10)
        case blueCircle:
            return Observable<Int>.interval(.milliseconds(200), scheduler: MainScheduler.instance)
                .map { _ in blueHeart}
                .take(10)
        default:
            return Observable.just("")
        }
    }
    .subscribe { print($0) }
    .disposed(by: disposeBag)

sourceObservable.onNext(redCircle)

DispatchQueue.main.asyncAfter(deadline: .now() + 0.5) {
    sourceObservable.onNext(greenCircle)
}

DispatchQueue.main.asyncAfter(deadline: .now() + 6) {
    sourceObservable.onNext(blueCircle)
}

위의 코드는 다음과 같은 과정으로 진행된다.

  1. flatMap을 사용하는 옵저버블을 생성한다.
  2. onNext 초기 이벤트로 redCircle이 전달된다.
  3. 0.2초 간격으로 redCircleredHeart로 변환되도록 하여 이너 옵저버블을 생성한다.
  4. onNextredCircle로 전달된 시점 0.5초 이후 greenCircle이 next이벤트로 방출된다.
  5. 이때 redCircle기준 0.2초마다 redHeart가 이너 옵저버블로 생성되는 주기 속에 포함되어 있으므로, greenHeart는 무시된다.
  6. redCircle - greenCircle로 이어지는 모든 주기를 끝마친 뒤 blueCircle이 이벤트로 방출된다.
  7. 이너 옵저버블 생성 후 blueHeart가 0.2초 간격으로 10번 방출된다.

greenCircleonNext전달되면서 redCircle과 주기가 어중간하게 겹치게 되는데, redCircle이 0~2초까지 이벤트 방출이 진행된다면 greenCircle은 0.5~2.5초까지 이벤트 방출이 이루어지게 된다.

이들 주기가 겹쳐 총 0~2.5초간 redCircle이 아닌 이들에 대해서는 이벤트로 방출하지 않고 무시된다.

# flatMapLatest

flatMapLatest는 생성된 이벤트중 가장 최근 이벤트에 대해서만 방출한다.

기존과 다른 이너 옵저버블로 생성되는 경우 기존 옵저버블은 그 즉시 종료된다.

let sourceObservable = PublishSubject<String>()
let trigger = PublishSubject<Void>()

sourceObservable
    .flatMapLatest { circle -> Observable<String> in
        switch circle {
        case redCircle:
            return Observable<Int>.interval(.milliseconds(200), scheduler: MainScheduler.instance)
                .map { _ in redHeart}
                .take(until: trigger)
        case greenCircle:
            return Observable<Int>.interval(.milliseconds(200), scheduler: MainScheduler.instance)
                .map { _ in greenHeart}
                .take(until: trigger)
        case blueCircle:
            return Observable<Int>.interval(.milliseconds(200), scheduler: MainScheduler.instance)
                .map { _ in blueHeart}
                .take(until: trigger)
        default:
            return Observable.just("")
        }
    }
    .subscribe { print($0) }
    .disposed(by: disposeBag)

sourceObservable.onNext(redCircle)

DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
    sourceObservable.onNext(greenCircle)
}

DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
    sourceObservable.onNext(blueCircle)
}

DispatchQueue.main.asyncAfter(deadline: .now() + 10) {
    trigger.onNext(())
}
  1. redCircle을 이벤트로 전달하면 0.2초 간격으로 방출된다.
  2. 1초 뒤 greenCircle이 이벤트로 방출되는데, 기존 redCircle 옵저버블과 다르므로 방출되고 있던 redCircle 옵저버블이 종료된다.
  3. 2초 뒤 blueCircle이 이벤트로 방출되는데, 기존 greenCircle 옵저버블과 다르므로 방출되고 있던 greenCircle 옵저버블이 종료된다.
  4. 트리거 서브젝트의 next이벤트 방출에 따라 take(until:) 연산자가 실행되어 소스 옵저버블이 종료된다.

WARNING

flatMapLatest는 기존 옵저버블을 재생성하는 형태가 아닌 재사용하는 형태이다. 재사용 과정에서 현재 옵저버블과 다른 데이터가 방출 대상으로 들어오는 경우 현재 이너 옵저버블을 종료하게 되는 것이다.

# concatMap

flatMap은 인터리빙을 허용한다. concatMap은 순서를 보장한다.

# scan

누적연산을 처리할때 사용한다. 시드값을 첫번째 파라미터로, 계산 로직을 accumulator 파라미터로 받는다.

Observable.range(start: 1, count: 10)
    .scan(0, accumulator: { prev, next in
        return prev + next
    })
    .subscribe{ print($0) }
    .disposed(by: disposeBag)

# buffer

버퍼연산자는 이벤트 방출 조건을 만족하기 전까지 이벤트를 보유할 한계를 정의한다. 버퍼 연산자는 세가지 파라미터를 받는다. timeSpan, count, scheduler이다.

  1. timeSpan: 얼마나 시간동안 이벤트를 버퍼에 저장하고 있을지를 지정한다.
  2. count: 최대 몇개의 이벤트를 버퍼에 저장하고 있을지를 결정한다.

timeSpan 또는 count값을 달성하는 즉시 이벤트를 방출하는 형태이다.

Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
    .buffer(timeSpan: .seconds(3), count: 3, scheduler: MainScheduler.instance)
    .take(5)
    .subscribe{ print($0) }
    .disposed(by: disposeBag)
  1. 인터벌 연산자 결과 옵저버블을 5개까지만 받는다.
  2. 각 옵저버블마다 3개의 이벤트가 발생하거나, 3초가 지난 경우 이들 요소를 방출한다.

# window

버퍼와 동일한 기능을 하지만, 윈도우 연산자는 이벤트를 방출하는 것이 아니라 해당 이벤트를 담은 옵저버블을 리턴한다.

옵저버블이 옵저버블을 래핑하는 형태로 존재한다.

Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
    .window(timeSpan: .seconds(2), count: 3, scheduler: MainScheduler.instance)
    .take(5)
    .subscribe{
        // 이벤트로 방출된 옵저버블을 다시 구독하여 내부 요소들을 출력한다.
        if let observable = $0.element {
            observable.subscribe{ print("Inner : \($0)") }
        }
    }
    .disposed(by: disposeBag)

# groupBy

groupBy 파라미터에 전달된 값을 기준으로 동일한 요소들을 묶어 이너옵저버블을 리턴한다.

let words = ["Apple", "Banana", "Orange", "Book", "City", "Axe"]

Observable.from(words)
    .groupBy{ $0.count }
    .subscribe(onNext: { groupedObservable in
        print("== \(groupedObservable.key)")
        groupedObservable.subscribe{ print("   ", $0)}
    })
    .disposed(by: disposeBag)

딕셔너리와 유사하게 동작한다고 보면 된다.

위의 코드는 문자열 길이마다 그룹을 지어 이너 옵저버블을 생성한다. next이벤트 방출시 그룹화된 이너 옵저버블이 전달된다.

해당 이너 옵저버블의 key값에 접근하면 키값을 출력할 수 있다.

그룹바이 연산자는 주로 flatMaptoArray를 묶어 사용한다.

Observable.from(words)
    .groupBy{ $0.count }
    .flatMap{ $0.toArray() }
    .subscribe{ print($0) }
    .disposed(by: disposeBag)

// next(["Book", "City"])
// next(["Banana", "Orange"])
// next(["Apple"])
// next(["Axe"])

키셀렉터를 기준으로 묶인 이들을 배열에 담아 이너 옵저버블을 생성한다. 이들을 result 옵저버블에 담아 최종적으로 이벤트들을 모두 방출한다.

# Reference

  1. reactiveX - FlatMap (opens new window)