Importieren Sie geplante Anrufberichte von CoMagic nach einem Zeitplan in BigQuery mithilfe von Google Cloud-Funktionen

Wofür


Mit der komplexen Struktur von Werbekampagnen und einer großen Anzahl von Anrufen werden zusätzliche Tools zum Speichern, Verarbeiten und Analysieren von Informationen über eingehende Anrufe erforderlich. Oft muss über einen langen Zeitraum schnell auf Daten zugegriffen werden. Manchmal ist eine komplexe Datenverarbeitung erforderlich, bei der Anrufe einem bestimmten Kanal oder einer bestimmten Kampagne zugeordnet werden.

Eine der Möglichkeiten zur Beschleunigung der Arbeit, die auch zusätzliche Vorteile bietet, besteht darin, Anrufe von CoMagic nach Google BigQuery zu importieren. Es wurde viel über die Vorteile von BigQuery geschrieben. Fahren wir also mit der Erstellung fort.

Um einen automatischen Import zu erstellen, benötigen Sie:

  1. Google-Konto (falls nicht bereits vorhanden) mit dem erstellten Projekt
  2. Python-Kenntnisse
  3. Einführung in die Google Cloud-Dokumentation

Wie Sie ein Projekt anlegen, erfahren Sie hier . Nachdem das Projekt erstellt wurde, müssen Sie in BigQuery ein Dataset erstellen. BQ-Dokumentation und Anweisungen zum Erstellen eines Datensatzes .

Abrufen von Daten aus CoMagic


Weiter zur CoMagic- Dokumentation . Um eine Liste von Anrufen oder Anrufen zu erhalten, benötigen wir den Abschnitt Berichte.

Wir erstellen eine einfache Klasse für die Arbeit mit der CoMagic-API. Alle notwendigen Voraussetzungen werden am Ende im Link zu GitHub angegeben.

import json import requests import random import pandas as pd class ComagicClient: """     CoMagic""" def __init__(self, login, password): """     CoMagic""" self.login = login self.password = password self.base_url = 'https://dataapi.comagic.ru/v2.0' self.payload_ = {"jsonrpc":"2.0", "id":1, "method":None, "params": None} self.token = self.get_token(self.login, self.password) def base_request(self, method, params): """     CoMagic.      API   .    JSON-like . : https://www.comagic.ru/support/api/data-api/""" id_ = random.randrange(10**7) #  payload = self.payload_.copy() payload["method"] = method payload["params"] = params payload["id"] = id_ self.r = requests.post(self.base_url, data=json.dumps(payload)) self.last_response = json.loads(self.r.text) return self.last_response def get_token(self, login, password): """   .       .  .""" method = "login.user" params = {"login":self.login, "password":self.password} response = self.base_request(method, params) token = response['result']['data']['access_token'] return token def get_report_per_page(self, method, params): """  .      10000 .    .     110000 .     JSON-like .""" response = self.base_request(method, params) print(f"""  c {params["date_from"]}  {params["date_till"]}.  = {params["offset"]}""") result = response['result']['data'] if len(result) < 10000: return result else: params['offset'] += 10000 add_result = self.get_report_per_page(method, params) return result + add_result def get_basic_report(self, method, fields, date_from, date_till, filter=None, offset=0): """   .       method  fields.       .       ,   ,       . method -- <string>   date_from -- <string>  .  "YYYY-MM-DD hh:mm:ss" date_till -- <string>  .  "YYYY-MM-DD hh:mm:ss" fields -- <list>,    filter [] - <dict>  offset [] -- <int>  return -- <list>  """ params = {"access_token":self.token, "limit":10000, "date_from":date_from, "date_till":date_till, "fields": fields, "offset": offset} if filter: params['filter'] = filter report = self.get_report_per_page(method, params) return report 

Nun müssen Sie bestimmen, welche Art von Daten benötigt werden. Die Daten müssen verarbeitet und sichtbar gemacht werden, damit sie in BigQuery geladen werden können.

Erstellen Sie eine Hilfsklasse und definieren Sie die von CoMagic empfangenen Daten.

 class ComagicHandler(ComagicClient): """    ,   CoMagic""" time_partition_field = 'PARTITION_DATE' def __init__(self, login, password, first_call_date): self.day_before_first_call = pd.to_datetime(first_call_date) - pd.Timedelta(days=1) super().__init__(login, password) def get_calls_report(self, date_from, date_till): """        .           .    Pandas DataFrame.      .      Connector    .    .    .  Pnadas.DataFrame""" method = "get.calls_report" fields = ['id', 'visitor_id', 'person_id', 'start_time', 'finish_reason', 'is_lost', 'tags', 'campaign_name','communication_number', 'contact_phone_number', 'talk_duration', 'clean_talk_duration', 'virtual_phone_number', 'ua_client_id', 'ym_client_id', 'entrance_page', 'gclid', 'yclid', 'visitor_type', 'visits_count', 'visitor_first_campaign_name', 'visitor_device', 'site_domain_name','utm_source', 'utm_medium', 'utm_campaign', 'utm_content', 'eq_utm_source', 'eq_utm_medium', 'eq_utm_campaign', 'attributes'] #   CoMagic calls_data = self.get_basic_report(method, fields, date_from, date_till) # DataFrame df = pd.DataFrame(calls_data) #    .    . df[self.time_partition_field] = pd.to_datetime(df.start_time).apply(lambda x: x.date()) #  tags,   BigQuery       ,  # CoMagic.    . df['tags'] = df.tags.apply(lambda x: x if x == None else [i['tag_name'] for i in x]) return df 

Senden von Daten an BigQuery


Nachdem die Daten von CoMagic empfangen und konvertiert wurden, müssen Sie sie an BigQuery senden.

 from google.cloud import bigquery from google.cloud.exceptions import NotFound import pandas as pd class BQTableHanler: """     BigQuery""" time_partition_field = 'PARTITION_DATE' def __init__(self, full_table_id, service_account_file_key_path = None): """       `myproject.mydataset.mytable`.  ,   Application Default Credentials,           .""" self.full_table_id = full_table_id project_id, dataset_id, table_id = full_table_id.split(".") self.project_id = project_id self.dataset_id = dataset_id self.table_id = table_id if service_account_file_key_path: #      from google.oauth2 import service_account self.credentials = service_account.Credentials.from_service_account_file( service_account_file_key_path, scopes=["https://www.googleapis.com/auth/cloud-platform"],) self.bq_client = bigquery.Client(credentials = self.credentials, project = self.project_id) else: self.bq_client = bigquery.Client() self.dataset = self.bq_client.get_dataset(self.dataset_id) self.location = self.dataset.location self.table_ref = self.dataset.table(self.table_id) def get_last_update(self): """        Pandas datetime.      False.""" try: self.bq_client.get_table(self.full_table_id) except NotFound as error: return False query = f"""SELECT MAX({self.time_partition_field}) as last_call FROM `{self.full_table_id}`""" result = self.bq_client.query(query,location=self.location).to_dataframe() date = pd.to_datetime(result.iloc[0,0]).date() return date def insert_dataframe(self, dataframe): """      BigQuery.     Pandas DataFrame.    ,       .""" job_config = bigquery.LoadJobConfig() #     job_config._properties['load']['timePartitioning'] = {'type': 'DAY', 'field': self.time_partition_field} result = self.bq_client.load_table_from_dataframe(dataframe, self.table_ref, job_config=job_config).result() return result 

Bestimmen Sie die Logik zum Aktualisieren von Daten


Da die Anzahl der von CoMagic empfangenen Datenzeilen begrenzt ist, muss die Anzahl der angeforderten Daten begrenzt werden. Wir werden den Anforderungszeitraum begrenzen. Dazu benötigen Sie eine Hilfsfunktion, die einen großen Zeitraum in Segmente einer bestimmten Länge aufteilt.

 def interval_split(array, interval): """      .   ,      2,    -   ,     -    . : get_intervals([1,2,3,4,5,6,7], 3) => [[1,3], [4,6], [7]] get_intervals([1,2,3], 4) => [[1,3]]""" intervals = [] iw, i = 0, 0 l = len(array) for v in array: if i==0 or (i)%interval==0: intervals.append([v]) if (i+1)%interval == 0 or (i+1) == l: intervals[iw].append(v) iw+=1 i+=1 return intervals 

Dies ist beim erstmaligen Laden von Daten erforderlich, wenn Sie Daten über einen längeren Zeitraum herunterladen müssen. Die Periode ist in mehrere kleine Perioden unterteilt. Übrigens ist es besser, auf die Cloud-Funktion zu verzichten, da sie zeitlich begrenzt ist. Nun, oder optional können Sie die Funktion viele, viele Male ausführen.

Wir erstellen eine Connector-Klasse, um die BigQuery-Tabelle zu verknüpfen, in der die Daten und die Daten von CoMagic gespeichert werden sollen.

 from helpfunctions import interval_split import pandas as pd class Connector: """      """ time_partition_field = 'PARTITION_DATE' #  -.       def __init__ (self, source, dest): """          """ self.source = source self.dest = dest self.source.time_partition_field = self.time_partition_field self.dest.time_partition_field = self.time_partition_field def insert_data_in_dest(self, start_date, end_date): """      .          ,     .""" dates = pd.date_range(start_date, end_date) week_intervals = interval_split(dates, 7) #     7  for week_interval in week_intervals: date_from = week_interval[0].strftime("%Y-%m-%d") + " 00:00:00" date_till = week_interval[1].strftime("%Y-%m-%d") + " 23:59:59" calls_df = self.source.get_calls_report(date_from, date_till) self.dest.insert_dataframe(calls_df) print (f"  {date_from}  {date_till}   ") return True def update_dest_data(self): #     BigQuery last_date = self.dest.get_last_update() if not last_date: #    last_date = self.source.day_before_first_call yesterday = pd.Timestamp.today(tz='Europe/Moscow').date() - pd.Timedelta(days=1) if last_date == yesterday: print("  ") else: last_date = last_date + pd.Timedelta(days=1) self.insert_data_in_dest(last_date, yesterday) return True 

Als nächstes schreiben wir die Hauptfunktion für die Aktualisierung der Daten vor, die nach einem Zeitplan gestartet wird.

 from connector import Connector from bqhandler import BQTableHanler from comagichandler import ComagicHandler from credfile import * def main(event, context): """    event, context  : https://cloud.google.com/functions/docs/writing/background#functions-writing-background-hello-pubsub-python""" #import base64 #pubsub_message = base64.b64decode(event['data']).decode('utf-8') # c      comagic_handler = ComagicHandler(COMAGIC_LOGIN, COMAGIC_PASSWORD, FIRST_CALL_DATE) bq_handelr = BQTableHanler(full_table_id, google_credintials_key_path) #  connector = Connector(comagic_handler, bq_handelr) #     connector.update_dest_data() 

Konfigurieren Sie Google Cloud Platform


Wir sammeln alle Dateien in einem ZIP-Archiv. In der Datei credfile.py geben wir den CoMagic-Benutzernamen und das Kennwort ein, um das Token zu erhalten, sowie den vollständigen Namen der Tabelle in BigQuery und den Pfad zur Dienstkontodatei, wenn das Skript vom lokalen Computer ausgeführt wird.

Erstellen Sie eine Cloud-Funktion


  • Gehen Sie zur Konsole
  • Wenn noch keine Funktion erstellt wurde, klicken Sie auf "Funktion erstellen"
  • Wählen Sie im Triggerfeld PUB / SUB
  • Erstellen Sie ein neues Thema für PUB / SUB. Zum Beispiel 'update_calls'
  • Quelle: ZIP-Upload (lokale ZIP-Datei)
  • Umgebung: Python 3.7
  • Laden Sie die ZIP-Datei herunter
  • Auswählen eines temporären Cloud-Speichersegments
  • In das Feld `called function` schreiben wir 'main'
  • Zugewiesener Speicher: Optional



Scheduler und PUB / SUB konfigurieren


Im letzten Schritt haben wir den "update_calls" -Trigger erstellt. Dieses automatische Thema wurde in der Themenliste angezeigt .

Jetzt müssen Sie mit Cloud Scheduler den Trigger konfigurieren. wenn es ausgelöst wird und GCF gestartet wird.

  • Gehen Sie zur Konsole
  • Stellen Sie im Frequenzfeld im CRON-Format ein, wann der Trigger ausgelöst werden soll und die Funktion startet.
  • Ziel: Pub / Sub
  • Betreff: Registrieren Sie das Thema, das beim Erstellen der Funktion angegeben wurde: "update_calls"
  • Payloads * (Payloads) - Dies sind die Informationen, die an Pub / Sub und an die Hauptfunktion übertragen werden



Jetzt wird das Skript täglich um 01:00 Uhr gestartet und die Anrufdaten werden am Ende des vorherigen Tages aktualisiert.

Verlinken Sie zu GitHub, um es auf dem lokalen Computer auszuführen
GitHub Link zur ZIP-Datei

Source: https://habr.com/ru/post/de475804/


All Articles