简而言之:
- 该证明已经在适用于Java的 C ++ , JS和PHP中实现。
- 比协程和Promise 更快 ,功能更多。
- 它不需要单独的软件堆栈。
- 与所有安全和调试工具成为朋友。
- 它适用于任何体系结构,不需要特殊的编译器标志。
回头看
在计算机诞生之初,只有一个控制流,但阻塞了输入输出。 然后铁中断添加到它。 现在,您可以有效地使用缓慢且不可预测的设备。
随着熨斗功能的增长和可用性的降低,必须同时执行多项任务,从而提供硬件支持。 因此,存在隔离的过程,其中中断以信号的形式从铁中抽象出来。
下一个演进阶段是多线程,它是在相同进程的基础上实现的,但是共享访问内存和其他资源。 这种方法有其局限性,而且切换到安全OS的开销也很大。
为了在流程甚至不同的机器之间进行通信,40年前就提出了Promise / Future抽象。
用户界面和当前可笑的10K客户端问题导致了事件循环,反应器和Proactor方法的鼎盛时期,它们比清晰,一致的业务逻辑更面向事件。
最后,我们介绍了现代协程(协程),本质上是在上述抽象之上对流程进行的仿真,具有相应的技术局限性和控制权的确定性转移。
为了传达事件,结果和异常,它们都回到了“承诺/未来”的相同概念。 一些办公室决定使用不同的名称-“任务”。
最后,他们将所有内容都隐藏在漂亮的async/await
包中,这取决于技术要求编译器或翻译器支持。
当前异步业务逻辑情况的问题
仅考虑用async/await
装饰的协程和Promise,作为 旧方法中问题的存在证实了进化过程本身。
这两个术语不相同。 例如,在ECMAScript中,没有协程,但是使用Promise
减轻语法的负担,而Promise
只能使用回调地狱来组织工作。 实际上,像V8这样的脚本引擎走得更远,并对纯async/await
函数和调用进行了特殊的优化。
专家co_async/co_await
并没有包含在C ++ 17中,但是来自软件巨头协程的压力可以完全以它们的形式出现在标准中。 同时,传统上公认的解决方案是Boost.Context , Boost.Fiber和Boost.Coroutine2 。
在Java中,在语言级别上仍然没有async/await
,但是有诸如EA Async之类的解决方案,像Boost.Context一样,需要针对每个版本的JVM和代码字节进行自定义。
Go有其自己的协程,但是如果您仔细查看开放项目的文章和错误报告,事实证明这里的一切都不那么顺利。 也许失去协程接口作为托管实体不是一个好主意。
作者的意见:裸露的协程很危险
就个人而言,作者很少反对使用动态语言编写的协程,但是他非常警惕任何在机器代码级别上对堆栈的调情。
几点:
- 需要堆叠:
- 堆上的堆栈有许多缺点:无法及时确定溢出,邻居损坏和其他可靠性/安全性问题的问题,
- 一个安全堆栈至少需要一页物理内存,一个条件页以及每次调用
async
函数的额外开销:4 + KB(最小)+增加的系统限制, - 最终,可能是在协程停机期间未使用分配给堆栈的大部分内存。
- 有必要实现一个复杂的逻辑来保存,恢复和删除协程状态:
- 对于处理器架构(甚至模型)和二进制接口(ABI)的每种情况: example ,
- 新的或可选的架构功能会带来潜在的潜在问题(例如,英特尔TSX,ARM协处理器或MIPS),
- 由于专有系统而导致的其他潜在问题已关闭文档(Boost文档是与此相关的文档)。
- 动态分析工具和总体安全性方面的潜在问题:
- 例如,由于相同的跳跃堆栈,因此必须与Valgrind集成,
- 杀毒软件很难说,但在过去的JVM问题示例中,它们可能并不十分喜欢它,
- 我敢肯定,将会出现新的攻击类型,并且将揭示与实施协程相关的漏洞。
作者的观点:发电机和yield
根本的邪恶
这个看似第三方的主题与协程的概念和“ continue”属性直接相关。
简而言之,任何集合都必须存在完整的迭代器。 为什么创建裁剪的迭代器-生成器问题尚不清楚。 例如,在Python中带有range()
的情况比技术复杂化的借口更像是排他性炫耀。
如果情况是无限生成器,则其实现的逻辑是基本的。 为什么要增加额外的技术难度以推动不断发展的周期。
协程的支持者后来提出的唯一明智的理由是,各种具有反向控制的流解析器。 实际上,这是在库级别解决单个问题的狭义专业案例,而不是应用程序的业务逻辑。 同时,通过有限状态机提供了一种优雅,简单且更具描述性的解决方案。 这些技术问题的范围比普通业务逻辑的范围小得多。
实际上,要解决的问题是从手指上获得的,并且需要相对认真的努力来进行初始实施和长期支持。 如此之多,以至于某些项目可能会在禁止使用goto
或在各个行业中使用动态内存分配的示例后,禁止使用机器代码级协同程序。
作者意见:ECMAScript的async/await
Promise模型更可靠,但需要修改
与连续协程不同,在此模型中,代码段被秘密地划分为设计为匿名函数的不可中断块。 在C ++中,由于内存管理的特殊性,这并不完全适合,例如:
struct SomeObject { using Value = std::vector<int>; Promise funcPromise() { return Promise.resolved(value_); } void funcCallback(std::function<void()> &&cb, const Value& val) { somehow_call_later(cb); } Value value_; }; Promise example() { SomeObject some_obj; return some_obj.funcPromise() .catch([](const std::exception &e){
首先,退出example()
以及调用lambda函数之前, some_obj
将被销毁。
其次,具有捕获变量或引用的lambda函数是对象,并秘密地添加复制/移动,这可能会对具有大量捕获的性能产生负面影响,并且需要在通常的std::function
在类型擦除期间在堆上分配内存。
第三, Promise
接口本身是基于结果的“承诺”概念,而不是业务逻辑的一致执行。
原理图非最佳解决方案可能看起来像这样:
Promise example() { struct LocalContext { SomeObject some_obj; }; auto ctx = std::make_shared<LocalContext>(); return some_obj.funcPromise() .catch([](const std::exception &e){
注意: std::move
而不是std::shared_ptr
不合适的,因为它无法一次转移到多个lambda,并且它们的大小会增加。
随着async/await
异步恐怖进入了一种可消化的状态:
async void example() { SomeObject some_obj; try { SomeObject::Value val = await some_obj.func(); } catch (const std::exception& e) (
作者观点:协程策划师是半身像
一些批评家称缺少调度程序和对处理器资源的“不诚实”使用是一个问题。 也许更严重的问题是数据局部性和处理器缓存的有效利用。
关于第一个问题:在各个协程级别上进行优先级排序似乎开销很大。 相反,它们可以为特定的统一任务共同操作。 这就是流量的作用。
这可以通过使用其自己的“铁”线程创建单独的Event Loop实例并在操作系统级别进行计划来实现。 第二种选择是在竞争和/或性能方面将协程与相对原始的(Mutex,Throttle)原始同步。
异步编程不会使处理器资源具有弹性,并且需要对同时处理的任务数量和总执行时间进行绝对正常的限制。
防止在一个协程上长时间阻塞需要采取与回调相同的措施-避免阻塞系统调用和冗长的数据处理周期。
第二个问题需要研究,但至少协程本身会堆叠在一起,并且Future / Promise实现的详细信息已经违反了数据的局部性。 如果将来已经很重要,则有机会尝试继续执行相同的协程。 为了防止一个协程捕获整个处理器时间,需要某种机制来计算执行时间或这种连续的次数。 根据处理器缓存的大小和线程数的不同,这可能不会给出结果,或者给出非常双重的结果。
还有第三点-协程调度程序的许多实现允许您在不同的处理器内核上运行它们,相反,由于在访问共享资源时进行强制同步,因此增加了问题。 在单个事件循环流的情况下,仅在逻辑级别需要这种同步,因为 保证每个同步回调块都可以正常工作,而不会与其他组件竞争。
作者的观点:一切都要适度
现代操作系统中线程的存在不会否定单个进程的使用。 同样,在事件循环中处理大量客户端并不会否定将隔离的“铁”线程用于其他需求。
无论如何,协程和事件循环的各种变体在没有必要工具支持的情况下使调试过程复杂化,并且在协程堆栈上具有局部变量,一切都变得更加困难-几乎没有办法获得它们。
FutoIn AsyncSteps-协程的替代品
我们以已经建立的事件循环模式和根据ECMAScript(JavaScript)Promise类型的回调方案的组织为基础。
在执行计划方面,我们对Event Loop的以下活动感兴趣:
Handle immediate(callack)
需要干净调用堆栈的Handle immediate(callack)
。- Deferred callback延迟
Handle deferred(delay, callback)
。 - 取消回调
handle.cancel()
。
因此,我们获得了一个名为AsyncTool
的接口,该接口可以通过多种方式实现,包括在现有的经过验证的开发之上。 他与编写业务逻辑没有直接关系,因此我们不再赘述。
步骤树:
在AsyncSteps概念中,通过深入创建序列来排列并执行同步步骤的抽象树。 完成这样的通道后,将动态设置每个更深层次的步骤。
所有交互都通过单个AsyncSteps
接口进行,按照约定,该接口作为第一个参数传递到每个步骤。 按照惯例,参数名称为asi
或不建议使用。 这种方法使您几乎可以完全打破特定实现之间的联系,并在插件和库中编写业务逻辑。
在规范的实现中,每个步骤都会收到自己的对象实例,该对象实现AsyncSteps
,该实例允许在使用接口时及时跟踪逻辑错误。
抽象示例:
asi.add( // Level 0 step 1 func( asi ){ print( "Level 0 func" ) asi.add( // Level 1 step 1 func( asi ){ print( "Level 1 func" ) asi.error( "MyError" ) }, onerror( asi, error ){ // Level 1 step 1 catch print( "Level 1 onerror: " + error ) asi.error( "NewError" ) } ) }, onerror( asi, error ){ // Level 0 step 1 catch print( "Level 0 onerror: " + error ) if ( error strequal "NewError" ) { asi.success( "Prm", 123, [1, 2, 3], true) } } ) asi.add( // Level 0 step 2 func( asi, str_param, int_param, array_param ){ print( "Level 0 func2: " + param ) } )
执行结果:
Level 0 func 1 Level 1 func 1 Level 1 onerror 1: MyError Level 0 onerror 1: NewError Level 0 func 2: Prm
同步显示如下:
str_res, int_res, array_res, bool_res // undefined try { // Level 0 step 1 print( "Level 0 func 1" ) try { // Level 1 step 1 print( "Level 1 func 1" ) throw "MyError" } catch( error ){ // Level 1 step 1 catch print( "Level 1 onerror 1: " + error ) throw "NewError" } } catch( error ){ // Level 0 step 1 catch print( "Level 0 onerror 1: " + error ) if ( error strequal "NewError" ) { str_res = "Prm" int_res = 123 array_res = [1, 2, 3] bool_res = true } else { re-throw } } { // Level 0 step 2 print( "Level 0 func 2: " + str_res ) }
传统同步代码的最大模仿是立即可见的,这应该有助于提高可读性。
从业务逻辑的角度来看, 大量需求随时间增长,但是我们可以将其分为易于理解的部分。 如下所述,是实际运行四年的结果。
核心运行时API:
add(func[, onerror])
-模仿try-catch
。success([args...])
-成功完成的明确指示:
error(code[, reason)
-因错误而中断执行:
code
-具有字符串类型,可以更好地与微服务架构中的网络协议集成,reason
-对一个人的任意解释。
state()
-线程本地存储的模拟。 预定义的关联键:
error_info
对一个人的最后一个错误的解释,last_exception
指向最后一个异常的对象的指针,async_stack
技术允许的异步调用堆栈,- 其余的由用户设置。
前面的示例已经具有真正的C ++代码和一些其他功能:
#include <futoin/iasyncsteps.hpp> using namespace futoin; void some_api(IAsyncSteps& asi) { asi.add( [](IAsyncSteps& asi) { std::cout << "Level 0 func 1" << std::endl; asi.add( [](IAsyncSteps& asi) { std::cout << "Level 1 func 1" << std::endl; asi.error("MyError"); }, [](IAsyncSteps& asi, ErrorCode code) { std::cout << "Level 1 onerror 1: " << code << std::endl; asi.error("NewError", "Human-readable description"); } ); }, [](IAsyncSteps& asi, ErrorCode code) { std::cout << "Level 0 onerror 1: " << code << std::endl; if (code == "NewError") { // Human-readable error info assert(asi.state().error_info == "Human-readable description"); // Last exception thrown is also available in state std::exception_ptr e = asi.state().last_exception; // NOTE: smart conversion of "const char*" asi.success("Prm", 123, std::vector<int>({1, 2, 3}, true)); } } ); asi.add( [](IAsyncSteps& asi, const futoin::string& str_res, int int_res, std::vector<int>&& arr_res) { std::cout << "Level 0 func 2: " << str_res << std::endl; } ); }
用于创建循环的API:
loop( func, [, label] )
-无限重复的步骤。forEach( map|list, func [, label] )
-集合对象的逐步迭代。repeat( count, func [, label] )
-逐步重复指定的次数。break( [label] )
是传统循环中断的类似物。continue( [label] )
是传统循环延续的类似形式,但有新的迭代。
规范提供备用名称breakLoop
, continueLoop
等,以防与保留字冲突。
C ++示例:
asi.loop([](IAsyncSteps& asi) {
与外部事件集成的API:
setTimeout( timeout_ms )
-如果步骤及其子树尚未完成执行,则在Timeout
后引发Timeout
错误。setCancel( handler )
-设置取消处理程序,当完全取消线程以及在错误处理过程中扩展异步步骤堆栈时调用此处理程序。waitExternal()
-一个简单的等待外部事件。
调用这些函数中的任何一个都需要显式调用success()
。
C ++示例:
asi.add([](IAsyncSteps& asi) { auto handle = schedule_external_callback([&](bool err) { if (err) { try { asi.error("ExternalError"); } catch (...) {
ECMAScript示例:
asi.add( (asi) => { asi.waitExternal();
未来/承诺集成API:
await(promise_future[, on_error])
-等待Future / Promise作为一个步骤。promise()
-将整个执行流程转换为Future / Promise,而不是execute()
。
C ++示例:
[](IAsyncSteps& asi) { // Proper way to create new AsyncSteps instances // without hard dependency on implementation. auto new_steps = asi.newInstance(); new_steps->add([](IAsyncSteps& asi) {}); // Can be called outside of AsyncSteps event loop // new_steps.promise().wait(); // or // new_steps.promise<int>().get(); // Proper way to wait for standard std::future asi.await(new_steps->promise()); // Ensure instance lifetime asi.state()["some_obj"] = std::move(new_steps); };
业务逻辑流控制API:
AsyncSteps(AsyncTool&)
是将执行线程绑定到特定事件循环的构造函数。execute()
-启动执行线程。cancel()
-取消执行线程。
这里已经需要特定的接口实现。
C ++示例:
#include <futoin/ri/asyncsteps.hpp> #include <futoin/ri/asynctool.hpp> void example() { futoin::ri::AsyncTool at; futoin::ri::AsyncSteps asi{at}; asi.loop([&](futoin::IAsyncSteps &asi){ // Some infinite loop logic }); asi.execute(); std::this_thread::sleep_for(std::chrono::seconds{10}); asi.cancel(); // called in d-tor by fact }
其他API:
newInstance()
-允许您创建新的执行线程,而无需直接依赖于实现。sync(object, func, onerror)
-相同,但是相对于实现相应接口的对象具有同步。parallel([on_error])
-特殊的add()
,其子步骤是单独的AsyncSteps流:
- 所有线程都有共同的
state()
, - 父线程在所有子线程完成后继续执行
- 任何子级中未捕获的错误会立即取消所有其他子级线程。
C ++示例:
#include <futoin/ri/mutex.hpp> using namespace futoin; ri::Mutex mtx_a; void sync_example(IAsyncSteps& asi) { asi.sync(mtx_a, [](IAsyncSteps& asi) { // synchronized section asi.add([](IAsyncSteps& asi) { // inner step in the section // This synchronization is NOOP for already // acquired Mutex. asi.sync(mtx_a, [](IAsyncSteps& asi) { }); }); }); } void parallel_example(IAsyncSteps& asi) { using OrderVector = std::vector<int>; asi.state("order", OrderVector{}); auto& p = asi.parallel([](IAsyncSteps& asi, ErrorCode) { // Overall error handler asi.success(); }); p.add([](IAsyncSteps& asi) { // regular flow asi.state<OrderVector>("order").push_back(1); asi.add([](IAsyncSteps& asi) { asi.state<OrderVector>("order").push_back(4); }); }); p.add([](IAsyncSteps& asi) { asi.state<OrderVector>("order").push_back(2); asi.add([](IAsyncSteps& asi) { asi.state<OrderVector>("order").push_back(5); asi.error("SomeError"); }); }); p.add([](IAsyncSteps& asi) { asi.state<OrderVector>("order").push_back(3); asi.add([](IAsyncSteps& asi) { asi.state<OrderVector>("order").push_back(6); }); }); asi.add([](IAsyncSteps& asi) { asi.state<OrderVector>("order"); // 1, 2, 3, 4, 5 }); };
同步的标准原语
Mutex
限制同时在Q
的队列中同时执行N
线程,默认情况下N=1, Q=unlimited
。Throttle
-在周期P
中限制Q
的输入P
的数量N
,默认情况下N=1, P=1s, Q=0
。Limiter
是Mutex
和Throttle
的组合,通常用于处理外部请求的输入以及在调用外部系统以在负载下稳定运行的目的。
如果DefenseRejected
队列限制,则会DefenseRejected
错误,其含义从Limiter
描述中可以清楚看出。
主要好处
AsyncSteps的概念本身并不是目的,而是出于对时间的限制,取消和各个回调的整体连接性而需要对程序进行更受控的异步执行的需要而诞生的。 当时还没有通用解决方案提供相同的功能。 因此:
FTN12 — .
setCancel()
— . , . RAII atexit()
.
cancel()
— , . SIGTERM
pthread_cancel()
, .
setTimeout()
— . , "Timeout".
— FutoIn AsyncSteps .
— ABI , . Embedded MMU.
Intel Xeon E3-1245v2/DDR1333 Debian Stretch .
:
- Boost.Fiber
protected_fixedsize_stack
. - Boost.Fiber
pooled_fixedsize_stack
. - FutoIn AsyncSteps .
- FutoIn AsyncSteps (
FUTOIN_USE_MEMPOOL=false
).
- FutoIn NitroSteps<> — .
Boost.Fiber :
- 1 . .
- 30 . 1 . .
- 30 .
mmap()/mprotect()
boost::fiber::protected_fixedsize_stack
. - .
- 30 . 10 . .
"" , .. , . . .
GCC 6.3.0. lang tcmalloc , .
GitHub GitLab .
1.
| | |
---|
Boost.Fiber protected | 4.8s | 208333.333Hz |
Boost.Fiber pooled | 0.23s | 4347826.086Hz |
FutoIn AsyncSteps | 0.21s | 4761904.761Hz |
FutoIn AsyncSteps no mempool | 0.31s | 3225806.451Hz |
FutoIn NitroSteps | 0.255s | 3921568.627Hz |
— .
Boost.Fiber - , pooled_fixedsize_stack
, AsyncSteps.
2.
| | |
---|
Boost.Fiber protected | 6.31s | 158478.605Hz |
Boost.Fiber pooled | 1.558s | 641848.523Hz |
FutoIn AsyncSteps | 1.13s | 884955.752Hz |
FutoIn AsyncSteps no mempool | 1.353s | 739098.300Hz |
FutoIn NitroSteps | 1.43s | 699300.699Hz |
— .
, . , — .
3.
| | |
---|
Boost.Fiber protected | 5.096s | 1962323.390Hz |
Boost.Fiber pooled | 5.077s | 1969667.126Hz |
FutoIn AsyncSteps | 5.361s | 1865323.633Hz |
FutoIn AsyncSteps no mempool | 8.288s | 1206563.706Hz |
FutoIn NitroSteps | 3.68s | 2717391.304Hz |
— .
, Boost.Fiber AsyncSteps, NitroSteps.
| |
---|
Boost.Fiber protected | 124M |
Boost.Fiber pooled | 505M |
FutoIn AsyncSteps | 124M |
FutoIn AsyncSteps no mempool | 84M |
FutoIn NitroSteps | 115M |
— .
, Boost.Fiber .
: Node.js
- Promise
: + 10 . . 10 . JIT NODE_ENV=production
, @futoin/optihelp
.
GitHub GitLab . Node.js v8.12.0 v10.11.0, FutoIn CID .
Tech | Simple | Loop |
---|
Node.js v10 | | |
FutoIn AsyncSteps | 1342899.520Hz | 587.777Hz |
async/await | 524983.234Hz | 630.863Hz |
Node.js v8 | | |
FutoIn AsyncSteps | 682420.735Hz | 588.336Hz |
async/await | 365050.395Hz | 400.575Hz |
— .
async/await
? , V8 Node.js v10 .
, Promise async/await
Node.js Event Loop. ( ), FutoIn AsyncSteps .
AsyncSteps Node.js Event Loop async/await
- Node.js v10.
, ++ — . , Node.js 10 .
结论
C++, FutoIn AsyncSteps Boost.Fiber , Boost.Fiber mmap()/mprotect
.
, - , . .
FutoIn AsyncSteps JavaScript async/await
Node.js v10.
, -, . .
- "" . — API.
结论
, FutoIn AsyncSteps , "" async/await
. , . Promise
ECMAScript, AsyncSteps "" .
. AsyncSteps NitroSteps .
, - .
Java/JVM — . .
, GitHub / GitLab .