구독 공유는 불필요한 중복 작업을 줄이기 위해 사용된다. 일종의 싱글톤 패턴이라고 보면 될 것 같다.

네트워크 요청이나 DB접근시 매번 새로운 구독을 추가하게 되면 셋업을 위한 코드 및 조회 작업들이 모두 새롭게 이루어진다. 한 구독을 여러 구독자들이 공유함으로써 리소스 낭비를 줄일 수 있다.

# multicast

RxSwift의 구독 시스템은 기본적으로 유니캐스트 형태를 취한다. 옵저버블과 구독자가 일대일로 대응되는 형태로 존재한다.

멀티캐스트 연산자는 이러한 형태를 다수의 구독자가 한 옵저버블을 구독하는 형태로 바꿔준다.

일반적인 옵저버블의 이벤트 방출은 구독자가 구독을 시작한 시점부터 바로 이루어진다. 멀티캐스트 연산자로 리턴되는 옵저버블은 특별한 타입인 ConnectableObservable을 갖는다.

위 타입은 옵저버블에 대해 구독자가 추가되는 시점에 이벤트 방출을 하지 않는다. 모든 구독자가 추가되었다고 판단한 시점에 개발자가 직접 connect 메서드를 호출한다.

멀티캐스트 연산자는 서브젝트를 파라미터로 받는다.

let bag = DisposeBag()
let subject = PublishSubject<Int>()

let source = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
    .take(5)

source
    .subscribe { print("🔵", $0) }
    .disposed(by: bag)

source
    .delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
    .subscribe { print("🔴", $0) }
    .disposed(by: bag)

우선 위 코드는 멀티캐스트 연산자를 사용하기 이전 코드이다. 위의 출력 결과는 파란 색 다섯개, 빨간 색 다섯개가 설정해둔 구독 딜레이 연산에 따라 각각 출력된다.

반면 multicast 연산자를 사용한 코드의 경우에는 출력결과가 어떻게될까?

let bag = DisposeBag()
let subject = PublishSubject<Int>()

let source = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
    .take(5)
    .multicast(subject) // 멀티캐스트 연산자 호출

source
    .subscribe { print("🔵", $0) }
    .disposed(by: bag)

source
    .delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
    .subscribe { print("🔴", $0) }
    .disposed(by: bag)

source.connect() // 커넥트 메서드 호출 - 구독자들에게 이벤트 방출 시작

# publish

퍼블리시 연산자는 멀티캐스트 연산자의 파라미터에 빈 서브젝트를 직접 전달했던 작업을 대신 해준다.

let source = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
    .take(5)
    .publish() // 퍼블리시

위의 멀티캐스트 연산자와 완전히 동일한 동작을 하게 된다.

# replay

위의 멀티캐스트, 퍼블리시 예시 코드에서 두 번째 구독은 딜레이되는 형태로 구현해두었다. 구독을 공유하는 형태이기 때문에 첫 번째 구독에서의 이벤트 방출이 모두 끝나면 두 번째 구독도 자동으로 끝난다.

이때 두 번째 구독자의 경우 딜레이 되는 동안 다른 구독자에게 방출된 이벤트를 알 수 없는 형태이다.

리플레이 서브젝트를 활용하면 앞에서 전달된 이벤트들을 버퍼에 저장하여 그 다음 구독자에게 전달할 수 있게 된다.

let subject = ReplaySubject<Int>.create(bufferSize: 5) // 리플레이 서브젝트 생성
let source = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
    .take(5)
    .multicast(subject) // 멀티캐스트 파라미터에 리플레이 서브젝트 전달

source
    .subscribe { print("🔵", $0) }
    .disposed(by: bag)

source
    .delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
    .subscribe { print("🔴", $0) }
    .disposed(by: bag)

source.connect()
  1. 멀티캐스트 연산자 파라미터에 리플레이 서브젝트를 전달한다.
  2. 리플레이 서브젝트 생성시 지정한 버퍼 사이즈만큼 구독자에게 방출하는 이벤트를 보관한다.
  3. 이후 구독시점이 딜레이된 구독자가 있다면 구독 시작시 쌓인 이벤트들을 함께 방출해준다.

이때 replay 연산자는 publish 연산자와 동일한 역할을 하게 된다.

멀티캐스트 연산자에 타입에 맞는 서브젝트를 직접 생성 및 파라미터에 전달할 필요 없이 클로저 내에서 각 타입에 맞게 서브젝트를 리턴해주어 코드를 더 간단히 작성할 수 있게 된다.

let source = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
    .take(5)
    .replay(5) // replay(_ bufferSize: Int)

// 나머지 동일

리플레이 연산자의 파라미터에 버퍼사이즈를 지정한다.

버퍼사이즈를 무한으로 가질 수 있는 replayAll 연산자도 있지만 사용을 지양해야 한다.

# refCount

refCount 연산자는 옵저버블을 리턴하는데, 내부 구현 코드를 보면 일반 옵저버블을 리턴하는 것이 아닌 RefCount 생성자를 통해 생성되는 특별한 옵저버블을 리턴한다.

public func refCount() -> Observable<Element> {
    RefCount(source: self)
}

refCount는 커넥터블 옵저버블을 리턴하며, 자동으로 커넥트 메서드까지 호출해준다. 따라서 연산자의 리턴 결과를 변수에 저장한 뒤 이후 connect 메서드를 호출할 필요는 없다.

let source = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
    .debug()
    .publish()
    .refCount() // refCount 연산자 호출

let observer1 = source
    .subscribe { print("🔵", $0) }

// source.connect() -> 생략

DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
    observer1.dispose()
}

DispatchQueue.main.asyncAfter(deadline: .now() + 7) {
    let observer2 = source.subscribe { print("🔴", $0) }

    DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
        observer2.dispose()
    }
}

refCount없이 인터벌 연산자를 사용하고 publish 연산자로 구독 공유를 구현하게 되면 실제로 구독자에게 방출된 이벤트가 전달되는 것 외에 불필요한 데이터들이 계속해서 방출되는 상황이 연출될 뿐만 아니라, connect 메서드 호출 결과로 리턴된 커넥터블 옵저버블을 직접 dispose 해줘야한다.

refCount를 사용하면 구독되지 않은 시점에는 이벤트 방출을 멈춘다. 또한 구독자에게 이벤트 방출이 끝난 뒤 옵저버블이 dispose된 이후에 다른 구독자가 존재하지 않는다면 역시 이벤트 방출을 멈춘다.

새로운 구독자가 이후 추가되면 시퀀스는 처음부터 시작된다. 즉 위의 경우 인터벌 연산자를 호출하는 옵저버블의 이벤트 방출이 next(0)부터 다시 시작하게 된다는 것이다.

# share

share 연산자의 구현부이다.

public func share(replay: Int = 0, scope: SubjectLifetimeScope = .whileConnected)
    -> Observable<Element> {
    switch scope {
    case .forever:
        switch replay {
        case 0: return self.multicast(PublishSubject()).refCount()
        default: return self.multicast(ReplaySubject.create(bufferSize: replay)).refCount()
        }
    case .whileConnected:
        switch replay {
        case 0: return ShareWhileConnected(source: self.asObservable())
        case 1: return ShareReplay1WhileConnected(source: self.asObservable())
        default: return self.multicast(makeSubject: { ReplaySubject.create(bufferSize: replay) }).refCount()
        }
    }
}

replay 파라미터와 scope 파라미터를 받는다.

replay 파라미터에 정수값을 받으면 1이상인 경우 리플레이 서브젝트를 기준으로 멀티캐스트 연산자를 호출하고, 0인 경우 퍼블리시 서브젝트를 기준으로 멀티캐스트 연산자를 호출한다.

let source = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
    .debug()
    .share(replay: 5) // share - replay파라미터 지정

let observer1 = source
    .subscribe { print("🔵", $0) } // 구독 진행 next(0) ..

let observer2 = source
    .delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
    .subscribe { print("🔴", $0) } // 구독 진행 - replay 버퍼에서 이벤트 불러오기
    // {next(0), next(1), next(2)}, next(3) ...

DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
    observer1.dispose()
    observer2.dispose()
}

DispatchQueue.main.asyncAfter(deadline: .now() + 7) {
    let observer3 = source.subscribe { print("⚫️", $0) }

    DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
        observer3.dispose()
    }
}

share연산자에 replay 파라미터를 지정하는 경우 리플레이 서브젝트를 기준으로 커넥터블 옵저버블이 생성되므로 두번째 옵저버의 구독 시 버퍼에 저장된 이벤트들이 한번에 쏟아져 나오며 시작한다.

옵저버 1, 2가 dispose된 이후에 서브젝트가 사라지기 때문에 옵저버 3 생성 및 구독시 완전히 독립적인 커넥터블 옵저버블이 만들어진다.

위의 share연산자의 두번째 파라미터로 scope.forever로 지정하면 어떻게 될까?

let source = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
    .debug()
    .share(replay: 5, scope: .forever)

옵저버 dispose 이후에도 기존 옵저버블이 유지되므로, 리플레이 서브젝트 버퍼에 저장된 이벤트들이 유지된다.

따라서 옵저버3 첫 이벤트 방출시 쌓여있던 버퍼의 이벤트들이 한번에 쏟아진다.

버퍼 데이터들을 이벤트로 방출한 뒤 시퀀스는 처음부터 다시 시작한다. 기본적으로 share 연산자의 리턴 옵저버블이 refCount 옵저버블이기 때문에 옵저버 1,2에서 쌓여온 데이터가 버퍼에 저장되는 것과는 별개로 dispose 이후 처음부터 시퀀스가 시작되는 것은 동일하다.

# Reference

  1. 네트워크 - 유니캐스트 / 멀티캐스트 / 브로드캐스트란? (opens new window)