WebSockets en Angular. Parte 2. Soluciones de productos

imagen

En un art铆culo anterior, hablamos sobre una soluci贸n general para sockets web en Angular, donde creamos un bus basado en WebSocketSubject con reconexi贸n y servicio para usar en componentes. Dicha implementaci贸n es adecuada para la mayor铆a de los casos simples, por ejemplo, recibir y enviar mensajes de chat, etc., pero sus capacidades pueden no ser suficientes cuando necesita construir algo m谩s flexible y controlado. En este art铆culo, revelar茅 algunas caracter铆sticas al trabajar con sockets web y hablar茅 sobre los requisitos que usted mismo ha encontrado y, posiblemente, encontrar谩.

A menudo, en proyectos grandes con alta asistencia, el front-end enfrenta tareas que, en otras circunstancias, son m谩s comunes en el back-end. En condiciones de austeridad de los recursos del servidor, parte de los problemas migran al territorio front-end, por esta raz贸n se establece un m谩ximo de extensibilidad y control en el proyecto.

Aqu铆 hay una lista de los requisitos b谩sicos para un cliente de socket web que se tratar谩 en este art铆culo:

  • Reconexi贸n autom谩tica "inteligente";
  • Modo de depuraci贸n;
  • Sistema de suscripci贸n de eventos basado en RxJs;
  • Recepci贸n y an谩lisis de datos binarios;
  • Proyecci贸n (mapeo) de la informaci贸n recibida sobre el modelo;
  • Control sobre los cambios del modelo a medida que llegan nuevos eventos;
  • Ignorar eventos arbitrarios y cancelar ignorar.

Considere cada art铆culo con m谩s detalle.

Reconectar / Debag


Escrib铆 sobre reconectar en un art铆culo anterior, as铆 que solo citar茅 parte del texto:

La reconexi贸n, o la organizaci贸n de reconectarse al servidor, es un factor primordial cuando se trabaja con sockets web, como interrupciones de la red, fallas del servidor u otros errores que causan una interrupci贸n de la conexi贸n pueden hacer que la aplicaci贸n se bloquee.
Es importante tener en cuenta que los intentos de reconexi贸n no deber铆an ser demasiado frecuentes y no deber铆an continuar indefinidamente, ya que Este comportamiento puede suspender al cliente.

El socket web en s铆 no sabe c贸mo volver a conectarse cuando est谩 desconectado. Por lo tanto, si el servidor se reinici贸 o el servidor se bloque贸, o el usuario volvi贸 a conectarse a Internet, para continuar trabajando, tambi茅n debe volver a conectar el socket web.

En este art铆culo, para reconectar y depurar, utilizaremos Reconnecting WebSocket , que contiene la funcionalidad necesaria y otras opciones, como cambiar la url del socket web entre reconexiones, elegir un constructor de WebSocket arbitrario, etc. Otras alternativas tambi茅n son adecuadas. Volver a conectar desde el art铆culo anterior no es adecuado, porque Est谩 escrito bajo WebSocketSubject, que esta vez no se aplica.

Sistema de suscripci贸n a eventos RxJs


Para usar sockets web en los componentes, debe suscribirse a los eventos y cancelar la suscripci贸n cuando sea necesario. Para hacer esto, use el popular patr贸n de dise帽o Pub / Sub .
"Editor-suscriptor (ing. Editor-suscriptor o ing. Pub / sub): una plantilla de dise帽o de comportamiento para la transmisi贸n de mensajes en la que los remitentes de mensajes, llamados editores (ing. Editores), no est谩n directamente vinculados al c贸digo del programa para enviar mensajes a los suscriptores (ing. Suscriptores ) En cambio, los mensajes se dividen en clases y no contienen informaci贸n sobre sus suscriptores, si corresponde. Del mismo modo, los suscriptores manejan una o m谩s clases de mensajes, abstray茅ndose de editores espec铆ficos ".

El suscriptor no se comunica directamente con el editor, sino a trav茅s del bus intermedio: el servicio del sitio web. Tambi茅n deber铆a ser posible suscribirse a m煤ltiples eventos con el mismo tipo de retorno. Cada suscripci贸n crea su propio Asunto, que se agrega al objeto de escucha, que le permite direccionar eventos de socket web a las suscripciones necesarias. Cuando se trabaja con RxJs Subject, existen algunas dificultades con la cancelaci贸n de la suscripci贸n, por lo tanto, crearemos un recolector de basura simple que eliminar谩 los subjuntos del objeto de los oyentes cuando no tengan observadores.

Recepci贸n y an谩lisis de datos binarios.


WebSocket admite la transferencia de datos binarios, archivos o secuencias, que a menudo se utiliza en grandes proyectos. Se parece a esto:
0x80, <longitud - uno o m谩s bytes>, <cuerpo del mensaje>

Para no crear restricciones en la longitud del mensaje transmitido y al mismo tiempo no gastar bytes irracionalmente, los desarrolladores del protocolo utilizaron el siguiente algoritmo. Cada byte en la indicaci贸n de longitud se considera por separado: el m谩s alto indica si es el 煤ltimo byte (0) o los otros (1) lo siguen, y los 7 bits m谩s bajos contienen los datos transmitidos. Por lo tanto, cuando aparece el signo de la trama de datos binarios 0x80, se toma el siguiente byte y se almacena en una "alcanc铆a" separada. Luego, el siguiente byte, si tiene el conjunto de bits m谩s significativo, tambi茅n se transfiere a la "alcanc铆a" y as铆 sucesivamente hasta que se encuentre el byte con el bit m谩s significativo cero. Este byte es el 煤ltimo en el indicador de longitud y tambi茅n se agrega a la "alcanc铆a". Ahora, los bits altos se eliminan de los bytes en la alcanc铆a, y el resto se combina. Esta ser谩 la longitud del cuerpo del mensaje: n煤meros de 7 bits sin el bit m谩s significativo.

El an谩lisis frontal y el mecanismo de flujo binario es complejo y est谩 asociado con la asignaci贸n de datos en el modelo. Se puede dedicar un art铆culo separado a esto. Esta vez analizaremos una opci贸n simple y dejaremos casos dif铆ciles para las siguientes publicaciones, si hay inter茅s en el tema.

Proyecci贸n (mapeo) de la informaci贸n recibida sobre el modelo


Independientemente del tipo de transmisi贸n recibida, se requiere leer y modificar de forma segura. No hay consenso sobre c贸mo hacer esto mejor, me adhiero a la teor铆a de un modelo de datos , ya que lo considero l贸gico y confiable para programar en el estilo OOP.
鈥淯n modelo de datos es una definici贸n l贸gica, abstracta y autosuficiente de objetos, operadores y otros elementos que juntos constituyen una m谩quina de acceso a datos abstractos con la que el usuario interact煤a. "Estos objetos le permiten modelar la estructura de datos y los operadores: el comportamiento de los datos".

Todos los tipos de neum谩ticos populares que no dan una idea de un objeto como una clase en la que se definen el comportamiento, la estructura, etc. crean confusi贸n, est谩n menos controlados y a veces est谩n cubiertos de algo que no es t铆pico para ellos. Por ejemplo, una clase de perros debe describir a un perro bajo cualquier condici贸n. Si el perro se percibe como un conjunto de campos: cola, color, cara, etc., entonces el perro puede crecer una pata adicional, y aparecer谩 otro perro en lugar de la cabeza.

imagen

Control sobre los cambios del modelo a medida que llegan nuevos eventos


En este p谩rrafo describir茅 el problema que encontr茅 al trabajar en la interfaz web de la aplicaci贸n m贸vil de apuestas deportivas. La API de la aplicaci贸n funcion贸 a trav茅s de sockets web a trav茅s de los cuales recibieron: actualizaci贸n de cuotas, adici贸n y eliminaci贸n de nuevos tipos de apuestas, notificaciones sobre el inicio o el final de un partido, etc. - total de unos trescientos eventos del socket web. Durante el partido, las apuestas y la informaci贸n se actualizan constantemente, a veces 2-3 veces por segundo, por lo que el problema fue que despu茅s de ellas la interfaz se actualiz贸 sin control intermedio.

Cuando el usuario supervis贸 la oferta desde un dispositivo m贸vil y, al mismo tiempo, las listas se actualizaron en su pantalla, la oferta desapareci贸 del campo de visi贸n, por lo que el usuario tuvo que buscar la oferta rastreada nuevamente. Este comportamiento se repiti贸 para cada actualizaci贸n.

imagen

La soluci贸n requer铆a la inmutabilidad de los objetos que se mostraban en la pantalla, pero al mismo tiempo los coeficientes de apuesta ten铆an que cambiar, las ofertas irrelevantes para volverse inactivas, y no se agregaban nuevas hasta que el usuario desplazaba la pantalla. Las opciones desactualizadas no se almacenaban en el back-end, por lo tanto, dichas l铆neas ten铆an que recordarse y marcarse con el indicador "eliminado", para lo cual se cre贸 un almac茅n de datos intermedio entre el sitio web y la suscripci贸n, lo que garantizaba el control sobre los cambios.

En el nuevo servicio, tambi茅n crearemos una capa sustituta y esta vez usaremos Dexie.js , un contenedor sobre la API IndexedDB, pero cualquier otra base de datos virtual o de navegador funcionar谩. Redux es aceptable.

Ignorar eventos arbitrarios y cancelar ignorar


En la misma compa帽铆a, a menudo hay varios proyectos del mismo tipo a la vez: versiones m贸viles y web, versiones con diferentes configuraciones para diferentes grupos de usuarios, versiones avanzadas y truncadas de la misma aplicaci贸n.

A menudo, todos usan una 煤nica base de c贸digo, por lo que a veces es necesario desactivar eventos innecesarios en tiempo de ejecuci贸n o durante la DI sin eliminar la suscripci贸n y volver a encenderla, es decir. ignore algunos de ellos, para no procesar eventos innecesarios. Esta es una caracter铆stica simple pero 煤til que agrega flexibilidad al bus Pub / Sub.

Comencemos con una descripci贸n de las interfaces:

export interface IWebsocketService { //    addEventListener<T>(topics: string[], id?: number): Observable<T>; runtimeIgnore(topics: string[]): void; runtimeRemoveIgnore(topics: string[]): void; sendMessage(event: string, data: any): void; } export interface WebSocketConfig { //   DI url: string; ignore?: string[]; garbageCollectInterval?: number; options?: Options; } export interface ITopic<T> { //   Pub/Sub [hash: string]: MessageSubject<T>; } export interface IListeners { //    [topic: string]: ITopic<any>; } export interface IBuffer { //    ws.message type: string; data: number[]; } export interface IWsMessage { // ws.message event: string; buffer: IBuffer; } export interface IMessage { //   id: number; text: string; } export type ITopicDataType = IMessage[] | number | string[]; //  callMessage   

Heredaremos Asunto para crear un recolector de basura:

 export class MessageSubject<T> extends Subject<T> { constructor( private listeners: IListeners, //    private topic: string, //   private id: string // id  ) { super(); } /* *   next, *       , *   garbageCollect */ public next(value?: T): void { if (this.closed) { throw new ObjectUnsubscribedError(); } if (!this.isStopped) { const {observers} = this; const len = observers.length; const copy = observers.slice(); for (let i = 0; i < len; i++) { copy[i].next(value); } if (!len) { this.garbageCollect(); //   } } } /* * garbage collector * */ private garbageCollect(): void { delete this.listeners[this.topic][this.id]; //  Subject if (!Object.keys(this.listeners[this.topic]).length) { //    delete this.listeners[this.topic]; } } } 

A diferencia de la implementaci贸n anterior, websocket.events.ts formar谩 parte del m贸dulo de socket web

 export const WS_API = { EVENTS: { MESSAGES: 'messages', COUNTER: 'counter', UPDATE_TEXTS: 'update-texts' }, COMMANDS: { SEND_TEXT: 'set-text', REMOVE_TEXT: 'remove-text' } }; 

Para configurar al conectar el m贸dulo, cree websocket.config:

 import { InjectionToken } from '@angular/core'; export const config: InjectionToken<string> = new InjectionToken('websocket'); 

Cree un modelo para Proxy:

 import Dexie from 'dexie'; import { IMessage, IWsMessage } from './websocket.interfaces'; import { WS_API } from './websocket.events'; class MessagesDatabase extends Dexie { //    Dexie  typescript public messages!: Dexie.Table<IMessage, number>; // id is number in this case constructor() { super('MessagesDatabase'); //   this.version(1).stores({ //   messages: '++id,text' }); } } 

Un analizador de modelos simple, en condiciones reales es mejor dividirlo en varios archivos:

 export const modelParser = (message: IWsMessage) => { if (message && message.buffer) { /*  */ const encodeUint8Array = String.fromCharCode .apply(String, new Uint8Array(message.buffer.data)); const parseData = JSON.parse(encodeUint8Array); let MessagesDB: MessagesDatabase; // IndexedDB if (message.event === WS_API.EVENTS.MESSAGES) { // IMessage[] if (!MessagesDB) { MessagesDB = new MessagesDatabase(); } parseData.forEach((messageData: IMessage) => { /*   */ MessagesDB.transaction('rw', MessagesDB.messages, async () => { /* ,    */ if ((await MessagesDB.messages .where({id: messageData.id}).count()) === 0) { const id = await MessagesDB.messages .add({id: messageData.id, text: messageData.text}); console.log(`Addded message with id ${id}`); } }).catch(e => { console.error(e.stack || e); }); }); return MessagesDB.messages.toArray(); //   IMessage[] } if (message.event === WS_API.EVENTS.COUNTER) { // counter return new Promise(r => r(parseData)); //    } if (message.event === WS_API.EVENTS.UPDATE_TEXTS) { // text const texts = []; parseData.forEach((textData: string) => { texts.push(textData); }); return new Promise(r => r(texts)); //     } } else { console.log(`[${Date()}] Buffer is "undefined"`); } }; 

WebsocketModule:

 @NgModule({ imports: [ CommonModule ] }) export class WebsocketModule { public static config(wsConfig: WebSocketConfig): ModuleWithProviders { return { ngModule: WebsocketModule, providers: [{provide: config, useValue: wsConfig}] }; } } 

Comencemos a crear un servicio:

 private listeners: IListeners; //   private uniqueId: number; //   id  private websocket: ReconnectingWebSocket; //   constructor(@Inject(config) private wsConfig: WebSocketConfig) { this.uniqueId = -1; this.listeners = {}; this.wsConfig.ignore = wsConfig.ignore ? wsConfig.ignore : []; //  this.connect(); } ngOnDestroy() { this.websocket.close(); //     } 

M茅todo de conexi贸n:

 private connect(): void { // ReconnectingWebSocket config const options = { connectionTimeout: 1000, //  ,    maxRetries: 10, //  ,    ...this.wsConfig.options }; //  this.websocket = new ReconnectingWebSocket(this.wsConfig.url, [], options); this.websocket.addEventListener('open', (event: Event) => { //   console.log(`[${Date()}] WebSocket connected!`); }); this.websocket.addEventListener('close', (event: CloseEvent) => { //   console.log(`[${Date()}] WebSocket close!`); }); this.websocket.addEventListener('error', (event: ErrorEvent) => { //   console.error(`[${Date()}] WebSocket error!`); }); this.websocket.addEventListener('message', (event: MessageEvent) => { //     this.onMessage(event); }); setInterval(() => { //    this.garbageCollect(); }, (this.wsConfig.garbageCollectInterval || 10000)); } 

Duplicar el recolector de basura, buscar谩 suscripciones por tiempo de espera:

 private garbageCollect(): void { for (const event in this.listeners) { if (this.listeners.hasOwnProperty(event)) { const topic = this.listeners[event]; for (const key in topic) { if (topic.hasOwnProperty(key)) { const subject = topic[key]; //  Subject    if (!subject.observers.length) { delete topic[key]; } } }  ,   if (!Object.keys(topic).length) { delete this.listeners[event]; } } } } 

Analizamos a qu茅 suscripci贸n enviar el evento:

 private onMessage(event: MessageEvent): void { const message = JSON.parse(event.data); for (const name in this.listeners) { if (this.listeners.hasOwnProperty(name) && !this.wsConfig.ignore.includes(name)) { const topic = this.listeners[name]; const keys = name.split('/'); //      const isMessage = keys.includes(message.event); const model = modelParser(message); //     if (isMessage && typeof model !== 'undefined') { model.then((data: ITopicDataType) => { //   Subject this.callMessage<ITopicDataType>(topic, data); }); } } } } 

Evento de casco en Asunto:

 private callMessage<T>(topic: ITopic<T>, data: T): void { for (const key in topic) { if (topic.hasOwnProperty(key)) { const subject = topic[key]; if (subject) { //   subject.next(data); } else { console.log(`[${Date()}] Topic Subject is "undefined"`); } } } } 

Crear pub / subtema:

 private addTopic<T>(topic: string, id?: number): MessageSubject<T> { const token = (++this.uniqueId).toString(); const key = id ? token + id : token; //  id   const hash = sha256.hex(key); // SHA256-   id  if (!this.listeners[topic]) { this.listeners[topic] = <any>{}; } return this.listeners[topic][hash] = new MessageSubject<T>(this.listeners, topic, hash); } 

Suscribirse a uno o m谩s eventos:

 public addEventListener<T>(topics: string | string[], id?: number): Observable<T> { if (topics) { //       const topicsKey = typeof topics === 'string' ? topics : topics.join('/'); return this.addTopic<T>(topicsKey, id).asObservable(); } else { console.log(`[${Date()}] Can't add EventListener. Type of event is "undefined".`); } } 

Todo aqu铆 se simplifica intencionalmente, pero se puede convertir a entidades binarias, como es el caso con el servidor. Env铆o de comandos al servidor:

 public sendMessage(event: string, data: any = {}): void { //   ,      if (event && this.websocket.readyState === 1) { this.websocket.send(JSON.stringify({event, data})); } else { console.log('Send error!'); } } 

Agregue eventos a ignorlist en tiempo de ejecuci贸n:

 public runtimeIgnore(topics: string[]): void { if (topics && topics.length) { //    this.wsConfig.ignore.push(...topics); } } 

Eliminar eventos del ignorelista:

 public runtimeRemoveIgnore(topics: string[]): void { if (topics && topics.length) { topics.forEach((topic: string) => { //      const topicIndex = this.wsConfig.ignore.findIndex(t => t === topic); if (topicIndex > -1) { //    this.wsConfig.ignore.splice(topicIndex, 1); } }); } } 

Conectamos el m贸dulo de sockets web:

 @NgModule({ declarations: [ AppComponent ], imports: [ BrowserModule, ReactiveFormsModule, WebsocketModule.config({ url: environment.ws, //  "ws://mywebsocketurl" //    ignore: [WS_API.EVENTS.ANY_1, WS_API.EVENTS.ANY_2], garbageCollectInterval: 60 * 1000, //    options: { connectionTimeout: 1000, //   maxRetries: 10 //   } }) ], providers: [], bootstrap: [AppComponent] }) export class AppModule { } 

Usamos en los componentes:

 @Component({ selector: 'app-root', templateUrl: './app.component.html', styleUrls: ['./app.component.css'] }) export class AppComponent implements OnInit, OnDestroy { private messages$: Observable<IMessage[]>; private messagesMulti$: Observable<IMessage[]>; private counter$: Observable<number>; private texts$: Observable<string[]>; public form: FormGroup; constructor( private fb: FormBuilder, private wsService: WebsocketService) { } ngOnInit() { this.form = this.fb.group({ text: [null, [ Validators.required ]] }); // get messages this.messages$ = this.wsService .addEventListener<IMessage[]>(WS_API.EVENTS.MESSAGES); // get messages multi this.messagesMulti$ = this.wsService .addEventListener<IMessage[]>([ WS_API.EVENTS.MESSAGES, WS_API.EVENTS.MESSAGES_1 ]); // get counter this.counter$ = this.wsService .addEventListener<number>(WS_API.EVENTS.COUNTER); // get texts this.texts$ = this.wsService .addEventListener<string[]>(WS_API.EVENTS.UPDATE_TEXTS); } ngOnDestroy() { } public sendText(): void { if (this.form.valid) { this.wsService .sendMessage(WS_API.COMMANDS.SEND_TEXT, this.form.value.text); this.form.reset(); } } public removeText(index: number): void { this.wsService.sendMessage(WS_API.COMMANDS.REMOVE_TEXT, index); } } 

El servicio est谩 listo para usar.



Aunque el ejemplo del art铆culo no es una soluci贸n universal para cada proyecto, demuestra uno de los enfoques para trabajar con sockets web en aplicaciones grandes y complejas. Puede ponerlo en servicio y modificarlo seg煤n las tareas actuales.

La versi贸n completa del servicio se puede encontrar en GitHub .

Para todas las preguntas puede contactarme en los comentarios, a m铆 en Telegram o en el canal Angular en el mismo lugar.

Source: https://habr.com/ru/post/es419099/


All Articles