
今天,我想向您展示如何在Apple的新Combine框架中创建自己的Publisher。
因此,对于初学者来说,我们需要简要回顾一下Combine的基本部分如何相互交互,即发布者,订阅,订阅者。
- 订阅者加入发布者
- 发布者发送订阅订阅者
- 订阅服务器要求订阅提供N个值
- 发布者发送的N个值或更少
- 发布者发送完成信号
发行人
因此,让我们开始创建发布服务器。 转到Apple文档,我们将看到Publisher是一个协议。
public protocol Publisher { associatedtype Output associatedtype Failure : Error func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input }
其中“输出”是此发布服务器传递的值的类型,“失败”是必须遵循“错误”协议的错误的类型。
以及接收函数(_:Subscriber),将使用subscribe(_ :)将调用此函数添加到此发布者。
例如,我们实现了Publisher,它将为我们生成斐波那契数 。
struct FibonacciPublisher: Publisher { typealias Output = Int typealias Failure = Never }
由于该序列由数字组成,因此输出的类型将为Int类型的Output,而Failure将是从不的特殊类型,表示此Publisher将永远不会失败。
为了灵活性,我们指定了要接收的元素数量的限制,并将此值包装在Publisher的某些配置对象中。
struct FibonacciConfiguration { var count: UInt }
让我们仔细看一下这段代码,var count:UInt看起来是一个不错的选择,但是它的使用将我们限制在UInt类型的有效值范围内,并且也不清楚要表明是否仍然需要无限序列。
代替UInt,我们将使用在Combine中定义的Subscribers.Demand类型,该类型也被描述为通过订阅从订阅服务器发送到发布服务器的类型。 简单来说,它显示了对元素的需求 ,订阅服务器请求了多少个元素。 无限制-不限制,没有限制-完全没有,最大(N)-不超过N次。
public struct Demand : Equatable, Comparable, Hashable, Codable, CustomStringConvertible { public static let unlimited: Subscribers.Demand public static let none: Subscribers.Demand
我们重写FibonacciConfiguration,将类型更改为新的类型以进行计数。
struct FibonacciConfiguration { var count: Subscribers.Demand }
让我们回到Publisher并实现receive(_:Subscriber)方法,正如我们回想的那样,需要使用此方法将Subscriber添加到Publisher。 并且他通过“订阅”完成此操作,发布者必须创建一个订阅并将该订阅转移给订阅者。
func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input { let subscription = FibonacciSubscription(subscriber: subscriber, configuration: configuration) subscriber.receive(subscription: subscription) }
这是一个通用函数,将订阅服务器作为参数,发布服务器的输出值必须与订阅服务器的输入值(输出== S.Input)匹配,对于错误也是如此。 这对于“连接” Publisher'a和Subscriber'a是必需的。
在函数本身中,创建FibonacciSubscription订阅,在构造函数中,我们转移订阅者和配置。 之后,订阅将转移到订阅者。
我们的发布者已经准备就绪,最终我们可以:
struct FibonacciPublisher: Publisher { typealias Output = Int typealias Failure = Never var configuration: FibonacciConfiguration func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input { let subscription = FibonacciSubscription(subscriber: subscriber, configuration: configuration) subscriber.receive(subscription: subscription) } }
如您所见,Publisher本身不包含任何用于生成Fibonacci序列的逻辑;所有逻辑将在订阅类-FibonacciSubscription中。
您可能已经猜到了,FibonacciSubscription类将遵循Subscription协议,让我们看一下该协议的定义。
public protocol Subscription : Cancellable, CustomCombineIdentifierConvertible { func request(_ demand: Subscribers.Demand) }
请求(_:Subscribers.Demand)函数告诉发布者他可以向订阅者发送更多值。 在这种方法中,发送斐波那契数的逻辑将是这样。
我们还需要实现以下Cancelableable协议并实现cancel()函数。
public protocol Cancellable { func cancel() }
只需遵循CustomCombineIdentifierConvertible协议并定义只读变量CombineIdentifier。
public protocol CustomCombineIdentifierConvertible { var combineIdentifier: CombineIdentifier { get } }
需要澄清的是,如果您滚动到Combine中CustomCombineIdentifierConvertible协议的定义下方,则可以看到Combine为该协议提供了扩展名,格式为-
extension CustomCombineIdentifierConvertible where Self : AnyObject { public var combineIdentifier: CombineIdentifier { get } }
这告诉我们,如果遵循该协议的类型也遵循AnyObject协议(即,如果该类型是类),则默认情况下会提供变量CombineIdentifier的定义:CombineIdentifier。 FibonacciSubscription是一个类,因此我们获得了默认的变量定义。
订阅方式
因此,我们将开始实施FibonacciSubscription。
private final class FibonacciSubscription<S: Subscriber>: Subscription where S.Input == Int { var subscriber: S? var configuration: FibonacciConfiguration var count: Subscribers.Demand init(subscriber: S?, configuration: FibonacciConfiguration) { self.subscriber = subscriber self.configuration = configuration self.count = configuration.count } ... }
如您所见,FibonacciConfiguration包含指向订户的强大链接,换句话说,它是订户的所有者。 这是重要的一点,订阅负责保留订阅者,并且必须保留订阅者,直到完成工作,出现错误或取消为止。
接下来,我们从Cancellable协议中实现cancel()方法。
func cancel() { subscriber = nil }
将订户设置为nil使其无法进行预订。
现在,我们准备开始发送斐波那契数字的实现。
我们实现了请求方法(_:Subscribers.Demand)。
func request(_ demand: Subscribers.Demand) {
1)从一开始,我们检查发布者可以提供多少元素,如果根本不提供,则完成发送,并向订阅者发送信号,表明号码发送已完成。
2)如果有需要,则将请求的总数减少一个,将斐波那契数列的第一个元素发送给订阅服务器,即0,然后再次检查发布服务器可以提供给我们的元素数量,如果没有,则向订阅服务器发送信号以完成。
3)与2)段中的方法相同,但仅适用于斐波那契数列中的第二个元素。
4)如果需要两个以上的元素,那么我们将实现一个迭代算法来查找斐波那契数,在每一步中,我们将把下一个数字从斐波那契数列转移到Subscriber'y,并检查Publisher仍然可以提供多少个元素。 如果发布者不再提供新号码,则向订阅者发送完成信号。
目前,我们已经编写了这样的代码 struct FibonacciConfiguration { var count: Subscribers.Demand } struct FibonacciPublisher: Publisher { typealias Output = Int typealias Failure = Never var configuration: FibonacciConfiguration func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input { let subscription = FibonacciSubscription(subscriber: subscriber, configuration: configuration) subscriber.receive(subscription: subscription) } } private final class FibonacciSubscription<S: Subscriber>: Subscription where S.Input == Int { var subscriber: S? var configuration: FibonacciConfiguration var count: Subscribers.Demand init(subscriber: S?, configuration: FibonacciConfiguration) { self.subscriber = subscriber self.configuration = configuration self.count = configuration.count } func cancel() { subscriber = nil } func request(_ demand: Subscribers.Demand) {
第一次测试
现在,我们将测试所拥有的内容,拥有发布者和订阅者,没有足够的Sibscriber,Combine从框中提供2个Sibscriber:接收器和分配。
- 下沉-此方法创建一个订户并立即请求无限数量的值。
- Assign-将Publisher中的每个元素设置为对象属性。
“接收器”非常适合我们的目的,应特别注意它要求无限数量的值的事实。
在这里,我们需要做出重要区分,我们的发布者在count变量中确定我们的发布者可以提供的元素数量,而这些条件由我们自己决定。 原则上,我们可以不使用此变量,并且不受传递斐波那契数的限制,但是很快我们将超出Int类型的允许值范围。
接收器的情况有所不同,每个订阅服务器确定要接收多少个值,接收器请求无限数量的值,这意味着它将接收值,直到接收到完成,错误或取消的信号。
为了方便使用发布服务器,我们将其创建添加到发布服务器协议扩展中。
extension Publishers { private static func fibonacci(configuration: FibonacciConfiguration) -> FibonacciPublisher { FibonacciPublisher(configuration: configuration) } static func fibonacci(count: Subscribers.Demand = .max(6)) -> FibonacciPublisher { FibonacciPublisher(configuration: FibonacciConfiguration(count: count)) } }
因此尝试我们的发布者
Publishers.fibonacci(count: .max(10)) .sink { value in print(value, terminator: " ") }
现在边界情况
Publishers.fibonacci(count: .max(1)) .sink { value in print(value, terminator: " ") }
但是,如果指定.unlimited会发生什么?
Publishers.fibonacci(count: .unlimited) .print() .sink { value in print(value, terminator: " ") }
如何使用.unlimited,但能够输出多个数字? 为此,我们需要.prefix(_)运算符,该运算符的工作方式与集合中的.prefix(_)相同,即,它仅保留前N个元素。
Publishers.fibonacci(count: .unlimited) .print() .prefix(5) .sink { _ in }
怎么了 也许在.prefix(_)中? 让我们对Foundation的标准序列进行一些实验。
如我们所见,上面的代码正常工作,那么问题出在我们的Publisher的实现中。
我们查看来自.print()的日志,然后看到在来自.prefix(_)的N个请求之后,我们在FibonacciSubscription上调用了cancel(),将订户设置为nil。
func cancel() { subscriber = nil }
如果打开调用堆栈,则可以看到从请求(_ :)调用了cancel(),即在对订户的调用中。.Receive(_)。 从中我们可以得出结论,请求(_ :)订阅服务器内部的某个时间点可能变为nil,然后我们需要停止生成新数字的工作。 将此条件添加到我们的代码中。
func request(_ demand: Subscribers.Demand) {
现在运行我们的测试代码。
Publishers.fibonacci(count: .unlimited) .print() .prefix(5) .sink { _ in }
得到了预期的行为。
订户
那么我们的斐波那契订阅已经准备好了吗? 并非如此,在我们的测试中,我们只使用了一个接收器,要求输入无限数量的数字,但是如果我们使用一个希望获得一定数量的有限数字的用户呢? 合并没有提供这样的订阅者,但是什么阻止我们编写自己的订阅者呢? 以下是我们的FibonacciSubscriber的实现。
class FibonacciSubscriber: Subscriber { typealias Input = Int typealias Failure = Never var limit: Subscribers.Demand init(limit: Subscribers.Demand) { self.limit = limit } func receive(subscription: Subscription) { subscription.request(limit) } func receive(_ input: Input) -> Subscribers.Demand { .none } func receive(completion: Subscribers.Completion<Failure>) { print("Subscriber's completion: \(completion)") } }
因此,我们的FibonacciSubscriber具有limit属性,该属性确定该订阅服务器希望接收多少个元素。 这是通过receive(_:Subscription)方法完成的,在该方法中,我们告诉订阅我们需要多少个元素。 还需要注意receive(_:Input)-> Subscribers.Demand函数,当接收到一个新值时会调用此函数,作为返回值,我们指示要接收多少个其他元素:.none-一点也不,.max(N)N个,总共,接收到的元素总数将等于已接收的已发送订阅的值(_:订阅)与来自接收的所有返回值(_:输入)-> Subscribers.Demand之和。
第二次测试
让我们尝试使用FibonacciSubscriber。
let subscriber = FibonacciSubscriber(limit: .max(3)) Publishers.fibonacci(count: .max(5)) .print() .subscribe(subscriber)
如我们所见,发布服务器发送了5个值,而不是3个。为什么呢? 因为FibonacciSubscription'a的request(_:Subscribers.Demand)方法没有考虑订阅者的需求,所以让我们对其进行修复,为此我们添加了一个请求的附加属性,通过该属性我们可以跟踪订阅者的需求。
private final class FibonacciSubscription<S: Subscriber>: Subscription where S.Input == Int { var subscriber: S? var configuration: FibonacciConfiguration var count: Subscribers.Demand var requested: Subscribers.Demand = .none
第三次测试
let subscriber = FibonacciSubscriber(limit: .max(3)) Publishers.fibonacci(count: .max(5)) .print() .subscribe(subscriber)
Publisher现在可以正常工作。
最终代码 import Foundation import Combine struct FibonacciConfiguration { var count: Subscribers.Demand } struct FibonacciPublisher: Publisher { typealias Output = Int typealias Failure = Never var configuration: FibonacciConfiguration func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input { let subscription = FibonacciSubscription(subscriber: subscriber, configuration: configuration) subscriber.receive(subscription: subscription) } } private final class FibonacciSubscription<S: Subscriber>: Subscription where S.Input == Int { var subscriber: S? var configuration: FibonacciConfiguration var count: Subscribers.Demand var requested: Subscribers.Demand = .none init(subscriber: S?, configuration: FibonacciConfiguration) { self.subscriber = subscriber self.configuration = configuration self.count = configuration.count } func cancel() { subscriber = nil } func request(_ demand: Subscribers.Demand) { guard count > .none else { subscriber?.receive(completion: .finished) return } requested += demand count -= .max(1) requested -= .max(1) requested += subscriber?.receive(0) ?? .none guard let _ = subscriber, requested > .none else { return } if count == .none { subscriber?.receive(completion: .finished) return } count -= .max(1) requested -= .max(1) requested += subscriber?.receive(1) ?? .none guard let _ = subscriber, requested > .none else { return } if count == .none { subscriber?.receive(completion: .finished) return } var prev = 0 var current = 1 var temp: Int while let subscriber = subscriber, requested > .none { temp = prev prev = current current += temp requested += subscriber.receive(current) count -= .max(1) requested -= .max(1) if count == .none { subscriber.receive(completion: .finished) return } } } } extension Publishers { private static func fibonacci(configuration: FibonacciConfiguration) -> FibonacciPublisher { FibonacciPublisher(configuration: configuration) } static func fibonacci(count: Subscribers.Demand = .max(6)) -> FibonacciPublisher { FibonacciPublisher(configuration: FibonacciConfiguration(count: count)) } } class FibonacciSubscriber: Subscriber { typealias Input = Int typealias Failure = Never var limit: Subscribers.Demand init(limit: Subscribers.Demand) { self.limit = limit } func receive(subscription: Subscription) { subscription.request(limit) } func receive(_ input: Input) -> Subscribers.Demand { .none } func receive(completion: Subscribers.Completion<Failure>) { print("Subscriber's completion: \(completion)") } } Publishers.fibonacci(count: .max(4)) .print() .sink { _ in } let subscriber = FibonacciSubscriber(limit: .max(3)) Publishers.fibonacci(count: .max(5)) .print() .subscribe(subscriber)
结果
我希望本文能使您更多地了解什么是发布者,订阅和订阅者,它们之间如何交互以及在决定实施发布者时需要注意的几点。 欢迎对本文进行任何评论和澄清。