# Publisher - Overview
퍼블리셔는 시간에 따라 방출하는 일련의 값들을 선언한다. 이렇게 방출된 값들은 하나 이상의 구독자에게 전달할 수 있다.
구독자의 Input과 Failure타입은 퍼블리셔에 선언된 Output, Failure타입과 일치해야 한다. 퍼블리셔는 구독자의 구독을 받아들이기 위해 receive(subscriber:) 메서드를 내부적으로 구현한다. Publisher 중첩 구조체를 살펴보면 내부에 receive 함수 타입이 정의된 것을 볼 수 있다.
퍼블리셔는 아래의 메서드들을 구독자에게 호출할 수 있다.
receive(subscription:): 구독 요청을 승인하고Subscription인스턴스를 반환한다. 구독자는Subscription인스턴스를 통해 퍼블리셔에게 값을 요구하고 값의 방출을 취소할 수 있다.receive(_:): 퍼블리셔로부터 하나의 값을 구독자에게 전달한다.receive(completion:): 구독자에게 값의 방출이 끝났음을 알린다.
모든 퍼블리셔는 Publisher프로토콜 채택에 따라 receive 메서드를 반드시 구현해야 한다.
Publishers 열거형 내부 구현을 살펴보면 수 많은 퍼블리셔들이 정의되어 있는 것을 볼 수 있다. 이는 퍼블리셔 체이닝을 위해 오퍼레이터로 사용될 목적으로 만들어진 것이다. 예를 들어 map 오퍼레이터의 경우 Publishers.Map클래스의 인스턴스를 리턴하도록 구현되어 있다.
Publisher vs AsyncSequence
퍼블리셔와 스위프트 표준 라이브러리의 AsyncSequence는 시간에 따라 값을 방출한다는 점에서 동일하지만 퍼블리셔는 구독자를 통해 퍼블리셔로부터 값을 요청하는 반면 AsyncSequence는 for-await-in을 통해 방출되는 값들을 순회한다.
둘다 모두 필터링 및 매핑 관련 연산자를 제공한다.
let endpointURL = URL(string: "https://zeddios.tistory.com")!
let lineCount = endpointURL.lines.map { $0.count } ✅ // AsyncSequence 인스턴스의 map 연산자
do {
for try await line in lineCount {
print(line)
}
} catch {
}
// 출처: https://zeddios.tistory.com/1339 [ZeddiOS:티스토리]
컴바인에서는 AsyncSequence에서 제공하지 않는 시간 기반의 오퍼레이터 debounce, throttle을 추가로 제공하며 퍼블리셔 간 결합 오퍼레이터인 merge(with:), combineLatest(_: _:) 연산자들도 제공한다.
AsyncSequence와 컴바인의 퍼블리셔를 브릿징하기 위해서는 퍼블리셔에서 방출하는 값들을 AsyncSequence로 노출시켜 구독자를 통한 값의 처리방법 대신 for-await-in으로 순회할 수 있다.
퍼블리셔 프로토콜을 채택하여 직접 정의하는 대신 컴바인에서 제공하는 몇가지 타입을 활용하면 커스텀 퍼블리셔를 쉽게 정의할 수 있다. PassThroughSubject와 같은 Subject 하위클래스 타입을 사용하면 된다. (PassThroughSubject는 RxSwift에서 PublishSubject, CurrentValueSubject는 BehaviorSubject와 동일하다.)
@Published 프로퍼티 래퍼를 사용하면 속성의 값이 변할때마다 값을 방출하는 퍼블리셔를 쉽게 정의할 수도 있다.
# Publisher - Output, Failure
Output은 퍼블리셔로부터 방출되는 값의 타입을 의미한다. Failure는 퍼블리셔가 방출할 수 있는 에러의 타입을 의미한다. 퍼블리셔가 에러를 방출하지 않을 경우 Never를 사용한다.
# Operators
RxSwift에서 다뤄보지 않은 오퍼레이터를 추가로 정리한다.
# map, tryMap, mapError
RxSwift map 오퍼레이터와 동일하다. 업스트림 퍼블리셔로부터 받은 엘리먼트를 클로저에 따라 변형시킨 뒤 새로운 옵저버블로 반환한다. tryMap은 클로저에서 에러를 throw하는 경우 sink completion클로저에서 처리하기 위해 사용한다.
struct ParseError: Error {}
func romanNumeral(from:Int) throws -> String {
let romanNumeralDict: [Int : String] =
[1:"I", 2:"II", 3:"III", 4:"IV", 5:"V"]
guard let numeral = romanNumeralDict[from] else {
throw ParseError()
}
return numeral
}
let numbers = [5, 4, 3, 2, 1, 0]
// 0에서 에러 방출
let cancellable = numbers.publisher
.tryMap { try romanNumeral(from: $0) }
.sink(
receiveCompletion: { print ("completion: \($0)") },
receiveValue: { print ("\($0)", terminator: " ") }
)
mapError오퍼레이터는 방출된 에러 타입을 map처럼 다른 형태로 변형하고 싶을때 사용한다.
struct MyGenericError: Error { var wrappedError: Error }
divisors.publisher
.tryMap { try myDivide(1, $0) }
.mapError { MyGenericError(wrappedError: $0) }
// ....
# replaceNil(with:)
replaceNil 오퍼레이터는 업스트림 퍼블리셔에서 전달된 요소 중 nil값을 with 파라미터에 전달된 값으로 대체해준다.
let numbers: [Double?] = [1.0, 2.0, nil, 3.0]
numbers.publisher
.replaceNil(with: 0.0)
.sink { print("\(String(describing: $0))", terminator: " ") }
.cancel()
# removeDuplicates
Output타입이 Equatable 프로토콜을 구현한 경우 방출되는 요소들의 중복을 제거하여 반환한다.
let numbers: [Int] = [1,1,2,2,3,4]
numbers.publisher
.removeDuplicates()
.sink(receiveValue: {print($0)})
by 클로저를 통해 직접 동일성 여부 로직을 구성할 수도 있다.
let points = [Point(x: 0, y: 0), Point(x: 0, y: 1),
Point(x: 1, y: 1), Point(x: 2, y: 1)]
cancellable = points.publisher
.removeDuplicates { prev, current in
prev.x == current.x
}
.sink { print("\($0)", terminator: " ") }
# replaceEmpty
시퀀스에서 방출하는 요소가 empty상태일때 새로운 값으로 대체한다.
let numbers: [String] = []
numbers.publisher
.replaceEmpty(with: "ABC")
.sink(receiveValue: { print($0) })
# collect
방출되는 요소들을 종합하여 하나의 배열로 합친다. count파라미터는 담을 버퍼값을 지정하고 모두 담으면 방출한다. 모든 요소를 순회하며 버퍼에 계속해서 채운다.
let numbers = 1..<10
numbers.publisher
.collect()
// collect(3) -> 3개씩 모아서 다 모아지면 방출
.sink(receiveValue: {print($0)})
# ignoreOutput
방출되는 값들은 무시하고 completion의 failure만 방출한다.
# reduce vs scan
scan은 누적 계산되는 모든 값을 매번 방출하고, reduce는 최종 연산된 마지막 결과만 방출한다.
let numbers = 1..<4
numbers.publisher
.scan(0) { $0 + $1 }
.sink(receiveValue: {print($0)}) // 1 3 6
numbers.publisher
.reduce(0, { $0 + $1 })
.sink(receiveValue: {print($0)}) // 6
# allSatisfy
업스트림으로부터 전달된 요소 전체가 allSatisfy 오퍼레이터에 정의된 클로저 조건식에 만족되는지 여부를 Bool값으로 리턴한다.
let targetRange = (-1...100)
let numbers = [-1, 0, 10, 5]
numbers.publisher
.allSatisfy { targetRange.contains($0) }
.sink { print("\($0)") } // true
# drop(untilOutputFrom:), drop(while:), tryDrop(while:), dropFirst(_ count: Int)
drop(untilOutputFrom:): 파라미터에 전달되는 퍼블리셔에서 요소가 방출되기 전까지 업스트림에서 방출되는 요소들을 무시한다.drop(while:):while클로저에 전달된 조건식이 true인 동안에 업스트림에서 방출되는 요소들을 무시한다.tryDrop(while:): 에러를 던질 수 있는drop(while:)오퍼레이터dropFirst(_ count: Int): 0번째부터 count까지의 요소들을 버림
# append(_ elements: Self.Output...), append(_ elements: S), append
(_ publisher: P) & prepend
append(_ elements: Self.Output...): Output타입과 동일한 데이터들을 가변 파라미터 형태로 전달하면 업스트림에서 방출되는 요소에 이어붙여진다.append<S>(_ elements: S): 파라미터에 시퀀스를 전달하면 업스트림에서 방출된 시퀀스에 이어붙여진다.append<P>(_ publisher: P): 업스트림에서 방출되는 요소에 이어 파라미터에 전달된 다운스트림 퍼블리셔에서 방출되는 요소를 이어붙인다.prepend(_ elements: Self.Output...): append와 동일하게 동작하지만, 삽입되는 위치만 맨 앞
# prefix(_: Int), prefix(while: ), tryPrefix(while:), prefix
(untilOutputFrom publisher: P)
prefix(Int): 파라미터에 전달된 갯수 값 만큼 요소를 방출한다.prefix(while: ): 조건식이 true인 동안에만 방출하고 false가 된 이후에는 방출하지 않는다.tryPrefix: 에러를 던질 수 있는prefix(while:)prefix(untilOutputFrom): 다운스트림 퍼블리셔에서 요소를 방출하기 전까지 계속해서 업스트림의 요소 방출
let upstream = PassthroughSubject<Int,Never>()
let second = PassthroughSubject<String,Never>()
let cancellable = upstream
.prefix(untilOutputFrom: second)
.sink(receiveValue: { print($0) })
upstream.send(0)
upstream.send(1)
second.send("A")
upstream.send(2)
// 0 1
# first(), first(where: (Self.Output) -> Bool), last
first: 해당 오퍼레이터 호출 후 최소 하나의 요소가 업스트림으로부터 방출될때까지unlimited요청을 실행한다.first(where: ): 조건식에 맞는 첫 번째 요소를 방출한다last: first와 반대이며last(where:)도 동일
# output(at:Int), output(in range: R)
output(at:):at위치에 해당하는 요소 방출output(in: range): range에 해당하는 요소들 방출, 범위 벗어나면 아무것도 방출 안함
let a = 1...10
a.publisher
.output(in: 3...4)
.sink(receiveValue: {print($0)}) // 4 5
.cancel()
# combineLatest
let pub1 = PassthroughSubject<Int, Never>()
let pub2 = PassthroughSubject<Int, Never>()
cancellable = pub1
.combineLatest(pub2) { (first, second) in
return first * second
}
.sink { print("Result: \($0).") }
pub1.send(1)
pub1.send(2)
pub2.send(2)
pub1.send(9)
pub1.send(3)
pub2.send(12)
pub1.send(13)
combineLatest는 업스트림과 다운스트림 퍼블리셔에서 모두 하나 이상의 요소를 방출하기 전까지는 아무 요소도 방출하지 않는다.
# merge, zip
merge(with: publisher): RxSwift merge와 동일zip(_ other: publisher): 인덱스끼리 묶어서 튜플반환
# flatMap, switchToLatest
flatMap: 업스트림에서 방출된 요소를 새로운 퍼블리셔로 리턴하여 구독자에게 전달한다.send를 통해 전달된 투두리스트 포스트 id값을 기반으로dataTaskPublisher퍼블리셔를 만들어 구독자에게 전달한다.- 전달과 동시에 업스트림 퍼블리셔의 동작이
complete되는 것이 아니라, 새로 리턴된 퍼블리셔 동작이 끝날때까지 기다리게 된다. (자동적으로 비동기처리)
switchToLatest: flatMap은 방출되는 모든 요소에 따라 새로운 퍼블리셔를 매번마다 생성 및 방출하지만switchToLatest는 자동으로 맨 마지막 퍼블리셔에 대해서만 방출한다.
// flatMap 예시코드
struct Post: Codable {
let userId: Int
let id: Int
let title: String
let body: String
}
var postPublisher = PassthroughSubject<Int, URLError>()
let cancellable = postPublisher.flatMap { id -> URLSession.DataTaskPublisher in
let url = URL(string:"https://jsonplaceholder.typicode.com/posts/\(id)")!
return URLSession.shared.dataTaskPublisher(for: url)
}
.sink(
receiveCompletion: { completion in
// Handle publisher completion (normal or error).
},
receiveValue: {
print(try? JSONDecoder().decode(Post.self, from: $0.data))
// Process the received data.
}
)
postPublisher.send(1)
postPublisher.send(2)
postPublisher.send(3)
sleep(5)
// switchToLatest 예시코드
struct User {
let name: CurrentValueSubject<String, Never>
}
let userSubject = PassthroughSubject<User, Never>()
let cancellable = userSubject
.map(\.name)
// .flatMap(\.name)
.switchToLatest()
.sink { print($0) }
let user = User(name: .init("User 1"))
let anotherUser = User(name: .init("AnotherUser 1"))
userSubject.send(user)
userSubject.send(anotherUser)
anotherUser.name.send("AnotherUser 2")
user.name.send("User 2")
flatMap을 사용하는 경우 User 인스턴스의 name 퍼블리셔에서 새로운 요소 방출때마다 새로운 문자열을 방출하여 구독자에게 전달한다. 따라서 출력결과가 User 1, AnotherUser 1, AnotherUser 2, User 2가 된다.
위 코드상에서 userSubject에 전달되는 퍼블리셔를 map으로 리턴하고, switchToLatest를 호출하면 마지막에 send(anotherUser) 형태로 전달된 퍼블리셔가 완전히 대체하게 된다. 따라서 출력결과는 AnotherUser에 관해서만 나타난다.
# encode, decode
struct Article: Codable {
let title: String
let author: String
let pubDate: Date
}
let dataProvider = PassthroughSubject<Article, Never>()
let cancellable = dataProvider
.encode(encoder: JSONEncoder()) // 인코딩 ..
.decode(type: Article.self, decoder: JSONDecoder()) // 디코딩..
.sink(receiveCompletion: { print ("Completion: \($0)") },
receiveValue: { data in
print(data) // Article 인스턴스 출력
})
dataProvider.send(Article(title: "My First Article", author: "Gita Kumar", pubDate: Date()))
# multicast, share
multicast: 여러개의 다운스트림 퍼블리셔가 하나의 업스트림과 연결되어 있을때, 업스트림으로부터 전달된 이벤트 당 하나의receive만 호출되기 원할때 사용한다. 업스트림 퍼블리셔의 동작이 무거울때 사용하면 좋다. 클로저에는 구독을 공유하게 될 퍼블리셔 하나를 추가로 정의하여 리턴한다.multicast호출시ConnectablePublisher를 채택하는Multicast타입 퍼블리셔로 변환되기 때문에connect오퍼레이터를 호출해줘야 요소들을 정상적으로 방출하게 된다.
share:share는 직접connect를 호출할 필요 없이 내부적으로autoconnect를 호출한다. 따라서 첫 구독자의 sink때부터 요소들이 바로 방출된다. 구독을 퍼블리셔들이 갖춰지고 업스트림으로부터 요소들을 방출해야 하기 때문에 보통delay오퍼레이터와 함께 사용한다.
let pub = ["First", "Second", "Third"].publisher
.map( { return ($0, Int.random(in: 0...100)) } )
.print("Random")
let cancellable1 = pub
.sink { print ("Stream 1 received: \($0)")}
let cancellable2 = pub
.sink { print ("Stream 2 received: \($0)")}
위 코드는 멀티캐스트를 사용하지 않기 때문에 업스트림 pub 퍼블리셔에서 방출되는 각 요소에 독립적인 구독자가 생성된다.
let pub = ["First", "Second", "Third"].publisher
.map( { return ($0, Int.random(in: 0...100)) } )
.print("Random")
.multicast { PassthroughSubject<(String, Int), Never>() } // 추가
let cancellable1 = pub
.sink { print ("Stream 1 received: \($0)")}
let cancellable2 = pub
.sink { print ("Stream 2 received: \($0)")}
pub.connect() // 추가
let pub = (1...3).publisher
.delay(for: 1, scheduler: DispatchQueue.global())
.map( { _ in return Int.random(in: 0...100) } )
.print("Random")
.share()
let cancellable1 = pub
.sink { print ("Stream 1 received: \($0)")}
let cancellable2 = pub
.sink { print ("Stream 2 received: \($0)")}
위 코드에서 delay를 주석처리하면 Stream 1 received만 여러번 호출된다.
# eraseToAnyPublisher
RxSwift에서의 asObservable과 동일하다. 해당 오퍼레이터를 사용하는 실제 이유는 TypeEraser로서의 역할을 주기 위함이다. 퍼블리셔 세부 타입을 감추고 AnyPublisher의 형태로만 추상화하기 위해 사용한다.
let x = PassthroughSubject<String, Never>()
.flatMap { name in
return Future<String, Error> { promise in
promise(.success(""))
}.catch { _ in
Just("No user found")
}.map { result in
return "\(result) foo"
}
}
// .eraseToAnyPublisher()
// .sink(receiveValue: {print($0)})
위 코드에서 flatMap 오퍼레이터로 반환되는 퍼블리셔는 구체 타입을 가지고 있어서 데이터 파이프라인의 흐름이 외부로 노출된다. 어차피 구독자에게 전달되는 데이터를 단순 활용만 할거라면 퍼블리셔의 구체적인 타입은 불필요하기 때문에 eraseToAnyPublisher를 호출하여 AnyPublisher 오퍼레이터를 추가하는 것이다.
# assign(to🔛)
assign(ReferenceWritableKeyPath<Root, Self.Output>, on object: Root): 업스트림에서 방출되는 요소들을 가지고 키패스를 통해 인스턴스에 속성 할당을 진행한다.assign(to published: inout Published<Self.Output>.Publisher):Published속성에 직접 속성을 할당한다.
class MyClass {
var anInt: Int = 0 {
didSet {
print("anInt was set to: \(anInt)", terminator: "; ")
}
}
}
var myObject = MyClass()
let myRange = (0...2)
cancellable = myRange.publisher
.assign(to: \.anInt, on: myObject)
class MyModel2: ObservableObject {
@Published var id: Int = 0
}
let model2 = MyModel2()
Just(100).assign(to: &model2.$id)
# AsyncPublisher
퍼블리셔에는 values속성이 있는데, 해당 속성을 통해 방출되는 값들을 Swift의 async-await 형태로 비동기적인 순회가 가능하다. AsyncPublisher가 AsyncSequence를 채택하기 때문에 가능한 것
let numbers: [Int] = [1, 2, 3, 4, 5]
let filtered = numbers.publisher
.filter { $0 % 2 == 0 }
for await number in filtered.values
{
print("\(number)", terminator: " ")
}
print를 사용하면 디버깅이 가능하다.
let numbers: [Int] = [1, 2, 3, 4, 5]
let filtered = numbers.publisher
.filter { $0 % 2 == 0 }
.print("DEBUG: ")
.sink(receiveValue: {print($0)})
# AnyPublisher
AnyPublisher는 eraseToAnyPublisher 오퍼레이터에 대한 설명에서 언급되었듯 자체적인 속성은 갖고 있지 않고, 방출된 요소 및 완료 값을 전달하기 위한 목적으로만 사용된다.
다른 모듈에서 API들을 노출하고 싶지 않을때 사용한다. 또한 서브젝트를 AnyPublisher로 래핑하면 외부에서 send 메서드를 호출하여 불필요한 요소 방출을 방지할 수도 있다.
# Published
@Published 프로퍼티 래퍼로 선언된 속성은 해당 타입에 맞게 퍼블리셔를 생성하게 된다. 속성값이 변경되면 willSet블록에서 방출이 이루어진다. willSet 블록이기 때문에 프로퍼티에 실제 변경된 값이 반영되기 전에 구독자가 값을 받게 된다.
class Weather {
@Published var temperature: Double
init(temperature: Double) {
self.temperature = temperature
}
}
let weather = Weather(temperature: 20)
let cancellable = weather.$temperature
.sink() {
print ("Temperature now: \($0)")
}
print(weather.temperature)
weather.temperature = 25
print(weather.temperature)
temperature속성값이 25로 변경되었을 때 구독자 클로저가 먼저 실행되어 received가 실행되고 이후 print가 실행된다.
# Reference
- Zeddios - AsyncSequence (opens new window)
- Apple Document - Publisher (opens new window)
- try code by Marin Todorov - subscribe(on:) vs receive(on:) (opens new window)
- Velog - subscribe(on:) VS. receive(on:) (opens new window)
- Transforming Operators in Swift Combine Framework: Map vs FlatMap vs SwitchToLatest (opens new window)
- Medium - Swift Combine: TypeEraser, things you might have never known of (opens new window)
← Combine - 개요 PS 모음집 →