WebSockets en angulaire. Partie 2. Solutions de produits

image

Dans un article précédent, nous avons parlé d'une solution générale pour les sockets Web dans Angular, où nous avons construit un bus avec reconnexion et service à utiliser dans les composants basés sur WebSocketSubject. Une telle implémentation convient à la plupart des cas simples, par exemple, la réception et l'envoi de messages de discussion, etc., mais ses capacités peuvent ne pas être suffisantes lorsque vous devez créer quelque chose de plus flexible et contrôlé. Dans cet article, je vais révéler certaines fonctionnalités lorsque vous travaillez avec des sockets Web et parler des exigences que vous avez vous-même rencontrées et, éventuellement, que vous rencontrerez.

Souvent, dans les grands projets à forte fréquentation, le front-end fait face à des tâches qui, dans d'autres circonstances, sont plus courantes à voir sur le back-end. Dans des conditions d'austérité des ressources du serveur, une partie des problèmes migre vers le territoire frontal, pour cette raison un maximum d'extensibilité et de contrôle est prévu dans le projet.

Voici une liste des exigences de base pour un client de socket Web qui seront traitées dans cet article:

  • Reconnexion «intelligente» automatique;
  • Mode débogage;
  • Système d'abonnement aux événements basé sur RxJs;
  • Réception et analyse des données binaires;
  • Projection (cartographie) des informations reçues sur le modèle;
  • Contrôle des changements de modèle à mesure que de nouveaux événements arrivent;
  • Ignorez les événements arbitraires et annulez ignorer.

Examinez chaque élément plus en détail.

Reconnecter / déboguer


J'ai écrit sur la reconnexion dans un article précédent, donc je vais juste citer une partie du texte:

La reconnexion, ou l'organisation de la reconnexion au serveur, est un facteur primordial lorsque vous travaillez avec des sockets Web, comme les ruptures de réseau, les pannes de serveur ou d'autres erreurs provoquant une rupture de connexion peuvent entraîner le blocage de l'application.
Il est important de noter que les tentatives de reconnexion ne devraient pas être trop fréquentes et ne devraient pas se poursuivre indéfiniment, car ce comportement peut suspendre le client.

La socket Web elle-même ne sait pas comment se reconnecter lorsqu'elle est déconnectée. Par conséquent, si le serveur a redémarré ou si le serveur est tombé en panne, ou si l'utilisateur s'est reconnecté à Internet, pour continuer à travailler, vous devez également reconnecter la prise Web.

Dans cet article, pour la reconnexion et le débogage, nous utiliserons Reconnecting WebSocket , qui contient les fonctionnalités nécessaires et d'autres options, telles que la modification de l'URL du socket Web entre les reconnexions, le choix d'un constructeur arbitraire WebSocket, etc. D'autres alternatives conviennent également. Se reconnecter de l'article précédent ne convient pas, car Il est écrit sous WebSocketSubject, qui cette fois ne s'applique pas.

Système d'abonnement aux événements RxJs


Pour utiliser les sockets Web dans les composants, vous devez vous abonner aux événements et vous désabonner d'eux si nécessaire. Pour ce faire, utilisez le modèle de conception populaire Pub / Sub .
"Publisher-subscriber (eng. Publisher-subscriber or eng. Pub / sub) - un modèle de conception comportementale pour la transmission de messages dans lequel les expéditeurs de messages, appelés éditeurs (eng. Publishers), ne sont pas directement liés au code de programme pour envoyer des messages aux abonnés (eng. Abonnés ) Au lieu de cela, les messages sont divisés en classes et ne contiennent pas d'informations sur leurs abonnés, le cas échéant. De même, les abonnés traitent une ou plusieurs classes de messages, en faisant abstraction d'éditeurs spécifiques. »

L'abonné ne contacte pas directement l'éditeur, mais via le bus intermédiaire - le service de site Web. Il devrait également être possible de s'abonner à plusieurs événements avec le même type de retour. Chaque abonnement crée son propre objet, qui est ajouté à l'objet écouteurs, ce qui vous permet d'adresser des événements de socket Web aux abonnements nécessaires. Lorsque vous travaillez avec RxJs Subject, il y a quelques difficultés avec la désinscription, par conséquent, nous allons créer un simple garbage collector qui supprimera les sous-projets de l'objet listeners lorsqu'ils n'ont pas d'observateurs.

Réception et analyse des données binaires


WebSocket prend en charge le transfert de données binaires, de fichiers ou de flux, qui est souvent utilisé dans les grands projets. Cela ressemble à ceci:
0x80, <longueur - un ou plusieurs octets>, <corps du message>

Afin de ne pas créer de restrictions sur la longueur du message transmis et en même temps de ne pas dépenser les octets de manière irrationnelle, les développeurs du protocole ont utilisé l'algorithme suivant. Chaque octet dans l'indication de longueur est considéré séparément: le plus élevé indique s'il s'agit du dernier octet (0) ou des autres (1) qui le suivent, et les 7 bits inférieurs contiennent les données transmises. Par conséquent, lorsque le signe de la trame de données binaires 0x80 apparaît, alors l'octet suivant est pris et stocké dans une "tirelire" distincte. Ensuite, l'octet suivant, s'il a le bit le plus significatif, est également transféré vers la «tirelire» et ainsi de suite jusqu'à ce que l'octet avec le bit le plus significatif zéro soit rencontré. Cet octet est le dernier de l'indicateur de longueur et est également ajouté à la «tirelire». Maintenant, les bits hauts sont supprimés des octets de la tirelire et le reste est combiné. Ce sera la longueur du corps du message - des nombres de 7 bits sans le bit le plus significatif.

Le mécanisme d'analyse et de flux binaire frontal est complexe et est associé au mappage de données sur le modèle. Un article séparé peut y être consacré. Cette fois, nous analyserons une option simple et laisserons des cas difficiles pour les publications suivantes, s'il y a un intérêt pour le sujet.

Projection (cartographie) des informations reçues sur le modèle


Quel que soit le type de transmission reçue, il est nécessaire de lire et de modifier en toute sécurité. Il n'y a pas de consensus sur la façon de mieux faire cela, j'adhère à la théorie d' un modèle de données , car je le considère logique et fiable pour la programmation en style POO.
«Un modèle de données est une définition logique abstraite, autosuffisante d'objets, d'opérateurs et d'autres éléments qui constituent ensemble une machine d'accès aux données abstraite avec laquelle l'utilisateur interagit. "Ces objets vous permettent de modéliser la structure des données et les opérateurs - le comportement des données."

Toutes sortes de pneus populaires qui ne donnent pas une idée d'un objet en tant que classe dans laquelle le comportement, la structure, etc. sont définis créent de la confusion, sont moins bien contrôlés et parfois envahis par quelque chose qui ne leur est pas typique. Par exemple, une classe de chiens devrait décrire un chien dans toutes les conditions. Si le chien est perçu comme un ensemble de champs: queue, couleur, visage, etc., alors le chien peut développer une patte supplémentaire et un autre chien apparaîtra à la place de la tête.

image

Contrôle des changements de modèle à mesure que de nouveaux événements arrivent


Dans ce paragraphe, je décrirai le problème que j'ai rencontré en travaillant sur l'interface web de l'application mobile de paris sportifs. L'API d'application a fonctionné via les sockets Web via lesquelles ils ont reçu: mise à jour des cotes, ajout et suppression de nouveaux types de paris, notifications sur le début ou la fin d'un match, etc. - total environ trois cents événements de la prise Web. Pendant le match, les paris et les informations sont constamment mis à jour, parfois 2 à 3 fois par seconde, le problème était qu'après eux, l'interface était mise à jour sans contrôle intermédiaire.

Lorsque l'utilisateur surveillait l'enchère à partir d'un appareil mobile, et en même temps que les listes étaient mises à jour sur son écran, l'enchère disparaissait du champ de vision, de sorte que l'utilisateur devait à nouveau rechercher l'enchère suivie. Ce comportement a été répété pour chaque mise à jour.

image

La solution nécessitait l'immuabilité pour les objets qui étaient affichés à l'écran, mais en même temps les coefficients de pari devaient changer, les offres non pertinentes devenaient inactives et de nouvelles n'étaient ajoutées que lorsque l'utilisateur faisait défiler l'écran. Les options obsolètes n'étaient pas stockées sur le backend, donc ces lignes devaient être mémorisées et marquées avec l'indicateur «supprimé», pour lequel une banque de données intermédiaire a été créée entre le site Web et l'abonnement, ce qui a assuré le contrôle des modifications.

Dans le nouveau service, nous allons également créer une couche de substitution et cette fois-ci, nous utiliserons Dexie.js - un wrapper sur l'API IndexedDB, mais toute autre base de données virtuelle ou de navigateur fera l'affaire. Redux est acceptable.

Ignorer les événements arbitraires et annuler ignorer


Dans la même entreprise, il existe souvent plusieurs projets du même type à la fois: versions mobile et Web, versions avec des paramètres différents pour différents groupes d'utilisateurs, versions étendues et tronquées de la même application.

Souvent, ils utilisent tous une seule base de code, vous devez donc parfois désactiver les événements inutiles lors de l'exécution ou pendant DI sans supprimer l'abonnement et le réactiver, c'est-à-dire ignorer certains d'entre eux, afin de ne pas traiter les événements inutiles. Il s'agit d'une fonctionnalité simple mais utile qui ajoute de la flexibilité au bus Pub / Sub.

Commençons par une description des interfaces:

export interface IWebsocketService { //    addEventListener<T>(topics: string[], id?: number): Observable<T>; runtimeIgnore(topics: string[]): void; runtimeRemoveIgnore(topics: string[]): void; sendMessage(event: string, data: any): void; } export interface WebSocketConfig { //   DI url: string; ignore?: string[]; garbageCollectInterval?: number; options?: Options; } export interface ITopic<T> { //   Pub/Sub [hash: string]: MessageSubject<T>; } export interface IListeners { //    [topic: string]: ITopic<any>; } export interface IBuffer { //    ws.message type: string; data: number[]; } export interface IWsMessage { // ws.message event: string; buffer: IBuffer; } export interface IMessage { //   id: number; text: string; } export type ITopicDataType = IMessage[] | number | string[]; //  callMessage   

Nous hériterons de Subject pour créer un garbage collector:

 export class MessageSubject<T> extends Subject<T> { constructor( private listeners: IListeners, //    private topic: string, //   private id: string // id  ) { super(); } /* *   next, *       , *   garbageCollect */ public next(value?: T): void { if (this.closed) { throw new ObjectUnsubscribedError(); } if (!this.isStopped) { const {observers} = this; const len = observers.length; const copy = observers.slice(); for (let i = 0; i < len; i++) { copy[i].next(value); } if (!len) { this.garbageCollect(); //   } } } /* * garbage collector * */ private garbageCollect(): void { delete this.listeners[this.topic][this.id]; //  Subject if (!Object.keys(this.listeners[this.topic]).length) { //    delete this.listeners[this.topic]; } } } 

Contrairement à l'implémentation précédente, websocket.events.ts fera partie du module de socket Web

 export const WS_API = { EVENTS: { MESSAGES: 'messages', COUNTER: 'counter', UPDATE_TEXTS: 'update-texts' }, COMMANDS: { SEND_TEXT: 'set-text', REMOVE_TEXT: 'remove-text' } }; 

Pour configurer lors de la connexion du module, créez websocket.config:

 import { InjectionToken } from '@angular/core'; export const config: InjectionToken<string> = new InjectionToken('websocket'); 

Créez un modèle pour le proxy:

 import Dexie from 'dexie'; import { IMessage, IWsMessage } from './websocket.interfaces'; import { WS_API } from './websocket.events'; class MessagesDatabase extends Dexie { //    Dexie  typescript public messages!: Dexie.Table<IMessage, number>; // id is number in this case constructor() { super('MessagesDatabase'); //   this.version(1).stores({ //   messages: '++id,text' }); } } 

Un analyseur de modèle simple, en conditions réelles, il est préférable de le diviser en plusieurs fichiers:

 export const modelParser = (message: IWsMessage) => { if (message && message.buffer) { /*  */ const encodeUint8Array = String.fromCharCode .apply(String, new Uint8Array(message.buffer.data)); const parseData = JSON.parse(encodeUint8Array); let MessagesDB: MessagesDatabase; // IndexedDB if (message.event === WS_API.EVENTS.MESSAGES) { // IMessage[] if (!MessagesDB) { MessagesDB = new MessagesDatabase(); } parseData.forEach((messageData: IMessage) => { /*   */ MessagesDB.transaction('rw', MessagesDB.messages, async () => { /* ,    */ if ((await MessagesDB.messages .where({id: messageData.id}).count()) === 0) { const id = await MessagesDB.messages .add({id: messageData.id, text: messageData.text}); console.log(`Addded message with id ${id}`); } }).catch(e => { console.error(e.stack || e); }); }); return MessagesDB.messages.toArray(); //   IMessage[] } if (message.event === WS_API.EVENTS.COUNTER) { // counter return new Promise(r => r(parseData)); //    } if (message.event === WS_API.EVENTS.UPDATE_TEXTS) { // text const texts = []; parseData.forEach((textData: string) => { texts.push(textData); }); return new Promise(r => r(texts)); //     } } else { console.log(`[${Date()}] Buffer is "undefined"`); } }; 

WebsocketModule:

 @NgModule({ imports: [ CommonModule ] }) export class WebsocketModule { public static config(wsConfig: WebSocketConfig): ModuleWithProviders { return { ngModule: WebsocketModule, providers: [{provide: config, useValue: wsConfig}] }; } } 

Commençons par créer un service:

 private listeners: IListeners; //   private uniqueId: number; //   id  private websocket: ReconnectingWebSocket; //   constructor(@Inject(config) private wsConfig: WebSocketConfig) { this.uniqueId = -1; this.listeners = {}; this.wsConfig.ignore = wsConfig.ignore ? wsConfig.ignore : []; //  this.connect(); } ngOnDestroy() { this.websocket.close(); //     } 

Méthode de connexion:

 private connect(): void { // ReconnectingWebSocket config const options = { connectionTimeout: 1000, //  ,    maxRetries: 10, //  ,    ...this.wsConfig.options }; //  this.websocket = new ReconnectingWebSocket(this.wsConfig.url, [], options); this.websocket.addEventListener('open', (event: Event) => { //   console.log(`[${Date()}] WebSocket connected!`); }); this.websocket.addEventListener('close', (event: CloseEvent) => { //   console.log(`[${Date()}] WebSocket close!`); }); this.websocket.addEventListener('error', (event: ErrorEvent) => { //   console.error(`[${Date()}] WebSocket error!`); }); this.websocket.addEventListener('message', (event: MessageEvent) => { //     this.onMessage(event); }); setInterval(() => { //    this.garbageCollect(); }, (this.wsConfig.garbageCollectInterval || 10000)); } 

Dupliquez le garbage collector, vérifiera les abonnements par timeout:

 private garbageCollect(): void { for (const event in this.listeners) { if (this.listeners.hasOwnProperty(event)) { const topic = this.listeners[event]; for (const key in topic) { if (topic.hasOwnProperty(key)) { const subject = topic[key]; //  Subject    if (!subject.observers.length) { delete topic[key]; } } }  ,   if (!Object.keys(topic).length) { delete this.listeners[event]; } } } } 

Nous regardons dans quel abonnement envoyer l'événement:

 private onMessage(event: MessageEvent): void { const message = JSON.parse(event.data); for (const name in this.listeners) { if (this.listeners.hasOwnProperty(name) && !this.wsConfig.ignore.includes(name)) { const topic = this.listeners[name]; const keys = name.split('/'); //      const isMessage = keys.includes(message.event); const model = modelParser(message); //     if (isMessage && typeof model !== 'undefined') { model.then((data: ITopicDataType) => { //   Subject this.callMessage<ITopicDataType>(topic, data); }); } } } } 

Événement casque dans Sujet:

 private callMessage<T>(topic: ITopic<T>, data: T): void { for (const key in topic) { if (topic.hasOwnProperty(key)) { const subject = topic[key]; if (subject) { //   subject.next(data); } else { console.log(`[${Date()}] Topic Subject is "undefined"`); } } } } 

Créer un sujet Pub / Sub:

 private addTopic<T>(topic: string, id?: number): MessageSubject<T> { const token = (++this.uniqueId).toString(); const key = id ? token + id : token; //  id   const hash = sha256.hex(key); // SHA256-   id  if (!this.listeners[topic]) { this.listeners[topic] = <any>{}; } return this.listeners[topic][hash] = new MessageSubject<T>(this.listeners, topic, hash); } 

Abonnement à un ou plusieurs événements:

 public addEventListener<T>(topics: string | string[], id?: number): Observable<T> { if (topics) { //       const topicsKey = typeof topics === 'string' ? topics : topics.join('/'); return this.addTopic<T>(topicsKey, id).asObservable(); } else { console.log(`[${Date()}] Can't add EventListener. Type of event is "undefined".`); } } 

Tout ici est intentionnellement simplifié, mais peut être converti en entités binaires, comme c'est le cas avec le serveur. Envoi de commandes au serveur:

 public sendMessage(event: string, data: any = {}): void { //   ,      if (event && this.websocket.readyState === 1) { this.websocket.send(JSON.stringify({event, data})); } else { console.log('Send error!'); } } 

Ajouter des événements à ignorlist lors de l'exécution:

 public runtimeIgnore(topics: string[]): void { if (topics && topics.length) { //    this.wsConfig.ignore.push(...topics); } } 

Supprimer les événements de l'ignorelist:

 public runtimeRemoveIgnore(topics: string[]): void { if (topics && topics.length) { topics.forEach((topic: string) => { //      const topicIndex = this.wsConfig.ignore.findIndex(t => t === topic); if (topicIndex > -1) { //    this.wsConfig.ignore.splice(topicIndex, 1); } }); } } 

Nous connectons le module de sockets web:

 @NgModule({ declarations: [ AppComponent ], imports: [ BrowserModule, ReactiveFormsModule, WebsocketModule.config({ url: environment.ws, //  "ws://mywebsocketurl" //    ignore: [WS_API.EVENTS.ANY_1, WS_API.EVENTS.ANY_2], garbageCollectInterval: 60 * 1000, //    options: { connectionTimeout: 1000, //   maxRetries: 10 //   } }) ], providers: [], bootstrap: [AppComponent] }) export class AppModule { } 

Nous utilisons dans les composants:

 @Component({ selector: 'app-root', templateUrl: './app.component.html', styleUrls: ['./app.component.css'] }) export class AppComponent implements OnInit, OnDestroy { private messages$: Observable<IMessage[]>; private messagesMulti$: Observable<IMessage[]>; private counter$: Observable<number>; private texts$: Observable<string[]>; public form: FormGroup; constructor( private fb: FormBuilder, private wsService: WebsocketService) { } ngOnInit() { this.form = this.fb.group({ text: [null, [ Validators.required ]] }); // get messages this.messages$ = this.wsService .addEventListener<IMessage[]>(WS_API.EVENTS.MESSAGES); // get messages multi this.messagesMulti$ = this.wsService .addEventListener<IMessage[]>([ WS_API.EVENTS.MESSAGES, WS_API.EVENTS.MESSAGES_1 ]); // get counter this.counter$ = this.wsService .addEventListener<number>(WS_API.EVENTS.COUNTER); // get texts this.texts$ = this.wsService .addEventListener<string[]>(WS_API.EVENTS.UPDATE_TEXTS); } ngOnDestroy() { } public sendText(): void { if (this.form.valid) { this.wsService .sendMessage(WS_API.COMMANDS.SEND_TEXT, this.form.value.text); this.form.reset(); } } public removeText(index: number): void { this.wsService.sendMessage(WS_API.COMMANDS.REMOVE_TEXT, index); } } 

Le service est prêt à l'emploi.



Bien que l'exemple de l'article ne soit pas une solution universelle pour chaque projet, il illustre l'une des approches pour travailler avec des sockets Web dans des applications grandes et complexes. Vous pouvez le mettre en service et le modifier en fonction des tâches en cours.

La version complète du service est disponible sur GitHub .

Pour toutes questions que vous pouvez contacter dans les commentaires, à moi sur Telegram ou sur le canal angulaire au même endroit.

Source: https://habr.com/ru/post/fr419099/


All Articles