Hola a todos!

La tarea es la siguiente: hay un flujo, presentado en la imagen de arriba, que debe implementarse en N servidores con
Apache NiFi . Prueba de flujo: el archivo se genera y se envía a otra instancia de NiFi. Los datos se transmiten utilizando el protocolo de sitio a sitio de NiFi.
NiFi Site to Site (S2S) es una forma segura y fácilmente personalizable de transferir datos entre instancias de NiFi. Vea cómo funciona el S2S en la documentación y es importante no olvidar configurar la instancia de NiFi para habilitar S2S. Consulte aquí .
En esos casos, cuando se trata de la transferencia de datos mediante S2S, una instancia se denomina cliente, el segundo servidor. El cliente envía datos, el servidor envía. Dos formas de configurar la transferencia de datos entre ellos:
- Empujar Desde una instancia de cliente, los datos se envían utilizando el Grupo de proceso remoto (RPG). En una instancia de servidor, los datos se reciben utilizando el puerto de entrada.
- Tirar El servidor recibe datos utilizando el RPG, el cliente envía utilizando el puerto de salida.
Almacenamos el flujo para rodar en el registro de Apache.
Apache NiFi Registry es un subproyecto de Apache NiFi que proporciona una herramienta para almacenar el flujo y el control de versiones. Una especie de imbécil. La información sobre la instalación, configuración y trabajo con el registro se puede encontrar en la documentación oficial . El flujo para el almacenamiento se combina en un grupo de procesos y se almacena como tal en el registro. Más adelante en el artículo volveremos a esto.
Al principio, cuando N es un número pequeño, el flujo se entrega y actualiza manualmente en un tiempo aceptable.
Pero con el crecimiento de N, hay más problemas:
- El flujo de actualización lleva más tiempo. Es necesario ir a todos los servidores.
- Hay errores al actualizar las plantillas. Aquí se actualizaron, pero aquí se olvidaron
- errores humanos al realizar una gran cantidad de operaciones del mismo tipo
Todo esto nos lleva al hecho de que necesitamos automatizar el proceso. Intenté las siguientes formas de resolver este problema:
- Use MiNiFi en lugar de NiFi
- NiFi CLI
- NiPyAPI
Usando MiNiFi
Apache MiNiFy es un subproyecto de Apache NiFi. MiNiFy es un agente compacto que utiliza los mismos procesadores que NiFi, lo que le permite crear el mismo flujo que en NiFi. La ligereza del agente se logra, entre otras cosas, debido a que MiNiFy no tiene una interfaz gráfica para la configuración del flujo. La falta de una interfaz gráfica en MiNiFy significa que es necesario resolver el problema de la entrega de flujo en minifi. Como MiNiFy se usa activamente en IOT, hay muchos componentes y el proceso de entrega de flujo a las instancias finales de minifi necesita ser automatizado. Una tarea familiar, ¿verdad?
Otro subproyecto ayudará a resolver este problema: MiNiFi C2 Server. Este producto está destinado a ser un punto central en la arquitectura de las configuraciones continuas. Cómo configurar el entorno: descrito en
este artículo sobre Habré y suficiente información para resolver la tarea. MiNiFi junto con el servidor C2 actualiza automáticamente la configuración en casa. El único inconveniente de este enfoque es que tiene que crear plantillas en el servidor C2, una confirmación de registro simple no es suficiente.
La opción descrita en el artículo anterior funciona y no es difícil de implementar, pero no olvide lo siguiente:
- En minifi no hay todos los procesadores de nifi
- Las versiones de procesador en Minifi se quedan atrás de las versiones de procesador en NiFi.
Al momento de escribir, la última versión de NiFi es 1.9.2. La versión del procesador de la última versión de MiNiFi es 1.7.0. Se pueden agregar procesadores a MiNiFi, pero debido a las discrepancias de versión entre los procesadores NiFi y MiNiFi esto puede no funcionar.
NiFi CLI
A juzgar por la
descripción de la herramienta en el sitio web oficial, es una herramienta para automatizar la interacción de NiFI y NiFi Registry en el campo de la entrega de flujo o el control de procesos. Para comenzar, esta herramienta debe descargarse
desde aquí .
Ejecute la utilidad
./bin/cli.sh _ ___ _ Apache (_) .' ..](_) , _ .--. __ _| |_ __ )\ [ `.-. | [ |'-| |-'[ | / \ | | | | | | | | | | ' ' [___||__][___][___] [___]', ,' `' CLI v1.9.2 Type 'help' to see a list of available commands, use tab to auto-complete.
Para que podamos cargar el flujo necesario desde el registro, necesitamos conocer los identificadores de la cesta (identificador de depósito) y el flujo mismo (identificador de flujo). Estos datos se pueden obtener a través de cli o en la interfaz web del registro NiFi. La interfaz web se ve así:

El uso de la CLI hace esto:
#> registry list-buckets -u http://nifi-registry:18080 # Name Id Description - -------------- ------------------------------------ ----------- 1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty) #> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080 # Name Id Description - ------------ ------------------------------------ ----------- 1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
Comenzamos la importación del grupo de procesos desde el registro:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080 7f522a13-016e-1000-e504-d5b15587f2f3
El punto importante es que cualquier instancia de nifi se puede especificar como el host en el que rodamos el grupo de procesos.
Grupo de procesos agregado con procesadores detenidos, deben iniciarse
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
Genial, los procesadores comenzaron. Sin embargo, de acuerdo con las condiciones del problema, necesitamos instancias de NiFi para enviar datos a otras instancias. Suponga que selecciona el método Push para transferir datos al servidor. Para organizar la transferencia de datos, debe habilitar la transmisión de datos (Habilitar transmisión) en el Grupo de proceso remoto (RPG) agregado, que ya está incluido en nuestro flujo.

En la documentación en la CLI y otras fuentes, no encontré una manera de habilitar la transferencia de datos. Si sabe cómo hacer esto, escriba los comentarios.
Como tenemos bash y estamos listos para llegar al final, ¡encontraremos una salida! Puede usar la API NiFi para resolver este problema. Utilizamos el siguiente método, tomamos la ID de los ejemplos anteriores (en nuestro caso es 7f522a13-016e-1000-e504-d5b15587f2f3). Descripción de los métodos API de NiFi
aquí .

En cuerpo, debe pasar JSON, de la siguiente forma:
{ "revision": { "clientId": "value", "version": 0, "lastModifier": "value" }, "state": "value", "disconnectedNodeAcknowledged": true }
Parámetros que deben rellenarse para "trabajar":
estado :
estado de transferencia de datos. TRANSMISIÓN disponible para permitir la transferencia de datos, DETENIDO para apagar
version - versión del procesador
la versión predeterminada será 0 cuando se cree, pero estos parámetros se pueden obtener utilizando el método

Para los amantes de los scripts de bash, este método puede parecer adecuado, pero es difícil para mí: los scripts de bash no son mis favoritos. El siguiente método es más interesante y cómodo en mi opinión.
NiPyAPI
NiPyAPI es una biblioteca de Python para interactuar con instancias de NiFi.
La página de documentación contiene la información necesaria para trabajar con la biblioteca. El inicio rápido se describe en un
proyecto github.
Nuestro script para implementar la configuración es un programa Python. Pasamos a la codificación.
Configuramos configuraciones para más trabajo. Necesitaremos los siguientes parámetros:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' # nifi-api , process group nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' # nifi-registry-api registry nipyapi.config.registry_name = 'MyBeutifulRegistry' # registry, nifi nipyapi.config.bucket_name = 'BucketName' # bucket, flow nipyapi.config.flow_name = 'FlowName' # flow,
Además, insertaré los nombres de los métodos de esta biblioteca, que se describen
aquí .
Conecte el registro a la instancia de nifi con
nipyapi.versioning.create_registry_client
En este paso, también puede agregar una verificación de que el registro ya se ha agregado a la instancia; para esto, puede usar el método
nipyapi.versioning.list_registry_clients
Encuentre un balde para seguir buscando flujo en la canasta.
nipyapi.versioning.get_registry_bucket
Buscar cubo para flujo
nipyapi.versioning.get_flow_in_bucket
Además, es importante comprender si este grupo de procesos ya se ha agregado. El grupo de procesos se coloca en las coordenadas y puede surgir una situación cuando se superpone una segunda encima de un componente. Verifiqué, esto puede ser :) Para agregar todo el grupo de procesos, utilizamos el método
nipyapi.canvas.list_all_process_groups
y luego podemos buscar, por ejemplo, por nombre.
No describiré el proceso de actualización de la plantilla, solo diré que si se agregan procesadores en la nueva versión de la plantilla, entonces no hay problemas con la presencia de mensajes en las colas. Pero si se eliminan los procesadores, entonces pueden surgir problemas (nifi no permite que se elimine el procesador si se ha acumulado una cola de mensajes frente a él). Si está interesado en cómo resolví este problema, escríbame, por favor, discutiremos este punto. Contacto al final del artículo. Pasemos al paso de agregar un grupo de procesos.
Al depurar el script, me encontré con una característica de que la última versión del flujo no siempre se extrae, por lo que recomiendo que esta versión se aclare primero:
nipyapi.versioning.get_latest_flow_ver
Grupo de proceso de implementación:
nipyapi.versioning.deploy_flow_version
Iniciamos procesadores:
nipyapi.canvas.schedule_process_group
En el bloque CLI, ¿se escribió que la transferencia de datos no se activa automáticamente en el grupo de proceso remoto? Al implementar el script, me encontré con este problema también. En ese momento, no tuve éxito en iniciar la transferencia de datos utilizando la API y decidí escribir al desarrollador de la biblioteca NiPyAPI y pedirle consejo / ayuda. El desarrollador me respondió, discutimos el problema y escribió que necesitaba tiempo para "verificar algo". Y ahora, después de un par de días, llega una carta en la que se escribe una función de Python que resuelve mi problema de lanzamiento. En ese momento, la versión NiPyAPI era 0.13.3 y, por supuesto, no había nada de eso en ella. Pero en la versión 0.14.0, que se lanzó recientemente, esta función ya se ha incluido en la biblioteca. Reunirse
nipyapi.canvas.set_remote_process_group_transmission
Entonces, usando la biblioteca NiPyAPI, conectamos el registro, el flujo continuo e incluso comenzamos los procesadores y la transferencia de datos. Luego puede peinar el código, agregar todo tipo de comprobaciones, registros y eso es todo. Pero esta es una historia completamente diferente.
De las opciones de automatización que consideré, esta última me pareció la más eficiente. En primer lugar, este sigue siendo código de Python, en el que puede incrustar código de programa auxiliar y aprovechar al máximo el lenguaje de programación. En segundo lugar, el proyecto NiPyAPI se está desarrollando activamente y, en caso de problemas, puede escribir al desarrollador. En tercer lugar, NiPyAPI sigue siendo una herramienta más flexible para interactuar con NiFi para resolver problemas complejos. Por ejemplo, al determinar si las colas de mensajes están vacías ahora en flujo y si el grupo de procesos se puede actualizar.
Eso es todo. Describí 3 enfoques para automatizar la entrega de flujo en NiFi, dificultades que un desarrollador puede encontrar y di un código de trabajo para automatizar la entrega. Si está tan interesado en este tema,
¡escriba!