Einführung
So kam es, dass ich mich an meinem derzeitigen Arbeitsplatz mit dieser Technologie vertraut machen musste. Ich werde mit einem kleinen Hintergrund beginnen. Bei der nächsten Rallye wurde unserem Team gesagt, dass wir eine Integration mit einem bekannten System schaffen müssen . Durch die Integration sendet uns dieses bekannte System Anfragen über HTTP an einen bestimmten Endpunkt, und seltsamerweise senden wir die Antworten in Form einer SOAP-Nachricht zurück. Alles scheint einfach und trivial zu sein. Es folgt, was benötigt wird ...
Herausforderung
Erstellen Sie 3 Dienste. Der erste ist der Datenbankaktualisierungsdienst. Wenn neue Daten von einem Drittanbieter-System eingehen, aktualisiert dieser Dienst die Daten in der Datenbank und generiert eine bestimmte CSV-Datei, um sie auf das nächste System zu übertragen. Der Endpunkt des zweiten Dienstes wird aufgerufen - der FTP-Transportdienst, der die übertragene Datei empfängt, validiert und über FTP in den Dateispeicher legt. Der dritte Dienst, der Datenübertragungsdienst zum Verbraucher, arbeitet asynchron mit den ersten beiden. Es empfängt eine Anforderung von einem externen System eines Drittanbieters, die oben beschriebene Datei zu empfangen, nimmt die fertige Antwortdatei, ändert sie (aktualisiert die Felder id, description, linkToFile) und sendet die Antwort in Form einer SOAP-Nachricht. Das heißt, das Gesamtbild sieht wie folgt aus: Die ersten beiden Dienste beginnen ihre Arbeit erst, wenn die Daten für die Aktualisierung eingetroffen sind. Der dritte Dienst funktioniert ständig, da es viele Informationskonsumenten gibt, etwa 1000 Anfragen nach Datenempfang pro Minute. Dienste sind ständig verfügbar und ihre Instanzen befinden sich in verschiedenen Umgebungen wie Test, Demo, Preprod und Prod. Unten finden Sie ein Diagramm der Arbeit dieser Dienste. Ich werde sofort erklären, dass einige Details vereinfacht werden, um unnötige Komplexität zu vermeiden.

Technische Vertiefung
Bei der Planung einer Lösung für das Problem haben wir uns zunächst entschlossen, Java-Anwendungen mit dem Spring-Framework, dem Nginx-Balancer, der Postgres-Datenbank und anderen technischen und nicht sehr technischen Dingen zu erstellen. Da die Zeit für die Entwicklung einer technischen Lösung es uns ermöglichte, andere Lösungsansätze für dieses Problem in Betracht zu ziehen, fiel mein Blick auf die Apache NIFI-Technologie, die in bestimmten Kreisen in Mode war. Ich muss sofort sagen, dass diese Technologie es uns ermöglicht hat, diese 3 Dienste zu bemerken. In diesem Artikel wird die Entwicklung eines Dateiübertragungsdienstes und eines Datenübertragungsdienstes für einen Verbraucher beschrieben. Wenn der Artikel jedoch eingeht, werde ich über einen Datenaktualisierungsdienst in der Datenbank schreiben.
Was ist das
NIFI ist eine verteilte Architektur für schnelles paralleles Laden und Datenverarbeitung, eine große Anzahl von Plug-Ins für Quellen und Transformationen, Versionskonfigurationen und vieles mehr. Ein schöner Bonus ist, dass es sehr einfach zu bedienen ist. Triviale Prozesse wie getFile, sendHttpRequest und andere können als Quadrate dargestellt werden. Jedes Quadrat repräsentiert einen bestimmten Prozess, dessen Wechselwirkung in der folgenden Abbildung zu sehen ist. Eine ausführlichere Dokumentation zum Zusammenspiel der Prozessoptimierung finden Sie hier. , für diejenigen, die auf Russisch sind - hier . Die Dokumentation beschreibt perfekt, wie NIFI entpackt und ausgeführt wird sowie wie Prozesse erstellt werden. Es handelt sich um Quadrate
Die Idee, einen Artikel zu schreiben, entstand nach einer langen Suche und Strukturierung der Informationen, die in etwas Bewusstem empfangen wurden, sowie dem Wunsch, zukünftigen Entwicklern das Leben zu erleichtern.
Beispiel
Ein Beispiel dafür, wie Quadrate miteinander interagieren, wird betrachtet. Das allgemeine Schema ist recht einfach: Wir erhalten eine HTTP-Anfrage (theoretisch mit einer Datei im Hauptteil der Anfrage. Um die Funktionen von NIFI zu demonstrieren, startet die Anfrage in diesem Beispiel den Prozess des Empfangens der Datei vom lokalen PF), senden wir dann die Antwort zurück, von der die Anfrage empfangen wurde, den Prozess des Empfangens der Datei von FH und dann der Prozess des Verschiebens über FTP nach FH. Es ist zu erklären, dass Prozesse über die sogenannte flowFile miteinander interagieren. Dies ist die grundlegende Entität in NIFI, in der Attribute und Inhalte gespeichert werden. Inhalt sind die Daten, die durch die Stream-Datei dargestellt werden. Wenn Sie eine Datei von einem Quadrat erhalten und auf ein anderes übertragen haben, ist der Inhalt grob gesagt Ihre Datei.

Wie Sie sehen können, zeigt diese Abbildung den Gesamtprozess. HandleHttpRequest - akzeptiert Anforderungen, ReplaceText - generiert einen Antworttext, HandleHttpResponse - gibt eine Antwort zurück. FetchFile - empfängt eine Datei aus dem Dateispeicher und überträgt sie auf das PutSftp-Quadrat - legt diese Datei unter der angegebenen Adresse auf FTP ab. Nun mehr zu diesem Prozess.
In diesem Fall ist die Anfrage der Anfang von allem. Sehen wir uns die Konfigurationsoptionen an.

Mit Ausnahme von StandartHttpContextMap ist hier alles ziemlich trivial - dies ist ein Dienst, mit dem Sie Anfragen senden und empfangen können. Weitere Details und sogar Beispiele finden Sie
hier Weitere Informationen finden Sie in den Konfigurationsoptionen für das ReplaceText-Quadrat. Es lohnt sich, auf ReplacementValue zu achten - dies wird dem Benutzer in Form einer Antwort zurückgegeben. In den Einstellungen können Sie die Protokollierungsstufe anpassen. Protokolle können angezeigt werden {wo nifi entpackt wurde} /nifi-1.9.2/logs. Es gibt auch Fehler- / Erfolgsparameter. Basierend auf diesen Parametern können Sie den gesamten Prozess steuern. Das heißt, im Falle einer erfolgreichen Textverarbeitung wird der Prozess des Sendens einer Antwort an den Benutzer aufgerufen, und im anderen Fall verpfänden wir einfach den erfolglosen Prozess.

Die HandleHttpResponse-Eigenschaften haben nichts Besonderes außer dem Status für eine erfolgreiche Antworterstellung.

Wir haben die Anfrage mit der Antwort aussortiert. Fahren wir mit dem Empfang der Datei fort und platzieren sie auf dem FTP-Server. FetchFile - empfängt die Datei unter dem in den Einstellungen angegebenen Pfad und überträgt sie an den nächsten Prozess.

Und dann das PutSftp-Quadrat - legt die Datei im Dateispeicher ab. Konfigurationsparameter sind unten zu sehen.

Es ist zu beachten, dass jedes Quadrat ein separater Prozess ist, der gestartet werden muss. Wir haben das einfachste Beispiel untersucht, für das keine komplizierten Anpassungen erforderlich sind. Betrachten Sie als nächstes den Prozess etwas komplizierter, wo wir ein wenig auf die Rillen schreiben.
Komplexeres Beispiel
Der Datenübertragungsdienst zum Verbraucher erwies sich aufgrund des Änderungsprozesses der SOAP-Nachricht als etwas komplizierter. Der Gesamtprozess ist in der folgenden Abbildung dargestellt.

Auch hier ist die Idee nicht sehr kompliziert: Wir haben vom Verbraucher eine Anfrage erhalten, dass er Daten benötigt, eine Antwort gesendet, dass er eine Nachricht erhalten hat, den Prozess des Empfangs der Antwortdatei gestartet, diese dann mit einer bestimmten Logik bearbeitet und die Datei dann in Form einer SOAP-Nachricht an den Verbraucher an den Server übertragen.
Ich denke, es lohnt sich nicht, die Quadrate, die wir oben gesehen haben, noch einmal zu beschreiben - wir werden direkt zu den neuen gehen. Wenn Sie eine Datei bearbeiten müssen und reguläre Quadrate wie ReplaceText nicht geeignet sind, müssen Sie Ihr eigenes Skript schreiben. Dies kann mithilfe des ExecuteGroogyScript-Quadrats erfolgen. Die Einstellungen sind unten dargestellt.

Es gibt zwei Möglichkeiten, das Skript in dieses Quadrat zu laden. Das erste ist das Laden der Skriptdatei. Die zweite Möglichkeit besteht darin, ein Skript in scriptBody einzufügen. Soweit ich weiß, unterstützt das executeScript-Quadrat mehrere JPs - einer davon ist groovig. Ich enttäusche Java-Entwickler - Sie können in Java keine Skripte in solche Quadrate schreiben. Für diejenigen, die es wirklich wollen - Sie müssen Ihr eigenes Quadrat erstellen und es in das NIFI-System werfen. Diese ganze Operation wird von ziemlich langen Tänzen mit einem Tamburin begleitet, auf die wir im Rahmen dieses Artikels nicht eingehen werden. Ich habe mich für eine gute Sprache entschieden. Unten finden Sie ein Testskript, mit dem die ID in der SOAP-Nachricht einfach schrittweise aktualisiert wird. Es ist wichtig zu beachten. Sie nehmen die Datei aus flowFile, aktualisieren sie, vergessen nicht, dass Sie sie benötigen, aktualisieren sie und legen sie dort ab. Es ist auch erwähnenswert, dass nicht alle Bibliotheken verbunden sind. Es kann vorkommen, dass Sie noch eine der Bibliotheken importieren müssen. Der Nachteil ist, dass das Drehbuch in diesem Quadrat ziemlich schwer zu debütieren ist. Es gibt eine Möglichkeit, eine Verbindung zum JVM-NIFI herzustellen und den Debugging-Prozess zu starten. Persönlich habe ich eine lokale Anwendung ausgeführt und simuliert, wie eine Datei aus einer Sitzung abgerufen wird. Das Debuggen wurde auch lokal durchgeführt. Fehler, die beim Laden eines Skripts auftreten, sind recht einfach zu googeln und werden von NIFI in das Protokoll geschrieben.
import org.apache.commons.io.IOUtils import groovy.xml.XmlUtil import java.nio.charset.* import groovy.xml.StreamingMarkupBuilder def flowFile = session.get() if (!flowFile) return try { flowFile = session.write(flowFile, { inputStream, outputStream -> String result = IOUtils.toString(inputStream, "UTF-8"); def recordIn = new XmlSlurper().parseText(result) def element = recordIn.depthFirst().find { it.name() == 'id' } def newId = Integer.parseInt(element.toString()) + 1 def recordOut = new XmlSlurper().parseText(result) recordOut.Body.ClientMessage.RequestMessage.RequestContent.content.MessagePrimaryContent.ResponseBody.id = newId def res = new StreamingMarkupBuilder().bind { mkp.yield recordOut }.toString() outputStream.write(res.getBytes(StandardCharsets.UTF_8)) } as StreamCallback) session.transfer(flowFile, REL_SUCCESS) } catch(Exception e) { log.error("Error during processing of validate.groovy", e) session.transfer(flowFile, REL_FAILURE) }
Eigentlich endet hier die Anpassung des Quadrats. Als nächstes wird die aktualisierte Datei auf das Quadrat übertragen, das die Datei an den Server sendet. Unten sind die Einstellungen für dieses Quadrat.

Wir beschreiben die Methode, mit der die SOAP-Nachricht übertragen wird. Wir schreiben wohin. Als nächstes müssen Sie angeben, dass dies genau SOAP ist.

Fügen Sie einige Eigenschaften wie Host und Aktion (soapAction) hinzu. Speichern, überprüfen. Weitere Informationen zum Senden von SOAP-Anfragen finden Sie hier.
Wir haben uns verschiedene Verwendungszwecke von NIFI-Prozessen angesehen. Wie sie interagieren und welchen wirklichen Nutzen sie haben. Die betrachteten Beispiele sind Testbeispiele und unterscheiden sich geringfügig von dem, was im Kampf real ist. Ich hoffe, dieser Artikel ist ein wenig nützlich für Entwickler. Vielen Dank für Ihre Aufmerksamkeit. Wenn Sie Fragen haben - schreiben Sie. Ich werde versuchen zu antworten.