استيراد تقارير المكالمات المجدولة من CoMagic إلى BigQuery وفق جدول زمني باستخدام وظائف Google Cloud

من اجل ماذا


بفضل البنية المعقدة للحملات الإعلانية وعدد كبير من المكالمات ، أصبحت الأدوات الإضافية لتخزين ومعالجة وتحليل المعلومات حول المكالمات الواردة ضرورية. غالبًا ما تحتاج إلى الوصول السريع إلى البيانات على مدار فترة زمنية طويلة. في بعض الأحيان تحتاج إلى معالجة معقدة للبيانات ، وربط المكالمات بقناة أو حملة محددة.

أحد الخيارات لتسريع العمل ، والذي يوفر أيضًا مزايا إضافية ، هو استيراد المكالمات من CoMagic إلى Google BigQuery. لقد كتب الكثير عن فوائد BigQuery ، لذلك دعنا ننتقل إلى الخلق.

لإنشاء استيراد تلقائي ، ستحتاج إلى:

  1. حساب Google (إذا لم يكن بالفعل) مع المشروع الذي تم إنشاؤه
  2. بيثون المعرفة
  3. تقديم Google Cloud Documentation

كيفية إنشاء مشروع موصوفة هنا . بعد إنشاء المشروع ، تحتاج إلى إنشاء مجموعة بيانات في BigQuery. وثائق BQ والتعليمات لإنشاء مجموعة البيانات .

استرداد البيانات من CoMagic


أنتقل إلى وثائق CoMagic. للحصول على قائمة بالمكالمات أو المكالمات ، نحتاج إلى قسم التقارير.

نخلق فئة بسيطة للعمل مع API CoMagic. سيتم الإشارة إلى جميع المتطلبات اللازمة في النهاية في الرابط إلى جيثب.

import json import requests import random import pandas as pd class ComagicClient: """     CoMagic""" def __init__(self, login, password): """     CoMagic""" self.login = login self.password = password self.base_url = 'https://dataapi.comagic.ru/v2.0' self.payload_ = {"jsonrpc":"2.0", "id":1, "method":None, "params": None} self.token = self.get_token(self.login, self.password) def base_request(self, method, params): """     CoMagic.      API   .    JSON-like . : https://www.comagic.ru/support/api/data-api/""" id_ = random.randrange(10**7) #  payload = self.payload_.copy() payload["method"] = method payload["params"] = params payload["id"] = id_ self.r = requests.post(self.base_url, data=json.dumps(payload)) self.last_response = json.loads(self.r.text) return self.last_response def get_token(self, login, password): """   .       .  .""" method = "login.user" params = {"login":self.login, "password":self.password} response = self.base_request(method, params) token = response['result']['data']['access_token'] return token def get_report_per_page(self, method, params): """  .      10000 .    .     110000 .     JSON-like .""" response = self.base_request(method, params) print(f"""  c {params["date_from"]}  {params["date_till"]}.  = {params["offset"]}""") result = response['result']['data'] if len(result) < 10000: return result else: params['offset'] += 10000 add_result = self.get_report_per_page(method, params) return result + add_result def get_basic_report(self, method, fields, date_from, date_till, filter=None, offset=0): """   .       method  fields.       .       ,   ,       . method -- <string>   date_from -- <string>  .  "YYYY-MM-DD hh:mm:ss" date_till -- <string>  .  "YYYY-MM-DD hh:mm:ss" fields -- <list>,    filter [] - <dict>  offset [] -- <int>  return -- <list>  """ params = {"access_token":self.token, "limit":10000, "date_from":date_from, "date_till":date_till, "fields": fields, "offset": offset} if filter: params['filter'] = filter report = self.get_report_per_page(method, params) return report 

تحتاج الآن إلى تحديد نوع البيانات المطلوبة. يجب معالجة البيانات وجعلها مرئية بحيث يمكن تحميلها في BigQuery.

إنشاء فئة المساعد وتحديد البيانات الواردة من CoMagic.

 class ComagicHandler(ComagicClient): """    ,   CoMagic""" time_partition_field = 'PARTITION_DATE' def __init__(self, login, password, first_call_date): self.day_before_first_call = pd.to_datetime(first_call_date) - pd.Timedelta(days=1) super().__init__(login, password) def get_calls_report(self, date_from, date_till): """        .           .    Pandas DataFrame.      .      Connector    .    .    .  Pnadas.DataFrame""" method = "get.calls_report" fields = ['id', 'visitor_id', 'person_id', 'start_time', 'finish_reason', 'is_lost', 'tags', 'campaign_name','communication_number', 'contact_phone_number', 'talk_duration', 'clean_talk_duration', 'virtual_phone_number', 'ua_client_id', 'ym_client_id', 'entrance_page', 'gclid', 'yclid', 'visitor_type', 'visits_count', 'visitor_first_campaign_name', 'visitor_device', 'site_domain_name','utm_source', 'utm_medium', 'utm_campaign', 'utm_content', 'eq_utm_source', 'eq_utm_medium', 'eq_utm_campaign', 'attributes'] #   CoMagic calls_data = self.get_basic_report(method, fields, date_from, date_till) # DataFrame df = pd.DataFrame(calls_data) #    .    . df[self.time_partition_field] = pd.to_datetime(df.start_time).apply(lambda x: x.date()) #  tags,   BigQuery       ,  # CoMagic.    . df['tags'] = df.tags.apply(lambda x: x if x == None else [i['tag_name'] for i in x]) return df 

إرسال البيانات إلى BigQuery


بعد تلقي البيانات من CoMagic وتحويلها ، تحتاج إلى إرسالها إلى BigQuery.

 from google.cloud import bigquery from google.cloud.exceptions import NotFound import pandas as pd class BQTableHanler: """     BigQuery""" time_partition_field = 'PARTITION_DATE' def __init__(self, full_table_id, service_account_file_key_path = None): """       `myproject.mydataset.mytable`.  ,   Application Default Credentials,           .""" self.full_table_id = full_table_id project_id, dataset_id, table_id = full_table_id.split(".") self.project_id = project_id self.dataset_id = dataset_id self.table_id = table_id if service_account_file_key_path: #      from google.oauth2 import service_account self.credentials = service_account.Credentials.from_service_account_file( service_account_file_key_path, scopes=["https://www.googleapis.com/auth/cloud-platform"],) self.bq_client = bigquery.Client(credentials = self.credentials, project = self.project_id) else: self.bq_client = bigquery.Client() self.dataset = self.bq_client.get_dataset(self.dataset_id) self.location = self.dataset.location self.table_ref = self.dataset.table(self.table_id) def get_last_update(self): """        Pandas datetime.      False.""" try: self.bq_client.get_table(self.full_table_id) except NotFound as error: return False query = f"""SELECT MAX({self.time_partition_field}) as last_call FROM `{self.full_table_id}`""" result = self.bq_client.query(query,location=self.location).to_dataframe() date = pd.to_datetime(result.iloc[0,0]).date() return date def insert_dataframe(self, dataframe): """      BigQuery.     Pandas DataFrame.    ,       .""" job_config = bigquery.LoadJobConfig() #     job_config._properties['load']['timePartitioning'] = {'type': 'DAY', 'field': self.time_partition_field} result = self.bq_client.load_table_from_dataframe(dataframe, self.table_ref, job_config=job_config).result() return result 

تحديد المنطق لتحديث البيانات


نظرًا لأن هناك حدًا لعدد صفوف البيانات التي يتم تلقيها من CoMagic ، فمن الضروري الحد من عدد البيانات المطلوبة. سنحد من فترة الطلب. للقيام بذلك ، تحتاج إلى وظيفة مساعدة تقسم فترة كبيرة من الوقت إلى مقاطع بطول محدد.

 def interval_split(array, interval): """      .   ,      2,    -   ,     -    . : get_intervals([1,2,3,4,5,6,7], 3) => [[1,3], [4,6], [7]] get_intervals([1,2,3], 4) => [[1,3]]""" intervals = [] iw, i = 0, 0 l = len(array) for v in array: if i==0 or (i)%interval==0: intervals.append([v]) if (i+1)%interval == 0 or (i+1) == l: intervals[iw].append(v) iw+=1 i+=1 return intervals 

يعد ذلك ضروريًا عند تحميل البيانات لأول مرة ، عندما تحتاج إلى تنزيل البيانات لفترة طويلة من الزمن. وتنقسم هذه الفترة إلى عدة فترات صغيرة. بالمناسبة ، من الأفضل القيام بذلك دون استخدام وظيفة السحاب ، نظرًا لأن لها مهلة زمنية. حسنًا ، أو كخيار ، يمكنك تشغيل الوظيفة عدة مرات.

نقوم بإنشاء فئة رابط لربط جدول BigQuery حيث نريد تخزين البيانات والبيانات من CoMagic.

 from helpfunctions import interval_split import pandas as pd class Connector: """      """ time_partition_field = 'PARTITION_DATE' #  -.       def __init__ (self, source, dest): """          """ self.source = source self.dest = dest self.source.time_partition_field = self.time_partition_field self.dest.time_partition_field = self.time_partition_field def insert_data_in_dest(self, start_date, end_date): """      .          ,     .""" dates = pd.date_range(start_date, end_date) week_intervals = interval_split(dates, 7) #     7  for week_interval in week_intervals: date_from = week_interval[0].strftime("%Y-%m-%d") + " 00:00:00" date_till = week_interval[1].strftime("%Y-%m-%d") + " 23:59:59" calls_df = self.source.get_calls_report(date_from, date_till) self.dest.insert_dataframe(calls_df) print (f"  {date_from}  {date_till}   ") return True def update_dest_data(self): #     BigQuery last_date = self.dest.get_last_update() if not last_date: #    last_date = self.source.day_before_first_call yesterday = pd.Timestamp.today(tz='Europe/Moscow').date() - pd.Timedelta(days=1) if last_date == yesterday: print("  ") else: last_date = last_date + pd.Timedelta(days=1) self.insert_data_in_dest(last_date, yesterday) return True 

بعد ذلك ، نصف الوظيفة الرئيسية لتحديث البيانات ، والتي سيتم إطلاقها وفقًا لجدول زمني.

 from connector import Connector from bqhandler import BQTableHanler from comagichandler import ComagicHandler from credfile import * def main(event, context): """    event, context  : https://cloud.google.com/functions/docs/writing/background#functions-writing-background-hello-pubsub-python""" #import base64 #pubsub_message = base64.b64decode(event['data']).decode('utf-8') # c      comagic_handler = ComagicHandler(COMAGIC_LOGIN, COMAGIC_PASSWORD, FIRST_CALL_DATE) bq_handelr = BQTableHanler(full_table_id, google_credintials_key_path) #  connector = Connector(comagic_handler, bq_handelr) #     connector.update_dest_data() 

تكوين Google Cloud Platform


نحن نجمع كل الملفات في أرشيف ZIP. في ملف credfile.py ، ندخل تسجيل الدخول وكلمة المرور CoMagic لاستلام الرمز المميز ، وكذلك الاسم الكامل للجدول في BigQuery والمسار إلى ملف حساب الخدمة إذا تم تشغيل البرنامج النصي من الجهاز المحلي.

إنشاء وظيفة سحابة


  • انتقل إلى وحدة التحكم
  • إذا لم يتم إنشاء أي وظيفة حتى الآن ، فانقر فوق "إنشاء وظيفة"
  • في حقل المشغل ، حدد PUB / SUB
  • إنشاء سمة جديدة ل PUB / SUB. على سبيل المثال ، "update_calls"
  • المصدر: تحميل ZIP (ملف ZIP محلي)
  • البيئة: بيثون 3.7
  • قم بتنزيل الملف المضغوط
  • اختيار شريحة مؤقتة من Cloud Storage
  • في الحقل "وظيفة تسمى" نكتب "الرئيسي"
  • الذاكرة المخصصة: اختياري



تكوين جدولة و PUB / SUB


في الخطوة الأخيرة ، أنشأنا المشغل `update_calls`. لقد ظهر هذا الموضوع التلقائي في قائمة المواضيع .

الآن ، مع Cloud Scheduler ، تحتاج إلى تكوين المشغل. عندما سوف تطلق وسوف تبدأ GCF.

  • انتقل إلى وحدة التحكم
  • في مجال التردد بتنسيق CRON ، حدد متى يجب إطلاق المشغل وبدء تشغيل الوظيفة.
  • الوجهة: حانة / Sub
  • الموضوع: سجل السمة التي تم تحديدها عند إنشاء الوظيفة: "update_calls"
  • Payloads * (Payloads) - هذه هي المعلومات التي سيتم نقلها إلى Pub / Sub وإلى الوظيفة الرئيسية



سيتم الآن تشغيل البرنامج النصي يوميًا في الساعة 01:00 وسيتم تحديث بيانات الاتصال في نهاية اليوم السابق.

اربط بـ GitHub للتشغيل من الكمبيوتر المحلي
جيثب رابط لملف مضغوط

Source: https://habr.com/ru/post/ar475804/


All Articles