مرحبا بالجميع. الأصدقاء ، نحن نشارككم ترجمة لمقال تم إعداده خاصة لطلاب
دورة Data Engineer . دعنا نذهب!

Apache Beam و DataFlow لأنابيب الوقت الفعلي
تستند مشاركة اليوم إلى مهمة عملت عليها مؤخرًا في العمل. كنت سعيدًا حقًا بتطبيقه ووصف العمل المنجز في تنسيق منشور المدونة ، لأنه أتاح لي الفرصة للعمل في هندسة البيانات ، وكذلك للقيام بشيء مفيد للغاية لفريقي. منذ وقت ليس ببعيد ، اكتشفت أن أنظمتنا تخزن كمية كبيرة إلى حد ما من سجلات المستخدمين المتعلقة بأحد منتجاتنا للتعامل مع البيانات. اتضح أن أحدا لم يستخدم هذه البيانات ، لذلك أصبحت على الفور مهتمة بما يمكن أن نعرفه إذا ما بدأنا في تحليلها بانتظام. ومع ذلك ، كان هناك العديد من المشاكل على طول الطريق. المشكلة الأولى هي أن البيانات تم تخزينها في العديد من الملفات النصية المختلفة التي لم تكن متاحة للتحليل الفوري. والمشكلة الثانية هي أنه تم تخزينها في نظام مغلق ، لذلك لا يمكنني استخدام أي من أدوات تحليل البيانات المفضلة لدي.
اضطررت إلى تحديد كيفية تسهيل الوصول إلينا وإضافة بعض القيمة على الأقل من خلال تضمين مصدر البيانات هذا في بعض حلول تفاعل المستخدم. بعد التفكير لفترة من الوقت ، قررت إنشاء خط أنابيب لنقل هذه البيانات إلى قاعدة البيانات السحابية حتى يتسنى لي والفريق الوصول إليها والبدء في إنشاء أي استنتاجات. بعد أن أكملت تخصصي في هندسة البيانات في كورسيرا منذ بعض الوقت ، كنت حريصة على استخدام بعض أدوات الدورة التدريبية في المشروع.
لذا يبدو أن وضع البيانات في قاعدة بيانات سحابية طريقة ذكية لحل مشكلتي الأولى ، ولكن ماذا يمكنني أن أفعل مع المشكلة رقم 2؟ لحسن الحظ ، كانت هناك طريقة لنقل هذه البيانات إلى بيئة حيث يمكنني الوصول إلى أدوات مثل Python و Google Cloud Platform (GCP). ومع ذلك ، كانت عملية طويلة ، لذلك كنت بحاجة إلى القيام بشيء من شأنه أن يسمح لي بمواصلة التطوير بينما كنت أنتظر نهاية نقل البيانات. كان الحل الذي توصلت إليه هو إنشاء بيانات مزيفة باستخدام مكتبة
Faker في Python. لم أكن قد استخدمت هذه المكتبة من قبل ، لكنني أدركت بسرعة مدى فائدتها. سمح لي باستخدام هذا النهج ببدء كتابة التعليمات البرمجية واختبار خط الأنابيب بدون بيانات فعلية.
بناءً على ما تقدم ، سوف أخبرك في هذا المنشور كيف أنشأت خط الأنابيب الموصوف أعلاه باستخدام بعض التقنيات المتاحة في برنامج شركاء Google المعتمدون. على وجه الخصوص ،
سأستخدم Apache Beam (إصدار Python) و Dataflow و Pub / Sub و Big Query لجمع سجلات المستخدمين وتحويل البيانات ونقلها إلى قاعدة بيانات لمزيد من التحليل. في حالتي ، كنت بحاجة فقط إلى وظائف الدُفعات لـ Beam ، نظرًا لأن بياناتي لم تصل في الوقت الفعلي ، لذلك لم يكن Pub / Sub مطلوبًا. ومع ذلك ، سأركز على إصدار البث ، حيث أن هذا هو ما قد تواجهه في الممارسة.
مقدمة إلى GCP و Apache Beam
يوفر Google Cloud Platform مجموعة من الأدوات المفيدة حقًا لمعالجة البيانات الضخمة. فيما يلي بعض الأدوات التي سأستخدمها:
- Pub / Sub هي خدمة مراسلة باستخدام قالب Publisher-المشترك الذي يسمح لنا بتلقي البيانات في الوقت الفعلي.
- DataFlow هي خدمة تبسط عملية إنشاء خطوط أنابيب البيانات وتحل تلقائيًا المهام مثل توسيع نطاق البنية التحتية ، مما يعني أنه يمكننا التركيز فقط على كتابة التعليمات البرمجية الخاصة بخط الأنابيب الخاص بنا.
- BigQuery هو مستودع بيانات قائم على السحابة. إذا كنت معتادًا على قواعد بيانات SQL الأخرى ، فلن يتعين عليك التعامل مع BigQuery لفترة طويلة.
- وأخيرًا ، سنستخدم Apache Beam ، أي التركيز على إصدار Python لإنشاء خط أنابيبنا. تتيح لنا هذه الأداة إنشاء خط أنابيب للدفق أو معالجة الدُفعات التي تتكامل مع برنامج شركاء Google المعتمدون. إنها مفيدة بشكل خاص للمعالجة المتوازية ومناسبة للمهام مثل الاستخراج والتحويل والتحميل (ETL) ، لذلك إذا كنا بحاجة إلى نقل البيانات من مكان إلى آخر باستخدام التحويلات أو العمليات الحسابية ، فإن Beam تعد اختيارًا جيدًا.
يتوفر عدد كبير من الأدوات في برنامج "شركاء Google المعتمدون" ، لذلك قد يكون من الصعب تغطيتها جميعًا ، بما في ذلك الغرض منها ، ولكن مع ذلك ،
يوجد ملخص موجز للرجوع إليها.
تصور الناقل لدينا
دعونا تصور مكونات خط أنابيب لدينا في
الشكل 1 . على مستوى عالٍ ، نريد جمع بيانات المستخدم في الوقت الفعلي ومعالجتها ونقلها إلى BigQuery. يتم إنشاء السجلات عندما يتفاعل المستخدمون مع المنتج عن طريق إرسال طلبات إلى الخادم ، والتي يتم تسجيلها بعد ذلك. يمكن أن تكون هذه البيانات مفيدة بشكل خاص لفهم كيفية تفاعل المستخدمين مع منتجاتنا وما إذا كانوا يعملون بشكل صحيح. بشكل عام ، سوف يحتوي الناقل على الخطوات التالية:
- يتم نشر بيانات سجل مستخدمينا في قسم Pub / Sub.
- سنقوم بالاتصال بـ Pub / Sub ونحول البيانات إلى التنسيق المناسب باستخدام Python و Beam (الخطوتين 3 و 4 في الشكل 1).
- بعد تحويل البيانات ، سيقوم Beam بالاتصال بـ BigQuery وإضافتها إلى جدولنا (الخطوتين 4 و 5 في الشكل 1).
- للتحليل ، يمكننا الاتصال بـ BigQuery باستخدام أدوات متنوعة مثل Tableau و Python.
يجعل Beam هذه العملية بسيطة للغاية ، بغض النظر عما إذا كان لدينا مصدر بيانات متدفق أو ملف CSV ، ونريد إجراء معالجة الدُفعات. سترى لاحقًا أن الكود لا يحتوي إلا على الحد الأدنى من التغييرات اللازمة للتبديل بينها. هذا هو واحد من فوائد استخدام الشعاع.
الشكل 1: خط أنابيب البيانات الرئيسيإنشاء بيانات زائفة باستخدام Faker
كما ذكرت سابقًا ، نظرًا لمحدودية الوصول إلى البيانات ، قررت إنشاء بيانات زائفة بنفس التنسيق مثل البيانات الفعلية. كان هذا تمرينًا مفيدًا حقًا ، حيث يمكنني كتابة الشفرة واختبار خط الأنابيب أثناء توقع البيانات. أقترح إلقاء نظرة على
وثائق فاكر إذا كنت تريد معرفة ما تقدمه هذه المكتبة. ستكون بيانات المستخدم الخاصة بنا عمومًا مماثلة للمثال أدناه. بناءً على هذا التنسيق ، يمكننا إنشاء بيانات سطرية لمحاكاة بيانات الوقت الفعلي. توفر لنا هذه السجلات معلومات مثل التاريخ ونوع الطلب والاستجابة من الخادم وعنوان IP وما إلى ذلك.
192.52.197.161 - - [30/Apr/2019:21:11:42] "PUT /tag/category/tag HTTP/1.1" [401] 155 "https://harris-lopez.com/categories/about/" "Mozilla/5.0 (Macintosh; PPC Mac OS X 10_11_2) AppleWebKit/5312 (KHTML, like Gecko) Chrome/34.0.855.0 Safari/5312"
استنادًا إلى السطر أعلاه ، نريد إنشاء متغير
LINE الخاص بنا باستخدام 7 متغيرات في الأقواس أدناه. سنستخدمها أيضًا كأسماء متغيرة في مخطط جدولنا لاحقًا.
LINE = """\
{remote_addr} - - [{time_local}] "{request_type} {request_path} HTTP/1.1" [{status}] {body_bytes_sent} "{http_referer}" "{http_user_agent}"\
"""
إذا أردنا إجراء معالجة الدُفعات ، فسيكون الرمز مشابهًا للغاية ، على الرغم من أننا سنحتاج إلى إنشاء مجموعة من العينات في نطاق زمني معين. لاستخدام مزيف ، نحن ببساطة إنشاء كائن واستدعاء الأساليب التي نحتاجها. على وجه الخصوص ، كان فاكر مفيدًا لإنشاء عناوين IP وكذلك مواقع الويب. استخدمت الطرق التالية:
fake.ipv4()
fake.uri_path()
fake.uri()
fake.user_agent()
from faker import Faker import time import random import os import numpy as np from datetime import datetime, timedelta LINE = """\ {remote_addr} - - [{time_local}] "{request_type} {request_path} HTTP/1.1" [{status}] {body_bytes_sent} "{http_referer}" "{http_user_agent}"\ """ def generate_log_line(): fake = Faker() now = datetime.now() remote_addr = fake.ipv4() time_local = now.strftime('%d/%b/%Y:%H:%M:%S') request_type = random.choice(["GET", "POST", "PUT"]) request_path = "/" + fake.uri_path() status = np.random.choice([200, 401, 404], p = [0.9, 0.05, 0.05]) body_bytes_sent = random.choice(range(5, 1000, 1)) http_referer = fake.uri() http_user_agent = fake.user_agent() log_line = LINE.format( remote_addr=remote_addr, time_local=time_local, request_type=request_type, request_path=request_path, status=status, body_bytes_sent=body_bytes_sent, http_referer=http_referer, http_user_agent=http_user_agent ) return log_line
نهاية الجزء الاول.
في الأيام المقبلة ، سوف نشارككم في استمرار المقال ، لكننا ننتظر الآن التعليقات ؛-).
الجزء الثاني