Brevemente sobre trabajar con RabbitMQ de Python

KDPV


Dio la casualidad de que en el proceso de trabajar en MegaFon, uno tiene que enfrentar las mismas tareas cuando trabaja con RabbitMQ. La pregunta surge naturalmente: "¿Cómo simplificar y automatizar la implementación de tales tareas?"


La primera solución que viene a la mente es usar la interfaz HTTP y, por supuesto, RabbitMQ tiene una buena interfaz web y API HTTP. Sin embargo, el uso de la API HTTP no siempre es conveniente, y a veces incluso imposible (digamos que no tiene suficientes derechos de acceso, pero realmente quiero publicar un mensaje) en esos momentos, se hace necesario trabajar usando el protocolo AMQP


Al no encontrar soluciones preparadas adecuadas para mí en los espacios abiertos de la red, se decidió escribir una pequeña aplicación para trabajar con RabbitMQ utilizando el protocolo AMQP con la capacidad de transferir parámetros de inicio a través de la línea de comando y proporcionar el conjunto mínimo de características necesarias, a saber:


  • Publicar
  • Corrección de mensajes
  • Crear y editar elementos de ruta básicos

Python fue elegido como la herramienta más simple (y en mi opinión hermosa) para implementar tal tarea. (Uno puede discutir aquí, pero ¿qué va a cambiar?)


Las traducciones de guías oficiales ( una , dos ) por RabbitMQ se presentan en el centro; sin embargo, a veces es útil un ejemplo simple de la práctica. En el artículo, trataré de ilustrar los principales problemas que surgen al trabajar con conejos usando el canal AMQP de Python usando un ejemplo de una pequeña aplicación. La aplicación en sí está disponible en GitHub .


Brevemente sobre el protocolo AMQP y el agente de mensajes RabbitMQ


AMQP es uno de los protocolos de mensajería más comunes entre componentes de un sistema distribuido. La principal característica distintiva de este protocolo es el concepto de construir una ruta de mensajes, que contiene dos elementos estructurales principales: una cola y un punto de intercambio . La cola acumula mensajes hasta que se recibe. Un punto de intercambio es un distribuidor de mensajes que los dirige a la cola deseada oa otro punto de intercambio. Las reglas de distribución (enlaces) , mediante las cuales el punto de intercambio determina hacia dónde dirigir el mensaje, se basan en verificar que la clave de enrutamiento del mensaje cumpla con la máscara especificada. Puede leer más sobre cómo funciona AMQP aquí .


RabbitMQ es una aplicación de código abierto que es totalmente compatible con AMQP y ofrece una serie de características adicionales. Para trabajar con RabbitMQ, se ha escrito una gran cantidad de bibliotecas en una variedad de lenguajes de programación, incluido Python.


Implementación de Python


Siempre puede lanzar un par de scripts para uso personal y no conocer los problemas con ellos. Cuando se trata de difundirlos entre colegas, todo se vuelve más complicado. Todos necesitan mostrar y decir cómo y qué lanzar, qué y dónde cambiar, dónde obtener la última versión y qué ha cambiado en ella ... Involuntariamente, llega a la conclusión de que es más fácil crear una interfaz simple una vez, para que no pierda el tiempo en el futuro. Para facilitar su uso, se decidió dividir la aplicación en 4 módulos:


  1. El módulo responsable de publicar
  2. Módulo responsable de restar mensajes de la cola
  3. Un módulo diseñado para realizar cambios en la configuración del corredor RabbitMQ
  4. Un módulo que contiene parámetros y métodos comunes a los módulos anteriores.

Este enfoque simplifica el conjunto de parámetros de inicio. Seleccionamos el módulo requerido, seleccionamos uno de sus modos de operación y pasamos los parámetros necesarios (para obtener más información sobre los modos y parámetros de operación en la ayuda de ayuda).


Dado que la estructura de los "conejos" en MegaFon consiste en un número suficientemente grande de nodos, por conveniencia de uso, los datos para conectarse a los nodos se transfieren a un módulo con parámetros y métodos generales rmq_common_tools.py


Para trabajar en AMQP en Python, utilizaremos la biblioteca Pika .


import pika 

Usando esta biblioteca, trabajar con RabbitMQ consistirá en tres etapas principales:


  1. Establecer una conexión
  2. Realizar operaciones requeridas
  3. Conexión cercana

La primera y la última etapa son las mismas para todos los módulos y se implementan en rmq_common_tools.py


Para establecer una conexión:


 rmq_parameters = pika.URLParameters(rmq_url_connection_str) rmq_connection = pika.BlockingConnection(rmq_parameters) rmq_channel = rmq_connection.channel() 

La biblioteca Pika le permite usar varias opciones de diseño para conectarse a RabbitMQ. En este caso, la opción más conveniente era pasar los parámetros en forma de una cadena URL en el siguiente formato:


 'amqp://rabbit_user:rabbit_password@host:port/vhost' 

Para cerrar una conexión:


 rmq_connection.close() 

Publicar


Publicar un mensaje es probablemente la operación más fácil, pero al mismo tiempo la más popular cuando se trabaja con conejos.


Publicar herramientas de publicación compiladas en rmq_publish.py


Para publicar un mensaje, use el método


 rmq_channel.basic_publish(exchange = params.exch, routing_key = params.r_key, body = text) 

donde:
intercambio : el nombre del punto de intercambio en el que se publicará el mensaje
routing_key : clave de enrutamiento con la que se publicará el mensaje
cuerpo - cuerpo del mensaje


rmq_publish.py admite dos modos de entrada de mensajes para publicación:


  1. El mensaje se ingresa como un parámetro a través de la línea de comando (from_console)
  2. El mensaje se lee del archivo (from_file)

El segundo modo, en mi opinión, es más conveniente cuando se trabaja con mensajes grandes o matrices de mensajes. El primero, a su vez, le permite enviar un mensaje sin archivos adicionales, lo cual es conveniente al integrar el módulo en otros escenarios.


Recibiendo mensajes


El problema de recibir mensajes ya no es tan trivial como publicar. Cuando se trata de leer mensajes, debe comprender:


  • Después de confirmar la recepción del mensaje, se eliminará de la cola. Entonces, al leer los mensajes de la línea de "batalla", los "seleccionamos" del consumidor principal. Si no queremos perder el flujo de mensajes, pero solo queremos entender qué mensajes se mueven en el "conejo", entonces la opción más lógica es crear una cola de "registro" separada, o como también se le llama, "cola de trampa".
  • Los mensajes leídos, por regla general, requieren un procesamiento o análisis adicional, lo que significa que deben guardarse en algún lugar si el procesamiento en tiempo real es imposible o no es necesario.

Lector de mensajes implementado en el archivo rmq_consume.py


Se proporcionan dos modos de funcionamiento:


  1. Leer mensajes de una cola existente
  2. Crear una cola de tiempo y una ruta para leer mensajes de esta cola

La cuestión de crear una cola y rutas se considerará a continuación.


La revisión directa se implementa de la siguiente manera:


 channel.basic_consume(on_message, queue=params.queue) try: channel.start_consuming() except KeyboardInterrupt: channel.stop_consuming() except Exception: channel.stop_consuming() rmq_tools.console_log(":\n", traceback.format_exc()) 

donde
on_message - procedimiento de manejo de mensajes
params.queue : el nombre de la cola desde la que se realizará la resta


El manejador de mensajes debe realizar alguna operación en el mensaje leído y confirmar (o no confirmar, si es necesario) la entrega del mensaje.


 def on_message(channel, method_frame, header_frame, body): global all_cnt, lim if all_cnt >= lim: rmq_tools.console_log('   .') raise KeyboardInterrupt body_str = body.decode("utf-8")[:4000] rk = method_frame.routing_key rmq_params.file.write(rk + '\n') rmq_params.file.write(body_str + '\n\n') all_cnt = all_cnt + 1 if (lim != 0) and (rmq_params.file == sys.stdout): sys.stdout.write(f'[{rmq_tools.time_now()}] - {all_cnt} of {lim} messages consumed.\r') channel.basic_ack(delivery_tag=method_frame.delivery_tag) 

donde
all_cnt - contador global
lim - la cantidad de mensajes a leer


En dicha implementación del controlador, se sustrae un cierto número de mensajes y la información sobre el progreso de la sustracción se envía a la consola si la grabación se produce en un archivo.


También es posible escribir mensajes leídos en la base de datos. En la implementación actual, tal oportunidad no se presenta, pero no es difícil de agregar.


Grabar en un DB

Consideraremos un ejemplo de escritura de mensajes en la base de datos para la base de datos Oracle y la biblioteca cx_oracle .


Conectarse a la base de datos


 ora_adress = 'host:port/dbSID' ora_creds = 'user/pass' connection_ora = cx_Oracle.connect(ora_creds + '@' + ora_address) ora_cursor = connection_ora.cursor() 

En el controlador on_message, agregue


 global cnt, commit_int insert_rec = 'insert into ' + tab_name + '(routing_key, text) values (:rkey, :text)' ora_cursor.execute(insert_rec, text = body_str, rkey = rk) if cnt > commit_int : ora_cursor.execute('commit') cnt = 1 cnt = cnt + 1 

donde
cnt es otro contador
commit_int : el número de inserciones en la base de datos, después de lo cual es necesario hacer "commit". La presencia de dicho parámetro se debe al deseo de reducir la carga en la base de datos. Sin embargo, instalarlo no es particularmente grande, porque en caso de falla, existe la posibilidad de perder los mensajes leídos después de la última confirmación exitosa.


Y, como se esperaba, al final del trabajo hacemos el compromiso final y cerramos la conexión


 ora_cursor.execute('commit') connection_ora.close() 

Algo como esto es leer mensajes. Si elimina la restricción en la cantidad de mensajes leídos, puede realizar un proceso en segundo plano para la lectura continua de mensajes del "conejo".


Configuracion


A pesar de que el protocolo AMQP está destinado principalmente a publicar y leer mensajes, también le permite realizar manipulaciones simples con la configuración de rutas (no estamos hablando de configurar conexiones de red y otras configuraciones de RabbitMQ como una aplicación).


Las principales operaciones de configuración son:


  1. Crear una cola o punto de intercambio
  2. Crear una regla de reenvío (enlace)
  3. Eliminar una cola o punto de intercambio
  4. Eliminar una regla de reenvío (enlace)
  5. Limpieza de la cola

Dado que para cada uno de ellos hay un procedimiento listo para usar en la biblioteca pika, para su comodidad de lanzamiento, simplemente se compilan en el archivo rmq_setup.py . A continuación, enumeramos los procedimientos de la biblioteca pika con algunos comentarios sobre los parámetros.


Crear una cola


 rmq_channel.queue_declare(queue=params.queue, durable = params.durable) 

todo es simple aquí
cola - nombre de la cola para crear
durable : un parámetro lógico, un valor de True significará que cuando el conejo se reinicie, la cola seguirá existiendo. Si es falso, la cola se eliminará al reiniciar. La segunda opción se usa generalmente para colas temporales que se garantiza que no serán necesarias en el futuro.


Crear un punto de intercambio (intercambio)


 rmq_channel.exchange_declare(exchange=params.exch, exchange_type = params.type, durable = params.durable) 

aquí surge un nuevo parámetro exchange_type : el tipo de punto de intercambio. Sobre qué tipos de puntos de intercambio se leen aquí .
exchange - nombre del punto de intercambio creado


Eliminar una cola o punto de intercambio


 rmq_channel.queue_delete(queue=params.queue) rmq_channel.exchange_delete(exchange=params.exch) 

Crear una regla de reenvío (enlace)


 rmq_channel.queue_bind(exchange=params.exch, queue=params.queue, routing_key=params.r_key) 

intercambio : el nombre del punto de intercambio desde el que se realizará la transferencia
cola : el nombre de la cola a la que se enviará
routing_key : máscara de la clave de enrutamiento, que se utilizará para reenviar.


Las siguientes entradas son válidas:


  • rk.my_key. * : en esta máscara, un asterisco significa un conjunto de caracteres no vacío. En otras palabras, dicha máscara omitirá cualquier tecla del tipo rk.my_key. + algo más, pero no perderá la clave rk.my_key
  • rk.my_key. # : esta máscara omitirá todo como la tecla + anterior rk.my_key

Eliminar una regla de reenvío (enlace)


 rmq_channel.queue_unbind(exchange=params.exch, queue=params.queue, routing_key=params.r_key) 

todo es similar a crear una regla de reenvío.


Limpieza de la cola


 rmq_channel.queue_purge(queue=params.queue) 

cola - el nombre de la cola que se borrará


Acerca del uso de la interfaz de línea de comandos en aplicaciones Python

Las opciones de inicio hacen la vida mucho más fácil. Para no editar el código antes de cada lanzamiento, es lógico proporcionar un mecanismo para pasar parámetros al inicio. La biblioteca argparse fue elegida para este propósito . No voy a entrar en detalles sobre las complejidades de su uso; hay suficientes guías sobre este tema ( uno , dos , tres ). Solo noto que esta herramienta me ayudó a simplificar en gran medida el proceso de uso de la aplicación (si se puede llamar así). Incluso después de lanzar una secuencia simple de comandos y envolverlos en una interfaz similar, puede obtener una herramienta completa y fácil de usar.


Aplicación en la vida cotidiana. Lo que fue más útil.


Bueno, ahora una pequeña impresión sobre el uso de AMQP en la vida cotidiana.


La característica más solicitada fue la publicación del mensaje. Los derechos de acceso de un usuario en particular no siempre permiten el uso de una interfaz web, aunque a veces es simplemente necesario probar un servicio en particular. Aquí AMQP y la autorización en nombre del servicio que usa este canal pasan a la ayuda.


El segundo más popular fue la capacidad de leer mensajes de la cola de tiempo. Esta característica es útil para configurar nuevas rutas y flujos de mensajes, así como para prevenir accidentes.


Otras posibilidades también encontraron aplicación en diversas tareas.

Source: https://habr.com/ru/post/es434510/


All Articles