Importez des rapports d'appels planifiés de CoMagic vers BigQuery selon un calendrier à l'aide de Google Cloud Functions

Pour quoi


Avec la structure complexe des campagnes publicitaires et un grand nombre d'appels, des outils supplémentaires pour le stockage, le traitement et l'analyse des informations sur les appels entrants deviennent nécessaires. Souvent, vous avez besoin d'un accès rapide aux données sur une longue période. Parfois, vous avez besoin d'un traitement de données complexe, d'une corrélation des appels vers un canal ou une campagne spécifique.

L'une des options pour accélérer le travail, qui offre également des avantages supplémentaires, consiste à importer des appels de CoMagic vers Google BigQuery. Beaucoup a été écrit sur les avantages de BigQuery, passons donc à la création.

Pour créer une importation automatique, vous aurez besoin de:

  1. Compte Google (si ce n'est pas déjà fait) avec le projet créé
  2. Connaissance de Python
  3. Présentation de la documentation Google Cloud

Comment créer un projet est décrit ici . Une fois le projet créé, vous devez créer un jeu de données dans BigQuery. Documentation BQ et instructions pour créer un ensemble de données .

Récupération des données de CoMagic


Passons à la documentation CoMagic. Pour obtenir une liste d'appels ou d'appels, nous avons besoin de la section des rapports.

Nous créons une classe simple pour travailler avec l'API CoMagic. Toutes les exigences nécessaires seront indiquées à la fin dans le lien vers GitHub.

import json import requests import random import pandas as pd class ComagicClient: """     CoMagic""" def __init__(self, login, password): """     CoMagic""" self.login = login self.password = password self.base_url = 'https://dataapi.comagic.ru/v2.0' self.payload_ = {"jsonrpc":"2.0", "id":1, "method":None, "params": None} self.token = self.get_token(self.login, self.password) def base_request(self, method, params): """     CoMagic.      API   .    JSON-like . : https://www.comagic.ru/support/api/data-api/""" id_ = random.randrange(10**7) #  payload = self.payload_.copy() payload["method"] = method payload["params"] = params payload["id"] = id_ self.r = requests.post(self.base_url, data=json.dumps(payload)) self.last_response = json.loads(self.r.text) return self.last_response def get_token(self, login, password): """   .       .  .""" method = "login.user" params = {"login":self.login, "password":self.password} response = self.base_request(method, params) token = response['result']['data']['access_token'] return token def get_report_per_page(self, method, params): """  .      10000 .    .     110000 .     JSON-like .""" response = self.base_request(method, params) print(f"""  c {params["date_from"]}  {params["date_till"]}.  = {params["offset"]}""") result = response['result']['data'] if len(result) < 10000: return result else: params['offset'] += 10000 add_result = self.get_report_per_page(method, params) return result + add_result def get_basic_report(self, method, fields, date_from, date_till, filter=None, offset=0): """   .       method  fields.       .       ,   ,       . method -- <string>   date_from -- <string>  .  "YYYY-MM-DD hh:mm:ss" date_till -- <string>  .  "YYYY-MM-DD hh:mm:ss" fields -- <list>,    filter [] - <dict>  offset [] -- <int>  return -- <list>  """ params = {"access_token":self.token, "limit":10000, "date_from":date_from, "date_till":date_till, "fields": fields, "offset": offset} if filter: params['filter'] = filter report = self.get_report_per_page(method, params) return report 

Vous devez maintenant déterminer quel type de données est nécessaire. Les données doivent être traitées et rendues visibles pour pouvoir être chargées dans BigQuery.

Créez une classe d'assistance et définissez les données reçues de CoMagic.

 class ComagicHandler(ComagicClient): """    ,   CoMagic""" time_partition_field = 'PARTITION_DATE' def __init__(self, login, password, first_call_date): self.day_before_first_call = pd.to_datetime(first_call_date) - pd.Timedelta(days=1) super().__init__(login, password) def get_calls_report(self, date_from, date_till): """        .           .    Pandas DataFrame.      .      Connector    .    .    .  Pnadas.DataFrame""" method = "get.calls_report" fields = ['id', 'visitor_id', 'person_id', 'start_time', 'finish_reason', 'is_lost', 'tags', 'campaign_name','communication_number', 'contact_phone_number', 'talk_duration', 'clean_talk_duration', 'virtual_phone_number', 'ua_client_id', 'ym_client_id', 'entrance_page', 'gclid', 'yclid', 'visitor_type', 'visits_count', 'visitor_first_campaign_name', 'visitor_device', 'site_domain_name','utm_source', 'utm_medium', 'utm_campaign', 'utm_content', 'eq_utm_source', 'eq_utm_medium', 'eq_utm_campaign', 'attributes'] #   CoMagic calls_data = self.get_basic_report(method, fields, date_from, date_till) # DataFrame df = pd.DataFrame(calls_data) #    .    . df[self.time_partition_field] = pd.to_datetime(df.start_time).apply(lambda x: x.date()) #  tags,   BigQuery       ,  # CoMagic.    . df['tags'] = df.tags.apply(lambda x: x if x == None else [i['tag_name'] for i in x]) return df 

Envoi de données à BigQuery


Une fois les données de CoMagic reçues et converties, vous devez les envoyer à BigQuery.

 from google.cloud import bigquery from google.cloud.exceptions import NotFound import pandas as pd class BQTableHanler: """     BigQuery""" time_partition_field = 'PARTITION_DATE' def __init__(self, full_table_id, service_account_file_key_path = None): """       `myproject.mydataset.mytable`.  ,   Application Default Credentials,           .""" self.full_table_id = full_table_id project_id, dataset_id, table_id = full_table_id.split(".") self.project_id = project_id self.dataset_id = dataset_id self.table_id = table_id if service_account_file_key_path: #      from google.oauth2 import service_account self.credentials = service_account.Credentials.from_service_account_file( service_account_file_key_path, scopes=["https://www.googleapis.com/auth/cloud-platform"],) self.bq_client = bigquery.Client(credentials = self.credentials, project = self.project_id) else: self.bq_client = bigquery.Client() self.dataset = self.bq_client.get_dataset(self.dataset_id) self.location = self.dataset.location self.table_ref = self.dataset.table(self.table_id) def get_last_update(self): """        Pandas datetime.      False.""" try: self.bq_client.get_table(self.full_table_id) except NotFound as error: return False query = f"""SELECT MAX({self.time_partition_field}) as last_call FROM `{self.full_table_id}`""" result = self.bq_client.query(query,location=self.location).to_dataframe() date = pd.to_datetime(result.iloc[0,0]).date() return date def insert_dataframe(self, dataframe): """      BigQuery.     Pandas DataFrame.    ,       .""" job_config = bigquery.LoadJobConfig() #     job_config._properties['load']['timePartitioning'] = {'type': 'DAY', 'field': self.time_partition_field} result = self.bq_client.load_table_from_dataframe(dataframe, self.table_ref, job_config=job_config).result() return result 

Déterminer la logique de mise à jour des données


Puisqu'il y a une limite sur le nombre de lignes de données reçues de CoMagic, il est nécessaire de limiter le nombre de données demandées. Nous limiterons la période de demande. Pour ce faire, vous avez besoin d'une fonction auxiliaire qui divisera une grande période de temps en segments d'une longueur spécifiée.

 def interval_split(array, interval): """      .   ,      2,    -   ,     -    . : get_intervals([1,2,3,4,5,6,7], 3) => [[1,3], [4,6], [7]] get_intervals([1,2,3], 4) => [[1,3]]""" intervals = [] iw, i = 0, 0 l = len(array) for v in array: if i==0 or (i)%interval==0: intervals.append([v]) if (i+1)%interval == 0 or (i+1) == l: intervals[iw].append(v) iw+=1 i+=1 return intervals 

Cela est nécessaire lors du premier chargement de données, lorsque vous devez télécharger des données pendant une longue période. La période est divisée en plusieurs petites périodes. Soit dit en passant, il est préférable de le faire sans utiliser la fonction Cloud, car ils ont une limite de temps. Eh bien, ou, en option, vous pouvez exécuter la fonction plusieurs fois.

Nous créons une classe de connecteur pour lier la table BigQuery où nous voulons stocker les données et les données de CoMagic.

 from helpfunctions import interval_split import pandas as pd class Connector: """      """ time_partition_field = 'PARTITION_DATE' #  -.       def __init__ (self, source, dest): """          """ self.source = source self.dest = dest self.source.time_partition_field = self.time_partition_field self.dest.time_partition_field = self.time_partition_field def insert_data_in_dest(self, start_date, end_date): """      .          ,     .""" dates = pd.date_range(start_date, end_date) week_intervals = interval_split(dates, 7) #     7  for week_interval in week_intervals: date_from = week_interval[0].strftime("%Y-%m-%d") + " 00:00:00" date_till = week_interval[1].strftime("%Y-%m-%d") + " 23:59:59" calls_df = self.source.get_calls_report(date_from, date_till) self.dest.insert_dataframe(calls_df) print (f"  {date_from}  {date_till}   ") return True def update_dest_data(self): #     BigQuery last_date = self.dest.get_last_update() if not last_date: #    last_date = self.source.day_before_first_call yesterday = pd.Timestamp.today(tz='Europe/Moscow').date() - pd.Timedelta(days=1) if last_date == yesterday: print("  ") else: last_date = last_date + pd.Timedelta(days=1) self.insert_data_in_dest(last_date, yesterday) return True 

Ensuite, nous prescrivons la fonction principale de mise à jour des données, qui sera lancée selon un calendrier.

 from connector import Connector from bqhandler import BQTableHanler from comagichandler import ComagicHandler from credfile import * def main(event, context): """    event, context  : https://cloud.google.com/functions/docs/writing/background#functions-writing-background-hello-pubsub-python""" #import base64 #pubsub_message = base64.b64decode(event['data']).decode('utf-8') # c      comagic_handler = ComagicHandler(COMAGIC_LOGIN, COMAGIC_PASSWORD, FIRST_CALL_DATE) bq_handelr = BQTableHanler(full_table_id, google_credintials_key_path) #  connector = Connector(comagic_handler, bq_handelr) #     connector.update_dest_data() 

Configurer Google Cloud Platform


Nous collectons tous les fichiers dans une archive ZIP. Dans le fichier credfile.py, nous entrons le login et le mot de passe CoMagic pour recevoir le jeton, ainsi que le nom complet de la table dans BigQuery et le chemin d'accès au fichier de compte de service si le script est lancé à partir de la machine locale.

Créer une fonction cloud


  • Accédez à la console
  • Si aucune fonction n'a encore été créée, cliquez sur «Créer une fonction»
  • Dans le champ déclencheur, sélectionnez PUB / SUB
  • Créez un nouveau thème pour PUB / SUB. Par exemple, «update_calls»
  • Source: téléchargement ZIP (fichier ZIP local)
  • Environnement: Python 3.7
  • Téléchargez le fichier zip
  • Choisir un segment temporaire de Cloud Storage
  • Dans le champ «fonction appelée», nous écrivons «principal»
  • Mémoire allouée: facultative



Configuration de Scheduler et PUB / SUB


Dans la dernière étape, nous avons créé le déclencheur `update_calls`. Ce sujet automatique est apparu dans la liste des sujets .

Maintenant, avec Cloud Scheduler, vous devez configurer le déclencheur. quand il se déclenchera et GCF démarrera.

  • Accédez à la console
  • Dans le champ de fréquence au format CRON, définissez le moment où le déclencheur doit se déclencher et la fonction démarre.
  • Destination: Pub / Sub
  • Objet: enregistrez le thème qui a été spécifié lors de la création de la fonction: «update_calls»
  • Payloads * (Payloads) - ce sont les informations qui seront transférées à Pub / Sub et à la fonction principale



Maintenant, le script sera lancé tous les jours à 01h00 et les données d'appel seront mises à jour à la fin de la journée précédente.

Lien vers GitHub pour s'exécuter à partir de l'ordinateur local
Lien GitHub vers un fichier ZIP

Source: https://habr.com/ru/post/fr475804/


All Articles