使用Apache Beam SDK和Python创建基于GCP的数据流模板,用于将数据从Pub / Sub传输到BigQuery

图片


目前,我从事流式传输(和转换)数据的任务。 在某些圈子里
这样的过程被称为ETL ,即 提取,转换和加载信息。


整个过程包括以下Google Cloud Platform服务的参与:


  • 发布/子服务实时数据流
  • 数据流 -用于转换数据的服务(可以
    实时和批处理模式下工作)
  • BigQuery-一种以表格形式存储数据的服务
    (支持SQL)

0.当前状态

目前,上述服务有一个有效的流式传输版本,但是在
作为模板, 使用标准模板之一。


问题在于该模板提供了1对1的数据传输,即 在
在Pub / Sub的入口处,我们有一个JSON格式的字符串,在输出处,我们有一个包含字段的BigQuery表,
对应于输入JSON顶层对象的键。


1.问题陈述

创建一个数据流模板,该模板可让您在输出中获取一个或多个表
根据给定的条件。 例如,我们要为每个表创建一个单独的表
特定输入JSON密钥的值。 有必要考虑以下事实:
输入JSON对象可以包含嵌套JSON作为值,即 是必要的
能够使用RECORD类型的字段创建BigQuery表以存储嵌套
数据。


2.决定的准备

要创建数据流模板,请使用Apache Beam SDK ,依次使用
支持Java和Python作为编程语言。 我必须说
仅支持Python 2.7.x版本,这让我有些惊讶。 而且,支持
Java稍宽一些,因为 例如,对于Python,某些功能不可用,更多
少量的内置连接器 。 顺便说一句,您可以编写自己的连接器。


但是,由于我不熟悉Java,因此我使用了Python。


在开始创建模板之前,必须具有以下条件:


  1. 输入JSON格式,它不应及时更改
  2. BigQuery表的架构或将要流式传输数据的架构
  3. 输出数据流将流入的表数

请注意,在创建模板并基于该模板启动Dataflow Job之后,可以将这些参数设置为
仅通过创建新模板进行更改。


让我们谈谈这些限制。 它们全都来自这样一个事实,即不可能
创建一个可以将任何字符串作为输入的动态模板,对其进行解析
根据内部逻辑,然后用动态填充动态创建的表
由电路创建。 这种可能性很可能存在,但是在数据中
我没有成功实施这样的计划。 据我了解的整体
该管道是在运行时执行之前构建的,因此无法将其更改为
飞。 也许有人会分享他们的决定。


3.决定

为了更全面地了解该过程,有必要提供所谓的管道图
从Apache Beam文档中获得。


图片


在我们的例子中(我们将使用划分成几个表):


  • 输入-数据来自Dataflow Job中的PubSub
  • 转换 #1-将数据从字符串转换为Python字典,我们得到输出
    PCollection #1
  • 转换#2-标记数据,以根据单独的表进一步分离为
    输出是PCollection#2(实际上是PCollection元组)
  • 转换#3-使用方案将PCollection#2中的数据写入表
    桌子

在编写自己的模板的过程中, 这些示例为我带来了积极的启发。


带有注释的模板代码(左注释与以前的作者相同):
 # coding=utf-8 from __future__ import absolute_import import logging import json import os import apache_beam as beam from apache_beam.pvalue import TaggedOutput from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import SetupOptions from apache_beam.options.pipeline_options import StandardOptions from apache_beam.io.gcp.bigquery import parse_table_schema_from_json #  GCP  gcp_project = '' #  Pub/Sub  topic_name = '' # Pub/Sub    'projects/_GCP_/topics/_' input_topic = 'projects/%s/topics/%s' % (gcp_project, topic_name) #  BigQuery  bq_dataset = 'segment_eu_test' #       schema_dir = './' class TransformToBigQuery(beam.DoFn): #          ,   # BigQuery IO     python dict def process(self, element, *args, **kwargs): body = json.loads(element) #       ,      # python dict       ,     #   yield body class TagDataWithReqType(beam.DoFn): #      , ..      #     ,       #  with_outputs + default def process(self, element, *args, **kwargs): req_type = element.get('_') types = ( 'type1', 'type2', 'type3', ) if req_type in types: yield TaggedOutput(req_type, element) else: yield element def run(): #       _.json   schema_dir,  #         ()  schema_dct = {} for schema_file in os.listdir(schema_dir): filename_list = schema_file.split('.') if filename_list[-1] == 'json': with open('%s/%s' % (schema_dir, schema_file)) as f: schema_json = f.read() schema_dct[filename_list[0]] = json.dumps({'fields': json.loads(schema_json)}) # We use the save_main_session option because one or more DoFn's in this # workflow rely on global context (eg, a module imported at module level). pipeline_options = PipelineOptions() p = beam.Pipeline(options=pipeline_options) pipeline_options.view_as(SetupOptions).save_main_session = True pipeline_options.view_as(StandardOptions).streaming = True # Read from PubSub into a PCollection. input_stream = p | beam.io.ReadFromPubSub(input_topic) # Transform stream to BigQuery IO format stream_bq = input_stream | 'transform to BigQuery' >> beam.ParDo(TransformToBigQuery()) # Tag stream by schema name tagged_stream = \ stream_bq \ | 'tag data by type' >> beam.ParDo(TagDataWithReqType()). with_outputs(*schema_dct.keys(), main='default') # Stream unidentified data to default table tagged_stream.default | 'push to default table' >> beam.io.WriteToBigQuery( '%s:%s.default' % ( gcp_project, bq_dataset, ), schema=parse_table_schema_from_json(schema_dct.get('default')), ) # Stream data to BigQuery tables by number of schema names for name, schema in schema_dct.iteritems(): tagged_stream[name] | 'push to table %s' % name >> beam.io.WriteToBigQuery( '%s:%s.%s' % ( gcp_project, bq_dataset, name), schema=parse_table_schema_from_json(schema), ) result = p.run() result.wait_until_finish() if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) logger = logging.getLogger(__name__) run() 

现在,我们将遍历代码并给出解释,但是首先值得一提的是
编写此模板的困难在于根据“数据流”进行思考,并且
不是特定的消息。 还需要了解,Pub / Sub通过消息和
正是从他们那里,我们将收到标记流的信息。


 pipeline_options = PipelineOptions() p = beam.Pipeline(options=pipeline_options) pipeline_options.view_as(SetupOptions).save_main_session = True pipeline_options.view_as(StandardOptions).streaming = True 

因为 Apache Beam Pub / Sub IO连接器仅在必要的流模式下使用
添加PipelineOptions()(尽管实际上不使用选项);否则,创建一个模板
属于例外。 必须提到有关启动模板的选项。 他们可以是
静态的,所谓的“运行时”。 这是有关此主题的文档链接。 这些选项允许您创建模板而不预先指定参数,但是在从模板启动Dataflow Job时传递它们,但是我仍然无法实现它,可能是由于此连接器不支持RuntimeValueProvider


 # Read from PubSub into a PCollection. input_stream = p | beam.io.ReadFromPubSub(input_topic) 

注释清楚显示一切,我们阅读了该主题的主题。 值得补充的是,您可以参加直播
来自主题和订阅(订阅)。 如果将主题指定为输入,则
将自动创建对该主题的临时订阅。 语法也很漂亮
清晰的输入数据流beam.io.ReadFromPubSub(input_topic)发送到我们
管道p


 # Transform stream to BigQuery IO format stream_bq = input_stream | 'transform to BigQuery' >> beam.ParDo(TransformToBigQuery()) 

这是Transform#1发生的地方,我们的输入从python字符串转换为
python dict,在输出中得到PCollection#1。 >>出现在语法中。 开
实际上,引号中的文字是信息流的名称(必须唯一)以及注释,
它将添加到GCP Dataflow Web界面中图形上的块中。 让我们更详细地考虑
重写的类TransformToBigQuery


 class TransformToBigQuery(beam.DoFn): #          ,   # BigQuery IO     python dict def process(self, element, *args, **kwargs): body = json.loads(element) #       ,      # python dict       ,     #  ,      python dict yield body 

element变量将包含来自PubSub订阅的一条消息。 如从
代码,在我们的情况下,它应该是有效的JSON。 在教室里一定要
重新定义了process方法,其中应进行必要的转换
输入线以获得与电路匹配的输出
将数据加载到的表。 因为 在这种情况下,我们的流程是
连续的,对于Apache Beam来说是unbounded ,您必须使用
对于最终数据流, yield而不是return 。 如果是最终流程,您可以
(和必要)另外配置windowingtriggers


 # Tag stream by schema name tagged_stream = \ stream_bq \ | 'tag data by type' >> beam.ParDo(TagDataWithReqType()).with_outputs(*schema_dct.keys(), main='default') 

此代码将PCollection#1引导到Transform#2,在此进行标记
(分离)数据流。 在变量schema_dct在这种情况下,是一个字典,其中的键是不带扩展名的方案文件的名称,它将是标记,值是方案的有效JSON。
此标记的BigQuery表。 应该注意的是,该方案应准确地发送给
查看{'fields': } ,其中是JSON形式的BigQuery表的架构(您可以
从网络界面导出)。


main='default'是它们将使用的线程标签的名称
所有不受标记条件约束的消息。 考虑上课
TagDataWithReqType


 class TagDataWithReqType(beam.DoFn): #      , ..      #     ,       #  with_outputs + default def process(self, element, *args, **kwargs): req_type = element.get('_') types = ( 'type1', 'type2', 'type3', ) if req_type in types: yield TaggedOutput(req_type, element) else: yield element 

如您所见, process类在这里也被覆盖。 types变量包含名称
标签,并且它们必须将数字和名称与字典键的数字和名称匹配
schema_dct 。 尽管process方法具有接受参数的能力,但我从未
我能够通过他们。 我还没有找到原因。


在输出中,我们在标签数量中得到了一个线程元组,即我们的数量
预定义标签+无法标记的默认线程。


 # Stream unidentified data to default table tagged_stream.default | 'push to default table' >> beam.io.WriteToBigQuery( '%s:%s.default' % ( gcp_project, bq_dataset, ), schema=parse_table_schema_from_json(schema_dct.get('default')), ) 

转换#...(实际上,它不在图中,这是一个“分支”)-我们编写了默认流
到默认表。


tagged_stream.default使用具有default标签的流,替代语法为tagged_stream['default']


schema=parse_table_schema_from_json(schema_dct.get('default')) -在此定义方案
表。 请注意,带有有效BigQuery表架构的default.json文件
必须在当前schema_dir = './'目录中。


流将转到名为default的表。


如果不存在具有该名称的表(在该项目的给定数据集中),则该表
默认设置将根据方案自动创建
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED


 # Stream data to BigQuery tables by number of schema names for name, schema in schema_dct.iteritems(): tagged_stream[name] | 'push to table %s' % name >> beam.io.WriteToBigQuery( '%s:%s.%s' % ( gcp_project, bq_dataset, name), schema=parse_table_schema_from_json(schema), ) 

转换#3,对于那些从一开始就读并拥有这篇文章的人来说,一切都应该清楚
python语法。 我们用循环将流元组分开,并使用以下命令将每个流写入其自己的表中:
他的计划。 应该'%s:%s.%s' % (gcp_project, bq_dataset, name)流名称必须是唯一的- '%s:%s.%s' % (gcp_project, bq_dataset, name)


现在应该清楚它是如何工作的,您可以创建一个模板。 为此,您需要
在控制台中运行(如果可用,请不要忘记激活venv)或从IDE中运行:


 python _.py / --runner DataflowRunner / --project dreamdata-test / --staging_location gs://STORAGE_NAME/STAGING_DIR / --temp_location gs://STORAGE_NAME/TEMP_DIR / --template_location gs://STORAGE_NAME/TEMPLATES_DIR/TEMPLATE_NAME 

同时,应组织对Google帐户的访问,例如通过导出
GOOGLE_APPLICATION_CREDENTIALS环境GOOGLE_APPLICATION_CREDENTIALS或其他方式


关于--runner的几句话。 在这种情况下, DataflowRunner表示该代码
将作为数据流作业的模板运行。 仍然可以指定
DirectRunner ,如果没有--runner和code选项,将默认使用它
将用作数据流作业,但在本地,这对于调试非常方便。


如果没有发生错误,则gs://STORAGE_NAME/TEMPLATES_DIR/TEMPLATE_NAME将是
创建的模板。 值得一提的是,在gs://STORAGE_NAME/STAGING_DIR也将被写入
成功运行基于以下条件创建的Datafow作业所需的服务文件:
模板,则无需删除它们。


接下来,您需要使用此模板手动或通过任何方式创建一个数据流作业
以另一种方式(例如CI)。


4.结论

因此,我们设法使用以下方法将流从PubSub流到BigQuery
必要的数据转换,以进一步存储,转换和
数据使用情况。


主要连结



在本文中,可能会出现错误甚至错误,我将非常感谢您的建设性
批评。 最后,我想补充一点,实际上,这里并没有全部使用
Apache Beam SDK的功能,但这不是目的。

Source: https://habr.com/ru/post/zh-CN441892/


All Articles