Apache NIFI - Uma Breve Visão Geral dos Recursos em Prática

1. Introdução


Aconteceu que no meu local de trabalho atual eu tive que me familiarizar com essa tecnologia. Vou começar com um pouco de experiência. No próximo comício, nossa equipe foi informada de que precisamos criar integração com um sistema conhecido . Por integração, entendeu-se que esse sistema conhecido enviaria solicitações via HTTP para um terminal específico e, curiosamente, enviaríamos as respostas de volta na forma de uma mensagem SOAP. Tudo parece ser simples e trivial. Segue o que é necessário ...


Desafio


Crie 3 serviços. O primeiro é o Serviço de Atualização de Banco de Dados. Esse serviço, quando novos dados chegam de um sistema de terceiros, atualiza os dados no banco de dados e gera um determinado arquivo CSV para transferi-los para o próximo sistema. O ponto final do segundo serviço é chamado - o Serviço de Transporte FTP, que recebe o arquivo transferido, o valida e o coloca no armazenamento de arquivos via FTP. O terceiro serviço, o Serviço de Transferência de Dados para o consumidor, funciona de forma assíncrona com os dois primeiros. Ele recebe uma solicitação de um sistema externo de terceiros, para receber o arquivo discutido acima, pega o arquivo de resposta finalizado, modifica-o (atualiza os campos id, descrição, linkToFile) e envia a resposta na forma de uma mensagem SOAP. Ou seja, a imagem geral é a seguinte: os dois primeiros serviços só começam seu trabalho quando os dados para atualização chegam. O terceiro serviço trabalha constantemente, pois há muitos consumidores de informações, cerca de 1000 solicitações para recebimento de dados por minuto. Os serviços estão constantemente disponíveis e suas instâncias estão localizadas em diferentes ambientes, como teste, demonstração, pré-produto e produto. Abaixo está um diagrama do trabalho desses serviços. Explicarei imediatamente que alguns detalhes são simplificados para evitar complexidade desnecessária.



Aprofundamento técnico


Ao planejar uma solução para o problema, primeiro decidimos criar aplicativos java usando a estrutura Spring, o Nginx balancer, o banco de dados Postgres e outras coisas técnicas e não muito. Desde que o tempo para desenvolver uma solução técnica nos permitiu considerar outras abordagens para resolver esse problema, meus olhos se voltaram para a tecnologia NIFI Apache, na moda em certos círculos. Devo dizer imediatamente que essa tecnologia nos permitiu perceber esses três serviços. Este artigo descreve o desenvolvimento de um serviço de transferência de arquivos e um serviço de transferência de dados para um consumidor; no entanto, se o artigo entrar, escreverei sobre um serviço de atualização de dados no banco de dados.


O que é isso


NIFI é uma arquitetura distribuída para carregamento paralelo rápido e processamento de dados, um grande número de plug-ins para origens e transformações, configurações de versão e muito mais. Um bom bônus é que é muito fácil de usar. Processos triviais, como getFile, sendHttpRequest e outros, podem ser representados como quadrados. Cada quadrado representa um certo processo, cuja interação pode ser vista na figura abaixo. Documentação mais detalhada sobre a interação do ajuste do processo está escrita aqui. , para quem está em russo - aqui . A documentação descreve perfeitamente como descompactar e executar o NIFI, além de como criar processos, eles são quadrados
A idéia de escrever um artigo nasceu após uma longa pesquisa e estruturação das informações recebidas em algo consciente, além do desejo de facilitar a vida de futuros desenvolvedores.


Exemplo


Um exemplo de como os quadrados interagem é considerado. O esquema geral é bastante simples: recebemos uma solicitação HTTP (em teoria, com um arquivo no corpo da solicitação. Para demonstrar os recursos do NIFI, neste exemplo, a solicitação inicia o processo de recebimento do arquivo pelo PF local), enviamos de volta a resposta de que o pedido foi recebido, o processo de recebimento do arquivo de FH e, em seguida, o processo de movê-lo através de FTP para FH. Vale a pena explicar que os processos interagem entre si por meio do chamado flowFile. Esta é a entidade básica no NIFI que armazena atributos e conteúdo. Conteúdo são os dados representados pelo arquivo de fluxo. Grosso modo, se você recebeu um arquivo de um quadrado e o transferiu para outro, o conteúdo será o seu arquivo.



Como você pode ver, esta figura mostra o processo geral. HandleHttpRequest - aceita solicitações, ReplaceText - gera um corpo de resposta, HandleHttpResponse - retorna uma resposta. FetchFile - recebe um arquivo do armazenamento e o transfere para o quadrado PutSftp - coloca esse arquivo no FTP no endereço especificado. Agora mais sobre esse processo.


Nesse caso, a solicitação é o começo de tudo. Vamos ver suas opções de configuração.



Tudo aqui é bastante trivial, com exceção do StandartHttpContextMap - este é um serviço que permite enviar e receber solicitações. Você pode ver mais detalhes e até exemplos aqui

Em seguida, consulte as opções de configuração quadrada ReplaceText. Vale a pena prestar atenção ao ReplacementValue - é isso que retornará ao usuário na forma de uma resposta. Nas configurações, você pode ajustar o nível de registro, os registros podem ser visualizados {onde nifi foi descompactado} /nifi-1.9.2/logs também existem parâmetros de falha / sucesso - com base nesses parâmetros, você pode controlar todo o processo. Ou seja, no caso de processamento de texto bem-sucedido, o processo de envio de uma resposta ao usuário é invocado e, no outro caso, simplesmente prometemos o processo sem êxito.



As propriedades HandleHttpResponse não têm nada de especial, exceto o status para criação de resposta bem-sucedida.



Classificamos a solicitação com a resposta - vamos continuar recebendo o arquivo e colocando-o no servidor FTP. FetchFile - recebe o arquivo no caminho especificado nas configurações e o transfere para o próximo processo.



E então o quadrado PutSftp - coloca o arquivo no armazenamento de arquivos. Os parâmetros de configuração podem ser vistos abaixo.



Vale a pena prestar atenção ao fato de que cada quadrado é um processo separado que deve ser iniciado. Examinamos o exemplo mais simples que não requer nenhuma personalização complicada. Em seguida, considere o processo um pouco mais complicado, onde escrevemos um pouco nas ranhuras.


Exemplo mais complexo


O serviço de transferência de dados para o consumidor acabou sendo um pouco mais complicado devido ao processo de modificação da mensagem SOAP. O processo geral é apresentado na figura abaixo.



Aqui, a ideia também não é muito complicada: recebemos uma solicitação do consumidor de que eles precisavam de dados, enviamos uma resposta de que receberam uma mensagem, iniciamos o processo de recebimento do arquivo de resposta, depois o editamos com certa lógica e depois transferimos o arquivo para o consumidor na forma de uma mensagem SOAP para o servidor.


Acho que não vale a pena descrever novamente os quadrados que vimos acima - iremos direto para os novos. Se você precisar editar um arquivo e quadrados regulares como ReplaceText não forem adequados, você precisará escrever seu próprio script. Isso pode ser feito usando o quadrado ExecuteGroogyScript. Suas configurações são apresentadas abaixo.



Existem duas opções para carregar o script neste quadrado. A primeira é carregando o arquivo de script. A segunda é inserindo um script no scriptBody. Até onde eu sei, o quadrado executeScript suporta vários JPs - um deles é legal. Estou decepcionando os desenvolvedores de java - você não pode escrever scripts em tais quadrados em java. Para quem realmente quer - você precisa criar seu próprio quadrado personalizado e jogá-lo no sistema NIFI. Toda essa operação é acompanhada de danças bastante longas com um pandeiro, sobre as quais não trataremos no quadro deste artigo. Eu escolhi uma linguagem bacana. Abaixo está um script de teste que simplesmente atualiza incrementalmente o ID na mensagem SOAP. É importante notar. Você pega o arquivo do flowFile, atualiza-o, não esqueça que você precisa, atualizou, coloque-o lá novamente. Também é importante notar que nem todas as bibliotecas estão conectadas. Pode acontecer que você ainda precise importar uma das bibliotecas. A desvantagem é que o roteiro nesta praça é bastante difícil de estrear. Existe uma maneira de conectar-se à JVM NIFI e iniciar o processo de depuração. Pessoalmente, executei um aplicativo local e simulei a obtenção de um arquivo de uma sessão. A depuração também foi feita localmente. Os erros que surgem ao carregar um script são bastante fáceis para o Google e são gravados por NIFI no log.


import org.apache.commons.io.IOUtils import groovy.xml.XmlUtil import java.nio.charset.* import groovy.xml.StreamingMarkupBuilder def flowFile = session.get() if (!flowFile) return try { flowFile = session.write(flowFile, { inputStream, outputStream -> String result = IOUtils.toString(inputStream, "UTF-8"); def recordIn = new XmlSlurper().parseText(result) def element = recordIn.depthFirst().find { it.name() == 'id' } def newId = Integer.parseInt(element.toString()) + 1 def recordOut = new XmlSlurper().parseText(result) recordOut.Body.ClientMessage.RequestMessage.RequestContent.content.MessagePrimaryContent.ResponseBody.id = newId def res = new StreamingMarkupBuilder().bind { mkp.yield recordOut }.toString() outputStream.write(res.getBytes(StandardCharsets.UTF_8)) } as StreamCallback) session.transfer(flowFile, REL_SUCCESS) } catch(Exception e) { log.error("Error during processing of validate.groovy", e) session.transfer(flowFile, REL_FAILURE) } 

Na verdade, a personalização do quadrado termina. Em seguida, o arquivo atualizado é transferido para o quadrado, que está envolvido no envio do arquivo para o servidor. Abaixo estão as configurações para este quadrado.



Nós descrevemos o método pelo qual a mensagem SOAP será transmitida. Escrevemos para onde. Em seguida, você precisa indicar que isso é exatamente SOAP.



Adicione algumas propriedades, como host e ação (soapAction). Salve, verifique. Mais detalhes sobre como enviar solicitações SOAP podem ser encontrados aqui.


Vimos vários usos dos processos NIFI. Como eles interagem e que benefício real eles têm. Os exemplos considerados são de teste e são ligeiramente diferentes do que é real na batalha. Espero que este artigo seja um pouco útil para desenvolvedores. Obrigado pela atenção. Se você tiver alguma dúvida, escreva. Vou tentar responder.

Source: https://habr.com/ru/post/pt465299/


All Articles