Importe informes de llamadas programadas de CoMagic a BigQuery en un horario usando Google Cloud Functions

Para que


Con la compleja estructura de las campa帽as publicitarias y una gran cantidad de llamadas, se hacen necesarias herramientas adicionales para almacenar, procesar y analizar informaci贸n sobre las llamadas entrantes. A menudo necesita un acceso r谩pido a los datos durante un largo per铆odo de tiempo. A veces necesita un procesamiento de datos complejo, correlacionando llamadas a un canal o campa帽a espec铆ficos.

Una de las opciones para acelerar el trabajo, que tambi茅n proporciona beneficios adicionales, es importar llamadas de CoMagic a Google BigQuery. Mucho se ha escrito sobre los beneficios de BigQuery, as铆 que pasemos a la creaci贸n.

Para crear una importaci贸n autom谩tica, necesitar谩:

  1. Cuenta de Google (si a煤n no lo est谩) con el proyecto creado
  2. Conocimiento de Python
  3. Presentaci贸n de la documentaci贸n de Google Cloud

Aqu铆 se describe c贸mo crear un proyecto. Despu茅s de crear el proyecto, debe crear un conjunto de datos en BigQuery. Documentaci贸n de BQ e instrucciones para crear un conjunto de datos .

Recuperando datos de CoMagic


Volviendo a la documentaci贸n de CoMagic. Para obtener una lista de llamadas o llamadas, necesitamos la secci贸n de informes.

Creamos una clase simple para trabajar con la API de CoMagic. Todos los requisitos necesarios se indicar谩n al final en el enlace a GitHub.

import json import requests import random import pandas as pd class ComagicClient: """     CoMagic""" def __init__(self, login, password): """     CoMagic""" self.login = login self.password = password self.base_url = 'https://dataapi.comagic.ru/v2.0' self.payload_ = {"jsonrpc":"2.0", "id":1, "method":None, "params": None} self.token = self.get_token(self.login, self.password) def base_request(self, method, params): """     CoMagic.      API   .    JSON-like . : https://www.comagic.ru/support/api/data-api/""" id_ = random.randrange(10**7) #  payload = self.payload_.copy() payload["method"] = method payload["params"] = params payload["id"] = id_ self.r = requests.post(self.base_url, data=json.dumps(payload)) self.last_response = json.loads(self.r.text) return self.last_response def get_token(self, login, password): """   .       .  .""" method = "login.user" params = {"login":self.login, "password":self.password} response = self.base_request(method, params) token = response['result']['data']['access_token'] return token def get_report_per_page(self, method, params): """  .      10000 .    .     110000 .     JSON-like .""" response = self.base_request(method, params) print(f"""  c {params["date_from"]}  {params["date_till"]}.  = {params["offset"]}""") result = response['result']['data'] if len(result) < 10000: return result else: params['offset'] += 10000 add_result = self.get_report_per_page(method, params) return result + add_result def get_basic_report(self, method, fields, date_from, date_till, filter=None, offset=0): """   .       method  fields.       .       ,   ,       . method -- <string>   date_from -- <string>  .  "YYYY-MM-DD hh:mm:ss" date_till -- <string>  .  "YYYY-MM-DD hh:mm:ss" fields -- <list>,    filter [] - <dict>  offset [] -- <int>  return -- <list>  """ params = {"access_token":self.token, "limit":10000, "date_from":date_from, "date_till":date_till, "fields": fields, "offset": offset} if filter: params['filter'] = filter report = self.get_report_per_page(method, params) return report 

Ahora debe determinar qu茅 tipo de datos se necesitan. Los datos deben procesarse y hacerse visibles para poder cargarlos en BigQuery.

Cree una clase auxiliar y defina los datos recibidos de CoMagic.

 class ComagicHandler(ComagicClient): """    ,   CoMagic""" time_partition_field = 'PARTITION_DATE' def __init__(self, login, password, first_call_date): self.day_before_first_call = pd.to_datetime(first_call_date) - pd.Timedelta(days=1) super().__init__(login, password) def get_calls_report(self, date_from, date_till): """        .           .    Pandas DataFrame.      .      Connector    .    .    .  Pnadas.DataFrame""" method = "get.calls_report" fields = ['id', 'visitor_id', 'person_id', 'start_time', 'finish_reason', 'is_lost', 'tags', 'campaign_name','communication_number', 'contact_phone_number', 'talk_duration', 'clean_talk_duration', 'virtual_phone_number', 'ua_client_id', 'ym_client_id', 'entrance_page', 'gclid', 'yclid', 'visitor_type', 'visits_count', 'visitor_first_campaign_name', 'visitor_device', 'site_domain_name','utm_source', 'utm_medium', 'utm_campaign', 'utm_content', 'eq_utm_source', 'eq_utm_medium', 'eq_utm_campaign', 'attributes'] #   CoMagic calls_data = self.get_basic_report(method, fields, date_from, date_till) # DataFrame df = pd.DataFrame(calls_data) #    .    . df[self.time_partition_field] = pd.to_datetime(df.start_time).apply(lambda x: x.date()) #  tags,   BigQuery       ,  # CoMagic.    . df['tags'] = df.tags.apply(lambda x: x if x == None else [i['tag_name'] for i in x]) return df 

Env铆o de datos a BigQuery


Despu茅s de recibir y convertir los datos de CoMagic, debe enviarlos a BigQuery.

 from google.cloud import bigquery from google.cloud.exceptions import NotFound import pandas as pd class BQTableHanler: """     BigQuery""" time_partition_field = 'PARTITION_DATE' def __init__(self, full_table_id, service_account_file_key_path = None): """       `myproject.mydataset.mytable`.  ,   Application Default Credentials,           .""" self.full_table_id = full_table_id project_id, dataset_id, table_id = full_table_id.split(".") self.project_id = project_id self.dataset_id = dataset_id self.table_id = table_id if service_account_file_key_path: #      from google.oauth2 import service_account self.credentials = service_account.Credentials.from_service_account_file( service_account_file_key_path, scopes=["https://www.googleapis.com/auth/cloud-platform"],) self.bq_client = bigquery.Client(credentials = self.credentials, project = self.project_id) else: self.bq_client = bigquery.Client() self.dataset = self.bq_client.get_dataset(self.dataset_id) self.location = self.dataset.location self.table_ref = self.dataset.table(self.table_id) def get_last_update(self): """        Pandas datetime.      False.""" try: self.bq_client.get_table(self.full_table_id) except NotFound as error: return False query = f"""SELECT MAX({self.time_partition_field}) as last_call FROM `{self.full_table_id}`""" result = self.bq_client.query(query,location=self.location).to_dataframe() date = pd.to_datetime(result.iloc[0,0]).date() return date def insert_dataframe(self, dataframe): """      BigQuery.     Pandas DataFrame.    ,       .""" job_config = bigquery.LoadJobConfig() #     job_config._properties['load']['timePartitioning'] = {'type': 'DAY', 'field': self.time_partition_field} result = self.bq_client.load_table_from_dataframe(dataframe, self.table_ref, job_config=job_config).result() return result 

Determinar la l贸gica para actualizar los datos.


Como existe un l铆mite en el n煤mero de filas de datos recibidas de CoMagic, es necesario limitar el n煤mero de datos solicitados. Limitaremos el per铆odo de solicitud. Para hacer esto, necesita una funci贸n auxiliar que dividir谩 un gran per铆odo de tiempo en segmentos de una longitud espec铆fica.

 def interval_split(array, interval): """      .   ,      2,    -   ,     -    . : get_intervals([1,2,3,4,5,6,7], 3) => [[1,3], [4,6], [7]] get_intervals([1,2,3], 4) => [[1,3]]""" intervals = [] iw, i = 0, 0 l = len(array) for v in array: if i==0 or (i)%interval==0: intervals.append([v]) if (i+1)%interval == 0 or (i+1) == l: intervals[iw].append(v) iw+=1 i+=1 return intervals 

Esto es necesario cuando se cargan datos por primera vez, cuando necesita descargar datos durante un largo per铆odo de tiempo. El per铆odo se divide en varios per铆odos peque帽os. Por cierto, es mejor hacerlo sin usar la funci贸n de nube, ya que tienen un l铆mite de tiempo. Bueno, o, como opci贸n, puede ejecutar la funci贸n muchas, muchas veces.

Creamos una clase de conector para vincular la tabla BigQuery donde queremos almacenar los datos y los datos de CoMagic.

 from helpfunctions import interval_split import pandas as pd class Connector: """      """ time_partition_field = 'PARTITION_DATE' #  -.       def __init__ (self, source, dest): """          """ self.source = source self.dest = dest self.source.time_partition_field = self.time_partition_field self.dest.time_partition_field = self.time_partition_field def insert_data_in_dest(self, start_date, end_date): """      .          ,     .""" dates = pd.date_range(start_date, end_date) week_intervals = interval_split(dates, 7) #     7  for week_interval in week_intervals: date_from = week_interval[0].strftime("%Y-%m-%d") + " 00:00:00" date_till = week_interval[1].strftime("%Y-%m-%d") + " 23:59:59" calls_df = self.source.get_calls_report(date_from, date_till) self.dest.insert_dataframe(calls_df) print (f"  {date_from}  {date_till}   ") return True def update_dest_data(self): #     BigQuery last_date = self.dest.get_last_update() if not last_date: #    last_date = self.source.day_before_first_call yesterday = pd.Timestamp.today(tz='Europe/Moscow').date() - pd.Timedelta(days=1) if last_date == yesterday: print("  ") else: last_date = last_date + pd.Timedelta(days=1) self.insert_data_in_dest(last_date, yesterday) return True 

A continuaci贸n, prescribimos la funci贸n principal para actualizar los datos, que se lanzar谩n en un horario.

 from connector import Connector from bqhandler import BQTableHanler from comagichandler import ComagicHandler from credfile import * def main(event, context): """    event, context  : https://cloud.google.com/functions/docs/writing/background#functions-writing-background-hello-pubsub-python""" #import base64 #pubsub_message = base64.b64decode(event['data']).decode('utf-8') # c      comagic_handler = ComagicHandler(COMAGIC_LOGIN, COMAGIC_PASSWORD, FIRST_CALL_DATE) bq_handelr = BQTableHanler(full_table_id, google_credintials_key_path) #  connector = Connector(comagic_handler, bq_handelr) #     connector.update_dest_data() 

Configurar Google Cloud Platform


Recopilamos todos los archivos en un archivo ZIP. En el archivo credfile.py, ingresamos el nombre de usuario y la contrase帽a de CoMagic para recibir el token, as铆 como el nombre completo de la tabla en BigQuery y la ruta al archivo de la cuenta de servicio si el script se inicia desde la m谩quina local.

Crear una funci贸n en la nube


  • Ir a la consola
  • Si a煤n no se ha creado ninguna funci贸n, haga clic en "Crear funci贸n"
  • En el campo de activaci贸n, seleccione PUB / SUB
  • Crea un nuevo tema para PUB / SUB. Por ejemplo, 'update_calls'
  • Fuente: carga ZIP (archivo ZIP local)
  • Entorno: Python 3.7
  • Descargar el archivo zip
  • Elegir un segmento temporal de almacenamiento en la nube
  • En el campo `llamado funci贸n` escribimos 'main'
  • Memoria asignada: opcional



Configuraci贸n del planificador y PUB / SUB


En el 煤ltimo paso, creamos el activador `update_calls`. Este tema autom谩tico ha aparecido en la lista de temas .

Ahora, con Cloud Scheduler necesita configurar el disparador. cu谩ndo se disparar谩 y se iniciar谩 el GCF.

  • Ir a la consola
  • En el campo de frecuencia en el formato CRON, configure cu谩ndo debe dispararse el disparador y se inicia la funci贸n.
  • Destino: Pub / Sub
  • Asunto: registre el tema que se especific贸 al crear la funci贸n: "update_calls"
  • Carga 煤til * (Carga 煤til): esta es la informaci贸n que se transferir谩 a Pub / Sub y a la funci贸n principal



Ahora el script se lanzar谩 diariamente a las 01:00 y los datos de la llamada se actualizar谩n al final del d铆a anterior.

Enlace a GitHub para ejecutar desde la computadora local
Enlace de GitHub al archivo ZIP

Source: https://habr.com/ru/post/475804/


All Articles