Existem várias maneiras de processar mensagens dos sistemas Pub-Sub: usando um serviço separado, isolando um processo isolado, orquestrando um conjunto de processos / fluxos, IPC complexo, Poll-over-Http e muitos outros. Hoje eu quero falar sobre como usar o Pub-Sub sobre HTTP e sobre o meu serviço escrito especificamente para isso.
O uso de um back-end de serviço HTTP pronto em alguns casos é uma solução ideal para o processamento de uma fila de mensagens:
- Balanceamento fora da caixa. Normalmente, o back-end já está atrás do balanceador e possui uma infraestrutura pronta para carregar, o que simplifica bastante o trabalho com mensagens.
- Usando um controlador REST regular (qualquer recurso HTTP). O consumo de mensagens HTTP minimiza o custo de implementação de computadores para diferentes idiomas se o back-end for misto.
- Simplificação do uso de ganchos da Web de outros serviços. Agora, quase todos os serviços (Jira, Gitlab, Mattermost, Slack ...) de alguma forma suportam ganchos da Web para interagir com o mundo exterior. Você pode facilitar a vida se ensinar a fila a executar as funções de um expedidor HTTP.
Essa abordagem também tem desvantagens:
- Você pode esquecer a leveza da solução. O HTTP é um protocolo pesado, e o uso de estruturas do lado do consumidor aumentará instantaneamente a latência e a carga.
- Perdemos os pontos fortes da abordagem Poll, obtendo as fraquezas do Push.
- O processamento de mensagens pelas mesmas instâncias de serviço que processam clientes pode afetar a capacidade de resposta. Isso não é significativo, pois é tratado com equilíbrio e isolamento.
Eu implementei a ideia como um serviço Fila por HTTP, que será discutido mais adiante. O projeto foi escrito em Kotlin usando o Spring Boot 2.1. Como corretor, apenas o Apache Kafka está disponível no momento.
Além disso, supõe-se que o leitor esteja familiarizado com Kafka e saiba sobre os commit (commit) e as compensações (offset) das mensagens, os princípios de grupos (grupo) e consumidores (consumidor), e também entenda como a partição (partição) difere do tópico (tópico) . Se houver lacunas, recomendo que você leia esta seção da documentação do Kafka antes de continuar.Conteúdo
Revisão
O Queue-Over-Http é um serviço que atua como intermediário entre um intermediário de mensagens e o consumidor HTTP final (o serviço facilita a implementação do suporte ao envio de mensagens aos consumidores de qualquer outra maneira, por exemplo, vários * RPC). No momento, apenas estão disponíveis assinatura, cancelamento de inscrição e exibição da lista de consumidores.O envio de mensagens para o intermediário (produção) via HTTP ainda não foi implementado devido à incapacidade de garantir a ordem das mensagens sem o suporte especial do produtor.
O índice do serviço é o consumidor, que pode se inscrever em partições específicas ou simplesmente em tópicos (o padrão de tópico é suportado). No primeiro caso, o equilíbrio automático de partições é desativado. Após a inscrição, o recurso HTTP especificado começa a receber mensagens das partições Kafka atribuídas. Arquitetonicamente, cada assinante é associado a um cliente Kafka Java nativo.
história divertida sobre KafkaConsumerKafka tem um maravilhoso cliente Java que pode fazer muito. Eu o uso no adaptador de fila para receber mensagens do broker e depois enviá-lo para as filas de serviço local. Vale ressaltar que o cliente trabalha exclusivamente no contexto de um único thread.
A ideia do adaptador é simples. Começamos em um thread, escrevemos o agendador mais simples de clientes nativos, focando na redução da latência. Ou seja, escrevemos algo semelhante:
while (!Thread.interrupted()) { var hasWork = false for (consumer in kafkaConsumers) { val queueGroup = consumers[consumer] ?: continue invalidateSubscription(consumer, queueGroup) val records = consumer.poll(Duration.ZERO) if (!records.isEmpty) { hasWork = true } } val committed = doCommit() if (!hasWork && committed == 0) {
Parece que tudo é maravilhoso, a latência é mínima, mesmo com dezenas de consumidores. Na prática, o
KafkaConsumer
para esse modo de operação e fornece uma taxa de alocação de cerca de 1,5 MB / s em tempo ocioso. Com 100 correios, a taxa de alocação atinge 150 MB / se faz com que o GC pense na aplicação. Obviamente, todo esse lixo está na área jovem, a GC é capaz de lidar com isso, mas ainda assim, a solução não é perfeita.
Obviamente, você precisa seguir o caminho típico do
KafkaConsumer
e agora coloco cada assinante no meu fluxo. Isso fornece uma sobrecarga para memória e programação, mas não há outra maneira.
Reescrevi o código de cima, removendo o loop interno e alterando
Duration.ZERO
para
Duration.ofMillis(100)
. Acontece que a taxa de alocação cai para 80-150 KB / s aceitável por consumidor. No entanto, a pesquisa com um tempo limite de 100 ms atrasa toda a fila de confirmações para esses mesmos 100 ms, e isso é inaceitável.
No processo de encontrar soluções para o problema, lembro-me de
KafkaConsumer::wakeup
, que lança um
WakeupException
e interrompe qualquer operação de bloqueio no consumidor. Com esse método, o caminho para a baixa latência é simples: quando chega uma nova solicitação de confirmação, a colocamos na fila e, no consumidor nativo, chamamos de
wakeup
. No ciclo de trabalho, capture
WakeupException
e confirme o que acumulou. Para a transferência de controle com a ajuda de exceções, você deve entregá-lo imediatamente em suas mãos, mas como nada mais ...
Acontece que essa opção está longe de ser perfeita, já que qualquer operação no consumidor nativo agora gera uma
WakeupException
, incluindo a própria confirmação. O processamento dessa situação desorganizará o código com um sinalizador, permitindo que a
wakeup
seja realizada.
Cheguei à conclusão de que seria bom modificar o método
KafkaConsumer::poll
para que ele possa ser interrompido normalmente, de acordo com um sinalizador adicional. Como resultado,
Frankenstein nasceu da reflexão, que copia exatamente o método de pesquisa original, adicionando uma saída do loop pela bandeira. Esse sinalizador é definido por um método interruptPoll separado, que, além disso, chama a ativação no seletor de cliente para liberar o bloqueio de encadeamento nas operações de E / S.
Tendo implementado o cliente dessa maneira, obtenho a velocidade da reação desde o momento em que uma solicitação de confirmação chega até que seja processada em até 100 microssegundos e uma excelente latência para buscar mensagens de um broker, o que é bom.
Cada partição é representada por uma fila local separada, na qual o adaptador grava mensagens do broker. O trabalhador recebe mensagens dele e as envia para execução, ou seja, para envio via HTTP.
O serviço suporta o processamento de mensagens em lote para aumentar a taxa de transferência. Ao se inscrever, você pode especificar o
concurrencyFactor
cada tópico (aplica-se a cada partição designada independentemente). Por exemplo,
concurrencyFactor=1000
significa que 1000 mensagens na forma de solicitações HTTP podem ser enviadas ao consumidor ao mesmo tempo. Assim que todas as mensagens do pacote foram elaboradas de maneira inequívoca pelo consumidor, o serviço decide o próximo commit do deslocamento da última mensagem em Kafka. Portanto, o segundo valor de
concurrencyFactor
é o número máximo de mensagens processadas pelo consumidor no caso de uma falha Kafka ou Queue-sobre-Http.
Para reduzir atrasos, a fila possui
loadFactor = concurrencyFactor * 2
, que permite ler duas vezes mais mensagens do broker que podem ser enviadas. Como a confirmação automática está desabilitada no cliente nativo, esse esquema não viola as garantias de uma vez.
Um alto valor de
concurrencyFactor
aumenta a taxa de transferência da fila, reduzindo o número de confirmações que levam até 10 ms no pior caso. Ao mesmo tempo, a carga no consumidor aumenta.
A ordem de envio de mensagens dentro do pacote não é garantida, mas pode ser alcançada configurando
concurrencyFactor=1
.
Confirma
As confirmações são uma parte importante do serviço. Quando o próximo pacote de dados estiver pronto, o deslocamento da última mensagem do pacote será imediatamente confirmado para Kafka e somente após uma confirmação bem-sucedida o próximo pacote ficará disponível para processamento. Frequentemente, isso não é suficiente e é necessária uma confirmação automática. Para fazer isso, existe o parâmetro
autoCommitPeriodMs
, que tem pouco em comum com o período de confirmação automática clássico para clientes nativos que confirmam a última mensagem lida na partição. Imagine
concurrencyFactor=10
. O serviço enviou todas as 10 mensagens e aguarda que cada uma delas esteja pronta. O processamento da mensagem 3 é concluído primeiro, depois a mensagem 1 e a mensagem 10. Nesse momento, é hora de confirmar automaticamente. É importante não violar a semântica Pelo menos uma vez. Portanto, você pode confirmar apenas a primeira mensagem, ou seja, o deslocamento 2, pois somente ela foi processada com êxito naquele momento. Além disso, até a próxima confirmação automática, as mensagens 2, 5, 6, 4 e 8. são processadas Agora, você precisa confirmar apenas o deslocamento 7 e assim por diante. A confirmação automática quase não afeta o rendimento.
Tratamento de erros
No modo normal de operação, o serviço envia uma mensagem ao supervisor uma vez. Se, por algum motivo, tiver causado um erro 4xx ou 5xx, o serviço reenviará a mensagem, aguardando o processamento bem-sucedido. O tempo entre as tentativas pode ser configurado como um parâmetro separado.
Também é possível definir o número de tentativas após as quais a mensagem será marcada como processada, o que interromperá as retransmissões, independentemente do status da resposta. Eu não aconselho usar isso para dados confidenciais, situações de falha dos consumidores sempre devem ser ajustadas manualmente. Mensagens fixas podem ser monitoradas por logs de serviço e monitoramento do status da resposta do consumidor.
sobre furarGeralmente, o servidor HTTP, atribuindo a 4xx ou 5xx o status da resposta, também envia o cabeçalho Connection: close
. Uma conexão TCP que é fechada dessa maneira permanece no status TIME_WAITED
até que seja limpa pelo sistema operacional após algum tempo. O problema é que essas conexões ocupam uma porta inteira que não pode ser reutilizada até a liberação. Isso pode resultar na ausência de portas livres na máquina para estabelecer uma conexão TCP e o serviço será lançado com exceções nos logs para cada envio. Na prática, no Windows 10, as portas terminam após 10 a 20 mil enviando mensagens erradas dentro de 1-2 minutos. No modo padrão, isso não é um problema.
Mensagens
Cada mensagem extraída do broker é enviada ao consultor via HTTP para o recurso especificado durante a assinatura. Por padrão, uma mensagem é enviada por uma solicitação POST no corpo. Esse comportamento pode ser alterado especificando qualquer outro método. Se o método não suportar o envio de dados no corpo, você poderá especificar o nome do parâmetro de sequência no qual a mensagem será enviada. Além disso, ao se inscrever, você pode especificar cabeçalhos adicionais que serão adicionados a cada mensagem, o que é conveniente para a autorização básica usando tokens. Os cabeçalhos são adicionados a cada mensagem com o identificador do consumidor, tópico e partição, de onde a mensagem foi lida, número da mensagem, chave da partição, se aplicável, bem como o nome do intermediário.
Desempenho
Para avaliar o desempenho, usei um PC (Windows 10, OpenJDK-11 (G1 sem ajuste), i7-6700K, 16GB), que executa o serviço e um laptop (Windows 10, i5-8250U, 8GB), no qual o produtor de mensagens, HTTP Consumidor de recursos e Kafka com configurações padrão. O PC está conectado ao roteador através de uma conexão com fio de 1 Gb / s, e o laptop via 802.11ac. O produtor grava a cada 110 ms a cada 100 ms por 110 bytes de mensagens nos tópicos designados para os quais os consumidores estão inscritos (
concurrencyFactor=500
, a confirmação automática está desativada) de diferentes grupos. O suporte está longe de ser o ideal, mas você pode obter uma imagem.
Um parâmetro chave de medição é o efeito do serviço na latência.
Vamos:
- t
q - registro de data e hora do serviço que recebe mensagens do cliente nativo
- d
t0 é o tempo entre t
q e a hora em que a mensagem foi enviada da fila local para o conjunto de executivos
- d
t é o tempo entre t
q e a hora em que a solicitação HTTP foi enviada. Essa é a influência do serviço na latência da mensagem.
Durante as medições, foram obtidos os seguintes resultados (C - consumidores, T - tópicos, M - mensagens):

No modo operacional padrão, o serviço em si quase não afeta a latência e o consumo de memória é mínimo. Os valores máximos de d
t (cerca de 60 ms) não são indicados especificamente, pois dependem da operação do GC e não do próprio serviço. O ajuste especial do GC ou a substituição do G1 pelo Shenandoah pode ajudar a suavizar a propagação dos valores máximos.
Tudo muda drasticamente quando o consumidor não lida com o fluxo de mensagens da fila e o serviço ativa o modo de otimização. Nesse modo, o consumo de memória aumenta, pois o tempo de resposta às solicitações aumenta significativamente, o que impede a limpeza oportuna dos recursos. O efeito na latência aqui permanece no nível dos resultados anteriores, e altos valores de dt são causados pelo pré-carregamento de mensagens na fila local.
Infelizmente, não é possível testar com uma carga mais alta, pois o laptop já está dobrado a 1300 RPS. Se alguém puder ajudar na organização de medições em altas cargas, terei prazer em fornecer uma montagem para testes.
Demonstração
Agora vamos à demonstração. Para isso, precisamos:
- Corretor Kafka, pronto para ir. Tomarei a instância levantada em 192.168.99.100:9092 da Bitnami.
- Um recurso HTTP que receberá mensagens. Para maior clareza, tirei ganchos da Web do Slack.
Primeiro de tudo, você precisa aumentar o serviço Fila-sobre-Http. Para fazer isso, crie o seguinte conteúdo em um diretório
application.yml
vazio:
spring: profiles: default logging: level: com: viirrtus: queueOverHttp: DEBUG app: persistence: file: storageDirectory: "persist" brokers: - name: "Kafka" origin: "kafka" config: bootstrap.servers: "192.168.99.100:9092"
Aqui, indicamos ao serviço os parâmetros de conexão de um broker específico, bem como onde armazenar os assinantes para que eles não se percam entre as partidas. Em `app.brokers []. Config`, você pode especificar quaisquer parâmetros de conexão suportados pelo cliente Kafka nativo; uma lista completa pode ser encontrada
aqui .
Como o arquivo de configuração é processado pelo Spring, você pode escrever muitas coisas interessantes lá. Inclusive, configure o log.
Agora execute o próprio serviço. Usamos a maneira mais fácil -
docker-compose.yml
:
version: "2" services: app: image: viirrtus/queue-over-http:0.1.3 restart: unless-stopped command: --debug ports: - "8080:8080" volumes: - ./application.yml:/application.yml - ./persist:/persist
Se essa opção não lhe agrada, você pode compilar o serviço a partir da fonte. Instruções de montagem no projeto Leia-me, cujo link é fornecido no final do artigo.O próximo passo é registrar o primeiro assinante. Para fazer isso, você precisa executar uma solicitação HTTP para o serviço com uma descrição do Consumidor:
POST localhost:8080/broker/subscription Content-Type: application/json { "id": "my-first-consumer", "group": { "id": "consumers" }, "broker": "Kafka", "topics": [ { "name": "slack.test", "config": { "concurrencyFactor": 10, "autoCommitPeriodMs": 100 } } ], "subscriptionMethod": { "type": "http", "delayOnErrorMs": 1000, "retryBeforeCommit": 10, "uri": "<slack-wh-uri>", "additionalHeaders": { "Content-Type": "application/json" } } }
Se tudo der certo, a resposta será quase o mesmo conteúdo enviado.
Vamos passar por cada parâmetro:
Consumer.id
- ID do nosso assinanteConsumer.group.id
- identificador de grupoConsumer.broker
- indique em qual dos corretores de serviço você precisa se inscreverConsumer.topics[0].name
- o nome do tópico do qual queremos receber mensagensConsumer.topics[0].config. concurrencyFactor
Consumer.topics[0].config. concurrencyFactor
- número máximo de mensagens enviadas simultaneamenteConsumer.topics[0].config. autoCommitPeriodMs
Consumer.topics[0].config. autoCommitPeriodMs
- período de confirmação forçada para mensagens prontasConsumer.subscriptionMethod.type
- tipo de assinatura. Somente HTTP está disponível no momento.Consumer.subscriptionMethod.delayOnErrorMs
- tempo antes de reenviar uma mensagem que terminou com erroConsumer.subscriptionMethod.retryBeforeCommit
- o número de tentativas para reenviar a mensagem de erro. Se 0 - a mensagem irá girar até o processamento bem-sucedido. No nosso caso, a garantia da entrega total não é tão importante quanto a constância do fluxo.Consumer.subscriptionMethod.uri
- o recurso para o qual as mensagens serão enviadasConsumer.subscriptionMethod.additionalHeader
- cabeçalhos adicionais que serão enviados com cada mensagem. Observe que haverá JSON no corpo de cada mensagem para que o Slack possa interpretar corretamente a solicitação.
Nesta solicitação, o método HTTP é omitido, pois o padrão, POST, Slack é bastante bom.A partir deste momento, o serviço monitora as partições atribuídas do tópico slack.test para novas mensagens.
Para gravar mensagens no tópico, usarei os utilitários
/opt/bitnami/kafka/bin
Kafka localizados em
/opt/bitnami/kafka/bin
imagem Kafka iniciada (a localização dos utilitários em outras instâncias do Kafka pode ser diferente):
kafka-console-producer.sh --broker-list localhost:9092 --topic slack.test > {“text”: “Hello!”}
Ao mesmo tempo, o Slack notificará você sobre uma nova mensagem:
Para cancelar a assinatura de um consumidor, basta fazer uma solicitação POST para `intermediar / cancelar a inscrição 'com o mesmo conteúdo que estava durante a assinatura.Conclusão
No momento, apenas a funcionalidade básica é implementada. Além disso, está planejado melhorar o lote, tentar implementar a semântica exata uma vez, adicionar a capacidade de enviar mensagens ao broker via HTTP e, o mais importante, adicionar suporte a outros Pub-Sub populares.
O serviço Fila por HTTP está atualmente em desenvolvimento ativo. A versão 0.1.3 é estável o suficiente para testar em suportes de desenvolvimento e palco. O desempenho foi testado no Windows 10, Debian 9 e Ubuntu 18.04. Você pode usar o produto por sua conta e risco. Se você quiser ajudar no desenvolvimento ou dar algum feedback sobre o serviço, seja bem-vindo ao projeto
Github .