عند إعداد برامجنا التعليمية ، نواجه صعوبات بشكل دوري فيما يتعلق بالعمل مع بعض الأدوات. وفي تلك اللحظة عندما نواجههم ، لا يوجد دائمًا ما يكفي من الوثائق والمقالات التي من شأنها أن تساعد في التغلب على هذه المشكلة.
كان هذا هو الحال ، على سبيل المثال ، في عام 2015 ، ونحن ، في برنامج Big Data Specialist ، استخدمنا مجموعة Hadoop مع Spark لـ 35 مستخدمًا في وقت واحد. كيف لطهي الطعام تحت مثل هذه الحالة المستخدم باستخدام YARN لم يكن واضحا. نتيجة لذلك ، بعد أن اكتشفوا السير على الطريق بمفردهم ، قاموا بإنشاء موقع على Habré وأدى أيضًا في Moscow Spark Meetup .
قبل التاريخ
هذه المرة سوف نتحدث عن برنامج آخر - مهندس البيانات . يبني المشاركون لدينا نوعين من الهندسة المعمارية على ذلك: لامدا وكابا. وفي بنية lamdba ، كجزء من معالجة الدُفعات ، يتم استخدام Airflow لنقل السجلات من HDFS إلى ClickHouse.
كل شيء جيد بشكل عام. دعهم يبنون خطوط الأنابيب الخاصة بهم. ومع ذلك ، هناك "لكن": جميع برامجنا تكنولوجية من حيث عملية التعلم نفسها. للتحقق من المختبر ، نستخدم أدوات الفحص التلقائية: يحتاج المشارك إلى الانتقال إلى حسابه الشخصي ، والنقر فوق الزر "فحص" ، وبعد فترة من الوقت يرى نوعًا من الملاحظات الموسعة حول ما قام به. وفي هذه اللحظة نبدأ في معالجة مشكلتنا.
يتم تنظيم فحص هذا التمرين المعملي على النحو التالي: نرسل حزمة بيانات تحكم إلى كافكا المشارك ، ثم تنقل Gobblin حزمة البيانات إلى HDFS ، ثم يأخذ Airflow حزمة البيانات هذه ويضعها في ClickHouse. الحيلة هي أن Airflow يجب ألا يقوم بذلك في الوقت الفعلي ، بل يقوم بذلك وفق جدول زمني: كل 15 دقيقة يستغرق مجموعة من الملفات ويلقي بها.
اتضح أننا بحاجة إلى تشغيل DAG بطريقة أو بأخرى من تلقاء نفسها بناءً على طلب المدقق هنا والآن. غوغلينغ ، اكتشفنا أنه بالنسبة للإصدارات الأحدث من Airflow ، هناك ما يسمى API التجريبية . الكلمة experimental
، بالطبع ، تبدو مخيفة ، ولكن ماذا تفعل ... فجأة ستطير.
بعد ذلك ، نوضح الطريقة الكاملة: من تثبيت Airflow إلى إنشاء طلب POST الذي يشغل DAG باستخدام واجهة برمجة التطبيقات التجريبية. سوف نعمل مع Ubuntu 16.04.
1. تثبيت تدفق الهواء
دعونا نتحقق من أن لدينا Python 3 و virtualenv.
$ python3 --version Python 3.6.6 $ virtualenv --version 15.2.0
إذا كان أي من هذا مفقودًا ، فقم بالتثبيت.
قم الآن بإنشاء دليل سنواصل فيه العمل مع Airflow.
$ mkdir <your name of directory> $ cd /path/to/your/new/directory $ virtualenv -p which python3 venv $ source venv/bin/activate (venv) $
تثبيت تدفق الهواء:
(venv) $ pip install airflow
النسخة التي عملنا عليها: 1.10.
نحتاج الآن إلى إنشاء دليل airflow_home
حيث توجد ملفات DAG airflow_home
. بعد إنشاء الدليل ، قم بتعيين متغير البيئة AIRFLOW_HOME
.
(venv) $ cd /path/to/my/airflow/workspace (venv) $ mkdir airflow_home (venv) $ export AIRFLOW_HOME=<path to airflow_home>
الخطوة التالية هي تنفيذ الأمر الذي سينشئ قاعدة بيانات دفق البيانات وتهيئتها في SQLite:
(venv) $ airflow initdb
سيتم إنشاء قاعدة البيانات في airflow.db
بشكل افتراضي.
تحقق من تثبيت Airflow:
$ airflow version [2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor [2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt [2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt ____________ _____________ ____ |__( )_________ __/__ /________ __ ____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / / ___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ / _/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/ v1.10.0
إذا airflow.cfg
الأمر ، فقد أنشأ airflow.cfg
ملف التكوين الخاص به airflow.cfg
في AIRFLOW_HOME
:
$ tree . ├── airflow.cfg └── unittests.cfg
يحتوي Airflow على واجهة ويب. يمكن إطلاقه عن طريق تشغيل الأمر:
(venv) $ airflow webserver --port 8081
يمكنك الآن الوصول إلى واجهة الويب في متصفح على المنفذ 8081 على المضيف حيث تم إطلاق Airflow ، على سبيل المثال: <hostname:8081>
.
2. العمل مع واجهة برمجة التطبيقات التجريبية
على هذا ، تم تكوين Airflow وجاهزة للذهاب. ومع ذلك ، نحتاج إلى تشغيل واجهة برمجة التطبيقات التجريبية أيضًا. مكتوبة لعبة الداما لدينا في بيثون ، لذلك مزيد من الطلبات ستكون على ذلك باستخدام مكتبة requests
.
في الواقع ، يعمل API بالفعل لاستعلامات بسيطة. على سبيل المثال ، يسمح لك هذا الطلب باختبار تشغيله:
>>> import requests >>> host = <your hostname> >>> airflow_port = 8081
إذا تلقيت مثل هذه الرسالة ردًا ، فهذا يعني أن كل شيء يعمل.
ومع ذلك ، عندما نريد تشغيل DAG ، سنواجه حقيقة أن هذا النوع من الطلبات لا يمكن تقديمه دون المصادقة.
للقيام بذلك ، سوف تحتاج إلى القيام بعدد من الإجراءات.
أولاً ، تحتاج إلى إضافة هذا إلى التكوين:
[api] auth_backend = airflow.contrib.auth.backends.password_auth
ثم ، تحتاج إلى إنشاء المستخدم الخاص بك مع حقوق المسؤول:
>>> import airflow >>> from airflow import models, settings >>> from airflow.contrib.auth.backends.password_auth import PasswordUser >>> user = PasswordUser(models.Admin()) >>> user.username = 'new_user_name' >>> user.password = 'set_the_password' >>> session = settings.Session() >>> session.add(user) >>> session.commit() >>> session.close() >>> exit()
بعد ذلك ، تحتاج إلى إنشاء مستخدم يتمتع بحقوق عادية ، والذي يُسمح له بإنشاء مشغل DAG.
>>> import airflow >>> from airflow import models, settings >>> from airflow.contrib.auth.backends.password_auth import PasswordUser >>> user = PasswordUser(models.User()) >>> user.username = 'newprolab' >>> user.password = 'Newprolab2019!' >>> session = settings.Session() >>> session.add(user) >>> session.commit() >>> session.close() >>> exit()
الآن كل شيء جاهز.
3. بدء طلب POST
سيبدو طلب POST نفسه كما يلي:
>>> dag_id = newprolab >>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs') >>> data = {"conf":"{\"key\":\"value\"}"} >>> headers = {'Content-type': 'application/json'} >>> auth = ('newprolab', 'Newprolab2019!') >>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth) >>> uri.text '{\n "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"\n}\n'
طلب معالجتها بنجاح.
وفقًا لذلك ، فإننا نمنح DAG بعض الوقت للمعالجة وتقديم طلب لجدول ClickHouse ، في محاولة لالتقاط حزمة تحكم من البيانات.
اكتمل التحقق.