《 Kafka Streams》一书在行动。 实时应用程序和微服务»

图片 嗨,habrozhiteli! 本书适合任何想了解流处理的开发人员。 了解分布式编程将帮助您更好地了解Kafka和Kafka Streams。 知道Kafka框架本身会很高兴,但这不是必需的:我将告诉您所需的一切。 多亏了这本书,经验丰富的Kafka开发人员(如新手)将学习如何使用Kafka Streams库创建有趣的流应用程序。 熟悉序列化等概念的中级和高级Java开发人员将学习如何应用其技能来创建Kafka Streams应用程序。 本书的源代码是用Java 8编写的,并且实质上使用Java 8的lambda表达式的语法,因此使用lambda函数(即使使用另一种编程语言)的功能对您也很有用。

摘录。 5.3。 聚合和窗口操作


在本节中,我们将继续介绍Kafka Streams最有希望的部分。 到目前为止,我们已经涵盖了Kafka Streams的以下方面:

  • 创建处理拓扑;
  • 在流应用程序中使用状态;
  • 建立数据流连接;
  • 事件流(KStream)和更新流(KTable)之间的差异。

在以下示例中,我们将所有这些元素放在一起。 此外,还将向您介绍窗口操作-流应用程序的另一个强大功能。 我们的第一个示例将是简单聚合。

5.3.1。 按行业分类的股票销售汇总


聚合和分组是处理流数据的重要工具。 仅凭入场检查个人记录通常是不够的。 要从数据中提取其他信息,必须对它们进行分组和组合。

在此示例中,您必须尝试一个盘中交易者的诉讼,该交易者需要跟踪多个行业中公司股票的销售量。 特别是,您对每个行业中销售份额最大的五家公司感兴趣。

对于这种聚合,您将需要执行以下几个步骤,以将数据转换为所需的格式(一般而言)。

  1. 创建一个基于主题的来源,发布原始股票交易信息。 我们将必须将StockTransaction类型的对象映射到ShareVolume类型的对象。 事实是,StockTransaction对象包含销售元数据,我们只需要有关已售出股票数量的数据。
  2. 通过股票代码将ShareVolume数据分组。 按符号分组后,可以将此数据折叠为股票销售小计。 值得注意的是,KStream.groupBy方法返回的类型为KGroupedStream的实例。 然后可以通过稍后调用KGroupedStream.reduce方法来获取KTable实例。

什么是KGroupedStream接口

KStream.groupBy和KStream.groupByKey方法返回KGroupedStream的实例。 KGroupedStream是按键分组后事件流的中间表示。 它根本不打算直接与其一起使用。 相反,KGroupedStream用于聚合操作,其结果始终为KTable。 而且,由于聚合操作的结果是KTable,并且它们使用状态存储,因此有可能并非所有更新结果都在管道中进一步发送。

KTable.groupBy方法返回一个类似的KGroupedTable-由密钥重新分组的更新流的中间表示。

让我们休息一下,看看无花果。 5.9,这表明我们已经取得了成就。 您应该已经熟悉此拓扑。

图片

现在,让我们看一下该拓扑的代码(可以在src / main / java / bbejeck / chapter_5 / AggregationsAndReducingExample.java文件中找到)(清单5.2)。

图片

给定的代码简洁性不同,并且在几行中执行了大量的操作。 在builder.stream方法的第一个参数中,您会自己发现一些新东西:使用Consumed.withOffsetResetPolicy方法设置的枚举类型AutoOffsetReset.EARLIEST的值(还有LATEST)。 使用此枚举类型,可以为每个KStream或KTable指定一种重置偏移量的策略;它的优先级高于从配置重置偏移量的参数。

GroupByKey和GroupBy

KStream接口有两种将记录分组的方法:GroupByKey和GroupBy。 两者都返回KGroupedTable,因此您可能会遇到一个合理的问题:它们之间的区别是什么?何时使用哪个?

当KStream中的键已经为非空时,使用GroupByKey方法。 最重要的是,从未设置过“需要重新分区”标志。

GroupBy方法假定您更改了分组键,因此重新分区标志设置为true。 在GroupBy方法之后执行连接,聚合等操作将导致自动重新分区。
摘要:尽可能使用GroupByKey而不是GroupBy。

mapValues和groupBy方法的作用是可以理解的,因此请看一下sum()方法(可以在src / main / java / bbejeck / model / ShareVolume.java文件中找到)(清单5.3)。

图片

ShareVolume.sum方法返回股票销售量的小计,整个计算链的结果是KTable <String,ShareVolume>对象。 现在您了解了KTable扮演的角色。 当ShareVolume对象到达时,最新的当前更新将保存在相应的KTable中。 重要的是不要忘记所有更新都反映在先前的shareVolumeKTable中,但并非所有更新都被发送出去。

此外,借助此KTable,我们进行汇总(按已售出的股票数量计算),以使五个公司的股票销售额在每个行业中最高。 在这种情况下,我们的操作将类似于第一次聚合期间的操作。

  1. 执行另一个groupBy操作,以按行业对单个ShareVolume对象进行分组。
  2. 继续汇总ShareVolume对象。 这次,聚合对象是固定大小的优先级队列。 这样的固定规模队列中只保留了出售股票数量最多的五家公司。
  3. 以字符串值显示上一段中的行,并按行业份额返回前五名最畅销的行。
  4. 将结果以字符串形式写入主题。

在图。 5.10显示了数据移动拓扑的图形。 如您所见,第二轮处理非常简单。

图片

现在,在清楚地理解了第二轮处理的结构之后,您可以参考其源代码(可以在文件src / main / java / bbejeck / chapter_5 / AggregationsAndReducingExample.java中找到它)(清单5.4)。

此初始化程序中有一个fixedQueue变量。 这是一个自定义对象-java.util.TreeSet的适配器,用于按已售出股票数量的降序跟踪N个最高结果。

图片

您已经遇到了对groupBy和mapValues的调用,因此我们不会在上面停下来(由于不推荐使用KTable.print方法,因此我们称之为KTable.toStream方法)。 但是您还没有看到Aggregate()方法的KTable版本,因此我们将花一些时间来讨论它。

您还记得,KTable的特征在于具有相同键的记录被视为更新。 KTable用新记录替换了旧记录。 聚合以相同的方式发生:具有一个键的最后记录被聚合。 记录到达时,使用加法器(聚合方法调用中的第二个参数)将其添加到FixedSizePriorityQueue类的实例中,但是如果已经存在具有相同键的另一个记录,则使用减法器删除旧记录(聚合方法中的第三个参数)。

这一切都意味着我们的聚合器FixedSizePriorityQueue不会使用一个键来聚合所有值,而是存储最畅销类型的股票数量N的移动总和。 每个条目包含到目前为止已售出的股票总数。 KTable将为您提供有关当前销售量最大的公司股票的信息;不需要每次更新的滚动汇总。

我们学会了做两件重要的事情:

  • 将KTable中的值按它们共有的键进行分组;
  • 对这些分组值执行有用的操作,例如卷积和聚合。

执行这些操作的能力对于理解通过Kafka Streams应用程序传递的数据的含义并弄清楚它们携带哪些信息非常重要。

我们还汇集了本书前面讨论的一些关键概念。 在第4章中,我们讨论了故障安全的本地状态对于流应用程序有多重要。 本章中的第一个示例说明了为什么本地状态如此重要-它使得可以跟踪您已经看到的信息。 本地访问避免了网络延迟,使应用程序更具生产力和抗错误性。

执行任何卷积或聚合操作时,必须指定状态存储的名称。 卷积和聚合操作返回一个KTable实例,并且KTable使用状态存储将旧结果替换为新结果。 如您所见,并非所有更新都在管道中进一步发送,这很重要,因为聚集操作旨在获取最终信息。 如果未应用本地状态,则KTable将进一步发送所有聚合和卷积结果。

接下来,我们研究在特定时间段内诸如聚合之类的操作的执行-所谓的窗口操作。

5.3.2。 窗口操作


在上一节中,我们介绍了“滚动”卷积和聚合。 该应用程序执行了股票销售的连续卷积,随后汇总了五只最畅销的股票。

有时,这种连续的汇总和结果卷积是必要的。 有时您只需要在给定的时间段内执行操作。 例如,计算在最近10分钟内用特定公司的股票进行了多少次证券交易。 或最近15分钟内有多少用户点击了新的横幅广告。 应用程序可以执行多次此类操作,但结果仅与指定的时间间隔(时间窗口)有关。

计算买方的交易所交易


在下面的示例中,我们将为多个交易者(大型组织或精明的单一金融家)跟踪交易交易。

进行此跟踪有两个可能的原因。 其中之一是需要知道哪些市场领导者正在买卖。 如果这些大型参与者和老练的投资者看到了自己的机会,那么遵循他们的策略就很有意义。 第二个原因是希望利用内部信息注意到任何可能的非法交易迹象。 为此,您将需要分析大量销售高峰与重要新闻稿之间的相关性。

此类跟踪包括以下步骤:

  • 创建一个流以读取股票交易主题;
  • 按客户ID和库存的库存代号对传入记录进行分组。 调用groupBy方法将返回KGroupedStream类的实例。
  • KGroupedStream.windowedBy返回由临时窗口界定的数据流,该窗口允许窗口聚合。 根据窗口的类型,返回TimeWindowedKStream或SessionWindowedKStream;否则,返回0。
  • 计算聚合操作的事务。 窗口数据流确定在此计算中是否考虑特定记录;
  • 在开发过程中将结果写入主题或将其输出到控制台。

该应用程序的拓扑结构很简单,但是其视觉效果没有受到损害。 看看图片。 5.11。

此外,我们将考虑窗口操作和相应代码的功能。

图片

窗户类型


Kafka Streams中有三种类型的窗口:

  • 会议
  • 翻滚(翻滚);
  • 滑动/“跳跃”(滑动/跳跃)。

选择哪种取决于业务需求。 “滚动”和“跳转”窗口的时间受到限制,而会话限制与用户操作相关联-会话的持续时间仅由用户的行为方式决定。 最主要的是不要忘记所有类型的窗口都是基于记录的日期/时间戳,而不是系统时间。

接下来,我们使用每种窗口类型实现拓扑。 完整的代码仅在第一个示例中给出,其他类型的窗口将保持不变,除了窗口操作的类型。

会话窗口


会话窗口与所有其他窗口类型非常不同。 它们不受时间的限制,而受用户活动(或您要跟踪的实体的活动)的限制。 会话窗口由不活动时间段界定。

图5.12说明了会话窗口的概念。 较小的会话将与左侧的会话合并。 右侧的会话将是单独的,因为它会长时间处于不活动状态。 会话窗口基于用户操作,但是应用记录中的日期/时间戳来确定记录属于哪个会话。

图片


使用会话Windows跟踪Exchange交易


我们将使用会话窗口捕获有关交换交易的信息。 会话窗口的实现如清单5.5所示(可以在src / main / java / bbejeck / chapter_5 / CountingWindowingAndKTableJoinExample.java中找到)。

图片

您已经满足了此拓扑的大多数操作,因此无需在此处再次考虑它们。 但是,我们现在将讨论几个新元素。

对于任何groupBy操作,通常会执行某种聚合操作(聚合,卷积或计数)。 您可以执行具有累积总数的累积聚合,也可以执行窗口聚合,在给定的时间窗口内考虑记录。

清单5.5中的代码计算了会话窗口内的事务数量。 在图。 5.13这些动作将逐步分析。

通过调用windowedBy(SessionWindows.with(twentySeconds).until(fifteenMinutes)),我们创建了一个会话窗口,其空闲间隔为20秒,保留间隔为15分钟。 20秒的不活动时间间隔意味着应用程序将包括在当前(活动)会话中从当前会话结束或开始20秒内到达的所有记录。

图片

接下来,我们指示要在会话窗口中执行的聚合操作-在这种情况下,计数。 如果传入记录落在非活动间隔之外(在日期/时间戳的任一侧),则应用程序将创建一个新会话。 保存间隔意味着将会话保持一定的时间,并允许后期数据超出会话的非活动时间段,但仍可以附加。 另外,合并产生的新会话的开始和结束与最早和最新的日期/时间戳相对应。

让我们看一下count方法中的一些条目,以了解会话如何工作(表5.1)。

图片

收到记录后,我们寻找具有相同密钥的现有会话,结束时间小于当前日期/时间戳-非活动间隔,开始时间大于当前日期/时间戳+非活动间隔。 考虑到这一点,表中有四个记录。 5.1合并为单个会话,如下所示。

1.记录1首先出现,因此开始时间等于结束时间,并且是00:00:00。

2.接下来是记录2,我们寻找不早于23:59:55且不迟于00:00:35开始的会话。 查找记录1并组合会话1和2。以会话1的开始时间(较早)和会话2的结束时间(较后)为准,新会话将在00:00:00开始并在00:00:15结束。

3.记录3到达,我们在00:00:30到00:01:10之间寻找会话,但未找到任何会话。 为密钥123-345-654(FFBE)添加第二个会话,该会话从00:00:50开始和结束。

4.记录4到达,我们在23:59:45和00:00:25之间寻找会话。 这次有两个会话-1和2。所有三个会话合并为一个,开始时间为00:00:00,结束时间为00:00:15。

从本节中所说的,值得记住以下重要的细微差别:

  • 会话不是固定大小的窗口。 会话的持续时间取决于给定时间段内的活动;
  • 数据中的日期/时间戳确定事件是属于现有会话还是处于非活动状态。

此外,我们将讨论以下类型的窗口-“同盟”窗口。

翻动窗户


“翻滚”窗口捕获特定时间段内的事件。 想象一下,您需要每20秒捕获一次公司的所有交换交易,以便您收集这段时间内的所有事件。 在20秒间隔结束时,窗口“滚动”并切换到新的20秒观察间隔。 图5.14说明了这种情况。

图片

如您所见,窗口中包括了过去20秒内收到的所有事件。 在这段时间结束时,将创建一个新窗口。

清单5.6显示了使用滚动窗口每20秒捕获一次交换交易的代码(您可以在src / main / java / bbejeck / Chapter_5 / CountingWindowingAndKtableJoinExample.java中找到它)。

图片

多亏了对TimeWindows.of方法的调用,您可以使用滚动窗口。 在此示例中,没有调用直到()方法,因此将使用默认的24小时保存间隔。

最后,是时候进入最后一个窗口选项了-跳窗。

滑动(“跳跃”)窗口


滑动/“跳跃”窗口类似于“滚动”窗口,但略有不同。 滑动窗口在创建新窗口以处理最近事件之前不会等待时间间隔的结束。 他们在比窗口持续时间短的等待间隔之后开始新的计算。

为了说明“爆炸袭击”和“跳跃”窗口之间的区别,让我们回到示例中计算交换交易。 和以前一样,我们的目标是计算交易数量,但是我们不想一直等待更新计数器。 相反,我们将以较短的间隔更新计数器。 例如,我们将继续每20秒计数一次事务数,但是每5秒更新一次计数器,如图2所示。 5.15。 同时,我们有三个带有重叠数据的结果窗口。

图片

清单5.7显示了指定滑动窗口的代码(可以在src / main / java / bbejeck / chapter_5 / CountingWindowingAndKtableJoinExample.java中找到)。

图片

«» «» advanceBy(). 15 .

, . , , :

  • , ;
  • «» ;
  • «» , .

, KTable KStream .

5.3.3. KStream KTable


4 KStream. KTable KStream. . KStream — , KTable — , KTable.

. , .

  1. KTable KStream , , .
  2. KTable, . KTable .
  3. .

, .

KTable KStream


KTable KStream .

  1. KTable.toStream().
  2. KStream.map , Windowed TransactionSummary.

( src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) ( 5.8).

图片

KStream.map, KStream .

, KTable .

KTable


, KTable ( src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) ( 5.9).

图片

, Serde , Serde. EARLIEST .

— .


. , ( src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) ( 5.10).

图片

leftJoin . 4, JoinWindow , KStream-KTable KTable . : KTable, . : KTable KStream .

KStream.

5.3.4. GlobalKTable


, . 4 KStream, — KStream KTable. . , Kafka Streams . , , ( 4, « » 4.2.4).


— , ; . , , .


, , , . Kafka Streams GlobalKTable.

GlobalKTable , . , , . GlobalKTable . .

KStream GlobalKTable


5.3.2 . :

{customerId='074-09-3705', stockTicker='GUTM'}, 17 {customerId='037-34-5184', stockTicker='CORK'}, 16 

尽管这些结果与目标相符,但如果同时显示客户名称和公司全名,将会更加方便。 要添加客户名称和公司名称,您可以执行常规连接,但是需要执行两个键映射和重复分区。 使用GlobalKTable可以避免此类操作的成本。

为此,我们将使用清单5.11中的countStream对象(可以在src / main / java / bbejeck / Chapter_5 / GlobalKTableExample.java文件中找到相应的代码),并将其与两个GlobalKTable对象连接。

图片

之前我们已经讨论过了,所以不再赘述。 但是我注意到toStream()。Map函数中的代码出于可读性而不是嵌入的lambda表达式被抽象到函数对象中。

下一步是声明GlobalKTable的两个实例(显示的代码可以在src / main / java / bbejeck / Chapter_5 / GlobalKTableExample.java中找到)(清单5.12)。

图片


请注意,主题名称是使用枚举类型描述的。

现在我们已经准备好所有组件,剩下的就是编写连接代码(可以在文件src / main / java / bbejeck / Chapter_5 / GlobalKTableExample.java中找到)(清单5.13)。

图片

尽管此代码中有两种化合物,但是它们没有一个单独使用,因此它们是按链组织的。 结果显示在整个操作的结尾。

当您开始上述连接操作时,您将得到以下结果:

 {customer='Barney, Smith' company="Exxon", transactions= 17} 

本质没有改变,但是这些结果看起来更加清晰。

在第4章中,您已经看到了几种有效的连接类型。 它们在表中列出。 5.2。 该表反映了与1.0.0版本的Kafka Streams相关的连接; 将来的发行版中会有所变化。

图片

最后,我将提醒您一个主要问题:您可以使用本地状态连接事件流(KStream)和更新流(KTable)。 另外,如果参考数据的大小不太大,则可以使用GlobalKTable对象。 GlobalKTable将所有节复制到Kafka Streams应用程序的每个节点,从而确保所有数据的可用性,而与密钥对应的节无关。

接下来,我们将看到Kafka Streams的可能性,借助它,您可以观察状态变化而无需使用Kafka主题中的数据。

5.3.5。 请求状态


我们已经执行了一些涉及该状态的操作,并且始终将结果输出到控制台(出于开发目的)或将其写入主题(出于工业操作)。 在将结果写入主题时,必须使用Kafka使用者查看它们。

从这些主题读取数据可以被视为一种物化视图。 对于我们的任务,我们可以使用Wikipedia中的物化视图的定义:“ ...一个包含查询结果的物理数据库对象。 例如,它可以是已删除数据的本地副本,也可以是表或联接结果的行和/或列的子集,也可以是使用聚合((https://en.wikipedia.org/wiki/Materialized_view)获得的数据透视表。

Kafka Streams还允许您在状态存储上执行交互式查询,从而可以直接读取这些实例化视图。 重要的是要注意,对状态存储的请求具有只读操作的性质。 因此,您不必担心在数据处理过程中意外使应用程序状态不一致。

直接查询状态存储的能力很重要。 这意味着您可以创建应用程序-仪表板,而不必先从Kafka使用者那里接收数据。 由于不需要再次记录数据,因此提高了应用程序的效率:

  • 由于数据的局部性,可以快速访问它们;
  • 数据重复被排除在外,因为它们没有被写入外部存储。

我要记住的主要事情是:您可以直接从应用程序执行状态请求。 您不能高估这给您带来的机会。 您可以使用相同的结果查询状态存储,而不必使用来自Kafka的数据并在应用程序的数据库中存储记录。 对状态存储的直接请求意味着更少的代码(不需要使用者)和更少的软件(不需要数据库表来存储结果)。

在本章中,我们介绍了许多信息,因此我们将暂时停止对状态存储的交互式查询的讨论。 但请放心:在第9章中,我们将创建一个简单的应用程序-带有交互式查询的信息面板。 为了演示交互式查询以及将其添加到Kafka Streams应用程序的可能性,它将使用本章和前几章中的一些示例。

总结


  • KStream对象表示与数据库插入相当的事件流。 KTable对象代表更新流,它们更类似于数据库中的更新。 KTable对象的大小不会增加;旧记录将替换为新记录。
  • 聚合操作需要KTable对象。
  • 使用窗口操作,您可以将汇总数据分为多个时间段。
  • 多亏了GlobalKTable对象,您可以在应用程序中的任何位置访问引用数据,而无需考虑分区。
  • 对象KStream,KTable和GlobalKTable之间的连接是可能的。

到目前为止,我们专注于使用高级KStream DSL创建Kafka Streams应用程序。 尽管高级方法允许您创建简洁明了的程序,但是使用它是绝对的妥协。 使用DSL KStream意味着通过降低控制程度来提高代码的简洁性。 在下一章中,我们将研究处理程序节点的低级API并尝试其他折衷方案。 程序将比现在更长,但是我们将有机会创建我们可能需要的几乎所有处理节点。

→有关这本书的更多详细信息,可以在出版商的网站上找到

→对于Khabrozhiteley优惠券25%的折扣-Kafka Streams

→支付纸质版本的书后,就会通过电子邮件发送电子书。

Source: https://habr.com/ru/post/zh-CN457756/


All Articles