
In einem
früheren Artikel haben wir über eine allgemeine Lösung für Web-Sockets in Angular gesprochen, bei der wir einen auf WebSocketSubject basierenden Bus mit Wiederverbindung und Service für die Verwendung in Komponenten erstellt haben. Eine solche Implementierung eignet sich für die meisten einfachen Fälle, z. B. zum Empfangen und Senden von Chat-Nachrichten usw., aber ihre Funktionen reichen möglicherweise nicht aus, wenn Sie etwas Flexibleres und Kontrollierteres erstellen müssen. In diesem Artikel werde ich einige Funktionen beim Arbeiten mit Web-Sockets aufzeigen und über die Anforderungen sprechen, auf die Sie selbst gestoßen sind und auf die Sie möglicherweise stoßen werden.
Bei großen Projekten mit hoher Besucherzahl steht das Front-End häufig vor Aufgaben, die unter anderen Umständen häufiger im Back-End zu sehen sind. Unter den Bedingungen der Sparsamkeit der Serverressourcen wird ein Teil der Probleme in das Front-End-Gebiet migriert. Aus diesem Grund wird im Projekt ein Maximum an Erweiterbarkeit und Kontrolle festgelegt.
Hier ist eine Liste der grundlegenden Anforderungen für einen Web-Socket-Client, die in diesem Artikel behandelt werden:
- Automatische „intelligente“ Wiederverbindung;
- Debug-Modus;
- Event-Abonnement-System basierend auf RxJs;
- Empfang und Analyse von Binärdaten;
- Projektion (Mapping) der empfangenen Informationen auf das Modell;
- Kontrolle über Modelländerungen bei Eintreffen neuer Ereignisse;
- Ignorieren Sie beliebige Ereignisse und brechen Sie das Ignorieren ab.
Betrachten Sie jeden Punkt genauer.
Erneut verbinden / debaggen
Ich habe in einem früheren Artikel über das Wiederverbinden geschrieben, daher zitiere ich nur einen Teil des Textes:
Die erneute Verbindung oder die Organisation der erneuten Verbindung zum Server ist ein wichtiger Faktor bei der Arbeit mit Web-Sockets Netzwerkbrüche, Serverabstürze oder andere Fehler, die einen Verbindungsbruch verursachen, können zum Absturz der Anwendung führen.
Es ist wichtig zu beachten, dass Wiederverbindungsversuche nicht zu häufig sein und nicht auf unbestimmte Zeit fortgesetzt werden sollten Dieses Verhalten kann den Client anhalten.
Der Web-Socket selbst weiß nicht, wie er die Verbindung wiederherstellen soll, wenn die Verbindung getrennt wird. Wenn der Server neu gestartet oder der Server abgestürzt ist oder der Benutzer das Internet erneut verbunden hat, müssen Sie den Web-Socket erneut verbinden, um weiterarbeiten zu können.
In diesem Artikel verwenden wir zum erneuten Verbinden und Debuggen das
Reconnecting WebSocket , das die erforderlichen Funktionen und andere Optionen enthält, z. B. das Ändern der URL des Web-Sockets zwischen erneuten Verbindungen, die Auswahl eines beliebigen WebSocket-Konstruktors usw. Andere Alternativen sind ebenfalls geeignet. Eine erneute Verbindung aus dem vorherigen Artikel ist nicht geeignet, da Es ist unter WebSocketSubject geschrieben, was diesmal nicht gilt.
RxJs Event-Abonnement-System
Um Web-Sockets in Komponenten zu verwenden, müssen Sie Ereignisse abonnieren und bei Bedarf abbestellen. Verwenden Sie dazu das beliebte
Pub / Sub- Designmuster.
"Publisher-Subscriber (engl. Publisher-Subscriber oder eng. Pub / sub) - eine Verhaltensentwurfsvorlage für die Übertragung von Nachrichten, bei der Absender von Nachrichten, sogenannte Publisher (eng. Publishers), nicht direkt an den Programmcode gebunden sind, um Nachrichten an Abonnenten (eng. Subscribers) zu senden ) Stattdessen sind die Nachrichten in Klassen unterteilt und enthalten keine Informationen über ihre Abonnenten, falls vorhanden. In ähnlicher Weise befassen sich Abonnenten mit einer oder mehreren Klassen von Nachrichten, die von bestimmten Herausgebern abstrahiert werden. “
Der Abonnent kontaktiert den Verlag nicht direkt, sondern über den Zwischenbus - den Website-Service. Es sollte auch möglich sein, mehrere Ereignisse mit demselben Rückgabetyp zu abonnieren. Jedes Abonnement erstellt einen eigenen Betreff, der dem Listener-Objekt hinzugefügt wird, sodass Sie Web-Socket-Ereignisse an die erforderlichen Abonnements adressieren können. Bei der Arbeit mit RxJs Subject treten einige Probleme beim Abbestellen auf. Daher erstellen wir einen einfachen Garbage Collector, der Subjekte aus dem Listener-Objekt entfernt, wenn sie keine Beobachter haben.
Empfang und Analyse von Binärdaten
WebSocket unterstützt die Übertragung von Binärdaten, Dateien oder Streams, die häufig in großen Projekten verwendet wird. Es sieht ungefähr so aus:
0x80, <Länge - ein oder mehrere Bytes>, <Nachrichtentext>
Um die Länge der übertragenen Nachricht nicht einzuschränken und gleichzeitig keine irrationalen Bytes auszugeben, verwendeten die Protokollentwickler den folgenden Algorithmus. Jedes Byte in der Längenangabe wird separat betrachtet: Das höchste gibt an, ob es das letzte Byte (0) ist oder die anderen (1) folgen, und die unteren 7 Bits enthalten die übertragenen Daten. Wenn daher das Vorzeichen des binären Datenrahmens 0x80 erscheint, wird das nächste Byte genommen und in einem separaten "Sparschwein" gespeichert. Dann wird das nächste Byte, wenn das höchstwertige Bit gesetzt ist, ebenfalls an das "Sparschwein" usw. übertragen, bis das Byte mit dem höchstwertigen Bit Null angetroffen wird. Dieses Byte ist das letzte im Längenindikator und wird auch dem „Sparschwein“ hinzugefügt. Jetzt werden die hohen Bits aus den Bytes im Sparschwein entfernt und der Rest kombiniert. Dies ist die Länge des Nachrichtentexts - 7-Bit-Zahlen ohne das höchstwertige Bit.
Der Front-End-Parsing- und Binär-Stream-Mechanismus ist komplex und mit der Datenzuordnung im Modell verbunden. Diesem kann ein separater Artikel gewidmet werden. Dieses Mal werden wir eine einfache Option analysieren und schwierige Fälle für die folgenden Veröffentlichungen belassen, wenn Interesse an dem Thema besteht.
Projektion (Mapping) der empfangenen Informationen auf das Modell
Unabhängig von der Art der empfangenen Übertragung ist es erforderlich, sicher zu lesen und zu ändern. Es gibt keinen Konsens darüber, wie dies besser gemacht werden kann. Ich halte mich an die Theorie
eines Datenmodells , da ich es für logisch und zuverlässig halte, um im OOP-Stil zu programmieren.
„Ein Datenmodell ist eine abstrakte, autarke, logische Definition von Objekten, Operatoren und anderen Elementen, die zusammen eine abstrakte Datenzugriffsmaschine bilden, mit der der Benutzer interagiert. "Mit diesen Objekten können Sie die Datenstruktur und die Operatoren modellieren - das Verhalten der Daten."
Alle Arten von populären Reifen, die keine Vorstellung von einem Objekt als einer Klasse vermitteln, in der Verhalten, Struktur usw. definiert sind, sorgen für Verwirrung, sind weniger gut kontrolliert und manchmal mit etwas bewachsen, das für sie nicht typisch ist. Zum Beispiel sollte eine Hundeklasse einen Hund unter allen Bedingungen beschreiben. Wenn der Hund als eine Reihe von Feldern wahrgenommen wird: Schwanz, Farbe, Gesicht usw., kann der Hund eine zusätzliche Pfote wachsen lassen und ein anderer Hund erscheint anstelle des Kopfes.

Kontrolle über Modelländerungen bei Eintreffen neuer Ereignisse
In diesem Abschnitt werde ich das Problem beschreiben, auf das ich bei der Arbeit an der Weboberfläche der mobilen Sportwettenanwendung gestoßen bin. Die Anwendungs-API arbeitete über Web-Sockets, über die sie empfangen wurden: Aktualisieren von Quoten, Hinzufügen und Entfernen neuer Arten von Wetten, Benachrichtigungen über den Beginn oder das Ende eines Spiels usw. - Insgesamt etwa dreihundert Ereignisse des Web-Sockets. Während des Spiels werden Wetten und Informationen ständig aktualisiert, manchmal 2-3 Mal pro Sekunde. Das Problem war also, dass die Benutzeroberfläche danach ohne Zwischenkontrolle aktualisiert wurde.
Wenn der Benutzer das Gebot von einem mobilen Gerät aus überwachte und gleichzeitig die Listen auf seinem Display aktualisiert wurden, verschwand das Gebot aus dem Sichtfeld, sodass der Benutzer erneut nach dem nachverfolgten Gebot suchen musste. Dieses Verhalten wurde für jedes Update wiederholt.

Die Lösung erforderte Unveränderlichkeit für die Objekte, die auf dem Bildschirm angezeigt wurden, aber gleichzeitig mussten sich die Wettkoeffizienten ändern, irrelevante Gebote wurden inaktiv und neue wurden erst hinzugefügt, nachdem der Benutzer den Bildschirm gescrollt hatte. Veraltete Optionen wurden nicht im Backend gespeichert, daher mussten solche Zeilen gespeichert und mit dem Flag "Gelöscht" gekennzeichnet werden, für das ein Zwischendatenspeicher zwischen dem Web-Socket und dem Abonnement erstellt wurde, der die Kontrolle über die Änderungen sicherstellte.
In dem neuen Dienst werden wir auch eine Ersatzschicht erstellen und dieses Mal
Dexie.js verwenden - einen Wrapper über die IndexedDB-API, aber jede andere virtuelle oder Browser-Datenbank wird dies tun. Redux ist akzeptabel.
Ignorieren Sie beliebige Ereignisse und brechen Sie das Ignorieren ab
In derselben Firma gibt es häufig mehrere Projekte desselben Typs gleichzeitig: Mobil- und Webversionen, Versionen mit unterschiedlichen Einstellungen für unterschiedliche Benutzergruppen, erweiterte und abgeschnittene Versionen derselben Anwendung.
Oft verwenden alle eine einzige Codebasis, so dass Sie manchmal unnötige Ereignisse zur Laufzeit oder während der DI deaktivieren müssen, ohne das Abonnement zu löschen und erneut zu aktivieren, d. H. Ignorieren Sie einige von ihnen, um unnötige Ereignisse nicht zu verarbeiten. Dies ist eine einfache, aber nützliche Funktion, die dem Pub / Sub-Bus mehr Flexibilität verleiht.
Beginnen wir mit einer Beschreibung der Schnittstellen:
export interface IWebsocketService { // addEventListener<T>(topics: string[], id?: number): Observable<T>; runtimeIgnore(topics: string[]): void; runtimeRemoveIgnore(topics: string[]): void; sendMessage(event: string, data: any): void; } export interface WebSocketConfig { // DI url: string; ignore?: string[]; garbageCollectInterval?: number; options?: Options; } export interface ITopic<T> { // Pub/Sub [hash: string]: MessageSubject<T>; } export interface IListeners { // [topic: string]: ITopic<any>; } export interface IBuffer { // ws.message type: string; data: number[]; } export interface IWsMessage { // ws.message event: string; buffer: IBuffer; } export interface IMessage { // id: number; text: string; } export type ITopicDataType = IMessage[] | number | string[]; // callMessage
Wir werden Subject erben, um einen Garbage Collector zu erstellen:
export class MessageSubject<T> extends Subject<T> { constructor( private listeners: IListeners, // private topic: string, // private id: string // id ) { super(); } public next(value?: T): void { if (this.closed) { throw new ObjectUnsubscribedError(); } if (!this.isStopped) { const {observers} = this; const len = observers.length; const copy = observers.slice(); for (let i = 0; i < len; i++) { copy[i].next(value); } if (!len) { this.garbageCollect();
Im Gegensatz zur vorherigen Implementierung ist websocket.events.ts Teil des Web-Socket-Moduls
export const WS_API = { EVENTS: { MESSAGES: 'messages', COUNTER: 'counter', UPDATE_TEXTS: 'update-texts' }, COMMANDS: { SEND_TEXT: 'set-text', REMOVE_TEXT: 'remove-text' } };
Erstellen Sie zur Konfiguration beim Anschließen des Moduls die Datei websocket.config:
import { InjectionToken } from '@angular/core'; export const config: InjectionToken<string> = new InjectionToken('websocket');
Erstellen Sie ein Modell für Proxy:
import Dexie from 'dexie'; import { IMessage, IWsMessage } from './websocket.interfaces'; import { WS_API } from './websocket.events'; class MessagesDatabase extends Dexie { // Dexie typescript public messages!: Dexie.Table<IMessage, number>; // id is number in this case constructor() { super('MessagesDatabase'); // this.version(1).stores({ // messages: '++id,text' }); } }
Als einfacher Modellparser ist es unter realen Bedingungen besser, ihn in mehrere Dateien aufzuteilen:
export const modelParser = (message: IWsMessage) => { if (message && message.buffer) { /* */ const encodeUint8Array = String.fromCharCode .apply(String, new Uint8Array(message.buffer.data)); const parseData = JSON.parse(encodeUint8Array); let MessagesDB: MessagesDatabase; // IndexedDB if (message.event === WS_API.EVENTS.MESSAGES) { // IMessage[] if (!MessagesDB) { MessagesDB = new MessagesDatabase(); } parseData.forEach((messageData: IMessage) => { /* */ MessagesDB.transaction('rw', MessagesDB.messages, async () => { /* , */ if ((await MessagesDB.messages .where({id: messageData.id}).count()) === 0) { const id = await MessagesDB.messages .add({id: messageData.id, text: messageData.text}); console.log(`Addded message with id ${id}`); } }).catch(e => { console.error(e.stack || e); }); }); return MessagesDB.messages.toArray(); // IMessage[] } if (message.event === WS_API.EVENTS.COUNTER) { // counter return new Promise(r => r(parseData)); // } if (message.event === WS_API.EVENTS.UPDATE_TEXTS) { // text const texts = []; parseData.forEach((textData: string) => { texts.push(textData); }); return new Promise(r => r(texts)); // } } else { console.log(`[${Date()}] Buffer is "undefined"`); } };
WebsocketModule:
@NgModule({ imports: [ CommonModule ] }) export class WebsocketModule { public static config(wsConfig: WebSocketConfig): ModuleWithProviders { return { ngModule: WebsocketModule, providers: [{provide: config, useValue: wsConfig}] }; } }
Beginnen wir mit der Erstellung eines Dienstes:
private listeners: IListeners; // private uniqueId: number; // id private websocket: ReconnectingWebSocket; // constructor(@Inject(config) private wsConfig: WebSocketConfig) { this.uniqueId = -1; this.listeners = {}; this.wsConfig.ignore = wsConfig.ignore ? wsConfig.ignore : []; // this.connect(); } ngOnDestroy() { this.websocket.close(); // }
Verbindungsmethode:
private connect(): void { // ReconnectingWebSocket config const options = { connectionTimeout: 1000, // , maxRetries: 10, // , ...this.wsConfig.options }; // this.websocket = new ReconnectingWebSocket(this.wsConfig.url, [], options); this.websocket.addEventListener('open', (event: Event) => { // console.log(`[${Date()}] WebSocket connected!`); }); this.websocket.addEventListener('close', (event: CloseEvent) => { // console.log(`[${Date()}] WebSocket close!`); }); this.websocket.addEventListener('error', (event: ErrorEvent) => { // console.error(`[${Date()}] WebSocket error!`); }); this.websocket.addEventListener('message', (event: MessageEvent) => { // this.onMessage(event); }); setInterval(() => { // this.garbageCollect(); }, (this.wsConfig.garbageCollectInterval || 10000)); }
Wenn Sie den Garbage Collector duplizieren, wird nach Zeitüberschreitung nach Abonnements gesucht:
private garbageCollect(): void { for (const event in this.listeners) { if (this.listeners.hasOwnProperty(event)) { const topic = this.listeners[event]; for (const key in topic) { if (topic.hasOwnProperty(key)) { const subject = topic[key];
Wir prüfen, welches Abonnement die Veranstaltung senden soll:
private onMessage(event: MessageEvent): void { const message = JSON.parse(event.data); for (const name in this.listeners) { if (this.listeners.hasOwnProperty(name) && !this.wsConfig.ignore.includes(name)) { const topic = this.listeners[name]; const keys = name.split('/');
Helmveranstaltung im Betreff:
private callMessage<T>(topic: ITopic<T>, data: T): void { for (const key in topic) { if (topic.hasOwnProperty(key)) { const subject = topic[key]; if (subject) { // subject.next(data); } else { console.log(`[${Date()}] Topic Subject is "undefined"`); } } } }
Pub / Unterthema erstellen:
private addTopic<T>(topic: string, id?: number): MessageSubject<T> { const token = (++this.uniqueId).toString(); const key = id ? token + id : token; // id const hash = sha256.hex(key); // SHA256- id if (!this.listeners[topic]) { this.listeners[topic] = <any>{}; } return this.listeners[topic][hash] = new MessageSubject<T>(this.listeners, topic, hash); }
Abonnieren einer oder mehrerer Veranstaltungen:
public addEventListener<T>(topics: string | string[], id?: number): Observable<T> { if (topics) { // const topicsKey = typeof topics === 'string' ? topics : topics.join('/'); return this.addTopic<T>(topicsKey, id).asObservable(); } else { console.log(`[${Date()}] Can't add EventListener. Type of event is "undefined".`); } }
Alles hier ist absichtlich vereinfacht, kann aber wie beim Server in binäre Entitäten konvertiert werden. Senden von Befehlen an den Server:
public sendMessage(event: string, data: any = {}): void { // , if (event && this.websocket.readyState === 1) { this.websocket.send(JSON.stringify({event, data})); } else { console.log('Send error!'); } }
Hinzufügen von Ereignissen zur Ignorierliste zur Laufzeit:
public runtimeIgnore(topics: string[]): void { if (topics && topics.length) {
Ereignisse aus der Ignorierliste löschen:
public runtimeRemoveIgnore(topics: string[]): void { if (topics && topics.length) { topics.forEach((topic: string) => {
Wir verbinden das Modul der Web-Sockets:
@NgModule({ declarations: [ AppComponent ], imports: [ BrowserModule, ReactiveFormsModule, WebsocketModule.config({ url: environment.ws, // "ws://mywebsocketurl" // ignore: [WS_API.EVENTS.ANY_1, WS_API.EVENTS.ANY_2], garbageCollectInterval: 60 * 1000, // options: { connectionTimeout: 1000, // maxRetries: 10 // } }) ], providers: [], bootstrap: [AppComponent] }) export class AppModule { }
Wir verwenden in den Komponenten:
@Component({ selector: 'app-root', templateUrl: './app.component.html', styleUrls: ['./app.component.css'] }) export class AppComponent implements OnInit, OnDestroy { private messages$: Observable<IMessage[]>; private messagesMulti$: Observable<IMessage[]>; private counter$: Observable<number>; private texts$: Observable<string[]>; public form: FormGroup; constructor( private fb: FormBuilder, private wsService: WebsocketService) { } ngOnInit() { this.form = this.fb.group({ text: [null, [ Validators.required ]] });
Der Dienst ist einsatzbereit.
Obwohl das Beispiel aus dem Artikel nicht für jedes Projekt eine universelle Lösung darstellt, zeigt es einen der Ansätze für die Arbeit mit Web-Sockets in großen und komplexen Anwendungen. Sie können es in Betrieb nehmen und je nach aktuellen Aufgaben ändern.
Die Vollversion des Dienstes finden Sie auf
GitHub .
Bei allen Fragen können Sie sich in den Kommentaren
an mich per Telegramm oder auf
den dortigen Angular-Kanal wenden.