10/7/2020, 9:25:13 PM
public protocol Publisher {
// 1
associatedtype Output
// 2
associatedtype failure: Error
// 4
func receive<S>(subscriber: S) where S: Subscriber, Self.Failure == S.Failure, Self.Output == S.Input
}
extension Publisher {
// 3
public func subscribe<S>(_ subscriber: S) where S: Subscriber, Self.Failure == S.Failure, Self.Output == S.Input
}
Publisher
产生数据的类型Publisher
可能产生错误的类型,如果能够保证一定不出错,使用Never
subscribe(-:)
来接受消息subscribe(-:)
需要调用receive(subscriber:)
来将订阅者订阅到publisher
。public protocol Subscriber: CustomCombineIdentifierConvertible {
// 1
associatedtype Input
// 2
associatedtype Failure: Error
// 3
func receive(subscription: Subscription)
// 4
func receive(_ input: Self.Input) -> Subscribers.Demand
// 5
func receive(completion: Subscribers.Completion<Self.Failure>)
}
Publisher
不会出错,则是Never
。Publisher
会调用receive(subscription:)
来传递 subscriptionPublisher
调用receive(_:)
来传值。Publisher
调用receive(completion:)
,在结束或者出错时候。public protocol Subscription: Cancellable, CustomCombineIdentifierconvertible {
func request(_ demand: Subscribers.Demand)
}
订阅者调用request(_:_)
来告知愿意接受更多的值,从一个最大的值,到无限制。
subscription.request
背压 backpressure mmanagementSubscriber.receive(_:_)
可以在每次接收到值时候调整,表示后续要接受的数量Subscriber.receive(_:_)
调整的值是增量的,也就是说不能是负数(否则会返回fatalError
)。// 1
let publisher = (1...6).publisher
// 2
final class IntSubscriber: Subscriber {
// 3
typealias Input = Int
typealias Failuse = Never
// 4
func reveive(subscription: Subscription) {
subscription.request(.max(3))
}
// 5
func receive(_ input: Int) -> Subscribers.Demand {
print("Recevied value", input)
// return .unlimited
return .none // 等于 .max(0)
}
// 6
func receive(completion: Subscribers.Completion<Never>) {
print("Received completion", completion)
}
}
// 1
var subscriptions = Set<AnyCancellable>()
// 2
let subject = CurrentValueSubject<Int, Never>(0)
// 3
subject
.sink(receiveValue: {print($0)})
.store(in: &subscriptions) // 4
subscriptions
中,通过inout
参数引用而不是拷贝的方式来传递。subject.value
的方式来获取值subject.value = 3
赋值,或者subject.send(1)
方式来发送事件subject.send(completion: .finished)
来结束事件。// 1
let subject = PassthroughSubject<Int, Never>
// 2
let publisher = subject.eraseToAnyPublisher()
// 3
publisher
.sink(receiveValue {print($0)})
.store(in: &subscriptions)
// 4
subject.send(0)
anyPublisher<Int, Never>
,后续的就不会知道具体的类型和细节send
方法,