Olá pessoal!

A tarefa é a seguinte - existe um fluxo, apresentado na figura acima, que deve ser implementado em N servidores com o
Apache NiFi . Teste de fluxo - o arquivo está sendo gerado e enviado para outra instância de NiFi. Os dados são transmitidos usando o protocolo NiFi Site to Site.
O site NiFi Site to Site (S2S) é uma maneira segura e facilmente personalizável de transferir dados entre instâncias NiFi. Veja como o S2S funciona na documentação e é importante não esquecer de configurar a instância NiFi para ativar o S2S, veja aqui .
Nesses casos, quando se trata de transferência de dados usando o S2S - uma instância é chamada cliente, o segundo servidor. Cliente envia dados, servidor envia. Duas maneiras de configurar a transferência de dados entre eles:
- Push De uma instância do cliente, os dados são enviados usando o Remote Process Group (RPG). Em uma instância do servidor, os dados são recebidos usando a Porta de Entrada.
- Pull O servidor recebe dados usando o RPG, o cliente envia usando a porta de saída.
Armazenamos o fluxo para rolagem no Apache Registry.
O Apache NiFi Registry é um subprojeto do Apache NiFi que fornece uma ferramenta para armazenar fluxo e controle de versão. Uma espécie de idiota. Informações sobre instalação, configuração e trabalho com o registro podem ser encontradas na documentação oficial . O fluxo para armazenamento é combinado em um grupo de processos e armazenado como tal no registro. Mais adiante neste artigo, retornaremos a isso.
No início, quando N é um número pequeno, o fluxo é entregue e atualizado manualmente, em um tempo aceitável.
Mas com o crescimento de N, há mais problemas:
- o fluxo de atualização leva mais tempo. É necessário ir a todos os servidores
- há erros ao atualizar modelos. Aqui eles atualizaram, mas aqui eles esqueceram
- erros humanos ao executar um grande número de operações do mesmo tipo
Tudo isso nos leva ao fato de que precisamos automatizar o processo. Tentei as seguintes maneiras de resolver esse problema:
- Use MiNiFi em vez de NiFi
- NiFi CLI
- NiPyAPI
Usando o MiNiFi
O Apache MiNiFy é um subprojeto do Apache NiFi. O MiNiFy é um agente compacto que usa os mesmos processadores que o NiFi, permitindo criar o mesmo fluxo que o NiFi. A leveza do agente é alcançada, entre outras coisas, devido ao fato do MiNiFy não possuir uma interface gráfica para a configuração do fluxo. A falta de uma interface gráfica no MiNiFy significa que é necessário resolver o problema da entrega de fluxo em minifi. Como o MiNiFy é usado ativamente no IOT, existem muitos componentes e o processo de entrega do fluxo para as instâncias finais de minifi precisa ser automatizado. Uma tarefa familiar, certo?
Outro subprojeto ajudará a resolver esse problema - MiNiFi C2 Server. Este produto deve ser um ponto central na arquitetura de configurações de rolamento. Como configurar o ambiente - descrito
neste artigo sobre Habré e informações suficientes para resolver a tarefa. O MiNiFi em conjunto com o servidor C2 atualiza automaticamente a configuração em casa. A única desvantagem dessa abordagem é que você precisa criar modelos no C2 Server, uma simples confirmação do registro não é suficiente.
A opção descrita no artigo acima está funcionando e não é difícil de implementar, mas não esqueça o seguinte:
- Em minifi, nem todos os processadores da nifi
- As versões do processador no Minifi ficam atrás das versões do processador no NiFi.
No momento da redação deste artigo, a versão mais recente do NiFi é 1.9.2. A versão do processador da versão mais recente do MiNiFi é 1.7.0. Os processadores podem ser adicionados ao MiNiFi, mas devido às diferenças de versão entre os processadores NiFi e MiNiFi, isso pode não funcionar.
NiFi CLI
A julgar pela
descrição da ferramenta no site oficial, é uma ferramenta para automatizar a interação do NiFI e do NiFi Registry no campo de entrega de fluxo ou controle de processo. Para começar, é necessário fazer o download desta ferramenta
aqui .
Execute o utilitário
./bin/cli.sh _ ___ _ Apache (_) .' ..](_) , _ .--. __ _| |_ __ )\ [ `.-. | [ |'-| |-'[ | / \ | | | | | | | | | | ' ' [___||__][___][___] [___]', ,' `' CLI v1.9.2 Type 'help' to see a list of available commands, use tab to auto-complete.
Para carregarmos o fluxo necessário do registro, precisamos conhecer os identificadores da cesta (identificador de bucket) e o próprio fluxo (identificador de fluxo). Esses dados podem ser obtidos através do cli ou na interface da web do registro NiFi. A interface da web fica assim:

O uso da CLI faz o seguinte:
#> registry list-buckets -u http://nifi-registry:18080 # Name Id Description - -------------- ------------------------------------ ----------- 1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty) #> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080 # Name Id Description - ------------ ------------------------------------ ----------- 1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85
Iniciamos a importação do grupo de processos do registro:
#> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080 7f522a13-016e-1000-e504-d5b15587f2f3
O ponto importante é que qualquer instância nifi pode ser especificada como o host no qual rolamos o grupo de processos.
Grupo de processos adicionado com processadores parados, eles devem ser iniciados
#> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080
Ótimo, os processadores começaram. No entanto, de acordo com as condições do problema, precisamos de instâncias de NiFi para enviar dados para outras instâncias. Suponha que você selecione o método Push para transferir dados para o servidor. Para organizar a transferência de dados, você precisa habilitar a transmissão de dados (Habilitar transmissão) no RPG (Remote Process Group) adicionado, que já está incluído em nosso fluxo.

Na documentação da CLI e de outras fontes, não encontrei uma maneira de ativar a transferência de dados. Se você sabe como fazer isso, escreva nos comentários.
Já que temos o bash e estamos prontos para ir até o fim - vamos encontrar uma saída! Você pode usar a API NiFi para resolver esse problema. Usamos o método a seguir, usamos o ID dos exemplos acima (no nosso caso, é 7f522a13-016e-1000-e504-d5b15587f2f3). Descrição dos métodos da API NiFi
aqui .

No corpo, você precisa passar o JSON, do seguinte formato:
{ "revision": { "clientId": "value", "version": 0, "lastModifier": "value" }, "state": "value", "disconnectedNodeAcknowledged": true }
Parâmetros que devem ser preenchidos para "trabalhar":
estado - status da transferência de dados. TRANSMISSÃO disponível para ativar a transferência de dados, PARADO para desligar
version - versão do processador
A versão assumirá o padrão 0 quando criado, mas esses parâmetros podem ser obtidos usando o método

Para os amantes de scripts bash, esse método pode parecer adequado, mas é difícil para mim - os scripts bash não são os meus favoritos. O método a seguir é mais interessante e confortável na minha opinião.
NiPyAPI
NiPyAPI é uma biblioteca Python para interagir com instâncias de NiFi.
A página de documentação contém as informações necessárias para trabalhar com a biblioteca. O início rápido é descrito em um
projeto do github.
Nosso script para implementar a configuração é um programa Python. Passamos para a codificação.
Nós configuramos configurações para trabalhos futuros. Vamos precisar dos seguintes parâmetros:
nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' # nifi-api , process group nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' # nifi-registry-api registry nipyapi.config.registry_name = 'MyBeutifulRegistry' # registry, nifi nipyapi.config.bucket_name = 'BucketName' # bucket, flow nipyapi.config.flow_name = 'FlowName' # flow,
Além disso, vou inserir os nomes dos métodos desta biblioteca, que são descritos
aqui .
Conecte o registro à instância nifi com
nipyapi.versioning.create_registry_client
Nesta etapa, você também pode adicionar uma verificação de que o registro já foi adicionado à instância; para isso, você pode usar o método
nipyapi.versioning.list_registry_clients
Encontre um balde para procurar mais fluxo na cesta.
nipyapi.versioning.get_registry_bucket
Pesquisar caçamba por fluxo
nipyapi.versioning.get_flow_in_bucket
Além disso, é importante entender se esse grupo de processos já foi adicionado. O grupo de processos é colocado nas coordenadas e uma situação pode surgir quando um segundo é sobreposto a um componente. Eu verifiquei, isso pode ser :) Para adicionar todo o grupo de processos, usamos o método
nipyapi.canvas.list_all_process_groups
e então podemos pesquisar, por exemplo, por nome.
Não descreverei o processo de atualização do modelo, apenas direi que se os processadores forem adicionados à nova versão do modelo, não haverá problemas com a presença de mensagens nas filas. Porém, se os processadores forem excluídos, poderão surgir problemas (o nifi não permitirá que o processador seja excluído se uma fila de mensagens tiver se acumulado na frente). Se você estiver interessado em como eu resolvi esse problema - escreva para mim, por favor, discutiremos esse ponto. Contato no final do artigo. Vamos seguir para a etapa de adicionar um grupo de processos.
Ao depurar o script, deparei-me com um recurso em que a versão mais recente do fluxo nem sempre é exibida, portanto, recomendo que essa versão seja esclarecida primeiro:
nipyapi.versioning.get_latest_flow_ver
Grupo de processos de implantação:
nipyapi.versioning.deploy_flow_version
Iniciamos processadores:
nipyapi.canvas.schedule_process_group
No bloco CLI, foi escrito que a transferência de dados não é ativada automaticamente no grupo de processos remotos? Ao implementar o script, eu também encontrei esse problema. Naquele momento, não consegui iniciar a transferência de dados usando a API e decidi escrever para o desenvolvedor da biblioteca NiPyAPI e pedir conselhos / ajuda. O desenvolvedor me respondeu, discutimos o problema e ele escreveu que precisava de tempo para "verificar alguma coisa". E agora, depois de alguns dias, chega uma carta na qual uma função Python é escrita que resolve meu problema de inicialização !!! Naquela época, a versão NiPyAPI era 0.13.3 e, é claro, não havia nada disso. Mas na versão 0.14.0, lançada recentemente, essa função já foi incluída na biblioteca. Conheça
nipyapi.canvas.set_remote_process_group_transmission
Portanto, usando a biblioteca NiPyAPI, conectamos o registro, o fluxo rolado e até iniciamos os processadores e a transferência de dados. Em seguida, você pode vasculhar o código, adicionar todos os tipos de verificações, registros e isso é tudo. Mas esta é uma história completamente diferente.
Das opções de automação que considerei, as últimas me pareciam as mais eficientes. Primeiramente, esse ainda é o código python, no qual você pode incorporar o código do programa auxiliar e tirar o máximo proveito da linguagem de programação. Em segundo lugar, o projeto NiPyAPI está desenvolvendo ativamente e, no caso de problemas, você pode escrever para o desenvolvedor. Em terceiro lugar, o NiPyAPI ainda é uma ferramenta mais flexível para interagir com o NiFi na solução de problemas complexos. Por exemplo, ao determinar se as filas de mensagens estão vazias agora no fluxo e se o grupo de processos pode ser atualizado.
Só isso. Descrevi três abordagens para automatizar a entrega de fluxo no NiFi, armadilhas que um desenvolvedor pode encontrar e forneci um código de trabalho para automatizar a entrega. Se você está tão interessado neste tópico -
escreva!