Bonjour à tous!

La tâche est la suivante - il y a un flux, présenté dans l'image ci-dessus, qui doit être déployé sur N serveurs avec
Apache NiFi . Test de flux - le fichier est généré et envoyé à une autre instance NiFi. Les données sont transmises à l'aide du protocole NiFi Site to Site.
NiFi Site to Site (S2S) est un moyen sécurisé et facilement personnalisable de transférer des données entre des instances NiFi. Voir comment fonctionne le S2S dans la documentation et il est important de ne pas oublier de configurer l'instance NiFi pour activer S2S voir ici .
Dans ces cas, lorsqu'il s'agit de transfert de données à l'aide de S2S - une instance est appelée le client, le deuxième serveur. Le client envoie des données, le serveur envoie. Deux façons de configurer le transfert de données entre eux:
- Poussez À partir d'une instance client, les données sont envoyées à l'aide du groupe de processus distant (RPG). Sur une instance de serveur, les données sont reçues via le port d'entrée.
- Tirer Le serveur reçoit des données en utilisant le RPG, le client envoie en utilisant le port de sortie.
Nous stockons le flux pour le rouler dans le registre Apache.
Apache NiFi Registry est un sous-projet d'Apache NiFi qui fournit un outil pour stocker le contrôle de flux et de version. Une sorte de connard. Vous trouverez des informations sur l'installation, la configuration et l'utilisation du registre dans la documentation officielle . Le flux de stockage est combiné dans un groupe de processus et stocké en tant que tel dans le registre. Plus loin dans l'article, nous y reviendrons.
Au début, lorsque N est un petit nombre, le flux est délivré et mis à jour manuellement dans un délai acceptable.
Mais avec la croissance de N, il y a plus de problèmes:
- la mise à jour du flux prend plus de temps. Il faut aller sur tous les serveurs
- il y a des erreurs lors de la mise à jour des modèles. Ici, ils ont mis à jour, mais ici, ils ont oublié
- erreurs humaines lors de l'exécution d'un grand nombre d'opérations du même type
Tout cela nous amène au fait que nous devons automatiser le processus. J'ai essayé les moyens suivants pour résoudre ce problème:
- Utilisez MiNiFi au lieu de NiFi
- CLI NiFi
- NiPyAPI
Utiliser MiNiFi
Apache MiNiFy est un sous-projet d'Apache NiFi. MiNiFy est un agent compact qui utilise les mêmes processeurs que NiFi, vous permettant de créer le même flux qu'en NiFi. La légèreté de l'agent est obtenue, entre autres, du fait que MiNiFy ne possède pas d'interface graphique pour la configuration du flux. L'absence d'une interface graphique dans MiNiFy signifie qu'il est nécessaire de résoudre le problème de livraison de flux en minifi. Étant donné que MiNiFy est activement utilisé dans l'IOT, il existe de nombreux composants et le processus de distribution du flux vers les instances minifi finales doit être automatisé. Une tâche familière, non?
Un autre sous-projet aidera à résoudre ce problème - le serveur MiNiFi C2. Ce produit est destiné à être un point central dans l'architecture des configurations roulantes. Comment configurer l'environnement - décrit dans
cet article sur Habré et suffisamment d'informations pour résoudre la tâche. MiNiFi conjointement avec le serveur C2 met à jour automatiquement la configuration à la maison. Le seul inconvénient de cette approche est que vous devez créer des modèles sur C2 Server, une simple validation de registre ne suffit pas.
L'option décrite dans l'article ci-dessus fonctionne et n'est pas difficile à implémenter, mais n'oubliez pas ce qui suit:
- Dans minifi, il n'y a pas tous les processeurs de nifi
- Les versions de processeur dans Minifi sont à la traîne des versions de processeur dans NiFi.
Au moment d'écrire ces lignes, la dernière version de NiFi est la 1.9.2. La version processeur de la dernière version MiNiFi est 1.7.0. Des processeurs peuvent être ajoutés à MiNiFi, mais en raison des différences de version entre les processeurs NiFi et MiNiFi, cela peut ne pas fonctionner.
CLI NiFi
A en juger par la
description de l' outil sur le site officiel, il s'agit d'un outil pour automatiser l'interaction de NiFI et NiFi Registry dans le domaine de la livraison de flux ou du contrôle de processus. Pour commencer, cet outil doit être téléchargé à
partir d'ici .
Exécutez l'utilitaire
./bin/cli.sh _ ___ _ Apache (_) .' ..](_) , _ .--. __ _| |_ __ )\ [ `.-. | [ |'-| |-'[ | / \ | | | | | | | | | | ' ' [___||__][___][___] [___]', ,' `' CLI v1.9.2 Type 'help' to see a list of available commands, use tab to auto-complete.
Pour que nous puissions charger le flux nécessaire à partir du registre, nous devons connaître les identifiants du panier (identifiant de compartiment) et le flux lui-même (identifiant de flux). Ces données peuvent être obtenues soit via cli, soit dans l'interface web du registre NiFi. L'interface Web ressemble à ceci:

L'utilisation de l'interface CLI fait ceci:
#> registry list-buckets -u http://nifi-registry:18080 # Name Id Description - -------------- ------------------------------------ ----------- 1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty) #> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080 # Name Id Description - ------------ ------------------------------------ ----------- 1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
Nous commençons l'importation du groupe de processus à partir du registre:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080 7f522a13-016e-1000-e504-d5b15587f2f3
Le point important est que toute instance nifi peut être spécifiée comme l'hôte sur lequel nous roulons le groupe de processus.
Groupe de processus ajouté avec des processeurs arrêtés, ils doivent être démarrés
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
Génial, les processeurs ont commencé. Cependant, selon les conditions du problème, nous avons besoin d'instances NiFi pour envoyer des données à d'autres instances. Supposons que vous sélectionnez la méthode Push pour transférer des données vers le serveur. Afin d'organiser le transfert de données, vous devez activer la transmission de données (Activer la transmission) sur le groupe de processus distant (RPG) ajouté, qui est déjà inclus dans notre flux.

Dans la documentation de l'interface CLI et d'autres sources, je n'ai pas trouvé de moyen d'activer le transfert de données. Si vous savez comment procéder, veuillez écrire dans les commentaires.
Puisque nous avons bash et que nous sommes prêts à aller jusqu'au bout - nous trouverons une issue! Vous pouvez utiliser l'API NiFi pour résoudre ce problème. Nous utilisons la méthode suivante, nous prenons l'ID des exemples ci-dessus (dans notre cas, c'est 7f522a13-016e-1000-e504-d5b15587f2f3). Description des méthodes d'API NiFi
ici .

Dans le corps, vous devez passer JSON, de la forme suivante:
{ "revision": { "clientId": "value", "version": 0, "lastModifier": "value" }, "state": "value", "disconnectedNodeAcknowledged": true }
Paramètres à remplir pour "fonctionner":
état -
état du transfert de données. TRANSMISSION disponible pour activer le transfert de données, ARRÊTÉ pour désactiver
version - version processeur
la version sera par défaut à 0 lors de la création, mais ces paramètres peuvent être obtenus en utilisant la méthode

Pour les amateurs de scripts bash, cette méthode peut sembler appropriée, mais c'est difficile pour moi - les scripts bash ne sont pas mes préférés. La méthode suivante est à mon avis plus intéressante et confortable.
NiPyAPI
NiPyAPI est une bibliothèque Python pour interagir avec des instances NiFi.
La page de documentation contient les informations nécessaires pour travailler avec la bibliothèque. Le démarrage rapide est décrit dans un
projet github.
Notre script pour déployer la configuration est un programme Python. On passe au codage.
Nous configurons les configurations pour un travail ultérieur. Nous aurons besoin des paramètres suivants:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' # nifi-api , process group nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' # nifi-registry-api registry nipyapi.config.registry_name = 'MyBeutifulRegistry' # registry, nifi nipyapi.config.bucket_name = 'BucketName' # bucket, flow nipyapi.config.flow_name = 'FlowName' # flow,
En outre, je vais insérer les noms des méthodes de cette bibliothèque, qui sont décrites
ici .
Connectez le registre à l'instance nifi avec
nipyapi.versioning.create_registry_client
À cette étape, vous pouvez également ajouter une vérification que le registre a déjà été ajouté à l'instance; pour cela, vous pouvez utiliser la méthode
nipyapi.versioning.list_registry_clients
Trouvez un seau pour rechercher davantage le flux dans le panier.
nipyapi.versioning.get_registry_bucket
Seau de recherche pour le flux
nipyapi.versioning.get_flow_in_bucket
De plus, il est important de comprendre si ce groupe de processus a déjà été ajouté. Le groupe de processus est placé dans les coordonnées et une situation peut se produire lorsqu'un second est superposé au-dessus d'un composant. J'ai vérifié, cela peut être :) Pour obtenir tout le groupe de processus ajouté, nous utilisons la méthode
nipyapi.canvas.list_all_process_groups
puis nous pouvons rechercher, par exemple par nom.
Je ne décrirai pas le processus de mise à jour du modèle, je dirai seulement que si des processeurs sont ajoutés dans la nouvelle version du modèle, il n'y a aucun problème avec la présence de messages dans les files d'attente. Mais si les processeurs sont supprimés, des problèmes peuvent survenir (nifi ne permet pas de supprimer le processeur si une file d'attente de messages s'est accumulée devant lui). Si vous êtes intéressé par la façon dont j'ai résolu ce problème - écrivez-moi, s'il vous plaît, nous discuterons de ce point. Contact en fin d'article. Passons à l'étape d'ajout d'un groupe de processus.
Lors du débogage du script, je suis tombé sur une fonctionnalité selon laquelle la dernière version du flux n'est pas toujours affichée, je recommande donc que cette version soit clarifiée en premier:
nipyapi.versioning.get_latest_flow_ver
Groupe de processus de déploiement:
nipyapi.versioning.deploy_flow_version
Nous démarrons des processeurs:
nipyapi.canvas.schedule_process_group
Dans le bloc CLI, il a été écrit que le transfert de données n'est pas automatiquement activé dans le groupe de processus distant? Lors de la mise en œuvre du script, j'ai également rencontré ce problème. À cette époque, je n'ai pas réussi à démarrer le transfert de données à l'aide de l'API et j'ai décidé d'écrire au développeur de la bibliothèque NiPyAPI et de demander conseil / aide. Le développeur m'a répondu, nous avons discuté du problème et il a écrit qu'il avait besoin de temps pour «vérifier quelque chose». Et maintenant, après quelques jours, une lettre arrive dans laquelle une fonction Python est écrite qui résout mon problème de lancement !!! À cette époque, la version NiPyAPI était 0.13.3 et, bien sûr, il n'y avait rien de tel en elle. Mais dans la version 0.14.0, qui a été publiée récemment, cette fonction a déjà été incluse dans la bibliothèque. Rencontrez
nipyapi.canvas.set_remote_process_group_transmission
Ainsi, en utilisant la bibliothèque NiPyAPI, nous avons connecté le registre, roulé le flux et même démarré les processeurs et le transfert de données. Ensuite, vous pouvez peigner le code, ajouter toutes sortes de vérifications, de journalisation, et c'est tout. Mais c'est une histoire complètement différente.
Parmi les options d'automatisation que j'ai envisagées, cette dernière m'a semblé la plus efficace. Tout d'abord, il s'agit toujours de code python, dans lequel vous pouvez intégrer du code de programme auxiliaire et tirer pleinement parti du langage de programmation. Deuxièmement, le projet NiPyAPI se développe activement et en cas de problème, vous pouvez écrire au développeur. Troisièmement, NiPyAPI est toujours un outil plus flexible pour interagir avec NiFi dans la résolution de problèmes complexes. Par exemple, pour déterminer si les files d'attente de messages sont vides dans le flux et si le groupe de processus peut être mis à jour.
C’est tout. J'ai décrit 3 approches pour automatiser la livraison de flux en NiFi, les pièges qu'un développeur peut rencontrer et donné un code de travail pour automatiser la livraison. Si vous êtes tout aussi intéressé par ce sujet -
écrivez!