WebSockets em Angular. Parte 2. Soluções de produtos

imagem

Em um artigo anterior, falamos sobre uma solução geral para soquetes da Web no Angular, onde construímos um barramento com reconexão e serviço para uso em componentes baseados no WebSocketSubject. Essa implementação é adequada para os casos mais simples, por exemplo, recebimento e envio de mensagens de bate-papo etc., mas seus recursos podem não ser suficientes quando você precisa criar algo mais flexível e controlado. Neste artigo, revelarei alguns recursos ao trabalhar com soquetes da Web e falarei sobre os requisitos que você mesmo encontrou e, possivelmente, encontrará.

Freqüentemente, em grandes projetos com alta participação, o front-end enfrenta tarefas que, em outras circunstâncias, são mais comuns no back-end. Em condições de austeridade dos recursos do servidor, parte dos problemas migra para o território de front-end; por esse motivo, um máximo de extensibilidade e controle é estabelecido no projeto.

Aqui está uma lista dos requisitos básicos para um cliente de soquete da web que serão abordados neste artigo:

  • Reconexão automática “inteligente”;
  • Modo de depuração;
  • Sistema de assinatura de eventos baseado em RxJs;
  • Recepção e análise de dados binários;
  • Projeção (mapeamento) das informações recebidas no modelo;
  • Controle sobre as alterações do modelo quando novos eventos chegam;
  • Ignore eventos arbitrários e cancele ignorar.

Considere cada item com mais detalhes.

Reconectar / Debag


Eu escrevi sobre reconectar em um artigo anterior, então apenas cito parte do texto:

A reconexão ou a organização da reconexão com o servidor é um fator primordial ao trabalhar com soquetes da Web, pois quebras de rede, falhas no servidor ou outros erros que causem uma falha na conexão podem causar falhas no aplicativo.
É importante observar que as tentativas de reconexão não devem ser muito frequentes e não devem continuar indefinidamente, pois esse comportamento pode suspender o cliente.

O soquete da web em si não sabe reconectar quando desconectado. Portanto, se o servidor reiniciou ou o servidor travou ou o usuário reconectou a Internet, para continuar trabalhando, você também precisa reconectar o soquete da web.

Neste artigo, para reconectar e depurar, usaremos Reconnecting WebSocket , que contém a funcionalidade necessária e outras opções, como alterar o URL do soquete da Web entre reconexões, escolher um construtor arbitrário do WebSocket etc. Outras alternativas também são adequadas. Reconectar do artigo anterior não é adequado, porque Está escrito em WebSocketSubject, que desta vez não se aplica.

Sistema de Assinatura de Eventos RxJs


Para usar soquetes da Web em componentes, é necessário assinar eventos e cancelar a assinatura deles quando necessário. Para fazer isso, use o popular padrão de design Pub / Sub .
"Publisher-subscriber (eng. Publisher-subscriber ou eng. Pub / sub) - um modelo de design comportamental para a transmissão de mensagens em que remetentes de mensagens, chamados editores (eng. Publishers), não estão diretamente vinculados ao código do programa para enviar mensagens aos assinantes (eng. Assinantes ) Em vez disso, as mensagens são divididas em classes e não contêm informações sobre seus assinantes, se houver. Da mesma forma, os assinantes lidam com uma ou mais classes de mensagens, abstraindo de editores específicos. ”

O assinante não entra em contato diretamente com o editor, mas através do barramento intermediário - o serviço do site. Também deve ser possível se inscrever em vários eventos com o mesmo tipo de retorno. Cada assinatura cria seu próprio Assunto, que é adicionado ao objeto ouvintes, o que permite endereçar eventos de soquete da Web às assinaturas necessárias. Ao trabalhar com o Assunto RxJs, há algumas dificuldades em cancelar a inscrição, portanto, criaremos um coletor de lixo simples que removerá os objetos do objeto ouvintes quando eles não tiverem observadores.

Recepção e análise de dados binários


O WebSocket suporta a transferência de dados binários, arquivos ou fluxos, que geralmente são usados ​​em grandes projetos. Parece algo como isto:
0x80, <comprimento - um ou mais bytes>, <corpo da mensagem>

Para não criar restrições no comprimento da mensagem transmitida e ao mesmo tempo não gastar bytes irracionalmente, os desenvolvedores do protocolo usaram o seguinte algoritmo. Cada byte na indicação de comprimento é considerado separadamente: o mais alto indica se é o último byte (0) ou os outros (1) que o seguem e os 7 bits inferiores contêm os dados transmitidos. Portanto, quando o sinal do quadro de dados binários 0x80 aparecer, o próximo byte será capturado e armazenado em um "cofrinho" separado. Em seguida, o próximo byte, se tiver o conjunto de bits mais significativo, também é transferido para o “mealheiro” e assim sucessivamente até que o byte com o bit mais significativo seja encontrado. Este byte é o último no indicador de comprimento e também é adicionado ao "mealheiro". Agora, os bits altos são removidos dos bytes no mealheiro e o restante é combinado. Esse será o comprimento do corpo da mensagem - números de 7 bits sem o bit mais significativo.

O mecanismo de análise de front-end e fluxo binário é complexo e está associado ao mapeamento de dados no modelo. Um artigo separado pode ser dedicado a isso. Desta vez, analisaremos uma opção simples e deixaremos casos difíceis para as seguintes publicações, se houver interesse no tópico.

Projeção (mapeamento) das informações recebidas no modelo


Independentemente do tipo de transmissão recebida, você deve lê-la e modificá-la com segurança. Não há consenso sobre como fazer isso melhor, eu aderir à teoria de um modelo de dados , pois considero lógico e confiável para a programação no estilo OOP.
“Um modelo de dados é uma definição abstrata, auto-suficiente e lógica de objetos, operadores e outros elementos que juntos constituem uma máquina abstrata de acesso a dados com a qual o usuário interage. "Esses objetos permitem modelar a estrutura de dados e os operadores - o comportamento dos dados."

Todos os tipos de pneus populares que não dão a idéia de um objeto como uma classe na qual o comportamento, a estrutura etc. são definidos criam confusão, são menos bem controlados e, às vezes, cobertos por algo que não é típico para eles. Por exemplo, uma classe de cães deve descrever um cão sob quaisquer condições. Se o cão for percebido como um conjunto de campos: cauda, ​​cor, rosto, etc., o cão poderá crescer uma pata extra e outro cão aparecerá em vez da cabeça.

imagem

Controle sobre mudanças de modelo à medida que novos eventos chegam


Neste parágrafo, descreverei o problema que encontrei ao trabalhar na interface da Web do aplicativo móvel de apostas esportivas. A API do aplicativo funcionou através de soquetes da Web pelos quais eles receberam: atualização de probabilidades, adição e remoção de novos tipos de apostas, notificações sobre o início ou o fim de uma partida, etc. - totalizam cerca de trezentos eventos do soquete da web. Durante a partida, as apostas e as informações são atualizadas constantemente, às vezes 2 a 3 vezes por segundo, então o problema era que depois delas a interface era atualizada sem controle intermediário.

Quando o usuário monitorou o lance em um dispositivo móvel e, ao mesmo tempo, as listas foram atualizadas em sua exibição, o lance desapareceu do campo de visão; portanto, o usuário precisou procurar o lance rastreado novamente. Esse comportamento foi repetido para cada atualização.

imagem

A solução exigia imutabilidade para os objetos que eram exibidos na tela, mas, ao mesmo tempo, os coeficientes de apostas precisavam mudar, os lances irrelevantes para se tornarem inativos e os novos não eram adicionados até o usuário rolar a tela. As opções desatualizadas não foram armazenadas no back-end; portanto, essas linhas tiveram que ser lembradas e marcadas com o sinalizador "excluído", para o qual um armazenamento intermediário de dados foi criado entre o site e a assinatura, o que garantiu o controle sobre as alterações.

No novo serviço, também criaremos uma camada substituta e, desta vez, usaremos o Dexie.js - um wrapper sobre a API IndexedDB, mas qualquer outro banco de dados virtual ou de navegador o fará. Redux é aceitável.

Ignorar eventos arbitrários e cancelar ignorar


Na mesma empresa, geralmente existem vários projetos do mesmo tipo ao mesmo tempo: versões para celular e Web, versões com configurações diferentes para diferentes grupos de usuários, versões avançadas e truncadas do mesmo aplicativo.

Geralmente, todos eles usam uma única base de código; portanto, às vezes, é necessário desativar eventos desnecessários em tempo de execução ou durante o DI sem excluir a assinatura e ativá-la novamente, ou seja, ignore alguns deles, para não processar eventos desnecessários. Esse é um recurso simples, mas útil, que adiciona flexibilidade ao barramento Pub / Sub.

Vamos começar com uma descrição das interfaces:

export interface IWebsocketService { //    addEventListener<T>(topics: string[], id?: number): Observable<T>; runtimeIgnore(topics: string[]): void; runtimeRemoveIgnore(topics: string[]): void; sendMessage(event: string, data: any): void; } export interface WebSocketConfig { //   DI url: string; ignore?: string[]; garbageCollectInterval?: number; options?: Options; } export interface ITopic<T> { //   Pub/Sub [hash: string]: MessageSubject<T>; } export interface IListeners { //    [topic: string]: ITopic<any>; } export interface IBuffer { //    ws.message type: string; data: number[]; } export interface IWsMessage { // ws.message event: string; buffer: IBuffer; } export interface IMessage { //   id: number; text: string; } export type ITopicDataType = IMessage[] | number | string[]; //  callMessage   

Herdaremos o Subject para criar um coletor de lixo:

 export class MessageSubject<T> extends Subject<T> { constructor( private listeners: IListeners, //    private topic: string, //   private id: string // id  ) { super(); } /* *   next, *       , *   garbageCollect */ public next(value?: T): void { if (this.closed) { throw new ObjectUnsubscribedError(); } if (!this.isStopped) { const {observers} = this; const len = observers.length; const copy = observers.slice(); for (let i = 0; i < len; i++) { copy[i].next(value); } if (!len) { this.garbageCollect(); //   } } } /* * garbage collector * */ private garbageCollect(): void { delete this.listeners[this.topic][this.id]; //  Subject if (!Object.keys(this.listeners[this.topic]).length) { //    delete this.listeners[this.topic]; } } } 

Diferente da implementação anterior, websocket.events.ts fará parte do módulo de soquete da web

 export const WS_API = { EVENTS: { MESSAGES: 'messages', COUNTER: 'counter', UPDATE_TEXTS: 'update-texts' }, COMMANDS: { SEND_TEXT: 'set-text', REMOVE_TEXT: 'remove-text' } }; 

Para configurar ao conectar o módulo, crie websocket.config:

 import { InjectionToken } from '@angular/core'; export const config: InjectionToken<string> = new InjectionToken('websocket'); 

Crie um modelo para Proxy:

 import Dexie from 'dexie'; import { IMessage, IWsMessage } from './websocket.interfaces'; import { WS_API } from './websocket.events'; class MessagesDatabase extends Dexie { //    Dexie  typescript public messages!: Dexie.Table<IMessage, number>; // id is number in this case constructor() { super('MessagesDatabase'); //   this.version(1).stores({ //   messages: '++id,text' }); } } 

Um analisador de modelo simples, em condições reais, é melhor dividi-lo em vários arquivos:

 export const modelParser = (message: IWsMessage) => { if (message && message.buffer) { /*  */ const encodeUint8Array = String.fromCharCode .apply(String, new Uint8Array(message.buffer.data)); const parseData = JSON.parse(encodeUint8Array); let MessagesDB: MessagesDatabase; // IndexedDB if (message.event === WS_API.EVENTS.MESSAGES) { // IMessage[] if (!MessagesDB) { MessagesDB = new MessagesDatabase(); } parseData.forEach((messageData: IMessage) => { /*   */ MessagesDB.transaction('rw', MessagesDB.messages, async () => { /* ,    */ if ((await MessagesDB.messages .where({id: messageData.id}).count()) === 0) { const id = await MessagesDB.messages .add({id: messageData.id, text: messageData.text}); console.log(`Addded message with id ${id}`); } }).catch(e => { console.error(e.stack || e); }); }); return MessagesDB.messages.toArray(); //   IMessage[] } if (message.event === WS_API.EVENTS.COUNTER) { // counter return new Promise(r => r(parseData)); //    } if (message.event === WS_API.EVENTS.UPDATE_TEXTS) { // text const texts = []; parseData.forEach((textData: string) => { texts.push(textData); }); return new Promise(r => r(texts)); //     } } else { console.log(`[${Date()}] Buffer is "undefined"`); } }; 

WebsocketModule:

 @NgModule({ imports: [ CommonModule ] }) export class WebsocketModule { public static config(wsConfig: WebSocketConfig): ModuleWithProviders { return { ngModule: WebsocketModule, providers: [{provide: config, useValue: wsConfig}] }; } } 

Vamos começar a criar um serviço:

 private listeners: IListeners; //   private uniqueId: number; //   id  private websocket: ReconnectingWebSocket; //   constructor(@Inject(config) private wsConfig: WebSocketConfig) { this.uniqueId = -1; this.listeners = {}; this.wsConfig.ignore = wsConfig.ignore ? wsConfig.ignore : []; //  this.connect(); } ngOnDestroy() { this.websocket.close(); //     } 

Método de conexão:

 private connect(): void { // ReconnectingWebSocket config const options = { connectionTimeout: 1000, //  ,    maxRetries: 10, //  ,    ...this.wsConfig.options }; //  this.websocket = new ReconnectingWebSocket(this.wsConfig.url, [], options); this.websocket.addEventListener('open', (event: Event) => { //   console.log(`[${Date()}] WebSocket connected!`); }); this.websocket.addEventListener('close', (event: CloseEvent) => { //   console.log(`[${Date()}] WebSocket close!`); }); this.websocket.addEventListener('error', (event: ErrorEvent) => { //   console.error(`[${Date()}] WebSocket error!`); }); this.websocket.addEventListener('message', (event: MessageEvent) => { //     this.onMessage(event); }); setInterval(() => { //    this.garbageCollect(); }, (this.wsConfig.garbageCollectInterval || 10000)); } 

Duplique o coletor de lixo e verificará as assinaturas por tempo limite:

 private garbageCollect(): void { for (const event in this.listeners) { if (this.listeners.hasOwnProperty(event)) { const topic = this.listeners[event]; for (const key in topic) { if (topic.hasOwnProperty(key)) { const subject = topic[key]; //  Subject    if (!subject.observers.length) { delete topic[key]; } } }  ,   if (!Object.keys(topic).length) { delete this.listeners[event]; } } } } 

Examinamos qual assinatura enviar o evento:

 private onMessage(event: MessageEvent): void { const message = JSON.parse(event.data); for (const name in this.listeners) { if (this.listeners.hasOwnProperty(name) && !this.wsConfig.ignore.includes(name)) { const topic = this.listeners[name]; const keys = name.split('/'); //      const isMessage = keys.includes(message.event); const model = modelParser(message); //     if (isMessage && typeof model !== 'undefined') { model.then((data: ITopicDataType) => { //   Subject this.callMessage<ITopicDataType>(topic, data); }); } } } } 

Evento Helmet no Assunto:

 private callMessage<T>(topic: ITopic<T>, data: T): void { for (const key in topic) { if (topic.hasOwnProperty(key)) { const subject = topic[key]; if (subject) { //   subject.next(data); } else { console.log(`[${Date()}] Topic Subject is "undefined"`); } } } } 

Criar Pub / Subtópico:

 private addTopic<T>(topic: string, id?: number): MessageSubject<T> { const token = (++this.uniqueId).toString(); const key = id ? token + id : token; //  id   const hash = sha256.hex(key); // SHA256-   id  if (!this.listeners[topic]) { this.listeners[topic] = <any>{}; } return this.listeners[topic][hash] = new MessageSubject<T>(this.listeners, topic, hash); } 

Inscrevendo-se em um ou mais eventos:

 public addEventListener<T>(topics: string | string[], id?: number): Observable<T> { if (topics) { //       const topicsKey = typeof topics === 'string' ? topics : topics.join('/'); return this.addTopic<T>(topicsKey, id).asObservable(); } else { console.log(`[${Date()}] Can't add EventListener. Type of event is "undefined".`); } } 

Tudo aqui é intencionalmente simplificado, mas pode ser convertido em entidades binárias, como é o caso do servidor. Enviando comandos para o servidor:

 public sendMessage(event: string, data: any = {}): void { //   ,      if (event && this.websocket.readyState === 1) { this.websocket.send(JSON.stringify({event, data})); } else { console.log('Send error!'); } } 

Adicione eventos ao ignorlist em tempo de execução:

 public runtimeIgnore(topics: string[]): void { if (topics && topics.length) { //    this.wsConfig.ignore.push(...topics); } } 

Exclua eventos da lista de ignorados:

 public runtimeRemoveIgnore(topics: string[]): void { if (topics && topics.length) { topics.forEach((topic: string) => { //      const topicIndex = this.wsConfig.ignore.findIndex(t => t === topic); if (topicIndex > -1) { //    this.wsConfig.ignore.splice(topicIndex, 1); } }); } } 

Conectamos o módulo de soquetes da web:

 @NgModule({ declarations: [ AppComponent ], imports: [ BrowserModule, ReactiveFormsModule, WebsocketModule.config({ url: environment.ws, //  "ws://mywebsocketurl" //    ignore: [WS_API.EVENTS.ANY_1, WS_API.EVENTS.ANY_2], garbageCollectInterval: 60 * 1000, //    options: { connectionTimeout: 1000, //   maxRetries: 10 //   } }) ], providers: [], bootstrap: [AppComponent] }) export class AppModule { } 

Nós usamos nos componentes:

 @Component({ selector: 'app-root', templateUrl: './app.component.html', styleUrls: ['./app.component.css'] }) export class AppComponent implements OnInit, OnDestroy { private messages$: Observable<IMessage[]>; private messagesMulti$: Observable<IMessage[]>; private counter$: Observable<number>; private texts$: Observable<string[]>; public form: FormGroup; constructor( private fb: FormBuilder, private wsService: WebsocketService) { } ngOnInit() { this.form = this.fb.group({ text: [null, [ Validators.required ]] }); // get messages this.messages$ = this.wsService .addEventListener<IMessage[]>(WS_API.EVENTS.MESSAGES); // get messages multi this.messagesMulti$ = this.wsService .addEventListener<IMessage[]>([ WS_API.EVENTS.MESSAGES, WS_API.EVENTS.MESSAGES_1 ]); // get counter this.counter$ = this.wsService .addEventListener<number>(WS_API.EVENTS.COUNTER); // get texts this.texts$ = this.wsService .addEventListener<string[]>(WS_API.EVENTS.UPDATE_TEXTS); } ngOnDestroy() { } public sendText(): void { if (this.form.valid) { this.wsService .sendMessage(WS_API.COMMANDS.SEND_TEXT, this.form.value.text); this.form.reset(); } } public removeText(index: number): void { this.wsService.sendMessage(WS_API.COMMANDS.REMOVE_TEXT, index); } } 

O serviço está pronto para uso.



Embora o exemplo do artigo não seja uma solução universal para cada projeto, ele demonstra uma das abordagens para trabalhar com soquetes da Web em aplicativos grandes e complexos. Você pode colocá-lo em serviço e modificá-lo, dependendo das tarefas atuais.

A versão completa do serviço pode ser encontrada no GitHub .

Para todas as perguntas, você pode entrar em contato nos comentários, pelo Telegram ou pelo canal Angular de lá.

Source: https://habr.com/ru/post/pt419099/


All Articles