Automação de entrega de fluxo Apache NiFi

Olá pessoal!



A tarefa é a seguinte - existe um fluxo, apresentado na figura acima, que deve ser implementado em N servidores com o Apache NiFi . Teste de fluxo - o arquivo está sendo gerado e enviado para outra instância de NiFi. Os dados são transmitidos usando o protocolo NiFi Site to Site.


O site NiFi Site to Site (S2S) é uma maneira segura e facilmente personalizável de transferir dados entre instâncias NiFi. Veja como o S2S funciona na documentação e é importante não esquecer de configurar a instância NiFi para ativar o S2S, veja aqui .

Nesses casos, quando se trata de transferência de dados usando o S2S - uma instância é chamada cliente, o segundo servidor. Cliente envia dados, servidor envia. Duas maneiras de configurar a transferência de dados entre eles:

  1. Push De uma instância do cliente, os dados são enviados usando o Remote Process Group (RPG). Em uma instância do servidor, os dados são recebidos usando a Porta de Entrada.
  2. Pull O servidor recebe dados usando o RPG, o cliente envia usando a porta de saída.


Armazenamos o fluxo para rolagem no Apache Registry.


O Apache NiFi Registry é um subprojeto do Apache NiFi que fornece uma ferramenta para armazenar fluxo e controle de versão. Uma espécie de idiota. Informações sobre instalação, configuração e trabalho com o registro podem ser encontradas na documentação oficial . O fluxo para armazenamento é combinado em um grupo de processos e armazenado como tal no registro. Mais adiante neste artigo, retornaremos a isso.


No início, quando N é um número pequeno, o fluxo é entregue e atualizado manualmente, em um tempo aceitável.

Mas com o crescimento de N, há mais problemas:

  1. o fluxo de atualização leva mais tempo. É necessário ir a todos os servidores
  2. há erros ao atualizar modelos. Aqui eles atualizaram, mas aqui eles esqueceram
  3. erros humanos ao executar um grande número de operações do mesmo tipo

Tudo isso nos leva ao fato de que precisamos automatizar o processo. Tentei as seguintes maneiras de resolver esse problema:

  1. Use MiNiFi em vez de NiFi
  2. NiFi CLI
  3. NiPyAPI

Usando o MiNiFi


O Apache MiNiFy é um subprojeto do Apache NiFi. O MiNiFy é um agente compacto que usa os mesmos processadores que o NiFi, permitindo criar o mesmo fluxo que o NiFi. A leveza do agente é alcançada, entre outras coisas, devido ao fato do MiNiFy não possuir uma interface gráfica para a configuração do fluxo. A falta de uma interface gráfica no MiNiFy significa que é necessário resolver o problema da entrega de fluxo em minifi. Como o MiNiFy é usado ativamente no IOT, existem muitos componentes e o processo de entrega do fluxo para as instâncias finais de minifi precisa ser automatizado. Uma tarefa familiar, certo?

Outro subprojeto ajudará a resolver esse problema - MiNiFi C2 Server. Este produto deve ser um ponto central na arquitetura de configurações de rolamento. Como configurar o ambiente - descrito neste artigo sobre Habré e informações suficientes para resolver a tarefa. O MiNiFi em conjunto com o servidor C2 atualiza automaticamente a configuração em casa. A única desvantagem dessa abordagem é que você precisa criar modelos no C2 Server, uma simples confirmação do registro não é suficiente.

A opção descrita no artigo acima está funcionando e não é difícil de implementar, mas não esqueça o seguinte:

  1. Em minifi, nem todos os processadores da nifi
  2. As versões do processador no Minifi ficam atrás das versões do processador no NiFi.

No momento da redação deste artigo, a versão mais recente do NiFi é 1.9.2. A versão do processador da versão mais recente do MiNiFi é 1.7.0. Os processadores podem ser adicionados ao MiNiFi, mas devido às diferenças de versão entre os processadores NiFi e MiNiFi, isso pode não funcionar.

NiFi CLI


A julgar pela descrição da ferramenta no site oficial, é uma ferramenta para automatizar a interação do NiFI e do NiFi Registry no campo de entrega de fluxo ou controle de processo. Para começar, é necessário fazer o download desta ferramenta aqui .

Execute o utilitário

./bin/cli.sh _ ___ _ Apache (_) .' ..](_) , _ .--. __ _| |_ __ )\ [ `.-. | [ |'-| |-'[ | / \ | | | | | | | | | | ' ' [___||__][___][___] [___]', ,' `' CLI v1.9.2 Type 'help' to see a list of available commands, use tab to auto-complete. 

Para carregarmos o fluxo necessário do registro, precisamos conhecer os identificadores da cesta (identificador de bucket) e o próprio fluxo (identificador de fluxo). Esses dados podem ser obtidos através do cli ou na interface da web do registro NiFi. A interface da web fica assim:



O uso da CLI faz o seguinte:

 #> registry list-buckets -u http://nifi-registry:18080 # Name Id Description - -------------- ------------------------------------ ----------- 1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty) #> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080 # Name Id Description - ------------ ------------------------------------ ----------- 1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85 

Iniciamos a importação do grupo de processos do registro:

 #> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080 7f522a13-016e-1000-e504-d5b15587f2f3 

O ponto importante é que qualquer instância nifi pode ser especificada como o host no qual rolamos o grupo de processos.

Grupo de processos adicionado com processadores parados, eles devem ser iniciados

 #> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080 

Ótimo, os processadores começaram. No entanto, de acordo com as condições do problema, precisamos de instâncias de NiFi para enviar dados para outras instâncias. Suponha que você selecione o método Push para transferir dados para o servidor. Para organizar a transferência de dados, você precisa habilitar a transmissão de dados (Habilitar transmissão) no RPG (Remote Process Group) adicionado, que já está incluído em nosso fluxo.



Na documentação da CLI e de outras fontes, não encontrei uma maneira de ativar a transferência de dados. Se você sabe como fazer isso, escreva nos comentários.

Já que temos o bash e estamos prontos para ir até o fim - vamos encontrar uma saída! Você pode usar a API NiFi para resolver esse problema. Usamos o método a seguir, usamos o ID dos exemplos acima (no nosso caso, é 7f522a13-016e-1000-e504-d5b15587f2f3). Descrição dos métodos da API NiFi aqui .


No corpo, você precisa passar o JSON, do seguinte formato:

 { "revision": { "clientId": "value", "version": 0, "lastModifier": "value" }, "state": "value", "disconnectedNodeAcknowledged": true } 

Parâmetros que devem ser preenchidos para "trabalhar":
estado - status da transferência de dados. TRANSMISSÃO disponível para ativar a transferência de dados, PARADO para desligar
version - versão do processador

A versão assumirá o padrão 0 quando criado, mas esses parâmetros podem ser obtidos usando o método



Para os amantes de scripts bash, esse método pode parecer adequado, mas é difícil para mim - os scripts bash não são os meus favoritos. O método a seguir é mais interessante e confortável na minha opinião.

NiPyAPI


NiPyAPI é uma biblioteca Python para interagir com instâncias de NiFi. A página de documentação contém as informações necessárias para trabalhar com a biblioteca. O início rápido é descrito em um projeto do github.

Nosso script para implementar a configuração é um programa Python. Passamos para a codificação.
Nós configuramos configurações para trabalhos futuros. Vamos precisar dos seguintes parâmetros:

 nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #  nifi-api ,    process group nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #  nifi-registry-api registry nipyapi.config.registry_name = 'MyBeutifulRegistry' # registry,      nifi nipyapi.config.bucket_name = 'BucketName' # bucket,    flow nipyapi.config.flow_name = 'FlowName' # flow,   

Além disso, vou inserir os nomes dos métodos desta biblioteca, que são descritos aqui .

Conecte o registro à instância nifi com

 nipyapi.versioning.create_registry_client 

Nesta etapa, você também pode adicionar uma verificação de que o registro já foi adicionado à instância; para isso, você pode usar o método

 nipyapi.versioning.list_registry_clients 

Encontre um balde para procurar mais fluxo na cesta.

 nipyapi.versioning.get_registry_bucket 

Pesquisar caçamba por fluxo

 nipyapi.versioning.get_flow_in_bucket 

Além disso, é importante entender se esse grupo de processos já foi adicionado. O grupo de processos é colocado nas coordenadas e uma situação pode surgir quando um segundo é sobreposto a um componente. Eu verifiquei, isso pode ser :) Para adicionar todo o grupo de processos, usamos o método

 nipyapi.canvas.list_all_process_groups 

e então podemos pesquisar, por exemplo, por nome.

Não descreverei o processo de atualização do modelo, apenas direi que se os processadores forem adicionados à nova versão do modelo, não haverá problemas com a presença de mensagens nas filas. Porém, se os processadores forem excluídos, poderão surgir problemas (o nifi não permitirá que o processador seja excluído se uma fila de mensagens tiver se acumulado na frente). Se você estiver interessado em como eu resolvi esse problema - escreva para mim, por favor, discutiremos esse ponto. Contato no final do artigo. Vamos seguir para a etapa de adicionar um grupo de processos.

Ao depurar o script, deparei-me com um recurso em que a versão mais recente do fluxo nem sempre é exibida, portanto, recomendo que essa versão seja esclarecida primeiro:

 nipyapi.versioning.get_latest_flow_ver 

Grupo de processos de implantação:

 nipyapi.versioning.deploy_flow_version 

Iniciamos processadores:

 nipyapi.canvas.schedule_process_group 

No bloco CLI, foi escrito que a transferência de dados não é ativada automaticamente no grupo de processos remotos? Ao implementar o script, eu também encontrei esse problema. Naquele momento, não consegui iniciar a transferência de dados usando a API e decidi escrever para o desenvolvedor da biblioteca NiPyAPI e pedir conselhos / ajuda. O desenvolvedor me respondeu, discutimos o problema e ele escreveu que precisava de tempo para "verificar alguma coisa". E agora, depois de alguns dias, chega uma carta na qual uma função Python é escrita que resolve meu problema de inicialização !!! Naquela época, a versão NiPyAPI era 0.13.3 e, é claro, não havia nada disso. Mas na versão 0.14.0, lançada recentemente, essa função já foi incluída na biblioteca. Conheça

 nipyapi.canvas.set_remote_process_group_transmission 

Portanto, usando a biblioteca NiPyAPI, conectamos o registro, o fluxo rolado e até iniciamos os processadores e a transferência de dados. Em seguida, você pode vasculhar o código, adicionar todos os tipos de verificações, registros e isso é tudo. Mas esta é uma história completamente diferente.

Das opções de automação que considerei, as últimas me pareciam as mais eficientes. Primeiramente, esse ainda é o código python, no qual você pode incorporar o código do programa auxiliar e tirar o máximo proveito da linguagem de programação. Em segundo lugar, o projeto NiPyAPI está desenvolvendo ativamente e, no caso de problemas, você pode escrever para o desenvolvedor. Em terceiro lugar, o NiPyAPI ainda é uma ferramenta mais flexível para interagir com o NiFi na solução de problemas complexos. Por exemplo, ao determinar se as filas de mensagens estão vazias agora no fluxo e se o grupo de processos pode ser atualizado.

Só isso. Descrevi três abordagens para automatizar a entrega de fluxo no NiFi, armadilhas que um desenvolvedor pode encontrar e forneci um código de trabalho para automatizar a entrega. Se você está tão interessado neste tópico - escreva!

Source: https://habr.com/ru/post/pt476722/


All Articles