使用Google Cloud Functions按计划将计划的呼叫报告从CoMagic导入到BigQuery

为了什么


随着广告活动的复杂结构和大量的呼叫,用于存储,处理和分析有关来电的信息的附加工具变得必要。 通常,您需要长时间长时间快速访问数据。 有时您需要进行复杂的数据处理,从而将呼叫与特定渠道或广告系列相关联。

加快工作的一种方法(还提供其他好处)是将呼叫从CoMagic导入到Google BigQuery。 有关BigQuery的好处的文章很多,所以让我们继续进行创作。

要创建自动导入,您需要:

  1. 已创建项目的Google帐户(如果尚未注册)
  2. Python知识
  3. 介绍Google Cloud文档

这里介绍如何创建项目。 创建项目后,您需要在BigQuery中创建一个数据集。 BQ文档有关创建数据集的说明

从CoMagic检索数据


转到CoMagic 文档 。 要获取通话清单,我们需要报告部分。

我们创建了一个用于使用CoMagic API的简单类。 所有必要的要求将在GitHub链接的末尾指出。

import json import requests import random import pandas as pd class ComagicClient: """     CoMagic""" def __init__(self, login, password): """     CoMagic""" self.login = login self.password = password self.base_url = 'https://dataapi.comagic.ru/v2.0' self.payload_ = {"jsonrpc":"2.0", "id":1, "method":None, "params": None} self.token = self.get_token(self.login, self.password) def base_request(self, method, params): """     CoMagic.      API   .    JSON-like . : https://www.comagic.ru/support/api/data-api/""" id_ = random.randrange(10**7) #  payload = self.payload_.copy() payload["method"] = method payload["params"] = params payload["id"] = id_ self.r = requests.post(self.base_url, data=json.dumps(payload)) self.last_response = json.loads(self.r.text) return self.last_response def get_token(self, login, password): """   .       .  .""" method = "login.user" params = {"login":self.login, "password":self.password} response = self.base_request(method, params) token = response['result']['data']['access_token'] return token def get_report_per_page(self, method, params): """  .      10000 .    .     110000 .     JSON-like .""" response = self.base_request(method, params) print(f"""  c {params["date_from"]}  {params["date_till"]}.  = {params["offset"]}""") result = response['result']['data'] if len(result) < 10000: return result else: params['offset'] += 10000 add_result = self.get_report_per_page(method, params) return result + add_result def get_basic_report(self, method, fields, date_from, date_till, filter=None, offset=0): """   .       method  fields.       .       ,   ,       . method -- <string>   date_from -- <string>  .  "YYYY-MM-DD hh:mm:ss" date_till -- <string>  .  "YYYY-MM-DD hh:mm:ss" fields -- <list>,    filter [] - <dict>  offset [] -- <int>  return -- <list>  """ params = {"access_token":self.token, "limit":10000, "date_from":date_from, "date_till":date_till, "fields": fields, "offset": offset} if filter: params['filter'] = filter report = self.get_report_per_page(method, params) return report 

现在,您需要确定需要哪种数据。 需要处理数据并使数据可见,以便可以将其加载到BigQuery中。

创建一个帮助器类并定义从CoMagic接收的数据。

 class ComagicHandler(ComagicClient): """    ,   CoMagic""" time_partition_field = 'PARTITION_DATE' def __init__(self, login, password, first_call_date): self.day_before_first_call = pd.to_datetime(first_call_date) - pd.Timedelta(days=1) super().__init__(login, password) def get_calls_report(self, date_from, date_till): """        .           .    Pandas DataFrame.      .      Connector    .    .    .  Pnadas.DataFrame""" method = "get.calls_report" fields = ['id', 'visitor_id', 'person_id', 'start_time', 'finish_reason', 'is_lost', 'tags', 'campaign_name','communication_number', 'contact_phone_number', 'talk_duration', 'clean_talk_duration', 'virtual_phone_number', 'ua_client_id', 'ym_client_id', 'entrance_page', 'gclid', 'yclid', 'visitor_type', 'visits_count', 'visitor_first_campaign_name', 'visitor_device', 'site_domain_name','utm_source', 'utm_medium', 'utm_campaign', 'utm_content', 'eq_utm_source', 'eq_utm_medium', 'eq_utm_campaign', 'attributes'] #   CoMagic calls_data = self.get_basic_report(method, fields, date_from, date_till) # DataFrame df = pd.DataFrame(calls_data) #    .    . df[self.time_partition_field] = pd.to_datetime(df.start_time).apply(lambda x: x.date()) #  tags,   BigQuery       ,  # CoMagic.    . df['tags'] = df.tags.apply(lambda x: x if x == None else [i['tag_name'] for i in x]) return df 

将数据发送到BigQuery


接收并转换了来自CoMagic的数据后,您需要将其发送到BigQuery。

 from google.cloud import bigquery from google.cloud.exceptions import NotFound import pandas as pd class BQTableHanler: """     BigQuery""" time_partition_field = 'PARTITION_DATE' def __init__(self, full_table_id, service_account_file_key_path = None): """       `myproject.mydataset.mytable`.  ,   Application Default Credentials,           .""" self.full_table_id = full_table_id project_id, dataset_id, table_id = full_table_id.split(".") self.project_id = project_id self.dataset_id = dataset_id self.table_id = table_id if service_account_file_key_path: #      from google.oauth2 import service_account self.credentials = service_account.Credentials.from_service_account_file( service_account_file_key_path, scopes=["https://www.googleapis.com/auth/cloud-platform"],) self.bq_client = bigquery.Client(credentials = self.credentials, project = self.project_id) else: self.bq_client = bigquery.Client() self.dataset = self.bq_client.get_dataset(self.dataset_id) self.location = self.dataset.location self.table_ref = self.dataset.table(self.table_id) def get_last_update(self): """        Pandas datetime.      False.""" try: self.bq_client.get_table(self.full_table_id) except NotFound as error: return False query = f"""SELECT MAX({self.time_partition_field}) as last_call FROM `{self.full_table_id}`""" result = self.bq_client.query(query,location=self.location).to_dataframe() date = pd.to_datetime(result.iloc[0,0]).date() return date def insert_dataframe(self, dataframe): """      BigQuery.     Pandas DataFrame.    ,       .""" job_config = bigquery.LoadJobConfig() #     job_config._properties['load']['timePartitioning'] = {'type': 'DAY', 'field': self.time_partition_field} result = self.bq_client.load_table_from_dataframe(dataframe, self.table_ref, job_config=job_config).result() return result 

确定更新数据的逻辑


由于从CoMagic接收的数据行数有限制,因此有必要限制请求的数据数。 我们将限制请求期限。 为此,您需要一个辅助功能,该功能可以将较长的时间段分成指定长度的段。

 def interval_split(array, interval): """      .   ,      2,    -   ,     -    . : get_intervals([1,2,3,4,5,6,7], 3) => [[1,3], [4,6], [7]] get_intervals([1,2,3], 4) => [[1,3]]""" intervals = [] iw, i = 0, 0 l = len(array) for v in array: if i==0 or (i)%interval==0: intervals.append([v]) if (i+1)%interval == 0 or (i+1) == l: intervals[iw].append(v) iw+=1 i+=1 return intervals 

首次加载数据时,需要长时间下载数据时,这是必需的。 该时期分为几个小时期。 顺便说一句,最好不要使用Cloud Function,因为它们有时间限制。 好吧,或者作为一个选择,您可以多次运行该函数。

我们创建一个连接器类来链接BigQuery表,该表用于存储数据和CoMagic中的数据。

 from helpfunctions import interval_split import pandas as pd class Connector: """      """ time_partition_field = 'PARTITION_DATE' #  -.       def __init__ (self, source, dest): """          """ self.source = source self.dest = dest self.source.time_partition_field = self.time_partition_field self.dest.time_partition_field = self.time_partition_field def insert_data_in_dest(self, start_date, end_date): """      .          ,     .""" dates = pd.date_range(start_date, end_date) week_intervals = interval_split(dates, 7) #     7  for week_interval in week_intervals: date_from = week_interval[0].strftime("%Y-%m-%d") + " 00:00:00" date_till = week_interval[1].strftime("%Y-%m-%d") + " 23:59:59" calls_df = self.source.get_calls_report(date_from, date_till) self.dest.insert_dataframe(calls_df) print (f"  {date_from}  {date_till}   ") return True def update_dest_data(self): #     BigQuery last_date = self.dest.get_last_update() if not last_date: #    last_date = self.source.day_before_first_call yesterday = pd.Timestamp.today(tz='Europe/Moscow').date() - pd.Timedelta(days=1) if last_date == yesterday: print("  ") else: last_date = last_date + pd.Timedelta(days=1) self.insert_data_in_dest(last_date, yesterday) return True 

接下来,我们规定了用于更新数据的主要功能,该功能将按计划启动。

 from connector import Connector from bqhandler import BQTableHanler from comagichandler import ComagicHandler from credfile import * def main(event, context): """    event, context  : https://cloud.google.com/functions/docs/writing/background#functions-writing-background-hello-pubsub-python""" #import base64 #pubsub_message = base64.b64decode(event['data']).decode('utf-8') # c      comagic_handler = ComagicHandler(COMAGIC_LOGIN, COMAGIC_PASSWORD, FIRST_CALL_DATE) bq_handelr = BQTableHanler(full_table_id, google_credintials_key_path) #  connector = Connector(comagic_handler, bq_handelr) #     connector.update_dest_data() 

配置Google Cloud Platform


我们将所有文件收集在ZIP存档中。 在credfile.py文件中,我们输入CoMagic登录名和密码以接收令牌,以及BigQuery中表的全名以及(如果脚本是从本地计算机启动的)服务帐户文件的路径。

创建云功能


  • 转到控制台
  • 如果尚未创建功能,请单击“创建功能”
  • 在触发字段中,选择PUB / SUB
  • 为PUB / SUB创建一个新主题。 例如,“ update_calls”
  • 来源:ZIP上传(本地ZIP文件)
  • 环境:Python 3.7
  • 下载压缩文件
  • 选择云存储的临时段
  • 在“函数”字段中,我们写“ main”
  • 分配的内存:可选



配置调度程序和PUB / SUB


在最后一步,我们创建了“ update_calls”触发器。 该自动主题已出现在主题列表中

现在,使用Cloud Scheduler,您需要配置触发器。 何时触发,GCF将启动。

  • 转到控制台
  • 在CRON格式的频率字段中,设置触发器应触发和功能启动的时间。
  • 目的地:Pub / Sub
  • 主题:注册创建函数时指定的主题:“ update_calls”
  • 有效载荷*(有效载荷)-这是将传输到Pub / Sub和主要功能的信息



现在,脚本将在每天的01:00启动,并且通话数据将在前一天结束时进行更新。

链接到GitHub以从本地计算机运行
GitHub 链接到ZIP文件

Source: https://habr.com/ru/post/zh-CN475804/


All Articles