该书“ Apache Kafka。 流处理和数据分析”

图片 在任何企业应用程序的工作期间,都会生成数据:这些是日志文件,指标,有关用户活动的信息,传出消息等。正确处理所有这些数据与数据本身同样重要。 如果您是想解决此类问题的架构师,开发人员或应届工程师,但还不熟悉Apache Kafka,那么从这本精彩的书中,您将学到如何使用这个免费的流媒体平台工作,该平台允许您实时处理数据队列。

这本书是给谁的?


“ Apache Kafka。 “流处理和数据分析”是为在工作中使用Kafka API的开发人员以及在工业运行期间参与安装,配置,配置和监视其运行的过程工程师(也称为SRE,DevOps或系统管理员)编写的。 我们还没有忘记数据架构师和分析工程师-负责公司整个数据基础架构的设计和创建的人。 有些章节,尤其是第3、4和11章,是针对Java开发人员的。 要理解它们,读者必须熟悉Java编程语言的基础,包括诸如异常处理和竞争之类的问题,这一点很重要。

其他章节,尤其是第2、8、9和10章,假定读者具有Linux的经验,并且熟悉设置Linux网络和存储。 Kafka的书和软件体系结构的其余部分以更笼统的术语进行了讨论,因此读者不需要特殊的知识。

可能对本书感兴趣的另一类人是不是直接与Kafka合作而是与之合作的经理和建筑师。 同样重要的是,他们了解平台的保证是什么,以及在创建基于Kafka的系统时其下属和同事必须做出的妥协。 对于那些想培训员工与Kafka一起工作或确保开发团队拥有必要信息的经理来说,这本书将非常有用。

第2章安装Kafka


Apache Kafka是一个Java应用程序,可以在许多操作系统上运行,包括Windows,MacOS,Linux和其他操作系统,在本章中,我们将重点介绍在Linux上安装Kafka,因为它是该操作系统上最常安装的平台。 Linux还是推荐用于通用Kafka部署的操作系统。 有关在Windows和MacOS上安装Kafka的信息,请参阅附录A。

安装java

在安装ZooKeeper或Kafka之前,必须安装和配置Java环境。 建议您使用Java 8,它可能是操作系统中附带的版本,也可以是直接从java.com下载的版本。 尽管ZooKeeper和Kafka将与Java Runtime Edition一起使用,但是在开发实用程序和应用程序时使用完整的Java Development Kit(JDK)更为方便。 这些安装步骤假定您在/usr/java/jdk1.8.0_51目录中安装了JDK版本8.0.51。

安装ZooKeeper

Apache Kafka使用ZooKeeper来存储有关Kafka集群的元数据以及有关消费者客户端的详细信息(图2.1)。 尽管也可以使用Kafka发行版中包含的脚本启动ZooKeeper,但是从发行版中安装ZooKeeper存储库的完整版本非常简单。

图片

Kafka已通过ZooKeeper储存库的稳定版本3.4.6进行了全面测试,可从apache.org下载。

独立服务器

以下示例演示了在/ usr / local / zookeeper目录中以基本设置安装ZooKeeper并将数据保存在/ var / lib / zookeeper目录中的方法:

# tar -zxf zookeeper-3.4.6.tar.gz # mv zookeeper-3.4.6 /usr/local/zookeeper # mkdir -p /var/lib/zookeeper # cat > /usr/local/zookeeper/conf/zoo.cfg << EOF > tickTime=2000 > dataDir=/var/lib/zookeeper > clientPort=2181 > EOF # /usr/local/zookeeper/bin/zkServer.sh start JMX enabled by default Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg Starting zookeeper ... STARTED # export JAVA_HOME=/usr/java/jdk1.8.0_51 # /usr/local/zookeeper/bin/zkServer.sh start JMX enabled by default Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg Starting zookeeper ... STARTED # 

现在,您可以通过连接到客户端端口并发送四字母srvr命令来验证ZooKeeper是否可以脱机工作:

 # telnet localhost 2181 Trying ::1... Connected to localhost. Escape character is '^]'. srvr Zookeeper version: 3.4.6-1569965, built on 02/20/2014 09:09 GMT Latency min/avg/max: 0/0/0 Received: 1 Sent: 0 Connections: 1 Outstanding: 0 Zxid: 0x0 Mode: standalone Node count: 4 Connection closed by foreign host. # 

ZooKeeper合奏

ZooKeeper集群称为合奏。 由于算法本身的性质,建议集合中包含奇数个服务器,例如3、5等,因为为了使ZooKeeper能够响应请求,大多数集合成员必须起作用(仲裁)。 这意味着三个节点的集合可以与一个空闲节点一起工作。 如果合奏具有三个节点,则可能有两个。

要在集合中配置ZooKeeper服务器的操作,它们必须具有一个包含所有服务器列表的单一配置,并且数据目录中的每个服务器都必须具有带有该服务器标识符的myid文件。 如果集合中的主机名为zoo1.example.com,zoo2.example.com和zoo3.example.com,则配置文件可能如下所示:

 tickTime=2000 dataDir=/var/lib/zookeeper clientPort=2181 initLimit=20 syncLimit=5 server.1=zoo1.example.com:2888:3888 server.2=zoo2.example.com:2888:3888 server.3=zoo3.example.com:2888:3888 

在此配置中,initLimit是从属节点可以连接到主节点的时间。 syncLimit值限制从属节点与主节点之间的延迟。 这两个值都以tickTime单位指定,即initLimit = 20·2000 ms = 40 s。 该配置还列出了所有集成服务器。 它们的格式为server.X =主机名:peerPort:LeaderPort,具有以下参数:

  • X是服务器标识符。 它必须是整数,但计数不能从零开始,也不能连续。
  • 主机名-主机名或服务器IP地址;
  • peerPort-集成服务器之间通过其进行通信的TCP端口;
  • leaderPort-通过其选择主机的TCP端口。

客户端可以通过clientPort端口连接到集成系统就足够了,但是集成体成员必须能够在所有三个端口上彼此交换消息。

除了单个配置文件之外,dataDir目录中的每个服务器还必须具有myid文件。 它应包含与配置文件中给定的服务器标识符相对应的服务器标识符。 完成这些步骤后,您可以启动服务器,并且它们将在集合中彼此交互。

安装Kafka Broker


完成Java和ZooKeeper的配置后,您可以继续安装Apache Kafka。 可以从kafka.apache.org/downloads.html下载最新版本的Apache Kafka。

在以下示例中,将Kafka平台安装在/ usr / local / kafka目录中,对其进行配置以使用先前启动的ZooKeeper服务器,并将消息日志段保存在/ tmp / kafka-logs目录中:

 # tar -zxf kafka_2.11-0.9.0.1.tgz # mv kafka_2.11-0.9.0.1 /usr/local/kafka # mkdir /tmp/kafka-logs # export JAVA_HOME=/usr/java/jdk1.8.0_51 # /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties # 

启动Kafka代理后,您可以通过对集群执行任何简单操作来测试其功能,包括创建测试主题,生成消息并使用它们。

创建和检查线程:

 # /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test Created topic "test". # /usr/local/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test Topic:test PartitionCount:1 ReplicationFactor:1 Configs: Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0 # 

为测试主题生成消息:

 # /usr/local/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test Test Message 1 Test Message 2 ^D # 

消耗来自测试主题的消息:

 # /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning Test Message 1 Test Message 2 ^C Consumed 2 messages # 

经纪人配置


Kafka发行版随附的代理配置示例非常适合尝试运行独立服务器,但是对于大多数安装而言,这还不够。 有很多Kafka配置选项可控制安装和配置的各个方面。 您可以为其中许多保留默认值,因为它们与设置Kafka经纪人的细微差别有关,直到您处理需要使用它们的特定方案时才适用。

基本经纪人设定


在任何环境中部署平台时,除了独立服务器上的独立代理外,您还应考虑Kafka代理的几种设置。 这些参数与代理的主要设置有关,必须更改其中的大多数设置,以便代理可以与其他代理一起在群集中工作。

broker.id

每个Kafka代理必须具有由broker.id参数指定的整数标识符。 默认情况下,此值为0,但可以是任何数字。 最主要的是,它不会在同一Kafka集群中重复。 号码的选择可以是任意的,如果需要,为了维护方便,可以将其从一个经纪人转移到另一个经纪人。 希望该数字以某种方式与主机连接,然后代理标识符与具有跟踪的主机的对应关系将更加透明。 例如,如果您的主机名包含唯一数字(例如host1.example.com,host2.example.com等),则这些数字将是broker.id值的理想选择。

港口

一个典型的配置文件使用TCP端口9092上的侦听器启动Kafka。可以通过更改配置参数端口将该端口更改为任何其他可用端口。 请记住,当选择端口号小于1024的端口时,Kafka应该以root用户身份运行。 不建议以root身份运行Kafka。

zookeeper.connect

ZooKeeper用于存储代理元数据的路径是使用zookeeper.connect配置参数设置的。 在示例配置中,ZooKeeper在本地主机的端口2181上运行,该端口指示为localhost:2181。 该参数的格式是用分号分隔的行列表,其格式为主机名:端口/路径,包括:

  • 主机名-ZooKeeper服务器的主机名或IP地址;
  • port-服务器的客户端端口号;
  • / path-可选的ZooKeeper路径,用作Kafka集群的新根(chroot)路径。 如果未指定,则使用根路径。

如果指定的chroot路径不存在,则它将在代理启动时创建。

log.dirs

Kafka将所有消息保存到硬盘驱动器,日志的这些段存储在log.dirs设置中指定的目录中。 它是本地系统中路径的逗号分隔列表。 如果指定了多个路径,则代理将根据使用最少的原则将节保存在其中,并沿一条路径保留一个节的日志段。 请注意,代理会将新节放置在当前存储最少分区而不使用最少空间的目录中,这样就不能保证各节之间数据的均匀分布。

num.recovery.threads.per.data.dir

Kafka使用自定义线程池来处理日志段。 当前它被应用:

  • 在正常启动期间-打开每个部分的日志段;
  • 失败后开始-检查并截断每个部分的日志段;
  • 停止-轻轻关闭日志段。

默认情况下,每个日志目录仅使用一个线程。 由于只有在启动和停止时才会发生这种情况,因此有必要使用更多的并行化操作。 从不正确的关闭中恢复时,如果重新启动具有大量分区的代理,则使用这种方法的好处可能会达到几个小时! 请记住,此参数的值是根据使用log.dirs指定的编号中的一个日志目录确定的。 也就是说,如果num.recovery.threads.per.data.dir参数的值为8,并且在log.dirs中指定了三个路径,则线程总数为24。

auto.create.topics.enable

根据Kafka的默认配置,在以下情况下,代理应自动创建主题:

  • 制造商开始在主题行中写;
  • 消费者开始阅读主题行;
  • 任何客户端请求主题元数据。

在许多情况下,这种行为可能是不希望的,尤其是由于以下事实:没有一种方法可以使用Kafka协议检查主题的存在而不会导致其创建。 如果您通过手动或通过初始化系统显式地控制创建,则可以将auto.create.topics.enable参数设置为false。

默认主题设置


Kafka服务器配置为创建的主题设置了许多默认设置。 可以使用管理员工具为每个主题分别设置一些参数,包括节数和消息保存参数(在第9章中讨论)。 服务器配置中的默认值应设置为等于适用于大多数群集主题的参考值。

数量分区

num.partitions参数确定新主题的创建部分数,主要是在启用按主题自动创建时(这是默认行为)。 此参数的默认值为1。请记住,主题的节数只能增加,而不能减少。 这意味着,如果它需要的分区少于num.partitions中指示的分区,则必须手动仔细创建它(在第9章中进行了讨论)。

如第1章中所述,这是一种扩展Kafka集群中主题的方法,因此,在添加代理后,拥有尽可能多的平衡整个集群中邮件负载的需求非常重要。 许多用户更喜欢分区的数量等于或等于集群中的代理数量。 这使得可以在代理之间平均分配部分,这将导致跨消息均匀地分配负载。 但是,这不是强制性要求,因为多个主题的存在使您能够平衡负载。

log.retention.ms

卡夫卡中的消息存储经常受到时间限制。 缺省值是使用log.retention.hours参数在配置文件中指定的,等于168小时或1周。 但是,您可以使用其他两个参数-log.retention.minutes和log.retention.ms。 所有这三个参数确定同一件事-删除邮件的时间段。 但是建议使用log.retention.ms参数,因为如果指定了多个参数,则优先级属于最小的度量单位,因此将始终使用log.retention.ms的值。

log.retention.bytes

限制消息有效性的另一种方法是基于所存储消息的总大小(以字节为单位)。 该值使用log.retention.bytes参数设置,并单独应用。 这意味着,在主题为八个部分且等于log.retention.bytes值1 GB的情况下,为此主题存储的最大数据量为8 GB。 请注意,存储量取决于各个部分,而不取决于主题。 这意味着如果主题的节数增加,则使用log.retention.bytes时保存的最大数据量也将增加。

log.segment.bytes

提到的日志记录设置涉及日志段,而不是单个消息。 当消息由Kafka代理生成时,它们被添加到相应部分的当前日志段的末尾。 当日志段达到log.segment.bytes参数指定的大小并且默认情况下等于1 GB时,该段关闭并打开一个新段。 关闭后,日记帐段可以退出。 日志段的大小越小,关闭文件并创建新文件的频率就越高,这会降低磁盘写入的总体效率。

当主题的消息生成频率较低时,调整日志段的大小很重要。 例如,如果一个主题每天仅接收100 MB的消息,并且log.segment.bytes参数设置为默认值,则填写一个段需要10天。 而且由于无法在关闭日志段之前将消息声明为无效,因此使用log.retention.ms参数的值6.048亿(1周),消息可以在关闭的日志段退出流通之前的17天内累积。 这是因为当您关闭一个段并保存了超过10天的邮件时,您必须将其存储另外7天,然后才能按照采用的临时规则将其撤消,因为无法在该段中的最后一条消息到期之前删除该段。

log.segment.ms

控制日志段关闭的另一种方法是使用log.segment.ms参数,该参数指定关闭日志段的时间长度。 像log.retention.bytes和log.retention.ms参数一样,log.segment.bytes和log.segment.ms参数也不互斥。 当时间用完或达到指定的大小限制时,Kafka将关闭日志段,具体取决于哪个事件首先发生。 默认情况下,未设置log.segment.ms参数的值,因此,日志段的关闭由其大小决定。

message.max.bytes

Kafka代理允许使用message.max.bytes参数来限制所生成消息的最大大小。 此参数的默认值为1,000,000(1 MB)。 尝试发送较大消息的制造商将收到来自代理的错误通知,但该消息将不被接受。 与代理设置中指定的所有其他大小(以字节为单位)的情况一样,我们正在谈论压缩消息的大小,因此,如果制造商可以将其压缩到message.max.bytes参数指定的限制,则它们可以以未压缩形式发送的消息,其大小要大得多。 。

增加消息的大小会严重影响性能。 较大的消息大小意味着处理网络连接和请求的代理线程将为每个请求花费更长的时间。 较大的消息还会增加写入磁盘的数据量,从而影响I / O吞吐量。

硬件选择


为卡夫卡经纪人选择合适的硬件,更多的是艺术而不是科学。 Kafka平台本身没有严格的硬件要求;它将在任何系统上正常工作。 但是,如果我们谈论性能,那么它会受到几个因素的影响:磁盘,RAM,网络和CPU的容量和吞吐量。

首先,您需要确定哪种性能类型对您的系统最重要,然后您可以选择适合预算的最佳硬件配置。

磁盘吞吐量


用于存储日志段的代理磁盘的吞吐量直接影响制造客户的性能。 Kafka消息必须提交到本地存储以确认其记录。 只有这样才能将发送操作视为成功。 这意味着对磁盘执行写操作的速度越快,消息生成的延迟就越小。

在磁盘带宽出现问题时,明显的措施是使用带有旋转板(HDD)或固态驱动器(SSD)的硬盘驱动器。 SSD的搜索/访问时间缩短了几个数量级,并且性能更高。 HDD更经济,并且具有更高的相对容量。 HDD的性能可以提高,因为它们在代理中数量更大,或者使用多个数据目录,或者通过在具有冗余的独立磁盘阵列中安装磁盘(独立磁盘冗余阵列,RAID)。 其他因素会影响吞吐量,例如,制造硬盘(例如SAS或SATA)的技术以及硬盘控制器的特性。

磁盘容量


容量是存储的另一方面。 所需的磁盘空间量取决于需要同时存储多少条消息。 如果预计代理每天将接收1 TB的流量,则具有7天的存储空间,他将需要至少7 TB日志段的可用存储空间。 您还应该考虑其他文件的溢出量至少为10%,而不是将缓冲区计入可能的流量波动或其随时间的增长。

存储容量是确定最佳Kafka群集大小并决定其扩展时必须考虑的因素之一。 可以通过每个主题的几个部分来平衡群集的总流量,这使您可以在每个代理的数据密度不足的情况下使用其他代理来增加可用容量。 是否需要多少磁盘空间的决定还取决于为集群选择的复制策略(在第6章中有更详细的讨论)。

记忆


在正常操作模式下,消费者Kafka从该节的末尾开始阅读,并且消费者会不断弥补所浪费的时间,并且仅稍稍落后于制造商(如果有的话)。 , , . , , -.

Kafka JVM . , X X , 5 . Kafka . Kafka , , , Kafka.


, Kafka, . ( ) . Kafka ( ) . 1 , , . , (. 6) ( 8). , .

中央处理器


, , . . Kafka, , . . Kafka ' . .

Kafka


Kafka , , Amazon Web Services (AWS). AWS , CPU, . Kafka. , . / SSD. (, AWS Elastic Block Store). CPU .
, AWS m4 r3. m4 , , . r3 SSD-, . i2 d2.

Kafka


Kafka , (. 2.2). — . — . Kafka . Kafka. 6.

图片


?


Kafka . — . 10 , 2 , — . , 100 % ( ) (. 6). , .

, , — . , ( ). 80 % , , . , , . , , .


Kafka. — zookeeper.connect. ZooKeeper . — broker.id. broker.id , . , , , .


Linux , , Kafka. , , . /etc/sysctl.conf, Linux, .


Linux . , «» , Kafka.
, , , () . , , Kafka. , Kafka , , .

— . — , - . . vm.swappiness , 1. ( ) , . , .

, «» , , . Kafka /. : (, SSD), NVRAM (, RAID). «» , . vm.dirty_background_ratio , ( 10). ( ), 5. 0, .

«» , , vm.dirty_ratio , — 20 ( ). , 60 80. , / . vm.dirty_ratio Kafka, .

«» Kafka . /proc/vmstat:

 # cat /proc/vmstat | egrep "dirty|writeback" nr_dirty 3875 nr_writeback 29 nr_writeback_temp 0 # 

驱动器


, RAID- , . , EXT4 (fourth extended file system — ) XFS (Extents File System — ). EXT4 , . , (5), . EXT4 , . XFS , , EXT4. XFS Kafka , , . , /.

, , noatime. /: (ctime), (mtime) (atime). atime . . atime , , , ( realtime). Kafka atime, . noatime /, ctime mtime.


Linux — , , . Kafka , - . ( ) , . . net.core.wmem_default net.core.rmem_default , 2 097 152 (2 ). , , .

TCP net.ipv4.tcp_wmem net.ipv4.tcp_rmem. , , . — 4096 65536 2048000 — , 4 , — 64 , — 2 . , net.core.wmem_max net.core.rmem_max. Kafka .

. TCP 1 net.ipv4.tcp_window_scaling, . net.ipv4.tcp_max_syn_backlog , 1024, . net.core.netdev_max_backlog, 1000, , , , .


当需要将Kafka从测试转移到生产时,只需要做几件事即可建立可靠的消息传递服务。

垃圾收集选项


Java , , . , Java 7 Garbage First (G1). G1 . , , .

G1 . .

  • MaxGCPauseMillis. . — G1 . 200 . , G1 , , , , 200 .
  • InitiatingHeapOccupancyPercent. , . 45. , G1 , 45 % , (Eden), .

Kafka , . 64 , Kafka 5 . 20 MaxGCPauseMillis. InitiatingHeapOccupancyPercent 35, , .

Kafka G1, . . :

 # export JAVA_HOME=/usr/java/jdk1.8.0_51 # export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true" # /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties # 


Kafka , . - , . Kafka (. 6), . Kafka, .

Kafka , , ( , , AWS), , . . , «» (. 6).

: Kafka , , , . ( ) . , , .

ZooKeeper


Kafka ZooKeeper , . ZooKeeper Kafka. , ZooKeeper Kafka . ZooKeeper Kafka ( ZooKeeper , ).

ZooKeeper . ZooKeeper, Kafka, . ZooKeeper , ZooKeeper . — 1 , . ZooKeeper, , . ZooKeeper , . , Kafka Kafka ZooKeeper.

Kafka, , . Kafka ZooKeeper, . ZooKeeper, . , , , . , , .

总结


, Apache Kafka. , , . , Kafka, Kafka. Kafka ( 3), ( 4).

»这本书的更多信息可以在出版商的网站上找到
» 目录
» 摘录

20% — Apache Kafka

Source: https://habr.com/ru/post/zh-CN421519/


All Articles