Apache NiFi流交付自动化

大家好!



任务如下-上图中显示了流程,必须使用Apache NiFi将其推广到N个服务器。 流量测试-文件正在生成并发送到另一个NiFi实例。 数据是使用NiFi站点到站点协议传输的。


NiFi站点到站点(S2S)是一种安全,易于自定义的方法,可以在NiFi实例之间传输数据。 文档中查看S2S的工作方式,重要的是不要忘记配置NiFi实例以启用S2S,请参见此处

在那些使用S2S进行数据传输的情况下-一个实例称为客户端,第二个服务器。 客户端发送数据,服务器发送。 配置它们之间的数据传输的两种方法:

  1. 推入 从客户端实例,使用远程进程组(RPG)发送数据。 在服务器实例上,使用输入端口接收数据。
  2. 拉力 服务器使用RPG接收数据,客户端使用输出端口发送数据。


我们将滚动流程存储在Apache注册表中。


Apache NiFi Registry是Apache NiFi的子项目,提供了用于存储流和版本控制的工具。 一种git。 有关安装,配置和使用注册表的信息,请参见官方文档 用于存储的流被组合到一个过程组中,并以此形式存储在注册表中。 在本文的进一步部分,我们将回到这一点。


首先,当N为小数时,将在可接受的时间内手动传递和更新流。

但是随着N的增长,还有更多的问题:

  1. 更新流程需要更多时间。 有必要去所有的服务器
  2. 更新模板时出错。 在这里他们更新了,但是在这里他们忘记了
  3. 执行大量相同类型的操作时出现人为错误

所有这些使我们得出一个事实,那就是我们需要使流程自动化。 我尝试了以下方法来解决此问题:

  1. 使用MiNiFi代替NiFi
  2. NiFi CLI
  3. NiPyAPI

使用MiNiFi


Apache MiNiFy是Apache NiFi的子项目。 MiNiFy是一种紧凑型代理,使用与NiFi相同的处理器,使您可以创建与NiFi中相同的流。 除其他因素外,由于MiNiFy没有用于流程配置的图形界面,因此可以实现代理的轻便性。 MiNiFy中缺少图形界面,这意味着有必要解决minifi中流传输的问题。 由于MiNiFy已在IOT中得到积极使用,因此有许多组件,并且将流传输到最终minifi实例的过程需要自动化。 熟悉的任务,对不对?

另一个子项目-MiNiFi C2 Server将帮助解决此问题。 该产品旨在成为滚动配置体系结构的中心点。 如何配置环境- 本文在Habré上进行了介绍,并提供了足够的信息来解决该任务。 MiNiFi与C2服务器一起可以在家中自动更新配置。 这种方法的唯一缺点是,您必须在C2服务器上创建模板,简单的注册表提交是不够的。

上面文章中描述的选项正在运行,并且很难实现,但是请不要忘记以下内容:

  1. 在minifi中,并非所有来自nifi的处理器
  2. Minifi中的处理器版本落后于NiFi中的处理器版本。

在撰写本文时,NiFi的最新版本是1.9.2。 最新的MiNiFi版本的处理器版本为1.7.0。 可以将处理器添加到MiNiFi,但是由于NiFi和MiNiFi处理器之间的版本差异,这可能无法正常工作。

NiFi CLI


从官方网站上对该工具的描述来看,它是一种用于自动化NiFI和NiFi Registry在流程交付或过程控制领域中交互作用的工具。 首先,必须从此处下载该工具。

运行实用程序

./bin/cli.sh _ ___ _ Apache (_) .' ..](_) , _ .--. __ _| |_ __ )\ [ `.-. | [ |'-| |-'[ | / \ | | | | | | | | | | ' ' [___||__][___][___] [___]', ,' `' CLI v1.9.2 Type 'help' to see a list of available commands, use tab to auto-complete. 

为了使我们能够从注册表中加载必要的流程,我们需要知道篮子的标识符(存储桶标识符)和流程本身(流程标识符)。 可以通过cli或NiFi注册中心网络界面获取此数据。 Web界面如下所示:



使用CLI可以做到这一点:

 #> registry list-buckets -u http://nifi-registry:18080 # Name Id Description - -------------- ------------------------------------ ----------- 1 test_bucket 709d387a-9ce9-4535-8546-3621efe38e96 (empty) #> registry list-flows -b 709d387a-9ce9-4535-8546-3621efe38e96 -u http://nifi-registry:18080 # Name Id Description - ------------ ------------------------------------ ----------- 1 test_flow d27af00a-5b47-4910-89cd-9c664cd91e85 

我们从注册表开始导入流程组:

 #> nifi pg-import -b 709d387a-9ce9-4535-8546-3621efe38e96 -f d27af00a-5b47-4910-89cd-9c664cd91e85 -fv 1 -u http://nifi:8080 7f522a13-016e-1000-e504-d5b15587f2f3 

重要的一点是,可以将任何nifi实例指定为我们在其上滚动进程组的主机。

添加了已停止处理器的进程组,必须启动它们

 #> nifi pg-start -pgid 7f522a13-016e-1000-e504-d5b15587f2f3 -u http://nifi:8080 

太好了,处理器开始了。 但是,根据问题的情况,我们需要NiFi实例将数据发送到其他实例。 假设您选择Push方法将数据传输到服务器。 为了组织数据传输,您需要在添加的远程进程组(RPG)上启用数据传输(启用传输),该组已经包含在我们的流程中。



在CLI和其他来源的文档中,我找不到启用数据传输的方法。 如果您知道该怎么做,请在评论中写。

既然我们有重击,并且我们准备走到尽头-我们将找到一条出路! 您可以使用NiFi API解决此问题。 我们使用以下方法,从上面的示例中获取ID(在我们的示例中为7f522a13-016e-1000-e504-d5b15587f2f3)。 NiFi API方法的说明在此处


在正文中,您需要传递以下格式的JSON:

 { "revision": { "clientId": "value", "version": 0, "lastModifier": "value" }, "state": "value", "disconnectedNodeAcknowledged": true } 

为了“工作”必须填写的参数:
状态 -数据传输状态。 可用的TRANSMITTING启用数据传输,STOPPED关闭
版本 -处理器版本

创建时,版本默认为0,但是可以使用方法获得这些参数



对于喜欢bash脚本的人来说,这种方法似乎很合适,但是对我来说很难-bash脚本不是我的最爱。 我认为以下方法更有趣且更舒适。

NiPyAPI


NiPyAPI是用于与NiFi实例进行交互的Python库。 文档页面包含使用库的必要信息。 快速启动在github 项目中进行了描述。

我们用于部署配置的脚本是Python程序。 我们通过编码。
我们配置配置以进行进一步的工作。 我们将需要以下参数:

 nipyapi.config.nifi_config.host = 'http://nifi:8080/nifi-api' #  nifi-api ,    process group nipyapi.config.registry_config.host = 'http://nifi-registry:18080/nifi-registry-api' #  nifi-registry-api registry nipyapi.config.registry_name = 'MyBeutifulRegistry' # registry,      nifi nipyapi.config.bucket_name = 'BucketName' # bucket,    flow nipyapi.config.flow_name = 'FlowName' # flow,   

此外,我将插入此库的方法名称,在此进行描述。

使用以下命令将注册表连接到Nifi实例

 nipyapi.versioning.create_registry_client 

在这一步,您还可以添加检查注册表是否已经添加到实例中;为此,您可以使用方法

 nipyapi.versioning.list_registry_clients 

找到一个桶以进一步搜索篮子中的流量。

 nipyapi.versioning.get_registry_bucket 

搜索桶中的流量

 nipyapi.versioning.get_flow_in_bucket 

此外,重要的是要了解是否已添加此过程组。 将过程组放置在坐标中,并且将第二个组件叠加在一个组件的顶部时,可能会出现这种情况。 我检查了一下,这可以是:)要添加所有流程组,我们使用方法

 nipyapi.canvas.list_all_process_groups 

然后我们可以搜索,例如按名称搜索。

我不会描述模板的更新过程,只会说,如果在模板的新版本中添加了处理器,那么队列中消息的存在就不会有问题。 但是,如果删除了处理器,则可能会出现问题(如果消息队列前面已累积消息,nifi不允许删除处理器)。 如果您对我如何解决此问题感兴趣-请给我写信,我们将讨论这一点。 文章末尾的联系方式。 让我们继续进行添加流程组的步骤。

在调试脚本时,我遇到了一个功能,即最新的流版本不会总是被提取,因此我建议首先澄清该版本:

 nipyapi.versioning.get_latest_flow_ver 

部署过程组:

 nipyapi.versioning.deploy_flow_version 

我们启动处理器:

 nipyapi.canvas.schedule_process_group 

在CLI块中,写道远程进程组中的数据传输没有自动打开? 在实施脚本时,我也遇到了这个问题。 当时,我没有成功使用API​​启动数据传输,因此决定写信给NiPyAPI库开发人员并寻求建议/帮助。 开发人员回答了我,我们讨论了问题,他写道他需要时间来“检查”。 现在,几天后,收到一封来信,其中写有Python函数可以解决我的启动问题!!! 当时,NiPyAPI版本为0.13.3,当然,里面没有类似的东西。 但是在最近发布的0.14.0版本中,此功能已包含在库中。 见面

 nipyapi.canvas.set_remote_process_group_transmission 

因此,使用NiPyAPI库,我们连接了注册表,滚动流程,甚至启动了处理器和数据传输。 然后,您可以整理代码,添加各种检查,日志记录,仅此而已。 但这是一个完全不同的故事。

在我考虑的自动化选项中,后者在我看来是最有效的。 首先,这仍然是python代码,您可以在其中嵌入辅助程序代码并充分利用编程语言。 其次,NiPyAPI项目正在积极开发中,如有问题,您可以写信给开发人员。 第三,NiPyAPI仍然是与NiFi进行交互以解决复杂问题的更灵活的工具。 例如,在确定消息队列现在是否在流动中是否为空以及是否可以更新过程组时。

仅此而已。 我介绍了3种在NiFi中自动执行流交付的方法,开发人员可能遇到的陷阱并提供了用于自动交付的工作代码。 如果您对此主题同样感兴趣,请写下!

Source: https://habr.com/ru/post/zh-CN476722/


All Articles