Angular中的WebSockets。 第2部分。产品解决方案

图片

上一篇文章中,我们讨论了Angular中针对Web套接字的通用解决方案,其中我们构建了具有重新连接和服务的总线,可用于基于WebSocketSubject的组件。 这种实现适用于大多数简单情况,例如,接收和发送聊天消息等,但是在您需要构建更灵活和可控制的内容的情况下,其功能可能不足。 在本文中,我将介绍使用Web套接字时的一些功能,并讨论您自己可能遇到的需求。

通常,在人流较大的大型项目中,前端要面对在其他情况下在后端更常见的任务。 在服务器资源紧缩的情况下,部分问题会迁移到前端区域,因此,该项目具有最大的可扩展性和可控制性。

这是本文将介绍的Web套接字客户端的基本要求的列表:

  • 自动“智能”重新连接;
  • 调试模式;
  • 基于RxJ的事件订阅系统;
  • 接收和解析二进制数据;
  • 在模型上投影(映射)收到的信息;
  • 随着新事件的到来,控制模型的变化;
  • 忽略任意事件并取消忽略。

请更详细地考虑每个项目。

重新连接/卸袋


我在上一篇文章中写过关于重新连接的知识,所以我只引用部分内容:

当使用Web套接字时,重新连接或重新连接到服务器的组织是至关重要的因素,因为 网络中断,服务器崩溃或其他导致连接中断的错误可能导致应用程序崩溃。
重要的是要注意,重新连接尝试不应太频繁且不应无限期地继续,例如 此行为可以挂起客户端。

Web套接字本身不知道断开连接后如何重新连接。 因此,如果服务器重新启动或服务器崩溃,或者用户重新连接了Internet,则要继续工作,还需要重新连接Web套接字。

在本文中,为了进行重新连接和调试,我们将使用Reconnecting WebSocket ,其中包含必要的功能和其他选项,例如在重新连接之间更改Web套接字的URL,选择任意WebSocket构造函数等。 其他替代方式也是合适的。 重新连接上一篇文章不适合,因为 它是在WebSocketSubject下编写的,这一次不适用。

RxJs事件订阅系统


要在组件中使用Web套接字,您需要订阅事件,并在必要时取消订阅它们。 为此,请使用流行的Pub / Sub设计模式。
“发布者-订阅者(eng。Publisher-subscriber或eng。Pub / sub)-用于传输消息的行为设计模板,其中消息的发件人(称为发布者(eng。Publishers))没有直接绑定到程序代码上,而是将消息发送给订阅者(eng。Subscriber) ) 而是将消息分为几类,并且不包含有关其订户的信息(如果有)。 同样,订户处理一类或多类消息,这些消息是从特定发布者那里提取的。”

订户不是直接与发布者联系,而是通过中间总线-网站服务。 还应该可以订阅具有相同返回类型的多个事件。 每个订阅都创建自己的主题,该主题添加到侦听器对象中,使您可以将Web套接字事件寻址到必要的订阅。 在使用RxJs Subject时,取消订阅会有一些困难,因此,我们将创建一个简单的垃圾回收器,当没有观察者的侦听器对象时,它将从侦听器对象中删除这些主题。

接收和解析二进制数据


WebSocket支持二进制数据,文件或流的传输,这通常在大型项目中使用。 看起来像这样:
0x80,<长度-一个或多个字节>,<消息正文>

为了不限制传输消息的长度,同时又不浪费字节,协议开发人员使用了以下算法。 长度指示中的每个字节都被单独考虑:最高字节指示它是最后一个字节(0)还是后面的其他字节(1),而低7位包含发送的数据。 因此,当二进制数据帧0x80的符号出现时,则获取下一个字节并将其存储在单独的“存钱罐”中。 然后,下一个字节(如果设置了最高有效位)也将传送到“存钱罐”,依此类推,直到遇到最高有效位为零的字节。 该字节是长度指示器中的最后一个字节,并且也添加到“存钱罐”中。 现在,从存钱罐中的字节中删除高位,然后合并其余部分。 这将是消息正文的长度-7位数字,没有最高有效位。

前端解析和二进制流机制很复杂,并且与模型上的数据映射相关联。 可以单独撰写一篇文章。 这次,我们将分析一个简单的选项,如果对该主题感兴趣的话,将为下面的出版物提供困难的案例。

在模型上投影(映射)收到的信息


不管接收到的传输类型如何,都需要安全地阅读和修改。 关于如何更好地做到这一点尚无共识,我坚持使用数据模型的理论,因为我认为它对于以OOP风格进行编程是合乎逻辑且可靠的。
“数据模型是对象,运算符和其他元素的抽象,自给自足的逻辑定义,它们共同构成了用户与之交互的抽象数据访问机器。 “这些对象使您可以对数据结构和运算符进行建模-数据的行为。”

各种不把对象归类为行为,结构等的对象的流行轮胎都会造成混乱,控制得不好,有时会长满一些不常见的东西。 例如,狗类应该在任何条件下描述狗。 如果将狗视为一组区域:尾巴,颜色,脸部等,则该狗可能会长出额外的爪子,并且将出现另一只狗而不是头部。

图片

随着新事件的到来,控制模型的变化


在本段中,我将描述在体育博彩移动应用程序的Web界面上工作时遇到的问题。 应用程序API通过接收的Web套接字工作:更新赔率,添加和删除新类型的投注,有关比赛开始或结束的通知等。 -总共约有300个Web套接字事件。 在比赛期间,赌注和信息会不断更新,有时每秒更新2-3次,因此问题在于,在更新之后,无需中间控制即可更新界面。

当用户从移动设备监视出价时,同时在其显示器上更新列表时,出价从视野中消失了,因此用户必须再次搜索跟踪的出价。 每次更新都会重复此行为。

图片

该解决方案要求屏幕上显示的对象具有不变性,但与此同时,投注系数必须更改,不相关的出价变为无效,并且直到用户滚动屏幕后才添加新的出价。 过时的选项未存储在后端,因此必须记住这些行,并用“ deleted”标志标记,为此在Web套接字和订阅之间创建了中间数据存储,以确保对更改进行控制。

在新服务中,我们还将创建一个替代层,这一次我们将使用Dexie.js -IndexedDB API的包装器,但任何其他虚拟数据库或浏览器数据库都可以。 Redux是可以接受的。

忽略任意事件并取消忽略


在同一家公司中,经常同时存在多个相同类型的项目:移动版本和Web版本,为不同用户组设置不同的版本,同一应用程序的高级版本和截短版本。

通常它们都使用单个代码库,因此有时您需要在运行时或DI期间关闭不必要的事件,而又不删除订阅并重新打开它,即 忽略其中的一些,以免处理不必要的事件。 这是一个简单但有用的功能,可为Pub / Sub总线增加灵活性。

让我们从接口的描述开始:

export interface IWebsocketService { //    addEventListener<T>(topics: string[], id?: number): Observable<T>; runtimeIgnore(topics: string[]): void; runtimeRemoveIgnore(topics: string[]): void; sendMessage(event: string, data: any): void; } export interface WebSocketConfig { //   DI url: string; ignore?: string[]; garbageCollectInterval?: number; options?: Options; } export interface ITopic<T> { //   Pub/Sub [hash: string]: MessageSubject<T>; } export interface IListeners { //    [topic: string]: ITopic<any>; } export interface IBuffer { //    ws.message type: string; data: number[]; } export interface IWsMessage { // ws.message event: string; buffer: IBuffer; } export interface IMessage { //   id: number; text: string; } export type ITopicDataType = IMessage[] | number | string[]; //  callMessage   

我们将继承Subject创建一个垃圾回收器:

 export class MessageSubject<T> extends Subject<T> { constructor( private listeners: IListeners, //    private topic: string, //   private id: string // id  ) { super(); } /* *   next, *       , *   garbageCollect */ public next(value?: T): void { if (this.closed) { throw new ObjectUnsubscribedError(); } if (!this.isStopped) { const {observers} = this; const len = observers.length; const copy = observers.slice(); for (let i = 0; i < len; i++) { copy[i].next(value); } if (!len) { this.garbageCollect(); //   } } } /* * garbage collector * */ private garbageCollect(): void { delete this.listeners[this.topic][this.id]; //  Subject if (!Object.keys(this.listeners[this.topic]).length) { //    delete this.listeners[this.topic]; } } } 

与之前的实现不同,websocket.events.ts将成为Web套接字模块的一部分

 export const WS_API = { EVENTS: { MESSAGES: 'messages', COUNTER: 'counter', UPDATE_TEXTS: 'update-texts' }, COMMANDS: { SEND_TEXT: 'set-text', REMOVE_TEXT: 'remove-text' } }; 

要在连接模块时进行配置,请创建websocket.config:

 import { InjectionToken } from '@angular/core'; export const config: InjectionToken<string> = new InjectionToken('websocket'); 

为代理创建模型:

 import Dexie from 'dexie'; import { IMessage, IWsMessage } from './websocket.interfaces'; import { WS_API } from './websocket.events'; class MessagesDatabase extends Dexie { //    Dexie  typescript public messages!: Dexie.Table<IMessage, number>; // id is number in this case constructor() { super('MessagesDatabase'); //   this.version(1).stores({ //   messages: '++id,text' }); } } 

一个简单的模型解析器,在实际情况下最好将其分为几个文件:

 export const modelParser = (message: IWsMessage) => { if (message && message.buffer) { /*  */ const encodeUint8Array = String.fromCharCode .apply(String, new Uint8Array(message.buffer.data)); const parseData = JSON.parse(encodeUint8Array); let MessagesDB: MessagesDatabase; // IndexedDB if (message.event === WS_API.EVENTS.MESSAGES) { // IMessage[] if (!MessagesDB) { MessagesDB = new MessagesDatabase(); } parseData.forEach((messageData: IMessage) => { /*   */ MessagesDB.transaction('rw', MessagesDB.messages, async () => { /* ,    */ if ((await MessagesDB.messages .where({id: messageData.id}).count()) === 0) { const id = await MessagesDB.messages .add({id: messageData.id, text: messageData.text}); console.log(`Addded message with id ${id}`); } }).catch(e => { console.error(e.stack || e); }); }); return MessagesDB.messages.toArray(); //   IMessage[] } if (message.event === WS_API.EVENTS.COUNTER) { // counter return new Promise(r => r(parseData)); //    } if (message.event === WS_API.EVENTS.UPDATE_TEXTS) { // text const texts = []; parseData.forEach((textData: string) => { texts.push(textData); }); return new Promise(r => r(texts)); //     } } else { console.log(`[${Date()}] Buffer is "undefined"`); } }; 

WebsocketModule:

 @NgModule({ imports: [ CommonModule ] }) export class WebsocketModule { public static config(wsConfig: WebSocketConfig): ModuleWithProviders { return { ngModule: WebsocketModule, providers: [{provide: config, useValue: wsConfig}] }; } } 

让我们开始创建服务:

 private listeners: IListeners; //   private uniqueId: number; //   id  private websocket: ReconnectingWebSocket; //   constructor(@Inject(config) private wsConfig: WebSocketConfig) { this.uniqueId = -1; this.listeners = {}; this.wsConfig.ignore = wsConfig.ignore ? wsConfig.ignore : []; //  this.connect(); } ngOnDestroy() { this.websocket.close(); //     } 

连接方式:

 private connect(): void { // ReconnectingWebSocket config const options = { connectionTimeout: 1000, //  ,    maxRetries: 10, //  ,    ...this.wsConfig.options }; //  this.websocket = new ReconnectingWebSocket(this.wsConfig.url, [], options); this.websocket.addEventListener('open', (event: Event) => { //   console.log(`[${Date()}] WebSocket connected!`); }); this.websocket.addEventListener('close', (event: CloseEvent) => { //   console.log(`[${Date()}] WebSocket close!`); }); this.websocket.addEventListener('error', (event: ErrorEvent) => { //   console.error(`[${Date()}] WebSocket error!`); }); this.websocket.addEventListener('message', (event: MessageEvent) => { //     this.onMessage(event); }); setInterval(() => { //    this.garbageCollect(); }, (this.wsConfig.garbageCollectInterval || 10000)); } 

复制垃圾收集器,将按超时检查订阅:

 private garbageCollect(): void { for (const event in this.listeners) { if (this.listeners.hasOwnProperty(event)) { const topic = this.listeners[event]; for (const key in topic) { if (topic.hasOwnProperty(key)) { const subject = topic[key]; //  Subject    if (!subject.observers.length) { delete topic[key]; } } }  ,   if (!Object.keys(topic).length) { delete this.listeners[event]; } } } } 

我们查看发送事件的订阅:

 private onMessage(event: MessageEvent): void { const message = JSON.parse(event.data); for (const name in this.listeners) { if (this.listeners.hasOwnProperty(name) && !this.wsConfig.ignore.includes(name)) { const topic = this.listeners[name]; const keys = name.split('/'); //      const isMessage = keys.includes(message.event); const model = modelParser(message); //     if (isMessage && typeof model !== 'undefined') { model.then((data: ITopicDataType) => { //   Subject this.callMessage<ITopicDataType>(topic, data); }); } } } } 

主题中的头盔事件:

 private callMessage<T>(topic: ITopic<T>, data: T): void { for (const key in topic) { if (topic.hasOwnProperty(key)) { const subject = topic[key]; if (subject) { //   subject.next(data); } else { console.log(`[${Date()}] Topic Subject is "undefined"`); } } } } 

创建发布/子主题:

 private addTopic<T>(topic: string, id?: number): MessageSubject<T> { const token = (++this.uniqueId).toString(); const key = id ? token + id : token; //  id   const hash = sha256.hex(key); // SHA256-   id  if (!this.listeners[topic]) { this.listeners[topic] = <any>{}; } return this.listeners[topic][hash] = new MessageSubject<T>(this.listeners, topic, hash); } 

订阅一个或多个事件:

 public addEventListener<T>(topics: string | string[], id?: number): Observable<T> { if (topics) { //       const topicsKey = typeof topics === 'string' ? topics : topics.join('/'); return this.addTopic<T>(topicsKey, id).asObservable(); } else { console.log(`[${Date()}] Can't add EventListener. Type of event is "undefined".`); } } 

此处的所有内容都经过有意简化,但可以转换为二进制实体,就像服务器一样。 将命令发送到服务器:

 public sendMessage(event: string, data: any = {}): void { //   ,      if (event && this.websocket.readyState === 1) { this.websocket.send(JSON.stringify({event, data})); } else { console.log('Send error!'); } } 

在运行时将事件添加到ignorlist:

 public runtimeIgnore(topics: string[]): void { if (topics && topics.length) { //    this.wsConfig.ignore.push(...topics); } } 

从忽略列表中删除事件:

 public runtimeRemoveIgnore(topics: string[]): void { if (topics && topics.length) { topics.forEach((topic: string) => { //      const topicIndex = this.wsConfig.ignore.findIndex(t => t === topic); if (topicIndex > -1) { //    this.wsConfig.ignore.splice(topicIndex, 1); } }); } } 

我们连接Web套接字模块:

 @NgModule({ declarations: [ AppComponent ], imports: [ BrowserModule, ReactiveFormsModule, WebsocketModule.config({ url: environment.ws, //  "ws://mywebsocketurl" //    ignore: [WS_API.EVENTS.ANY_1, WS_API.EVENTS.ANY_2], garbageCollectInterval: 60 * 1000, //    options: { connectionTimeout: 1000, //   maxRetries: 10 //   } }) ], providers: [], bootstrap: [AppComponent] }) export class AppModule { } 

我们在组件中使用:

 @Component({ selector: 'app-root', templateUrl: './app.component.html', styleUrls: ['./app.component.css'] }) export class AppComponent implements OnInit, OnDestroy { private messages$: Observable<IMessage[]>; private messagesMulti$: Observable<IMessage[]>; private counter$: Observable<number>; private texts$: Observable<string[]>; public form: FormGroup; constructor( private fb: FormBuilder, private wsService: WebsocketService) { } ngOnInit() { this.form = this.fb.group({ text: [null, [ Validators.required ]] }); // get messages this.messages$ = this.wsService .addEventListener<IMessage[]>(WS_API.EVENTS.MESSAGES); // get messages multi this.messagesMulti$ = this.wsService .addEventListener<IMessage[]>([ WS_API.EVENTS.MESSAGES, WS_API.EVENTS.MESSAGES_1 ]); // get counter this.counter$ = this.wsService .addEventListener<number>(WS_API.EVENTS.COUNTER); // get texts this.texts$ = this.wsService .addEventListener<string[]>(WS_API.EVENTS.UPDATE_TEXTS); } ngOnDestroy() { } public sendText(): void { if (this.form.valid) { this.wsService .sendMessage(WS_API.COMMANDS.SEND_TEXT, this.form.value.text); this.form.reset(); } } public removeText(index: number): void { this.wsService.sendMessage(WS_API.COMMANDS.REMOVE_TEXT, index); } } 

该服务可以使用了。



尽管本文中的示例不是每个项目的通用解决方案,但它演示了在大型和复杂应用程序中使用Web套接字的一种方法。 您可以启用它并根据当前任务对其进行修改。

该服务的完整版本可以在GitHub找到

对于所有问题,您都可以在评论中联系,无论是通过Telegram还是在同一个地方的Angular频道上,可以与联系。

Source: https://habr.com/ru/post/zh-CN419099/


All Articles