简要介绍如何从Python使用RabbitMQ

KDPV


碰巧的是,在MegaFon中工作时,使用RabbitMQ必须面对相同的任务。 问题自然产生了:“如何简化和自动化此类任务的执行?”


我想到的第一个解决方案是使用HTTP接口,当然,RabbitMQ开箱即用,具有良好的Web界面和HTTP API。 但是,使用HTTP API并不总是很方便,有时甚至是不可能的(假设您没有足够的访问权限,但我确实想发布一条消息),使用AMQP协议就变得很有必要


未能在网络的开放空间中找到适合我的现成解决方案,因此决定编写一个小型应用程序以使用AMQP协议与RabbitMQ一起使用 能够通过命令行传输启动参数并提供最少的必要功能集,即:


  • 过帐
  • 校对消息
  • 创建和编辑基本路线元素

Python被选为实现此类任务的最简单(我认为很漂亮)的工具。 (有人可以在这里争论,但是会发生什么变化?)


中心上提供RabbitMQ提供的官方指南( 一本二本 )的翻译;但是,有时从实践中得到一个简单的例子也是有用的。 在本文中,我将通过一个小应用程序示例来说明使用Python的AMQP通道处理兔子时出现的基本问题。 该应用程序本身可在GitHub上获得


简要介绍AMQP协议和RabbitMQ消息代理


AMQP是分布式系统组件之间最常见的消息传递协议之一。 该协议的主要区别特征是构造消息路由的概念,该消息路由包含两个主要结构元素: 队列交换点 。 队列累积消息,直到收到消息为止。 交换点是消息分发者,可以将它们路由到所需的队列或另一个交换点。 交换点通过该分发规则(绑定)来确定将消息定向到何处,该分发规则(绑定)基于检查消息的路由密钥是否符合指定的掩码。 您可以在此处阅读有关AMQP的更多信息。


RabbitMQ是一个完全支持AMQP的开源应用程序,并提供许多其他功能。 为了与RabbitMQ一起使用,已经用多种编程语言(包括Python)编写了大量库。


Python实现


您总是可以抛出几个脚本供个人使用,而不知道它们带来的麻烦。 要在同事之间传播他们,一切都会变得更加复杂。 每个人都需要展示并告诉他们如何启动,什么版本,要在哪里进行更改,在哪里获得最新版本以及版本中发生了什么更改……您不由自主地得出结论,一次创建一个简单的界面会更容易,这样您就不会再浪费时间了。 为了易于使用,决定将应用程序分为4个模块:


  1. 负责发布的模块
  2. 负责从队列中减去消息的模块
  3. 一个旨在更改RabbitMQ代理配置的模块
  4. 包含先前模块共有的参数和方法的模块

这种方法简化了启动参数集。 我们选择了所需的模块,选择了其操作模式之一,并传递了必要的参数(有关–help帮助中有关操作模式和参数的更多信息)。


由于MegaFon中的“兔子”结构由足够多的节点组成,为方便使用,将用于连接到节点的数据传输到具有常规参数和方法rmq_common_tools.py的模块中。


要使用Python处理AMQP,我们将使用Pika库。


import pika 

使用该库,使用RabbitMQ将包括三个主要阶段:


  1. 建立连接
  2. 执行所需的操作
  3. 紧密连接

第一阶段和最后阶段对于所有模块都是相同的,并在rmq_common_tools.py中实现


建立连接:


 rmq_parameters = pika.URLParameters(rmq_url_connection_str) rmq_connection = pika.BlockingConnection(rmq_parameters) rmq_channel = rmq_connection.channel() 

通过Pika库,您可以使用各种设计选项来连接RabbitMQ。 在这种情况下,最方便的选择是采用以下格式的URL字符串形式传递参数:


 'amqp://rabbit_user:rabbit_password@host:port/vhost' 

要关闭连接:


 rmq_connection.close() 

过帐


发布消息可能是最简单的方法,但同时也是处理兔子时最流行的操作。


rmq_publish.py中编译的发布工具


要发布消息,请使用方法


 rmq_channel.basic_publish(exchange = params.exch, routing_key = params.r_key, body = text) 

其中:
exchange-消息将发布到的交换点的名称
routing_key-用来发布消息的路由密钥
正文 -邮件正文


rmq_publish.py支持两种消息输入模式进行发布:


  1. 该消息是通过命令行(from_console)作为参数输入的
  2. 从文件(from_file)中读取消息

我认为,第二种模式在处理大型消息或消息数组时更为方便。 反过来,第一个允许您发送不带其他文件的消息,这在将模块集成到其他方案中时非常方便。


接收讯息


接收消息的问题不再像发布那样琐碎。 在阅读邮件时,您需要了解:


  • 确认收到消息后,将从队列中将其删除。 因此,从“战斗”行中读取消息后,我们从主要使用者中“选择”它们。 如果我们不想丢失消息流,而只是想了解“兔子”中正在移动的消息,那么最合乎逻辑的选择是创建一个单独的“日志记录”队列,或者也称为“陷阱队列”。
  • 通常,读取的消息需要进一步的处理或分析,这意味着如果无法进行实时处理或不需要进行实时处理,则需要将其保存在某个位置。

rmq_consume.py文件中实现的消息阅读器


提供两种操作模式:


  1. 从现有队列中读取消息
  2. 创建时间队列和路由以从该队列中读取消息

下面将考虑创建队列和路由的问题。


直接校对的实现方式如下:


 channel.basic_consume(on_message, queue=params.queue) try: channel.start_consuming() except KeyboardInterrupt: channel.stop_consuming() except Exception: channel.stop_consuming() rmq_tools.console_log(":\n", traceback.format_exc()) 

在哪里
on_message-消息处理程序过程
params.queue-将从中进行减法的队列的名称


消息处理程序必须对读取的消息执行某些操作,并确认(或在需要时不确认)消息传递。


 def on_message(channel, method_frame, header_frame, body): global all_cnt, lim if all_cnt >= lim: rmq_tools.console_log('   .') raise KeyboardInterrupt body_str = body.decode("utf-8")[:4000] rk = method_frame.routing_key rmq_params.file.write(rk + '\n') rmq_params.file.write(body_str + '\n\n') all_cnt = all_cnt + 1 if (lim != 0) and (rmq_params.file == sys.stdout): sys.stdout.write(f'[{rmq_tools.time_now()}] - {all_cnt} of {lim} messages consumed.\r') channel.basic_ack(delivery_tag=method_frame.delivery_tag) 

在哪里
all_cnt-全球柜台
lim-要读取的消息数


在处理程序的这种实现中,如果在文件中进行记录,则减去一定数量的消息,并将有关减法进度的信息输出到控制台。


也可以将读取的消息写入数据库。 在当前的实现中,没有提供这样的机会,但是并不难添加。


记录在数据库中

我们将考虑为Oracle数据库和cx_oracle库将消息写入数据库的示例


连接到数据库


 ora_adress = 'host:port/dbSID' ora_creds = 'user/pass' connection_ora = cx_Oracle.connect(ora_creds + '@' + ora_address) ora_cursor = connection_ora.cursor() 

on_message处理程序中添加


 global cnt, commit_int insert_rec = 'insert into ' + tab_name + '(routing_key, text) values (:rkey, :text)' ora_cursor.execute(insert_rec, text = body_str, rkey = rk) if cnt > commit_int : ora_cursor.execute('commit') cnt = 1 cnt = cnt + 1 

在哪里
cnt是另一个柜台
commit_int-插入数据库的次数,之后必须执行“ commit”。 该参数的存在是由于希望减少数据库的负载。 但是,安装它不是特别大,因为 如果失败,则有可能会丢失最后一次成功提交后读取的消息。


并且,正如预期的那样,在工作结束时,我们进行最后的提交并关闭连接


 ora_cursor.execute('commit') connection_ora.close() 

这样的事情正在阅读邮件。 如果取消了对已读消息数量的限制,则可以进行后台处理以连续读取“兔子”中的消息。


构型


尽管AMQP协议主要用于发布和阅读消息,但它也允许您使用路由配置执行简单的操作(我们不是在将网络连接和其他RabbitMQ设置配置为应用程序)。


主要配置操作为:


  1. 创建队列或交换点
  2. 创建转发规则(绑定)
  3. 删除队列或交换点
  4. 删除转发规则(绑定)
  5. 队列清除

由于每个组件都在pika库中有一个现成的过程,为了启动方便,只需将它们编译在rmq_setup.py文件中即可。 接下来,我们列出了pika库中的过程,并对参数进行了一些注释。


创建队列


 rmq_channel.queue_declare(queue=params.queue, durable = params.durable) 

一切都很简单
queue-要创建的队列的名称
持久 -逻辑参数,值为True表示兔子重启时,队列将继续存在。 如果为False,则重新启动后将删除队列。 第二个选项通常用于保证将来不再需要的临时队列。


创建交换点(交换)


 rmq_channel.exchange_declare(exchange=params.exch, exchange_type = params.type, durable = params.durable) 

这里出现一个新的参数exchange_type-交换点的类型。 关于在这里阅读什么类型的交换点。
exchange-创建的交换点的名称


删除队列或交换点


 rmq_channel.queue_delete(queue=params.queue) rmq_channel.exchange_delete(exchange=params.exch) 

创建转发规则(绑定)


 rmq_channel.queue_bind(exchange=params.exch, queue=params.queue, routing_key=params.r_key) 

交换 -从中进行转移的交换点的名称
queue-要转发到的队列的名称
routing_key-路由密钥的掩码,将用于转发。


以下条目有效:


  • rk.my_key。* -在此掩码中,星号表示非空字符集。 换句话说,这样的掩码将跳过rk.my_key类型的任何键 +其他东西,但不会丢失键rk.my_key
  • rk.my_key。# -此掩码将跳过所有与上一个+键rk.my_key相同的内容

删除转发规则(绑定)


 rmq_channel.queue_unbind(exchange=params.exch, queue=params.queue, routing_key=params.r_key) 

一切都类似于创建转发规则。


队列清除


 rmq_channel.queue_purge(queue=params.queue) 

queue-要清除的队列的名称


关于在Python应用程序中使用命令行界面

启动选项使生活更加轻松。 为了在每次启动之前不编辑代码,提供一种在启动时传递参数的机制是合乎逻辑的。 为此选择了argparse库。 我不会详细介绍其用法的复杂性;对此主题有足够的指南( )。 我只注意到该工具帮助我极大地简化了应用程序的使用过程(如果可以调用它的话)。 即使抛出了简单的命令序列并将它们包装在类似的界面中,您也可以获得功能齐全且易于使用的工具。


在日常生活中的应用。 最方便的是什么。


好了,现在对在日常生活中使用AMQP有了一点印象。


最需要的功能是消息的发布。 特定用户的访问权限并不总是允许使用Web界面,尽管有时仅需要测试特定服务。 在此,AMQP和代表使用此通道的服务的授权传递给帮助。


第二受欢迎的是从时间队列中读取消息的功能。 此功能在配置新路由和消息流以及防止事故方面很有用。


在其他任务中也发现了其他可能性。

Source: https://habr.com/ru/post/zh-CN434510/


All Articles