Kurz über die Arbeit mit RabbitMQ aus Python

KDPV


So kam es, dass man bei der Arbeit in MegaFon bei der Arbeit mit RabbitMQ vor denselben Aufgaben stehen muss. Es stellt sich natürlich die Frage: "Wie kann die Implementierung solcher Aufgaben vereinfacht und automatisiert werden?"


Die erste Lösung, die mir in den Sinn kommt, ist die Verwendung der HTTP-Schnittstelle. Natürlich verfügt RabbitMQ sofort über eine gute Weboberfläche und eine HTTP-API. Trotzdem ist die Verwendung der HTTP-API nicht immer bequem und manchmal sogar unmöglich (sagen wir, Sie haben nicht genügend Zugriffsrechte, aber ich möchte wirklich eine Nachricht veröffentlichen). In solchen Zeiten ist es erforderlich, mit dem AMQP-Protokoll zu arbeiten


Da auf den Freiflächen des Netzwerks keine für mich geeigneten Lösungen gefunden wurden, wurde beschlossen, eine kleine Anwendung für die Arbeit mit RabbitMQ unter Verwendung des AMQP-Protokolls zu schreiben mit der Fähigkeit, Startparameter über die Befehlszeile zu übertragen und die minimal erforderlichen Funktionen bereitzustellen, nämlich:


  • Posting
  • Korrekturlesen von Nachrichten
  • Erstellen und Bearbeiten grundlegender Routenelemente

Python wurde als einfachstes (und meiner Meinung nach schönes) Tool zur Implementierung einer solchen Aufgabe ausgewählt. (Man kann hier streiten, aber was wird sich ändern?)


Übersetzungen von offiziellen Leitfäden ( einmal , zweimal ) zu RabbitMQ werden auf dem Hub präsentiert, manchmal ist jedoch ein einfaches Beispiel aus der Praxis hilfreich. In diesem Artikel werde ich versuchen, die grundlegenden Probleme, die bei der Arbeit mit Kaninchen über den AMQP-Kanal von Python auftreten, anhand eines Beispiels einer kleinen Anwendung zu veranschaulichen. Die Anwendung selbst ist auf GitHub verfügbar.


Kurz über das AMQP-Protokoll und den RabbitMQ-Nachrichtenbroker


AMQP ist eines der häufigsten Messaging-Protokolle zwischen Komponenten eines verteilten Systems. Das Hauptunterscheidungsmerkmal dieses Protokolls ist das Konzept der Erstellung einer Nachrichtenroute, die zwei Hauptstrukturelemente enthält: eine Warteschlange und einen Austauschpunkt . Die Warteschlange sammelt Nachrichten, bis sie empfangen werden. Ein Austauschpunkt ist ein Nachrichtenverteiler, der sie entweder an die gewünschte Warteschlange oder an einen anderen Austauschpunkt weiterleitet. Verteilungsregeln (Bindungen) , anhand derer der Austauschpunkt bestimmt, wohin die Nachricht geleitet werden soll, basieren auf der Überprüfung des Routing-Schlüssels der Nachricht auf Übereinstimmung mit der angegebenen Maske. Weitere Informationen zur Funktionsweise von AMQP finden Sie hier .


RabbitMQ ist eine Open Source-Anwendung, die AMQP vollständig unterstützt und eine Reihe zusätzlicher Funktionen bietet. Für die Arbeit mit RabbitMQ wurde eine große Anzahl von Bibliotheken in verschiedenen Programmiersprachen geschrieben, einschließlich Python.


Python-Implementierung


Sie können immer ein paar Skripte für den persönlichen Gebrauch werfen und die Probleme damit nicht kennen. Wenn es darum geht, sie unter Kollegen zu verbreiten, wird alles komplizierter. Jeder muss zeigen und sagen, wie und was zu starten ist, was und wo zu ändern ist, wo die neueste Version zu erhalten ist und was sich daran geändert hat ... Unwillkürlich kommen Sie zu dem Schluss, dass es einfacher ist, eine einfache Benutzeroberfläche einmal zu erarbeiten, um in Zukunft keine Zeit zu verschwenden. Aus Gründen der Benutzerfreundlichkeit wurde beschlossen, die Anwendung in 4 Module zu unterteilen:


  1. Das für die Buchung zuständige Modul
  2. Modul, das für das Subtrahieren von Nachrichten aus der Warteschlange verantwortlich ist
  3. Ein Modul, mit dem Änderungen an der Konfiguration des RabbitMQ-Brokers vorgenommen werden können
  4. Ein Modul, das Parameter und Methoden enthält, die früheren Modulen gemeinsam sind

Dieser Ansatz vereinfacht den Satz von Startparametern. Wir haben das gewünschte Modul ausgewählt, einen seiner Betriebsmodi ausgewählt und die erforderlichen Parameter übergeben (weitere Informationen zu Betriebsmodi und Parametern in der Hilfe –help).


Da die Struktur von „Kaninchen“ in MegaFon aus einer ausreichend großen Anzahl von Knoten besteht, werden die Daten für die Verbindung mit den Knoten zur Vereinfachung der Verwendung an ein Modul mit allgemeinen Parametern und Methoden rmq_common_tools.py übertragen


Um an AMQP in Python zu arbeiten, verwenden wir die Pika- Bibliothek.


import pika 

Bei Verwendung dieser Bibliothek besteht die Arbeit mit RabbitMQ aus drei Hauptphasen:


  1. Stellen Sie eine Verbindung her
  2. Erforderliche Operationen ausführen
  3. Verbindung schließen

Die erste und letzte Stufe sind für alle Module gleich und in rmq_common_tools.py implementiert


So stellen Sie eine Verbindung her:


 rmq_parameters = pika.URLParameters(rmq_url_connection_str) rmq_connection = pika.BlockingConnection(rmq_parameters) rmq_channel = rmq_connection.channel() 

In der Pika-Bibliothek können Sie verschiedene Entwurfsoptionen für die Verbindung mit RabbitMQ verwenden. In diesem Fall bestand die bequemste Option darin, die Parameter in Form einer URL-Zeichenfolge im folgenden Format zu übergeben:


 'amqp://rabbit_user:rabbit_password@host:port/vhost' 

So schließen Sie eine Verbindung:


 rmq_connection.close() 

Posting


Das Veröffentlichen einer Nachricht ist wahrscheinlich die einfachste, aber gleichzeitig die beliebteste Operation bei der Arbeit mit Kaninchen.


In rmq_publish.py kompilierte Post-Publishing-Tools


Verwenden Sie die Methode, um eine Nachricht zu veröffentlichen


 rmq_channel.basic_publish(exchange = params.exch, routing_key = params.r_key, body = text) 

wo:
Austausch - Der Name des Austauschpunkts, an dem die Nachricht veröffentlicht wird
routing_key - Routing-Schlüssel, mit dem die Nachricht veröffentlicht wird
body - Nachrichtentext


rmq_publish.py unterstützt zwei Nachrichteneingabemodi für die Veröffentlichung:


  1. Die Nachricht wird als Parameter über die Befehlszeile (from_console) eingegeben.
  2. Die Nachricht wird aus der Datei gelesen (from_file)

Der zweite Modus ist meiner Meinung nach bequemer, wenn Sie mit großen Nachrichten oder Nachrichtenarrays arbeiten. Mit der ersten Option können Sie eine Nachricht ohne zusätzliche Dateien senden. Dies ist praktisch, wenn Sie das Modul in andere Szenarien integrieren.


Nachrichten empfangen


Das Problem des Empfangs von Nachrichten ist nicht mehr so ​​trivial wie das Veröffentlichen. Wenn Sie Nachrichten lesen möchten, müssen Sie Folgendes verstehen:


  • Nachdem der Empfang der Nachricht bestätigt wurde, wird sie aus der Warteschlange entfernt. Wenn wir also die Nachrichten aus der "Battle" -Linie lesen, "wählen" wir sie vom Hauptverbraucher aus. Wenn wir den Nachrichtenfluss nicht verlieren möchten, sondern nur verstehen möchten, welche Nachrichten sich im "Kaninchen" bewegen, besteht die logischste Option darin, eine separate "Protokollierungs" -Warteschlange oder, wie sie auch genannt wird, "Trap-Warteschlange" zu erstellen.
  • Gelesene Nachrichten erfordern in der Regel eine weitere Verarbeitung oder Analyse. Dies bedeutet, dass sie irgendwo gespeichert werden müssen, wenn eine Echtzeitverarbeitung nicht möglich oder nicht erforderlich ist.

Nachrichtenleser in der Datei rmq_consume.py implementiert


Es stehen zwei Betriebsarten zur Verfügung:


  1. Lesen Sie Nachrichten aus einer vorhandenen Warteschlange
  2. Erstellen einer Zeitwarteschlange und einer Route zum Lesen von Nachrichten aus dieser Warteschlange

Die Frage des Erstellens einer Warteschlange und von Routen wird unten behandelt.


Das direkte Korrekturlesen wird wie folgt implementiert:


 channel.basic_consume(on_message, queue=params.queue) try: channel.start_consuming() except KeyboardInterrupt: channel.stop_consuming() except Exception: channel.stop_consuming() rmq_tools.console_log(":\n", traceback.format_exc()) 

wo
on_message - Nachrichtenbehandlungsprozedur
params.queue - Der Name der Warteschlange, von der die Subtraktion durchgeführt wird


Der Nachrichtenhandler muss eine Operation für die gelesene Nachricht ausführen und die Nachrichtenübermittlung bestätigen (oder nicht bestätigen, falls erforderlich).


 def on_message(channel, method_frame, header_frame, body): global all_cnt, lim if all_cnt >= lim: rmq_tools.console_log('   .') raise KeyboardInterrupt body_str = body.decode("utf-8")[:4000] rk = method_frame.routing_key rmq_params.file.write(rk + '\n') rmq_params.file.write(body_str + '\n\n') all_cnt = all_cnt + 1 if (lim != 0) and (rmq_params.file == sys.stdout): sys.stdout.write(f'[{rmq_tools.time_now()}] - {all_cnt} of {lim} messages consumed.\r') channel.basic_ack(delivery_tag=method_frame.delivery_tag) 

wo
all_cnt - globaler Zähler
lim - Die Anzahl der zu lesenden Nachrichten


Bei einer solchen Implementierung des Handlers wird eine bestimmte Anzahl von Nachrichten subtrahiert und Informationen über den Fortschritt der Subtraktion werden an die Konsole ausgegeben, wenn die Aufzeichnung in einer Datei erfolgt.


Es ist auch möglich, gelesene Nachrichten in die Datenbank zu schreiben. In der aktuellen Implementierung wird eine solche Gelegenheit nicht angeboten, aber es ist nicht schwierig, sie hinzuzufügen.


Datensatz in einer DB

Wir werden ein Beispiel für das Schreiben von Nachrichten in die Datenbank für die Oracle-Datenbank und die cx_oracle- Bibliothek betrachten.


Stellen Sie eine Verbindung zur Datenbank her


 ora_adress = 'host:port/dbSID' ora_creds = 'user/pass' connection_ora = cx_Oracle.connect(ora_creds + '@' + ora_address) ora_cursor = connection_ora.cursor() 

Fügen Sie im Handler on_message hinzu


 global cnt, commit_int insert_rec = 'insert into ' + tab_name + '(routing_key, text) values (:rkey, :text)' ora_cursor.execute(insert_rec, text = body_str, rkey = rk) if cnt > commit_int : ora_cursor.execute('commit') cnt = 1 cnt = cnt + 1 

wo
cnt ist ein weiterer Zähler
commit_int - Die Anzahl der Einfügungen in die Datenbank, nach denen "commit" ausgeführt werden muss. Das Vorhandensein eines solchen Parameters ist auf den Wunsch zurückzuführen, die Belastung der Datenbank zu verringern. Die Installation ist jedoch nicht besonders groß, weil Im Falle eines Fehlers besteht die Möglichkeit, dass nach dem letzten erfolgreichen Festschreiben gelesene Nachrichten verloren gehen.


Und wie erwartet machen wir am Ende der Arbeit das endgültige Commit und schließen die Verbindung


 ora_cursor.execute('commit') connection_ora.close() 

So etwas liest Nachrichten. Wenn Sie die Beschränkung für die Anzahl der gelesenen Nachrichten aufheben, können Sie einen Hintergrundprozess zum kontinuierlichen Lesen von Nachrichten vom "Kaninchen" durchführen.


Konfiguration


Trotz der Tatsache, dass das AMQP-Protokoll hauptsächlich zum Veröffentlichen und Lesen von Nachrichten vorgesehen ist, können Sie auch einfache Manipulationen bei der Konfiguration von Routen durchführen (wir sprechen nicht über das Konfigurieren von Netzwerkverbindungen und anderen RabbitMQ-Einstellungen als Anwendung).


Die Hauptkonfigurationsvorgänge sind:


  1. Erstellen einer Warteschlange oder eines Austauschpunkts
  2. Weiterleitungsregel erstellen (verbindlich)
  3. Löschen einer Warteschlange oder eines Austauschpunkts
  4. Weiterleitungsregel entfernen (verbindlich)
  5. Löschen der Warteschlange

Da es für jeden von ihnen eine vorgefertigte Prozedur in der Pika-Bibliothek gibt, werden sie zur Vereinfachung des Starts einfach in der Datei rmq_setup.py kompiliert. Als nächstes listen wir die Prozeduren aus der Pika-Bibliothek mit einigen Kommentaren zu den Parametern auf.


Eine Warteschlange erstellen


 rmq_channel.queue_declare(queue=params.queue, durable = params.durable) 

hier ist alles einfach
Warteschlange - Name der zu erstellenden Warteschlange
dauerhaft - ein logischer Parameter, ein Wert von True bedeutet, dass die Warteschlange beim Neustart des Kaninchens weiterhin besteht. Bei False wird die Warteschlange beim Neustart gelöscht. Die zweite Option wird normalerweise für temporäre Warteschlangen verwendet, die in Zukunft garantiert nicht mehr benötigt werden.


Austauschpunkt erstellen (Austausch)


 rmq_channel.exchange_declare(exchange=params.exch, exchange_type = params.type, durable = params.durable) 

hier entsteht ein neuer Parameter exchange_type - die Art des Austauschpunktes. Über welche Arten von Austauschpunkten lesen Sie hier .
Austausch - Name des erstellten Austauschpunkts


Löschen einer Warteschlange oder eines Austauschpunkts


 rmq_channel.queue_delete(queue=params.queue) rmq_channel.exchange_delete(exchange=params.exch) 

Weiterleitungsregel erstellen (verbindlich)


 rmq_channel.queue_bind(exchange=params.exch, queue=params.queue, routing_key=params.r_key) 

Austausch - Der Name des Austauschpunkts, von dem aus die Übertragung erfolgt
Warteschlange - Der Name der Warteschlange, an die weitergeleitet werden soll
routing_key - Maske des Routing-Schlüssels, der für die Weiterleitung verwendet wird.


Folgende Einträge sind gültig:


  • rk.my_key. * - In dieser Maske bedeutet ein Sternchen einen nicht leeren Zeichensatz. Mit anderen Worten, eine solche Maske überspringt jeden Schlüssel vom Typ rk.my_key. + etwas anderes, aber den Schlüssel rk.my_key nicht verpassen
  • rk.my_key. # - Diese Maske überspringt alles als vorherige + Taste rk.my_key

Weiterleitungsregel entfernen (verbindlich)


 rmq_channel.queue_unbind(exchange=params.exch, queue=params.queue, routing_key=params.r_key) 

Alles ähnelt dem Erstellen einer Weiterleitungsregel.


Löschen der Warteschlange


 rmq_channel.queue_purge(queue=params.queue) 

Warteschlange - Der Name der Warteschlange, die gelöscht werden soll


Informationen zur Verwendung der Befehlszeilenschnittstelle in Python-Anwendungen

Startoptionen erleichtern das Leben erheblich. Um den Code nicht vor jedem Start zu bearbeiten, ist es logisch, einen Mechanismus zum Übergeben von Parametern beim Start bereitzustellen. Zu diesem Zweck wurde die Argparse- Bibliothek ausgewählt. Ich werde nicht näher auf die Feinheiten seiner Verwendung eingehen, es gibt genügend Anleitungen zu diesem Thema ( eins , zwei , drei ). Ich stelle nur fest, dass dieses Tool mir geholfen hat, den Prozess der Verwendung der Anwendung erheblich zu vereinfachen (wenn Sie es so nennen können). Selbst wenn Sie eine einfache Folge von Befehlen geworfen und in eine ähnliche Oberfläche eingebunden haben, erhalten Sie ein vollwertiges und benutzerfreundliches Tool.


Anwendung im Alltag. Was sich am meisten als nützlich erwiesen hat.


Nun ein kleiner Eindruck über den Einsatz von AMQP im Alltag.


Die am häufigsten nachgefragte Funktion war die Veröffentlichung der Nachricht. Die Zugriffsrechte eines bestimmten Benutzers erlauben nicht immer die Verwendung einer Webschnittstelle, obwohl es manchmal einfach erforderlich ist, einen bestimmten Dienst zu testen. Hier werden AMQP und die Autorisierung im Namen des Dienstes, der diesen Kanal verwendet, an die Hilfe weitergegeben.


Am zweitbeliebtesten war die Möglichkeit, Nachrichten aus der Zeitwarteschlange zu lesen. Diese Funktion ist nützlich, um neue Routen und Nachrichtenflüsse zu konfigurieren und Unfälle zu vermeiden.


Andere Möglichkeiten fanden auch Anwendung in verschiedenen Aufgaben.

Source: https://habr.com/ru/post/de434510/


All Articles