
在
上一篇文章中,我们讨论了Angular中针对Web套接字的通用解决方案,其中我们构建了具有重新连接和服务的总线,可用于基于WebSocketSubject的组件。 这种实现适用于大多数简单情况,例如,接收和发送聊天消息等,但是在您需要构建更灵活和可控制的内容的情况下,其功能可能不足。 在本文中,我将介绍使用Web套接字时的一些功能,并讨论您自己可能遇到的需求。
通常,在人流较大的大型项目中,前端要面对在其他情况下在后端更常见的任务。 在服务器资源紧缩的情况下,部分问题会迁移到前端区域,因此,该项目具有最大的可扩展性和可控制性。
这是本文将介绍的Web套接字客户端的基本要求的列表:
- 自动“智能”重新连接;
- 调试模式;
- 基于RxJ的事件订阅系统;
- 接收和解析二进制数据;
- 在模型上投影(映射)收到的信息;
- 随着新事件的到来,控制模型的变化;
- 忽略任意事件并取消忽略。
请更详细地考虑每个项目。
重新连接/卸袋
我在上一篇文章中写过关于重新连接的知识,所以我只引用部分内容:
当使用Web套接字时,重新连接或重新连接到服务器的组织是至关重要的因素,因为 网络中断,服务器崩溃或其他导致连接中断的错误可能导致应用程序崩溃。
重要的是要注意,重新连接尝试不应太频繁且不应无限期地继续,例如 此行为可以挂起客户端。
Web套接字本身不知道断开连接后如何重新连接。 因此,如果服务器重新启动或服务器崩溃,或者用户重新连接了Internet,则要继续工作,还需要重新连接Web套接字。
在本文中,为了进行重新连接和调试,我们将使用
Reconnecting WebSocket ,其中包含必要的功能和其他选项,例如在重新连接之间更改Web套接字的URL,选择任意WebSocket构造函数等。 其他替代方式也是合适的。 重新连接上一篇文章不适合,因为 它是在WebSocketSubject下编写的,这一次不适用。
RxJs事件订阅系统
要在组件中使用Web套接字,您需要订阅事件,并在必要时取消订阅它们。 为此,请使用流行的
Pub / Sub设计模式。
“发布者-订阅者(eng。Publisher-subscriber或eng。Pub / sub)-用于传输消息的行为设计模板,其中消息的发件人(称为发布者(eng。Publishers))没有直接绑定到程序代码上,而是将消息发送给订阅者(eng。Subscriber) ) 而是将消息分为几类,并且不包含有关其订户的信息(如果有)。 同样,订户处理一类或多类消息,这些消息是从特定发布者那里提取的。”
订户不是直接与发布者联系,而是通过中间总线-网站服务。 还应该可以订阅具有相同返回类型的多个事件。 每个订阅都创建自己的主题,该主题添加到侦听器对象中,使您可以将Web套接字事件寻址到必要的订阅。 在使用RxJs Subject时,取消订阅会有一些困难,因此,我们将创建一个简单的垃圾回收器,当没有观察者的侦听器对象时,它将从侦听器对象中删除这些主题。
接收和解析二进制数据
WebSocket支持二进制数据,文件或流的传输,这通常在大型项目中使用。 看起来像这样:
0x80,<长度-一个或多个字节>,<消息正文>
为了不限制传输消息的长度,同时又不浪费字节,协议开发人员使用了以下算法。 长度指示中的每个字节都被单独考虑:最高字节指示它是最后一个字节(0)还是后面的其他字节(1),而低7位包含发送的数据。 因此,当二进制数据帧0x80的符号出现时,则获取下一个字节并将其存储在单独的“存钱罐”中。 然后,下一个字节(如果设置了最高有效位)也将传送到“存钱罐”,依此类推,直到遇到最高有效位为零的字节。 该字节是长度指示器中的最后一个字节,并且也添加到“存钱罐”中。 现在,从存钱罐中的字节中删除高位,然后合并其余部分。 这将是消息正文的长度-7位数字,没有最高有效位。
前端解析和二进制流机制很复杂,并且与模型上的数据映射相关联。 可以单独撰写一篇文章。 这次,我们将分析一个简单的选项,如果对该主题感兴趣的话,将为下面的出版物提供困难的案例。
在模型上投影(映射)收到的信息
不管接收到的传输类型如何,都需要安全地阅读和修改。 关于如何更好地做到这一点尚无共识,我坚持
使用数据模型的理论,因为我认为它对于以OOP风格进行编程是合乎逻辑且可靠的。
“数据模型是对象,运算符和其他元素的抽象,自给自足的逻辑定义,它们共同构成了用户与之交互的抽象数据访问机器。 “这些对象使您可以对数据结构和运算符进行建模-数据的行为。”
各种不把对象归类为行为,结构等的对象的流行轮胎都会造成混乱,控制得不好,有时会长满一些不常见的东西。 例如,狗类应该在任何条件下描述狗。 如果将狗视为一组区域:尾巴,颜色,脸部等,则该狗可能会长出额外的爪子,并且将出现另一只狗而不是头部。

随着新事件的到来,控制模型的变化
在本段中,我将描述在体育博彩移动应用程序的Web界面上工作时遇到的问题。 应用程序API通过接收的Web套接字工作:更新赔率,添加和删除新类型的投注,有关比赛开始或结束的通知等。 -总共约有300个Web套接字事件。 在比赛期间,赌注和信息会不断更新,有时每秒更新2-3次,因此问题在于,在更新之后,无需中间控制即可更新界面。
当用户从移动设备监视出价时,同时在其显示器上更新列表时,出价从视野中消失了,因此用户必须再次搜索跟踪的出价。 每次更新都会重复此行为。

该解决方案要求屏幕上显示的对象具有不变性,但与此同时,投注系数必须更改,不相关的出价变为无效,并且直到用户滚动屏幕后才添加新的出价。 过时的选项未存储在后端,因此必须记住这些行,并用“ deleted”标志标记,为此在Web套接字和订阅之间创建了中间数据存储,以确保对更改进行控制。
在新服务中,我们还将创建一个替代层,这一次我们将使用
Dexie.js -IndexedDB API的包装器,但任何其他虚拟数据库或浏览器数据库都可以。 Redux是可以接受的。
忽略任意事件并取消忽略
在同一家公司中,经常同时存在多个相同类型的项目:移动版本和Web版本,为不同用户组设置不同的版本,同一应用程序的高级版本和截短版本。
通常它们都使用单个代码库,因此有时您需要在运行时或DI期间关闭不必要的事件,而又不删除订阅并重新打开它,即 忽略其中的一些,以免处理不必要的事件。 这是一个简单但有用的功能,可为Pub / Sub总线增加灵活性。
让我们从接口的描述开始:
export interface IWebsocketService { // addEventListener<T>(topics: string[], id?: number): Observable<T>; runtimeIgnore(topics: string[]): void; runtimeRemoveIgnore(topics: string[]): void; sendMessage(event: string, data: any): void; } export interface WebSocketConfig { // DI url: string; ignore?: string[]; garbageCollectInterval?: number; options?: Options; } export interface ITopic<T> { // Pub/Sub [hash: string]: MessageSubject<T>; } export interface IListeners { // [topic: string]: ITopic<any>; } export interface IBuffer { // ws.message type: string; data: number[]; } export interface IWsMessage { // ws.message event: string; buffer: IBuffer; } export interface IMessage { // id: number; text: string; } export type ITopicDataType = IMessage[] | number | string[]; // callMessage
我们将继承Subject创建一个垃圾回收器:
export class MessageSubject<T> extends Subject<T> { constructor( private listeners: IListeners, // private topic: string, // private id: string // id ) { super(); } public next(value?: T): void { if (this.closed) { throw new ObjectUnsubscribedError(); } if (!this.isStopped) { const {observers} = this; const len = observers.length; const copy = observers.slice(); for (let i = 0; i < len; i++) { copy[i].next(value); } if (!len) { this.garbageCollect();
与之前的实现不同,websocket.events.ts将成为Web套接字模块的一部分
export const WS_API = { EVENTS: { MESSAGES: 'messages', COUNTER: 'counter', UPDATE_TEXTS: 'update-texts' }, COMMANDS: { SEND_TEXT: 'set-text', REMOVE_TEXT: 'remove-text' } };
要在连接模块时进行配置,请创建websocket.config:
import { InjectionToken } from '@angular/core'; export const config: InjectionToken<string> = new InjectionToken('websocket');
为代理创建模型:
import Dexie from 'dexie'; import { IMessage, IWsMessage } from './websocket.interfaces'; import { WS_API } from './websocket.events'; class MessagesDatabase extends Dexie { // Dexie typescript public messages!: Dexie.Table<IMessage, number>; // id is number in this case constructor() { super('MessagesDatabase'); // this.version(1).stores({ // messages: '++id,text' }); } }
一个简单的模型解析器,在实际情况下最好将其分为几个文件:
export const modelParser = (message: IWsMessage) => { if (message && message.buffer) { /* */ const encodeUint8Array = String.fromCharCode .apply(String, new Uint8Array(message.buffer.data)); const parseData = JSON.parse(encodeUint8Array); let MessagesDB: MessagesDatabase; // IndexedDB if (message.event === WS_API.EVENTS.MESSAGES) { // IMessage[] if (!MessagesDB) { MessagesDB = new MessagesDatabase(); } parseData.forEach((messageData: IMessage) => { /* */ MessagesDB.transaction('rw', MessagesDB.messages, async () => { /* , */ if ((await MessagesDB.messages .where({id: messageData.id}).count()) === 0) { const id = await MessagesDB.messages .add({id: messageData.id, text: messageData.text}); console.log(`Addded message with id ${id}`); } }).catch(e => { console.error(e.stack || e); }); }); return MessagesDB.messages.toArray(); // IMessage[] } if (message.event === WS_API.EVENTS.COUNTER) { // counter return new Promise(r => r(parseData)); // } if (message.event === WS_API.EVENTS.UPDATE_TEXTS) { // text const texts = []; parseData.forEach((textData: string) => { texts.push(textData); }); return new Promise(r => r(texts)); // } } else { console.log(`[${Date()}] Buffer is "undefined"`); } };
WebsocketModule:
@NgModule({ imports: [ CommonModule ] }) export class WebsocketModule { public static config(wsConfig: WebSocketConfig): ModuleWithProviders { return { ngModule: WebsocketModule, providers: [{provide: config, useValue: wsConfig}] }; } }
让我们开始创建服务:
private listeners: IListeners; // private uniqueId: number; // id private websocket: ReconnectingWebSocket; // constructor(@Inject(config) private wsConfig: WebSocketConfig) { this.uniqueId = -1; this.listeners = {}; this.wsConfig.ignore = wsConfig.ignore ? wsConfig.ignore : []; // this.connect(); } ngOnDestroy() { this.websocket.close(); // }
连接方式:
private connect(): void { // ReconnectingWebSocket config const options = { connectionTimeout: 1000, // , maxRetries: 10, // , ...this.wsConfig.options }; // this.websocket = new ReconnectingWebSocket(this.wsConfig.url, [], options); this.websocket.addEventListener('open', (event: Event) => { // console.log(`[${Date()}] WebSocket connected!`); }); this.websocket.addEventListener('close', (event: CloseEvent) => { // console.log(`[${Date()}] WebSocket close!`); }); this.websocket.addEventListener('error', (event: ErrorEvent) => { // console.error(`[${Date()}] WebSocket error!`); }); this.websocket.addEventListener('message', (event: MessageEvent) => { // this.onMessage(event); }); setInterval(() => { // this.garbageCollect(); }, (this.wsConfig.garbageCollectInterval || 10000)); }
复制垃圾收集器,将按超时检查订阅:
private garbageCollect(): void { for (const event in this.listeners) { if (this.listeners.hasOwnProperty(event)) { const topic = this.listeners[event]; for (const key in topic) { if (topic.hasOwnProperty(key)) { const subject = topic[key];
我们查看发送事件的订阅:
private onMessage(event: MessageEvent): void { const message = JSON.parse(event.data); for (const name in this.listeners) { if (this.listeners.hasOwnProperty(name) && !this.wsConfig.ignore.includes(name)) { const topic = this.listeners[name]; const keys = name.split('/');
主题中的头盔事件:
private callMessage<T>(topic: ITopic<T>, data: T): void { for (const key in topic) { if (topic.hasOwnProperty(key)) { const subject = topic[key]; if (subject) { // subject.next(data); } else { console.log(`[${Date()}] Topic Subject is "undefined"`); } } } }
创建发布/子主题:
private addTopic<T>(topic: string, id?: number): MessageSubject<T> { const token = (++this.uniqueId).toString(); const key = id ? token + id : token; // id const hash = sha256.hex(key); // SHA256- id if (!this.listeners[topic]) { this.listeners[topic] = <any>{}; } return this.listeners[topic][hash] = new MessageSubject<T>(this.listeners, topic, hash); }
订阅一个或多个事件:
public addEventListener<T>(topics: string | string[], id?: number): Observable<T> { if (topics) { // const topicsKey = typeof topics === 'string' ? topics : topics.join('/'); return this.addTopic<T>(topicsKey, id).asObservable(); } else { console.log(`[${Date()}] Can't add EventListener. Type of event is "undefined".`); } }
此处的所有内容都经过有意简化,但可以转换为二进制实体,就像服务器一样。 将命令发送到服务器:
public sendMessage(event: string, data: any = {}): void { // , if (event && this.websocket.readyState === 1) { this.websocket.send(JSON.stringify({event, data})); } else { console.log('Send error!'); } }
在运行时将事件添加到ignorlist:
public runtimeIgnore(topics: string[]): void { if (topics && topics.length) {
从忽略列表中删除事件:
public runtimeRemoveIgnore(topics: string[]): void { if (topics && topics.length) { topics.forEach((topic: string) => {
我们连接Web套接字模块:
@NgModule({ declarations: [ AppComponent ], imports: [ BrowserModule, ReactiveFormsModule, WebsocketModule.config({ url: environment.ws, // "ws://mywebsocketurl" // ignore: [WS_API.EVENTS.ANY_1, WS_API.EVENTS.ANY_2], garbageCollectInterval: 60 * 1000, // options: { connectionTimeout: 1000, // maxRetries: 10 // } }) ], providers: [], bootstrap: [AppComponent] }) export class AppModule { }
我们在组件中使用:
@Component({ selector: 'app-root', templateUrl: './app.component.html', styleUrls: ['./app.component.css'] }) export class AppComponent implements OnInit, OnDestroy { private messages$: Observable<IMessage[]>; private messagesMulti$: Observable<IMessage[]>; private counter$: Observable<number>; private texts$: Observable<string[]>; public form: FormGroup; constructor( private fb: FormBuilder, private wsService: WebsocketService) { } ngOnInit() { this.form = this.fb.group({ text: [null, [ Validators.required ]] });
该服务可以使用了。
尽管本文中的示例不是每个项目的通用解决方案,但它演示了在大型和复杂应用程序中使用Web套接字的一种方法。 您可以启用它并根据当前任务对其进行修改。
该服务的完整版本可以在
GitHub上
找到 。
对于所有问题,您都可以在评论中联系,无论是通过Telegram还是在同一个地方
的Angular频道上,可以与
我联系。