كيفية إنشاء مشغل DAG في Airflow باستخدام واجهة برمجة التطبيقات التجريبية

عند إعداد برامجنا التعليمية ، نواجه صعوبات بشكل دوري فيما يتعلق بالعمل مع بعض الأدوات. وفي تلك اللحظة عندما نواجههم ، لا يوجد دائمًا ما يكفي من الوثائق والمقالات التي من شأنها أن تساعد في التغلب على هذه المشكلة.


كان هذا هو الحال ، على سبيل المثال ، في عام 2015 ، ونحن ، في برنامج Big Data Specialist ، استخدمنا مجموعة Hadoop مع Spark لـ 35 مستخدمًا في وقت واحد. كيف لطهي الطعام تحت مثل هذه الحالة المستخدم باستخدام YARN لم يكن واضحا. نتيجة لذلك ، بعد أن اكتشفوا السير على الطريق بمفردهم ، قاموا بإنشاء موقع على Habré وأدى أيضًا في Moscow Spark Meetup .


قبل التاريخ


هذه المرة سوف نتحدث عن برنامج آخر - مهندس البيانات . يبني المشاركون لدينا نوعين من الهندسة المعمارية على ذلك: لامدا وكابا. وفي بنية lamdba ، كجزء من معالجة الدُفعات ، يتم استخدام Airflow لنقل السجلات من HDFS إلى ClickHouse.


كل شيء جيد بشكل عام. دعهم يبنون خطوط الأنابيب الخاصة بهم. ومع ذلك ، هناك "لكن": جميع برامجنا تكنولوجية من حيث عملية التعلم نفسها. للتحقق من المختبر ، نستخدم أدوات الفحص التلقائية: يحتاج المشارك إلى الانتقال إلى حسابه الشخصي ، والنقر فوق الزر "فحص" ، وبعد فترة من الوقت يرى نوعًا من الملاحظات الموسعة حول ما قام به. وفي هذه اللحظة نبدأ في معالجة مشكلتنا.


يتم تنظيم فحص هذا التمرين المعملي على النحو التالي: نرسل حزمة بيانات تحكم إلى كافكا المشارك ، ثم تنقل Gobblin حزمة البيانات إلى HDFS ، ثم يأخذ Airflow حزمة البيانات هذه ويضعها في ClickHouse. الحيلة هي أن Airflow يجب ألا يقوم بذلك في الوقت الفعلي ، بل يقوم بذلك وفق جدول زمني: كل 15 دقيقة يستغرق مجموعة من الملفات ويلقي بها.


اتضح أننا بحاجة إلى تشغيل DAG بطريقة أو بأخرى من تلقاء نفسها بناءً على طلب المدقق هنا والآن. غوغلينغ ، اكتشفنا أنه بالنسبة للإصدارات الأحدث من Airflow ، هناك ما يسمى API التجريبية . الكلمة experimental ، بالطبع ، تبدو مخيفة ، ولكن ماذا تفعل ... فجأة ستطير.


بعد ذلك ، نوضح الطريقة الكاملة: من تثبيت Airflow إلى إنشاء طلب POST الذي يشغل DAG باستخدام واجهة برمجة التطبيقات التجريبية. سوف نعمل مع Ubuntu 16.04.


1. تثبيت تدفق الهواء


دعونا نتحقق من أن لدينا Python 3 و virtualenv.


 $ python3 --version Python 3.6.6 $ virtualenv --version 15.2.0 

إذا كان أي من هذا مفقودًا ، فقم بالتثبيت.


قم الآن بإنشاء دليل سنواصل فيه العمل مع Airflow.


 $ mkdir <your name of directory> $ cd /path/to/your/new/directory $ virtualenv -p which python3 venv $ source venv/bin/activate (venv) $ 

تثبيت تدفق الهواء:


 (venv) $ pip install airflow 

النسخة التي عملنا عليها: 1.10.


نحتاج الآن إلى إنشاء دليل airflow_home حيث توجد ملفات DAG airflow_home . بعد إنشاء الدليل ، قم بتعيين متغير البيئة AIRFLOW_HOME .


 (venv) $ cd /path/to/my/airflow/workspace (venv) $ mkdir airflow_home (venv) $ export AIRFLOW_HOME=<path to airflow_home> 

الخطوة التالية هي تنفيذ الأمر الذي سينشئ قاعدة بيانات دفق البيانات وتهيئتها في SQLite:


 (venv) $ airflow initdb 

سيتم إنشاء قاعدة البيانات في airflow.db بشكل افتراضي.


تحقق من تثبيت Airflow:


 $ airflow version [2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor [2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt [2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt ____________ _____________ ____ |__( )_________ __/__ /________ __ ____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / / ___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ / _/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/ v1.10.0 

إذا airflow.cfg الأمر ، فقد أنشأ airflow.cfg ملف التكوين الخاص به airflow.cfg في AIRFLOW_HOME :


 $ tree . ├── airflow.cfg └── unittests.cfg 

يحتوي Airflow على واجهة ويب. يمكن إطلاقه عن طريق تشغيل الأمر:


 (venv) $ airflow webserver --port 8081 

يمكنك الآن الوصول إلى واجهة الويب في متصفح على المنفذ 8081 على المضيف حيث تم إطلاق Airflow ، على سبيل المثال: <hostname:8081> .


2. العمل مع واجهة برمجة التطبيقات التجريبية


على هذا ، تم تكوين Airflow وجاهزة للذهاب. ومع ذلك ، نحتاج إلى تشغيل واجهة برمجة التطبيقات التجريبية أيضًا. مكتوبة لعبة الداما لدينا في بيثون ، لذلك مزيد من الطلبات ستكون على ذلك باستخدام مكتبة requests .


في الواقع ، يعمل API بالفعل لاستعلامات بسيطة. على سبيل المثال ، يسمح لك هذا الطلب باختبار تشغيله:


 >>> import requests >>> host = <your hostname> >>> airflow_port = 8081 #   ,    8080 >>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text 'OK' 

إذا تلقيت مثل هذه الرسالة ردًا ، فهذا يعني أن كل شيء يعمل.


ومع ذلك ، عندما نريد تشغيل DAG ، سنواجه حقيقة أن هذا النوع من الطلبات لا يمكن تقديمه دون المصادقة.


للقيام بذلك ، سوف تحتاج إلى القيام بعدد من الإجراءات.


أولاً ، تحتاج إلى إضافة هذا إلى التكوين:


 [api] auth_backend = airflow.contrib.auth.backends.password_auth 

ثم ، تحتاج إلى إنشاء المستخدم الخاص بك مع حقوق المسؤول:


 >>> import airflow >>> from airflow import models, settings >>> from airflow.contrib.auth.backends.password_auth import PasswordUser >>> user = PasswordUser(models.Admin()) >>> user.username = 'new_user_name' >>> user.password = 'set_the_password' >>> session = settings.Session() >>> session.add(user) >>> session.commit() >>> session.close() >>> exit() 

بعد ذلك ، تحتاج إلى إنشاء مستخدم يتمتع بحقوق عادية ، والذي يُسمح له بإنشاء مشغل DAG.


 >>> import airflow >>> from airflow import models, settings >>> from airflow.contrib.auth.backends.password_auth import PasswordUser >>> user = PasswordUser(models.User()) >>> user.username = 'newprolab' >>> user.password = 'Newprolab2019!' >>> session = settings.Session() >>> session.add(user) >>> session.commit() >>> session.close() >>> exit() 

الآن كل شيء جاهز.


3. بدء طلب POST


سيبدو طلب POST نفسه كما يلي:


 >>> dag_id = newprolab >>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs') >>> data = {"conf":"{\"key\":\"value\"}"} >>> headers = {'Content-type': 'application/json'} >>> auth = ('newprolab', 'Newprolab2019!') >>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth) >>> uri.text '{\n "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"\n}\n' 

طلب معالجتها بنجاح.


وفقًا لذلك ، فإننا نمنح DAG بعض الوقت للمعالجة وتقديم طلب لجدول ClickHouse ، في محاولة لالتقاط حزمة تحكم من البيانات.


اكتمل التحقق.

Source: https://habr.com/ru/post/ar445852/


All Articles