En bref sur l'utilisation de RabbitMQ de Python

KDPV


Il se trouve que dans le processus de travail dans MegaFon, on doit faire face aux mêmes tâches lorsque l'on travaille avec RabbitMQ. La question se pose naturellement: "Comment simplifier et automatiser la mise en œuvre de telles tâches?"


La première solution qui vient à l'esprit est d'utiliser l'interface HTTP et, bien sûr, prêt à l'emploi, RabbitMQ possède une bonne interface Web et une API HTTP. Néanmoins, l'utilisation de l'API HTTP n'est pas toujours pratique, et parfois même impossible (disons que vous n'avez pas assez de droits d'accès, mais je veux vraiment publier un message) à ces moments, il devient nécessaire de travailler en utilisant le protocole AMQP


Ne trouvant pas de solutions toutes faites me convenant sur les espaces ouverts du réseau, il a été décidé d'écrire une petite application pour travailler avec RabbitMQ en utilisant le protocole AMQP avec la possibilité de transférer les paramètres de démarrage via la ligne de commande et de fournir le minimum de fonctionnalités nécessaires, à savoir:


  • Publication
  • Relecture des messages
  • Création et modification d'éléments de route de base

Python a été choisi comme l'outil le plus simple (et à mon avis magnifique) pour implémenter une telle tâche. (on peut discuter ici, mais qu'est-ce que cela va changer?)


Des traductions de guides officiels ( une , deux fois ) sur RabbitMQ sont présentées sur le hub; cependant, un exemple pratique simple est parfois utile. Dans l'article, je vais essayer d'illustrer les principaux problèmes qui se posent lorsque vous travaillez avec des lapins en utilisant le canal AMQP de Python en utilisant un exemple de petite application. L'application elle-même est disponible sur GitHub .


En bref sur le protocole AMQP et le courtier de messages RabbitMQ


AMQP est l'un des protocoles de messagerie les plus courants entre les composants d'un système distribué. La principale caractéristique distinctive de ce protocole est le concept de construction d'une route de message, contenant deux éléments structurels principaux: une file d'attente et un point d'échange . La file d'attente accumule les messages jusqu'à leur réception. Un point d'échange est un distributeur de messages qui les dirige vers la file d'attente souhaitée ou vers un autre point d'échange. Les règles de distribution (liaisons) , par lesquelles le point d'échange détermine où diriger le message, sont basées sur la vérification de la conformité de la clé de routage du message avec le masque spécifié. Vous pouvez en savoir plus sur le fonctionnement d'AMQP ici .


RabbitMQ est une application open source qui prend entièrement en charge AMQP et offre un certain nombre de fonctionnalités supplémentaires. Pour fonctionner avec RabbitMQ, un grand nombre de bibliothèques ont été écrites dans une variété de langages de programmation, y compris Python.


Implémentation de Python


Vous pouvez toujours lancer quelques scripts pour un usage personnel et ne pas connaître les problèmes avec eux. Quand il s'agit de les diffuser entre collègues, tout devient plus compliqué. Tout le monde doit montrer et dire comment et quoi lancer, quoi et où changer, où obtenir la dernière version, et ce qui a changé dedans ... Involontairement, vous arrivez à la conclusion qu'il est plus facile de concevoir une interface simple une fois, afin de ne pas perdre de temps à l'avenir. Pour faciliter l'utilisation, il a été décidé de diviser l'application en 4 modules:


  1. Le module responsable de l'affichage
  2. Module responsable de la soustraction des messages de la file d'attente
  3. Un module conçu pour apporter des modifications à la configuration du courtier RabbitMQ
  4. Un module contenant des paramètres et des méthodes communs aux modules précédents

Cette approche simplifie l'ensemble des paramètres de démarrage. Nous avons sélectionné le module requis, sélectionné l'un de ses modes de fonctionnement et transmis les paramètres nécessaires (pour plus d'informations sur les modes de fonctionnement et les paramètres dans l'aide –help).


Étant donné que la structure des «lapins» dans MegaFon se compose d'un nombre suffisamment important de nœuds, pour plus de commodité d'utilisation, les données de connexion aux nœuds sont transférées vers un module avec des paramètres et méthodes généraux rmq_common_tools.py


Pour travailler sur AMQP en Python, nous utiliserons la bibliothèque Pika .


import pika 

En utilisant cette bibliothèque, travailler avec RabbitMQ comprendra trois étapes principales:


  1. Établir une connexion
  2. Exécution des opérations requises
  3. Fermer la connexion

La première et la dernière étape sont les mêmes pour tous les modules et sont implémentées dans rmq_common_tools.py


Pour établir une connexion:


 rmq_parameters = pika.URLParameters(rmq_url_connection_str) rmq_connection = pika.BlockingConnection(rmq_parameters) rmq_channel = rmq_connection.channel() 

La bibliothèque Pika vous permet d'utiliser diverses options de conception pour vous connecter à RabbitMQ. Dans ce cas, l'option la plus pratique était de passer les paramètres sous la forme d'une chaîne URL au format suivant:


 'amqp://rabbit_user:rabbit_password@host:port/vhost' 

Pour fermer une connexion:


 rmq_connection.close() 

Publication


La publication d'un message est probablement la plus simple, mais en même temps l'opération la plus populaire lorsque vous travaillez avec des lapins.


Outils de post-publication compilés dans rmq_publish.py


Pour publier un message, utilisez la méthode


 rmq_channel.basic_publish(exchange = params.exch, routing_key = params.r_key, body = text) 

où:
exchange - le nom du point d'échange vers lequel le message sera publié
routing_key - clé de routage avec laquelle le message sera publié
corps - corps du message


rmq_publish.py prend en charge deux modes d'entrée de message pour la publication:


  1. Le message est entré en tant que paramètre via la ligne de commande (from_console)
  2. Le message est lu à partir du fichier (from_file)

Le deuxième mode, à mon avis, est plus pratique lorsque vous travaillez avec des messages volumineux ou des tableaux de messages. Le premier, à son tour, vous permet d'envoyer un message sans fichiers supplémentaires, ce qui est pratique lors de l'intégration du module dans d'autres scénarios.


Réception de messages


La question de la réception des messages n'est plus aussi banale que la publication. En ce qui concerne la lecture des messages, vous devez comprendre:


  • Après avoir confirmé la réception du message, il sera supprimé de la file d'attente. Ainsi, en lisant les messages de la ligne de «bataille», nous les «sélectionnons» parmi le consommateur principal. Si nous ne voulons pas perdre le flux de messages, mais voulons simplement comprendre quels messages se déplacent dans le "lapin", alors l'option la plus logique est de créer une file d'attente de "journalisation" distincte, ou comme on l'appelle aussi, "file d'attente-trap".
  • Les messages lus, en règle générale, nécessitent un traitement ou une analyse supplémentaire, ce qui signifie qu'ils doivent être enregistrés quelque part si le traitement en temps réel est impossible ou non requis.

Lecteur de messages implémenté dans le fichier rmq_consume.py


Deux modes de fonctionnement sont proposés:


  1. Lire les messages d'une file d'attente existante
  2. Création d'une file d'attente et d'un itinéraire pour lire les messages de cette file d'attente

La question de la création d'une file d'attente et d'itinéraires sera examinée ci-dessous.


La relecture directe est implémentée comme suit:


 channel.basic_consume(on_message, queue=params.queue) try: channel.start_consuming() except KeyboardInterrupt: channel.stop_consuming() except Exception: channel.stop_consuming() rmq_tools.console_log(":\n", traceback.format_exc()) 


on_message - procédure du gestionnaire de messages
params.queue - le nom de la file d'attente à partir de laquelle la soustraction sera effectuée


Le gestionnaire de messages doit effectuer une opération sur le message lu et confirmer (ou non confirmer, si nécessaire) la remise du message.


 def on_message(channel, method_frame, header_frame, body): global all_cnt, lim if all_cnt >= lim: rmq_tools.console_log('   .') raise KeyboardInterrupt body_str = body.decode("utf-8")[:4000] rk = method_frame.routing_key rmq_params.file.write(rk + '\n') rmq_params.file.write(body_str + '\n\n') all_cnt = all_cnt + 1 if (lim != 0) and (rmq_params.file == sys.stdout): sys.stdout.write(f'[{rmq_tools.time_now()}] - {all_cnt} of {lim} messages consumed.\r') channel.basic_ack(delivery_tag=method_frame.delivery_tag) 


all_cnt - compteur global
lim - le nombre de messages à lire


Dans une telle implémentation du gestionnaire, un certain nombre de messages sont soustraits et des informations sur la progression de la soustraction sont sorties vers la console si l'enregistrement se produit dans un fichier.


Il est également possible d'écrire des messages lus dans la base de données. Dans la mise en œuvre actuelle, une telle opportunité n'est pas présentée, mais elle n'est pas difficile à ajouter.


Enregistrer dans une BD

Nous considérerons un exemple d'écriture de messages dans la base de données pour la base de données Oracle et la bibliothèque cx_oracle .


Connectez-vous à la base de données


 ora_adress = 'host:port/dbSID' ora_creds = 'user/pass' connection_ora = cx_Oracle.connect(ora_creds + '@' + ora_address) ora_cursor = connection_ora.cursor() 

Dans le gestionnaire on_message, ajoutez


 global cnt, commit_int insert_rec = 'insert into ' + tab_name + '(routing_key, text) values (:rkey, :text)' ora_cursor.execute(insert_rec, text = body_str, rkey = rk) if cnt > commit_int : ora_cursor.execute('commit') cnt = 1 cnt = cnt + 1 


cnt est un autre compteur
commit_int - le nombre d'insertions dans la base de données, après quoi il est nécessaire de faire «commit». La présence d'un tel paramètre est due à la volonté de réduire la charge sur la base de données. Cependant, son installation n'est pas particulièrement importante, car en cas d'échec, il y a une chance de perdre les messages lus après la dernière validation réussie.


Et, comme prévu, à la fin du travail, nous faisons le commit final et fermons la connexion


 ora_cursor.execute('commit') connection_ora.close() 

Quelque chose comme ça lit des messages. Si vous supprimez la restriction sur le nombre de messages lus, vous pouvez effectuer un processus d'arrière-plan pour la lecture continue des messages du "lapin".


La configuration


Malgré le fait que le protocole AMQP est principalement destiné à la publication et à la lecture de messages, il vous permet également d'effectuer des manipulations simples avec la configuration des routes (nous ne parlons pas de configurer les connexions réseau et d'autres paramètres RabbitMQ en tant qu'application).


Les principales opérations de configuration sont:


  1. Création d'une file d'attente ou d'un point d'échange
  2. Création d'une règle de transfert (liaison)
  3. Suppression d'une file d'attente ou d'un point d'échange
  4. Suppression d'une règle de transfert (liaison)
  5. Suppression de file d'attente

Étant donné que pour chacun d'eux il existe une procédure prête à l'emploi dans la bibliothèque pika, pour faciliter le lancement, ils sont simplement compilés dans le fichier rmq_setup.py . Ensuite, nous listons les procédures de la bibliothèque pika avec quelques commentaires sur les paramètres.


Création d'une file d'attente


 rmq_channel.queue_declare(queue=params.queue, durable = params.durable) 

tout est simple ici
file d'attente - nom de la file d'attente à créer
durable - un paramètre logique, une valeur True signifie que lorsque le lapin redémarre, la file d'attente continuera d'exister. Si False, la file d'attente sera supprimée au redémarrage. La deuxième option est généralement utilisée pour les files d'attente temporaires qui ne seront plus nécessaires à l'avenir.


Création d'un point d'échange (échange)


 rmq_channel.exchange_declare(exchange=params.exch, exchange_type = params.type, durable = params.durable) 

ici apparaît un nouveau paramètre exchange_type - le type de point d'échange. À propos des types de points d'échange lus ici .
exchange - nom du point d'échange créé


Suppression d'une file d'attente ou d'un point d'échange


 rmq_channel.queue_delete(queue=params.queue) rmq_channel.exchange_delete(exchange=params.exch) 

Création d'une règle de transfert (liaison)


 rmq_channel.queue_bind(exchange=params.exch, queue=params.queue, routing_key=params.r_key) 

échange - le nom du point d'échange à partir duquel le transfert sera effectué
file d'attente - le nom de la file d'attente à transférer
routing_key - masque de la clé de routage, qui sera utilisé pour le transfert.


Les entrées suivantes sont valides:


  • rk.my_key. * - dans ce masque, un astérisque signifie un jeu de caractères non vide. En d'autres termes, un tel masque sautera toute clé du type rk.my_key. + autre chose, mais ne manquera pas la clé rk.my_key
  • rk.my_key. # - ce masque va tout sauter comme la touche + précédente rk.my_key

Suppression d'une règle de transfert (liaison)


 rmq_channel.queue_unbind(exchange=params.exch, queue=params.queue, routing_key=params.r_key) 

tout est similaire à la création d'une règle de transfert.


Suppression de file d'attente


 rmq_channel.queue_purge(queue=params.queue) 

file d'attente - le nom de la file d'attente à effacer


À propos de l'utilisation de l'interface de ligne de commande dans les applications Python

Les options de démarrage rendent la vie beaucoup plus facile. Afin de ne pas éditer le code avant chaque lancement, il est logique de prévoir un mécanisme de passage des paramètres au démarrage. La bibliothèque argparse a été choisie à cet effet . Je n'entrerai pas dans les détails des subtilités de son utilisation, il y a suffisamment de guides sur ce sujet ( un , deux , trois ). Je note seulement que cet outil m'a aidé à simplifier considérablement le processus d'utilisation de l'application (si vous pouvez l'appeler ainsi). Même après avoir lancé une simple séquence de commandes et les avoir enveloppées dans une interface similaire, vous pouvez obtenir un outil à part entière et facile à utiliser.


Application au quotidien. Ce qui a été le plus utile.


Eh bien, maintenant une petite impression sur l'utilisation de l'AMQP dans la vie quotidienne.


La caractéristique la plus demandée a été la publication du message. Les droits d'accès d'un utilisateur particulier ne permettent pas toujours l'utilisation d'une interface Web, bien qu'il soit parfois simplement nécessaire de tester un service particulier. Ici AMQP et l'autorisation au nom du service utilisant ce canal passent à l'aide.


Le deuxième plus populaire était la possibilité de lire les messages de la file d'attente. Cette fonction est utile pour configurer de nouveaux itinéraires et flux de messages, ainsi que pour prévenir les accidents.


D'autres possibilités ont également trouvé une application dans diverses tâches.

Source: https://habr.com/ru/post/fr434510/


All Articles