Einführung in die reaktive Programmierung

Guten Tag. In diesem Artikel werde ich durch Europa galoppieren, nämlich Ihnen sagen, was sie unter reaktiver Programmierung verstehen, Schauspieler und reaktive Flüsse vorstellen und schließlich mithilfe reaktiver Flüsse Mausgesten erkennen, wie in der alten Oper und ihrem spirituellen Nachfolger - Vivaldi .

Ziel ist es, die Grundkonzepte der reaktiven Programmierung vorzustellen und zu zeigen, dass nicht alles so kompliziert und beängstigend ist, wie es auf den ersten Blick erscheinen mag.

Bild
Quelle

Was ist reaktive Programmierung?


Um diese Frage zu beantworten, wenden wir uns an die Website . Es hat ein schönes Bild, das 4 Hauptkriterien zeigt, die reaktive Anwendungen erfüllen müssen.

Bild

Die Anwendung sollte schnell, fehlertolerant und gut skalierbar sein.
Es sieht so aus, als ob "wir für alle gut gegen alle schlecht sind", oder?

Was ist mit diesen Worten gemeint:

  1. Reaktionsfähigkeit

    Die Anwendung sollte dem Benutzer das Ergebnis in einer halben Sekunde liefern. Dies schließt auch das Prinzip des schnellen Ausfalls ein. Wenn also etwas schief geht, ist es besser, eine Fehlermeldung wie „Entschuldigung, es ist ein Problem aufgetreten. Versuchen Sie es später noch einmal, damit das Wetter am Meer wartet. Wenn der Vorgang lang ist, zeigen wir dem Benutzer einen Fortschrittsbalken. Wenn es sehr lang ist - „Ihre Anfrage wird vorläufig am 18. März 2042 erfüllt. Wir senden Ihnen eine Benachrichtigung per Post. "
  2. Skalierbarkeit ist eine Möglichkeit, unter Last reaktionsschnell zu sein. Stellen Sie sich den Lebenszyklus eines relativ erfolgreichen Dienstes vor:
    1. Start - Der Anforderungsfluss ist klein, der Dienst wird auf einer virtuellen Maschine mit einem Kern ausgeführt.
    2. Der Anforderungsfluss nimmt zu - Kernel werden der virtuellen Maschine hinzugefügt und Anforderungen werden in mehreren Threads verarbeitet.
    3. Noch mehr Last - wir verbinden Batching - Anfragen an die Datenbank und die Festplatte werden gruppiert.
    4. Noch mehr Last - Sie müssen mehr Server anheben und Arbeit im Cluster bereitstellen.
      Idealerweise sollte das System selbst je nach Belastung vergrößert oder verkleinert werden.
  3. Fehlertoleranz

    Wir akzeptieren, dass wir in einer unvollkommenen Welt leben und alles passiert. Für den Fall, dass in unserem System etwas schief geht, müssen wir Methoden zur Fehlerbehandlung und Wiederherstellung bereitstellen
  4. Und schließlich sind wir eingeladen, all dies mit einem System zu erreichen, dessen Architektur auf nachrichtengesteuertem Messaging basiert

Bevor ich fortfahre, möchte ich näher darauf eingehen, wie sich ereignisgesteuerte Systeme von nachrichtengesteuerten Systemen unterscheiden.

Ereignisgesteuert:

  • Ereignis - Das System meldet, dass es einen bestimmten Status erreicht hat.
  • Es können viele Abonnenten der Veranstaltung sein.
  • Die Ereigniskette ist normalerweise kurz und die Ereignishandler befinden sich (sowohl physisch als auch im Code) in der Nähe der Quelle.
  • Die Ereignisquelle und ihre Handler haben normalerweise einen gemeinsamen Status (physisch - sie verwenden denselben RAM-Teil für den Informationsaustausch).

Im Gegensatz zu ereignisgesteuert in einem nachrichtengesteuerten System:

  • Jede Nachricht hat nur einen Empfänger.
  • Nachrichten sind unveränderlich: Sie können nichts an der empfangenen Nachricht ändern, damit der Absender davon erfährt und die Informationen lesen kann.
  • Elemente des Systems reagieren auf empfangene Nachrichten (oder nicht) und können Nachrichten an andere Elemente des Systems senden.

Das alles bietet uns

Schauspieler Modell


Meilensteine ​​der Entwicklung:

  • Die erste Erwähnung von Schauspielern findet sich in einem wissenschaftlichen Artikel von 1973 - Carl Hewitt, Peter Bishop und Richard Steiger, "Ein universeller modularer ACTOR-Formalismus für künstliche Intelligenz".
  • 1986 - Erlang erscheint. Ericson benötigte eine Sprache für Telekommunikationsgeräte, die Fehlertoleranz und fehlerfreie Ausbreitung bietet. Im Kontext dieses Artikels sind seine Hauptmerkmale:

    • Alles ist ein Prozess
    • Nachrichten sind die einzige Art der Kommunikation (Erlang ist eine funktionale Sprache und Nachrichten darin sind unveränderlich).
  • ..
  • 2004 - die erste Version der Scala-Sprache. Seine Eigenschaften:
    • Powered by JVM,
    • Funktionell
    • Für Multithreading wurde ein Akteurmodell ausgewählt.

  • 2009 - Die Umsetzung der Akteure wurde in einer separaten Bibliothek - Akka - zugewiesen
  • 2014 - Akka.net - es wurde auf .Net portiert.

Was können Schauspieler tun?


Schauspieler sind die gleichen Objekte, aber:

  • Im Gegensatz zu gewöhnlichen Objekten können sich die Akteure nicht gegenseitig aufrufen.
  • Akteure können Informationen nur über unveränderliche Nachrichten übertragen .
  • Nach Erhalt der Nachricht kann der Schauspieler
    • Erstellen Sie neue Akteure (sie werden in der Hierarchie niedriger sein),
    • Senden Sie Nachrichten an andere Akteure,
    • Stoppen Sie die Akteure unten in der Hierarchie und sich selbst.

Schauen wir uns ein Beispiel an.

Bild

Schauspieler A möchte eine Nachricht an Schauspieler B senden. Er hat nur ActorRef (eine Adresse). Schauspieler B kann überall sein.
Akteur A sendet einen Buchstaben B über das System (ActorSystem). Das System legt den Brief in die Mailbox von Schauspieler B und „weckt“ Schauspieler B. Schauspieler B nimmt den Brief aus der Mailbox und tut etwas.

Im Vergleich zum Aufrufen von Methoden für ein anderes Objekt sieht es unnötig kompliziert aus, aber das Modell der Schauspieler passt perfekt in die reale Welt, wenn Sie sich vorstellen, dass Schauspieler Menschen sind, die darauf trainiert sind, auf bestimmte Reize zu reagieren.

Stellen Sie sich einen Vater und einen Sohn vor:



Der Vater schickt seinem Sohn SMSku „Clean in the room“ und macht weiterhin sein eigenes Ding. Der Sohn liest SMSku und beginnt zu putzen. Vater spielt mittlerweile Poker. Der Sohn beendet die Reinigung und sendet eine SMS „Fertig stellen“. Es sieht einfach aus, oder?

Stellen Sie sich nun vor, Vater und Sohn sind keine Schauspieler, sondern gewöhnliche Objekte, die sich gegenseitig in die Quere kommen lassen. Der Vater zieht seinen Sohn für die Methode „Zimmer reinigen“ und folgt ihm auf den Fersen. Er wartet, bis der Sohn mit der Reinigung fertig ist und die Kontrolle an seinen Vater zurückgibt. Vater kann derzeit nicht Poker spielen. In diesem Zusammenhang wird das Akteurmodell attraktiver.

Nun gehen wir weiter zu

Akka.NET


Alles, was unten geschrieben steht, gilt für das ursprüngliche Akka für die JVM, aber für mich ist C # näher als Java, daher werde ich Akka.NET als Beispiel verwenden.

Was sind die Vorteile von Akka?


  • Multithreading durch Messaging. Sie müssen nicht mehr mit allen Arten von Sperren, Semaphoren, Mutexen und anderen Reizen leiden, die für klassisches Multithreading mit gemeinsamem Speicher charakteristisch sind.
  • Transparente Kommunikation zwischen dem System und seinen Komponenten. Sie müssen sich keine Gedanken über komplexen Netzwerkcode machen - das System selbst findet das Nachrichtenziel und garantiert die Nachrichtenübermittlung (hier können Sie einen Witz über UDP und TCP einfügen).
  • Flexible Architektur, die automatisch vergrößert oder verkleinert werden kann. Unter Last kann das System beispielsweise zusätzliche Clusterknoten anheben und die Last gleichmäßig verteilen.

Das Thema Skalierung ist jedoch sehr umfangreich und verdient eine gesonderte Veröffentlichung. Daher werde ich nur auf die Funktion näher eingehen, die in allen Projekten nützlich sein wird:

Fehlerbehandlung


Akteure haben eine Hierarchie - sie kann als Baum dargestellt werden. Jeder Schauspieler hat einen Elternteil und kann „Kinder“ haben.

Bild
Akka.NET-Dokumentation Copyright 2013-2018 Akka.NET-Projekt

Für jeden Schauspieler können Sie eine Überwachungsstrategie festlegen - was zu tun ist, wenn für die „Kinder“ etwas schief geht. Schlagen Sie beispielsweise einen Schauspieler, der Probleme hat, und erstellen Sie dann einen neuen Schauspieler des gleichen Typs und vertrauen Sie ihm die gleiche Arbeit an.

Zum Beispiel habe ich eine Anwendung auf Akka.net CRUD erstellt, in der die Ebene der "Geschäftslogik" auf den Akteuren implementiert ist. Das Ziel dieses Projekts war es herauszufinden, ob Akteure in nicht skalierbaren Systemen eingesetzt werden sollten - werden sie das Leben verbessern oder mehr Schmerzen verursachen.

Wie die integrierte Fehlerbehandlung von Akka helfen kann:

Gif


  1. alles ist in Ordnung, die Anwendung funktioniert,
  2. Es ist etwas mit dem Repository passiert, und jetzt gibt es nur noch 1 von 5 Ergebnissen.
  3. Ich habe die Überwachungsstrategie auf "10 Mal pro Sekunde versuchen" eingestellt.
  4. Die Anwendung funktioniert wieder (wenn auch langsamer), und ich habe Zeit, um herauszufinden, was los ist.

Es besteht die Versuchung zu sagen: "Komm schon, ich schreibe solche Fehler selbst. Warum müssen einige Schauspieler einen Fehler machen?" Faire Bemerkung, aber nur, wenn es nur wenige Fehler gibt.

Und etwas Code. So sieht die Initialisierung des Akteursystems im IoC-Container aus:

public Container() { system = ActorSystem.Create("MySystem"); var echo = system.ActorOf<EchoActor>("Echo"); //stop initialization if something is wrong with actor system var alive = echo.Ask<bool>(true, TimeSpan.FromMilliseconds(100)).Result; container = new WindsorContainer(); //search for dependencies //register controllers //register ActorSystem propsResolver = new WindsorDependencyResolver(container, (ActorSystem)system); system.AddDependencyResolver(propsResolver); actorSystemWrapper = new ActorSystemWrapper(system, propsResolver); container.Register(Component.For<IActorRefFactory>().Instance(actorSystemWrapper)); container.Register(Component.For<IDependencyResolver>().Instance(propsResolver)); } 

EchoActor ist der einfachste Akteur, der dem Absender einen Wert zurückgibt:

  public class EchoActor : ReceiveActor { public EchoActor() { Receive<bool>(flag => { Sender.Tell(flag); }); } } 

Um die Akteure mit dem „normalen“ Code zu verbinden, wird der Befehl Ask verwendet:

  public async Task<ActionResult> Index() { ViewBag.Type = typeof(Model); var res = await CrudActorRef.Ask<IEnumerable<Model>>(DataMessage.GetAll<Model>(), maxDelay); return View(res); } 

Insgesamt


Mit den Schauspielern kichern kann ich sagen:

  • Schauen Sie sich diese an, wenn Sie Skalierbarkeit benötigen.
  • Für komplexe Geschäftslogik ist es besser, sie wegen nicht zu verwenden
    • seltsame Abhängigkeitsinjektion. Um einen Akteur mit den erforderlichen Abhängigkeiten zu initialisieren, müssen Sie zuerst ein Requisitenobjekt erstellen und es dann dem ActorSystem übergeben, um einen Akteur des gewünschten Typs zu erstellen. Zum Erstellen von Requisiten mit IoC-Containern (z. B. Castle Windsor oder Autofac) gibt es vorgefertigte Wrapper - DependencyResolvers. Ich war jedoch mit der Tatsache konfrontiert, dass der IoC-Container versuchte, die Abhängigkeitslebensdauer zu kontrollieren, und nach einer Weile fiel das System leise ab.

      * Vielleicht sollten Sie diese Abhängigkeit als untergeordneten Akteur platzieren, anstatt eine Abhängigkeit in ein Objekt einzufügen.
    • Tippprobleme. ActorRef weiß nichts über die Art des Schauspielers, auf den es sich bezieht. Das heißt, zur Kompilierungszeit ist nicht bekannt, ob ein Akteur eine Nachricht dieses Typs verarbeiten kann oder nicht.

Teil 2: Jetstreams


Kommen wir nun zu einem populäreren und nützlicheren Thema - Jet Flows. Wenn Sie sich während der Arbeit nie mit Schauspielern treffen können, sind Rx-Streams sowohl im Frontend als auch im Backend sicherlich nützlich. Ihre Implementierung erfolgt in fast allen modernen Programmiersprachen. Ich werde Beispiele für RxJs geben, da heutzutage sogar Backend-Programmierer manchmal etwas in JavaScript tun müssen.


Rx-Streams sind für alle gängigen Programmiersprachen verfügbar.

" Einführung in die reaktive Programmierung, die Sie vermisst haben " von Andre Staltz , lizenziert unter CC BY-NC 4.0

Um zu erklären, was Jetstream ist, beginne ich mit den Pull- und Push-Sammlungen.
Einzelner RückgabewertMehrere Rückgabewerte
Ziehen
Synchron
Interaktiv
T.IEnumerable <T>
Drücken Sie
Asynchron
Reaktiv
Aufgabe <T>IObservable <T>

Pull-Sammlungen sind das, was wir alle in der Programmierung gewohnt sind. Das auffälligste Beispiel ist ein Array.

 const arr = [1,2,3,4,5]; 

Es hat bereits Daten, er selbst wird diese Daten nicht ändern, aber er kann sie auf Anfrage geben.

 arr.forEach(console.log); 

Bevor Sie etwas mit den Daten tun, können Sie sie auch irgendwie verarbeiten.

 arr.map(i => i+1).map(I => “my number is ”+i).forEach(console.log); 

Stellen wir uns nun vor, dass sich anfangs keine Daten in der Sammlung befinden, aber es wird Sie definitiv darüber informieren, dass sie erschienen sind (Push). Gleichzeitig können wir die notwendigen Transformationen auf diese Sammlung anwenden.

Zum Beispiel:

 source.map(i => i+1).map(I => “my number is ”+i).forEach(console.log); 

Wenn in der Quelle ein Wert wie 1 angezeigt wird, gibt console.log "Meine Nummer ist 1" aus.

Wie es funktioniert:

Eine neue Entität wird angezeigt - Betreff (oder Beobachtbar):

 const observable = Rx.Observable.create(function (observer) { observer.next(1); observer.next(2); observer.next(3); setTimeout(() => { observer.next(4); observer.complete(); }, 1000); }); 

Dies ist eine Push-Sammlung, die Benachrichtigungen über Änderungen in ihrem Status sendet.

In diesem Fall erscheinen die Nummern 1, 2 und 3 sofort darin, in einer zweiten 4, und dann wird die Sammlung "beendet". Dies ist eine besondere Art von Veranstaltung.

Die zweite Entität ist Observer. Er kann Betreffereignisse abonnieren und mit den empfangenen Daten etwas unternehmen. Zum Beispiel:

 observable.subscribe(x => console.log(x)); observable.subscribe({ next: x => console.log('got value ' + x), error: err => console.error('something wrong occurred: ' + err), complete: () => console.log('done'), }); observable .map(x => 'This is ' + x) .subscribe(x => console.log(x)); 

Es ist ersichtlich, dass ein Betreff viele Abonnenten haben kann.

Es sieht einfach aus, aber es ist noch nicht klar, warum dies notwendig ist. Ich werde zwei weitere Definitionen geben, die Sie kennen müssen, wenn Sie mit reaktiven Flüssen arbeiten, und dann werde ich in der Praxis zeigen, wie sie funktionieren und in welchen Situationen ihr volles Potenzial offenbart wird.

Kalte Observablen


  • Benachrichtigen Sie über Ereignisse, wenn jemand sie abonniert.
  • Der gesamte Datenstrom wird unabhängig vom Zeitpunkt des Abonnements erneut an jeden Teilnehmer gesendet.
  • Daten werden für jeden Teilnehmer kopiert.

Was bedeutet das? Nehmen wir an, das Unternehmen (Betreff) hat beschlossen, die Verteilung der Geschenke zu arrangieren. Jeder Mitarbeiter (Beobachter) kommt zur Arbeit und erhält seine Kopie des Geschenks. Niemand bleibt beraubt.

Heiße Observable


  • Sie versuchen, das Ereignis unabhängig von der Anwesenheit von Abonnenten zu benachrichtigen. Wenn zum Zeitpunkt der Veranstaltung keine Abonnenten vorhanden waren, gehen die Daten verloren.

Beispiel: Am Morgen werden heiße Kuchen für Mitarbeiter ins Unternehmen gebracht. Wenn sie hereingebracht werden, fliegen alle Lerchen zum Geruch und machen die Kuchen zum Frühstück aus. Aber die Eulen, die später kamen, bekommen keine Kuchen mehr.

In welchen Situationen werden Jetstreams eingesetzt?


Wenn ein Datenstrom über die Zeit verteilt ist. Zum Beispiel Benutzereingaben. Oder protokolliert von jedem Dienst. In einem der Projekte sah ich einen selbst erstellten Logger, der Ereignisse in N Sekunden sammelte und dann gleichzeitig das gesamte Paket aufzeichnete. Der Batteriecode belegte die Seite. Wenn Rx-Streams verwendet würden, wäre dies viel einfacher:

Bild
RxJs Reference / Observable , Dokumentation lizenziert unter CC BY 4.0 .
(Es gibt viele Beispiele und Bilder, die erklären, was verschiedene Operationen mit reaktiven Strömungen bewirken.)

 source.bufferTime(2000).subsribe(doThings); 

Und schließlich ein Anwendungsbeispiel.

Erkennen von Mausgesten mit Rx-Streams


In der alten Oper oder ihrem spirituellen Nachfolger - Vivaldi - gab es eine Browsersteuerung mit Mausgesten.

Gif - Mausgesten in Vivaldi


Das heißt, Sie müssen Mausbewegungen nach oben / unten, rechts / links und Kombinationen davon erkennen. Es kann ohne Rx-Streams geschrieben werden, aber der Code ist komplex und schwer zu pflegen.

Und so sieht es mit Rx-Streams aus:


Ich beginne am Ende - Ich lege fest, welche Daten und in welchem ​​Format ich in der ursprünglichen Reihenfolge suchen werde:

 //gestures to look for const gestures = Rx.Observable.from([ { name: "Left", sequence: Rx.Observable.from([{ x: -1, y: 0 }]) }, { name: "Right", sequence: Rx.Observable.from([{ x: 1, y: 0 }]) }, { name: "Up", sequence: Rx.Observable.from([{ x: 0, y: -1 }]) }, { name: "Down", sequence: Rx.Observable.from([{ x: 0, y: 1 }]) }, { name: "Down+Up", sequence: Rx.Observable.from([{ x: 0, y: 1 }, { x: 0, y: -1 }]) }, { name: "Up+Right", sequence: Rx.Observable.from([{ x: 0, y: -1 }, { x: 1, y: 0 }]) } ]); 

Dies sind Einheitsvektoren und ihre Kombinationen.

Als Nächstes müssen Sie die Mausereignisse in Rx-Streams konvertieren. Alle Rx-Bibliotheken verfügen über integrierte Tools, mit denen Standardereignisse in Observables umgewandelt werden können.

 const mouseMoves = Rx.Observable.fromEvent(canvas, 'mousemove'), mouseDowns = Rx.Observable.fromEvent(canvas, 'mousedown'), mouseUps = Rx.Observable.fromEvent(canvas, 'mouseup'); 

Als nächstes gruppiere ich die Koordinaten der Maus nach 2 und finde ihren Unterschied, wobei ich den Mausversatz erhalte.

 const mouseDiffs = mouseMoves .map(getOffset) .pairwise() .map(pair => { return { x: pair[1].x-pair[0].x, y: pair[1].y-pair[0].y } }); 

Und gruppieren Sie diese Bewegungen mit den Ereignissen 'Mousedown' und 'Mouseup'.

 const mouseGestures = mouseDiffs .bufferToggle(mouseDowns, x => mouseUps) .map(concat); 

Die Concat-Funktion schneidet zu kurze Bewegungen aus und gruppiert Bewegungen, die grob in Richtung ausgerichtet sind.

 function concat(values) {//summarize move in same direction return values.reduce((a, v) => { if (!a.length) { a.push(v); } else { const last = a[a.length - 1]; const lastAngle = Math.atan2(last.x, last.y); const angle = Math.atan2(vx, vy); const angleDiff = normalizeAngle(angle - lastAngle); const dist = Math.hypot(vx, vy); if (dist < 1) return a;//move is too short – ignore //moving in same direction => adding vectors if (Math.abs(angleDiff) <= maxAngleDiff) { last.x += vx; last.y += vy; } else { a.push(v); } } return a; }, []); } 

Wenn die Bewegung auf der X- oder Y-Achse zu kurz ist, wird sie auf Null zurückgesetzt. Und dann bleibt nur das Vorzeichen von den erhaltenen Verschiebungskoordinaten übrig. Somit werden die Einheitsvektoren erhalten, nach denen wir gesucht haben.

 const normalizedMouseGestures = mouseGestures.map(arr => arr.map(v => { const dist = Math.hypot(vx, vy);//length of vector vx = Math.abs(vx) > minMove && Math.abs(vx) * treshold > dist ? vx : 0; vy = Math.abs(vy) > minMove && Math.abs(vy) * treshold > dist ? vy : 0; return v; }) ).map(arr => arr .map(v => { return { x: Math.sign(vx), y: Math.sign(vy) }; }) .filter(v => Math.hypot(vx, vy) > 0) ); 

Ergebnis:

 gestures.map(gesture => normalizedMouseGestures.mergeMap( moves => Rx.Observable.from(moves) .sequenceEqual(gesture.sequence, comparer) ).filter(x => x).mapTo(gesture.name) ).mergeAll().subscribe(gestureName => actions[gestureName]()); 

Mit sequenceEqual können Sie die empfangenen Bewegungen mit den ursprünglichen Bewegungen vergleichen und bei Übereinstimmung eine bestimmte Aktion ausführen.

Gif


Hier können Sie mit Gesten spielen

Bitte beachten Sie, dass neben der Gestenerkennung auch eine Zeichnung der anfänglichen und normalisierten Mausbewegungen auf der HTML-Zeichenfläche angezeigt wird. Die Lesbarkeit des Codes leidet nicht darunter.

Daraus ergibt sich ein weiterer Vorteil: Die mit Hilfe von Rx-Streams geschriebene Funktionalität kann einfach ergänzt und erweitert werden.

Zusammenfassung


  • Bibliotheken mit Rx-Streams sind für fast alle Programmiersprachen verfügbar.
  • Rx-Streams sollten verwendet werden, wenn ein Stream von Ereignissen über die Zeit verteilt ist (z. B. Benutzereingaben).
  • Mit Rx-Streams geschriebene Funktionen können einfach ergänzt und erweitert werden.
  • Ich habe keine wesentlichen Mängel festgestellt.

Source: https://habr.com/ru/post/de432004/


All Articles