
目前,我从事流式传输(和转换)数据的任务。 在某些圈子里
这样的过程被称为ETL ,即 提取,转换和加载信息。
整个过程包括以下Google Cloud Platform服务的参与:
0.当前状态
目前,上述服务有一个有效的流式传输版本,但是在
作为模板, 使用标准模板之一。
问题在于该模板提供了1对1的数据传输,即 在
在Pub / Sub的入口处,我们有一个JSON格式的字符串,在输出处,我们有一个包含字段的BigQuery表,
对应于输入JSON顶层对象的键。
1.问题陈述
创建一个数据流模板,该模板可让您在输出中获取一个或多个表
根据给定的条件。 例如,我们要为每个表创建一个单独的表
特定输入JSON密钥的值。 有必要考虑以下事实:
输入JSON对象可以包含嵌套JSON作为值,即 是必要的
能够使用RECORD
类型的字段创建BigQuery表以存储嵌套
数据。
2.决定的准备
要创建数据流模板,请使用Apache Beam SDK ,依次使用
支持Java和Python作为编程语言。 我必须说
仅支持Python 2.7.x版本,这让我有些惊讶。 而且,支持
Java稍宽一些,因为 例如,对于Python,某些功能不可用,更多
少量的内置连接器 。 顺便说一句,您可以编写自己的连接器。
但是,由于我不熟悉Java,因此我使用了Python。
在开始创建模板之前,必须具有以下条件:
- 输入JSON格式,它不应及时更改
- BigQuery表的架构或将要流式传输数据的架构
- 输出数据流将流入的表数
请注意,在创建模板并基于该模板启动Dataflow Job之后,可以将这些参数设置为
仅通过创建新模板进行更改。
让我们谈谈这些限制。 它们全都来自这样一个事实,即不可能
创建一个可以将任何字符串作为输入的动态模板,对其进行解析
根据内部逻辑,然后用动态填充动态创建的表
由电路创建。 这种可能性很可能存在,但是在数据中
我没有成功实施这样的计划。 据我了解的整体
该管道是在运行时执行之前构建的,因此无法将其更改为
飞。 也许有人会分享他们的决定。
3.决定
为了更全面地了解该过程,有必要提供所谓的管道图
从Apache Beam文档中获得。

在我们的例子中(我们将使用划分成几个表):
- 输入-数据来自Dataflow Job中的PubSub
- 转换 #1-将数据从字符串转换为Python字典,我们得到输出
PCollection #1 - 转换#2-标记数据,以根据单独的表进一步分离为
输出是PCollection#2(实际上是PCollection元组) - 转换#3-使用方案将PCollection#2中的数据写入表
桌子
在编写自己的模板的过程中, 这些示例为我带来了积极的启发。
现在,我们将遍历代码并给出解释,但是首先值得一提的是
编写此模板的困难在于根据“数据流”进行思考,并且
不是特定的消息。 还需要了解,Pub / Sub通过消息和
正是从他们那里,我们将收到标记流的信息。
pipeline_options = PipelineOptions() p = beam.Pipeline(options=pipeline_options) pipeline_options.view_as(SetupOptions).save_main_session = True pipeline_options.view_as(StandardOptions).streaming = True
因为 Apache Beam Pub / Sub IO连接器仅在必要的流模式下使用
添加PipelineOptions()(尽管实际上不使用选项);否则,创建一个模板
属于例外。 必须提到有关启动模板的选项。 他们可以是
静态的,所谓的“运行时”。 这是有关此主题的文档的链接。 这些选项允许您创建模板而不预先指定参数,但是在从模板启动Dataflow Job时传递它们,但是我仍然无法实现它,可能是由于此连接器不支持RuntimeValueProvider
。
注释清楚显示一切,我们阅读了该主题的主题。 值得补充的是,您可以参加直播
来自主题和订阅(订阅)。 如果将主题指定为输入,则
将自动创建对该主题的临时订阅。 语法也很漂亮
清晰的输入数据流beam.io.ReadFromPubSub(input_topic)
发送到我们
管道p
。
这是Transform#1发生的地方,我们的输入从python字符串转换为
python dict,在输出中得到PCollection#1。 >>
出现在语法中。 开
实际上,引号中的文字是信息流的名称(必须唯一)以及注释,
它将添加到GCP Dataflow Web界面中图形上的块中。 让我们更详细地考虑
重写的类TransformToBigQuery
。
class TransformToBigQuery(beam.DoFn):
element
变量将包含来自PubSub订阅的一条消息。 如从
代码,在我们的情况下,它应该是有效的JSON。 在教室里一定要
重新定义了process
方法,其中应进行必要的转换
输入线以获得与电路匹配的输出
将数据加载到的表。 因为 在这种情况下,我们的流程是
连续的,对于Apache Beam来说是unbounded
,您必须使用
对于最终数据流, yield
而不是return
。 如果是最终流程,您可以
(和必要)另外配置windowing
和triggers
此代码将PCollection#1引导到Transform#2,在此进行标记
(分离)数据流。 在变量schema_dct
在这种情况下,是一个字典,其中的键是不带扩展名的方案文件的名称,它将是标记,值是方案的有效JSON。
此标记的BigQuery表。 应该注意的是,该方案应准确地发送给
查看{'fields': }
,其中
是JSON形式的BigQuery表的架构(您可以
从网络界面导出)。
main='default'
是它们将使用的线程标签的名称
所有不受标记条件约束的消息。 考虑上课
TagDataWithReqType
。
class TagDataWithReqType(beam.DoFn):
如您所见, process
类在这里也被覆盖。 types
变量包含名称
标签,并且它们必须将数字和名称与字典键的数字和名称匹配
schema_dct
。 尽管process
方法具有接受参数的能力,但我从未
我能够通过他们。 我还没有找到原因。
在输出中,我们在标签数量中得到了一个线程元组,即我们的数量
预定义标签+无法标记的默认线程。
转换#...(实际上,它不在图中,这是一个“分支”)-我们编写了默认流
到默认表。
tagged_stream.default
使用具有default
标签的流,替代语法为tagged_stream['default']
schema=parse_table_schema_from_json(schema_dct.get('default'))
-在此定义方案
表。 请注意,带有有效BigQuery表架构的default.json
文件
必须在当前schema_dir = './'
目录中。
流将转到名为default
的表。
如果不存在具有该名称的表(在该项目的给定数据集中),则该表
默认设置将根据方案自动创建
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED
转换#3,对于那些从一开始就读并拥有这篇文章的人来说,一切都应该清楚
python语法。 我们用循环将流元组分开,并使用以下命令将每个流写入其自己的表中:
他的计划。 应该'%s:%s.%s' % (gcp_project, bq_dataset, name)
流名称必须是唯一的- '%s:%s.%s' % (gcp_project, bq_dataset, name)
。
现在应该清楚它是如何工作的,您可以创建一个模板。 为此,您需要
在控制台中运行(如果可用,请不要忘记激活venv)或从IDE中运行:
python _.py / --runner DataflowRunner / --project dreamdata-test / --staging_location gs://STORAGE_NAME/STAGING_DIR / --temp_location gs://STORAGE_NAME/TEMP_DIR / --template_location gs://STORAGE_NAME/TEMPLATES_DIR/TEMPLATE_NAME
同时,应组织对Google帐户的访问,例如通过导出
GOOGLE_APPLICATION_CREDENTIALS
环境GOOGLE_APPLICATION_CREDENTIALS
或其他方式 。
关于--runner
的几句话。 在这种情况下, DataflowRunner
表示该代码
将作为数据流作业的模板运行。 仍然可以指定
DirectRunner
,如果没有--runner
和code选项,将默认使用它
将用作数据流作业,但在本地,这对于调试非常方便。
如果没有发生错误,则gs://STORAGE_NAME/TEMPLATES_DIR/TEMPLATE_NAME
将是
创建的模板。 值得一提的是,在gs://STORAGE_NAME/STAGING_DIR
也将被写入
成功运行基于以下条件创建的Datafow作业所需的服务文件:
模板,则无需删除它们。
接下来,您需要使用此模板手动或通过任何方式创建一个数据流作业
以另一种方式(例如CI)。
4.结论
因此,我们设法使用以下方法将流从PubSub流到BigQuery
必要的数据转换,以进一步存储,转换和
数据使用情况。
主要连结
在本文中,可能会出现错误甚至错误,我将非常感谢您的建设性
批评。 最后,我想补充一点,实际上,这里并没有全部使用
Apache Beam SDK的功能,但这不是目的。