Apache NiFi otomatisasi pengiriman aliran

Halo semuanya!



Tugasnya adalah sebagai berikut - ada aliran, disajikan dalam gambar di atas, yang harus diluncurkan ke server N dengan Apache NiFi . Tes aliran - file sedang dibuat dan dikirim ke instance NiFi lain. Data ditransmisikan menggunakan protokol Situs NiFi ke Situs.


Situs NiFi ke Situs (S2S) adalah cara yang aman dan mudah disesuaikan untuk mentransfer data di antara instance NiFi. Lihat bagaimana S2S bekerja dalam dokumentasi dan penting untuk tidak lupa mengkonfigurasi instance NiFi untuk mengaktifkan S2S lihat di sini .

Dalam kasus-kasus ketika datang ke transfer data menggunakan S2S - satu contoh disebut klien, server kedua. Klien mengirim data, server mengirim. Dua cara untuk mengkonfigurasi transfer data di antara mereka:

  1. Dorong Dari instance klien, data dikirim menggunakan Remote Process Group (RPG). Pada contoh server, data diterima menggunakan Input Port.
  2. Tarik Server menerima data menggunakan RPG, klien mengirim menggunakan port Output.


Kami menyimpan alur untuk bergulir di Apache Registry.


Apache NiFi Registry adalah sub proyek dari Apache NiFi yang menyediakan alat untuk menyimpan aliran dan kontrol versi. Semacam git. Informasi tentang menginstal, mengkonfigurasi, dan bekerja dengan registri dapat ditemukan dalam dokumentasi resmi . Aliran untuk penyimpanan digabungkan ke dalam kelompok proses dan disimpan seperti itu di registri. Lebih lanjut dalam artikel kami akan kembali ke ini.


Pada awalnya, ketika N adalah angka kecil, aliran dikirimkan dan diperbarui dengan tangan dalam waktu yang dapat diterima.

Tetapi dengan pertumbuhan N, ada lebih banyak masalah:

  1. memperbarui aliran membutuhkan lebih banyak waktu. Perlu untuk pergi ke semua server
  2. ada kesalahan memperbarui templat. Di sini mereka diperbarui, tetapi di sini mereka lupa
  3. kesalahan manusia saat melakukan sejumlah besar operasi dari jenis yang sama

Semua ini membawa kita pada kenyataan bahwa kita perlu mengotomatiskan proses. Saya mencoba cara berikut untuk menyelesaikan masalah ini:

  1. Gunakan MiNiFi sebagai ganti NiFi
  2. CLI NiFi
  3. NiPyAPI

Menggunakan MiNiFi


Apache MiNiFy adalah sub proyek dari Apache NiFi. MiNiFy adalah agen kompak yang menggunakan prosesor yang sama dengan NiFi, memungkinkan Anda untuk membuat aliran yang sama seperti di NiFi. Kelincahan agen dicapai, antara lain, karena fakta bahwa MiNiFy tidak memiliki antarmuka grafis untuk konfigurasi aliran. Kurangnya antarmuka grafis di MiNiFy berarti bahwa perlu untuk menyelesaikan masalah pengiriman aliran di minifi. Karena MiNiFy secara aktif digunakan dalam IOT, ada banyak komponen dan proses pengiriman aliran ke instance minimum perlu diotomatisasi. Tugas yang akrab, bukan?

Subproyek lain akan membantu menyelesaikan masalah ini - MiNiFi C2 Server. Produk ini dimaksudkan sebagai titik sentral dalam arsitektur konfigurasi pengguliran. Cara mengkonfigurasi lingkungan - dijelaskan dalam artikel ini tentang Habré dan informasi yang cukup untuk menyelesaikan tugas. MiNiFi bersama dengan server C2 secara otomatis memperbarui konfigurasi di rumah. Satu-satunya kelemahan dari pendekatan ini adalah bahwa Anda harus membuat template di Server C2, komit registri sederhana tidak cukup.

Opsi yang dijelaskan dalam artikel di atas berfungsi dan tidak sulit untuk diterapkan, tetapi jangan lupa yang berikut:

  1. Dalam minifi tidak semua prosesor dari nifi
  2. Versi prosesor di Minifi tertinggal dari versi prosesor di NiFi.

Pada saat penulisan, versi NiFi terbaru adalah 1.9.2. Versi prosesor dari versi MiNiFi terbaru adalah 1.7.0. Prosesor dapat ditambahkan ke MiNiFi, tetapi karena perbedaan versi antara prosesor NiFi dan MiNiFi ini mungkin tidak berfungsi.

CLI NiFi


Dilihat oleh deskripsi alat di situs web resmi, ini adalah alat untuk mengotomatisasi interaksi NiFI dan NiFi Registry di bidang pengiriman aliran atau kontrol proses. Untuk memulai, alat ini harus diunduh dari sini .

Jalankan utilitas

./bin/cli.sh _ ___ _ Apache (_) .' ..](_) , _ .--. __ _| |_ __ )\ [ `.-. | [ |'-| |-'[ | / \ | | | | | | | | | | ' ' [___||__][___][___] [___]', ,' `' CLI v1.9.2 Type 'help' to see a list of available commands, use tab to auto-complete. 

Agar kami dapat memuat aliran yang diperlukan dari registri, kita perlu mengetahui pengidentifikasi keranjang (pengidentifikasi ember) dan aliran (pengidentifikasi aliran) itu sendiri. Data ini dapat diperoleh baik melalui cli, atau di antarmuka web registri NiFi. Antarmuka web terlihat seperti ini:



Menggunakan CLI melakukan ini:

 #> registry list-buckets -u http://nifi-registry:18080 # Name Id Description - -------------- ------------------------------------ ----------- 1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty) #> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080 # Name Id Description - ------------ ------------------------------------ ----------- 1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85 

Kami mulai mengimpor grup proses dari registri:

 #> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080 7f522a13-016e-1000-e504-d5b15587f2f3 

Poin penting adalah bahwa setiap instance nifi dapat ditentukan sebagai host tempat kami menggulung grup proses.

Kelompok proses ditambahkan dengan prosesor berhenti, mereka harus dimulai

 #> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080 

Hebat, prosesornya dimulai. Namun, sesuai dengan kondisi masalah, kami membutuhkan instance NiFi untuk mengirim data ke instance lain. Asumsikan bahwa Anda memilih metode Push untuk mentransfer data ke server. Untuk mengatur transfer data, Anda harus mengaktifkan transmisi data (Aktifkan transmisi) pada Kelompok Proses Jarak Jauh (RPG) yang ditambahkan, yang sudah termasuk dalam aliran kami.



Dalam dokumentasi di CLI dan sumber lainnya, saya tidak menemukan cara untuk mengaktifkan transfer data. Jika Anda tahu cara melakukan ini, silakan tulis di komentar.

Karena kami memiliki bash dan kami siap untuk pergi ke akhir - kami akan menemukan jalan keluar! Anda dapat menggunakan API NiFi untuk menyelesaikan masalah ini. Kami menggunakan metode berikut, kami mengambil ID dari contoh di atas (dalam kasus kami adalah 7f522a13-016e-1000-e504-d5b15587f2f3). Deskripsi metode API NiFi di sini .


Secara tubuh, Anda harus melewati JSON, dari formulir berikut:

 { "revision": { "clientId": "value", "version": 0, "lastModifier": "value" }, "state": "value", "disconnectedNodeAcknowledged": true } 

Parameter yang harus diisi untuk "bekerja":
status - status transfer data. TRANSMITTING yang tersedia untuk mengaktifkan transfer data, STOPPED untuk mematikan
versi - versi prosesor

versi akan default ke 0 saat dibuat, tetapi parameter ini dapat diperoleh dengan menggunakan metode ini



Untuk pecinta skrip bash, metode ini mungkin cocok, tetapi sulit bagi saya - skrip bash bukan favorit saya. Metode berikut ini lebih menarik dan nyaman menurut saya.

NiPyAPI


NiPyAPI adalah perpustakaan Python untuk berinteraksi dengan instance NiFi. Halaman dokumentasi berisi informasi yang diperlukan untuk bekerja dengan perpustakaan. Mulai cepat dijelaskan dalam proyek github.

Skrip kami untuk menjalankan konfigurasi adalah program Python. Kami memberikan kode.
Kami mengonfigurasi konfigurasi untuk pekerjaan lebih lanjut. Kami membutuhkan parameter berikut:

 nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #  nifi-api ,    process group nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #  nifi-registry-api registry nipyapi.config.registry_name = 'MyBeutifulRegistry' # registry,      nifi nipyapi.config.bucket_name = 'BucketName' # bucket,    flow nipyapi.config.flow_name = 'FlowName' # flow,   

Selanjutnya saya akan memasukkan nama-nama metode perpustakaan ini, yang dijelaskan di sini .

Hubungkan registri ke nifi instance dengan

 nipyapi.versioning.create_registry_client 

Pada langkah ini, Anda juga dapat menambahkan cek bahwa registri telah ditambahkan ke instance; untuk ini, Anda dapat menggunakan metode

 nipyapi.versioning.list_registry_clients 

Temukan ember untuk mencari aliran lebih lanjut di keranjang.

 nipyapi.versioning.get_registry_bucket 

Cari ember untuk aliran

 nipyapi.versioning.get_flow_in_bucket 

Lebih lanjut penting untuk memahami apakah grup proses ini telah ditambahkan. Grup proses ditempatkan dalam koordinat dan situasi dapat muncul ketika yang kedua ditumpangkan di atas satu komponen. Saya memeriksa, ini bisa :) Untuk mendapatkan semua grup proses ditambahkan, kami menggunakan metode ini

 nipyapi.canvas.list_all_process_groups 

dan kemudian kita dapat mencari, misalnya dengan nama.

Saya tidak akan menjelaskan proses memperbarui templat, saya hanya akan mengatakan bahwa jika prosesor ditambahkan dalam versi baru templat, maka tidak ada masalah dengan keberadaan pesan dalam antrian. Tetapi jika prosesor dihapus, maka masalah dapat muncul (nifi tidak memungkinkan prosesor untuk dihapus jika antrian pesan telah terakumulasi di depannya). Jika Anda tertarik pada bagaimana saya memecahkan masalah ini - tolong kirimkan surat kepada saya, kami akan membahas poin ini Kontak di akhir artikel. Mari kita lanjutkan ke langkah menambahkan grup proses.

Saat men-debug skrip, saya menemukan fitur yang aliran versi terbarunya tidak selalu ditarik, jadi saya sarankan agar versi ini diklarifikasi terlebih dahulu:

 nipyapi.versioning.get_latest_flow_ver 

Grup proses penyebaran:

 nipyapi.versioning.deploy_flow_version 

Kami memulai prosesor:

 nipyapi.canvas.schedule_process_group 

Di blok CLI, ada tertulis bahwa transfer data tidak secara otomatis dihidupkan dalam grup proses jarak jauh? Saat mengimplementasikan skrip, saya juga mengalami masalah ini. Pada saat itu, saya tidak berhasil memulai transfer data menggunakan API dan saya memutuskan untuk menulis ke pengembang perpustakaan NiPyAPI dan meminta saran / bantuan. Pengembang menjawab saya, kami membahas masalah dan dia menulis bahwa dia perlu waktu untuk "memeriksa sesuatu". Dan sekarang, setelah beberapa hari, sebuah surat tiba di mana fungsi Python ditulis yang memecahkan masalah peluncuran saya !!! Pada saat itu, versi NiPyAPI adalah 0.13.3 dan, tentu saja, tidak ada yang seperti itu di dalamnya. Tetapi dalam versi 0.14.0, yang dirilis baru-baru ini, fungsi ini sudah termasuk dalam perpustakaan. Bertemu

 nipyapi.canvas.set_remote_process_group_transmission 

Jadi, dengan menggunakan perpustakaan NiPyAPI, kami menghubungkan registri, aliran yang mengalir, dan bahkan memulai prosesor dan transfer data. Kemudian Anda dapat menyisir kode, menambahkan semua jenis cek, masuk, dan hanya itu. Tetapi ini adalah kisah yang sangat berbeda.

Dari opsi otomatisasi yang saya pertimbangkan, yang terakhir menurut saya paling efisien. Pertama, ini masih kode python, di mana Anda dapat menanamkan kode program tambahan dan mengambil keuntungan penuh dari bahasa pemrograman. Kedua, proyek NiPyAPI secara aktif berkembang dan jika ada masalah Anda dapat menulis kepada pengembang. Ketiga, NiPyAPI masih merupakan alat yang lebih fleksibel untuk berinteraksi dengan NiFi dalam memecahkan masalah yang kompleks. Misalnya, dalam menentukan apakah antrian pesan kosong sekarang dalam aliran dan apakah grup proses dapat diperbarui.

Itu saja. Saya menggambarkan 3 pendekatan untuk mengotomatisasi pengiriman aliran di NiFi, perangkap yang mungkin ditemui pengembang dan memberikan kode kerja untuk mengotomatisasi pengiriman. Jika Anda hanya tertarik pada topik ini - tulis!

Source: https://habr.com/ru/post/id476722/


All Articles