Halo semuanya! Nama saya Anton, di Rostelecom saya sedang mengembangkan data warehouse pusat. Penyimpanan kami terdiri dari modul, orkestrator yang menggunakan beberapa contoh Informatica, beberapa di antaranya kami ingin transfer ke Airflow sebagai bagian dari transisi ke solusi open-source. Karena Informatica dan Airflow adalah alat yang secara fundamental berbeda, mengambil dan mengulangi implementasi yang ada tidaklah mudah. Kami ingin mendapatkan alur kerja, di satu sisi, sedekat mungkin dengan implementasi saat ini dan, di sisi lain, menggunakan prinsip Aliran Udara pertama yang paling menarik - dinamisme, yang memberikan fleksibilitas.
Dalam artikel singkat ini, saya ingin berbicara tentang generasi DAG yang benar-benar dinamis dalam Aliran Udara. Pada topik ini, Internet terutama berisi banyak artikel dari pengembang dari India, yang merupakan bahan dari bentuk "Anda dapat menghasilkan dag secara dinamis di Airflow, berikut adalah contohnya: <contoh untuk menghasilkan 10 tugas / dag HelloWorld>" . Tapi kami tertarik pada generasi dags, yang akan berubah waktu dengan jumlah variabel dan nama tugas.

Saat ini, Airflow diimplementasikan untuk meluncurkan modul yang menghasilkan paket data pada server sumber jarak jauh untuk diunggah lebih lanjut ke repositori. Ini berjalan sesuai dengan jadwal sederhana, tidak terlalu menarik untuk memeriksanya secara rinci. Juga, segera orkestrasi akan diperkenalkan melalui modul Airflow, yang memberikan paket data untuk pemuatan lebih lanjut lapisan demi lapisan dalam pementasan menengah. Di sini kami menunggu serangkaian garu, deskripsi yang belum saya temukan di mana pun dan saya ingin berbagi pengalaman.
Di Airflow on Habré ada beberapa artikel dari pengembang dari Mail.ru di mana hal-hal dasar dijelaskan dengan baik:
Deskripsi umum Aliran Udara
Percabangan, parameterisasi via jinja, dan komunikasi dalam DAG melalui Xcom
Glosarium Kecil:
DAG / DAG adalah grafik asiklik terarah. Dalam hal ini, yang kami maksud adalah serangkaian tindakan yang saling bergantung dan tidak membentuk siklus.
SubDAG / Sabdag - sama dengan DAG, tetapi terletak di dalam DAG lain, diluncurkan sebagai bagian dari induk DAG (mis., Menjadi tugas) dan tidak memiliki jadwal terpisah.
Operator / Operator - langkah spesifik dalam dag, melakukan tindakan tertentu. Misalnya, PythonOperator.
Tugas / Tugas - contoh spesifik dari operator ketika memulai DAG, divisualisasikan sebagai kotak kecil di antarmuka web. Misalnya, PythonOperator, yang disebut run_task dan berjalan di DAG check_dag .
Gagasan generasi tugas yang dinamis dalam dag, masalah dan kerugian
Input data:
Ada tabel di repositori orkestra, sebut saja PKG_TABLE.
Ada mekanisme yang menambahkan entri ke tabel PKG_TABLE bahwa paket data siap untuk diunduh.
Apa yang kami inginkan:
DAG, yang akan dihasilkan untuk paket yang siap diunduh dan mulai mengunduhnya (spoiler: pada akhirnya, semuanya berubah).
Menggunakan kode di bawah ini, kami menghasilkan barang yang terdiri dari tugas LatestOnlyOperator dan tugas dependennya, yang dibuat ketika fungsi pkg_subdag_factory dijalankan, yang menerima daftar paket dari tabel PKG_TABLE dan menghasilkan beberapa PythonOperators. Jika tidak ada paket untuk diunduh, DummyOperator dibuat.
Mereka memutuskan untuk membuat versi pertama dengan satu PythonOperator, mendistribusikannya menjadi alur kerja terperinci menggunakan Airflow.
Tangkapan layar berikut menunjukkan bagaimana hasilnya.
Penampilan DAG:

Penampilan subdag dengan tidak adanya paket untuk pengiriman:

Penampilan subdag dengan paket untuk pengiriman:

Masalah dan Nuansa
- Penangkapan tidak berfungsi seperti yang kami harapkan: setelah menyalakan dag dimatikan, beberapa peluncuran terjadi (bukan untuk seluruh periode jadwal, tetapi 2-3 pada waktu yang sama). Karena itu, saya harus menambahkan LatestOnlyOperator, sehingga semua peluncuran kecuali yang terakhir tidak digunakan.
- Jika Anda membuat subdag, Anda harus mengaktifkannya secara eksplisit melalui baris perintah dengan perintah "airflow unpause <subdag_name>", jika tidak, Anda harus memulai, dan membuat ini saat membuat setiap subdag baru (subdag dengan nama baru), yang akan membuatnya sangat tidak nyaman untuk menghasilkan secara dinamis . Jika Anda mengatur parameter "dags_are_paused_at_creation" = false dalam konfigurasi aliran udara ($ airflow_home / airflow.cfg), itu tidak akan diperlukan, tetapi dapat mengakibatkan konsekuensi yang tidak menyenangkan dengan peluncuran otomatis otomatis dag baru - menurut saya Anda harus memulai dagas baru secara manual.
Seperti yang dikatakan dalam dokumentasi , "Kemampuan utama Airflow adalah bahwa DAG Run ini adalah atom, item idempoten, <...>", yang berarti: "Dipahami bahwa dag yang dihasilkan tidak berubah." Karena fakta bahwa kami melanggar "kemampuan utama" ini, kami belajar beberapa hal:
- Tagong kosong (tanpa tugas) dimulai dan tidak dapat berakhir, menyumbat semua kemungkinan paralel. Ini terjadi jika tidak ada paket unduhan pada saat dag diluncurkan. Untuk mengatasi ini, DummyOperator dibuat.
- Jika selama bekerja, tugas dag dibuat ulang dan tidak ada lagi tugas ini dalam pembaruan yang diperbarui - itu akan berhenti dengan gangguan proses yang sedang berjalan. Dan ini terjadi pada setiap langkah sheduler, tetapi tidak lebih sering daripada yang ditunjukkan dalam parameter min_file_process_interval dalam konfigurasi aliran udara ($ airflow_home / airflow.cfg). Untuk menghindari hal ini, kami melakukan pembuatan paket tugas tidak hanya dengan status "siap untuk mengunduh", tetapi juga oleh status "pemuatan dalam proses" sehingga terus dihasilkan saat pengunduhan sedang berlangsung.
- Jika versi saat ini dari dag tidak memiliki tugas apa pun sebelumnya - misalnya, ada tugas dengan nama "pkg_123" yang dimuat sebelumnya dan tidak dibuat dalam versi saat ini dari dag, Anda tidak dapat melihat statistik tentang tugas ini di antarmuka web. Meskipun semua informasi disimpan dalam database aliran udara dan atas dasar itu dimungkinkan untuk membangun dashboard yang indah untuk peluncuran lama dengan cara eksternal. Ketika muncul pertanyaan tentang frekuensi memperbarui DAG dan kemampuan untuk menonaktifkannya, Anda dapat membacanya di sini .
- Karena generasi dinamis dari task_id, perlu untuk melemparkan kamus dengan data untuk semua paket saat ini ke setiap tugas tersebut, serta id dari paket saat ini, sehingga ketika fungsi itu sendiri bekerja, pilih data yang diperlukan dari kamus yang sama dengan paket id. Kalau tidak, semua tugas dimulai untuk paket yang sama.
Execution_date dalam log dan waktu mulai aktual
Saya akan mengakhiri dengan nuansa lain dari Airflow, yang pada awalnya membingungkan dan tidak dijelaskan dalam kata-kata sederhana dalam artikel lain - eksekusi_date (yang ditampilkan di semua log, di antarmuka, dll.) Dan waktu mulai yang sebenarnya. Pada prinsipnya, uraiannya ada dalam dokumentasi aliran udara dan FAQ , tetapi hasilnya tidak jelas, jadi menurut saya diperlukan klarifikasi.
Dokumentasi : "Penjadwal meluncurkan pekerjaan Anda di akhir periode"
Hasil : Jika Anda membuat sampah dengan jadwal, misalnya, @daily, jalankan dengan eksekusi_date "2018-01-01 00:00:00" sebenarnya akan menjalankan "2018-02-01 00:00:00".
Tautan yang bermanfaat:
Dokumentasi catchup
DokumentasiOnlyOperator Terbaru
Dokumentasi lebih lanjut tentang LatestOnlyOperator
Contoh penggunaan LatestOnlyOperator
Beberapa nuansa
Pertanyaan tentang dependensi pada peluncuran sebelumnya
Contoh kecil tentang generasi dinamis
Sebuah pertanyaan tentang generasi dinamis dengan deskripsi kecil