Saat menyiapkan program pendidikan kami, kami secara berkala menghadapi kesulitan dalam hal bekerja dengan beberapa alat. Dan pada saat itu ketika kita bertemu mereka, tidak selalu ada cukup dokumentasi dan artikel yang akan membantu untuk mengatasi masalah ini.
Ini adalah kasus, misalnya, pada tahun 2015 dan kami menggunakan cluster Hadoop dengan Spark untuk 35 pengguna simultan pada program Spesialis Data Besar. Cara memasaknya di bawah kasus pengguna menggunakan BENANG tidak jelas. Sebagai hasilnya, setelah mengetahui dan berjalan sendiri, mereka membuat posting tentang Habré dan juga tampil di Moscow Spark Meetup .
Latar belakang
Kali ini kita akan berbicara tentang program lain - Insinyur Data . Peserta kami membangun dua jenis arsitektur di atasnya: lambda dan kappa. Dan dalam arsitektur lamdba, sebagai bagian dari pemrosesan batch, Airflow digunakan untuk mentransfer log dari HDFS ke ClickHouse.
Semuanya baik secara umum. Biarkan mereka membangun saluran pipa mereka. Namun, ada "tetapi": semua program kami adalah teknologi dalam hal proses pembelajaran itu sendiri. Untuk memeriksa lab, kami menggunakan pemeriksa otomatis: peserta harus masuk ke akun pribadinya, klik tombol "Periksa", dan setelah beberapa saat ia melihat semacam umpan balik panjang tentang apa yang telah ia lakukan. Dan pada saat inilah kita mulai mendekati masalah kita.
Memeriksa lab ini disusun sebagai berikut: kami mengirim paket data kontrol ke Kafka dari peserta, lalu Gobblin mentransfer paket data ke HDFS, kemudian Airflow mengambil paket data ini dan meletakkannya di ClickHouse. Kuncinya adalah bahwa Airflow tidak harus melakukan ini secara real time, ia melakukannya sesuai jadwal: setiap 15 menit dibutuhkan banyak file dan melemparnya.
Ternyata kita perlu memicu DAG mereka sendiri atas permintaan pemeriksa di sini dan sekarang. Googling, kami mengetahui bahwa untuk versi Airflow yang lebih baru ada yang disebut API Eksperimental . Kata experimental
, tentu saja, terdengar menakutkan, tetapi apa yang harus dilakukan ... Tiba-tiba itu akan terbang.
Selanjutnya, kami jelaskan seluruhnya: dari menginstal Airflow hingga menghasilkan permintaan POST yang memicu DAG menggunakan API Eksperimental. Kami akan bekerja dengan Ubuntu 16.04.
1. Menginstal Airflow
Mari kita periksa apakah kita memiliki Python 3 dan virtualenv.
$ python3 --version Python 3.6.6 $ virtualenv --version 15.2.0
Jika ada yang hilang, maka instal.
Sekarang buat direktori di mana kami akan terus bekerja dengan Airflow.
$ mkdir <your name of directory> $ cd /path/to/your/new/directory $ virtualenv -p which python3 venv $ source venv/bin/activate (venv) $
Pasang Aliran Udara:
(venv) $ pip install airflow
Versi tempat kami bekerja: 1.10.
Sekarang kita perlu membuat direktori airflow_home
mana file DAG dan plugin Airflow akan ditemukan. Setelah membuat direktori, atur variabel lingkungan AIRFLOW_HOME
.
(venv) $ cd /path/to/my/airflow/workspace (venv) $ mkdir airflow_home (venv) $ export AIRFLOW_HOME=<path to airflow_home>
Langkah selanjutnya adalah menjalankan perintah yang akan membuat dan menginisialisasi database aliran data dalam SQLite:
(venv) $ airflow initdb
Basis data akan dibuat di airflow.db
secara default.
Periksa apakah Aliran Udara diinstal:
$ airflow version [2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor [2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt [2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt ____________ _____________ ____ |__( )_________ __/__ /________ __ ____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / / ___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ / _/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/ v1.10.0
Jika perintah itu berhasil, maka Airflow membuat file konfigurasinya airflow.cfg
di AIRFLOW_HOME
:
$ tree . ├── airflow.cfg └── unittests.cfg
Airflow memiliki antarmuka web. Itu dapat diluncurkan dengan menjalankan perintah:
(venv) $ airflow webserver --port 8081
Sekarang Anda dapat mengakses antarmuka web di browser pada port 8081 pada host tempat Airflow diluncurkan, misalnya: <hostname:8081>
.
2. Bekerja dengan API Eksperimental
Pada ini, Aliran Udara dikonfigurasi dan siap untuk pergi. Namun, kami juga perlu menjalankan API Eksperimental. Checker kami ditulis dalam Python, jadi selanjutnya semua permintaan akan ada di dalamnya menggunakan perpustakaan requests
.
Faktanya, API sudah berfungsi untuk permintaan sederhana. Misalnya, permintaan semacam itu memungkinkan Anda menguji operasinya:
>>> import requests >>> host = <your hostname> >>> airflow_port = 8081
Jika Anda menerima pesan seperti itu sebagai tanggapan, maka ini berarti semuanya berfungsi.
Namun, ketika kami ingin mengaktifkan DAG, kami akan menghadapi kenyataan bahwa permintaan semacam ini tidak dapat dibuat tanpa otentikasi.
Untuk melakukan ini, Anda perlu melakukan sejumlah tindakan.
Pertama, Anda perlu menambahkan ini ke konfigurasi:
[api] auth_backend = airflow.contrib.auth.backends.password_auth
Kemudian, Anda perlu membuat pengguna Anda dengan hak admin:
>>> import airflow >>> from airflow import models, settings >>> from airflow.contrib.auth.backends.password_auth import PasswordUser >>> user = PasswordUser(models.Admin()) >>> user.username = 'new_user_name' >>> user.password = 'set_the_password' >>> session = settings.Session() >>> session.add(user) >>> session.commit() >>> session.close() >>> exit()
Kemudian, Anda perlu membuat pengguna dengan hak normal, yang akan diizinkan membuat pemicu DAG.
>>> import airflow >>> from airflow import models, settings >>> from airflow.contrib.auth.backends.password_auth import PasswordUser >>> user = PasswordUser(models.User()) >>> user.username = 'newprolab' >>> user.password = 'Newprolab2019!' >>> session = settings.Session() >>> session.add(user) >>> session.commit() >>> session.close() >>> exit()
Sekarang semuanya sudah siap.
3. Memulai permintaan POST
Permintaan POST itu sendiri akan terlihat seperti ini:
>>> dag_id = newprolab >>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs') >>> data = {"conf":"{\"key\":\"value\"}"} >>> headers = {'Content-type': 'application/json'} >>> auth = ('newprolab', 'Newprolab2019!') >>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth) >>> uri.text '{\n "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"\n}\n'
Permintaan berhasil diproses.
Maka, kami memberikan waktu kepada DAG untuk memproses dan membuat permintaan ke tabel ClickHouse, mencoba menangkap paket kontrol data.
Verifikasi selesai.