使用Google Cloud Functions自动将数据导入Google BigQuery

我们一直在与Google BigQuery进行合作-我们从不同的来源导入有关用户,他们的订单和广告费用的数据,以便能够将它们彼此结合在一起。 这给了我们什么? 例如,如果您有一家在线商店,并且客户通过电话订购,然后登录到该站点,则可以使用Google BigQuery追溯其所有操作。 您可以通过营销渠道跟踪客户的整个路径-从首次访问网站到在实体店购买商品,并考虑这种离线销售情况评估广告系列。

在这个集合中,我们有6个python脚本,可使用Google Cloud函数自动将数据导入Google BigQuery :FTP; FTPS HTTP(s); 对讲机 MySQL和SFTP。 操作原理是相同的:使用HTTP POST请求,调用Cloud函数,该Cloud函数从源接收数据并将其加载到Google BigQuery表中。 如果该表已存在于所选数据集中,则它将被覆盖。

基本要求


  1. 在Google Cloud Platform中使用已激活的结算进行项目。
  2. 有权编辑要上传表的BigQuery项目中Cloud功能的服务帐户的编辑(BigQuery数据的“编辑”角色)和完成任务(BigQuery任务的用户角色);
  3. 一个HTTP客户端,用于执行调用Cloud功能的POST请求。

设定步骤



名称:例如,ftp-bq-integration或任何其他合适的名称;

分配的内存: 2 GB或更少,取决于已处理文件的大小;

触发: HTTP;

源代码:内置编辑器;

运行时环境: Python3.X。

  • 将main.py文件的内容复制到内置编辑器的main.py选项卡中。
  • 将Requirements.txt文件的内容复制到内置编辑器的requirements.txt选项卡中。
  • 根据要使用的模块,将ftp / ftps / https等指定为要调用的函数。
  • 在其他参数中,等待时间从60秒增加。 长达540秒。 或更小,取决于要处理的文件的大小。
  • 单击“创建”按钮完成创建云功能。

FTP / FTPS / SFTP


此模块旨在使用Google Cloud功能从Google BigQuery中的FTP(FTPS,SFTP)服务器传输文件。 该解决方案可让您从FTP服务器上定期更新的文件中自动将数据上传到Google BigQuery。

从适当的服务器获取的文件可以具有任何适当的扩展名(.json,.txt,.csv),但必须采用以下格式之一:JSON(换行符分隔)或逗号分隔值(CSV)。

使用范例


from urllib import urlencode from httplib2 import Http trigger_url = "https://REGION-PROJECT_ID.cloudfunctions.net/ftp/" headers = { "Content-Type": "application/json" } payload = { "ftp": { "user": "ftp.user_name", "psswd": "ftp.password", "path_to_file": "ftp://server_host/path/to/file/" }, "bq": { "project_id": "my_bq_project", "dataset_id": "my_bq_dataset", "table_id": "my_bq_table", "delimiter": ",", "source_format": "NEWLINE_DELIMITED_JSON", "location": "US" } } Http().request(trigger_url, "POST", urlencode(payload), headers = headers) 

HTTP(s)


用于将文件从HTTPS服务器传输到Google BigQuery的模块。

使用范例


 from urllib import urlencode from httplib2 import Http trigger_url = "https://REGION-PROJECT_ID.cloudfunctions.net/https/" headers = { "Content-Type": "application/json" } payload = { "https": { "path_to_file": "https://server_host/path/to/file/", "user": "https.user_name", "psswd": "https.password" }, "bq": { "project_id": "my_bq_project", "dataset_id": "my_bq_dataset", "table_id": "my_bq_table", "delimiter": ",", "source_format": "CSV", "location": "US" } } Http().request(trigger_url, "POST", urlencode(payload), headers = headers) 

对讲机


使用Google Cloud功能自动将数据从对讲机传输到Google BigQuery的模块。 当前,该模块允许您从对讲机导入以下实体:用户,公司,联系人,管理员,对话,团队,标签,细分。 但是,该模块不支持自定义属性。

使用范例


 from urllib import urlencode from httplib2 import Http trigger_url = "https://REGION-PROJECT_ID.cloudfunctions.net/intercom/" headers = { "Content-Type": "application/json" } payload = { "intercom": { "accessToken": "INTERCOM ACCESS TOKEN", "entities": [ "users", "companies", "contacts", "admins", "conversations", "teams", "tags", "segments" ] }, "bq": { "project_id": "YOUR GCP PROJECT", "dataset_id": "YOUR DATASET NAME", "location": "US" } } Http().request(trigger_url, "POST", urlencode(payload), headers = headers) 

的MySQL


用于使用Google Cloud函数将文件从MySQL服务器传输到Google BigQuery。 通过此解决方案,您可以从MySQL服务器上定期更新的表中自动将数据上传到Google BigQuery。

使用范例


 from urllib import urlencode from httplib2 import Http trigger_url = "https://REGION-PROJECT_ID.cloudfunctions.net/mysql/" headers = { "Content-Type": "application/json" } payload = { "mysql": { "user": "mysql.user", "psswd": "mysql.password", "host": "host_name", "port”: 3306, "database": "database_name", "table_id": "table_name", "query": "SELECT * FROM table_name" }, "bq": { "project_id": "my_bq_projec", "dataset_id": "my_bq_dataset", "table_id": "my_bq_table" } } Http().request(trigger_url, "POST", urlencode(payload), headers = headers) 

在每个部分的自述文件中可以找到每个模块的详细文档。

这仅仅是开始,现在我们正在为Bitrix和amoCRM编写脚本,因为我们看到它们在客户中最受欢迎。 分享您用于组合数据的方法以及为此而缺乏的集成。

Source: https://habr.com/ru/post/zh-CN426647/


All Articles