Erstellen Sie Ihren Publisher in Combine


Heute möchte ich Ihnen zeigen, wie Sie Ihren eigenen Publisher im neuen Apple Combine Framework erstellen.


Als erstes müssen wir uns kurz daran erinnern, wie die grundlegenden Teile von Combine miteinander interagieren, nämlich Publisher, Subscription, Subscriber.


  • Abonnent tritt Publisher bei
  • Herausgeber sendet Abonnement Abonnent
  • Der Abonnent fragt nach N Werten für das Abonnement
  • Publisher sendet N Werte oder weniger
  • Publisher sendet ein Abschlusssignal

Herausgeber


Beginnen wir also mit der Erstellung unseres Publishers. In der Apple-Dokumentation sehen wir, dass Publisher ein Protokoll ist.


public protocol Publisher { associatedtype Output associatedtype Failure : Error func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input } 

Wenn "Ausgabe" der Wertetyp ist, der von diesem Verleger übergeben wird, ist "Fehler" der Fehlertyp, der dem Fehlerprotokoll folgen muss.


Und die Empfangsfunktion (_: Subscriber), die aufgerufen wird, um diesem Publisher mithilfe von subscribe (_ :) einen Subscriber hinzuzufügen.


Als Beispiel implementieren wir Publisher, die für uns Fibonacci-Zahlen generieren.


 struct FibonacciPublisher: Publisher { typealias Output = Int typealias Failure = Never } 

Da die Sequenz aus Zahlen besteht, ist der Ausgabetyp "Ausgabe" vom Typ "Int", und "Fehler" ist ein spezieller Typ "Nie", der angibt, dass dieser Publisher niemals fehlschlagen wird.


Aus Gründen der Flexibilität geben wir die Grenze für die Anzahl der Elemente an, die wir empfangen möchten, und binden diesen Wert in ein Konfigurationsobjekt unseres Publishers ein.


 struct FibonacciConfiguration { var count: UInt } 

Schauen wir uns diesen Code genauer an, var count: UInt sieht nach einer guten Option aus, aber seine Verwendung beschränkt uns auf den Bereich gültiger Werte des Typs UInt und es ist auch nicht ganz klar, was angezeigt werden soll, wenn wir immer noch eine unbegrenzte Sequenz haben möchten.


Anstelle von UInt verwenden wir den Subscribers.Demand-Typ, der in Combine definiert ist. Dort wird er auch als der Typ beschrieben, der vom Abonnenten über das Abonnement an den Verleger gesendet wird. In einfachen Worten zeigt es den Bedarf an Elementen, wie viele Elemente vom Abonnenten angefordert werden. unbegrenzt - nicht begrenzt, keine - überhaupt nicht, max (N) - nicht mehr als N-mal.


  public struct Demand : Equatable, Comparable, Hashable, Codable, CustomStringConvertible { public static let unlimited: Subscribers.Demand public static let none: Subscribers.Demand ///  Demand.max(0) @inlinable public static func max(_ value: Int) -> Subscribers.Demand .... } 

Wir haben FibonacciConfiguration umgeschrieben und den Typ für die Zählung auf einen neuen geändert.


 struct FibonacciConfiguration { var count: Subscribers.Demand } 

Kehren wir zu Publisher zurück und implementieren die Methode receive (_: Subscriber). Wie wir uns erinnern, wird diese Methode benötigt, um Subscriber zu Publisher hinzuzufügen. Und er tut dies mit einem Abonnement, Publisher muss ein Abonnement erstellen und das Abonnement auf den Abonnenten übertragen.


  func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input { let subscription = FibonacciSubscription(subscriber: subscriber, configuration: configuration) subscriber.receive(subscription: subscription) } 

Dies ist eine generische Funktion, die Subscriber als Parameter verwendet. Die Ausgabewerte des Publishers müssen mit den Eingabewerten des Subscriber übereinstimmen (Output == S.Input). Dies gilt auch für Fehler. Dies ist notwendig, um Publisher'a und Subscriber'a zu "verbinden".


Erstellen Sie in der Funktion selbst ein FibonacciSubscription-Abonnement, und übertragen Sie im Konstruktor den Abonnenten und die Konfiguration. Danach wird das Abonnement auf den Abonnenten übertragen.


Unser Verlag ist fertig, am Ende haben wir:


 struct FibonacciPublisher: Publisher { typealias Output = Int typealias Failure = Never var configuration: FibonacciConfiguration func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input { let subscription = FibonacciSubscription(subscriber: subscriber, configuration: configuration) subscriber.receive(subscription: subscription) } } 

Wie Sie sehen, enthält Publisher selbst keine Logik zum Generieren einer Fibonacci-Sequenz, die gesamte Logik befindet sich in der Subskriptionsklasse - FibonacciSubscription.


Wie Sie vielleicht bereits erraten haben, folgt die FibonacciSubscription-Klasse dem Subscription-Protokoll. Schauen wir uns die Definition dieses Protokolls an.


 public protocol Subscription : Cancellable, CustomCombineIdentifierConvertible { func request(_ demand: Subscribers.Demand) } 

Die Funktion request (_: Subscribers.Demand) teilt Publisher mit, dass er dem Abonnenten weitere Werte senden kann. In dieser Methode wird die Logik des Sendens von Fibonacci-Zahlen sein.
Wir müssen auch nach dem Cancellable-Protokoll implementieren und die cancel () -Funktion implementieren.


 public protocol Cancellable { func cancel() } 

Folgen Sie einfach dem CustomCombineIdentifierConvertible-Protokoll und definieren Sie die schreibgeschützte Variable combineIdentifier.


 public protocol CustomCombineIdentifierConvertible { var combineIdentifier: CombineIdentifier { get } } 

Hier gibt es eine Klarstellung: Wenn Sie direkt unter der Definition des CustomCombineIdentifierConvertible-Protokolls in Combine scrollen, können Sie feststellen, dass Combine eine Erweiterung für dieses Protokoll bereitstellt, die die Form hat:


 extension CustomCombineIdentifierConvertible where Self : AnyObject { public var combineIdentifier: CombineIdentifier { get } } 

Was uns sagt, dass die Definition der Variablen combineIdentifier: CombineIdentifier standardmäßig bereitgestellt wird, wenn der Typ, der auf dieses Protokoll folgt, auch auf das AnyObject-Protokoll folgt, dh, wenn dieser Typ eine Klasse ist. FibonacciSubscription ist eine Klasse, daher erhalten wir die Standardvariablendefinition.


Abo


Und so werden wir beginnen, unser Fibonacci-Abonnement zu implementieren.


 private final class FibonacciSubscription<S: Subscriber>: Subscription where S.Input == Int { var subscriber: S? var configuration: FibonacciConfiguration var count: Subscribers.Demand init(subscriber: S?, configuration: FibonacciConfiguration) { self.subscriber = subscriber self.configuration = configuration self.count = configuration.count } ... } 

Wie Sie sehen, enthält FibonacciConfiguration eine starke Verbindung zum Abonnenten, ist also der Eigentümer des Abonnenten. Dies ist ein wichtiger Punkt. Das Abonnement ist für die Aufbewahrung des Abonnenten verantwortlich und muss so lange aufbewahrt werden, bis die Arbeit beendet ist, ein Fehler aufgetreten ist oder gekündigt wurde.


Als Nächstes implementieren wir die cancel () -Methode aus dem Cancellable-Protokoll.


 func cancel() { subscriber = nil } 

Wenn Sie den Subskribenten auf "nil" setzen, kann nicht auf das Abonnement zugegriffen werden.


Jetzt können wir mit der Implementierung des Sendens von Fibonacci-Nummern beginnen.
Wir implementieren die Request-Methode (_: Subscribers.Demand).


 func request(_ demand: Subscribers.Demand) { // 1 guard count > .none else { subscriber?.receive(completion: .finished) return } // 2 count -= .max(1) subscriber?.receive(0) if count == .none { subscriber?.receive(completion: .finished) return } // 3 count -= .max(1) subscriber?.receive(1) if count == .none { subscriber?.receive(completion: .finished) return } // 4 var prev = 0 var current = 1 var temp: Int while true { temp = prev prev = current current += temp subscriber?.receive(current) count -= .max(1) if count == .none { subscriber?.receive(completion: .finished) return } } } 

1) Wir prüfen von Anfang an, wie viele Elemente Publisher uns, wenn überhaupt nicht, zur Verfügung stellen können, schließen Sie den Versand ab und senden Sie dem Abonnenten ein Signal, dass der Versand der Nummern abgeschlossen ist.
2) Reduzieren Sie bei Bedarf die Gesamtzahl der angeforderten Nummern um eins, senden Sie das erste Element der Fibonacci-Sequenz an den Abonnenten, nämlich 0, und überprüfen Sie dann erneut, wie viele Elemente der Verlag uns geben kann, wenn nicht, senden Sie ein Signal an den Abonnenten, um den Vorgang abzuschließen .
3) Dieselbe Vorgehensweise wie in Absatz 2), jedoch nur für das zweite Element in der Fibonacci-Sequenz.
4) Wenn mehr als 2 Elemente erforderlich sind, implementieren wir einen iterativen Algorithmus zum Auffinden von Fibonacci-Zahlen, wobei wir bei jedem Schritt die nächste Nummer aus der Fibonacci-Sequenz an Subscriber'y übertragen und auch überprüfen, wie viele Elemente Publisher noch bereitstellen kann. Wenn Publisher keine neuen Nummern mehr bereitstellt, senden Sie dem Abonnenten ein Abschlusssignal.


Im Moment haben wir einen solchen Code geschrieben
 struct FibonacciConfiguration { var count: Subscribers.Demand } struct FibonacciPublisher: Publisher { typealias Output = Int typealias Failure = Never var configuration: FibonacciConfiguration func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input { let subscription = FibonacciSubscription(subscriber: subscriber, configuration: configuration) subscriber.receive(subscription: subscription) } } private final class FibonacciSubscription<S: Subscriber>: Subscription where S.Input == Int { var subscriber: S? var configuration: FibonacciConfiguration var count: Subscribers.Demand init(subscriber: S?, configuration: FibonacciConfiguration) { self.subscriber = subscriber self.configuration = configuration self.count = configuration.count } func cancel() { subscriber = nil } func request(_ demand: Subscribers.Demand) { // 1 guard count > .none else { subscriber?.receive(completion: .finished) return } // 2 count -= .max(1) subscriber?.receive(0) if count == .none { subscriber?.receive(completion: .finished) return } // 3 count -= .max(1) subscriber?.receive(1) if count == .none { subscriber?.receive(completion: .finished) return } // 4 var prev = 0 var current = 1 var temp: Int while true { temp = prev prev = current current += temp subscriber?.receive(current) count -= .max(1) if count == .none { subscriber?.receive(completion: .finished) return } } } } 

Erster Test


Jetzt werden wir testen, was wir haben, wir haben unseren Publisher und unser Abonnement, wir haben nicht genug Sibscriber, Combine bietet 2 Sibscriber aus der Box: versenken und zuweisen.


  • sink - Diese Methode erstellt einen Abonnenten und fordert sofort eine unbegrenzte Anzahl von Werten an.
  • zuweisen - legt jedes Element von Publisher auf eine Objekteigenschaft fest.

sink ist für unseren zweck gut geeignet, ein besonderes augenmerk sollte darauf gelegt werden, dass es eine unbegrenzte anzahl von werten verlangt.


Und hier müssen wir eine wichtige Unterscheidung treffen, unser Publisher in der Variable count bestimmt die Anzahl der Elemente, die unser Publisher angeben kann, und diese Bedingungen werden von uns selbst festgelegt. Grundsätzlich könnten wir auf diese Variable verzichten und uns nicht auf die Übertragung von Fibonacci-Zahlen beschränken, aber bald würden wir den Bereich zulässiger Werte vom Typ Int verlassen.
Bei sink ist der Fall anders. Jeder Abonnent bestimmt, wie viele Werte er empfangen möchte. Sink fordert eine unbegrenzte Anzahl von Werten an. Dies bedeutet, dass er Werte empfängt, bis er ein Abschluss-, Fehler- oder Löschungssignal erhält.


Um die Verwendung unseres Publishers zu vereinfachen, fügen wir dessen Erstellung der Publishers-Protokollerweiterung hinzu.


 extension Publishers { private static func fibonacci(configuration: FibonacciConfiguration) -> FibonacciPublisher { FibonacciPublisher(configuration: configuration) } static func fibonacci(count: Subscribers.Demand = .max(6)) -> FibonacciPublisher { FibonacciPublisher(configuration: FibonacciConfiguration(count: count)) } } 

Probieren Sie unseren Verlag aus


 Publishers.fibonacci(count: .max(10)) .sink { value in print(value, terminator: " ") } // print 0 1 1 2 3 5 8 13 21 34 - OK 

Und jetzt die Grenzfälle


 Publishers.fibonacci(count: .max(1)) .sink { value in print(value, terminator: " ") } // prinst 0 - OK Publishers.fibonacci(count: .max(2)) .sink { value in print(value, terminator: " ") } // prints 0 1 - OK Publishers.fibonacci(count: .none) .print() //    publisher'a .sink { value in print(value, terminator: " ") } // prints receive finished - OK 

Aber was passiert, wenn Sie .unlimited angeben?


 Publishers.fibonacci(count: .unlimited) .print() .sink { value in print(value, terminator: " ") } // prints 0 1 1 2 3 5 8 13 21 ...   ,     Int. 

Wie können Sie .unlimited verwenden, aber mehrere Zahlen ausgeben? Dazu benötigen wir den Operator .prefix (_), der genauso funktioniert wie .prefix (_) aus Auflistungen, dh, es bleiben nur die ersten N Elemente übrig.


 Publishers.fibonacci(count: .unlimited) .print() .prefix(5) .sink { _ in } // prints 0 1 1 2 3 cancel   ,       Int. 

Was ist das problem Vielleicht in .prefix (_)? Lassen Sie uns ein kleines Experiment mit der Standardsequenz von Foundation durchführen.


 //   1 2 3 4 5 6 7 8 ... 1... .publisher .print() .prefix(5) .sink { _ in } // prints 1 2 3 4 5 cancel -  

Wie wir sehen können, hat der obige Code korrekt funktioniert, dann liegt das Problem in unserer Implementierung von Publisher.
Wir sehen uns die Protokolle von .print () an und stellen fest, dass wir nach N Anfragen von .prefix (_) cancel () für unser FibonacciSubscription aufrufen, wobei wir Subscriber auf nil setzen.


  func cancel() { subscriber = nil } 

Wenn Sie die Anrufliste öffnen, können Sie sehen, dass cancel () von request (_ :) aufgerufen wird, und zwar während des Anrufs an den Teilnehmer? .Receive (_). Daraus können wir schließen, dass zu einem bestimmten Zeitpunkt innerhalb der Anfrage (_ :) der Teilnehmer Null werden kann und wir dann die Arbeit an der Generierung neuer Nummern beenden müssen. Fügen Sie diese Bedingung unserem Code hinzu.


  func request(_ demand: Subscribers.Demand) { // 1 guard count > .none else { subscriber?.receive(completion: .finished) return } // 2 count -= .max(1) subscriber?.receive(0) guard let _ = subscriber else { return } // new if count == .none { subscriber?.receive(completion: .finished) return } // 3 count -= .max(1) subscriber?.receive(1) guard let _ = subscriber else { return } // new if count == .none { subscriber?.receive(completion: .finished) return } // 4 var prev = 0 var current = 1 var temp: Int while let subscriber = subscriber { // new temp = prev prev = current current += temp subscriber.receive(current) count -= .max(1) if count == .none { subscriber.receive(completion: .finished) return } } } 

Führen Sie jetzt unseren Testcode aus.


 Publishers.fibonacci(count: .unlimited) .print() .prefix(5) .sink { _ in } // prints 0 1 1 2 3 cancel -  

Habe das erwartete Verhalten.


Abonnent


Und so ist unser FibonacciSubscription bereit? Nicht wirklich, in unseren Tests haben wir nur einen Sink-Abonnenten verwendet, der nach einer unbegrenzten Anzahl von Nummern fragt, aber was ist, wenn wir stattdessen einen Abonnenten verwenden, der eine bestimmte begrenzte Anzahl von Nummern erwartet. Combine stellt einen solchen Abonnenten nicht zur Verfügung, aber was hindert uns daran, unseren eigenen zu schreiben? Nachfolgend finden Sie die Implementierung unseres Fibonacci-Abonnenten.


 class FibonacciSubscriber: Subscriber { typealias Input = Int typealias Failure = Never var limit: Subscribers.Demand init(limit: Subscribers.Demand) { self.limit = limit } func receive(subscription: Subscription) { subscription.request(limit) } func receive(_ input: Input) -> Subscribers.Demand { .none } func receive(completion: Subscribers.Completion<Failure>) { print("Subscriber's completion: \(completion)") } } 

Daher verfügt unser Fibonacci-Abonnent über eine Limit-Eigenschaft, die festlegt, wie viele Elemente dieser Abonnent empfangen möchte. Und dies geschieht mit der Methode receive (_: Subscription), bei der wir dem Abonnement mitteilen, wie viele Elemente wir benötigen. Beachten Sie auch die Funktion receive (_: Input) -> Subscribers.Demand. Diese Funktion wird aufgerufen, wenn ein neuer Wert empfangen wird. Als Rückgabewert geben wir an, wie viele zusätzliche Elemente wir empfangen möchten: .none - not at all, .max (N) N pieces Insgesamt entspricht die Gesamtzahl der empfangenen Elemente der Summe aus dem Wert des gesendeten Abonnements in receive (_: Subscription) und allen Rückgabewerten von receive (_: Input) -> Subscribers.Demand.


Zweiter Test


Versuchen wir es mit FibonacciSubscriber.


 let subscriber = FibonacciSubscriber(limit: .max(3)) Publishers.fibonacci(count: .max(5)) .print() .subscribe(subscriber) // prints 0 1 1 2 3 -     0 1 1 

Wie wir sehen, hat unser Publisher 5 statt 3 Werte gesendet. Warum? Da die request (_: Subscribers.Demand) -Methode von FibonacciSubscription'a die Anforderungen des Abonnenten nicht berücksichtigt, müssen wir sie korrigieren. Hierzu fügen wir eine zusätzliche angeforderte Eigenschaft hinzu, mit der wir die Anforderungen des Abonnenten verfolgen.


 private final class FibonacciSubscription<S: Subscriber>: Subscription where S.Input == Int { var subscriber: S? var configuration: FibonacciConfiguration var count: Subscribers.Demand var requested: Subscribers.Demand = .none // new init(subscriber: S?, configuration: FibonacciConfiguration) { self.subscriber = subscriber self.configuration = configuration self.count = configuration.count } func cancel() { subscriber = nil } func request(_ demand: Subscribers.Demand) { guard count > .none else { subscriber?.receive(completion: .finished) return } requested += demand // new count -= .max(1) requested -= .max(1) // new requested += subscriber?.receive(0) ?? .none // new guard let _ = subscriber, requested > .none else { return } // new if count == .none { subscriber?.receive(completion: .finished) return } count -= .max(1) requested -= .max(1) // new requested += subscriber?.receive(1) ?? .none // new guard let _ = subscriber, requested > .none else { return } // new if count == .none { subscriber?.receive(completion: .finished) return } var prev = 0 var current = 1 var temp: Int while let subscriber = subscriber, requested > .none { // new temp = prev prev = current current += temp requested += subscriber.receive(current) // new count -= .max(1) requested -= .max(1) // new if count == .none { subscriber.receive(completion: .finished) return } } } } 

Dritte Prüfung


 let subscriber = FibonacciSubscriber(limit: .max(3)) Publishers.fibonacci(count: .max(5)) .print() .subscribe(subscriber) // prints 0 1 1 - OK 

Publisher funktioniert jetzt ordnungsgemäß.


Endgültiger Code
 import Foundation import Combine struct FibonacciConfiguration { var count: Subscribers.Demand } struct FibonacciPublisher: Publisher { typealias Output = Int typealias Failure = Never var configuration: FibonacciConfiguration func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input { let subscription = FibonacciSubscription(subscriber: subscriber, configuration: configuration) subscriber.receive(subscription: subscription) } } private final class FibonacciSubscription<S: Subscriber>: Subscription where S.Input == Int { var subscriber: S? var configuration: FibonacciConfiguration var count: Subscribers.Demand var requested: Subscribers.Demand = .none init(subscriber: S?, configuration: FibonacciConfiguration) { self.subscriber = subscriber self.configuration = configuration self.count = configuration.count } func cancel() { subscriber = nil } func request(_ demand: Subscribers.Demand) { guard count > .none else { subscriber?.receive(completion: .finished) return } requested += demand count -= .max(1) requested -= .max(1) requested += subscriber?.receive(0) ?? .none guard let _ = subscriber, requested > .none else { return } if count == .none { subscriber?.receive(completion: .finished) return } count -= .max(1) requested -= .max(1) requested += subscriber?.receive(1) ?? .none guard let _ = subscriber, requested > .none else { return } if count == .none { subscriber?.receive(completion: .finished) return } var prev = 0 var current = 1 var temp: Int while let subscriber = subscriber, requested > .none { temp = prev prev = current current += temp requested += subscriber.receive(current) count -= .max(1) requested -= .max(1) if count == .none { subscriber.receive(completion: .finished) return } } } } extension Publishers { private static func fibonacci(configuration: FibonacciConfiguration) -> FibonacciPublisher { FibonacciPublisher(configuration: configuration) } static func fibonacci(count: Subscribers.Demand = .max(6)) -> FibonacciPublisher { FibonacciPublisher(configuration: FibonacciConfiguration(count: count)) } } class FibonacciSubscriber: Subscriber { typealias Input = Int typealias Failure = Never var limit: Subscribers.Demand init(limit: Subscribers.Demand) { self.limit = limit } func receive(subscription: Subscription) { subscription.request(limit) } func receive(_ input: Input) -> Subscribers.Demand { .none } func receive(completion: Subscribers.Completion<Failure>) { print("Subscriber's completion: \(completion)") } } Publishers.fibonacci(count: .max(4)) .print() .sink { _ in } let subscriber = FibonacciSubscriber(limit: .max(3)) Publishers.fibonacci(count: .max(5)) .print() .subscribe(subscriber) 

Ergebnis


Ich hoffe, dieser Artikel hat Ihnen ein besseres Verständnis dafür vermittelt, was Publisher, Subscription und Subscriber sind, wie sie miteinander interagieren und auf welche Punkte Sie achten müssen, wenn Sie sich für die Implementierung Ihres Publishers entscheiden. Kommentare, Erläuterungen zum Artikel sind willkommen.

Source: https://habr.com/ru/post/de482690/


All Articles