你好 在本文中,我将在欧洲四处奔跑,也就是说,我将告诉您反应式编程的含义,介绍演员,反应式流程,最后,使用反应式流程,我们将认识到鼠标手势,就像在旧的Opera及其精神继承者Vivaldi中一样。
目的是介绍反应式编程的基本概念,并表明并非所有事物看上去乍一看都那么复杂和令人恐惧。
来源什么是反应式编程?
为了回答这个问题,我们转到该
站点 。 它有一幅美丽的图画,显示了反应性应用程序必须满足的四个主要标准。

该应用程序应该快速,容错并且扩展性好。
看起来“我们对所有人都有好有坏”,对吗?
这些词是什么意思:
- 反应性
该应用程序应在半秒钟内为用户提供结果。 这还包括快速失败的原理-也就是说,当出现问题时,最好向用户返回一条错误消息,例如“对不起,有问题。 请稍后重试,以免天气在海边等待。 如果操作很长,我们会向用户显示进度条。 如果时间太长,“您的请求将在2042年3月18日临时得到满足。 我们将通过邮件向您发送通知。” - 可伸缩性是在负载下提供响应能力的一种方法。 想象一下相对成功的服务的生命周期:
- 启动-请求流很小,服务在具有一个核心的虚拟机上运行。
- 请求流增加-将内核添加到虚拟机,并在多个线程中处理请求。
- 更大的负载-我们连接了批处理-对数据库和硬盘驱动器的请求进行了分组。
- 更大的负载-您需要增加服务器并在集群中提供工作。
理想情况下,系统本身应根据负载扩大或缩小。
- 容错能力
我们接受我们生活在一个不完美的世界中,一切都会发生。 万一我们的系统出现问题,我们必须提供错误处理和恢复方法 - 最后,我们受邀使用其系统基于消息驱动的消息传递的系统来实现所有这些目标
在继续之前,我想详细介绍事件驱动系统与消息驱动系统的不同之处。
事件驱动:- 事件-系统报告已达到特定状态。
- 活动的订阅者可能很多。
- 事件链通常很短,事件处理程序(在物理上和在代码上)都接近源。
- 事件源及其处理程序通常具有相同的状态(从物理上讲,它们使用同一块RAM进行信息交换)。
与事件驱动相反,在消息驱动系统中:- 每封邮件只有一个收件人。
- 消息是不可变的:您无法更改接收到的消息中的任何内容,以便发件人知道并可以读取信息。
- 系统元素响应(或不响应)接收消息,并且可以将消息发送到系统的其他元素。
所有这些为我们提供了
演员模型
发展的里程碑:
- 在1973年的科学论文中首次提到了参与者-卡尔·休伊特(Carl Hewitt),彼得·毕晓普(Peter Bishop)和理查德·史蒂格(Richard Steiger),“人工智能的通用模块化ACTOR形式主义”,
- 1986年-Erlang出现。 Ericson需要一种用于电信设备的语言,该语言将提供容错能力和无错传播。 在本文的上下文中,其主要功能是:
- 一切都是一个过程
- 消息是唯一的通信方式(Erlang是一种功能语言,并且其中的消息不可更改)。
- ..
- 2004年-Scala语言的第一个版本。 其特点:
- 由JVM驱动,
- 功能性
- 对于多线程,已选择参与者模型。
- 2009年-演员的实施分配在一个单独的图书馆中-Akka
- 2014年-Akka.net-已移植到.Net。
演员可以做什么?
角色是相同的对象,但是:
- 与普通对象不同,演员无法调用彼此的方法。
- 参与者只能通过不变的消息来传递信息。
- 收到消息后,演员可以
- 创建新的参与者(他们将在层次结构中处于较低的位置),
- 发送消息给其他演员,
- 在层次结构和您自己中停止下面的参与者。
让我们来看一个例子。

演员A要向演员B发送消息。他所拥有的只是ActorRef(某个地址)。 演员B可以在任何地方。
Actor A通过系统(ActorSystem)发送字母B。 系统将字母放入演员B的信箱中,并“唤醒”演员B。演员B从信箱中取出这封信并做某事。
与在另一个对象上调用方法相比,它看起来不必要地复杂,但是如果您假设参与者是受过训练以对某些刺激做出反应的人,那么参与者的模型就非常适合现实世界。
想象一下父亲和儿子:

父亲给儿子SMSku发送“在房间里打扫房间”,并继续做自己的事情。 儿子读了SMSku并开始清洁。 同时,父亲在玩扑克。 儿子完成清洁并发送短信“完成”。 看起来很简单,对吧?
现在假设父亲和儿子不是演员,而是可以互相借鉴的普通物品。 父亲拉着儿子采取“打扫房间”的方法,紧随其后,直到儿子完成打扫并将控制权移交给父亲。 父亲此时不能玩扑克。 在这种情况下,演员模型变得越来越有吸引力。
现在让我们继续
Akka.NET
下面编写的所有内容对于JVM的原始Akka都是正确的,但是对我而言,C#比Java更紧密,因此我将以Akka.NET为例。
那么Akka有什么好处呢?
- 通过消息传递的多线程。 您再也不必为共享内存的经典多线程所遭受的各种锁,信号灯,互斥锁和其他魅力所困扰。
- 系统及其组件之间的透明通信。 无需担心复杂的网络代码-系统本身将找到消息的目的地并保证消息的传递(您可以在此插入有关UDP vs TCP的笑话)。
- 可以自动按比例放大或缩小的灵活体系结构。 例如,在负载下,系统可以增加其他群集节点并平均分配负载。
但是扩展的主题非常广泛,值得单独发表。 因此,我将仅详细介绍该功能,该功能将在所有项目中有用:
错误处理
角色具有层次结构-可以将其表示为树。 每个演员都有父母,可以有“孩子”。
Akka.NET文档版权所有2013-2018 Akka.NET项目您可以为每个演员设置监督策略-如果“孩子”出了问题该怎么办。 例如,“击败”遇到问题的演员,然后创建一个相同类型的新演员并将其委托给他。
例如,我在Akka.net CRUD上创建了一个应用程序,其中在角色上实现了“业务逻辑”层。 该项目的目的是确定参与者是否应在不可扩展的系统中使用-他们会改善生活还是增加痛苦。
Akka的内置错误处理如何帮助您:
- 一切都很好,应用程序正常工作,
- 仓库发生了什么事,现在结果只占5分之一
- 我将监督策略设置为“每秒尝试10次”,
- 该应用程序又可以工作了(尽管速度较慢),而且我有时间弄清楚是怎么回事。
有一种诱惑说:“来吧,我自己写这样的错误处理程序,为什么有些演员必须犯错?” 坦白地说,但前提是失败要点很少。
和一些代码。 这就是IoC容器中的actor系统初始化的样子:
public Container() { system = ActorSystem.Create("MySystem"); var echo = system.ActorOf<EchoActor>("Echo");
EchoActor是向发送方返回值的最简单的actor:
public class EchoActor : ReceiveActor { public EchoActor() { Receive<bool>(flag => { Sender.Tell(flag); }); } }
要将参与者与“常规”代码连接,请使用Ask命令:
public async Task<ActionResult> Index() { ViewBag.Type = typeof(Model); var res = await CrudActorRef.Ask<IEnumerable<Model>>(DataMessage.GetAll<Model>(), maxDelay); return View(res); }
合计
我可以和演员们窃笑,我可以说:
- 如果需要可伸缩性,请查看它们。
- 对于复杂的业务逻辑,最好不要使用它们,因为
- 奇怪的依赖注入。 要使用必要的依赖关系初始化角色,必须首先创建一个Props对象,然后将其提供给ActorSystem以创建所需类型的角色。 要使用IoC容器(例如Castle Windsor或Autofac)创建Props,有现成的包装器-DependencyResolvers。 但是我面对这样一个事实,即IoC容器试图控制依赖项的生存期,不久后系统悄然崩溃。
*也许,不是将依赖项注入到对象中,而是应将此依赖项作为子actor放置。 - 打字问题。 ActorRef对其所引用的actor类型一无所知。 也就是说,在编译时,不知道参与者是否可以处理这种类型的消息。
第2部分:喷射流
现在,让我们继续讨论一个更流行和有用的主题-射流。 如果您在工作过程中无法与演员见面,那么Rx流肯定会在前端和后端派上用场。 它们几乎以所有现代编程语言实现。 我将提供有关RxJ的示例,因为如今,即使后端程序员有时也必须使用JavaScript进行某些操作。
Rx流可用于所有流行的编程语言。由CC BY-NC 4.0许可的Andre Staltz撰写的 “ 您缺少的反应式编程简介 ”为了解释什么是喷射流,我将从pull和push集合开始。
| 单返回值 | 多个返回值 |
---|
拉力 同步的 互动式 | Ť | IEnumerable <T> |
推入 异步的 反应性 | 任务<T> | IObservable <T> |
拉取集合是我们在编程中经常使用的集合。 最引人注目的示例是数组。
const arr = [1,2,3,4,5];
它已经有数据,他本人不会更改此数据,但是他可以根据要求提供。
arr.forEach(console.log);
另外,在对数据进行处理之前,您可以进行某种方式的处理。
arr.map(i => i+1).map(I => “my number is ”+i).forEach(console.log);
现在,让我们假设最初在集合中没有数据,但是它肯定会通知您它们已经出现(推送)。 同时,我们仍然可以对该集合进行必要的转换。
例如:
source.map(i => i+1).map(I => “my number is ”+i).forEach(console.log);
当源中出现诸如1之类的值时,console.log将输出“我的号码为1”。
运作方式:
出现一个新实体-主题(或可观察):
const observable = Rx.Observable.create(function (observer) { observer.next(1); observer.next(2); observer.next(3); setTimeout(() => { observer.next(4); observer.complete(); }, 1000); });
这是一个推送集合,它将发出有关其状态更改的通知。
在这种情况下,数字1、2和3将立即出现在其中,在第二个4中出现,然后集合将“结束”。 这是一种特殊的事件。
第二个实体是观察者。 他可以订阅主题事件,并对接收到的数据进行处理。 例如:
observable.subscribe(x => console.log(x)); observable.subscribe({ next: x => console.log('got value ' + x), error: err => console.error('something wrong occurred: ' + err), complete: () => console.log('done'), }); observable .map(x => 'This is ' + x) .subscribe(x => console.log(x));
可以看出,一个主题可以有许多订阅者。
看起来很简单,但尚不清楚为什么有必要这样做。 在使用无功流时,我将给出另外两个需要了解的定义,然后在实践中说明它们如何工作以及在什么情况下发挥其全部潜能。
冷观测
- 订阅事件时通知事件。
- 不管订阅时间如何,整个数据流都会再次发送给每个订阅者。
- 为每个订户复制数据。
这是什么意思:假设公司(主题)决定安排礼物的分发。 每个员工(观察员)上班并收到礼物的副本。 没有人仍然被剥夺。
热观测
- 他们尝试通知事件,而不考虑订户的存在。 如果在事件发生时没有订阅者,则数据将丢失。
示例:早上,将员工的热饼带到公司。 当它们被带进来时,所有的百灵都会闻到气味,然后把馅饼做成早餐。 但是后来出现的猫头鹰不再有馅饼了。
在什么情况下使用喷射流?
当有数据流随时间分布时。 例如,用户输入。 或任何服务的日志。 在一个项目中,我看到了一个自制的记录器,该记录器在N秒内收集了事件,然后同时记录了整个文件包。 电池代码占据了该页面。 如果使用Rx流,那么它将更加简单:
“ RxJs参考/可观察的 ,根据CC BY 4.0许可的文档 。
(有很多示例和图片说明了无功流的各种操作的作用) source.bufferTime(2000).subsribe(doThings);
最后,是一个使用示例。
使用Rx流识别鼠标手势
在旧的Opera或其精神继任者Vivaldi中,有一个使用鼠标手势的浏览器控件。
也就是说,您需要识别鼠标的上/下,右/左移动及其组合。 它可以在没有Rx流的情况下编写,但是代码将很复杂且难以维护。
这是Rx流的外观:
我将从头开始-我将按原始顺序设置要搜索的数据和格式:
这些是单位向量及其组合。
接下来,您需要将鼠标事件转换为Rx流。 所有Rx库都具有用于将标准事件转换为Observable的内置工具。
const mouseMoves = Rx.Observable.fromEvent(canvas, 'mousemove'), mouseDowns = Rx.Observable.fromEvent(canvas, 'mousedown'), mouseUps = Rx.Observable.fromEvent(canvas, 'mouseup');
接下来,我将鼠标的坐标按2分组并找到它们的差,得到鼠标的偏移量。
const mouseDiffs = mouseMoves .map(getOffset) .pairwise() .map(pair => { return { x: pair[1].x-pair[0].x, y: pair[1].y-pair[0].y } });
并使用“ mousedown”和“ mouseup”事件对这些运动进行分组。
const mouseGestures = mouseDiffs .bufferToggle(mouseDowns, x => mouseUps) .map(concat);
连拍功能会切掉太短的运动,并将方向大致对齐的运动分组。
function concat(values) {
如果X轴或Y轴上的运动太短,则将其重置为零。 然后从获得的位移坐标中仅保留符号。 因此,获得了我们正在寻找的单位向量。
const normalizedMouseGestures = mouseGestures.map(arr => arr.map(v => { const dist = Math.hypot(vx, vy);
结果:
gestures.map(gesture => normalizedMouseGestures.mergeMap( moves => Rx.Observable.from(moves) .sequenceEqual(gesture.sequence, comparer) ).filter(x => x).mapTo(gesture.name) ).mergeAll().subscribe(gestureName => actions[gestureName]());
使用sequenceEqual,您可以将接收到的动作与原始动作进行比较,如果匹配,则执行特定操作。
→
您可以在这里玩手势请注意,除了手势识别,在HTML画布上还绘制了鼠标的初始移动和标准化移动。 代码的可读性不会因此受到影响。
由此带来的另一个优势是-可以轻松地补充和扩展借助Rx流编写的功能。
总结
- 带有Rx流的库可用于几乎所有编程语言。
- 当事件流随时间扩展时(例如,用户输入),应使用Rx流。
- 使用Rx流编写的功能可以轻松补充和扩展。
- 我没有发现任何重大缺陷。