Introduccion
Sucedió que en mi lugar de trabajo actual tuve que familiarizarme con esta tecnología. Comenzaré con un poco de historia. En el próximo rally, a nuestro equipo le dijeron que teníamos que crear integración con un sistema conocido . Por integración, se entendió que este conocido sistema enviaría solicitudes a través de HTTP a un punto final específico y, curiosamente, enviaríamos las respuestas en forma de mensaje SOAP. Todo parece ser simple y trivial. Sigue lo que se necesita ...
Desafío
Crea 3 servicios. El primero es el Servicio de actualización de la base de datos. Este servicio, cuando llegan nuevos datos de un sistema de terceros, actualiza los datos en la base de datos y genera un cierto archivo CSV para transferirlos al siguiente sistema. Se llama al punto final del segundo servicio: el servicio de transporte FTP, que recibe el archivo transferido, lo valida y lo coloca en el almacenamiento de archivos a través de FTP. El tercer servicio, el Servicio de transferencia de datos para el consumidor, funciona de forma asíncrona con los dos primeros. Recibe una solicitud de un sistema externo de terceros, para recibir el archivo discutido anteriormente, toma el archivo de respuesta terminado, lo modifica (actualiza los campos id, description, linkToFile) y envía la respuesta en forma de mensaje SOAP. Es decir, el panorama general es el siguiente: los dos primeros servicios comienzan su trabajo solo cuando llegan los datos para la actualización. El tercer servicio funciona constantemente ya que hay muchos consumidores de información, alrededor de 1000 solicitudes para recibir datos por minuto. Los servicios están constantemente disponibles y sus instancias están ubicadas en diferentes entornos, como prueba, demostración, preprod y prod. A continuación se muestra un diagrama del trabajo de estos servicios. Explicaré de inmediato que algunos detalles se simplifican para evitar una complejidad innecesaria.

Profundización técnica
Cuando planificamos una solución al problema, primero decidimos hacer aplicaciones Java utilizando Spring Framework, el equilibrador Nginx, la base de datos Postgres y otras cosas técnicas y no muy importantes. Dado que el tiempo para desarrollar una solución técnica nos permitió considerar otros enfoques para resolver este problema, mis ojos se centraron en la tecnología Apache NIFI, de moda en ciertos círculos. Debo decir de inmediato que esta tecnología nos permitió notar estos 3 servicios. Este artículo describirá el desarrollo de un servicio de transferencia de archivos y un servicio de transferencia de datos a un consumidor, sin embargo, si el artículo llega, escribiré sobre un servicio de actualización de datos en la base de datos.
Que es esto
NIFI es una arquitectura distribuida para la carga paralela rápida y el procesamiento de datos, una gran cantidad de complementos para fuentes y transformaciones, configuraciones de versiones y mucho más. Una buena ventaja es que es muy fácil de usar. Los procesos triviales, como getFile, sendHttpRequest y otros, se pueden representar como cuadrados. Cada cuadrado representa un cierto proceso, cuya interacción se puede ver en la figura a continuación. Aquí se escribe documentación más detallada sobre la interacción del ajuste del proceso . , para los que están en ruso, aquí . La documentación describe perfectamente cómo desempaquetar y ejecutar NIFI, así como cómo crear procesos, son cuadrados
La idea de escribir un artículo nació después de una larga búsqueda y estructuración de la información recibida en algo consciente, así como el deseo de facilitar la vida de los futuros desarrolladores.
Ejemplo
Se considera un ejemplo de cómo los cuadrados interactúan entre sí. El esquema general es bastante simple: recibimos una solicitud HTTP (en teoría, con un archivo en el cuerpo de la solicitud. Para demostrar las capacidades de NIFI, en este ejemplo, la solicitud inicia el proceso de recibir el archivo desde el PF local), luego enviamos la respuesta de que la solicitud fue recibida, el proceso de recibir el archivo desde FH y luego el proceso de moverlo a través de FTP a FH. Vale la pena explicar que los procesos interactúan entre sí a través del llamado flowFile. Esta es la entidad básica en NIFI que almacena atributos y contenido. El contenido son los datos representados por el archivo continuo. En términos generales, si recibió un archivo de un cuadrado y lo transfirió a otro, el contenido será su archivo.

Como puede ver, esta figura representa el proceso general. HandleHttpRequest: acepta solicitudes, ReplaceText: genera un cuerpo de respuesta, HandleHttpResponse: devuelve una respuesta. FetchFile: recibe un archivo del almacenamiento de archivos y lo transfiere al cuadrado PutSftp: coloca este archivo en FTP en la dirección especificada. Ahora más sobre este proceso.
En este caso, la solicitud es el comienzo de todo. Veamos sus opciones de configuración.

Todo aquí es bastante trivial con la excepción de StandartHttpContextMap: este es un servicio que le permite enviar y recibir solicitudes. Puedes ver más detalles e incluso ejemplos
aquí A continuación, vea las opciones de configuración del cuadrado Reemplazar texto. Vale la pena prestar atención a ReplacementValue: esto es lo que devolverá al usuario en forma de respuesta. En la configuración puede ajustar el nivel de registro, los registros se pueden ver {donde se descomprimió nifi} /nifi-1.9.2/logs también hay parámetros de falla / éxito; en función de estos parámetros, puede controlar todo el proceso. Es decir, en el caso de un procesamiento de texto exitoso, se invoca el proceso de enviar una respuesta al usuario, y en otro caso, simplemente nos comprometemos con el proceso fallido.

Las propiedades HandleHttpResponse no tienen nada especial excepto el estado para la creación exitosa de respuestas.

Resolvimos la solicitud con la respuesta: pasemos a recibir el archivo y colocarlo en el servidor FTP. FetchFile: recibe el archivo en la ruta especificada en la configuración y lo transfiere al siguiente proceso.

Y luego el cuadrado PutSftp: coloca el archivo en el almacenamiento de archivos. Los parámetros de configuración se pueden ver a continuación.

Vale la pena prestar atención al hecho de que cada cuadrado es un proceso separado que debe iniciarse. Examinamos el ejemplo más simple que no requiere ninguna personalización complicada. Luego, considere el proceso un poco más complicado, donde escribimos un poco en los surcos.
Ejemplo más complejo
El servicio de transferencia de datos al consumidor resultó ser un poco más complicado debido al proceso de modificación del mensaje SOAP. El proceso general se presenta en la figura a continuación.

Aquí, la idea tampoco es muy complicada: recibimos una solicitud del consumidor de que necesitaban datos, enviamos una respuesta de que recibieron un mensaje, comenzamos el proceso de recibir el archivo de respuesta, luego lo editamos con cierta lógica y luego transferimos el archivo al consumidor en forma de mensaje SOAP al servidor.
Creo que no vale la pena volver a describir esos cuadrados que vimos arriba: iremos directamente a los nuevos. Si necesita editar un archivo y los cuadrados regulares como Reemplazar texto no son adecuados, tendrá que escribir su propio script. Esto se puede hacer usando el cuadrado ExecuteGroogyScript. Su configuración se presenta a continuación.

Hay dos opciones para cargar el script en este cuadrado. El primero es cargando el archivo de script. El segundo es mediante la inserción de un script en scriptBody. Hasta donde yo sé, el cuadrado executeScript admite varios JP, uno de ellos es maravilloso. Estoy decepcionando a los desarrolladores de Java: no se pueden escribir scripts en dichos cuadrados en Java. Para aquellos que realmente quieran, debe crear su propio cuadrado personalizado y lanzarlo al sistema NIFI. Toda esta operación va acompañada de bailes bastante largos con una pandereta, que no trataremos en el marco de este artículo. Elegí un lenguaje maravilloso. A continuación se muestra un script de prueba que simplemente actualiza incrementalmente la identificación en el mensaje SOAP. Es importante tener en cuenta. Tome el archivo de flowFile, actualícelo, no olvide que lo necesita, actualizado, vuelva a colocarlo allí. También vale la pena señalar que no todas las bibliotecas están conectadas. Puede suceder que todavía tenga que importar una de las librerías. La desventaja es que el guión en esta plaza es bastante difícil de debutar. Hay una manera de conectarse a JVM NIFI e iniciar el proceso de depuración. Personalmente, ejecuté una aplicación local y simulé obtener un archivo de una sesión. La depuración también se realizó localmente. Los errores que salen al cargar un script son bastante fáciles de google y NIFI los escribe en el registro.
import org.apache.commons.io.IOUtils import groovy.xml.XmlUtil import java.nio.charset.* import groovy.xml.StreamingMarkupBuilder def flowFile = session.get() if (!flowFile) return try { flowFile = session.write(flowFile, { inputStream, outputStream -> String result = IOUtils.toString(inputStream, "UTF-8"); def recordIn = new XmlSlurper().parseText(result) def element = recordIn.depthFirst().find { it.name() == 'id' } def newId = Integer.parseInt(element.toString()) + 1 def recordOut = new XmlSlurper().parseText(result) recordOut.Body.ClientMessage.RequestMessage.RequestContent.content.MessagePrimaryContent.ResponseBody.id = newId def res = new StreamingMarkupBuilder().bind { mkp.yield recordOut }.toString() outputStream.write(res.getBytes(StandardCharsets.UTF_8)) } as StreamCallback) session.transfer(flowFile, REL_SUCCESS) } catch(Exception e) { log.error("Error during processing of validate.groovy", e) session.transfer(flowFile, REL_FAILURE) }
En realidad, en esto termina la personalización de los cuadrados. A continuación, el archivo actualizado se transfiere al cuadrado, que se encarga de enviar el archivo al servidor. A continuación se encuentran las configuraciones para este cuadrado.

Describimos el método por el cual se transmitirá el mensaje SOAP. Escribimos a donde. A continuación, debe indicar que esto es exactamente SOAP.

Agregue algunas propiedades como host y action (soapAction). Guardar, consultar. Puede encontrar más detalles sobre cómo enviar solicitudes SOAP aquí.
Analizamos varios usos de los procesos NIFI. Cómo interactúan y qué beneficio real tienen. Los ejemplos considerados son de prueba y son ligeramente diferentes de lo que es real en la batalla. Espero que este artículo sea un poco útil para los desarrolladores. Gracias por su atencion Si tiene alguna pregunta, escriba. Intentaré responder