Introduction à la programmation réactive

Bonjour Dans cet article je galoperai à travers l'Europe, à savoir, je vais vous dire ce qu'ils entendent par programmation réactive, vous présenter les acteurs, les flux réactifs, et enfin, en utilisant les flux réactifs, nous reconnaîtrons les gestes de la souris, comme dans l'ancien Opéra et son successeur spirituel - Vivaldi .

L'objectif est d'introduire les concepts de base de la programmation réactive et de montrer que tout n'est pas aussi compliqué et effrayant que cela puisse paraître à première vue.

image
Source

Qu'est-ce que la programmation réactive?


Pour répondre à cette question, nous nous tournons vers le site . Il a une belle image qui montre 4 critères principaux auxquels les applications réactives doivent répondre.

image

L'application doit être rapide, tolérante aux pannes et bien évolutive.
Il semble que «nous sommes pour tous bons contre tous mauvais», non?

Que signifient ces mots:

  1. Réactivité

    L'application doit donner à l'utilisateur le résultat en une demi-seconde. Cela inclut également le principe de l'échec rapide - c'est-à-dire qu'en cas de problème, il est préférable de renvoyer à l'utilisateur un message d'erreur tel que «Désolé, il y a eu un problème. Réessayez plus tard que d'attendre le temps au bord de la mer. Si l'opération est longue, nous montrons à l'utilisateur une barre de progression. S'il est très long - «votre demande sera satisfaite provisoirement le 18 mars 2042. Nous vous enverrons une notification par la poste. "
  2. L'évolutivité est un moyen de fournir une réactivité sous charge. Imaginez le cycle de vie d'un service relativement réussi:
    1. Lancement - le flux de demandes est petit, le service s'exécute sur une machine virtuelle avec un cœur.
    2. Le flux de demandes augmente - les noyaux sont ajoutés à la machine virtuelle et les demandes sont traitées dans plusieurs threads.
    3. Encore plus de charge - nous connectons le traitement par lots - les demandes à la base de données et au disque dur sont regroupées.
    4. Encore plus de charge - vous devez augmenter le nombre de serveurs et fournir du travail dans le cluster.
      Idéalement, le système lui-même devrait augmenter ou diminuer en fonction de la charge.
  3. Tolérance aux pannes

    Nous acceptons que nous vivons dans un monde imparfait et tout se passe. En cas de problème dans notre système, nous devons fournir des méthodes de gestion des erreurs et de récupération
  4. Et enfin, nous sommes invités à réaliser tout cela en utilisant un système dont l'architecture est basée sur la messagerie pilotée par les messages

Avant de continuer, je veux m'attarder sur la différence entre les systèmes pilotés par les événements et les systèmes pilotés par les messages.

Evénementiel:

  • Événement - le système signale qu'il a atteint un certain état.
  • Il peut y avoir de nombreux abonnés à l'événement.
  • La chaîne d'événements est généralement courte et les gestionnaires d'événements sont proches (physiquement et en code) de la source.
  • La source d'événement et ses gestionnaires ont généralement un état commun (physiquement - ils utilisent le même morceau de RAM pour l'échange d'informations).

Contrairement à l'événementiel, dans un système axé sur les messages:

  • Chaque message n'a qu'un seul destinataire.
  • Les messages sont immuables: vous ne pouvez rien changer dans le message reçu afin que l'expéditeur en soit informé et puisse lire les informations.
  • Les éléments du système répondent (ou ne répondent pas) à la réception de messages et peuvent envoyer des messages à d'autres éléments du système.

Tout cela nous offre

Modèle d'acteur


Jalons de développement:

  • La première mention d'acteurs est dans un article scientifique de 1973 - Carl Hewitt, Peter Bishop et Richard Steiger, «Un formalisme universel modulaire ACTOR pour l'intelligence artificielle»,
  • 1986 - Erlang apparaît. Ericson avait besoin d'un langage pour les équipements de télécommunications qui fournirait une tolérance aux pannes et une propagation sans erreur. Dans le cadre de cet article, ses principales caractéristiques sont:

    • Tout est un processus
    • Les messages sont le seul moyen de communication (Erlang est un langage fonctionnel et les messages qu'il contient sont immuables).
  • ..
  • 2004 - la première version de la langue Scala. Ses caractéristiques:
    • Propulsé par JVM,
    • Fonctionnel
    • Pour le multi-threading, un modèle d'acteur a été sélectionné.

  • 2009 - la mise en œuvre des acteurs a été allouée dans une bibliothèque séparée - Akka
  • 2014 - Akka.net - il a été porté sur .Net.

Que peuvent faire les acteurs?


Les acteurs sont les mêmes objets, mais:

  • Contrairement aux objets ordinaires, les acteurs ne peuvent pas s'appeler les uns les autres.
  • Les acteurs ne peuvent transmettre des informations que par le biais de messages immuables .
  • Dès réception du message, l'acteur peut
    • Créer de nouveaux acteurs (ils seront plus bas dans la hiérarchie),
    • Envoyer des messages à d'autres acteurs,
    • Arrêtez les acteurs ci-dessous dans la hiérarchie et vous-même.

Regardons un exemple.

image

L'acteur A veut envoyer un message à l'acteur B. Tout ce qu'il a, c'est ActorRef (une adresse). L'acteur B peut être n'importe où.
L'acteur A envoie une lettre B via le système (ActorSystem). Le système place la lettre dans la boîte aux lettres de l'acteur B et «réveille» l'acteur B. L'acteur B prend la lettre de la boîte aux lettres et fait quelque chose.

Comparé aux méthodes d'appel sur un autre objet, cela semble inutilement compliqué, mais le modèle des acteurs s'intègre parfaitement dans le monde réel, si vous imaginez que les acteurs sont des personnes formées à faire quelque chose en réponse à certains stimuli.

Imaginez un père et un fils:



Le père envoie son fils SMSku «Nettoyer dans la chambre» et continue de faire son propre truc. Le fils lit SMSku et commence le nettoyage. Père, quant à lui, joue au poker. Le fils termine le nettoyage et envoie un SMS "Terminer". Ça a l'air simple, non?

Imaginez maintenant que le père et le fils ne sont pas des acteurs, mais des objets ordinaires qui peuvent tirer les méthodes l'un de l'autre. Le père tire son fils pour la méthode «nettoyer la chambre» et le suit sur ses talons, attendant que le fils termine le nettoyage et transfère le contrôle à son père. Père ne peut pas jouer au poker pour le moment. Dans ce contexte, le modèle d'acteur devient plus attractif.

Passons maintenant à

Akka.NET


Tout ce qui est écrit ci-dessous est vrai pour l'Akka d'origine pour la JVM, mais pour moi, C # est plus proche que Java, donc je vais utiliser Akka.NET comme exemple.

Quels sont donc les avantages d'Akka?


  • Multithreading via la messagerie. Vous n'avez plus à souffrir de toutes sortes de verrous, sémaphores, mutex et autres charmes caractéristiques du multithreading classique avec mémoire partagée.
  • Communication transparente entre le système et ses composants. Pas besoin de vous soucier du code réseau complexe - le système lui-même trouvera la destination du message et garantira la livraison du message (ici vous pouvez insérer une blague sur UDP vs TCP).
  • Architecture flexible pouvant évoluer automatiquement vers le haut ou vers le bas. Par exemple, sous charge, le système peut augmenter les nœuds de cluster supplémentaires et répartir uniformément la charge.

Mais le sujet de la mise à l'échelle est très vaste et mérite une publication séparée. Par conséquent, je ne parlerai plus en détail que de la fonctionnalité, qui sera utile dans tous les projets:

Gestion des erreurs


Les acteurs ont une hiérarchie - elle peut être représentée comme un arbre. Chaque acteur a un parent et peut avoir des «enfants».

image
Documentation Akka.NET Copyright 2013-2018 Projet Akka.NET

Pour chaque acteur, vous pouvez définir une stratégie de supervision - que faire si quelque chose ne va pas pour les «enfants». Par exemple, «battre» un acteur qui a des problèmes, puis créer un nouvel acteur du même type et lui confier le même travail.

Par exemple, j'ai réalisé une application sur Akka.net CRUD, dans laquelle la couche de "logique métier" est implémentée sur les acteurs. L'objectif de ce projet était de savoir si les acteurs devaient être utilisés dans des systèmes non évolutifs - rendront-ils la vie meilleure ou ajouteront-ils plus de douleur.

Comment la gestion intégrée des erreurs d'Akka peut aider:

Gif


  1. tout va bien, l'application fonctionne,
  2. quelque chose est arrivé au référentiel, et maintenant il ne donne le résultat qu'une seule fois sur 5,
  3. J'ai défini la stratégie de supervision pour "essayer 10 fois par seconde",
  4. l'application fonctionne à nouveau (quoique plus lentement), et j'ai le temps de comprendre quel est le problème.

Il y a une tentation de dire: "Allez, je vais écrire moi-même une telle erreur, pourquoi certains acteurs doivent-ils faire une erreur?" Bonne remarque, mais seulement si les points d'échec sont peu nombreux.

Et du code. Voici à quoi ressemble l'initialisation du système d'acteur dans le conteneur IoC:

public Container() { system = ActorSystem.Create("MySystem"); var echo = system.ActorOf<EchoActor>("Echo"); //stop initialization if something is wrong with actor system var alive = echo.Ask<bool>(true, TimeSpan.FromMilliseconds(100)).Result; container = new WindsorContainer(); //search for dependencies //register controllers //register ActorSystem propsResolver = new WindsorDependencyResolver(container, (ActorSystem)system); system.AddDependencyResolver(propsResolver); actorSystemWrapper = new ActorSystemWrapper(system, propsResolver); container.Register(Component.For<IActorRefFactory>().Instance(actorSystemWrapper)); container.Register(Component.For<IDependencyResolver>().Instance(propsResolver)); } 

EchoActor est l'acteur le plus simple qui renvoie une valeur à l'expéditeur:

  public class EchoActor : ReceiveActor { public EchoActor() { Receive<bool>(flag => { Sender.Tell(flag); }); } } 

Pour connecter les acteurs avec le code «normal», la commande Ask est utilisée:

  public async Task<ActionResult> Index() { ViewBag.Type = typeof(Model); var res = await CrudActorRef.Ask<IEnumerable<Model>>(DataMessage.GetAll<Model>(), maxDelay); return View(res); } 

Total


En ricanant avec les acteurs, je peux dire:

  • Regardez-les si vous avez besoin d'évolutivité.
  • Pour une logique métier complexe, il vaut mieux ne pas les utiliser car
    • injection de dépendance étrange. Pour initialiser un acteur avec les dépendances nécessaires, vous devez d'abord créer un objet Props, puis le donner à ActorSystem pour créer un acteur du type souhaité. Pour créer des accessoires à l'aide de conteneurs IoC (par exemple Castle Windsor ou Autofac), il existe des wrappers prêts à l'emploi - DependencyResolvers. Mais j'étais confronté au fait que le conteneur IoC essayait de contrôler la durée de vie de la dépendance, et après un certain temps, le système est tombé en panne.

      * Peut-être, au lieu d'injecter une dépendance dans un objet, vous devriez placer cette dépendance en tant qu'acteur enfant.
    • problèmes de frappe. ActorRef ne sait rien du type d'acteur auquel il se réfère. Autrement dit, au moment de la compilation, on ne sait pas si un acteur peut traiter un message de ce type ou non.

Partie 2: Jet streams


Passons maintenant à un sujet plus populaire et utile - les flux de jets. Si vous ne pouvez jamais rencontrer des acteurs en cours de travail, les flux Rx seront certainement utiles à la fois dans le frontend et dans le backend. Leur implémentation est dans presque tous les langages de programmation modernes. Je vais donner des exemples sur RxJs, car de nos jours même les programmeurs back-end doivent parfois faire quelque chose en JavaScript.


Les flux Rx sont disponibles pour tous les langages de programmation populaires.

« Introduction à la programmation réactive que vous avez manquée » par Andre Staltz , sous licence CC BY-NC 4.0

Pour expliquer ce qu'est le jet stream, je vais commencer par les collections pull and push.
Valeur de retour uniqueValeurs de retour multiples
Tirer
Synchrone
Interactif
TIEnumerable <T>
Poussez
Asynchrone
Réactif
Tâche <T>IObservable <T>

Les collections de tirages sont ce à quoi nous sommes tous habitués en programmation. L'exemple le plus frappant est un tableau.

 const arr = [1,2,3,4,5]; 

Il dispose déjà de données, il ne modifiera pas lui-même ces données, mais il peut les fournir sur demande.

 arr.forEach(console.log); 

De plus, avant de faire quelque chose avec les données, vous pouvez en quelque sorte les traiter.

 arr.map(i => i+1).map(I => “my number is ”+i).forEach(console.log); 

Imaginons maintenant qu'au départ, il n'y ait pas de données dans la collection, mais cela vous informera certainement qu'elles sont apparues (Push). Et en même temps, nous pouvons toujours appliquer les transformations nécessaires à cette collection.

Par exemple:

 source.map(i => i+1).map(I => “my number is ”+i).forEach(console.log); 

Lorsqu'une valeur telle que 1 apparaît dans la source, console.log affichera «mon numéro est 1».

Comment ça marche:

Une nouvelle entité apparaît - Sujet (ou observable):

 const observable = Rx.Observable.create(function (observer) { observer.next(1); observer.next(2); observer.next(3); setTimeout(() => { observer.next(4); observer.complete(); }, 1000); }); 

Il s'agit d'une collection push qui enverra des notifications sur les modifications de son état.

Dans ce cas, les numéros 1, 2 et 3 y apparaîtront immédiatement, dans un second 4, puis la collection «se terminera». Il s'agit d'un type d'événement si particulier.

La deuxième entité est Observer. Il peut s'abonner aux événements Sujet et faire quelque chose avec les données reçues. Par exemple:

 observable.subscribe(x => console.log(x)); observable.subscribe({ next: x => console.log('got value ' + x), error: err => console.error('something wrong occurred: ' + err), complete: () => console.log('done'), }); observable .map(x => 'This is ' + x) .subscribe(x => console.log(x)); 

On peut voir qu'un sujet peut avoir plusieurs abonnés.

Cela semble facile, mais on ne sait pas encore pourquoi cela est nécessaire. Je donnerai 2 définitions supplémentaires que vous devez connaître lorsque vous travaillez avec des flux réactifs, puis je montrerai en pratique comment ils fonctionnent et dans quelles situations leur plein potentiel est révélé.

Observables à froid


  • Avertissez des événements lorsque quelqu'un s'y abonne.
  • L'ensemble du flux de données est envoyé à nouveau à chaque abonné, quel que soit le moment de l'abonnement.
  • Les données sont copiées pour chaque abonné.

Qu'est-ce que cela signifie: disons que l'entreprise (Sujet) a décidé d'organiser la distribution de cadeaux. Chaque employé (Observateur) vient travailler et reçoit sa copie du cadeau. Personne ne reste privé.

Observables chauds


  • Ils essaient de notifier l'événement indépendamment de la présence d'abonnés. Si au moment de l'événement il n'y avait pas d'abonnés, les données sont perdues.

Exemple: le matin, des petits pains chauds pour les employés sont apportés à l'entreprise. Lorsqu'elles sont introduites, toutes les alouettes volent à l'odeur et distinguent les tartes pour le petit déjeuner. Mais les hiboux qui sont venus plus tard n’ont plus de tartes.

Dans quelles situations les jet streams sont-ils utilisés?


Lorsqu'il existe un flux de données réparti dans le temps. Par exemple, entrée utilisateur. Ou les journaux de n'importe quel service. Dans l'un des projets, j'ai vu un enregistreur self-made qui collectait les événements en N secondes, puis enregistrait simultanément l'ensemble du pack. Le code de la batterie occupait la page. Si des flux Rx étaient utilisés, ce serait beaucoup plus simple:

image
« RxJs Reference / Observable , documentation sous licence CC BY 4.0 .
(il existe de nombreux exemples et images expliquant ce que font différentes opérations avec des flux réactifs)

 source.bufferTime(2000).subsribe(doThings); 

Et enfin, un exemple d'utilisation.

Reconnaître les gestes de la souris avec les flux Rx


Dans l'ancien Opéra ou son successeur spirituel - Vivaldi - il y avait un contrôle de navigateur utilisant des gestes de souris.

Gif - gestes de la souris dans Vivaldi


Autrement dit, vous devez reconnaître les mouvements de la souris vers le haut / le bas, la droite / la gauche et leurs combinaisons. Il peut être écrit sans flux Rx, mais le code sera complexe et difficile à maintenir.

Et voici à quoi cela ressemble avec les flux Rx:


Je vais commencer par la fin - je vais définir quelles données et dans quel format je vais rechercher dans la séquence d'origine:

 //gestures to look for const gestures = Rx.Observable.from([ { name: "Left", sequence: Rx.Observable.from([{ x: -1, y: 0 }]) }, { name: "Right", sequence: Rx.Observable.from([{ x: 1, y: 0 }]) }, { name: "Up", sequence: Rx.Observable.from([{ x: 0, y: -1 }]) }, { name: "Down", sequence: Rx.Observable.from([{ x: 0, y: 1 }]) }, { name: "Down+Up", sequence: Rx.Observable.from([{ x: 0, y: 1 }, { x: 0, y: -1 }]) }, { name: "Up+Right", sequence: Rx.Observable.from([{ x: 0, y: -1 }, { x: 1, y: 0 }]) } ]); 

Ce sont des vecteurs unitaires et leurs combinaisons.

Ensuite, vous devez convertir les événements de souris en flux Rx. Toutes les bibliothèques Rx ont des outils intégrés pour transformer des événements standard en observables.

 const mouseMoves = Rx.Observable.fromEvent(canvas, 'mousemove'), mouseDowns = Rx.Observable.fromEvent(canvas, 'mousedown'), mouseUps = Rx.Observable.fromEvent(canvas, 'mouseup'); 

Ensuite, je regroupe les coordonnées de la souris par 2 et trouve leur différence, obtenant le décalage de la souris.

 const mouseDiffs = mouseMoves .map(getOffset) .pairwise() .map(pair => { return { x: pair[1].x-pair[0].x, y: pair[1].y-pair[0].y } }); 

Et regroupez ces mouvements en utilisant les événements «mousedown» et «mouseup».

 const mouseGestures = mouseDiffs .bufferToggle(mouseDowns, x => mouseUps) .map(concat); 

La fonction concat supprime les mouvements trop courts et regroupe les mouvements à peu près alignés en direction.

 function concat(values) {//summarize move in same direction return values.reduce((a, v) => { if (!a.length) { a.push(v); } else { const last = a[a.length - 1]; const lastAngle = Math.atan2(last.x, last.y); const angle = Math.atan2(vx, vy); const angleDiff = normalizeAngle(angle - lastAngle); const dist = Math.hypot(vx, vy); if (dist < 1) return a;//move is too short – ignore //moving in same direction => adding vectors if (Math.abs(angleDiff) <= maxAngleDiff) { last.x += vx; last.y += vy; } else { a.push(v); } } return a; }, []); } 

Si le mouvement sur l'axe X ou Y est trop court, il est remis à zéro. Et puis seul le signe reste des coordonnées de déplacement obtenues. Ainsi, les vecteurs unitaires que nous recherchions sont obtenus.

 const normalizedMouseGestures = mouseGestures.map(arr => arr.map(v => { const dist = Math.hypot(vx, vy);//length of vector vx = Math.abs(vx) > minMove && Math.abs(vx) * treshold > dist ? vx : 0; vy = Math.abs(vy) > minMove && Math.abs(vy) * treshold > dist ? vy : 0; return v; }) ).map(arr => arr .map(v => { return { x: Math.sign(vx), y: Math.sign(vy) }; }) .filter(v => Math.hypot(vx, vy) > 0) ); 

Résultat:

 gestures.map(gesture => normalizedMouseGestures.mergeMap( moves => Rx.Observable.from(moves) .sequenceEqual(gesture.sequence, comparer) ).filter(x => x).mapTo(gesture.name) ).mergeAll().subscribe(gestureName => actions[gestureName]()); 

En utilisant sequenceEqual, vous pouvez comparer les mouvements reçus avec ceux d'origine et, s'il y a une correspondance, effectuer une certaine action.

Gif


Vous pouvez jouer avec des gestes ici

Veuillez noter qu'en plus de la reconnaissance des gestes, il existe également un dessin des mouvements initiaux et normalisés de la souris sur le canevas HTML. La lisibilité du code n'en souffre pas.

D'où un avantage supplémentaire: les fonctionnalités écrites à l'aide de flux Rx peuvent être facilement complétées et étendues.

Résumé


  • Les bibliothèques avec des flux Rx sont disponibles pour presque tous les langages de programmation.
  • Les flux Rx doivent être utilisés lorsqu'il existe un flux d'événements étalés dans le temps (par exemple, entrée utilisateur).
  • Les fonctionnalités écrites à l'aide de flux Rx peuvent être facilement complétées et étendues.
  • Je n'ai trouvé aucun défaut significatif.

Source: https://habr.com/ru/post/fr432004/


All Articles