
Aconteceu que, no processo de trabalho no MegaFon, é preciso enfrentar as mesmas tarefas ao trabalhar com o RabbitMQ. A questão naturalmente surge: "Como simplificar e automatizar a implementação de tais tarefas?"
A primeira solução que vem à mente é usar a interface HTTP e, é claro, pronto para uso, o RabbitMQ possui uma boa interface da web e uma API HTTP. No entanto, o uso da API HTTP nem sempre é conveniente e, às vezes, impossível (digamos que você não tenha direitos de acesso suficientes, mas eu realmente quero publicar uma mensagem) nesses momentos, torna-se necessário trabalhar usando o protocolo AMQP
Não encontrando soluções prontas adequadas para mim nos espaços abertos da rede, foi decidido escrever um pequeno aplicativo para trabalhar com o RabbitMQ usando o protocolo AMQP com a capacidade de transferir parâmetros de inicialização pela linha de comando e fornecer o conjunto mínimo de recursos necessário, a saber:
- Postagem
- Revisão de Mensagens
- Criando e editando elementos básicos de rota
O Python foi escolhido como a ferramenta mais simples (e na minha opinião bonita) para implementar essa tarefa. (pode-se argumentar aqui, mas o que isso vai mudar?)
Traduções de guias oficiais ( um , dois ) pelo RabbitMQ são apresentadas no hub; no entanto, às vezes, um exemplo simples da prática é útil. No artigo, tentarei ilustrar os principais problemas que surgem ao trabalhar com coelhos usando o canal AMQP do Python usando um exemplo de aplicativo pequeno. O aplicativo em si está disponível no GitHub .
Brevemente sobre o protocolo AMQP e o broker de mensagens RabbitMQ
O AMQP é um dos protocolos de mensagens mais comuns entre componentes de um sistema distribuído. A principal característica distintiva desse protocolo é o conceito de construção de uma rota de mensagens, contendo dois elementos estruturais principais: uma fila e um ponto de troca . A fila acumula mensagens até ser recebida. Um ponto de troca é um distribuidor de mensagens que os direciona para a fila desejada ou para outro ponto de troca. As regras de distribuição (ligações) , pelas quais o ponto de troca determina para onde direcionar a mensagem, são baseadas na verificação da chave de roteamento da mensagem para verificar a conformidade com a máscara especificada. Você pode ler mais sobre como o AMQP funciona aqui .
O RabbitMQ é um aplicativo de código aberto que oferece suporte total ao AMQP e oferece vários recursos adicionais. Para trabalhar com o RabbitMQ, um grande número de bibliotecas foi escrito em uma variedade de linguagens de programação, incluindo Python.
Implementação Python
Você sempre pode lançar alguns scripts para uso pessoal e não conhecer os problemas com eles. Quando se trata de espalhá-los entre colegas, tudo se torna mais complicado. Todo mundo precisa mostrar e dizer como e o que iniciar, o que e onde mudar, onde obter a versão mais recente e o que mudou nela ... Involuntariamente, chega-se à conclusão de que é mais fácil elaborar uma interface simples uma vez, para não perder tempo no futuro. Para facilitar o uso, foi decidido dividir o aplicativo em 4 módulos:
- O módulo responsável pela postagem
- Módulo responsável por subtrair mensagens da fila
- Um módulo projetado para fazer alterações na configuração do broker RabbitMQ
- Um módulo contendo parâmetros e métodos comuns aos módulos anteriores
Essa abordagem simplifica o conjunto de parâmetros de inicialização. Selecionamos o módulo necessário, um dos modos de operação e passamos os parâmetros necessários (para obter mais informações sobre modos e parâmetros de operação na ajuda –help).
Como a estrutura de "coelhos" no MegaFon consiste em um número suficientemente grande de nós, para facilitar o uso, os dados para conexão com os nós são transferidos para um módulo com parâmetros e métodos gerais rmq_common_tools.py
Para trabalhar no AMQP em Python, usaremos a biblioteca Pika .
import pika
Usando esta biblioteca, o trabalho com o RabbitMQ consistirá em três estágios principais:
- Estabelecer uma conexão
- Executando operações necessárias
- Fechar conexão
O primeiro e o último estágio são os mesmos para todos os módulos e são implementados em rmq_common_tools.py
Para estabelecer uma conexão:
rmq_parameters = pika.URLParameters(rmq_url_connection_str) rmq_connection = pika.BlockingConnection(rmq_parameters) rmq_channel = rmq_connection.channel()
A biblioteca Pika permite que você use várias opções de design para conectar-se ao RabbitMQ. Nesse caso, a opção mais conveniente era passar os parâmetros na forma de uma string de URL no seguinte formato:
'amqp://rabbit_user:rabbit_password@host:port/vhost'
Para fechar uma conexão:
rmq_connection.close()
Postagem
A publicação de uma mensagem é provavelmente a mais fácil, mas ao mesmo tempo a operação mais popular ao trabalhar com coelhos.
Publicar ferramentas de publicação compiladas em rmq_publish.py
Para postar uma mensagem, use o método
rmq_channel.basic_publish(exchange = params.exch, routing_key = params.r_key, body = text)
onde:
troca - o nome do ponto de troca no qual a mensagem será publicada
routing_key - chave de roteamento com a qual a mensagem será publicada
corpo - corpo da mensagem
O rmq_publish.py suporta dois modos de entrada de mensagens para publicação:
- A mensagem é inserida como um parâmetro através da linha de comando (from_console)
- A mensagem é lida no arquivo (from_file)
O segundo modo, na minha opinião, é mais conveniente ao trabalhar com mensagens grandes ou matrizes de mensagens. O primeiro, por sua vez, permite enviar uma mensagem sem arquivos adicionais, o que é conveniente ao integrar o módulo em outros cenários.
Recebendo Mensagens
A questão de receber mensagens não é mais tão trivial como publicar. Quando se trata de ler mensagens, você precisa entender:
- Após confirmar o recebimento da mensagem, ela será removida da fila. Assim, lendo as mensagens da linha "batalha", nós as "selecionamos" no consumidor principal. Se não queremos perder o fluxo de mensagens, mas apenas queremos entender quais mensagens estão sendo movidas no "coelho", a opção mais lógica é criar uma fila de "log" separada ou, como também é chamado, "interceptação de fila".
- Como regra geral, as mensagens de leitura requerem processamento ou análise adicionais, o que significa que elas precisam ser salvas em algum lugar se o processamento em tempo real for impossível ou não for necessário.
Leitor de mensagens implementado no arquivo rmq_consume.py
São fornecidos dois modos de operação:
- Ler mensagens de uma fila existente
- Criando uma fila de tempo e rota para ler mensagens dessa fila
A questão de criar uma fila e rotas será considerada abaixo.
A revisão direta é implementada da seguinte maneira:
channel.basic_consume(on_message, queue=params.queue) try: channel.start_consuming() except KeyboardInterrupt: channel.stop_consuming() except Exception: channel.stop_consuming() rmq_tools.console_log(":\n", traceback.format_exc())
onde
on_message - procedimento do manipulador de mensagens
params.queue - o nome da fila da qual a subtração será feita
O manipulador de mensagens deve executar alguma operação na mensagem lida e confirmar (ou não confirmar, se necessário) a entrega da mensagem.
def on_message(channel, method_frame, header_frame, body): global all_cnt, lim if all_cnt >= lim: rmq_tools.console_log(' .') raise KeyboardInterrupt body_str = body.decode("utf-8")[:4000] rk = method_frame.routing_key rmq_params.file.write(rk + '\n') rmq_params.file.write(body_str + '\n\n') all_cnt = all_cnt + 1 if (lim != 0) and (rmq_params.file == sys.stdout): sys.stdout.write(f'[{rmq_tools.time_now()}] - {all_cnt} of {lim} messages consumed.\r') channel.basic_ack(delivery_tag=method_frame.delivery_tag)
onde
all_cnt - contador global
lim - o número de mensagens a serem lidas
Nessa implementação do manipulador, um certo número de mensagens é subtraído e as informações sobre o andamento da subtração são enviadas ao console se a gravação ocorrer em um arquivo.
Também é possível gravar mensagens de leitura no banco de dados. Na implementação atual, essa oportunidade não é apresentada, mas não é difícil de adicionar.
Gravar em um banco de dadosVamos considerar um exemplo de gravação de mensagens no banco de dados para o banco de dados Oracle e a biblioteca cx_oracle .
Conecte-se ao banco de dados
ora_adress = 'host:port/dbSID' ora_creds = 'user/pass' connection_ora = cx_Oracle.connect(ora_creds + '@' + ora_address) ora_cursor = connection_ora.cursor()
No manipulador on_message, adicione
global cnt, commit_int insert_rec = 'insert into ' + tab_name + '(routing_key, text) values (:rkey, :text)' ora_cursor.execute(insert_rec, text = body_str, rkey = rk) if cnt > commit_int : ora_cursor.execute('commit') cnt = 1 cnt = cnt + 1
onde
cnt é outro contador
commit_int - o número de inserções no banco de dados, após o qual é necessário fazer o "commit". A presença desse parâmetro se deve ao desejo de reduzir a carga no banco de dados. No entanto, a instalação não é particularmente grande, porque no caso de uma falha, há uma chance de perder as mensagens lidas após a última confirmação bem-sucedida.
E, como esperado, no final do trabalho, fazemos o commit final e fechamos a conexão
ora_cursor.execute('commit') connection_ora.close()
Algo assim está lendo mensagens. Se você remover a restrição do número de mensagens lidas, poderá fazer um processo em segundo plano para a leitura contínua de mensagens do "coelho".
Configuração
Apesar de o AMQP se destinar principalmente à publicação e leitura de mensagens, ele também permite que você execute manipulações simples com a configuração de rotas (não estamos falando sobre a configuração de conexões de rede e outras configurações do RabbitMQ como um aplicativo).
As principais operações de configuração são:
- Criando uma fila ou ponto de troca
- Criando uma regra de encaminhamento (ligação)
- Excluindo uma fila ou ponto de troca
- Removendo uma regra de encaminhamento (ligação)
- Limpeza de fila
Como para cada um deles existe um procedimento pronto na biblioteca pika, para conveniência do lançamento, eles são simplesmente compilados no arquivo rmq_setup.py . A seguir, listamos os procedimentos da biblioteca pika com alguns comentários sobre os parâmetros.
Criando uma fila
rmq_channel.queue_declare(queue=params.queue, durable = params.durable)
tudo é simples aqui
fila - nome da fila a ser criada
durável - um parâmetro lógico, um valor True significa que, quando o coelho for reiniciado, a fila continuará a existir. Se False, a fila será excluída na reinicialização. A segunda opção é geralmente usada para filas temporárias que garantem não serem necessárias no futuro.
Criando um ponto de troca (troca)
rmq_channel.exchange_declare(exchange=params.exch, exchange_type = params.type, durable = params.durable)
aqui surge um novo parâmetro exchange_type - o tipo de ponto de troca. Sobre que tipos de pontos de troca são lidos aqui .
troca - nome do ponto de troca criado
Excluindo uma fila ou ponto de troca
rmq_channel.queue_delete(queue=params.queue) rmq_channel.exchange_delete(exchange=params.exch)
Criando uma regra de encaminhamento (ligação)
rmq_channel.queue_bind(exchange=params.exch, queue=params.queue, routing_key=params.r_key)
troca - o nome do ponto de troca a partir do qual a transferência será feita
fila - o nome da fila a ser encaminhada para
routing_key - máscara da chave de roteamento, que será usada para encaminhamento.
As seguintes entradas são válidas:
- rk.my_key. * - nesta máscara, um asterisco significa um conjunto de caracteres não vazio. Em outras palavras, essa máscara ignorará qualquer chave do tipo rk.my_key. + outra coisa, mas não perderá a chave rk.my_key
- rk.my_key. # - esta máscara pulará tudo como a tecla + anterior rk.my_key
Removendo uma regra de encaminhamento (ligação)
rmq_channel.queue_unbind(exchange=params.exch, queue=params.queue, routing_key=params.r_key)
tudo é semelhante à criação de uma regra de encaminhamento.
Limpeza de fila
rmq_channel.queue_purge(queue=params.queue)
fila - o nome da fila a ser limpa
Sobre o uso da interface da linha de comandos nos aplicativos PythonAs opções de inicialização tornam a vida muito mais fácil. Para não editar o código antes de cada inicialização, é lógico fornecer um mecanismo para passar parâmetros na inicialização. A biblioteca argparse foi escolhida para esse fim . Não vou entrar em detalhes dos meandros de seu uso; existem guias suficientes sobre esse assunto ( um , dois , três ). Observo apenas que essa ferramenta me ajudou a simplificar bastante o processo de uso do aplicativo (se é que você pode chamar assim). Mesmo tendo lançado uma sequência simples de comandos e agrupado-os em uma interface semelhante, você pode obter uma ferramenta completa e fácil de usar.
Aplicação na vida cotidiana. O que veio a calhar mais.
Bem, agora uma pequena impressão sobre o uso do AMQP na vida cotidiana.
O recurso mais solicitado foi a publicação da mensagem. Os direitos de acesso de um usuário específico nem sempre permitem o uso de uma interface da web, embora às vezes seja simplesmente necessário testar um serviço específico. Aqui, o AMQP e a autorização em nome do serviço usando este canal passam para o auxílio.
O segundo mais popular foi a capacidade de ler mensagens da fila de tempo. Esse recurso é útil na configuração de novas rotas e fluxos de mensagens, bem como na prevenção de acidentes.
Outras possibilidades também foram aplicadas em várias tarefas.