구독 공유는 불필요한 중복 작업을 줄이기 위해 사용된다. 일종의 싱글톤 패턴이라고 보면 될 것 같다.
네트워크 요청이나 DB접근시 매번 새로운 구독을 추가하게 되면 셋업을 위한 코드 및 조회 작업들이 모두 새롭게 이루어진다. 한 구독을 여러 구독자들이 공유함으로써 리소스 낭비를 줄일 수 있다.
# multicast
RxSwift의 구독 시스템은 기본적으로 유니캐스트 형태를 취한다. 옵저버블과 구독자가 일대일로 대응되는 형태로 존재한다.
멀티캐스트 연산자는 이러한 형태를 다수의 구독자가 한 옵저버블을 구독하는 형태로 바꿔준다.
일반적인 옵저버블의 이벤트 방출은 구독자가 구독을 시작한 시점부터 바로 이루어진다. 멀티캐스트 연산자로 리턴되는 옵저버블은 특별한 타입인 ConnectableObservable
을 갖는다.
위 타입은 옵저버블에 대해 구독자가 추가되는 시점에 이벤트 방출을 하지 않는다. 모든 구독자가 추가되었다고 판단한 시점에 개발자가 직접 connect
메서드를 호출한다.
멀티캐스트 연산자는 서브젝트를 파라미터로 받는다.
let bag = DisposeBag()
let subject = PublishSubject<Int>()
let source = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
.take(5)
source
.subscribe { print("🔵", $0) }
.disposed(by: bag)
source
.delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
.subscribe { print("🔴", $0) }
.disposed(by: bag)
우선 위 코드는 멀티캐스트 연산자를 사용하기 이전 코드이다. 위의 출력 결과는 파란 색 다섯개, 빨간 색 다섯개가 설정해둔 구독 딜레이 연산에 따라 각각 출력된다.
반면 multicast
연산자를 사용한 코드의 경우에는 출력결과가 어떻게될까?
let bag = DisposeBag()
let subject = PublishSubject<Int>()
let source = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
.take(5)
.multicast(subject) // 멀티캐스트 연산자 호출
source
.subscribe { print("🔵", $0) }
.disposed(by: bag)
source
.delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
.subscribe { print("🔴", $0) }
.disposed(by: bag)
source.connect() // 커넥트 메서드 호출 - 구독자들에게 이벤트 방출 시작
# publish
퍼블리시 연산자는 멀티캐스트 연산자의 파라미터에 빈 서브젝트를 직접 전달했던 작업을 대신 해준다.
let source = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
.take(5)
.publish() // 퍼블리시
위의 멀티캐스트 연산자와 완전히 동일한 동작을 하게 된다.
# replay
위의 멀티캐스트, 퍼블리시 예시 코드에서 두 번째 구독은 딜레이되는 형태로 구현해두었다. 구독을 공유하는 형태이기 때문에 첫 번째 구독에서의 이벤트 방출이 모두 끝나면 두 번째 구독도 자동으로 끝난다.
이때 두 번째 구독자의 경우 딜레이 되는 동안 다른 구독자에게 방출된 이벤트를 알 수 없는 형태이다.
리플레이 서브젝트를 활용하면 앞에서 전달된 이벤트들을 버퍼에 저장하여 그 다음 구독자에게 전달할 수 있게 된다.
let subject = ReplaySubject<Int>.create(bufferSize: 5) // 리플레이 서브젝트 생성
let source = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
.take(5)
.multicast(subject) // 멀티캐스트 파라미터에 리플레이 서브젝트 전달
source
.subscribe { print("🔵", $0) }
.disposed(by: bag)
source
.delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
.subscribe { print("🔴", $0) }
.disposed(by: bag)
source.connect()
- 멀티캐스트 연산자 파라미터에 리플레이 서브젝트를 전달한다.
- 리플레이 서브젝트 생성시 지정한 버퍼 사이즈만큼 구독자에게 방출하는 이벤트를 보관한다.
- 이후 구독시점이 딜레이된 구독자가 있다면 구독 시작시 쌓인 이벤트들을 함께 방출해준다.
이때 replay
연산자는 publish
연산자와 동일한 역할을 하게 된다.
멀티캐스트 연산자에 타입에 맞는 서브젝트를 직접 생성 및 파라미터에 전달할 필요 없이 클로저 내에서 각 타입에 맞게 서브젝트를 리턴해주어 코드를 더 간단히 작성할 수 있게 된다.
let source = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
.take(5)
.replay(5) // replay(_ bufferSize: Int)
// 나머지 동일
리플레이 연산자의 파라미터에 버퍼사이즈를 지정한다.
버퍼사이즈를 무한으로 가질 수 있는 replayAll
연산자도 있지만 사용을 지양해야 한다.
# refCount
refCount
연산자는 옵저버블을 리턴하는데, 내부 구현 코드를 보면 일반 옵저버블을 리턴하는 것이 아닌 RefCount
생성자를 통해 생성되는 특별한 옵저버블을 리턴한다.
public func refCount() -> Observable<Element> {
RefCount(source: self)
}
refCount
는 커넥터블 옵저버블을 리턴하며, 자동으로 커넥트 메서드까지 호출해준다. 따라서 연산자의 리턴 결과를 변수에 저장한 뒤 이후 connect
메서드를 호출할 필요는 없다.
let source = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
.debug()
.publish()
.refCount() // refCount 연산자 호출
let observer1 = source
.subscribe { print("🔵", $0) }
// source.connect() -> 생략
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
observer1.dispose()
}
DispatchQueue.main.asyncAfter(deadline: .now() + 7) {
let observer2 = source.subscribe { print("🔴", $0) }
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
observer2.dispose()
}
}
refCount
없이 인터벌 연산자를 사용하고 publish
연산자로 구독 공유를 구현하게 되면 실제로 구독자에게 방출된 이벤트가 전달되는 것 외에 불필요한 데이터들이 계속해서 방출되는 상황이 연출될 뿐만 아니라, connect
메서드 호출 결과로 리턴된 커넥터블 옵저버블을 직접 dispose 해줘야한다.
refCount
를 사용하면 구독되지 않은 시점에는 이벤트 방출을 멈춘다. 또한 구독자에게 이벤트 방출이 끝난 뒤 옵저버블이 dispose된 이후에 다른 구독자가 존재하지 않는다면 역시 이벤트 방출을 멈춘다.
새로운 구독자가 이후 추가되면 시퀀스는 처음부터 시작된다. 즉 위의 경우 인터벌 연산자를 호출하는 옵저버블의 이벤트 방출이 next(0)
부터 다시 시작하게 된다는 것이다.
# share
share
연산자의 구현부이다.
public func share(replay: Int = 0, scope: SubjectLifetimeScope = .whileConnected)
-> Observable<Element> {
switch scope {
case .forever:
switch replay {
case 0: return self.multicast(PublishSubject()).refCount()
default: return self.multicast(ReplaySubject.create(bufferSize: replay)).refCount()
}
case .whileConnected:
switch replay {
case 0: return ShareWhileConnected(source: self.asObservable())
case 1: return ShareReplay1WhileConnected(source: self.asObservable())
default: return self.multicast(makeSubject: { ReplaySubject.create(bufferSize: replay) }).refCount()
}
}
}
replay
파라미터와 scope
파라미터를 받는다.
replay
파라미터에 정수값을 받으면 1이상인 경우 리플레이 서브젝트를 기준으로 멀티캐스트 연산자를 호출하고, 0인 경우 퍼블리시 서브젝트를 기준으로 멀티캐스트 연산자를 호출한다.
let source = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
.debug()
.share(replay: 5) // share - replay파라미터 지정
let observer1 = source
.subscribe { print("🔵", $0) } // 구독 진행 next(0) ..
let observer2 = source
.delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
.subscribe { print("🔴", $0) } // 구독 진행 - replay 버퍼에서 이벤트 불러오기
// {next(0), next(1), next(2)}, next(3) ...
DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
observer1.dispose()
observer2.dispose()
}
DispatchQueue.main.asyncAfter(deadline: .now() + 7) {
let observer3 = source.subscribe { print("⚫️", $0) }
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
observer3.dispose()
}
}
share연산자에 replay
파라미터를 지정하는 경우 리플레이 서브젝트를 기준으로 커넥터블 옵저버블이 생성되므로 두번째 옵저버의 구독 시 버퍼에 저장된 이벤트들이 한번에 쏟아져 나오며 시작한다.
옵저버 1, 2가 dispose된 이후에 서브젝트가 사라지기 때문에 옵저버 3 생성 및 구독시 완전히 독립적인 커넥터블 옵저버블이 만들어진다.
위의 share
연산자의 두번째 파라미터로 scope
를 .forever
로 지정하면 어떻게 될까?
let source = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
.debug()
.share(replay: 5, scope: .forever)
옵저버 dispose 이후에도 기존 옵저버블이 유지되므로, 리플레이 서브젝트 버퍼에 저장된 이벤트들이 유지된다.
따라서 옵저버3 첫 이벤트 방출시 쌓여있던 버퍼의 이벤트들이 한번에 쏟아진다.
버퍼 데이터들을 이벤트로 방출한 뒤 시퀀스는 처음부터 다시 시작한다. 기본적으로 share
연산자의 리턴 옵저버블이 refCount
옵저버블이기 때문에 옵저버 1,2에서 쌓여온 데이터가 버퍼에 저장되는 것과는 별개로 dispose 이후 처음부터 시퀀스가 시작되는 것은 동일하다.