Impor laporan panggilan terjadwal dari CoMagic ke BigQuery pada jadwal menggunakan Google Cloud Functions

Untuk apa


Dengan struktur kompleks kampanye iklan dan sejumlah besar panggilan, alat tambahan untuk menyimpan, memproses, dan menganalisis informasi tentang panggilan masuk menjadi penting. Seringkali Anda membutuhkan akses cepat ke data dalam jangka waktu yang lama. Terkadang Anda membutuhkan pemrosesan data yang kompleks, menghubungkan panggilan ke saluran atau kampanye tertentu.

Salah satu opsi untuk mempercepat pekerjaan, yang juga memberikan manfaat tambahan, adalah mengimpor panggilan dari CoMagic ke Google BigQuery. Banyak yang telah ditulis tentang manfaat BigQuery, jadi mari kita beralih ke penciptaan.

Untuk membuat impor otomatis, Anda perlu:

  1. Akun Google (jika belum) dengan proyek yang dibuat
  2. Pengetahuan python
  3. Memperkenalkan Dokumentasi Google Cloud

Cara membuat proyek dijelaskan di sini . Setelah proyek dibuat, Anda perlu membuat dataset di BigQuery. Dokumentasi dan instruksi BQ untuk membuat dataset .

Mengambil data dari CoMagic


Beralih ke dokumentasi CoMagic. Untuk mendapatkan daftar panggilan atau panggilan, kami membutuhkan bagian laporan.

Kami membuat kelas sederhana untuk bekerja dengan CoMagic API. Semua persyaratan yang diperlukan akan ditunjukkan di akhir tautan ke GitHub.

import json import requests import random import pandas as pd class ComagicClient: """     CoMagic""" def __init__(self, login, password): """     CoMagic""" self.login = login self.password = password self.base_url = 'https://dataapi.comagic.ru/v2.0' self.payload_ = {"jsonrpc":"2.0", "id":1, "method":None, "params": None} self.token = self.get_token(self.login, self.password) def base_request(self, method, params): """     CoMagic.      API   .    JSON-like . : https://www.comagic.ru/support/api/data-api/""" id_ = random.randrange(10**7) #  payload = self.payload_.copy() payload["method"] = method payload["params"] = params payload["id"] = id_ self.r = requests.post(self.base_url, data=json.dumps(payload)) self.last_response = json.loads(self.r.text) return self.last_response def get_token(self, login, password): """   .       .  .""" method = "login.user" params = {"login":self.login, "password":self.password} response = self.base_request(method, params) token = response['result']['data']['access_token'] return token def get_report_per_page(self, method, params): """  .      10000 .    .     110000 .     JSON-like .""" response = self.base_request(method, params) print(f"""  c {params["date_from"]}  {params["date_till"]}.  = {params["offset"]}""") result = response['result']['data'] if len(result) < 10000: return result else: params['offset'] += 10000 add_result = self.get_report_per_page(method, params) return result + add_result def get_basic_report(self, method, fields, date_from, date_till, filter=None, offset=0): """   .       method  fields.       .       ,   ,       . method -- <string>   date_from -- <string>  .  "YYYY-MM-DD hh:mm:ss" date_till -- <string>  .  "YYYY-MM-DD hh:mm:ss" fields -- <list>,    filter [] - <dict>  offset [] -- <int>  return -- <list>  """ params = {"access_token":self.token, "limit":10000, "date_from":date_from, "date_till":date_till, "fields": fields, "offset": offset} if filter: params['filter'] = filter report = self.get_report_per_page(method, params) return report 

Sekarang Anda perlu menentukan jenis data apa yang dibutuhkan. Data perlu diproses dan dibuat terlihat sehingga dapat dimuat ke BigQuery.

Buat kelas pembantu dan tentukan data yang diterima dari CoMagic.

 class ComagicHandler(ComagicClient): """    ,   CoMagic""" time_partition_field = 'PARTITION_DATE' def __init__(self, login, password, first_call_date): self.day_before_first_call = pd.to_datetime(first_call_date) - pd.Timedelta(days=1) super().__init__(login, password) def get_calls_report(self, date_from, date_till): """        .           .    Pandas DataFrame.      .      Connector    .    .    .  Pnadas.DataFrame""" method = "get.calls_report" fields = ['id', 'visitor_id', 'person_id', 'start_time', 'finish_reason', 'is_lost', 'tags', 'campaign_name','communication_number', 'contact_phone_number', 'talk_duration', 'clean_talk_duration', 'virtual_phone_number', 'ua_client_id', 'ym_client_id', 'entrance_page', 'gclid', 'yclid', 'visitor_type', 'visits_count', 'visitor_first_campaign_name', 'visitor_device', 'site_domain_name','utm_source', 'utm_medium', 'utm_campaign', 'utm_content', 'eq_utm_source', 'eq_utm_medium', 'eq_utm_campaign', 'attributes'] #   CoMagic calls_data = self.get_basic_report(method, fields, date_from, date_till) # DataFrame df = pd.DataFrame(calls_data) #    .    . df[self.time_partition_field] = pd.to_datetime(df.start_time).apply(lambda x: x.date()) #  tags,   BigQuery       ,  # CoMagic.    . df['tags'] = df.tags.apply(lambda x: x if x == None else [i['tag_name'] for i in x]) return df 

Mengirim data ke BigQuery


Setelah data dari CoMagic diterima dan dikonversi, Anda harus mengirimkannya ke BigQuery.

 from google.cloud import bigquery from google.cloud.exceptions import NotFound import pandas as pd class BQTableHanler: """     BigQuery""" time_partition_field = 'PARTITION_DATE' def __init__(self, full_table_id, service_account_file_key_path = None): """       `myproject.mydataset.mytable`.  ,   Application Default Credentials,           .""" self.full_table_id = full_table_id project_id, dataset_id, table_id = full_table_id.split(".") self.project_id = project_id self.dataset_id = dataset_id self.table_id = table_id if service_account_file_key_path: #      from google.oauth2 import service_account self.credentials = service_account.Credentials.from_service_account_file( service_account_file_key_path, scopes=["https://www.googleapis.com/auth/cloud-platform"],) self.bq_client = bigquery.Client(credentials = self.credentials, project = self.project_id) else: self.bq_client = bigquery.Client() self.dataset = self.bq_client.get_dataset(self.dataset_id) self.location = self.dataset.location self.table_ref = self.dataset.table(self.table_id) def get_last_update(self): """        Pandas datetime.      False.""" try: self.bq_client.get_table(self.full_table_id) except NotFound as error: return False query = f"""SELECT MAX({self.time_partition_field}) as last_call FROM `{self.full_table_id}`""" result = self.bq_client.query(query,location=self.location).to_dataframe() date = pd.to_datetime(result.iloc[0,0]).date() return date def insert_dataframe(self, dataframe): """      BigQuery.     Pandas DataFrame.    ,       .""" job_config = bigquery.LoadJobConfig() #     job_config._properties['load']['timePartitioning'] = {'type': 'DAY', 'field': self.time_partition_field} result = self.bq_client.load_table_from_dataframe(dataframe, self.table_ref, job_config=job_config).result() return result 

Tentukan logika untuk memperbarui data


Karena ada batasan jumlah baris data yang diterima dari CoMagic, perlu untuk membatasi jumlah data yang diminta. Kami akan membatasi periode permintaan. Untuk melakukan ini, Anda memerlukan fungsi bantu yang akan membagi periode waktu besar menjadi segmen-segmen dengan panjang yang ditentukan.

 def interval_split(array, interval): """      .   ,      2,    -   ,     -    . : get_intervals([1,2,3,4,5,6,7], 3) => [[1,3], [4,6], [7]] get_intervals([1,2,3], 4) => [[1,3]]""" intervals = [] iw, i = 0, 0 l = len(array) for v in array: if i==0 or (i)%interval==0: intervals.append([v]) if (i+1)%interval == 0 or (i+1) == l: intervals[iw].append(v) iw+=1 i+=1 return intervals 

Ini diperlukan saat memuat data untuk pertama kalinya, saat Anda perlu mengunduh data untuk jangka waktu yang lama. Periode ini dibagi menjadi beberapa periode kecil. Ngomong-ngomong, lebih baik melakukan ini tanpa menggunakan Fungsi Cloud, karena mereka memiliki batas waktu. Baik, atau, sebagai opsi, Anda dapat menjalankan fungsinya berkali-kali.

Kami membuat kelas konektor untuk menautkan tabel BigQuery tempat kami ingin menyimpan data dan data dari CoMagic.

 from helpfunctions import interval_split import pandas as pd class Connector: """      """ time_partition_field = 'PARTITION_DATE' #  -.       def __init__ (self, source, dest): """          """ self.source = source self.dest = dest self.source.time_partition_field = self.time_partition_field self.dest.time_partition_field = self.time_partition_field def insert_data_in_dest(self, start_date, end_date): """      .          ,     .""" dates = pd.date_range(start_date, end_date) week_intervals = interval_split(dates, 7) #     7  for week_interval in week_intervals: date_from = week_interval[0].strftime("%Y-%m-%d") + " 00:00:00" date_till = week_interval[1].strftime("%Y-%m-%d") + " 23:59:59" calls_df = self.source.get_calls_report(date_from, date_till) self.dest.insert_dataframe(calls_df) print (f"  {date_from}  {date_till}   ") return True def update_dest_data(self): #     BigQuery last_date = self.dest.get_last_update() if not last_date: #    last_date = self.source.day_before_first_call yesterday = pd.Timestamp.today(tz='Europe/Moscow').date() - pd.Timedelta(days=1) if last_date == yesterday: print("  ") else: last_date = last_date + pd.Timedelta(days=1) self.insert_data_in_dest(last_date, yesterday) return True 

Selanjutnya, kami meresepkan fungsi utama untuk memperbarui data, yang akan diluncurkan sesuai jadwal.

 from connector import Connector from bqhandler import BQTableHanler from comagichandler import ComagicHandler from credfile import * def main(event, context): """    event, context  : https://cloud.google.com/functions/docs/writing/background#functions-writing-background-hello-pubsub-python""" #import base64 #pubsub_message = base64.b64decode(event['data']).decode('utf-8') # c      comagic_handler = ComagicHandler(COMAGIC_LOGIN, COMAGIC_PASSWORD, FIRST_CALL_DATE) bq_handelr = BQTableHanler(full_table_id, google_credintials_key_path) #  connector = Connector(comagic_handler, bq_handelr) #     connector.update_dest_data() 

Konfigurasikan Google Cloud Platform


Kami mengumpulkan semua file dalam arsip ZIP. Di file credfile.py, kami memasukkan login dan kata sandi CoMagic untuk menerima token, serta nama lengkap tabel di BigQuery dan jalur ke file akun layanan jika skrip diluncurkan dari mesin lokal.

Buat Fungsi Cloud


  • Pergi ke konsol
  • Jika belum ada fungsi yang dibuat, klik "Buat fungsi"
  • Di bidang pemicu, pilih PUB / SUB
  • Buat tema baru untuk PUB / SUB. Misalnya, 'update_calls'
  • Sumber: Unggahan ZIP (file ZIP lokal)
  • Lingkungan: Python 3.7
  • Unduh file zip
  • Memilih segmen sementara Penyimpanan Cloud
  • Di bidang `disebut fungsi` kita menulis 'utama'
  • Memori yang dialokasikan: opsional



Mengkonfigurasi Penjadwal dan PUB / SUB


Pada langkah terakhir, kami membuat pemicu `update_calls`. Topik otomatis ini telah muncul dalam daftar topik .

Sekarang, dengan Cloud Scheduler Anda perlu mengonfigurasi pelatuk. kapan akan menyala dan GCF akan mulai.

  • Pergi ke konsol
  • Di bidang frekuensi dalam format CRON, atur kapan pelatuk akan diaktifkan dan fungsi dimulai.
  • Tujuan: Pub / Sub
  • Subjek: daftarkan tema yang ditentukan saat membuat fungsi: "update_calls"
  • Payloads * (Payloads) - ini adalah informasi yang akan ditransfer ke Pub / Sub dan ke fungsi utama



Sekarang skrip akan diluncurkan setiap hari pada pukul 01:00 dan data panggilan akan diperbarui pada akhir hari sebelumnya.

Tautan ke GitHub untuk dijalankan dari komputer lokal
GitHub Tautan ke File ZIP

Source: https://habr.com/ru/post/id475804/


All Articles