من اجل ماذا
بفضل البنية المعقدة للحملات الإعلانية وعدد كبير من المكالمات ، أصبحت الأدوات الإضافية لتخزين ومعالجة وتحليل المعلومات حول المكالمات الواردة ضرورية. غالبًا ما تحتاج إلى الوصول السريع إلى البيانات على مدار فترة زمنية طويلة. في بعض الأحيان تحتاج إلى معالجة معقدة للبيانات ، وربط المكالمات بقناة أو حملة محددة.
أحد الخيارات لتسريع العمل ، والذي يوفر أيضًا مزايا إضافية ، هو استيراد المكالمات من CoMagic إلى Google BigQuery. لقد كتب الكثير عن فوائد BigQuery ، لذلك دعنا ننتقل إلى الخلق.
لإنشاء استيراد تلقائي ، ستحتاج إلى:
- حساب Google (إذا لم يكن بالفعل) مع المشروع الذي تم إنشاؤه
- بيثون المعرفة
- تقديم Google Cloud Documentation
كيفية إنشاء مشروع موصوفة
هنا . بعد إنشاء المشروع ، تحتاج إلى إنشاء مجموعة بيانات في BigQuery.
وثائق BQ والتعليمات لإنشاء مجموعة البيانات .
استرداد البيانات من CoMagic
أنتقل إلى
وثائق CoMagic. للحصول على قائمة بالمكالمات أو المكالمات ، نحتاج إلى قسم التقارير.
نخلق فئة بسيطة للعمل مع API CoMagic. سيتم الإشارة إلى جميع المتطلبات اللازمة في النهاية في الرابط إلى جيثب.
import json import requests import random import pandas as pd class ComagicClient: """ CoMagic""" def __init__(self, login, password): """ CoMagic""" self.login = login self.password = password self.base_url = 'https://dataapi.comagic.ru/v2.0' self.payload_ = {"jsonrpc":"2.0", "id":1, "method":None, "params": None} self.token = self.get_token(self.login, self.password) def base_request(self, method, params): """ CoMagic. API . JSON-like . : https://www.comagic.ru/support/api/data-api/""" id_ = random.randrange(10**7)
تحتاج الآن إلى تحديد نوع البيانات المطلوبة. يجب معالجة البيانات وجعلها مرئية بحيث يمكن تحميلها في BigQuery.
إنشاء فئة المساعد وتحديد البيانات الواردة من CoMagic.
class ComagicHandler(ComagicClient): """ , CoMagic""" time_partition_field = 'PARTITION_DATE' def __init__(self, login, password, first_call_date): self.day_before_first_call = pd.to_datetime(first_call_date) - pd.Timedelta(days=1) super().__init__(login, password) def get_calls_report(self, date_from, date_till): """ . . Pandas DataFrame. . Connector . . . Pnadas.DataFrame""" method = "get.calls_report" fields = ['id', 'visitor_id', 'person_id', 'start_time', 'finish_reason', 'is_lost', 'tags', 'campaign_name','communication_number', 'contact_phone_number', 'talk_duration', 'clean_talk_duration', 'virtual_phone_number', 'ua_client_id', 'ym_client_id', 'entrance_page', 'gclid', 'yclid', 'visitor_type', 'visits_count', 'visitor_first_campaign_name', 'visitor_device', 'site_domain_name','utm_source', 'utm_medium', 'utm_campaign', 'utm_content', 'eq_utm_source', 'eq_utm_medium', 'eq_utm_campaign', 'attributes']
إرسال البيانات إلى BigQuery
بعد تلقي البيانات من CoMagic وتحويلها ، تحتاج إلى إرسالها إلى BigQuery.
from google.cloud import bigquery from google.cloud.exceptions import NotFound import pandas as pd class BQTableHanler: """ BigQuery""" time_partition_field = 'PARTITION_DATE' def __init__(self, full_table_id, service_account_file_key_path = None): """ `myproject.mydataset.mytable`. , Application Default Credentials, .""" self.full_table_id = full_table_id project_id, dataset_id, table_id = full_table_id.split(".") self.project_id = project_id self.dataset_id = dataset_id self.table_id = table_id if service_account_file_key_path:
تحديد المنطق لتحديث البيانات
نظرًا لأن هناك حدًا لعدد صفوف البيانات التي يتم تلقيها من CoMagic ، فمن الضروري الحد من عدد البيانات المطلوبة. سنحد من فترة الطلب. للقيام بذلك ، تحتاج إلى وظيفة مساعدة تقسم فترة كبيرة من الوقت إلى مقاطع بطول محدد.
def interval_split(array, interval): """ . , 2, - , - . : get_intervals([1,2,3,4,5,6,7], 3) => [[1,3], [4,6], [7]] get_intervals([1,2,3], 4) => [[1,3]]""" intervals = [] iw, i = 0, 0 l = len(array) for v in array: if i==0 or (i)%interval==0: intervals.append([v]) if (i+1)%interval == 0 or (i+1) == l: intervals[iw].append(v) iw+=1 i+=1 return intervals
يعد ذلك ضروريًا عند تحميل البيانات لأول مرة ، عندما تحتاج إلى تنزيل البيانات لفترة طويلة من الزمن. وتنقسم هذه الفترة إلى عدة فترات صغيرة. بالمناسبة ، من الأفضل القيام بذلك دون استخدام وظيفة السحاب ، نظرًا لأن لها مهلة زمنية. حسنًا ، أو كخيار ، يمكنك تشغيل الوظيفة عدة مرات.
نقوم بإنشاء فئة رابط لربط جدول BigQuery حيث نريد تخزين البيانات والبيانات من CoMagic.
from helpfunctions import interval_split import pandas as pd class Connector: """ """ time_partition_field = 'PARTITION_DATE'
بعد ذلك ، نصف الوظيفة الرئيسية لتحديث البيانات ، والتي سيتم إطلاقها وفقًا لجدول زمني.
from connector import Connector from bqhandler import BQTableHanler from comagichandler import ComagicHandler from credfile import * def main(event, context): """ event, context : https://cloud.google.com/functions/docs/writing/background#functions-writing-background-hello-pubsub-python"""
تكوين Google Cloud Platform
نحن نجمع كل الملفات في أرشيف ZIP. في ملف credfile.py ، ندخل تسجيل الدخول وكلمة المرور CoMagic لاستلام الرمز المميز ، وكذلك الاسم الكامل للجدول في BigQuery والمسار إلى ملف حساب الخدمة إذا تم تشغيل البرنامج النصي من الجهاز المحلي.
إنشاء وظيفة سحابة
- انتقل إلى وحدة التحكم
- إذا لم يتم إنشاء أي وظيفة حتى الآن ، فانقر فوق "إنشاء وظيفة"
- في حقل المشغل ، حدد PUB / SUB
- إنشاء سمة جديدة ل PUB / SUB. على سبيل المثال ، "update_calls"
- المصدر: تحميل ZIP (ملف ZIP محلي)
- البيئة: بيثون 3.7
- قم بتنزيل الملف المضغوط
- اختيار شريحة مؤقتة من Cloud Storage
- في الحقل "وظيفة تسمى" نكتب "الرئيسي"
- الذاكرة المخصصة: اختياري

تكوين جدولة و PUB / SUB
في الخطوة الأخيرة ، أنشأنا المشغل `update_calls`. لقد ظهر هذا الموضوع التلقائي في
قائمة المواضيع .

الآن ، مع Cloud Scheduler ، تحتاج إلى تكوين المشغل. عندما سوف تطلق وسوف تبدأ GCF.
- انتقل إلى وحدة التحكم
- في مجال التردد بتنسيق CRON ، حدد متى يجب إطلاق المشغل وبدء تشغيل الوظيفة.
- الوجهة: حانة / Sub
- الموضوع: سجل السمة التي تم تحديدها عند إنشاء الوظيفة: "update_calls"
- Payloads * (Payloads) - هذه هي المعلومات التي سيتم نقلها إلى Pub / Sub وإلى الوظيفة الرئيسية

سيتم الآن تشغيل البرنامج النصي يوميًا في الساعة 01:00 وسيتم تحديث بيانات الاتصال في نهاية اليوم السابق.
اربط بـ GitHub للتشغيل من الكمبيوتر المحلي
جيثب
رابط لملف مضغوط