Apache Kafka和Spark Streaming的流式传输

哈Ha! 今天,我们将构建一个系统,该系统将使用Apark Kafka使用Spark Streaming处理消息流,并将处理结果写入AWS RDS云数据库。

想象一下,某个信贷机构将我们的任务设置为在其所有分支机构中即时处理传入交易。 可以执行此操作,以便快速计算国库的未平仓货币头寸,限额或交易的财务结果等。

如何在不使用魔术和魔术的情况下实施此案例-我们在删节下阅读! 走吧


(图片来源)

引言


当然,实时处理大型数据阵列为在现代系统中使用提供了充足的机会。 最受欢迎的组合之一是Apache Kafka和Spark Streaming串联,其中Kafka创建传入消息包的流,Spark Streaming在指定的时间间隔处理这些包。

为了提高应用程序的容错能力,我们将使用检查点-检查点。 使用此机制,当Spark Streaming模块需要恢复丢失的数据时,它仅需要返回到最后一个控制点并从该点恢复计算。

正在开发的系统架构




使用的组件:

  • Apache Kafka是具有发布和订阅功能的分布式消息系统。 适用于离线和在线消息消费。 为了防止数据丢失,Kafka消息存储在磁盘上,并在群集内复制。 Kafka系统建立在ZooKeeper同步服务之上;
  • Apache Spark Streaming-用于处理流数据的Spark组件。 当数据流被解释为小数据包的连续序列时,使用微批处理体系结构构建Spark Streaming模块。 Spark Streaming从各种来源接收数据,并将它们组合成小数据包。 定期创建新软件包。 在每个时间间隔的开始,都会创建一个新的数据包,并且在此时间间隔内接收到的所有数据都将包含在该数据包中。 在间隔结束时,数据包停止增长。 间隔的大小由称为批处理间隔的参数确定。
  • Apache Spark SQL-将关系处理与Spark功能编程结合在一起。 结构化数据是指具有模式的数据,即所有记录的一组字段。 Spark SQL支持来自各种结构化数据源的输入,并且由于提供了架构信息,它可以有效地仅检索所需的记录字段,并提供DataFrame API;
  • AWS RDS是一种相对便宜的基于云的关系数据库,一种简化了配置,操作和扩展的Web服务,由Amazon直接管理。

安装并启动Kafka服务器


在直接使用Kafka之前,您需要确保Java可用,因为 JVM用于工作:

sudo apt-get update sudo apt-get install default-jre java -version 

创建一个新用户以使用Kafka:

 sudo useradd kafka -m sudo passwd kafka sudo adduser kafka sudo 

接下来,从Apache Kafka官方网站下载发行版:

 wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz" 

解压缩下载的档案:
 tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka 

下一步是可选的。 事实是默认设置不允许完全使用Apache Kafka的所有功能。 例如,删除可以向其发布消息的主题,类别,组。 要更改此设置,请编辑配置文件:

 vim ~/kafka/config/server.properties 

将以下内容添加到文件末尾:

 delete.topic.enable = true 

在启动Kafka服务器之前,您需要启动ZooKeeper服务器,我们将使用Kafka发行版随附的辅助脚本:

 Cd ~/kafka bin/zookeeper-server-start.sh config/zookeeper.properties 

ZooKeeper成功启动后,在另一个终端中,我们启动Kafka服务器:

 bin/kafka-server-start.sh config/server.properties 

创建一个名为Transaction的新主题:

 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction 

确保已创建具有正确数量的分区和复制的主题:

 bin/kafka-topics.sh --describe --zookeeper localhost:2181 



我们将错过为新创建的主题测试生产者和消费者的时间。 有关如何测试发送和接收消息的更多详细信息,请参阅官方文档- 发送一些消息 。 好吧,我们继续使用KafkaProducer API用Python编写生产器。

制片人写作


生产者将生成随机数据-每秒100条消息。 随机数据是指由三个字段组成的字典:

  • 分支机构 -信贷机构的销售点名称;
  • 货币 -交易货币;
  • 金额 -交易金额。 如果该金额是银行购买的货币,则金额将为正数;如果是销售,则金额将为负数。

生产者的代码如下:

 from numpy.random import choice, randint def get_random_value(): new_dict = {} branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut'] currency_list = ['RUB', 'USD', 'EUR', 'GBP'] new_dict['branch'] = choice(branch_list) new_dict['currency'] = choice(currency_list) new_dict['amount'] = randint(-100, 100) return new_dict 

接下来,使用send方法,以所需的主题,以JSON格式将消息发送到服务器:

 from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda x:dumps(x).encode('utf-8'), compression_type='gzip') my_topic = 'transaction' data = get_random_value() try: future = producer.send(topic = my_topic, value = data) record_metadata = future.get(timeout=10) print('--> The message has been sent to a topic: \ {}, partition: {}, offset: {}' \ .format(record_metadata.topic, record_metadata.partition, record_metadata.offset )) except Exception as e: print('--> It seems an Error occurred: {}'.format(e)) finally: producer.flush() 

运行脚本时,我们在终端中收到以下消息:


这意味着一切都可以按照我们想要的方式工作-生产者生成并发送消息到我们需要的主题。

下一步是安装Spark并处理此消息流。

安装Apache Spark


Apache Spark是一个多功能的高性能集群计算平台。

在性能方面,Spark超越了MapReduce模型的流行实现,同时为更广泛的计算类型提供了支持,包括交互式查询和流处理。 速度在处理大量数据中起着重要作用,因为速度使您无需花费几分钟或几个小时就可以进行交互式工作。 Spark如此高速的最大优势之一就是其执行内存中计算的能力。

该框架是用Scala编写的,因此必须首先安装它:

 sudo apt-get install scala 

从官方网站下载Spark发行版:

 wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz" 

解压缩档案:

 sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark 

在bash文件中添加Spark的路径:

 vim ~/.bashrc 

通过编辑器添加以下行:

 SPARK_HOME=/usr/local/spark export PATH=$SPARK_HOME/bin:$PATH 

对bashrc进行更改后,运行以下命令:

 source ~/.bashrc 

AWS PostgreSQL部署


剩下的工作就是部署数据库,在这里我们将从流中上传已处理的信息。 为此,我们将使用AWS RDS服务。

转到控制台AWS-> AWS RDS->数据库->创建数据库:


选择PostgreSQL,然后单击下一步按钮:


因为 本示例仅出于教育目的而理解,我们将“至少”使用免费服务器(Free Tier):


接下来,在Free Tier块中打勾,然后将自动为我们提供t2.micro类的实例-尽管很弱,但它是免费的,非常适合我们的任务:

接下来是非常重要的事情:数据库实例的名称,主用户的名称及其密码。 让我们命名实例:myHabrTest,主用户: habr ,密码: habr12345 ,然后单击下一步按钮:



下一页包含从外部负责我们的数据库服务器可用性(公共可访问性)和端口可用性的参数:


让我们为VPC安全组创建一个新配置,这将使我们能够从外部通过端口5432(PostgreSQL)访问我们的数据库服务器。

在单独的浏览器窗口中,转到VPC仪表板->安全组->创建安全组部分中的AWS控制台:

设置安全组的名称-PostgreSQL,一个描述,指示该组应与哪个VPC关联,然后单击创建按钮:


填写端口5432的新创建的入站规则组,如下图所示。 您不必指定手动端口,而是从“类型”下拉列表中选择PostgreSQL。

严格来说,值:: / 0表示来自世界各地的服务器的传入流量的可用性,这并不完全正确,但是为了分析示例,让我们使用以下方法:


返回浏览器页面,打开“配置高级设置”,然后在“ VPC安全组->选择现有VPC安全组-> PostgreSQL”部分中进行选择:


接下来,在数据库选项->数据库名称->部分中,设置名称-habrDB

默认情况下,除了禁用备份(备份保留期-0天),监视和Performance Insights外,我们可以保留其余参数。 单击创建数据库按钮:


流处理程序


最后一步将是Spark-jobs的开发,它将每两秒钟处理一次从Kafka收到的新数据,并将结果输入数据库。

如上所述,检查点是SparkStreaming中的主要机制,必须对其进行配置以提供容错能力。 我们将使用控制点,并且在过程丢失的情况下,Spark Streaming模块将仅需要返回到最后一个控制点并从该控制点恢复计算以恢复丢失的数据。

您可以通过在容错,可靠的文件系统(例如HDFS,S3等)中设置目录来启用检查点,在该文件系统中将保存检查点信息。 例如,使用以下方法完成此操作:

 streamingContext.checkpoint(checkpointDirectory) 

在我们的示例中,我们将使用以下方法,即,如果存在checkpointDirectory,则将从控制点数据重新创建上下文。 如果目录不存在(即首次执行),则调用functionToCreateContext函数以创建新上下文并配置DStreams:

 from pyspark.streaming import StreamingContext context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext) 

使用KafkaUtils库的createDirectStream方法创建一个DirectStream对象以连接到“事务”主题:

 from pyspark.streaming.kafka import KafkaUtils sc = SparkContext(conf=conf) ssc = StreamingContext(sc, 2) broker_list = 'localhost:9092' topic = 'transaction' directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": broker_list}) 

解析JSON格式的传入数据:

 rowRdd = rdd.map(lambda w: Row(branch=w['branch'], currency=w['currency'], amount=w['amount'])) testDataFrame = spark.createDataFrame(rowRdd) testDataFrame.createOrReplaceTempView("treasury_stream") 

使用Spark SQL,我们进行了简单的分组并将结果打印到控制台:

 select from_unixtime(unix_timestamp()) as curr_time, t.branch as branch_name, t.currency as currency_code, sum(amount) as batch_value from treasury_stream t group by t.branch, t.currency 

获取查询文本并通过Spark SQL运行它:

 sql_query = get_sql_query() testResultDataFrame = spark.sql(sql_query) testResultDataFrame.show(n=5) 

然后,我们将接收到的聚合数据保存到AWS RDS中的表中。 要将聚合结果保存到数据库表中,我们将使用DataFrame对象的write方法:

 testResultDataFrame.write \ .format("jdbc") \ .mode("append") \ .option("driver", 'org.postgresql.Driver') \ .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") \ .option("dbtable", "transaction_flow") \ .option("user", "habr") \ .option("password", "habr12345") \ .save() 

关于建立与AWS RDS的连接的几句话。 我们在“部署AWS PostgreSQL”步骤中为其创建了用户和密码。 对于数据库服务器URL,请使用“端点”,该端点显示在“连接性和安全性”部分中:


为了正确连接Spark和Kafka,您应该使用spark-streaming-kafka-0-8_2.11工件通过smark-submit运行作业。 此外,我们还将工件与PostgreSQL数据库进行交互,我们将通过--packages进行传输。

为了提高脚本的灵活性,我们还提取了消息服务器的名称以及要从中接收数据作为输入参数的主题。

因此,该启动并测试系统了:

 spark-submit \ --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,\ org.postgresql:postgresql:9.4.1207 \ spark_job.py localhost:9092 transaction 

一切顺利! 如下图所示,在应用程序工作期间,每2秒显示一次新的聚合结果,因为在创建StreamingContext对象时,我们将批处理间隔设置为2秒:


接下来,我们对数据库进行简单查询,以检查transaction_flow表中的记录:


结论


本文研究了使用Spark Streaming以及Apache Kafka和PostgreSQL进行流信息处理的示例。 随着来自各种来源的数据的增长,很难高估Spark Streaming对于创建流应用程序和实时运行的应用程序的实用价值。

您可以在GitHub的我的存储库中找到完整的源代码。

我准备愉快地讨论本文,期待您的评论,也希望对所有相关读者提出建设性的批评。

祝你成功!

PS最初计划使用本地PostgreSQL数据库,但是由于我对AWS的热爱,我决定将数据库放在云中。 在关于该主题的下一篇文章中,我将展示如何使用AWS Kinesis和AWS EMR在AWS中实现上述整个系统。 关注新闻!

Source: https://habr.com/ru/post/zh-CN451160/


All Articles