Brevemente sobre como trabalhar com o RabbitMQ do Python

KDPV


Aconteceu que, no processo de trabalho no MegaFon, é preciso enfrentar as mesmas tarefas ao trabalhar com o RabbitMQ. A questão naturalmente surge: "Como simplificar e automatizar a implementação de tais tarefas?"


A primeira solução que vem à mente é usar a interface HTTP e, é claro, pronto para uso, o RabbitMQ possui uma boa interface da web e uma API HTTP. No entanto, o uso da API HTTP nem sempre é conveniente e, às vezes, impossível (digamos que você não tenha direitos de acesso suficientes, mas eu realmente quero publicar uma mensagem) nesses momentos, torna-se necessário trabalhar usando o protocolo AMQP


Não encontrando soluções prontas adequadas para mim nos espaços abertos da rede, foi decidido escrever um pequeno aplicativo para trabalhar com o RabbitMQ usando o protocolo AMQP com a capacidade de transferir parâmetros de inicialização pela linha de comando e fornecer o conjunto mínimo de recursos necessário, a saber:


  • Postagem
  • Revisão de Mensagens
  • Criando e editando elementos básicos de rota

O Python foi escolhido como a ferramenta mais simples (e na minha opinião bonita) para implementar essa tarefa. (pode-se argumentar aqui, mas o que isso vai mudar?)


Traduções de guias oficiais ( um , dois ) pelo RabbitMQ são apresentadas no hub; no entanto, às vezes, um exemplo simples da prática é útil. No artigo, tentarei ilustrar os principais problemas que surgem ao trabalhar com coelhos usando o canal AMQP do Python usando um exemplo de aplicativo pequeno. O aplicativo em si está disponível no GitHub .


Brevemente sobre o protocolo AMQP e o broker de mensagens RabbitMQ


O AMQP é um dos protocolos de mensagens mais comuns entre componentes de um sistema distribuído. A principal característica distintiva desse protocolo é o conceito de construção de uma rota de mensagens, contendo dois elementos estruturais principais: uma fila e um ponto de troca . A fila acumula mensagens até ser recebida. Um ponto de troca é um distribuidor de mensagens que os direciona para a fila desejada ou para outro ponto de troca. As regras de distribuição (ligações) , pelas quais o ponto de troca determina para onde direcionar a mensagem, são baseadas na verificação da chave de roteamento da mensagem para verificar a conformidade com a máscara especificada. Você pode ler mais sobre como o AMQP funciona aqui .


O RabbitMQ é um aplicativo de código aberto que oferece suporte total ao AMQP e oferece vários recursos adicionais. Para trabalhar com o RabbitMQ, um grande número de bibliotecas foi escrito em uma variedade de linguagens de programação, incluindo Python.


Implementação Python


Você sempre pode lançar alguns scripts para uso pessoal e não conhecer os problemas com eles. Quando se trata de espalhá-los entre colegas, tudo se torna mais complicado. Todo mundo precisa mostrar e dizer como e o que iniciar, o que e onde mudar, onde obter a versão mais recente e o que mudou nela ... Involuntariamente, chega-se à conclusão de que é mais fácil elaborar uma interface simples uma vez, para não perder tempo no futuro. Para facilitar o uso, foi decidido dividir o aplicativo em 4 módulos:


  1. O módulo responsável pela postagem
  2. Módulo responsável por subtrair mensagens da fila
  3. Um módulo projetado para fazer alterações na configuração do broker RabbitMQ
  4. Um módulo contendo parâmetros e métodos comuns aos módulos anteriores

Essa abordagem simplifica o conjunto de parâmetros de inicialização. Selecionamos o módulo necessário, um dos modos de operação e passamos os parâmetros necessários (para obter mais informações sobre modos e parâmetros de operação na ajuda –help).


Como a estrutura de "coelhos" no MegaFon consiste em um número suficientemente grande de nós, para facilitar o uso, os dados para conexão com os nós são transferidos para um módulo com parâmetros e métodos gerais rmq_common_tools.py


Para trabalhar no AMQP em Python, usaremos a biblioteca Pika .


import pika 

Usando esta biblioteca, o trabalho com o RabbitMQ consistirá em três estágios principais:


  1. Estabelecer uma conexão
  2. Executando operações necessárias
  3. Fechar conexão

O primeiro e o último estágio são os mesmos para todos os módulos e são implementados em rmq_common_tools.py


Para estabelecer uma conexão:


 rmq_parameters = pika.URLParameters(rmq_url_connection_str) rmq_connection = pika.BlockingConnection(rmq_parameters) rmq_channel = rmq_connection.channel() 

A biblioteca Pika permite que você use várias opções de design para conectar-se ao RabbitMQ. Nesse caso, a opção mais conveniente era passar os parâmetros na forma de uma string de URL no seguinte formato:


 'amqp://rabbit_user:rabbit_password@host:port/vhost' 

Para fechar uma conexão:


 rmq_connection.close() 

Postagem


A publicação de uma mensagem é provavelmente a mais fácil, mas ao mesmo tempo a operação mais popular ao trabalhar com coelhos.


Publicar ferramentas de publicação compiladas em rmq_publish.py


Para postar uma mensagem, use o método


 rmq_channel.basic_publish(exchange = params.exch, routing_key = params.r_key, body = text) 

onde:
troca - o nome do ponto de troca no qual a mensagem será publicada
routing_key - chave de roteamento com a qual a mensagem será publicada
corpo - corpo da mensagem


O rmq_publish.py suporta dois modos de entrada de mensagens para publicação:


  1. A mensagem é inserida como um parâmetro através da linha de comando (from_console)
  2. A mensagem é lida no arquivo (from_file)

O segundo modo, na minha opinião, é mais conveniente ao trabalhar com mensagens grandes ou matrizes de mensagens. O primeiro, por sua vez, permite enviar uma mensagem sem arquivos adicionais, o que é conveniente ao integrar o módulo em outros cenários.


Recebendo Mensagens


A questão de receber mensagens não é mais tão trivial como publicar. Quando se trata de ler mensagens, você precisa entender:


  • Após confirmar o recebimento da mensagem, ela será removida da fila. Assim, lendo as mensagens da linha "batalha", nós as "selecionamos" no consumidor principal. Se não queremos perder o fluxo de mensagens, mas apenas queremos entender quais mensagens estão sendo movidas no "coelho", a opção mais lógica é criar uma fila de "log" separada ou, como também é chamado, "interceptação de fila".
  • Como regra geral, as mensagens de leitura requerem processamento ou análise adicionais, o que significa que elas precisam ser salvas em algum lugar se o processamento em tempo real for impossível ou não for necessário.

Leitor de mensagens implementado no arquivo rmq_consume.py


São fornecidos dois modos de operação:


  1. Ler mensagens de uma fila existente
  2. Criando uma fila de tempo e rota para ler mensagens dessa fila

A questão de criar uma fila e rotas será considerada abaixo.


A revisão direta é implementada da seguinte maneira:


 channel.basic_consume(on_message, queue=params.queue) try: channel.start_consuming() except KeyboardInterrupt: channel.stop_consuming() except Exception: channel.stop_consuming() rmq_tools.console_log(":\n", traceback.format_exc()) 

onde
on_message - procedimento do manipulador de mensagens
params.queue - o nome da fila da qual a subtração será feita


O manipulador de mensagens deve executar alguma operação na mensagem lida e confirmar (ou não confirmar, se necessário) a entrega da mensagem.


 def on_message(channel, method_frame, header_frame, body): global all_cnt, lim if all_cnt >= lim: rmq_tools.console_log('   .') raise KeyboardInterrupt body_str = body.decode("utf-8")[:4000] rk = method_frame.routing_key rmq_params.file.write(rk + '\n') rmq_params.file.write(body_str + '\n\n') all_cnt = all_cnt + 1 if (lim != 0) and (rmq_params.file == sys.stdout): sys.stdout.write(f'[{rmq_tools.time_now()}] - {all_cnt} of {lim} messages consumed.\r') channel.basic_ack(delivery_tag=method_frame.delivery_tag) 

onde
all_cnt - contador global
lim - o número de mensagens a serem lidas


Nessa implementação do manipulador, um certo número de mensagens é subtraído e as informações sobre o andamento da subtração são enviadas ao console se a gravação ocorrer em um arquivo.


Também é possível gravar mensagens de leitura no banco de dados. Na implementação atual, essa oportunidade não é apresentada, mas não é difícil de adicionar.


Gravar em um banco de dados

Vamos considerar um exemplo de gravação de mensagens no banco de dados para o banco de dados Oracle e a biblioteca cx_oracle .


Conecte-se ao banco de dados


 ora_adress = 'host:port/dbSID' ora_creds = 'user/pass' connection_ora = cx_Oracle.connect(ora_creds + '@' + ora_address) ora_cursor = connection_ora.cursor() 

No manipulador on_message, adicione


 global cnt, commit_int insert_rec = 'insert into ' + tab_name + '(routing_key, text) values (:rkey, :text)' ora_cursor.execute(insert_rec, text = body_str, rkey = rk) if cnt > commit_int : ora_cursor.execute('commit') cnt = 1 cnt = cnt + 1 

onde
cnt é outro contador
commit_int - o número de inserções no banco de dados, após o qual é necessário fazer o "commit". A presença desse parâmetro se deve ao desejo de reduzir a carga no banco de dados. No entanto, a instalação não é particularmente grande, porque no caso de uma falha, há uma chance de perder as mensagens lidas após a última confirmação bem-sucedida.


E, como esperado, no final do trabalho, fazemos o commit final e fechamos a conexão


 ora_cursor.execute('commit') connection_ora.close() 

Algo assim está lendo mensagens. Se você remover a restrição do número de mensagens lidas, poderá fazer um processo em segundo plano para a leitura contínua de mensagens do "coelho".


Configuração


Apesar de o AMQP se destinar principalmente à publicação e leitura de mensagens, ele também permite que você execute manipulações simples com a configuração de rotas (não estamos falando sobre a configuração de conexões de rede e outras configurações do RabbitMQ como um aplicativo).


As principais operações de configuração são:


  1. Criando uma fila ou ponto de troca
  2. Criando uma regra de encaminhamento (ligação)
  3. Excluindo uma fila ou ponto de troca
  4. Removendo uma regra de encaminhamento (ligação)
  5. Limpeza de fila

Como para cada um deles existe um procedimento pronto na biblioteca pika, para conveniência do lançamento, eles são simplesmente compilados no arquivo rmq_setup.py . A seguir, listamos os procedimentos da biblioteca pika com alguns comentários sobre os parâmetros.


Criando uma fila


 rmq_channel.queue_declare(queue=params.queue, durable = params.durable) 

tudo é simples aqui
fila - nome da fila a ser criada
durável - um parâmetro lógico, um valor True significa que, quando o coelho for reiniciado, a fila continuará a existir. Se False, a fila será excluída na reinicialização. A segunda opção é geralmente usada para filas temporárias que garantem não serem necessárias no futuro.


Criando um ponto de troca (troca)


 rmq_channel.exchange_declare(exchange=params.exch, exchange_type = params.type, durable = params.durable) 

aqui surge um novo parâmetro exchange_type - o tipo de ponto de troca. Sobre que tipos de pontos de troca são lidos aqui .
troca - nome do ponto de troca criado


Excluindo uma fila ou ponto de troca


 rmq_channel.queue_delete(queue=params.queue) rmq_channel.exchange_delete(exchange=params.exch) 

Criando uma regra de encaminhamento (ligação)


 rmq_channel.queue_bind(exchange=params.exch, queue=params.queue, routing_key=params.r_key) 

troca - o nome do ponto de troca a partir do qual a transferência será feita
fila - o nome da fila a ser encaminhada para
routing_key - máscara da chave de roteamento, que será usada para encaminhamento.


As seguintes entradas são válidas:


  • rk.my_key. * - nesta máscara, um asterisco significa um conjunto de caracteres não vazio. Em outras palavras, essa máscara ignorará qualquer chave do tipo rk.my_key. + outra coisa, mas não perderá a chave rk.my_key
  • rk.my_key. # - esta máscara pulará tudo como a tecla + anterior rk.my_key

Removendo uma regra de encaminhamento (ligação)


 rmq_channel.queue_unbind(exchange=params.exch, queue=params.queue, routing_key=params.r_key) 

tudo é semelhante à criação de uma regra de encaminhamento.


Limpeza de fila


 rmq_channel.queue_purge(queue=params.queue) 

fila - o nome da fila a ser limpa


Sobre o uso da interface da linha de comandos nos aplicativos Python

As opções de inicialização tornam a vida muito mais fácil. Para não editar o código antes de cada inicialização, é lógico fornecer um mecanismo para passar parâmetros na inicialização. A biblioteca argparse foi escolhida para esse fim . Não vou entrar em detalhes dos meandros de seu uso; existem guias suficientes sobre esse assunto ( um , dois , três ). Observo apenas que essa ferramenta me ajudou a simplificar bastante o processo de uso do aplicativo (se é que você pode chamar assim). Mesmo tendo lançado uma sequência simples de comandos e agrupado-os em uma interface semelhante, você pode obter uma ferramenta completa e fácil de usar.


Aplicação na vida cotidiana. O que veio a calhar mais.


Bem, agora uma pequena impressão sobre o uso do AMQP na vida cotidiana.


O recurso mais solicitado foi a publicação da mensagem. Os direitos de acesso de um usuário específico nem sempre permitem o uso de uma interface da web, embora às vezes seja simplesmente necessário testar um serviço específico. Aqui, o AMQP e a autorização em nome do serviço usando este canal passam para o auxílio.


O segundo mais popular foi a capacidade de ler mensagens da fila de tempo. Esse recurso é útil na configuração de novas rotas e fluxos de mensagens, bem como na prevenção de acidentes.


Outras possibilidades também foram aplicadas em várias tarefas.

Source: https://habr.com/ru/post/pt434510/


All Articles