Importe relatórios de chamadas agendadas do CoMagic para o BigQuery em uma programação usando o Google Cloud Functions

Para que


Com a estrutura complexa de campanhas publicitárias e um grande número de chamadas, são necessárias ferramentas adicionais para armazenar, processar e analisar informações sobre as chamadas recebidas. Geralmente, você precisa de acesso rápido aos dados por um longo período de tempo. Às vezes, você precisa de processamento de dados complexo, correlacionando chamadas para um canal ou campanha específica.

Uma das opções para acelerar o trabalho, que também oferece benefícios adicionais, é importar chamadas do CoMagic para o Google BigQuery. Muito foi escrito sobre os benefícios do BigQuery, então vamos à criação.

Para criar uma importação automática, você precisará de:

  1. Conta do Google (se ainda não estiver) com o projeto criado
  2. Conhecimento em Python
  3. Apresentando a documentação do Google Cloud

Como criar um projeto é descrito aqui . Após a criação do projeto, você precisa criar um conjunto de dados no BigQuery. Documentação e instruções da BQ para criar um conjunto de dados .

Recuperando dados do CoMagic


Voltando à documentação do CoMagic. Para obter uma lista de chamadas, precisamos da seção de relatórios.

Criamos uma classe simples para trabalhar com a API CoMagic. Todos os requisitos necessários serão indicados no final do link para o GitHub.

import json import requests import random import pandas as pd class ComagicClient: """     CoMagic""" def __init__(self, login, password): """     CoMagic""" self.login = login self.password = password self.base_url = 'https://dataapi.comagic.ru/v2.0' self.payload_ = {"jsonrpc":"2.0", "id":1, "method":None, "params": None} self.token = self.get_token(self.login, self.password) def base_request(self, method, params): """     CoMagic.      API   .    JSON-like . : https://www.comagic.ru/support/api/data-api/""" id_ = random.randrange(10**7) #  payload = self.payload_.copy() payload["method"] = method payload["params"] = params payload["id"] = id_ self.r = requests.post(self.base_url, data=json.dumps(payload)) self.last_response = json.loads(self.r.text) return self.last_response def get_token(self, login, password): """   .       .  .""" method = "login.user" params = {"login":self.login, "password":self.password} response = self.base_request(method, params) token = response['result']['data']['access_token'] return token def get_report_per_page(self, method, params): """  .      10000 .    .     110000 .     JSON-like .""" response = self.base_request(method, params) print(f"""  c {params["date_from"]}  {params["date_till"]}.  = {params["offset"]}""") result = response['result']['data'] if len(result) < 10000: return result else: params['offset'] += 10000 add_result = self.get_report_per_page(method, params) return result + add_result def get_basic_report(self, method, fields, date_from, date_till, filter=None, offset=0): """   .       method  fields.       .       ,   ,       . method -- <string>   date_from -- <string>  .  "YYYY-MM-DD hh:mm:ss" date_till -- <string>  .  "YYYY-MM-DD hh:mm:ss" fields -- <list>,    filter [] - <dict>  offset [] -- <int>  return -- <list>  """ params = {"access_token":self.token, "limit":10000, "date_from":date_from, "date_till":date_till, "fields": fields, "offset": offset} if filter: params['filter'] = filter report = self.get_report_per_page(method, params) return report 

Agora você precisa determinar que tipo de dados é necessário. Os dados precisam ser processados ​​e tornados visíveis para que possam ser carregados no BigQuery.

Crie uma classe auxiliar e defina os dados recebidos do CoMagic.

 class ComagicHandler(ComagicClient): """    ,   CoMagic""" time_partition_field = 'PARTITION_DATE' def __init__(self, login, password, first_call_date): self.day_before_first_call = pd.to_datetime(first_call_date) - pd.Timedelta(days=1) super().__init__(login, password) def get_calls_report(self, date_from, date_till): """        .           .    Pandas DataFrame.      .      Connector    .    .    .  Pnadas.DataFrame""" method = "get.calls_report" fields = ['id', 'visitor_id', 'person_id', 'start_time', 'finish_reason', 'is_lost', 'tags', 'campaign_name','communication_number', 'contact_phone_number', 'talk_duration', 'clean_talk_duration', 'virtual_phone_number', 'ua_client_id', 'ym_client_id', 'entrance_page', 'gclid', 'yclid', 'visitor_type', 'visits_count', 'visitor_first_campaign_name', 'visitor_device', 'site_domain_name','utm_source', 'utm_medium', 'utm_campaign', 'utm_content', 'eq_utm_source', 'eq_utm_medium', 'eq_utm_campaign', 'attributes'] #   CoMagic calls_data = self.get_basic_report(method, fields, date_from, date_till) # DataFrame df = pd.DataFrame(calls_data) #    .    . df[self.time_partition_field] = pd.to_datetime(df.start_time).apply(lambda x: x.date()) #  tags,   BigQuery       ,  # CoMagic.    . df['tags'] = df.tags.apply(lambda x: x if x == None else [i['tag_name'] for i in x]) return df 

Enviando dados para o BigQuery


Depois que os dados do CoMagic são recebidos e convertidos, você precisa enviá-los para o BigQuery.

 from google.cloud import bigquery from google.cloud.exceptions import NotFound import pandas as pd class BQTableHanler: """     BigQuery""" time_partition_field = 'PARTITION_DATE' def __init__(self, full_table_id, service_account_file_key_path = None): """       `myproject.mydataset.mytable`.  ,   Application Default Credentials,           .""" self.full_table_id = full_table_id project_id, dataset_id, table_id = full_table_id.split(".") self.project_id = project_id self.dataset_id = dataset_id self.table_id = table_id if service_account_file_key_path: #      from google.oauth2 import service_account self.credentials = service_account.Credentials.from_service_account_file( service_account_file_key_path, scopes=["https://www.googleapis.com/auth/cloud-platform"],) self.bq_client = bigquery.Client(credentials = self.credentials, project = self.project_id) else: self.bq_client = bigquery.Client() self.dataset = self.bq_client.get_dataset(self.dataset_id) self.location = self.dataset.location self.table_ref = self.dataset.table(self.table_id) def get_last_update(self): """        Pandas datetime.      False.""" try: self.bq_client.get_table(self.full_table_id) except NotFound as error: return False query = f"""SELECT MAX({self.time_partition_field}) as last_call FROM `{self.full_table_id}`""" result = self.bq_client.query(query,location=self.location).to_dataframe() date = pd.to_datetime(result.iloc[0,0]).date() return date def insert_dataframe(self, dataframe): """      BigQuery.     Pandas DataFrame.    ,       .""" job_config = bigquery.LoadJobConfig() #     job_config._properties['load']['timePartitioning'] = {'type': 'DAY', 'field': self.time_partition_field} result = self.bq_client.load_table_from_dataframe(dataframe, self.table_ref, job_config=job_config).result() return result 

Determinar a lógica para atualizar dados


Como existe um limite no número de linhas de dados recebidas do CoMagic, é necessário limitar o número de dados solicitados. Nós limitaremos o período de solicitação. Para fazer isso, você precisa de uma função auxiliar que divida um grande período de tempo em segmentos de um comprimento especificado.

 def interval_split(array, interval): """      .   ,      2,    -   ,     -    . : get_intervals([1,2,3,4,5,6,7], 3) => [[1,3], [4,6], [7]] get_intervals([1,2,3], 4) => [[1,3]]""" intervals = [] iw, i = 0, 0 l = len(array) for v in array: if i==0 or (i)%interval==0: intervals.append([v]) if (i+1)%interval == 0 or (i+1) == l: intervals[iw].append(v) iw+=1 i+=1 return intervals 

Isso é necessário ao carregar dados pela primeira vez, quando você precisa fazer o download de dados por um longo período de tempo. O período é dividido em vários pequenos períodos. A propósito, é melhor fazer isso sem usar a função Cloud, pois eles têm um limite de tempo. Bem, ou, como opção, você pode executar a função muitas e muitas vezes.

Criamos uma classe de conectores para vincular a tabela do BigQuery na qual queremos armazenar os dados e os dados do CoMagic.

 from helpfunctions import interval_split import pandas as pd class Connector: """      """ time_partition_field = 'PARTITION_DATE' #  -.       def __init__ (self, source, dest): """          """ self.source = source self.dest = dest self.source.time_partition_field = self.time_partition_field self.dest.time_partition_field = self.time_partition_field def insert_data_in_dest(self, start_date, end_date): """      .          ,     .""" dates = pd.date_range(start_date, end_date) week_intervals = interval_split(dates, 7) #     7  for week_interval in week_intervals: date_from = week_interval[0].strftime("%Y-%m-%d") + " 00:00:00" date_till = week_interval[1].strftime("%Y-%m-%d") + " 23:59:59" calls_df = self.source.get_calls_report(date_from, date_till) self.dest.insert_dataframe(calls_df) print (f"  {date_from}  {date_till}   ") return True def update_dest_data(self): #     BigQuery last_date = self.dest.get_last_update() if not last_date: #    last_date = self.source.day_before_first_call yesterday = pd.Timestamp.today(tz='Europe/Moscow').date() - pd.Timedelta(days=1) if last_date == yesterday: print("  ") else: last_date = last_date + pd.Timedelta(days=1) self.insert_data_in_dest(last_date, yesterday) return True 

Em seguida, prescrevemos a principal função para atualizar os dados, que serão lançados em uma programação.

 from connector import Connector from bqhandler import BQTableHanler from comagichandler import ComagicHandler from credfile import * def main(event, context): """    event, context  : https://cloud.google.com/functions/docs/writing/background#functions-writing-background-hello-pubsub-python""" #import base64 #pubsub_message = base64.b64decode(event['data']).decode('utf-8') # c      comagic_handler = ComagicHandler(COMAGIC_LOGIN, COMAGIC_PASSWORD, FIRST_CALL_DATE) bq_handelr = BQTableHanler(full_table_id, google_credintials_key_path) #  connector = Connector(comagic_handler, bq_handelr) #     connector.update_dest_data() 

Configurar o Google Cloud Platform


Coletamos todos os arquivos em um arquivo ZIP. No arquivo credfile.py, inserimos o login e a senha do CoMagic para receber o token, o nome completo da tabela no BigQuery e o caminho para o arquivo da conta de serviço se o script for iniciado na máquina local.

Criar uma função de nuvem


  • Vá para o console
  • Se nenhuma função foi criada ainda, clique em "Criar função"
  • No campo acionador, selecione PUB / SUB
  • Crie um novo tema para o PUB / SUB. Por exemplo, 'update_calls'
  • Origem: upload ZIP (arquivo ZIP local)
  • Ambiente: Python 3.7
  • Faça o download do arquivo zip
  • Escolhendo um segmento temporário do Cloud Storage
  • No campo `chamado function`, escrevemos 'main'
  • Memória alocada: opcional



Configurando o Scheduler e o PUB / SUB


Na última etapa, criamos o gatilho `update_calls`. Este tópico automático apareceu na lista de tópicos .

Agora, com o Cloud Scheduler, você precisa configurar o gatilho. quando será acionado e o GCF será iniciado.

  • Vá para o console
  • No campo de frequência no formato CRON, defina quando o gatilho deve disparar e a função é iniciada.
  • Destino: Pub / Sub
  • Assunto: registre o tema que foi especificado ao criar a função: “update_calls”
  • Cargas úteis * (Cargas úteis) - são as informações que serão transferidas para Pub / Sub e para a função principal



Agora, o script será iniciado diariamente às 01:00 e os dados da chamada serão atualizados no final do dia anterior.

Link para o GitHub para executar no computador local
Link para o arquivo ZIP do GitHub

Source: https://habr.com/ru/post/pt475804/


All Articles