# Scheduler

RxSwift에서는 비동기처리시 스케줄러를 사용한다. 코코아에서 사용하는 GCD, 즉 Thread, DispatchQueue, OperationQueue를 묶어 추상화 한 컨텍스트를 사용하게 된다.

GCD의 규칙은 바로 UI의 업데이트를 메인 스레드에서만 처리해야 한다는 것인데, 스케줄러도 역시 동일하게 메인 스케줄러라는 개념이 존재한다.

UI업데이트가 아닌 비동기처리시 백그라운드 스케줄러를 사용한다.

# Serial Scheduler & Concurrent Scheduler

GCD의 직렬 큐와 유사하게 스케줄러에서도 직렬 스케줄러가 존재한다.

직렬 스케줄러는 다음과 같이 분류된다. 이들은 작업의 순서가 중요할 때에 사용된다.

  1. CurrentThreadScheduler
  2. MainScheduler
  3. SerialDispatchQueueScheduler

동시 스케줄러는 아래와 같이 분류된다.

  1. ConcurrentDispatchQueueScheduler
  2. OperationQueueScheduler

# 스케줄러 다루기

스케줄러를 잘 다루기 위해서는 옵저버블이 생성된 후 연산자들이 호출되는 시점을 알아야 한다. 기본적인 옵저버블의 경우 구독자가 추가된 경우 위의 과정들이 이루어진다.

let bag = DisposeBag()
Observable.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
    .filter { num -> Bool in
        return num.isMultiple(of: 2)
    }
    .map { num -> Int in
        return num * 2
    }
    .subscribe{ // 구독되어야 연산자들이 동작!
        print(Thread.isMainThread ? "Main Thread" : "Background Thread", ">> map")
        print($0)
    }
    .disposed(by: bag)

map연산자 자체를 비동기적으로 처리하기 위해서는 어떻게 해야할까?

Observable.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
    .filter { num -> Bool in
        return num.isMultiple(of: 2)
    }
    .map { num -> Int in
        // 잘못된 예시
        DispatchQueue.global().async{
            return num * 2
        }
    }
    // 나머지 동일..

위의 코드는 연산자 내부 클로저를 비동기적으로 처리하는 것이지, 연산자 자체를 비동기적으로 처리하는 것이 아니다.

연산자 비동기 처리를 위한 스케줄러 지정은 observeOn(_:) 연산자와 subscribeOn(_:) 연산자를 사용한다.

observeOn 연산자는 뒤에 이어질 연산자들이 실행될 스케줄러를 지정해준다.

// ConcurrentDispatchQueueScheduler - RxSwift의 백그라운드 스케줄러
// GCD의 글로벌큐를 사용한다.
let backgroundScheduler =  ConcurrentDispatchQueueScheduler(queue: DispatchQueue.global())

Observable.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
    // observe(on) 호출 이전이므로 메인쓰레드에서 동작
    .filter { num -> Bool in
        print(Thread.isMainThread ? "Main Thread" : "Background Thread", ">> filter")
        return num.isMultiple(of: 2)
    }
    .observe(on: backgroundScheduler)
    // 이후 연산자들은 생성한 백그라운드 스케줄러 내에서 실행
    .map { num -> Int in
        print(Thread.isMainThread ? "Main Thread" : "Background Thread", ">> map")
        return num * 2
    }
    .subscribe{
        print(Thread.isMainThread ? "Main Thread" : "Background Thread", ">> subscribe")
        print($0)
    }
    .disposed(by: bag)

subscribe(on:)은 옵저버블이 생성 시점의 코드들이 동작할 스케줄러를 지정한다. 옵저버블 구독을 통해 이벤트가 방출되는 시점이 아닌 옵저버블 자체가 생성되는 시점에 대한 스케줄러를 지정하는 것이다.

the “subscription code” is the code that gets called from your subscribe() and is located in Observable.create { ... }. This is the code that creates your subscription and produces elements. the “observation code” is where you observe for emitted elements - this is the code you provide in onNext: { ... }, onCompleted: {...}, etc. This is where you do your observing.

Observable<Int>.create { observer in
    observer.onNext(1)
    sleep(1)
    observer.onNext(2)
    return Disposables.create()
}
.subscribe(onNext: { el in
    print(Thread.isMainThread)
})

위 코드를 메인스레드에서 실행하게 된다면 sleep 함수로 인해 에러가 발생한다. 메인스레드 블록 현상이 발생하기 때문이다.

create 클로저 내에서 옵저버블이 정의되는데, 이 영역이 바로 subscription code영역이다. 해당 코드가 메인스레드 블록을 야기하기 때문에 subscribe(on:) 을 통해 백그라운드 스케줄에 작업들을 배치해야한다.

subscribe(on:)observe(on:)과 달리 호출 시점도 중요하지 않다.

# Reference

  1. observeOn vs. subscribeOn (opens new window)