创建用于流数据处理的管道。 第一部分

大家好 朋友,我们正在与您分享一篇专为数据工程师课程的学生准备的文章的翻译。 走吧



用于实时管道的Apache Beam和DataFlow


今天的帖子基于我最近在工作中完成的任务。 我真的很高兴实现它并以博客文章的形式描述所做的工作,因为它使我有机会对数据工程进行工作,并做了一些对我的团队非常有用的事情。 不久前,我发现我们的系统存储了与我们其中一种用于处理数据的产品相关的大量用户日志。 事实证明,没有人使用过这些数据,所以我立即对我们开始定期进行分析所能发现的东西产生了兴趣。 但是,在此过程中存在一些问题。 第一个问题是数据存储在许多不同的文本文件中,这些文件无法进行即时分析。 第二个问题是它们存储在封闭的系统中,因此我无法使用任何我喜欢的数据分析工具。

我必须决定如何使我们更容易访问,以及如何通过将数据源嵌入某些用户交互解决方案中来至少增加一些价值。 经过一会儿的思考,我决定构建一个管道来将该数据传输到云数据库,以便我和团队可以访问它并开始生成任何结论。 不久前我在Coursera完成了数据工程专业之后,我渴望在项目中使用某些课程工具。

因此,将数据放入云数据库似乎是解决我的第一个问题的聪明方法,但是问题2怎么办? 幸运的是,有一种方法可以将数据传输到可以访问Python和Google Cloud Platform(GCP)等工具的环境。 但是,这是一个漫长的过程,因此我需要做一些事情,以便在等待数据传输结束时继续进行开发。 我想出的解决方案是使用Python中的Faker库创建伪数据。 我以前从未使用过该库,但是很快意识到它的用处。 使用这种方法使我可以开始编写代码并在没有实际数据的情况下测试管道。

基于已经说过的内容,在这篇文章中,我将告诉您如何使用GCP中提供的一些技术来构建上述管道。 特别是,我将使用Apache Beam(适用于Python的版本),Dataflow,Pub / Sub和Big Query来收集用户日志,转换数据并将其传输到数据库以进行进一步分析。 就我而言,我只需要Beam的批处理功能,因为我的数据不是实时到达的,所以不需要Pub / Sub。 但是,我将重点介绍流媒体版本,因为这是您在实践中可能会遇到的问题。

GCP和Apache Beam简介


Google Cloud Platform提供了一组非常有用的工具来处理大数据。 以下是一些我将使用的工具:

  • Pub / Sub是使用Publisher-Subscriber模板的消息服务,它使我们可以实时接收数据。
  • DataFlow是一项服务,可简化数据管道的创建并自动解决诸如扩展基础结构之类的任务,这意味着我们只能专注于为管道编写代码。
  • BigQuery是一个基于云的数据仓库。 如果您熟悉其他SQL数据库,则无需长时间使用BigQuery。
  • 最后,我们将使用Apache Beam,即专注于Python版本来创建我们的管道。 该工具将使我们能够创建与GCP集成的流式或批处理管道。 它对并行处理特别有用,并且适合诸如提取,转换和加载(ETL)之类的任务,因此,如果我们需要通过转换或计算将数据从一个位置移动到另一个位置,Beam是一个不错的选择。


GCP中提供了许多工具,因此可能很难涵盖所有工具,包括其用途,但是, 这里还是一个简短的摘要,以供参考。

可视化我们的输送机


让我们在图1中可视化管道的组件。 在较高的层次上,我们希望实时收集用户数据,进行处理并将其传输到BigQuery。 当用户通过将请求发送到服务器来与产品进行交互时,将创建日志,然后将其记录下来。 这些数据对于了解用户如何与我们的产品交互以及他们是否正常工作特别有用。 通常,传送带将包含以下步骤:

  1. 我们用户的日志数据发布在Pub / Sub部分中。
  2. 我们将连接到Pub / Sub并使用Python和Beam将数据转换为适当的格式(图1中的第3步和第4步)。
  3. 转换数据后,Beam将连接到BigQuery并将其添加到我们的表中(图1中的第4步和第5步)。
  4. 为了进行分析,我们可以使用各种工具(例如Tableau和Python)连接到BigQuery。

无论我们是使用流数据源还是CSV文件,并且要进行批处理,Beam都使此过程非常简单。 稍后您将看到该代码仅包含在它们之间切换所需的最小更改。 这是使用Beam的好处之一。


图1:主数据管道

使用Faker创建伪数据


如前所述,由于对数据的访问有限,我决定以与实际格式相同的格式创建伪数据。 这是一个非常有用的练习,因为我可以在期待数据的同时编写代码并测试管道。 如果您想知道该库还提供什么功能,我建议您查看Faker 文档 。 我们的用户数据通常与以下示例类似。 基于这种格式,我们可以生成逐行数据以模拟实时数据。 这些日志为我们提供了诸如日期,请求类型,服务器响应,IP地址等信息。

192.52.197.161 - - [30/Apr/2019:21:11:42] "PUT /tag/category/tag HTTP/1.1" [401] 155 "https://harris-lopez.com/categories/about/" "Mozilla/5.0 (Macintosh; PPC Mac OS X 10_11_2) AppleWebKit/5312 (KHTML, like Gecko) Chrome/34.0.855.0 Safari/5312"

基于上面的行,我们想使用下面花括号中的7个变量来创建LINE变量。 稍后,我们还将在表模式中将它们用作变量名。

LINE = """\
{remote_addr} - - [{time_local}] "{request_type} {request_path} HTTP/1.1" [{status}] {body_bytes_sent} "{http_referer}" "{http_user_agent}"\
"""


如果我们要执行批处理,则代码将非常相似,尽管我们需要在特定时间范围内创建一组样本。 要使用伪造者,我们只需创建一个对象并调用所需的方法即可。 特别是,Faker对于创建IP地址和网站很有用。 我使用以下方法:

fake.ipv4()
fake.uri_path()
fake.uri()
fake.user_agent()


 from faker import Faker import time import random import os import numpy as np from datetime import datetime, timedelta LINE = """\ {remote_addr} - - [{time_local}] "{request_type} {request_path} HTTP/1.1" [{status}] {body_bytes_sent} "{http_referer}" "{http_user_agent}"\ """ def generate_log_line(): fake = Faker() now = datetime.now() remote_addr = fake.ipv4() time_local = now.strftime('%d/%b/%Y:%H:%M:%S') request_type = random.choice(["GET", "POST", "PUT"]) request_path = "/" + fake.uri_path() status = np.random.choice([200, 401, 404], p = [0.9, 0.05, 0.05]) body_bytes_sent = random.choice(range(5, 1000, 1)) http_referer = fake.uri() http_user_agent = fake.user_agent() log_line = LINE.format( remote_addr=remote_addr, time_local=time_local, request_type=request_type, request_path=request_path, status=status, body_bytes_sent=body_bytes_sent, http_referer=http_referer, http_user_agent=http_user_agent ) return log_line 


第一部分结束。

在接下来的几天里,我们将与您分享本文的继续,但现在传统上我们一直在等待评论;-)。

第二部分

Source: https://habr.com/ru/post/zh-CN454186/


All Articles