Hallo allerseits!

Die Aufgabe ist wie folgt: Es gibt einen Fluss, der in der obigen Abbildung dargestellt ist und auf N Servern mit
Apache NiFi ausgerollt werden
muss . Flow Test - Datei wird generiert und an eine andere NiFi-Instanz gesendet. Die Datenübertragung erfolgt über das NiFi-Site-to-Site-Protokoll.
NiFi Site to Site (S2S) ist eine sichere, leicht anpassbare Methode zum Übertragen von Daten zwischen NiFi-Instanzen. Weitere Informationen zur Funktionsweise des S2S finden Sie in der Dokumentation. Vergessen Sie nicht, die NiFi-Instanz so zu konfigurieren, dass S2S aktiviert wird. Weitere Informationen finden Sie hier .
In diesen Fällen, wenn es um die Datenübertragung mit S2S geht, wird eine Instanz als Client, der zweite Server bezeichnet. Client sendet Daten, Server sendet. Es gibt zwei Möglichkeiten, die Datenübertragung zwischen ihnen zu konfigurieren:
- Drücken Von einer Client-Instanz werden Daten mithilfe der Remote Process Group (RPG) gesendet. Auf einer Serverinstanz werden Daten über den Eingabeport empfangen.
- Ziehen Der Server empfängt Daten über das RPG, der Client sendet über den Output-Port.
Wir speichern den Flow zum Rollen in der Apache Registry.
Apache NiFi Registry ist ein Teilprojekt von Apache NiFi, das ein Tool zum Speichern des Datenflusses und zur Versionskontrolle bietet. Eine Art Idiot. Informationen zum Installieren, Konfigurieren und Arbeiten mit der Registrierung finden Sie in der offiziellen Dokumentation . Der Flow für die Speicherung wird zu einer Prozessgruppe zusammengefasst und als solche in der Registrierung gespeichert. Weiter im Artikel werden wir darauf zurückkommen.
Zu Beginn, wenn N eine kleine Zahl ist, wird der Fluss von Hand in einer akzeptablen Zeit geliefert und aktualisiert.
Aber mit dem Wachstum von N gibt es mehr Probleme:
- Der Aktualisierungsvorgang dauert länger. Es ist notwendig, zu allen Servern zu gehen
- Beim Aktualisieren der Vorlagen sind Fehler aufgetreten. Hier haben sie aktualisiert, aber hier haben sie vergessen
- menschliche Fehler bei der Ausführung einer großen Anzahl von Operationen des gleichen Typs
All dies führt uns dazu, dass wir den Prozess automatisieren müssen. Ich habe die folgenden Möglichkeiten ausprobiert, um dieses Problem zu lösen:
- Verwenden Sie MiNiFi anstelle von NiFi
- NiFi-CLI
- NiPyAPI
MiNiFi verwenden
Apache MiNiFy ist ein Teilprojekt von Apache NiFi. MiNiFy ist ein kompakter Agent, der die gleichen Prozessoren wie NiFi verwendet, sodass Sie den gleichen Fluss wie in NiFi erzeugen können. Die Leichtigkeit des Mittels wird unter anderem dadurch erreicht, dass MiNiFy keine grafische Oberfläche für die Flusskonfiguration besitzt. Das Fehlen einer grafischen Oberfläche in MiNiFy macht es erforderlich, das Problem der Flussabgabe in Minifunktion zu lösen. Da MiNiFy in IOT aktiv verwendet wird, gibt es viele Komponenten und der Prozess der Flussübermittlung an die endgültigen Mini-Instanzen muss automatisiert werden. Eine vertraute Aufgabe, oder?
Ein weiteres Teilprojekt wird zur Lösung dieses Problems beitragen - MiNiFi C2 Server. Dieses Produkt soll ein zentraler Punkt in der Architektur rollierender Konfigurationen sein. So konfigurieren Sie die Umgebung - beschrieben in
diesem Artikel über Habré und genügend Informationen, um die Aufgabe zu lösen. MiNiFi in Verbindung mit dem C2-Server aktualisiert die Konfiguration automatisch zu Hause. Der einzige Nachteil dieses Ansatzes ist, dass Sie Vorlagen auf C2 Server erstellen müssen. Ein einfaches Festschreiben der Registrierung reicht nicht aus.
Die im obigen Artikel beschriebene Option funktioniert und ist nicht schwer zu implementieren. Vergessen Sie jedoch Folgendes nicht:
- In minifi gibt es nicht alle Prozessoren von nifi
- Die Prozessorversionen in Minifi hinken den Prozessorversionen in NiFi hinterher.
Zum Zeitpunkt des Schreibens ist die neueste Version von NiFi 1.9.2. Die Prozessorversion der neuesten MiNiFi-Version ist 1.7.0. Prozessoren können zu MiNiFi hinzugefügt werden. Aufgrund von Versionsunterschieden zwischen NiFi- und MiNiFi-Prozessoren funktioniert dies möglicherweise nicht.
NiFi-CLI
Gemessen an der
Beschreibung des Tools auf der offiziellen Website handelt es sich um ein Tool zur Automatisierung des Zusammenspiels von NiFI und NiFi Registry im Bereich der Flussübermittlung oder Prozesssteuerung. Um zu beginnen, muss dieses Tool
von hier heruntergeladen
werden .
Führen Sie das Dienstprogramm aus
./bin/cli.sh _ ___ _ Apache (_) .' ..](_) , _ .--. __ _| |_ __ )\ [ `.-. | [ |'-| |-'[ | / \ | | | | | | | | | | ' ' [___||__][___][___] [___]', ,' `' CLI v1.9.2 Type 'help' to see a list of available commands, use tab to auto-complete.
Damit wir den erforderlichen Datenfluss aus der Registrierung laden können, müssen wir die Kennungen des Korbs (Bucket-Kennung) und des Datenflusses (Flow-Kennung) selbst kennen. Diese Daten können entweder über cli oder über die Weboberfläche der NiFi-Registrierung abgerufen werden. Das Webinterface sieht folgendermaßen aus:

Mit der CLI wird Folgendes ausgeführt:
#> registry list-buckets -u http://nifi-registry:18080 # Name Id Description - -------------- ------------------------------------ ----------- 1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty) #> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080 # Name Id Description - ------------ ------------------------------------ ----------- 1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
Wir starten den Import der Prozessgruppe aus der Registry:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080 7f522a13-016e-1000-e504-d5b15587f2f3
Der wichtige Punkt ist, dass jede NIFI-Instanz als Host angegeben werden kann, auf dem die Prozessgruppe ausgeführt wird.
Prozessgruppe mit gestoppten Prozessoren hinzugefügt, müssen sie gestartet werden
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
Super, die Prozessoren haben angefangen. Entsprechend den Bedingungen des Problems benötigen wir jedoch NiFi-Instanzen, um Daten an andere Instanzen zu senden. Angenommen, Sie wählen die Push-Methode aus, um Daten auf den Server zu übertragen. Um die Datenübertragung zu organisieren, müssen Sie die Datenübertragung (Übertragung aktivieren) für die hinzugefügte Remote Process Group (RPG) aktivieren, die bereits in unserem Flow enthalten ist.

In der Dokumentation in der CLI und anderen Quellen habe ich keine Möglichkeit gefunden, die Datenübertragung zu aktivieren. Wenn Sie wissen, wie das geht, schreiben Sie bitte in die Kommentare.
Da wir Bash haben und bereit sind, bis zum Ende zu gehen, werden wir einen Ausweg finden! Sie können die NiFi-API verwenden, um dieses Problem zu lösen. Wir verwenden die folgende Methode, wir nehmen die ID aus den obigen Beispielen (in unserem Fall ist es 7f522a13-016e-1000-e504-d5b15587f2f3). Beschreibung der NiFi-API-Methoden
hier .

In body müssen Sie JSON in der folgenden Form übergeben:
{ "revision": { "clientId": "value", "version": 0, "lastModifier": "value" }, "state": "value", "disconnectedNodeAcknowledged": true }
Parameter, die ausgefüllt werden müssen, um "arbeiten" zu können:
state - Datenübertragungsstatus. Verfügbar SENDEN, um die Datenübertragung zu aktivieren, STOPPED, um sie auszuschalten
version - Prozessorversion
Die Version wird beim Erstellen standardmäßig auf 0 gesetzt. Diese Parameter können jedoch mit der Methode abgerufen werden

Für Liebhaber von Bash-Skripten mag diese Methode geeignet erscheinen, aber für mich ist es schwierig - Bash-Skripte sind nicht meine Lieblingsskripte. Die folgende Methode ist meiner Meinung nach interessanter und bequemer.
NiPyAPI
NiPyAPI ist eine Python-Bibliothek für die Interaktion mit NiFi-Instanzen.
Die Dokumentationsseite enthält die notwendigen Informationen zum Arbeiten mit der Bibliothek. Der Schnellstart wird in einem Github-
Projekt beschrieben .
Unser Skript zum Ausrollen der Konfiguration ist ein Python-Programm. Wir gehen zur Kodierung über.
Wir konfigurieren Configs für die weitere Arbeit. Wir benötigen die folgenden Parameter:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' # nifi-api , process group nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' # nifi-registry-api registry nipyapi.config.registry_name = 'MyBeutifulRegistry' # registry, nifi nipyapi.config.bucket_name = 'BucketName' # bucket, flow nipyapi.config.flow_name = 'FlowName' # flow,
Weiterhin werde ich die Namen der Methoden dieser Bibliothek einfügen, die hier beschrieben
werden .
Verbinden Sie die Registrierung mit der NIFI-Instanz
nipyapi.versioning.create_registry_client
In diesem Schritt können Sie auch eine Überprüfung hinzufügen, ob die Registrierung bereits zur Instanz hinzugefügt wurde. Hierzu können Sie die Methode verwenden
nipyapi.versioning.list_registry_clients
Finden Sie einen Eimer, um im Warenkorb weiter nach Strömungen zu suchen.
nipyapi.versioning.get_registry_bucket
Eimer nach Durchfluss durchsuchen
nipyapi.versioning.get_flow_in_bucket
Weiterhin ist es wichtig zu verstehen, ob diese Prozessgruppe bereits hinzugefügt wurde. Die Prozessgruppe wird in die Koordinaten eingefügt, und es kann vorkommen, dass eine zweite über eine Komponente gelegt wird. Ich habe überprüft, das kann sein :) Um alle Prozessgruppen hinzuzufügen, verwenden wir die Methode
nipyapi.canvas.list_all_process_groups
und dann können wir zum Beispiel nach Namen suchen.
Ich werde den Aktualisierungsprozess der Vorlage nicht beschreiben. Ich werde nur sagen, dass es keine Probleme mit dem Vorhandensein von Nachrichten in den Warteschlangen gibt, wenn Prozessoren in der neuen Version der Vorlage hinzugefügt werden. Wenn die Prozessoren jedoch gelöscht werden, können Probleme auftreten (nifi lässt nicht zu, dass der Prozessor gelöscht wird, wenn sich vor ihm eine Nachrichtenwarteschlange angesammelt hat). Wenn Sie daran interessiert sind, wie ich dieses Problem gelöst habe - schreiben Sie mir, wir werden diesen Punkt besprechen. Kontakt am Ende des Artikels. Fahren wir mit dem Schritt des Hinzufügens einer Prozessgruppe fort.
Beim Debuggen des Skripts bin ich auf eine Funktion gestoßen, bei der die neueste Version von Flow nicht immer aufgerufen wird. Daher empfehle ich, diese Version zuerst zu klären:
nipyapi.versioning.get_latest_flow_ver
Bereitstellungsprozessgruppe:
nipyapi.versioning.deploy_flow_version
Wir starten Prozessoren:
nipyapi.canvas.schedule_process_group
Im CLI-Block wurde geschrieben, dass die Datenübertragung in der Remote-Prozessgruppe nicht automatisch eingeschaltet wird? Bei der Implementierung des Skripts bin ich auch auf dieses Problem gestoßen. Zu diesem Zeitpunkt gelang es mir nicht, die Datenübertragung über die API zu starten, und ich entschied mich, an den Entwickler der NiPyAPI-Bibliothek zu schreiben und um Rat / Hilfe zu bitten. Der Entwickler antwortete mir, wir diskutierten das Problem und er schrieb, dass er Zeit brauchte, um "etwas zu überprüfen". Und jetzt, nach ein paar Tagen, kommt ein Brief, in dem eine Python-Funktion geschrieben ist, die mein Startproblem löst !!! Zu dieser Zeit war die NiPyAPI-Version 0.13.3 und natürlich war nichts dergleichen drin. In der kürzlich veröffentlichten Version 0.14.0 wurde diese Funktion jedoch bereits in die Bibliothek aufgenommen. Treffen
nipyapi.canvas.set_remote_process_group_transmission
Mit der NiPyAPI-Bibliothek haben wir also die Registrierung verbunden, den Datenfluss gerollt und sogar Prozessoren und die Datenübertragung gestartet. Dann können Sie den Code kämmen, alle Arten von Prüfungen hinzufügen, protokollieren und fertig. Aber das ist eine ganz andere Geschichte.
Von den Automatisierungsoptionen, die ich in Betracht gezogen habe, schien mir letztere die effizienteste zu sein. Erstens ist dies immer noch Python-Code, in den Sie zusätzlichen Programmcode einbetten und die Programmiersprache voll ausnutzen können. Zweitens entwickelt sich das NiPyAPI-Projekt aktiv weiter und bei Problemen können Sie dem Entwickler schreiben. Drittens ist NiPyAPI immer noch ein flexibleres Tool für die Interaktion mit NiFi bei der Lösung komplexer Probleme. Zum Beispiel, um zu bestimmen, ob Nachrichtenwarteschlangen derzeit leer sind und ob die Prozessgruppe aktualisiert werden kann.
Das ist alles. Ich habe drei Ansätze zur Automatisierung der Flussübermittlung in NiFi beschrieben, die Fallstricke, auf die ein Entwickler stoßen könnte, und einen Arbeitscode zur Automatisierung der Übermittlung bereitgestellt. Wenn Sie sich genauso für dieses Thema interessieren -
schreiben Sie!