
Em um
artigo anterior, falamos sobre uma solução geral para soquetes da Web no Angular, onde construímos um barramento com reconexão e serviço para uso em componentes baseados no WebSocketSubject. Essa implementação é adequada para os casos mais simples, por exemplo, recebimento e envio de mensagens de bate-papo etc., mas seus recursos podem não ser suficientes quando você precisa criar algo mais flexível e controlado. Neste artigo, revelarei alguns recursos ao trabalhar com soquetes da Web e falarei sobre os requisitos que você mesmo encontrou e, possivelmente, encontrará.
Freqüentemente, em grandes projetos com alta participação, o front-end enfrenta tarefas que, em outras circunstâncias, são mais comuns no back-end. Em condições de austeridade dos recursos do servidor, parte dos problemas migra para o território de front-end; por esse motivo, um máximo de extensibilidade e controle é estabelecido no projeto.
Aqui está uma lista dos requisitos básicos para um cliente de soquete da web que serão abordados neste artigo:
- Reconexão automática “inteligente”;
- Modo de depuração;
- Sistema de assinatura de eventos baseado em RxJs;
- Recepção e análise de dados binários;
- Projeção (mapeamento) das informações recebidas no modelo;
- Controle sobre as alterações do modelo quando novos eventos chegam;
- Ignore eventos arbitrários e cancele ignorar.
Considere cada item com mais detalhes.
Reconectar / Debag
Eu escrevi sobre reconectar em um artigo anterior, então apenas cito parte do texto:
A reconexão ou a organização da reconexão com o servidor é um fator primordial ao trabalhar com soquetes da Web, pois quebras de rede, falhas no servidor ou outros erros que causem uma falha na conexão podem causar falhas no aplicativo.
É importante observar que as tentativas de reconexão não devem ser muito frequentes e não devem continuar indefinidamente, pois esse comportamento pode suspender o cliente.
O soquete da web em si não sabe reconectar quando desconectado. Portanto, se o servidor reiniciou ou o servidor travou ou o usuário reconectou a Internet, para continuar trabalhando, você também precisa reconectar o soquete da web.
Neste artigo, para reconectar e depurar, usaremos
Reconnecting WebSocket , que contém a funcionalidade necessária e outras opções, como alterar o URL do soquete da Web entre reconexões, escolher um construtor arbitrário do WebSocket etc. Outras alternativas também são adequadas. Reconectar do artigo anterior não é adequado, porque Está escrito em WebSocketSubject, que desta vez não se aplica.
Sistema de Assinatura de Eventos RxJs
Para usar soquetes da Web em componentes, é necessário assinar eventos e cancelar a assinatura deles quando necessário. Para fazer isso, use o popular padrão de design
Pub / Sub .
"Publisher-subscriber (eng. Publisher-subscriber ou eng. Pub / sub) - um modelo de design comportamental para a transmissão de mensagens em que remetentes de mensagens, chamados editores (eng. Publishers), não estão diretamente vinculados ao código do programa para enviar mensagens aos assinantes (eng. Assinantes ) Em vez disso, as mensagens são divididas em classes e não contêm informações sobre seus assinantes, se houver. Da mesma forma, os assinantes lidam com uma ou mais classes de mensagens, abstraindo de editores específicos. ”
O assinante não entra em contato diretamente com o editor, mas através do barramento intermediário - o serviço do site. Também deve ser possível se inscrever em vários eventos com o mesmo tipo de retorno. Cada assinatura cria seu próprio Assunto, que é adicionado ao objeto ouvintes, o que permite endereçar eventos de soquete da Web às assinaturas necessárias. Ao trabalhar com o Assunto RxJs, há algumas dificuldades em cancelar a inscrição, portanto, criaremos um coletor de lixo simples que removerá os objetos do objeto ouvintes quando eles não tiverem observadores.
Recepção e análise de dados binários
O WebSocket suporta a transferência de dados binários, arquivos ou fluxos, que geralmente são usados em grandes projetos. Parece algo como isto:
0x80, <comprimento - um ou mais bytes>, <corpo da mensagem>
Para não criar restrições no comprimento da mensagem transmitida e ao mesmo tempo não gastar bytes irracionalmente, os desenvolvedores do protocolo usaram o seguinte algoritmo. Cada byte na indicação de comprimento é considerado separadamente: o mais alto indica se é o último byte (0) ou os outros (1) que o seguem e os 7 bits inferiores contêm os dados transmitidos. Portanto, quando o sinal do quadro de dados binários 0x80 aparecer, o próximo byte será capturado e armazenado em um "cofrinho" separado. Em seguida, o próximo byte, se tiver o conjunto de bits mais significativo, também é transferido para o “mealheiro” e assim sucessivamente até que o byte com o bit mais significativo seja encontrado. Este byte é o último no indicador de comprimento e também é adicionado ao "mealheiro". Agora, os bits altos são removidos dos bytes no mealheiro e o restante é combinado. Esse será o comprimento do corpo da mensagem - números de 7 bits sem o bit mais significativo.
O mecanismo de análise de front-end e fluxo binário é complexo e está associado ao mapeamento de dados no modelo. Um artigo separado pode ser dedicado a isso. Desta vez, analisaremos uma opção simples e deixaremos casos difíceis para as seguintes publicações, se houver interesse no tópico.
Projeção (mapeamento) das informações recebidas no modelo
Independentemente do tipo de transmissão recebida, você deve lê-la e modificá-la com segurança. Não há consenso sobre como fazer isso melhor, eu aderir à teoria de
um modelo de dados , pois considero lógico e confiável para a programação no estilo OOP.
“Um modelo de dados é uma definição abstrata, auto-suficiente e lógica de objetos, operadores e outros elementos que juntos constituem uma máquina abstrata de acesso a dados com a qual o usuário interage. "Esses objetos permitem modelar a estrutura de dados e os operadores - o comportamento dos dados."
Todos os tipos de pneus populares que não dão a idéia de um objeto como uma classe na qual o comportamento, a estrutura etc. são definidos criam confusão, são menos bem controlados e, às vezes, cobertos por algo que não é típico para eles. Por exemplo, uma classe de cães deve descrever um cão sob quaisquer condições. Se o cão for percebido como um conjunto de campos: cauda, cor, rosto, etc., o cão poderá crescer uma pata extra e outro cão aparecerá em vez da cabeça.

Controle sobre mudanças de modelo à medida que novos eventos chegam
Neste parágrafo, descreverei o problema que encontrei ao trabalhar na interface da Web do aplicativo móvel de apostas esportivas. A API do aplicativo funcionou através de soquetes da Web pelos quais eles receberam: atualização de probabilidades, adição e remoção de novos tipos de apostas, notificações sobre o início ou o fim de uma partida, etc. - totalizam cerca de trezentos eventos do soquete da web. Durante a partida, as apostas e as informações são atualizadas constantemente, às vezes 2 a 3 vezes por segundo, então o problema era que depois delas a interface era atualizada sem controle intermediário.
Quando o usuário monitorou o lance em um dispositivo móvel e, ao mesmo tempo, as listas foram atualizadas em sua exibição, o lance desapareceu do campo de visão; portanto, o usuário precisou procurar o lance rastreado novamente. Esse comportamento foi repetido para cada atualização.

A solução exigia imutabilidade para os objetos que eram exibidos na tela, mas, ao mesmo tempo, os coeficientes de apostas precisavam mudar, os lances irrelevantes para se tornarem inativos e os novos não eram adicionados até o usuário rolar a tela. As opções desatualizadas não foram armazenadas no back-end; portanto, essas linhas tiveram que ser lembradas e marcadas com o sinalizador "excluído", para o qual um armazenamento intermediário de dados foi criado entre o site e a assinatura, o que garantiu o controle sobre as alterações.
No novo serviço, também criaremos uma camada substituta e, desta vez, usaremos o
Dexie.js - um wrapper sobre a API IndexedDB, mas qualquer outro banco de dados virtual ou de navegador o fará. Redux é aceitável.
Ignorar eventos arbitrários e cancelar ignorar
Na mesma empresa, geralmente existem vários projetos do mesmo tipo ao mesmo tempo: versões para celular e Web, versões com configurações diferentes para diferentes grupos de usuários, versões avançadas e truncadas do mesmo aplicativo.
Geralmente, todos eles usam uma única base de código; portanto, às vezes, é necessário desativar eventos desnecessários em tempo de execução ou durante o DI sem excluir a assinatura e ativá-la novamente, ou seja, ignore alguns deles, para não processar eventos desnecessários. Esse é um recurso simples, mas útil, que adiciona flexibilidade ao barramento Pub / Sub.
Vamos começar com uma descrição das interfaces:
export interface IWebsocketService { // addEventListener<T>(topics: string[], id?: number): Observable<T>; runtimeIgnore(topics: string[]): void; runtimeRemoveIgnore(topics: string[]): void; sendMessage(event: string, data: any): void; } export interface WebSocketConfig { // DI url: string; ignore?: string[]; garbageCollectInterval?: number; options?: Options; } export interface ITopic<T> { // Pub/Sub [hash: string]: MessageSubject<T>; } export interface IListeners { // [topic: string]: ITopic<any>; } export interface IBuffer { // ws.message type: string; data: number[]; } export interface IWsMessage { // ws.message event: string; buffer: IBuffer; } export interface IMessage { // id: number; text: string; } export type ITopicDataType = IMessage[] | number | string[]; // callMessage
Herdaremos o Subject para criar um coletor de lixo:
export class MessageSubject<T> extends Subject<T> { constructor( private listeners: IListeners, // private topic: string, // private id: string // id ) { super(); } public next(value?: T): void { if (this.closed) { throw new ObjectUnsubscribedError(); } if (!this.isStopped) { const {observers} = this; const len = observers.length; const copy = observers.slice(); for (let i = 0; i < len; i++) { copy[i].next(value); } if (!len) { this.garbageCollect();
Diferente da implementação anterior, websocket.events.ts fará parte do módulo de soquete da web
export const WS_API = { EVENTS: { MESSAGES: 'messages', COUNTER: 'counter', UPDATE_TEXTS: 'update-texts' }, COMMANDS: { SEND_TEXT: 'set-text', REMOVE_TEXT: 'remove-text' } };
Para configurar ao conectar o módulo, crie websocket.config:
import { InjectionToken } from '@angular/core'; export const config: InjectionToken<string> = new InjectionToken('websocket');
Crie um modelo para Proxy:
import Dexie from 'dexie'; import { IMessage, IWsMessage } from './websocket.interfaces'; import { WS_API } from './websocket.events'; class MessagesDatabase extends Dexie { // Dexie typescript public messages!: Dexie.Table<IMessage, number>; // id is number in this case constructor() { super('MessagesDatabase'); // this.version(1).stores({ // messages: '++id,text' }); } }
Um analisador de modelo simples, em condições reais, é melhor dividi-lo em vários arquivos:
export const modelParser = (message: IWsMessage) => { if (message && message.buffer) { /* */ const encodeUint8Array = String.fromCharCode .apply(String, new Uint8Array(message.buffer.data)); const parseData = JSON.parse(encodeUint8Array); let MessagesDB: MessagesDatabase; // IndexedDB if (message.event === WS_API.EVENTS.MESSAGES) { // IMessage[] if (!MessagesDB) { MessagesDB = new MessagesDatabase(); } parseData.forEach((messageData: IMessage) => { /* */ MessagesDB.transaction('rw', MessagesDB.messages, async () => { /* , */ if ((await MessagesDB.messages .where({id: messageData.id}).count()) === 0) { const id = await MessagesDB.messages .add({id: messageData.id, text: messageData.text}); console.log(`Addded message with id ${id}`); } }).catch(e => { console.error(e.stack || e); }); }); return MessagesDB.messages.toArray(); // IMessage[] } if (message.event === WS_API.EVENTS.COUNTER) { // counter return new Promise(r => r(parseData)); // } if (message.event === WS_API.EVENTS.UPDATE_TEXTS) { // text const texts = []; parseData.forEach((textData: string) => { texts.push(textData); }); return new Promise(r => r(texts)); // } } else { console.log(`[${Date()}] Buffer is "undefined"`); } };
WebsocketModule:
@NgModule({ imports: [ CommonModule ] }) export class WebsocketModule { public static config(wsConfig: WebSocketConfig): ModuleWithProviders { return { ngModule: WebsocketModule, providers: [{provide: config, useValue: wsConfig}] }; } }
Vamos começar a criar um serviço:
private listeners: IListeners; // private uniqueId: number; // id private websocket: ReconnectingWebSocket; // constructor(@Inject(config) private wsConfig: WebSocketConfig) { this.uniqueId = -1; this.listeners = {}; this.wsConfig.ignore = wsConfig.ignore ? wsConfig.ignore : []; // this.connect(); } ngOnDestroy() { this.websocket.close(); // }
Método de conexão:
private connect(): void { // ReconnectingWebSocket config const options = { connectionTimeout: 1000, // , maxRetries: 10, // , ...this.wsConfig.options }; // this.websocket = new ReconnectingWebSocket(this.wsConfig.url, [], options); this.websocket.addEventListener('open', (event: Event) => { // console.log(`[${Date()}] WebSocket connected!`); }); this.websocket.addEventListener('close', (event: CloseEvent) => { // console.log(`[${Date()}] WebSocket close!`); }); this.websocket.addEventListener('error', (event: ErrorEvent) => { // console.error(`[${Date()}] WebSocket error!`); }); this.websocket.addEventListener('message', (event: MessageEvent) => { // this.onMessage(event); }); setInterval(() => { // this.garbageCollect(); }, (this.wsConfig.garbageCollectInterval || 10000)); }
Duplique o coletor de lixo e verificará as assinaturas por tempo limite:
private garbageCollect(): void { for (const event in this.listeners) { if (this.listeners.hasOwnProperty(event)) { const topic = this.listeners[event]; for (const key in topic) { if (topic.hasOwnProperty(key)) { const subject = topic[key];
Examinamos qual assinatura enviar o evento:
private onMessage(event: MessageEvent): void { const message = JSON.parse(event.data); for (const name in this.listeners) { if (this.listeners.hasOwnProperty(name) && !this.wsConfig.ignore.includes(name)) { const topic = this.listeners[name]; const keys = name.split('/');
Evento Helmet no Assunto:
private callMessage<T>(topic: ITopic<T>, data: T): void { for (const key in topic) { if (topic.hasOwnProperty(key)) { const subject = topic[key]; if (subject) { // subject.next(data); } else { console.log(`[${Date()}] Topic Subject is "undefined"`); } } } }
Criar Pub / Subtópico:
private addTopic<T>(topic: string, id?: number): MessageSubject<T> { const token = (++this.uniqueId).toString(); const key = id ? token + id : token; // id const hash = sha256.hex(key); // SHA256- id if (!this.listeners[topic]) { this.listeners[topic] = <any>{}; } return this.listeners[topic][hash] = new MessageSubject<T>(this.listeners, topic, hash); }
Inscrevendo-se em um ou mais eventos:
public addEventListener<T>(topics: string | string[], id?: number): Observable<T> { if (topics) { // const topicsKey = typeof topics === 'string' ? topics : topics.join('/'); return this.addTopic<T>(topicsKey, id).asObservable(); } else { console.log(`[${Date()}] Can't add EventListener. Type of event is "undefined".`); } }
Tudo aqui é intencionalmente simplificado, mas pode ser convertido em entidades binárias, como é o caso do servidor. Enviando comandos para o servidor:
public sendMessage(event: string, data: any = {}): void { // , if (event && this.websocket.readyState === 1) { this.websocket.send(JSON.stringify({event, data})); } else { console.log('Send error!'); } }
Adicione eventos ao ignorlist em tempo de execução:
public runtimeIgnore(topics: string[]): void { if (topics && topics.length) {
Exclua eventos da lista de ignorados:
public runtimeRemoveIgnore(topics: string[]): void { if (topics && topics.length) { topics.forEach((topic: string) => {
Conectamos o módulo de soquetes da web:
@NgModule({ declarations: [ AppComponent ], imports: [ BrowserModule, ReactiveFormsModule, WebsocketModule.config({ url: environment.ws, // "ws://mywebsocketurl" // ignore: [WS_API.EVENTS.ANY_1, WS_API.EVENTS.ANY_2], garbageCollectInterval: 60 * 1000, // options: { connectionTimeout: 1000, // maxRetries: 10 // } }) ], providers: [], bootstrap: [AppComponent] }) export class AppModule { }
Nós usamos nos componentes:
@Component({ selector: 'app-root', templateUrl: './app.component.html', styleUrls: ['./app.component.css'] }) export class AppComponent implements OnInit, OnDestroy { private messages$: Observable<IMessage[]>; private messagesMulti$: Observable<IMessage[]>; private counter$: Observable<number>; private texts$: Observable<string[]>; public form: FormGroup; constructor( private fb: FormBuilder, private wsService: WebsocketService) { } ngOnInit() { this.form = this.fb.group({ text: [null, [ Validators.required ]] });
O serviço está pronto para uso.
Embora o exemplo do artigo não seja uma solução universal para cada projeto, ele demonstra uma das abordagens para trabalhar com soquetes da Web em aplicativos grandes e complexos. Você pode colocá-lo em serviço e modificá-lo, dependendo das tarefas atuais.
A versão completa do serviço pode ser encontrada no
GitHub .
Para todas as perguntas, você pode entrar em contato nos comentários, pelo Telegram ou
pelo canal Angular de lá.