为了什么
随着广告活动的复杂结构和大量的呼叫,用于存储,处理和分析有关来电的信息的附加工具变得必要。 通常,您需要长时间长时间快速访问数据。 有时您需要进行复杂的数据处理,从而将呼叫与特定渠道或广告系列相关联。
加快工作的一种方法(还提供其他好处)是将呼叫从CoMagic导入到Google BigQuery。 有关BigQuery的好处的文章很多,所以让我们继续进行创作。
要创建自动导入,您需要:
- 已创建项目的Google帐户(如果尚未注册)
- Python知识
- 介绍Google Cloud文档
这里介绍
了如何创建项目。 创建项目后,您需要在BigQuery中创建一个数据集。
BQ文档和
有关创建数据集的说明 。
从CoMagic检索数据
转到CoMagic
文档 。 要获取通话清单,我们需要报告部分。
我们创建了一个用于使用CoMagic API的简单类。 所有必要的要求将在GitHub链接的末尾指出。
import json import requests import random import pandas as pd class ComagicClient: """ CoMagic""" def __init__(self, login, password): """ CoMagic""" self.login = login self.password = password self.base_url = 'https://dataapi.comagic.ru/v2.0' self.payload_ = {"jsonrpc":"2.0", "id":1, "method":None, "params": None} self.token = self.get_token(self.login, self.password) def base_request(self, method, params): """ CoMagic. API . JSON-like . : https://www.comagic.ru/support/api/data-api/""" id_ = random.randrange(10**7)
现在,您需要确定需要哪种数据。 需要处理数据并使数据可见,以便可以将其加载到BigQuery中。
创建一个帮助器类并定义从CoMagic接收的数据。
class ComagicHandler(ComagicClient): """ , CoMagic""" time_partition_field = 'PARTITION_DATE' def __init__(self, login, password, first_call_date): self.day_before_first_call = pd.to_datetime(first_call_date) - pd.Timedelta(days=1) super().__init__(login, password) def get_calls_report(self, date_from, date_till): """ . . Pandas DataFrame. . Connector . . . Pnadas.DataFrame""" method = "get.calls_report" fields = ['id', 'visitor_id', 'person_id', 'start_time', 'finish_reason', 'is_lost', 'tags', 'campaign_name','communication_number', 'contact_phone_number', 'talk_duration', 'clean_talk_duration', 'virtual_phone_number', 'ua_client_id', 'ym_client_id', 'entrance_page', 'gclid', 'yclid', 'visitor_type', 'visits_count', 'visitor_first_campaign_name', 'visitor_device', 'site_domain_name','utm_source', 'utm_medium', 'utm_campaign', 'utm_content', 'eq_utm_source', 'eq_utm_medium', 'eq_utm_campaign', 'attributes']
将数据发送到BigQuery
接收并转换了来自CoMagic的数据后,您需要将其发送到BigQuery。
from google.cloud import bigquery from google.cloud.exceptions import NotFound import pandas as pd class BQTableHanler: """ BigQuery""" time_partition_field = 'PARTITION_DATE' def __init__(self, full_table_id, service_account_file_key_path = None): """ `myproject.mydataset.mytable`. , Application Default Credentials, .""" self.full_table_id = full_table_id project_id, dataset_id, table_id = full_table_id.split(".") self.project_id = project_id self.dataset_id = dataset_id self.table_id = table_id if service_account_file_key_path:
确定更新数据的逻辑
由于从CoMagic接收的数据行数有限制,因此有必要限制请求的数据数。 我们将限制请求期限。 为此,您需要一个辅助功能,该功能可以将较长的时间段分成指定长度的段。
def interval_split(array, interval): """ . , 2, - , - . : get_intervals([1,2,3,4,5,6,7], 3) => [[1,3], [4,6], [7]] get_intervals([1,2,3], 4) => [[1,3]]""" intervals = [] iw, i = 0, 0 l = len(array) for v in array: if i==0 or (i)%interval==0: intervals.append([v]) if (i+1)%interval == 0 or (i+1) == l: intervals[iw].append(v) iw+=1 i+=1 return intervals
首次加载数据时,需要长时间下载数据时,这是必需的。 该时期分为几个小时期。 顺便说一句,最好不要使用Cloud Function,因为它们有时间限制。 好吧,或者作为一个选择,您可以多次运行该函数。
我们创建一个连接器类来链接BigQuery表,该表用于存储数据和CoMagic中的数据。
from helpfunctions import interval_split import pandas as pd class Connector: """ """ time_partition_field = 'PARTITION_DATE'
接下来,我们规定了用于更新数据的主要功能,该功能将按计划启动。
from connector import Connector from bqhandler import BQTableHanler from comagichandler import ComagicHandler from credfile import * def main(event, context): """ event, context : https://cloud.google.com/functions/docs/writing/background#functions-writing-background-hello-pubsub-python"""
配置Google Cloud Platform
我们将所有文件收集在ZIP存档中。 在credfile.py文件中,我们输入CoMagic登录名和密码以接收令牌,以及BigQuery中表的全名以及(如果脚本是从本地计算机启动的)服务帐户文件的路径。
创建云功能
- 转到控制台
- 如果尚未创建功能,请单击“创建功能”
- 在触发字段中,选择PUB / SUB
- 为PUB / SUB创建一个新主题。 例如,“ update_calls”
- 来源:ZIP上传(本地ZIP文件)
- 环境:Python 3.7
- 下载压缩文件
- 选择云存储的临时段
- 在“函数”字段中,我们写“ main”
- 分配的内存:可选

配置调度程序和PUB / SUB
在最后一步,我们创建了“ update_calls”触发器。 该自动主题已出现在
主题列表中 。

现在,使用Cloud Scheduler,您需要配置触发器。 何时触发,GCF将启动。
- 转到控制台
- 在CRON格式的频率字段中,设置触发器应触发和功能启动的时间。
- 目的地:Pub / Sub
- 主题:注册创建函数时指定的主题:“ update_calls”
- 有效载荷*(有效载荷)-这是将传输到Pub / Sub和主要功能的信息

现在,脚本将在每天的01:00启动,并且通话数据将在前一天结束时进行更新。
链接到GitHub以从本地计算机运行
GitHub
链接到ZIP文件