哈Ha!
我们发现了《
Apache Kafka。流处理和数据分析 》这本书的最新储备,并将其发送给印前。 此外,我们还收到了《
行动中的卡夫卡流 》一书的合同,并于下周开始将其翻译。

为了展示使用Kafka Streams库的有趣情况,我们决定将Kafka中事件源范例的文章从非常早的Adam Worski那里翻译过来,后者在两周前发表了有关Scala语言的
文章 。 更有意思的是,亚当·沃斯基(Adam Worski)的观点不可否认:例如,
在这里 ,有人认为这种范式绝对不适合卡夫卡。 我们希望更令人难忘的是,该文章给人留下了深刻的印象。
罗伯特·马丁
( Robert Martin)出版的《
清洁架构》和本文中的“事件采购”一词均被翻译为“事件记录”。 如果有人对“泵送事件”的翻译印象深刻-请让我知道。
创建一个提供事件注册(事件源)的系统,我们迟早会遇到持久性(persistence)的问题-这里我们有两个选择。 首先,有
EventStore ,这是一个在战斗中变硬的成熟实现。 另外,您可以使用
akka-persistence来充分利用
Cassandra的可伸缩性,并依赖于actor模型的性能。 另一个选择是良好的旧
关系数据库 ,其中
CRUD
方法与事件的使用相结合,并且最大的收益被挤出交易。
除了由于最近实施的一些事情而产生的这些(可能还有许多其他)机会外,今天在
Kafka上组织事件注册也变得非常简单。 让我们看看如何。
什么是事件记录?关于此主题,有很多
出色的 入门 文章 ,因此,我将只限于最简洁的介绍。 注册事件时,我们不会保存系统中使用的实体的“当前”状态,而是保存与这些实体相关的事件流。 每个
事件都是一个
事实 ,描述对象
发生的状态变化(已经!)。 如您所知,事实并未讨论且
没有改变 。
当我们拥有此类事件流时,可以通过最小化与该实体有关的所有事件来阐明实体的当前状态; 但是,请记住,相反的情况是不可能的-仅保留“当前”状态,我们将丢弃大量有价值的时间信息。
事件记录可以与更传统的状态存储方式
和平共处 。 通常,系统处理许多实体类型(例如:用户,订单,商品等),事件注册很可能仅对其中某些类别有用。 重要的是要注意,这里我们没有面临“全有还是全无”的选择; 这只是我们应用程序中的其他状态管理功能。
卡夫卡的活动存储要解决的第一个问题:如何在Kafka中存储事件? 有三种可能的策略:
- 将所有类型的实体的所有事件存储在一个主题中 (包含多个细分)
- 通过每个实体类型的主题,即,我们在单独的主题中,在单独的主题中提取与用户相关的所有事件-所有与产品相关的事件,等等。
- 按主题本质,即针对每个特定用户和每个产品名称的单独主题
第三种策略(按主题)实际上是不可行的。 如果每个新用户出现在系统中时,他必须开始一个单独的主题,那么主题的数量很快就会变得不受限制。 在这种情况下,任何聚合都将非常困难,例如,将很难在搜索引擎中为所有用户建立索引; 您不仅需要消耗大量的主题,而且还不是所有主题都事先已知。
因此,仍然需要在1和2之间进行选择。这两种选择都有其优点和缺点。 拥有一个主题可以更轻松地获得所有事件的
全局视图 。 另一方面,通过突出显示每种实体的主题,可以分别缩放和细分每个实体的流程。 两种策略之一的选择取决于特定的用例。
此外,如果您有额外的存储空间,则可以一次实施两种策略:从一个综合性主题中按实体类型生成主题。

在本文的其余部分,我们将仅处理一种类型的实体和单个主题,尽管可以轻松推断出所提供的资料并将其应用于许多主题或实体类型。
(编辑:正如
克里斯·亨特 (
Chris Hunt)指出的那样,
马丁·克莱普曼 ( Martin Kleppman)有
一篇出色的文章 ,详细探讨了如何按主题和细分来分配事件。
事件记录范例中最简单的存储操作从支持事件记录的存储中可以预期的逻辑上,最简单的操作是读取特定实体的“当前”(最小化)状态。 通常,每个实体都有一个或另一个
id
。 因此,知道此
id
,我们的存储系统应返回对象的当前状态。
最后的事实是事件日志:始终可以从与特定实体关联的事件流中推断出当前状态。 为此,数据库引擎将需要一个纯函数(无副作用),该函数接受事件和初始状态,并返回更改后的状态:
Event = > State => State
。 在存在这样的函数和
初始状态的
值的情况下,当前状态是事件流的
卷积 (状态更改函数必须是
干净的,以便可以将其自由地重复应用于相同的事件。)
Kafka中“读取当前状态”操作的简化实现从该主题收集
所有事件的流,对其进行过滤,仅保留具有给定
id
事件,并使用指定的函数折叠。 如果有很多事件(随着时间的推移,事件的数量只会增加),那么此操作可能会变慢并消耗大量资源。 即使其结果将被缓存在内存中并存储在服务节点上,该信息仍将必须定期重新创建,例如由于节点故障或由于缓存数据拥挤所致。

因此,需要一种更合理的方法。 这就是kafka流和状态存储库派上用场的地方。 Kafka-streams应用程序在整个节点群集上运行,这些节点一起使用某些主题。 就像常规的Kafka使用者一样,每个节点都分配了一系列消耗的主题段。 但是,kafka-streams提供了更高级别的数据操作,使创建派生流变得更加容易。
卡夫卡流中的一种此类操作是流在本地存储中的卷积。 每个本地存储仅包含来自给定节点使用的那些段的数据。 开箱即用,有两种本地存储实现:
在RAM中和基于
RocksDB 。
回到事件注册的主题,我们注意到可以
通过在本地节点上保留分配给该节点的段中每个实体的“当前状态”来折叠
状态存储中的事件流。 如果我们使用基于RocksDB的状态存储的实现,那么我们可以在单个节点上跟踪多少个实体仅取决于磁盘空间量。
这是使用Java API时事件在本地存储中的卷积(serde表示“ serializer / deserializer”):
KStreamBuilder builder = new KStreamBuilder(); builder.stream(keySerde, valueSerde, "my_entity_events") .groupByKey(keySerde, valueSerde)
Confluent网站
上提供了
基于微服务的订单处理的完整示例。
(编辑:正如
Sergei Egorov和
Nikita Salnikov在Twitter上所指出的那样,对于具有事件日志记录的系统,您可能需要更改Kafka中的默认数据存储设置,以便没有时间或大小限制,并且,可选地, ,启用数据压缩。)
查看当前状态我们已经创建了一个状态存储库,位于该状态存储库中的所有实体的当前状态都来自分配给节点的段,但是现在如何请求该存储库? 如果请求是本地的(也就是说,它来自存储库所在的同一节点),那么一切都非常简单:
streams .store("my_entity_store", QueryableStoreTypes.keyValueStore()); .get(entityId);
但是,如果我们要请求位于另一个节点上的数据怎么办? 以及如何找出这个节点是什么? 在这里,Kafka最近引入的另一个功能很方便:
交互式查询 。 在他们的帮助下,您可以访问Kafka元数据,并找出哪个节点处理具有给定
id
的主题细分(在这种情况下,隐式使用主题细分工具):
metadataService .streamsMetadataForStoreAndKey("my_entity_store", entityId, keySerde)
接下来,您需要以某种方式将请求重定向到正确的节点。 请注意:实施和处理跨站点通信的特定方式-无论是REST,akka-remote还是任何其他方式-不属于kafka-streams责任范围。 Kafka只是提供对状态存储的访问,并提供有关给定
id
的状态存储位于哪个节点的信息。
灾难复原状态存储看起来不错,但是当节点发生故障时会发生什么呢? 重建给定段的本地状态存储也可能是一项昂贵的操作。 由于kafka流需要重新平衡(添加或删除节点之后),因此很长一段时间内可能会导致延迟增加或请求丢失。
这就是为什么默认情况下会记录长期状态存储的原因:也就是说,对存储进行的所有更改都会另外写入changelog主题。 压缩了该主题(因为对于每个
id
我们只对最后一条记录感兴趣,而没有更改历史记录,因为历史记录存储在事件本身中)-因此,它应尽可能小。 这就是为什么可以更快地恢复另一个节点上的存储的原因。
但是,在这种情况下,通过重新平衡,延迟仍然可能。 为了进一步减少它们,kafka-streams提供了为每个存储库保留多个
备份副本 (
num.standby.replicas
)的功能。 这些副本将使用从主题中检索到的所有更新以及更改日志(一旦可用),并准备在当前主存储出现故障后立即切换到给定段的主状态存储模式。
连贯性使用默认设置,Kafka至少提供一次交付。 也就是说,在节点发生故障的情况下,某些消息可能会多次传递。 例如,如果在状态存储更改为日志之后但在执行该特定事件的偏移之前系统崩溃,则特定事件可能会两次应用于状态存储。 也许这不会造成任何困难:我们的状态更新功能(
Event = > State => State
)通常可以应付这种情况。 但是,它可能无法应付:在这种情况下,可以使用Kafka提供的
严格一次性交付的保证。 这样的保证仅在读写Kafka主题时适用,但这就是我们在这里所做的:
在后台,Kafka主题中的所有条目都被简化为更新状态存储的更改日志并执行偏移量。 所有这一切都可以
以交易的形式完成。
因此,如果我们的状态更新功能需要这样做,则可以使用单个配置选项“ processing.guarantee”启用“严格一次性交付”流处理的语义。 因此,性能下降,但是没有任何结果。
事件监听既然我们已经介绍了基础知识-查询“当前状态”并为每个实体更新它-触发
副作用怎么办? 在某些时候,这对于以下情况是必要的:
- 发送通知电子邮件
- 搜索引擎实体索引
- 通过REST(或SOAP,CORBA等)调用外部服务
所有这些任务在某种程度上都是阻塞的,并且与I / O操作相关(这对于副作用是很自然的),因此在状态更新逻辑的框架内执行它们可能不是一个好主意:结果,主循环中的故障频率可能会增加事件以及性能方面将存在瓶颈。
此外,具有状态更新逻辑(E
Event = > State => State
)的函数可以多次运行(在失败或重新启动的情况下),并且大多数情况下我们希望最大程度地减少多次运行特定事件的副作用的情况。
幸运的是,由于我们处理Kafka主题,因此我们具有很大的灵活性。 在状态存储被更新的流程阶段,事件可以不变地(或在必要时也以修改的形式)发出,并且生成的流/主题(在Kafka中,这些概念是等效的)可以随意使用。 此外,可以在状态更新阶段之前或之后使用它。 最后,我们可以控制产生副作用的方式:至少一次或最多一次。 如果仅在成功完成所有副作用之后才执行消耗的主题事件的偏移,则提供第一个选项。 相反,最多运行一次,我们执行换档直到触发副作用。
有几种触发副作用的选项,它们取决于特定的实际情况。 首先,您可以定义Kafka-streams阶段,其中触发每个事件的副作用作为流处理功能的一部分。
设置这种机制非常简单,但是当您必须同时处理重试,控制偏移量和竞争许多事件的偏移量时,此解决方案并不灵活。 在这种更复杂的情况下,使用
反应型卡夫卡或其他“直接”消耗卡夫卡主题的机制来确定处理可能更为合适。
一个事件也有可能
触发其他事件 -例如,“ order”事件可以触发“ disparation for dispatch”和“ customer notification”事件。 这也可以在kafka-streams阶段实现。
最后,如果我们想将事件或从事件中提取的某些数据存储在数据库或搜索引擎中(例如在ElasticSearch或PostgreSQL中),则可以使用
Kafka Connect连接器,该
连接器将为我们处理与主题使用相关的所有详细信息。
创建视图和投影通常,系统要求不限于仅查询和处理单个实体流。 还应该支持聚合,多个事件流的组合。 此类组合流通常称为
投影 ,并且在折叠时,可用于创建
数据表示 。 是否可以使用Kafka实施它们?

再次,是的! 请记住,原则上我们只处理存储事件的Kafka主题; 因此,我们拥有原始的Kafka消费者/生产者,kafka-stream组合器甚至
KSQL的所有
功能 -所有这些对于我们定义投影都
将派上用场。 例如,使用kafka-streams可以过滤流,显示,按键分组,在临时或会话窗口中聚合等。 在代码级别,或使用类似SQL的KSQL。
就像我们对单个实体流所做的那样,可以使用状态存储和交互式查询将此类流存储很长时间,并提供给查询。
接下来是什么为了防止事件随系统的发展而无限流动,压缩选项(例如保存“当前状态”的
快照)可能会很有用。 因此,我们可以将自己限制为仅存储一些最近的快照以及在创建快照之后发生的那些事件。
尽管在Kafka中(以及在其他一些基于记录事件的原理上运行的系统中)不直接支持快照,但您绝对可以使用上述某些机制自行添加此类功能,例如流,使用者,状态存储等。 d。
总结尽管起初,Kafka在设计时并不着眼于事件注册范例,但实际上它是一种流数据引擎,支持
主题复制 ,分段,
状态存储库和
流API ,并且同时非常灵活。 因此,在Kafka上,您可以轻松实现事件注册系统。 此外,由于在发生所有事情的背景下,我们将始终有一个Kafka主题,我们将获得更多的灵活性,因为我们可以使用高级流API或低级使用者。