
في الوقت الحالي ، أنا منخرط في مهمة دفق البيانات (وتحويلها). في بعض الدوائر
تُعرف مثل هذه العملية بـ ETL ، أي استخراج وتحويل وتحميل المعلومات.
تشمل العملية برمتها مشاركة خدمات Google Cloud Platform التالية:
- حانة / خدمة فرعية لتدفق البيانات في الوقت الفعلي
- تدفق البيانات - خدمة لتحويل البيانات (يمكن
العمل في الوقت الفعلي وفي وضع الدُفعات) - BigQuery - خدمة لتخزين البيانات في شكل جداول
(يدعم SQL)
0. الوضع الحالي
في الوقت الحالي ، هناك إصدار عملي للبث على الخدمات المذكورة أعلاه ، ولكن في
كقالب ، يتم استخدام أحد القوالب القياسية .
المشكلة هي أن هذا القالب يوفر نقل البيانات من 1 إلى 1 ، أي على
عند مدخل Pub / Sub ، لدينا سلسلة تنسيق JSON ، عند الإخراج لدينا جدول BigQuery مع الحقول ،
التي تتوافق مع مفاتيح الكائنات في المستوى العلوي من المدخلات JSON.
1. بيان المشكلة
قم بإنشاء قالب تدفق بيانات يسمح لك بالحصول على جدول أو جداول في الإخراج
وفقا لشروط معينة. على سبيل المثال ، نريد إنشاء جدول منفصل لكل منها
قيم إدخال محدد JSON. من الضروري أن تأخذ في الاعتبار حقيقة أن بعض
يمكن أن تحتوي كائنات JSON المدخلة على JSON متداخلة كقيمة ، أي هو ضروري
تكون قادرًا على إنشاء جداول BigQuery مع حقول من النوع RECORD
لتخزين المتداخلة
البيانات.
2. التحضير للقرار
لإنشاء قالب Dataflow ، استخدم Apache Beam SDK ، والذي بدوره ،
يدعم Java و Python كلغة برمجة. يجب أن أقول ذلك
الإصدار الوحيد من Python 2.7.x مدعوم ، الأمر الذي فاجأني قليلاً. علاوة على ذلك ، الدعم
جافا أوسع إلى حد ما ، لأن بالنسبة لـ Python ، على سبيل المثال ، بعض الوظائف غير متوفرة وأكثر من ذلك
قائمة متواضعة من الموصلات المدمجة. بالمناسبة ، يمكنك كتابة الموصلات الخاصة بك.
ومع ذلك ، نظرًا لحقيقة أنني لست معتادًا على Java ، فقد استخدمت Python.
قبل البدء في إنشاء قالب ، يجب أن يكون لديك ما يلي:
- إدخال تنسيق JSON ويجب ألا يتغير في الوقت المناسب
- مخطط أو مخططات جداول BigQuery التي سيتم دفق البيانات إليها
- عدد الجداول التي سيتم دفق دفق بيانات المخرجات إليها
لاحظ أنه بعد إنشاء قالب وبدء تشغيل Dataflow Job بناءً عليه ، يمكن أن تكون هذه المعلمات
التغيير فقط عن طريق إنشاء قالب جديد.
دعنا نقول بضع كلمات حول هذه القيود. انهم جميعا يأتون من حقيقة أنه لا يوجد أي احتمال
إنشاء قالب ديناميكي يمكن أن يأخذ أي سلسلة كمدخلات ، تحليل ذلك
وفقًا للمنطق الداخلي ، ثم ملء الجداول التي تم إنشاؤها ديناميكيًا بشكل حيوي
التي أنشأتها الدائرة. من المحتمل جدًا وجود هذا الاحتمال ، ولكن ضمن البيانات
أنا لم تنجح في تنفيذ مثل هذا المخطط. بقدر ما أفهم كله
تم إنشاء خط الأنابيب قبل تنفيذه في وقت التشغيل ، وبالتالي لا توجد طريقة لتغييره إلى
ليطير. ربما شخص ما سوف يشارك قرارهم.
3. القرار
لفهم أكثر اكتمالا للعملية ، يجدر إحضار رسم تخطيطي لما يسمى خط الأنابيب
من وثائق شعاع Apache.

في حالتنا (سنستخدم القسمة في عدة جداول):
- المدخلات - البيانات تأتي من PubSub في Dataflow Job
- Transform # 1 - يتم تحويل البيانات من سلسلة إلى قاموس Python ، نحصل على الإخراج
PCollection # 1 - التحويل رقم 2 - يتم تمييز البيانات ، لمزيد من الفصل وفقًا لجداول منفصلة ، إلى
الإخراج هو PCollection # 2 (في الواقع tolle PCollection) - التحويل رقم 3 - تتم كتابة البيانات من PCollection # 2 إلى الجداول باستخدام المخططات
الجداول
في عملية كتابة القالب الخاص بي ، استلهمت هذه الأمثلة بنشاط.
كود القالب مع التعليقات (التعليقات اليسرى بالطريقة نفسها من المؤلفين السابقين): الآن سنتطرق إلى الكود ونقدم توضيحات ، لكن أولاً يجدر بنا القول بأن الأمر الرئيسي
صعوبة في كتابة هذا القالب هو التفكير من حيث "دفق البيانات" ، و
ليس رسالة محددة. من الضروري أيضًا فهم أن Pub / Sub يعمل مع الرسائل و
من منهم سوف نتلقى معلومات لوضع علامات على الدفق.
pipeline_options = PipelineOptions() p = beam.Pipeline(options=pipeline_options) pipeline_options.view_as(SetupOptions).save_main_session = True pipeline_options.view_as(StandardOptions).streaming = True
بسبب يستخدم موصل Apache Beam Pub / Sub IO فقط في وضع التدفق الضروري
أضف PipelineOptions () (على الرغم من أنه في الواقع لا يتم استخدام الخيارات) ؛ وإلا ، قم بإنشاء قالب
يقع مع الاستثناء. يجب أن يقال عن خيارات إطلاق القالب. يمكن أن يكون
ثابت وما يسمى ب "وقت التشغيل". فيما يلي رابط للوثائق المتعلقة بهذا الموضوع. تتيح لك الخيارات إنشاء قالب دون تحديد المعلمات مقدمًا ، ولكن تمريرها عند بدء تشغيل Dataflow Job من القالب ، ولكن لا يزال يتعذر علي تنفيذه ، وربما يرجع ذلك إلى حقيقة أن هذا الرابط لا يدعم RuntimeValueProvider
.
كل شيء واضح من التعليق ، قرأنا الموضوع من الموضوع. تجدر الإشارة إلى أنه يمكنك استخدام الدفق
سواء من الموضوع ومن الاشتراك (الاشتراك). إذا تم تحديد الموضوع كمدخل ، ثم
سيتم إنشاء اشتراك مؤقت في هذا الموضوع تلقائيًا. بناء الجملة هو أيضا جميلة
واضح ، beam.io.ReadFromPubSub(input_topic)
إرسال دفق بيانات الإدخال beam.io.ReadFromPubSub(input_topic)
إلى موقعنا
خط أنابيب p
.
هذا هو المكان الذي يحدث فيه التحويل رقم 1 ويتم تحويل مدخلاتنا من سلسلة بيثون إلى
dython dict ، وفي الإخراج نحصل على PCollection # 1. >>
يظهر في بناء الجملة. على
في الواقع ، النص في علامات اقتباس هو اسم الدفق (يجب أن يكون فريدًا) ، بالإضافة إلى تعليق ،
الذي سيتم إضافته إلى الكتلة على الرسم البياني في واجهة ويب GCP Dataflow. دعنا نفكر بمزيد من التفاصيل
الطبقة المتجاوزة TransformToBigQuery
.
class TransformToBigQuery(beam.DoFn):
سيحتوي متغير element
على رسالة واحدة من اشتراك PubSub. كما نرى من
رمز ، في حالتنا يجب أن يكون JSON صالح. في الفصول الدراسية يجب أن يكون
يتم إعادة تعريف طريقة process
، والتي التحولات اللازمة
خط الإدخال للحصول على الإخراج الذي يطابق الدائرة
الجدول الذي سيتم فيه تحميل هذه البيانات. بسبب تدفقنا في هذه الحالة هو
مستمر ، unbounded
فيما يتعلق بـ Apache Beam ، يجب عليك إعادته باستخدام
yield
، وليس return
، كما بالنسبة لدفق البيانات النهائي. في حالة التدفق النهائي ،
(وضروري) بالإضافة إلى تكوين windowing
triggers
هذا الكود يوجه PCollection # 1 إلى Transform # 2 حيث سيتم وضع العلامات
(الفصل) من دفق البيانات. في متغير schema_dct
في هذه الحالة ، قاموس ، حيث يكون المفتاح هو اسم ملف المخطط بدون الامتداد ، سيكون هذا هو العلامة ، وتكون القيمة هي JSON الصحيحة للنظام
جداول BigQuery لهذه العلامة. تجدر الإشارة إلى أنه يجب إرسال المخطط بالضبط إلى
عرض {'fields': }
حيث
هو مخطط جدول BigQuery في نموذج JSON (يمكنك
تصدير من واجهة الويب).
main='default'
هو اسم علامة سلاسل الرسائل التي سيذهبون إليها
جميع الرسائل التي لا تخضع لشروط وضع العلامات. النظر في الصف
TagDataWithReqType
.
class TagDataWithReqType(beam.DoFn):
كما ترون ، تتم إعادة تعريف فئة process
هنا أيضًا. يحتوي متغير types
على أسماء
وعليها أن تتطابق مع الرقم والاسم ورقم وأسماء مفاتيح القاموس
schema_dct
. على الرغم من أن أسلوب process
لديه القدرة على قبول الحجج ، إلا أنني لا أبداً
كنت قادرا على تمريرها. لم أحسب السبب بعد.
في المخرجات ، نحصل على مجموعة من الخيوط في عدد العلامات ، أي عدد علاماتنا
علامات محددة مسبقا + الموضوع الافتراضي الذي فشل في وضع علامة.
التحويل رقم ... (في الواقع ، إنه ليس على الرسم التخطيطي ، هذا "فرع") - نكتب الدفق الافتراضي
إلى الجدول الافتراضي.
tagged_stream.default
- يتم أخذ دفق مع العلامة default
، بناء جملة بديل هو tagged_stream['default']
schema=parse_table_schema_from_json(schema_dct.get('default'))
- هنا يتم تعريف المخطط
الجداول. يرجى ملاحظة أن ملف default.json
مع مخطط جدول BigQuery صالح
يجب أن يكون في الدليل schema_dir = './'
.
سيتم الانتقال إلى جدول يسمى default
.
إذا كان الجدول بهذا الاسم (في مجموعة البيانات المحددة لهذا المشروع) غير موجود ، فعندئذٍ
سيتم إنشاؤه تلقائيًا من المخطط بفضل الإعداد الافتراضي
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED
تحويل رقم 3 ، يجب أن يكون كل شيء واضحًا لأولئك الذين يقرؤون المقالة من البداية والخاصة
بناء جملة بيثون. نفصل مجموعة الدفق مع حلقة ونكتب كل دفق في جدوله الخاص به
مخططه. تجدر الإشارة إلى أن اسم الدفق يجب أن يكون فريدًا - '%s:%s.%s' % (gcp_project, bq_dataset, name)
.
الآن يجب أن يكون من الواضح كيف يعمل هذا ويمكنك إنشاء قالب. لهذا تحتاج
تشغيل في وحدة التحكم (لا تنس تنشيط venv إذا كان ذلك متاحًا) أو من IDE:
python _.py / --runner DataflowRunner / --project dreamdata-test / --staging_location gs://STORAGE_NAME/STAGING_DIR / --temp_location gs://STORAGE_NAME/TEMP_DIR / --template_location gs://STORAGE_NAME/TEMPLATES_DIR/TEMPLATE_NAME
في الوقت نفسه ، يجب تنظيم الوصول إلى حساب Google ، على سبيل المثال ، من خلال التصدير
GOOGLE_APPLICATION_CREDENTIALS
البيئة GOOGLE_APPLICATION_CREDENTIALS
أو طريقة أخرى.
بضع كلمات عن - --runner
. في هذه الحالة ، يقول DataflowRunner
أن هذا الرمز
سيتم تشغيل كقالب لوظيفة Dataflow Job. لا يزال من الممكن تحديد
DirectRunner
، سيتم استخدامه افتراضيا إذا لم يكن هناك خيار - --runner
ورمز
ستعمل كوظيفة Dataflow Job ، لكن محليًا ، وهو مناسب جدًا للتصحيح.
إذا لم تحدث أي أخطاء ، فسيتم gs://STORAGE_NAME/TEMPLATES_DIR/TEMPLATE_NAME
إنشاء قالب. تجدر الإشارة إلى أنه في gs://STORAGE_NAME/STAGING_DIR
سيتم كتابتها أيضًا
ملفات الخدمة الضرورية للتشغيل الناجح لوظيفة Datafow التي تم إنشاؤها على أساس
قالب ولست بحاجة إلى حذفها.
بعد ذلك ، تحتاج إلى إنشاء وظيفة تدفق بيانات باستخدام هذا القالب ، يدويًا أو بواسطة أي
بطريقة أخرى (CI على سبيل المثال).
4. الاستنتاجات
وبالتالي ، تمكنا من دفق الدفق من PubSub إلى BigQuery باستخدام
تحويلات البيانات الضرورية لغرض مزيد من التخزين والتحويل و
استخدام البيانات.
الروابط الرئيسية
في هذه المقالة ، من الممكن عدم الدقة أو حتى الأخطاء ، سأكون ممتناً للبناء
النقد. في النهاية ، أود أن أضيف ذلك في الواقع ، لا يتم استخدام كل شيء هنا
ميزات Apache Beam SDK ، لكن هذا لم يكن الهدف.