一次严格的实时异构竞争数据处理

竞争香肠


注解


仅一次( 精确一次 )实时处理数据是一项极其艰巨的任务,并且需要在整个计算链中采取认真周到的方法。 甚至有人认为这样的任务是不可能的 。 实际上,我希望有一种方法可以提供容错处理而没有任何延迟,并且可以使用各种数据存储,这对系统提出了新的甚至更严格的要求: 并发一次和持久层的异构性。 迄今为止,这样的要求不支持任何现有系统。


所提出的方法将一致地揭示秘密成分和必要的概念,这些概念使从字面上从两个组件实现异构并发一次精确处理相对容易。


引言


分布式系统的开发人员经历了以下几个阶段:


阶段1:算法 。 这是对基本算法,数据结构和编程方法(如OOP等)的研究。 该代码仅是单线程的。 进入专业的初始阶段。 但是,它非常复杂并且可以持续数年。


阶段2:多线程 。 接下来,出现了从铁中获取最大效率的问题,多线程,异步,赛车,调试,跟踪,不眠之夜……许多人陷于这一阶段,甚至在某个时候开始感到莫名的兴奋。 但是只有少数几个了解虚拟内存和内存模型,无锁/无等待算法以及各种异步模型的体系结构。 几乎没有人使用过-多线程代码验证。


第三阶段:发行 。 这里发生了这样的垃圾,童话和笔都无法描述。


似乎有些复杂。 我们进行转换:许多线程->许多进程->许多服务器。 但是转换的每个步骤都会带来质的变化,它们都落在系统上,将其压碎并变成灰尘。


这里的重点是更改错误处理域和共享内存的可用性。 如果以前在每个线程中始终有一块内存可用,并且如果需要,则在每个进程中现在都没有,现在也没有。 每个人都为自己,独立而自豪。


如果较早的时候流中的故障同时淹没了流和进程,那么这很好,因为 并不会导致部分失败,现在部分失败已成为常态,并且每次您认为每项行动之前都会:“如果?”。 实际上,这很烦人,而且从动作本身上分散了注意力,以至于因此而使代码不时增长,而是按数量级增长。 一切都变成了错误处理,状态切换和上下文保存,由于一个组件,另一个组件的故障以及某些服务的不可访问性等导致的恢复的问题。 等 加强了对所有这些东西的监视,您可以在自己喜欢的笔记本电脑上睡个好觉。


无论是多线程问题:我都使用了互斥锁,并切碎了共享内存,以取乐。 美女!


结果,我们不得不删除了关键的和经过战斗考验的模式,由于某种原因,新模式取代了它们,但由于某种原因而未能交付,结果就像是在开玩笑说仙子如何挥动魔杖,而塔从坦克上掉下来了。


但是,分布式系统具有一组经过验证的实践和经过验证的算法。 但是,尽管有积累的经验,许多科学文章和学术研究,每个自重的程序员都认为有责任拒绝著名的成就并发挥自己的才能。 毕竟,如果您可以使用算法和多线程技术,那么如何使它陷入混乱呢? 这里不可能有两种意见!


结果,系统出现故障,数据分散和恶化,服务定期变得无法写入,甚至完全不可用,因为节点突然崩溃,网络瘫痪,Java消耗了大量内存和GC钝化,还有许多其他原因可能会延迟其终止给当局。


但是,即使采用众所周知且经过验证的方法,生活也不会变得更加轻松,因为 分布式可靠原语是重量级的,对可执行代码的逻辑有严格的要求。 因此,尽可能地切掉角落。 而且,通常情况下,匆忙偷工减料,会出现简单性和相对可伸缩性,但是分布式系统的可靠性,可用性和一致性却消失了。


理想情况下,我完全不希望我们的系统是分布式和多线程的,即 在第一阶段(算法)工作,而无需考虑第二阶段(多线程+异步)和第三阶段(分布式)。 这种隔离抽象的方式将显着提高编写代码的简便性,可靠性和速度。 不幸的是,目前只有在梦中才有可能。


但是,单个抽象允许相对隔离。 典型示例之一是协程使用 ,其中我们获得了同步而不是异步代码,即 我们从第二阶段过渡到第一阶段,这使我们可以大大简化代码的编写和维护。


文章逐步揭示了使用无锁算法来构建可靠的一致的分布式可扩展实时系统,即 第二阶段的无锁成就如何帮助实现第三阶段,从而将任务减少到第一阶段的单线程算法。


问题陈述


此任务仅说明一些重要方法,并作为将问题引入上下文的示例进行介绍。 可以很容易地将其推广到更复杂的情况,这将在将来完成。


任务:实时流数据处理


有两个数字流。 处理程序读取这些输入流的数据,并选择特定时间段内的最后一个数字。 这些数字是在此时间间隔内得出的平均值,即 给定时间在滑动数据窗口中显示。 必须将获得的平均值写入输出队列以进行后续处理。 另外,如果窗口中的数字数量超过某个阈值,则将外部事务数据库中的计数器增加一。


最初的


我们注意到此问题的某些功能。


  1. 非确定性 。 有两种不确定性行为的来源:这是从两个流中读取的内容以及一个时间窗口。 显然,可以以不同的方式进行读取,并且最终结果将取决于将以何种顺序提取数据。 时间窗口也会从开始到开始更改结果,因为 窗口中的数据量将取决于工作速度。
  2. 处理程序的状态 。 窗口中存在一组数字形式的处理程序状态,工作的当前结果和后续结果取决于该状态。 即 我们有一个有状态的处理程序。
  3. 与外部存储的交互 。 有必要更新外部数据库中的计数器值。 关键点是外部存储的类型不同于处理器和线程状态的存储。

如下所示,所有这些都会严重影响所使用的工具和可能的实现方法。


仍然需要在任务上添加一点点内容,立即将任务从复杂性超出的领域转移到不可能的事情:需要并发一次准确的保证。


一次


精确一次通常被解释得过于宽泛,这模糊了该术语本身,并且不再满足该任务的原始要求。 如果我们要讨论的是在一台计算机上本地运行的系统,那么一切都很简单:多花一点钱,走得更远。 但是在这种情况下,我们谈论的是一种分布式系统,其中:


  1. 处理程序的数量可能很大:每个处理程序都使用自己的数据。 而且,可以将结果添加到各个位置,例如,甚至可能改组的外部数据库。
  2. 每个处理程序都可能突然停止处理。 容错系统意味着即使在系统各个部分发生故障的情况下也可以继续运行。

因此,我们必须为处理程序可能跌倒这一事实做好准备,而另一个处理程序应该接管已经完成的工作并继续进行处理。


问题立即浮出水面:如果不确定的处理程序正常工作, 一次将意味着什么? 毕竟,一般而言,每次重新启动时,我们都会收到不同的结果状态。 答案很简单: 一次执行一次,这样的系统执行中,每个输入值都会被精确处理一次,从而给出相应的输出结果。 此外,此执行不必在物理上在同一节点上。 但是结果应该好像一切都在单个逻辑节点上进行处理而没有崩溃一样


一次并发


为了加重要求,我们引入了一个新概念: 并发精确一次 。 与简单一次的根本区别是,在处理过程中没有暂停,就好像所有内容都在同一节点上处理一样, 没有丢弃没有暂停 。 在我们的任务中,为了演示的简单起见,我们将要求完全并发一次 ,以便不考虑与当前不可用的现有系统进行比较。


具有这种要求的后果将在下面讨论。


交易性


为了使读者对已出现的复杂性更加了解,让我们看看开发这样的系统时必须考虑的各种不良情况。 我们还将尝试使用一种通用方法,该方法将允许我们考虑到我们的要求来解决上述问题。


首先想到的是需要记录处理程序以及输入和输出流的状态。 输出流的状态由一个简单的数字队列描述,输入流的状态由它们中的位置描述。 本质上,流是无限队列,并且队列中的位置唯一地设置位置。


主意


以下处理程序的简单实现是使用某种数据仓库而产生的。 在此阶段,存储库的特定属性对我们而言并不重要。 我们将使用Pseco语言进行说明(Pseco:=伪代码):


handle(input_queues, output_queues, state): #    input_indexes = storage.get_input_indexes() #      while true: #         items, new_input_indexes = input_queues.get_from(input_indexes) #    state.queue.push(items) #     duration state.queue.trim_time_window(duration) avg = state.queue.avg() need_update_counter = state.queue.size() > size_boundary # (A)      output_queues[0].push(avg) if need_update_counter: # (B)      db.increment_counter() # (C)     storage.save_state(state) # (D)    storage.save_queue_indexes(new_input_indexes) # (E)    input_indexes = new_input_indexes 

这是一种简单的单线程算法,可以根据上述任务从输入流中读取数据并写入所需的值。


让我们看看在任意时间点节点掉落以及恢复工作之后会发生什么。 显然,如果在(A)(E)点跌倒了,一切都会好起来的:要么数据尚未记录在任何地方,我们只是恢复状态并在另一个节点上继续进行,或者所有必要的数据已被记录下来,然后继续下一步。


但是,如果其他所有地方都跌倒了,那么意外的麻烦正在等待着我们。 如果在(B)点发生下降,则在重新启动处理程序时,我们将恢复状态并在近似相同的数字范围内重新记录平均值。 在(C)点下降的情况下(C)除了平均重复次数外,还会以值的增量重复一次。 并且如果(D)下降(D)我们将获得处理程序的不一致状态:该状态对应于新的时间点,并且我们将从旧的输入流中读取值。


惊喜


同时,重新安排录制操作时,根本不会发生根本变化:不一致和重复将保持不变。 因此,我们得出的结论是,所有更改存储库,输出队列和数据库中处理程序状态的操作都应以事务方式执行,即 一切都是原子的。


因此,有必要开发一种机制,以使不同的存储可以事务性地更改其状态,而不是独立于每个内部,而是同时在所有存储之间进行事务性更改。 当然,您可以将我们的存储放入外部数据库中,但是,该任务假定数据库引擎和流数据处理框架的引擎是分开的,并且彼此独立地工作。 在这里我想考虑最困难的情况,因为 简单的案例没有什么好考虑的。


竞争响应能力


仔细考虑一次竞争性执行。 在容错系统的情况下,我们需要从某个角度继续工作。 显然,这是过去的观点,因为 为了保持性能,不可能存储当前和将来的所有状态变化时刻:要么保存最后的操作结果,要么保存一组用于增加吞吐量的值。 这种行为立即导致我们这样一个事实,即在恢复处理器状态之后,结果会有一些延迟,它将随着值组的大小和状态大小的增加而增加。


除了此延迟之外,系统中还存在与将状态加载到另一个节点相关的延迟。 除此之外,问题节点的检测还需要一些时间,并且通常需要很多时间。 首先,这是由于以下事实:如果我们设置了较短的检测时间,则可能会出现频繁的误报,这将导致各种不愉快的特殊效果。


此外,随着并行处理器数量的增加,突然发现,即使没有故障,并不是所有处理器都能很好地工作。 有时会发生钝器,这也会导致处理延迟。 此类钝器的原因可能多种多样:


  1. 软件 :GC暂停,内存碎片,分配器暂停,内核中断和任务调度,设备驱动程序问题导致速度降低。
  2. 硬件 :磁盘或网络负载高,由于散热问题,过载等导致CPU节流,由于技术问题而导致磁盘变慢。

这绝不是详尽的问题列表,这些问题可能会使处理程序变慢。


因此,放慢速度是人们赖以生存的前提。 有时这不是一个严重的问题,有时,即使出现故障或减速,保持高处理速度也非常重要。


立即出现系统重复的想法:让我们为一个相同的数据流运行,而不是一个,而是两个,甚至三个。 这里的问题在于,在这种情况下,很容易发生重复和不一致的系统行为。 通常,框架不是针对这种行为而设计的,并且建议在任何给定时间处理程序的数量不超过一个。 允许描述的执行重复的系统称为并发精确一次


这种体系结构使您可以立即解决几个问题:


  1. 故障安全行为:如果其中一个节点发生故障,则另一个节点继续工作,就好像什么都没发生一样。 不需要额外的协调,因为 无论第一个处理程序的状态如何,都将执行第二个处理程序。
  2. 去除钝器:首先提供结果的人对他有好处。 对方将仅需选择一个新状态并从此刻开始继续。

尤其是,这种方法使您可以在更可预测的时间内完成困难且困难的长时间计算,因为 两者都变得愚蠢和跌倒的可能性大大降低。


概率评估


让我们尝试评估重复性能的好处。 假设处理程序平均每天都会发生一些事情:GC变钝了,或者节点在说谎,或者容器变得癌了。 还假设我们在10秒内准备好数据包。


那么,在创建包期间发生某些事情的概率为10 / (24 · 3600) ≃ 1e-4


如果并行运行两个处理程序,则两次飞行的概率为≃ 1e-8 。 因此,这项活动将在23年后到来! 是的,系统寿命不长,这将永远不会发生!


而且,如果包装的准备时间更短和/或发钝的次数更少,那么这个数字只会增加。


因此,我们得出结论,正在考虑的方法大大提高了整个系统的可靠性。 剩下的只是解决这样一个小问题:在哪里阅读如何制作并发一次精确系统。 答案很简单:您必须在这里阅读。


半交易


为了进一步讨论,我们需要半事务的概念。 解释它的最简单方法是举一个例子。


考虑将资金从一个银行帐户转移到另一个银行帐户。 使用Pseco语言进行交易的传统方法可以描述如下:


 transfer(from, to, amount): tx = db.begin_transaction() amount_from = tx.get(from) if amount_from < amount: return error.insufficient_funds tx.set(from, amount_from - amount) tx.set(to, tx.get(to) + amount) tx.commit() return ok 

但是,如果无法进行这些交易怎么办? 使用锁,可以按照以下步骤进行操作:


 transfer(from, to, amount): #         lock_from = db.lock(from) lock_to = db.lock(to) amount_from = db.get(from) if amount_from < amount: return error.insufficient_funds db.set(from, amount_from - amount) db.set(to, db.get(to) + amount) return ok 

这种方法可能导致死锁,因为 可以并行执行不同顺序的锁定。 要纠正此行为,只需引入一个以确定性顺序(例如,按键排序)同时获取多个锁的函数,即可完全消除可能的死锁。


但是,可以稍微简化实现:


 transfer(from, to, amount): lock_from = db.lock(from) amount_from = db.get(from) if amount_from < amount: return error.insufficient_funds db.set(from, amount_from - amount) lock_from.release() #   , # .. db.set(db.get...)     lock_to = db.lock(to) db.set(to, db.get(to) + amount) return ok 

这种方法还可以使最终状态保持一致,并通过防止过度支出资金的类型来保存不变性。 与以前的方法的主要区别在于,在这种实施方式中,我们有一定的时间段内帐户处于不一致状态。 即,这种操作意味着账户中资金的总状态不会改变。 在这种情况下, lock_from.release()db.lock(to)之间存在一个时间间隔,在此时间间隔内,数据库可能给出不一致的值:总数可能与正确的向下数值不同。


实际上,我们将一笔用于汇款的交易分为两个半交易:


  1. 前半笔交易会进行检查,并从帐户中扣除必要的金额。
  2. 第二半事务将提取的金额写入另一个帐户。

显然,将交易拆分为较小的交易通常会违反交易行为。 上面的例子也不例外。 但是,如果完全完成了链中的所有半事务,那么结果将与保留的所有不变量保持一致。 这正是半交易链的重要属性。


暂时失去一些一致性,我们仍然获得了另一个有用的功能:操作的独立性,因此具有更好的可伸缩性。 独立性体现在以下事实上:每次半事务仅处理一行,读取,检查和更改其数据,而不与其他数据通信。 因此,您可以改组其事务仅使用一个分片的数据库。 而且,这种方法可用于异构存储库的情况。 半事务可以从一种类型的存储开始,而在另一种类型上结束。 这种有用的属性将在将来使用。


一个合理的问题出现了:如何在分布式系统中实现半-而不是耙? 要解决此问题,您需要考虑无锁方法。


无锁


如您所知,无锁方法有时会提高多线程系统的性能,尤其是在竞争性访问资源的情况下。 但是,完全不清楚这种方法是否可以用于分布式系统中。 让我们深入研究一下什么是无锁,以及为什么此属性对解决我们的问题很有用。


有些开发人员有时不太了解什么是无锁。 狭narrow的眼光表明这与原子处理器指令有关。 在这里重要的是要理解,无锁意味着使用“原子”,反之则不成立,也就是说, 并非所有“原子”都提供无锁行为。


无锁算法的一个重要属性是至少一个线程在系统中取得了进展。 但是由于某些原因,许多人将此属性归为定义(例如,可以在Wikipedia上找到的这样的钝性定义)。 这里有必要添加一个重要的细微差别:即使在一个或多个线程变钝的情况下也取得了进展。 这是一个非常关键的点,经常被忽略,并且对分布式系统有严重的影响。


为什么没有至少一个线程的进度条件会否定无锁算法的概念? 事实是,在这种情况下,通常的自旋锁也将是无锁的。 确实,锁定者将取得进步。 有没有进度=>无锁的线程?


显然,无锁意味着没有锁,而自旋锁的名称表明这是一个真正的锁。 这就是为什么即使在直率的情况下,也要为进度添加条件很重要。 毕竟,这些延迟可以无限期地持续下去,因为 该定义并未说明上限时间线。 如果是这样,那么这种延迟在某种意义上将等同于流量的关闭。 在这种情况下,无锁算法将在这种情况下取​​得进展。


但是谁说无锁方法专门适用于多线程系统? 将同一节点上同一进程中的线程替换为不同节点上的进程,并使用共享的分布式存储来替换线程的共享内存,我们得到了一种无锁的分布式算法。


在这样的系统中,节点掉落等效于线程执行一段时间的延迟,因为 现在该恢复工作了。 同时,无锁方法允许分布式系统中的其他参与者继续工作。 此外,特殊的无锁算法可以彼此并行运行,从而检测出竞争性变化并减少重复项。


一次精确的方法意味着存在一致的分布式存储。 通常,此类存储表示巨大的持久键值表。 可能的操作: setgetdel 。 但是,无锁方法需要更复杂的操作:CAS或比较和交换。 让我们更详细地考虑该操作,其使用的可能性以及其产生的结果。


卡斯


CAS或比较交换是无锁和无等待算法的主要且重要的同步原语。 以下Pseco可以说明其实质:


 CAS(var, expected, new): # ,   atomic,   atomic: if var.get() != expected: return false var.set(new) return true 

有时,为了进行优化,它们返回的不是truefalse ,而是前一个值,因为 通常,此类操作是在循环中执行的,为了获得expected ,您必须首先阅读它:


 CAS_optimized(var, expected, new): # ,   atomic,   atomic: current = var.get() if current == expected: var.set(new) return current #  CAS   CAS_optimized CAS(var, expected, new): return var.CAS_optimized(expected, new) == expected 

这种方法可以节省一个读数。 作为审查的一部分,我们将使用CAS的简单形式,因为 如果需要,这样的优化可以独立完成。


如果是分布式系统,则对每个更改进行版本控制。 即 首先,我们从存储中读取值,以获取数据的当前版本。 然后,我们尝试写入,期望数据的版本没有更改。 在这种情况下,每次更新数据时都会增加版本:


 CAS_versioned(var, expected_version, new): atomic: if var.get_version() != expected_version: return false var.set(new, expected_version + 1) return true 

这种方法使您可以更准确地控制值的更新,从而避免ABA问题 。 特别是,Etcd和Zookeeper支持版本控制。


注意CAS_versioned操作的使用赋予的重要属性。 事实是,可以在不影响高级逻辑的情况下重复执行此类操作。 在多线程编程中,此属性没有特殊值,因为 在那里,如果操作失败,那么我们肯定知道它不适用。 在分布式系统的情况下,违反了该不变式,因为 该请求可能到达收件人,但是成功的响应不再存在。 因此,能够重新发送请求而不必担心破坏高级逻辑的不变性很重要。


CAS_versioned操作CAS_versioned正是此属性。 实际上,可以无休止地重复此操作,直到返回接收者的真实响应为止。 而这又引发了与网络交互有关的一整类错误。


例子


让我们看看如何基于CAS_versioned和半交易将帐户从一个帐户转移到另一个帐户,例如,该帐户属于Etcd的不同副本。 在此,我假设已经基于提供的API相应地实现了CAS_versioned函数。


 withdraw(from, amount): # CAS- while true: #     version_from, amount_from = from.get_versioned() if amount_from < amount: return error.insufficient_funds if from.CAS_versioned(version_from, amount_from - amount): break return ok deposit(to, amount): # CAS- while true: version_to, amount_to = to.get_versioned() if to.CAS_versioned(version_to, amount_to + amount): break return ok transfer(from, to, amount): #   if withdraw(from, amount) is ok: #     , #    deposit(to, amount) 

在这里,我们将操作分为半事务,然后通过CAS_versioned操作执行每个半事务。 这种方法使您可以独立地使用每个帐户,从而允许使用彼此不连接的异构存储。 在这里等待我们的唯一问题是在半交易之间的时间间隔内当前流程下降的情况下资金损失。



为了继续,您需要实现一个事件队列。 其思想是,为了使处理程序彼此通信,您需要具有一个有序的消息队列,在该队列中数据不会丢失或重复。 因此,处理程序链中的所有交互都将基于此原语构建。 它也是分析和审核传入和传出数据流的有用工具。 除此之外,还可以通过队列完成处理程序状态的更改。


该队列将包含一对操作:


  1. 将消息添加到队列的末尾。
  2. 从队列中指定索引处接收消息。

在这种情况下,出于以下几个原因,我不考虑从队列中删除消息:


  1. 多个处理器可以从同一队列读取。 删除同步将是一项艰巨的任务,尽管并非不可能。
  2. 将队列保持相对较长的时间间隔(一天或一周)以进行调试和审核非常有用。 此属性的有用性很难高估。
  3. 您可以按计划删除旧项目,也可以通过在队列项目上设置TTL来删除。 重要的是要确保处理器在扫帚到达并清理所有东西之前设法处理数据。 如果处理时间大约是几秒钟,而TTL大约是几天,那么这种情况就不会发生。

为了存储元素并有效地实现添加,我们需要:


  1. 具有当前索引的值。 该索引指向添加项目的队列的末尾。
  2. , .

lock-free


: . :


  1. CAS .
  2. .

, , .


  1. lock-free . , , . Lock-free? ! , 2 : . lock-free, — ! , , , . . , .. , .
  2. . , . .

, lock-free .


Lock-free


, , : , .. , :


 push(queue, value): #      index = queue.get_current_index() while true: #  ,    #    var = queue.at(index) #  = 0   , ..   # ,         if var.CAS_versioned(0, value): #   ,    queue.update_index(index + 1) break #   , .   index = max(queue.get_current_index(), index + 1) update_index(queue, index): while true: #     cur_index, version = queue.get_current_index_versioned() #      , #  , .   if cur_index >= index: # -     , #        break if queue.current_index_var().CAS_versioned(version, index): #      ,   break # -  . # ,      ,   

. , ( — , , ). lock-free . ?


, push , ! , , .


. : . , - , - . , , .. . . ? , .. , , .


, , . .. . , , . , .


, . , . , , . , .



, , , .


. .


, :


  1. , .. stateless.
  2. , — .

, , concurrent exactly-once .


:


 handle(input, output): index = 0 while true: value = input.get(index) output.push(value) index += 1 

. :


 handle(input, output, state): #   index = state.get() while true: value = input.get(index) output.push(value) index += 1 #   state.set(index) 

exactly-once . , , , .


exactly-once , , . .., , , , , — :


 #       get_next_index(queue): index = queue.get_index() #     while queue.has(index): #    queue.push index = max(index + 1, queue.get_index()) return index #      . #  true    push_at(queue, value, index): var = queue.at(index) if var.CAS_versioned(0, value): #   queue.update_index(index + 1) return true return false handle(input, output, state): #   #    {PREPARING, 0} fsm_state = state.get() while true: switch fsm_state: case {PREPARING, input_index}: #   :   , #        output_index = output.get_next_index() fsm_state = {WRITING, input_index, output_index} case {WRITING, input_index, output_index}: value = input.get(input_index) if output.push_at(value, output_index): #  ,     input_index += 1 #    ,  push_at  false, #        fsm_state = {PREPARING, input_index} state.set(fsm_state) 

push_at ? , . , , , . , . . - , lock-free .


, :


  1. : .
  2. , : .

: concurrent exactly-once .


? :


  1. , , push_at false. .
  2. , . , , .

concurrent exactly-once ? , , . , . .


:


 #     ,  ,     # ..       true, #      true. #       false push_at_idempotent(queue, value, index): return queue.push_at(value, index) or queue.get(index) == value handle(input, output, state): version, fsm_state = state.get_versioned() while true: switch fsm_state: case {PREPARING, input_index}: #   ,   , #        output_index = output.get_next_index() fsm_state = {WRITING, input_index, output_index} case {WRITING, input_index, output_index}: value = input.get(input_index) #   , #       if output.push_at_idempotent(value, output_index): input_index += 1 fsm_state = {PREPARING, input_index} #     if state.CAS_versioned(version, fsm_state): version += 1 else: #   ,    version, fsm_state = state.get_versioned() 

:


简单的


, . , .


kernel panic, , .. . . : , . , .


, , .



: .


: , , , , :


 #  : # - input_queues -   # - output_queues -   # - state -    # - handler -    : state, inputs -> state, outputs handle(input_queues, output_queues, state, handler): #        version, fsm_state = state.get_versioned() while true: switch fsm_state: # input_indexes       case {HANDLING, user_state, input_indexes}: #       inputs = [queue.get(index) for queue, index in zip(input_queues, input_indexes)] #   ,    next_indexes = next(inputs, input_indexes) #    #     user_state, outputs = handler(user_state, inputs) #      , #     fsm_state = {PREPARING, user_state, next_indexes, outputs, 0} case {PREPARING, user_state, input_indexes, outputs, output_pos}: #  ,      #    output_index = output_queues[output_pos].get_next_index() #     fsm_state = { WRITING, user_state, input_indexes, outputs, output_pos, output_index } case { WRITING, user_state, input_indexes, outputs, output_pos, output_index }: value = outputs[output_pos] #       if output_queues[output_pos].push_at_idempotent( value, output_index ): #  ,      output_pos += 1 #    ,      PREPARING. #     #     fsm_state = if output_pos == len(outputs): #   , #       {HANDLING, user_state, input_indexes} else: #       #   , #         {PREPARING, user_state, input_indexes, outputs, output_pos} if state.CAS_versioned(version, fsm_state): version += 1 else: #   ,    version, fsm_state = state.get_versioned() 

:


最后的


: HANDLING . , .., , . , . , PREPARING WRITING , . , HANDLING .


, , , . , . , .


. . .


最后的


:


 my_handler(state, inputs): #      state.queue.push(inputs) #    duration state.queue.trim_time_window(duration) #   avg = state.queue.avg() need_update_counter = state.queue.size() > size_boundary return state, [ avg, if need_update_counter: true else: # none      none ] 

, , concurrent exactly-once handle .


:


 handle_db(input_queue, db): while true: #      tx = db.begin_transaction() #     . #      , #      index = tx.get_current_index() #    tx.write_current_index(index + 1) #      value = intput_queue.get(index) if value: #    tx.increment_counter() tx.commit() #   ,      , #           

. 因为 , , , , concurrent exactly-once . .



— . , , .



, , . , , .



. , . 因为 , . . .



— . , , . , - , , . , .. , , .



. , , . , , .



. , . : , . , .


, , :


  1. , . .
  2. . , .
  3. . , . , , . .. . : .

, , -, , -, .


, . :


 transfer(from, to, amount): #   if withdraw(from, amount) is ok: #     , #    deposit(to, amount) 

withdraw , , deposit : ? deposit - (, , ), . , , , , ? , , - , .


, , , . , , , . , . , , . 因为 , , . , : , — .



, .


: , , , , . , - :


  • . , , , , .
  • . .

, , .


, , .. , , . , .


: lock-free , . , .. , .


CAS . , :


 #     ,    handle(input, output, state): # ... while true: switch fsm_state: case {HANDLING, ...}: #      fsm_state = {PREPARING, ...} case {PREPARING, input_index}: #   ... output_index = ...get_next_index() fsm_state = {WRITING, output_index, ...} case {WRITING, output_index, ...}: #  ,  output_index 

, . . :


  1. PREPARING . , .
  2. WRITING . . , PREPARING .

, . , , — . :


  1. . , , .. , .
  2. , .. . , .

, lock-free , , .



, . , Stale Read , . — CAS: . :


  • Distributed single register — (, etcd Zookeeper):
    1. Linearizability
    2. Sequential consistency
  • Transactional — (, MySQL, PostgreSQL ..):
    1. Serializability
    2. Snapshot Isolation
    3. Repeatable Read
    4. Read Committed
  • Distributed Transactional — NewSQL :
    1. Strict Consistency

: ? , , . , , CAS . , , Read My Writes .


结论


exactly-once . , .. , , , . , , , , .. , .


lock-free .


:


  1. : .
  2. : .
  3. : : exactly-once .
  4. Concurrent : .
  5. Real-time : .
  6. Lock-free : , .
  7. Deadlock free : , .
  8. Race condition free : .
  9. Hot-hot : .
  10. Hard stop : .
  11. No failover : .
  12. No downtime : .
  13. : , .
  14. : .
  15. : .
  16. : .

, . 但这是另一个故事。


轻便



:


  1. Concurrent exactly-once.
  2. Semi-transactions .
  3. Lock-free two-phase commit, .


  1. .
  2. lock-free .
  3. .


[1] 维基百科:ABA问题。
[2] 博客:您无法立即交付
[3] 哈伯:可实现分布式故障安全事务的提交执行时间下限的可实现性。
[4] 哈伯:异步3:主观模型。
[5] 维基百科:非阻塞同步。

Source: https://habr.com/ru/post/zh-CN413817/


All Articles