这些天的异步业务逻辑

简而言之:


  • 该证明已经在适用于Java的 C ++JSPHP中实现。
  • 比协程和Promise 更快 ,功能更多。
  • 它不需要单独的软件堆栈。
  • 与所有安全和调试工具成为朋友。
  • 它适用于任何体系结构,不需要特殊的编译器标志。





回头看


在计算机诞生之初,只有一个控制流,但阻塞了输入输出。 然后铁中断添加到它。 现在,您可以有效地使用缓慢且不可预测的设备。


随着熨斗功能的增长和可用性的降低,必须同时执行多项任务,从而提供硬件支持。 因此,存在隔离的过程,其中中断以信号的形式从铁中抽象出来。


下一个演进阶段是多线程,它是在相同进程的基础上实现的,但是共享访问内存和其他资源。 这种方法有其局限性,而且切换到安全OS的开销也很大。


为了在流程甚至不同的机器之间进行通信,40年前就提出了Promise / Future抽象。


用户界面和当前可笑的10K客户端问题导致了事件循环,反应器和Proactor方法的鼎盛时期,它们比清晰,一致的业务逻辑更面向事件。


最后,我们介绍了现代协程(协程),本质上是在上述抽象之上对流程进行的仿真,具有相应的技术局限性和控制权的确定性转移。


为了传达事件,结果和异常,它们都回到了“承诺/未来”的相同概念。 一些办公室决定使用不同的名称-“任务”。


最后,他们将所有内容都隐藏在漂亮的async/await包中,这取决于技术要求编译器或翻译器支持。


当前异步业务逻辑情况的问题


仅考虑用async/await装饰的协程和Promise,作为 旧方法中问题的存在证实了进化过程本身。


这两个术语不相同。 例如,在ECMAScript中,没有协程,但是使用Promise减轻语法的负担,而Promise只能使用回调地狱来组织工作。 实际上,像V8这样的脚本引擎走得更远,并对纯async/await函数和调用进行了特殊的优化。


专家co_async/co_await并没有包含在C ++ 17中,但是来自软件巨头协程的压力可以完全以它们的形式出现在标准中。 同时,传统上公认的解决方案是Boost.ContextBoost.FiberBoost.Coroutine2


在Java中,在语言级别上仍然没有async/await ,但是有诸如EA Async之类的解决方案,像Boost.Context一样,需要针对每个版本的JVM和代码字节进行自定义。


Go有其自己的协程,但是如果您仔细查看开放项目的文章和错误报告,事实证明这里的一切都不那么顺利。 也许失去协程接口作为托管实体不是一个好主意。


作者的意见:裸露的协程很危险


就个人而言,作者很少反对使用动态语言编写的协程,但是他非常警惕任何在机器代码级别上对堆栈的调情。


几点:


  1. 需要堆叠:
    • 堆上的堆栈有许多缺点:无法及时确定溢出,邻居损坏和其他可靠性/安全性问题的问题,
    • 一个安全堆栈至少需要一页物理内存,一个条件页以及每次调用async函数的额外开销:4 + KB(最小)+增加的系统限制,
    • 最终,可能是在协程停机期间未使用分配给堆栈的大部分内存。
  2. 有必要实现一个复杂的逻辑来保存,恢复和删除协程状态:
    • 对于处理器架构(甚至模型)和二进制接口(ABI)的每种情况: example
    • 新的或可选的架构功能会带来潜在的潜在问题(例如,英特尔TSX,ARM协处理器或MIPS),
    • 由于专有系统而导致的其他潜在问题已关闭文档(Boost文档是与此相关的文档)。
  3. 动态分析工具和总体安全性方面的潜在问题:
    • 例如,由于相同的跳跃堆栈,因此必须与Valgrind集成,
    • 杀毒软件很难说,但在过去的JVM问题示例中,它们可能并不十分喜欢它,
    • 我敢肯定,将会出现新的攻击类型,并且将揭示与实施协程相关的漏洞。

作者的观点:发电机和yield根本的邪恶


这个看似第三方的主题与协程的概念和“ continue”属性直接相关。


简而言之,任何集合都必须存在完整的迭代器。 为什么创建裁剪的迭代器-生成器问题尚不清楚。 例如,在Python中带有range()的情况比技术复杂化的借口更像是排他性炫耀。


如果情况是无限生成器,则其实现的逻辑是基本的。 为什么要增加额外的技术难度以推动不断发展的周期。


协程的支持者后来提出的唯一明智的理由是,各种具有反向控制的流解析器。 实际上,这是在库级别解决单个问题的狭义专业案例,而不是应用程序的业务逻辑。 同时,通过有限状态机提供了一种优雅,简单且更具描述性的解决方案。 这些技术问题的范围比普通业务逻辑的范围小得多。


实际上,要解决的问题是从手指上获得的,并且需要相对认真的努力来进行初始实施和长期支持。 如此之多,以至于某些项目可能会在禁止使用goto或在各个行业中使用动态内存分配的示例后,禁止使用机器代码级协同程序。


作者意见:ECMAScript的async/await Promise模型更可靠,但需要修改


与连续协程不同,在此模型中,代码段被秘密地划分为设计为匿名函数的不可中断块。 在C ++中,由于内存管理的特殊性,这并不完全适合,例如:


 struct SomeObject { using Value = std::vector<int>; Promise funcPromise() { return Promise.resolved(value_); } void funcCallback(std::function<void()> &&cb, const Value& val) { somehow_call_later(cb); } Value value_; }; Promise example() { SomeObject some_obj; return some_obj.funcPromise() .catch([](const std::exception &e){ // ... }) .then([&](SomeObject::value &&val){ return Promise([&](Resolve&& resolve, Reject&&){ some_obj.funcCallback(resolve, val); }); }); } 

首先,退出example()以及调用lambda函数之前, some_obj将被销毁。


其次,具有捕获变量或引用的lambda函数是对象,并秘密地添加复制/移动,这可能会对具有大量捕获的性能产生负面影响,并且需要在通常的std::function在类型擦除期间在堆上分配内存。


第三, Promise接口本身是基于结果的“承诺”概念,而不是业务逻辑的一致执行。


原理图非最佳解决方案可能看起来像这样:


 Promise example() { struct LocalContext { SomeObject some_obj; }; auto ctx = std::make_shared<LocalContext>(); return some_obj.funcPromise() .catch([](const std::exception &e){ // ... }) .then([ctx](SomeObject::Value &&val){ struct LocalContext2 { LocalContext2(std::shared_ptr<LocalContext> &&ctx, SomeObject::Value &&val) : ctx(ctx), val(val) {} std::shared_ptr<LocalContext> ctx; SomeObject::Value val; }; auto ctx2 = std::make_shared<LocalContext2>( std::move(ctx), std::forward<SomeObject::Value>(val) ); return Promise([ctx2](Resolve&& resolve, Reject&&){ ctx2->ctx->some_obj.funcCallback([ctx2, resolve](){ resolve(); }, val); }); }); } 

注意: std::move而不是std::shared_ptr不合适的,因为它无法一次转移到多个lambda,并且它们的大小会增加。


随着async/await异步恐怖进入了一种可消化的状态:


 async void example() { SomeObject some_obj; try { SomeObject::Value val = await some_obj.func(); } catch (const std::exception& e) ( // ... } // Capture "async context" return Promise([async](Resolve&& resolve, Reject&&){ some_obj.funcCallback([async](){ resolve(); }, val); }); } 

作者观点:协程策划师是半身像


一些批评家称缺少调度程序和对处理器资源的“不诚实”使用是一个问题。 也许更严重的问题是数据局部性和处理器缓存的有效利用。


关于第一个问题:在各个协程级别上进行优先级排序似乎开销很大。 相反,它们可以为特定的统一任务共同操作。 这就是流量的作用。


这可以通过使用其自己的“铁”线程创建单独的Event Loop实例并在操作系统级别进行计划来实现。 第二种选择是在竞争和/或性能方面将协程与相对原始的(Mutex,Throttle)原始同步。


异步编程不会使处理器资源具有弹性,并且需要对同时处理的任务数量和总执行时间进行绝对正常的限制。


防止在一个协程上长时间阻塞需要采取与回调相同的措施-避免阻塞系统调用和冗长的数据处理周期。


第二个问题需要研究,但至少协程本身会堆叠在一起,并且Future / Promise实现的详细信息已经违反了数据的局部性。 如果将来已经很重要,则有机会尝试继续执行相同的协程。 为了防止一个协程捕获整个处理器时间,需要某种机制来计算执行时间或这种连续的次数。 根据处理器缓存的大小和线程数的不同,这可能不会给出结果,或者给出非常双重的结果。


还有第三点-协程调度程序的许多实现允许您在不同的处理器内核上运行它们,相反,由于在访问共享资源时进行强制同步,因此增加了问题。 在单个事件循环流的情况下,仅在逻辑级别需要这种同步,因为 保证每个同步回调块都可以正常工作,而不会与其他组件竞争。


作者的观点:一切都要适度


现代操作系统中线程的存在不会否定单个进程的使用。 同样,在事件循环中处理大量客户端并不会否定将隔离的“铁”线程用于其他需求。


无论如何,协程和事件循环的各种变体在没有必要工具支持的情况下使调试过程复杂化,并且在协程堆栈上具有局部变量,一切都变得更加困难-几乎没有办法获得它们。





FutoIn AsyncSteps-协程的替代品


我们以已经建立的事件循环模式和根据ECMAScript(JavaScript)Promise类型的回调方案的组织为基础。


在执行计划方面,我们对Event Loop的以下活动感兴趣:


  1. Handle immediate(callack)需要干净调用堆栈的Handle immediate(callack)
  2. Deferred callback延迟Handle deferred(delay, callback)
  3. 取消回调handle.cancel()

因此,我们获得了一个名为AsyncTool的接口,该接口可以通过多种方式实现,包括在现有的经过验证的开发之上。 他与编写业务逻辑没有直接关系,因此我们不再赘述。


步骤树:


在AsyncSteps概念中,通过深入创建序列来排列并执行同步步骤的抽象树。 完成这样的通道后,将动态设置每个更深层次的步骤。


所有交互都通过单个AsyncSteps接口进行,按照约定,该接口作为第一个参数传递到每个步骤。 按照惯例,参数名称为asi或不建议使用。 这种方法使您几乎可以完全打破特定实现之间的联系,并在插件和库中编写业务逻辑。


在规范的实现中,每个步骤都会收到自己的对象实例,该对象实现AsyncSteps ,该实例允许在使用接口时及时跟踪逻辑错误。


抽象示例:


  asi.add( // Level 0 step 1 func( asi ){ print( "Level 0 func" ) asi.add( // Level 1 step 1 func( asi ){ print( "Level 1 func" ) asi.error( "MyError" ) }, onerror( asi, error ){ // Level 1 step 1 catch print( "Level 1 onerror: " + error ) asi.error( "NewError" ) } ) }, onerror( asi, error ){ // Level 0 step 1 catch print( "Level 0 onerror: " + error ) if ( error strequal "NewError" ) { asi.success( "Prm", 123, [1, 2, 3], true) } } ) asi.add( // Level 0 step 2 func( asi, str_param, int_param, array_param ){ print( "Level 0 func2: " + param ) } ) 

执行结果:


  Level 0 func 1 Level 1 func 1 Level 1 onerror 1: MyError Level 0 onerror 1: NewError Level 0 func 2: Prm 

同步显示如下:


  str_res, int_res, array_res, bool_res // undefined try { // Level 0 step 1 print( "Level 0 func 1" ) try { // Level 1 step 1 print( "Level 1 func 1" ) throw "MyError" } catch( error ){ // Level 1 step 1 catch print( "Level 1 onerror 1: " + error ) throw "NewError" } } catch( error ){ // Level 0 step 1 catch print( "Level 0 onerror 1: " + error ) if ( error strequal "NewError" ) { str_res = "Prm" int_res = 123 array_res = [1, 2, 3] bool_res = true } else { re-throw } } { // Level 0 step 2 print( "Level 0 func 2: " + str_res ) } 

传统同步代码的最大模仿是立即可见的,这应该有助于提高可读性。


从业务逻辑的角度来看, 大量需求随时间增长,但是我们可以将其分为易于理解的部分。 如下所述,是实际运行四年的结果。


核心运行时API:


  1. add(func[, onerror]) -模仿try-catch
  2. success([args...]) -成功完成的明确指示:
    • 默认暗示
    • 可以将结果传递给下一步。
  3. error(code[, reason) -因错误而中断执行:
    • code -具有字符串类型,可以更好地与微服务架构中的网络协议集成,
    • reason -对一个人的任意解释。
  4. state() -线程本地存储的模拟。 预定义的关联键:
    • error_info对一个人的最后一个错误的解释,
    • last_exception指向最后一个异常的对象的指针,
    • async_stack技术允许的异步调用堆栈,
    • 其余的由用户设置。

前面的示例已经具有真正的C ++代码和一些其他功能:


 #include <futoin/iasyncsteps.hpp> using namespace futoin; void some_api(IAsyncSteps& asi) { asi.add( [](IAsyncSteps& asi) { std::cout << "Level 0 func 1" << std::endl; asi.add( [](IAsyncSteps& asi) { std::cout << "Level 1 func 1" << std::endl; asi.error("MyError"); }, [](IAsyncSteps& asi, ErrorCode code) { std::cout << "Level 1 onerror 1: " << code << std::endl; asi.error("NewError", "Human-readable description"); } ); }, [](IAsyncSteps& asi, ErrorCode code) { std::cout << "Level 0 onerror 1: " << code << std::endl; if (code == "NewError") { // Human-readable error info assert(asi.state().error_info == "Human-readable description"); // Last exception thrown is also available in state std::exception_ptr e = asi.state().last_exception; // NOTE: smart conversion of "const char*" asi.success("Prm", 123, std::vector<int>({1, 2, 3}, true)); } } ); asi.add( [](IAsyncSteps& asi, const futoin::string& str_res, int int_res, std::vector<int>&& arr_res) { std::cout << "Level 0 func 2: " << str_res << std::endl; } ); } 

用于创建循环的API:


  1. loop( func, [, label] ) -无限重复的步骤。
  2. forEach( map|list, func [, label] ) -集合对象的逐步迭代。
  3. repeat( count, func [, label] ) -逐步重复指定的次数。
  4. break( [label] )是传统循环中断的类似物。
  5. continue( [label] )是传统循环延续的类似形式,但有新的迭代。

规范提供备用名称breakLoopcontinueLoop等,以防与保留字冲突。


C ++示例:


  asi.loop([](IAsyncSteps& asi) { // infinite loop asi.breakLoop(); }); asi.repeat(10, [](IAsyncSteps& asi, size_t i) { // range loop from i=0 till i=9 (inclusive) asi.continueLoop(); }); asi.forEach( std::vector<int>{1, 2, 3}, [](IAsyncSteps& asi, size_t i, int v) { // Iteration of vector-like and list-like objects }); asi.forEach( std::list<futoin::string>{"1", "2", "3"}, [](IAsyncSteps& asi, size_t i, const futoin::string& v) { // Iteration of vector-like and list-like objects }); asi.forEach( std::map<futoin::string, futoin::string>(), [](IAsyncSteps& asi, const futoin::string& key, const futoin::string& v) { // Iteration of map-like objects }); std::map<std::string, futoin::string> non_const_map; asi.forEach( non_const_map, [](IAsyncSteps& asi, const std::string& key, futoin::string& v) { // Iteration of map-like objects, note the value reference type }); 

与外部事件集成的API:


  1. setTimeout( timeout_ms ) -如果步骤及其子树尚未完成执行,则在Timeout后引发Timeout错误。
  2. setCancel( handler ) -设置取消处理程序,当完全取消线程以及在错误处理过程中扩展异步步骤堆栈时调用此处理程序。
  3. waitExternal() -一个简单的等待外部事件。
    • 注意:仅在带有垃圾收集器的技术中使用是安全的。

调用这些函数中的任何一个都需要显式调用success()


C ++示例:


  asi.add([](IAsyncSteps& asi) { auto handle = schedule_external_callback([&](bool err) { if (err) { try { asi.error("ExternalError"); } catch (...) { // pass } } else { asi.success(); } }); asi.setCancel([=](IAsyncSteps& asi) { external_cancel(handle); }); }); asi.add( [](IAsyncSteps& asi) { // Raises Timeout error after specified period asi.setTimeout(std::chrono::seconds{10}); asi.loop([](IAsyncSteps& asi) { // infinite loop }); }, [](IAsyncSteps& asi, ErrorCode code) { if (code == futoin::errors::Timeout) { asi(); } }); 

ECMAScript示例:


 asi.add( (asi) => { asi.waitExternal(); // disable implicit success() some_obj.read( (err, data) => { if (!asi.state) { // ignore as AsyncSteps execution got canceled } else if (err) { try { asi.error( 'IOError', err ); } catch (_) { // ignore error thrown as there are no // AsyncSteps frames on stack. } } else { asi.success( data ); } } ); } ); 

未来/承诺集成API:


  1. await(promise_future[, on_error]) -等待Future / Promise作为一个步骤。
  2. promise() -将整个执行流程转换为Future / Promise,而不是execute()

C ++示例:


  [](IAsyncSteps& asi) { // Proper way to create new AsyncSteps instances // without hard dependency on implementation. auto new_steps = asi.newInstance(); new_steps->add([](IAsyncSteps& asi) {}); // Can be called outside of AsyncSteps event loop // new_steps.promise().wait(); // or // new_steps.promise<int>().get(); // Proper way to wait for standard std::future asi.await(new_steps->promise()); // Ensure instance lifetime asi.state()["some_obj"] = std::move(new_steps); }; 

业务逻辑流控制API:


  1. AsyncSteps(AsyncTool&)是将执行线程绑定到特定事件循环的构造函数。
  2. execute() -启动执行线程。
  3. cancel() -取消执行线程。

这里已经需要特定的接口实现。


C ++示例:


 #include <futoin/ri/asyncsteps.hpp> #include <futoin/ri/asynctool.hpp> void example() { futoin::ri::AsyncTool at; futoin::ri::AsyncSteps asi{at}; asi.loop([&](futoin::IAsyncSteps &asi){ // Some infinite loop logic }); asi.execute(); std::this_thread::sleep_for(std::chrono::seconds{10}); asi.cancel(); // called in d-tor by fact } 

其他API:


  1. newInstance() -允许您创建新的执行线程,而无需直接依赖于实现。
  2. sync(object, func, onerror) -相同,但是相对于实现相应接口的对象具有同步。
  3. parallel([on_error]) -特殊的add() ,其子步骤是单独的AsyncSteps流:
    • 所有线程都有共同的state()
    • 父线程在所有子线程完成后继续执行
    • 任何子级中未捕获的错误会立即取消所有其他子级线程。

C ++示例:


  #include <futoin/ri/mutex.hpp> using namespace futoin; ri::Mutex mtx_a; void sync_example(IAsyncSteps& asi) { asi.sync(mtx_a, [](IAsyncSteps& asi) { // synchronized section asi.add([](IAsyncSteps& asi) { // inner step in the section // This synchronization is NOOP for already // acquired Mutex. asi.sync(mtx_a, [](IAsyncSteps& asi) { }); }); }); } void parallel_example(IAsyncSteps& asi) { using OrderVector = std::vector<int>; asi.state("order", OrderVector{}); auto& p = asi.parallel([](IAsyncSteps& asi, ErrorCode) { // Overall error handler asi.success(); }); p.add([](IAsyncSteps& asi) { // regular flow asi.state<OrderVector>("order").push_back(1); asi.add([](IAsyncSteps& asi) { asi.state<OrderVector>("order").push_back(4); }); }); p.add([](IAsyncSteps& asi) { asi.state<OrderVector>("order").push_back(2); asi.add([](IAsyncSteps& asi) { asi.state<OrderVector>("order").push_back(5); asi.error("SomeError"); }); }); p.add([](IAsyncSteps& asi) { asi.state<OrderVector>("order").push_back(3); asi.add([](IAsyncSteps& asi) { asi.state<OrderVector>("order").push_back(6); }); }); asi.add([](IAsyncSteps& asi) { asi.state<OrderVector>("order"); // 1, 2, 3, 4, 5 }); }; 

同步的标准原语


  1. Mutex限制同时在Q的队列中同时执行N线程,默认情况下N=1, Q=unlimited
  2. Throttle -在周期P中限制Q的输入P的数量N ,默认情况下N=1, P=1s, Q=0
  3. LimiterMutexThrottle的组合,通常用于处理外部请求的输入以及在调用外部系统以在负载下稳定运行的目的。

如果DefenseRejected队列限制,则会DefenseRejected错误,其含义从Limiter描述中可以清楚看出。


主要好处


AsyncSteps的概念本身并不是目的,而是出于对时间的限制,取消和各个回调的整体连接性而需要对程序进行更受控的异步执行的需要而诞生的。 当时还没有通用解决方案提供相同的功能。 因此:


FTN12 — .


setCancel() — . , . RAII atexit() .


cancel() — , . SIGTERM pthread_cancel() , .


setTimeout() — . , "Timeout".


— FutoIn AsyncSteps .


— ABI , . Embedded MMU.







Intel Xeon E3-1245v2/DDR1333 Debian Stretch .


:


  1. Boost.Fiber protected_fixedsize_stack .
  2. Boost.Fiber pooled_fixedsize_stack .
  3. FutoIn AsyncSteps .
  4. FutoIn AsyncSteps ( FUTOIN_USE_MEMPOOL=false ).
    • futoin::IMemPool .
  5. FutoIn NitroSteps<> — .
    • .

Boost.Fiber :


  1. 1 . .
  2. 30 . 1 . .
    • 30 . mmap()/mprotect() boost::fiber::protected_fixedsize_stack .
    • .
  3. 30 . 10 . .
    • "" .

"" , .. , . . .


GCC 6.3.0. lang tcmalloc , .


GitHub GitLab .


1.


Boost.Fiber protected4.8s208333.333Hz
Boost.Fiber pooled0.23s4347826.086Hz
FutoIn AsyncSteps0.21s4761904.761Hz
FutoIn AsyncSteps no mempool0.31s3225806.451Hz
FutoIn NitroSteps0.255s3921568.627Hz


— .


Boost.Fiber - , pooled_fixedsize_stack , AsyncSteps.


2.


Boost.Fiber protected6.31s158478.605Hz
Boost.Fiber pooled1.558s641848.523Hz
FutoIn AsyncSteps1.13s884955.752Hz
FutoIn AsyncSteps no mempool1.353s739098.300Hz
FutoIn NitroSteps1.43s699300.699Hz


— .


, . , — .


3.


Boost.Fiber protected5.096s1962323.390Hz
Boost.Fiber pooled5.077s1969667.126Hz
FutoIn AsyncSteps5.361s1865323.633Hz
FutoIn AsyncSteps no mempool8.288s1206563.706Hz
FutoIn NitroSteps3.68s2717391.304Hz


— .


, Boost.Fiber AsyncSteps, NitroSteps.


( RSS)


Boost.Fiber protected124M
Boost.Fiber pooled505M
FutoIn AsyncSteps124M
FutoIn AsyncSteps no mempool84M
FutoIn NitroSteps115M


— .


, Boost.Fiber .


: Node.js


- Promise : + 10 . . 10 . JIT NODE_ENV=production , @futoin/optihelp .


GitHub GitLab . Node.js v8.12.0 v10.11.0, FutoIn CID .


TechSimpleLoop
Node.js v10
FutoIn AsyncSteps1342899.520Hz587.777Hz
async/await524983.234Hz630.863Hz
Node.js v8
FutoIn AsyncSteps682420.735Hz588.336Hz
async/await365050.395Hz400.575Hz



— .


async/await ? , V8 Node.js v10 .


, Promise async/await Node.js Event Loop. ( ), FutoIn AsyncSteps .


AsyncSteps Node.js Event Loop async/await - Node.js v10.


, ++ — . , Node.js 10 .


结论


C++, FutoIn AsyncSteps Boost.Fiber , Boost.Fiber mmap()/mprotect .


, - , . .


FutoIn AsyncSteps JavaScript async/await Node.js v10.


, -, . .


- "" . — API.




结论


, FutoIn AsyncSteps , "" async/await . , . Promise ECMAScript, AsyncSteps "" .


. AsyncSteps NitroSteps .


, - .


Java/JVM — . .


, GitHub / GitLab .

Source: https://habr.com/ru/post/zh-CN424311/


All Articles