Olá Habr!
Descobrimos as últimas reservas do livro "
Apache Kafka. Processamento de fluxo e análise de dados " e o enviamos à pré-impressão. Além disso, recebemos um contrato para o livro "
Kafka Streams in Action " e começamos a traduzi-lo literalmente na próxima semana.

Para mostrar o caso interessante do uso da biblioteca Kafka Streams, decidimos traduzir o artigo sobre o paradigma Event Sourcing em Kafka do próprio Adam Worski, cujo
artigo sobre o idioma Scala foi publicado há duas semanas. É ainda mais interessante que a opinião de Adam Worski não seja inegável:
aqui , por exemplo, argumenta-se que esse paradigma definitivamente não é adequado para Kafka. Ainda mais memorável, esperamos, temos a impressão do artigo.
O termo "Event Sourcing" é traduzido como "Event Logging", tanto em nossa publicação de
Arquitetura Limpa, de Robert Martin, quanto neste artigo. Se alguém estiver impressionado com a tradução de "eventos de bombeamento", informe-me.
Criando um sistema que fornece registro de eventos (fonte de eventos), mais cedo ou mais tarde nos deparamos com o problema de persistência (persistência) - e aqui temos algumas opções. Em primeiro lugar, existe o
EventStore , uma implementação madura endurecida em batalha. Como alternativa, você pode usar
akka-persistence para aproveitar ao máximo
a escalabilidade
do Cassandra , além de confiar no desempenho do modelo de ator. Outra opção é o bom e antigo
banco de dados relacional , em que a abordagem
CRUD
é combinada com o uso de eventos e o benefício máximo é extraído das transações.
Além dessas (e talvez de muitas outras) oportunidades que surgiram graças a várias coisas implementadas recentemente, hoje ficou bastante simples organizar o registro de eventos em cima de
Kafka . Vamos ver como.
O que é log de eventos?Existem vários
excelentes artigos introdutórios sobre esse assunto, portanto vou me limitar à introdução mais concisa. Ao registrar eventos, não salvamos o estado “atual” das entidades usadas em nosso sistema, mas o fluxo de eventos relacionados a essas entidades. Cada
evento é um
fato que descreve uma mudança de estado (já!) Que
ocorreu com o objeto. Como você sabe, os fatos não são discutidos e
inalterados .
Quando temos um fluxo de tais eventos, o estado atual de uma entidade pode ser esclarecido, minimizando todos os eventos relacionados a ela; no entanto, lembre-se de que o oposto não é possível - preservando apenas o estado "atual", descartamos muitas informações cronológicas valiosas.
O registro de eventos pode
coexistir pacificamente com formas mais tradicionais de armazenamento de estado. Como regra, o sistema processa vários tipos de entidades (por exemplo: usuários, pedidos, mercadorias, ...) e é bem possível que o registro de eventos seja útil apenas para algumas dessas categorias. É importante notar que aqui não somos confrontados com a escolha de "tudo ou nada"; trata-se apenas do recurso adicional de gerenciamento de estado em nosso aplicativo.
Armazenamento de eventos em KafkaO primeiro problema a ser resolvido: como armazenar eventos em Kafka? Existem três estratégias possíveis:
- Armazene todos os eventos para todos os tipos de entidades em um único tópico (com muitos segmentos)
- Por tópico por tipo de entidade, ou seja, retiramos todos os eventos relacionados ao usuário em um tópico separado, em um separado - todos relacionados ao produto, etc.
- Por tópico por essência, ou seja, por um tópico separado para cada usuário específico e nome de cada produto
A terceira estratégia (tópico por essência) é praticamente impraticável. Se, quando cada novo usuário aparecesse no sistema, ele precisasse iniciar um tópico separado, logo o número de tópicos se tornaria ilimitado. Qualquer agregação nesse caso seria muito difícil, por exemplo, seria difícil indexar todos os usuários em um mecanismo de pesquisa; você não apenas teria que consumir um grande número de tópicos - mas ainda nem todos eles eram conhecidos antecipadamente.
Portanto, resta escolher entre 1 e 2. Ambas as opções têm suas vantagens e desvantagens. Ter um único tópico facilita a
visualização global de todos os eventos. Por outro lado, destacando o tópico para cada tipo de entidade, você pode dimensionar e segmentar o fluxo de cada entidade individualmente. A escolha de uma das duas estratégias depende do caso de uso específico.
Além disso, você pode implementar as duas estratégias ao mesmo tempo, se tiver espaço de armazenamento adicional: produza tópicos por tipo de entidade a partir de um tópico abrangente.

No restante do artigo, trabalharemos com apenas um tipo de entidade e com um único tópico, embora o material apresentado possa ser facilmente extrapolado e aplicado para trabalhar com muitos tópicos ou tipos de entidade.
(EDIT: como observou
Chris Hunt , há
um excelente artigo de Martin Kleppman , que examinou em detalhes como distribuir eventos por tópico e segmento).
As operações mais simples de armazenamento no paradigma de log de eventosA operação mais simples, que é lógica esperar de um armazenamento que ofereça suporte ao log de eventos, é ler o estado "atual" (minimizado) de uma entidade específica. Como regra, cada entidade tem um ou outro
id
. Conseqüentemente, conhecendo esse
id
, nosso sistema de armazenamento deve retornar o estado atual do objeto.
A verdade, em último recurso, será o log de eventos: o estado atual sempre pode ser deduzido do fluxo de eventos associado a uma entidade específica. Para isso, o mecanismo de banco de dados precisará de uma função pura (sem efeitos colaterais) que aceite o evento e o estado inicial e retorne o estado alterado:
Event = > State => State
. Na presença de tal função e do
valor do estado inicial, o estado atual é uma
convolução do fluxo de eventos (a função de mudança de estado deve estar
limpa para que possa ser aplicada livremente repetidamente aos mesmos eventos).
Uma implementação simplificada da operação “ler estado atual” no Kafka coleta um fluxo de
todos os eventos do tópico, os filtra, deixando apenas eventos com o
id
fornecido e recolhe usando a função especificada. Se houver muitos eventos (e com o tempo o número de eventos aumentar apenas), essa operação poderá ficar lenta e consumir muitos recursos. Mesmo que seu resultado seja armazenado em cache na memória e armazenado no nó de serviço, essas informações ainda precisarão ser recriadas periodicamente, por exemplo, devido a falhas no nó ou devido ao bloqueio dos dados do cache.

Portanto, é necessária uma maneira mais racional. É aqui que os kafka-streams e os repositórios de estados são úteis. Os aplicativos Kafka-streams são executados em um cluster inteiro de nós que consomem certos tópicos juntos. Cada nó recebe uma série de segmentos de tópicos consumidos, assim como o consumidor Kafka comum. No entanto, o kafka-streams fornece operações de dados de nível superior que facilitam a criação de fluxos derivados.
Uma dessas operações em
kafka-streams é a convolução de um fluxo no armazenamento local. Cada armazenamento local contém dados apenas dos segmentos que são consumidos por um determinado nó. Fora da caixa, duas implementações de armazenamento local estão disponíveis:
na RAM e com base no
RocksDB .
Voltando ao tópico do registro de eventos, observamos que é possível reduzir o fluxo de eventos no
armazenamento de estado mantendo no nó local o "estado atual" de cada entidade dos segmentos designados ao nó. Se usarmos a implementação do armazenamento de estado com base no RocksDB, quantas entidades podemos rastrear em um único nó depende apenas da quantidade de espaço em disco.
Veja como é a convolução de eventos no armazenamento local ao usar a API Java (serde significa "serializador / desserializador"):
KStreamBuilder builder = new KStreamBuilder(); builder.stream(keySerde, valueSerde, "my_entity_events") .groupByKey(keySerde, valueSerde)
Um exemplo completo
de processamento de pedidos com base em microsserviços está disponível no site da Confluent.
(EDIT: como observado por
Sergei Egorov e
Nikita Salnikov no Twitter, para um sistema com log de eventos, você provavelmente precisará alterar as configurações padrão de armazenamento de dados em Kafka para que nenhum tempo ou tamanho limite funcione e, opcionalmente, opcionalmente , habilite a compactação de dados.)
Exibir status atualCriamos um repositório de estados no qual estão localizados os estados atuais de todas as entidades provenientes de segmentos atribuídos ao nó, mas como solicitar esse repositório agora? Se a solicitação é local (ou seja, vem do mesmo nó em que o repositório está localizado), tudo é bem simples:
streams .store("my_entity_store", QueryableStoreTypes.keyValueStore()); .get(entityId);
Mas e se quisermos solicitar dados localizados em outro nó? E como descobrir o que é esse nó? Aqui, outro recurso recentemente introduzido no Kafka é útil:
consultas interativas . Com a ajuda deles, você pode acessar os metadados Kafka e descobrir qual nó processa o segmento de tópico com o
id
fornecido (nesse caso, a ferramenta para segmentação de tópicos é usada implicitamente):
metadataService .streamsMetadataForStoreAndKey("my_entity_store", entityId, keySerde)
Em seguida, você precisa redirecionar de alguma forma a solicitação para o nó correto. Observe: a maneira específica pela qual a comunicação entre sites é implementada e gerenciada - seja REST, akka-remote ou qualquer outra - não pertence à área de responsabilidade da kafka-streams. O Kafka simplesmente fornece acesso ao armazenamento de estado e fornece informações em qual nó o armazenamento de estado está localizado para o
id
fornecido.
Recuperação de desastreAs lojas de estado têm uma boa aparência, mas o que acontece quando um nó falha? Reconstruir um armazenamento local do estado para um determinado segmento também pode ser uma operação cara. Isso pode provocar atrasos aumentados ou perda de solicitações por um longo tempo, pois os kafka-streams precisarão ser reequilibrados (após adicionar ou remover um nó).
É por isso que, por padrão, os armazenamentos de estado de longo prazo são registrados: ou seja, todas as alterações feitas no repositório são gravadas adicionalmente no tópico do registro de alterações. Este tópico está compactado (porque para cada
id
, estamos interessados apenas no último registro, sem um histórico de alterações, pois o histórico é armazenado nos próprios eventos) - portanto, é o menor possível. É por isso que a recriação do armazenamento em outro nó pode ocorrer muito mais rapidamente.
No entanto, com o reequilíbrio nesse caso, atrasos ainda são possíveis. Para reduzi-los ainda mais, o kafka-streams oferece a capacidade de armazenar várias
réplicas de backup (
num.standby.replicas
) para cada repositório. Essas réplicas aplicam todas as atualizações recuperadas dos tópicos com os logs de alterações à medida que ficam disponíveis e estão prontas para alternar para o modo principal de armazenamento de estado para um determinado segmento assim que o armazenamento principal atual falhar.
CoerênciaCom as configurações padrão, o Kafka fornece pelo menos uma entrega única. Ou seja, no caso de uma falha no nó, algumas mensagens podem ser entregues várias vezes. Por exemplo, é possível que um evento específico seja aplicado duas vezes ao armazenamento de estado se o sistema travar após o armazenamento de estado mudar para o log, mas antes que o deslocamento desse evento específico seja executado. Talvez isso não cause dificuldades: nossa função de atualização de estado (
Event = > State => State
) normalmente pode lidar com essas situações. No entanto, pode não ser capaz de lidar: nesse caso, as garantias de
entrega estritamente única fornecidas pela Kafka podem ser usadas. Essas garantias se aplicam apenas ao ler e escrever tópicos Kafka, mas é o que estamos fazendo aqui:
em segundo plano, todas as entradas nos tópicos Kafka são reduzidas à atualização do log de alterações do armazenamento de estados e à execução de compensações. Tudo isso pode ser feito
na forma de transações .
Portanto, se nossa função de atualizar o estado exigir isso, podemos ativar a semântica do processamento de fluxos “entrega estritamente única” usando uma única opção de configuração:
processing.guarantee
. Por esse motivo, o desempenho cai, mas nada é em vão.
Escuta de eventoAgora que abordamos o básico - consultar o "estado atual" e atualizá-lo para cada entidade - e quanto a desencadear
efeitos colaterais ? Em algum momento, isso será necessário, por exemplo, para:
- Enviando e-mails de notificação
- Indexação de entidade do mecanismo de pesquisa
- Chamando serviços externos via REST (ou SOAP, CORBA, etc.)
Todas essas tarefas são, em um grau ou outro, bloqueadas e relacionadas a operações de E / S (isso é natural para efeitos colaterais), portanto, provavelmente não é uma boa ideia executá-las dentro da estrutura da lógica de atualização de estado: como resultado, a frequência de falhas no loop principal pode aumentar eventos e, em termos de desempenho, haverá um gargalo.
Além disso, uma função com lógica de atualização de estado (E
Event = > State => State
) pode ser executada várias vezes (no caso de falhas ou reinicializações), e na maioria das vezes queremos minimizar o número de casos em que os efeitos colaterais de um evento específico são executados várias vezes.
Felizmente, como trabalhamos com tópicos de Kafka, temos uma boa flexibilidade. No estágio de fluxos, onde o armazenamento de estado é atualizado, os eventos podem ser emitidos inalterados (ou, se necessário, também em uma forma modificada), e o fluxo / tópico resultante (no Kafka esses conceitos são equivalentes) pode ser consumido como você desejar. Além disso, ele pode ser consumido antes ou depois do estágio de atualização do estado. Finalmente, podemos controlar como lançamos efeitos colaterais: pelo menos uma vez ou no máximo uma vez. A primeira opção é fornecida se você executar o deslocamento do evento-tópico consumido somente depois que todos os efeitos colaterais forem concluídos com êxito. Por outro lado, com o máximo de uma corrida, realizamos mudanças até o início dos efeitos colaterais.
Existem várias opções para desencadear efeitos colaterais, eles dependem da situação prática específica. Antes de tudo, você pode definir o estágio Kafka-streams em que os efeitos colaterais de cada evento são acionados como parte da função de processamento de fluxo.
A configuração de um mecanismo desse tipo é bastante simples, mas essa solução não é flexível quando você precisa lidar com novas tentativas, controlar compensações e compensar compensações por muitos eventos ao mesmo tempo. Nesses casos mais complexos, pode ser mais apropriado determinar o processamento usando, digamos,
kafka reativo ou outro mecanismo que consome os tópicos Kafka "diretamente".
Também é possível que um evento
ative outros eventos - por exemplo, o evento "pedido" pode acionar os eventos "preparação para expedição" e "notificação ao cliente". Isso também pode ser implementado no estágio kafka-streams.
Por fim, se quisermos armazenar eventos ou alguns dados extraídos de eventos em um banco de dados ou mecanismo de pesquisa, digamos, no ElasticSearch ou no PostgreSQL, poderíamos usar o conector
Kafka Connect , que processará para nós todos os detalhes relacionados ao consumo de tópicos.
Criando vistas e projeçõesNormalmente, os requisitos do sistema não se limitam a consultar e processar apenas fluxos de entidade única. Agregação, combinação de vários fluxos de eventos também deve ser suportada. Esses fluxos combinados são geralmente chamados de
projeções e, quando recolhidos, podem ser usados para criar
representações de dados . É possível implementá-los com o Kafka?

Mais uma vez sim! Lembre-se de que, em princípio, estamos lidando simplesmente com o tópico Kafka, onde nossos eventos são armazenados; portanto, temos todo o poder do Kafka Consumer / Producer bruto, do combinador kafka-streams e até do
KSQL - tudo isso é útil para definir projeções. Por exemplo, usando kafka-streams, você pode filtrar um fluxo, exibir, agrupar por chave, agregar em janelas temporárias ou de sessão, etc. no nível do código ou usando o KSQL semelhante ao SQL.
Esses fluxos podem ser armazenados e fornecidos para consultas por um longo tempo usando armazenamentos de estado e consultas interativas, assim como fizemos com os fluxos de entidades individuais.
O que vem a seguirPara impedir o fluxo infinito de eventos à medida que o sistema se desenvolve, uma opção de compactação, como salvar
instantâneos do "estado atual", pode ser útil. Assim, podemos nos limitar a armazenar apenas alguns instantâneos recentes e os eventos que ocorreram após a sua criação.
Embora o Kafka não tenha suporte direto para snapshots (e em alguns outros sistemas operando com o princípio de registrar eventos), você pode definitivamente adicionar esse tipo de funcionalidade, usando alguns dos mecanismos acima, como fluxos, consumidores, lojas de estado, etc. d.
SumárioEmbora, inicialmente, o Kafka não tenha sido projetado com um olho no paradigma de registro de eventos, na verdade, é um mecanismo de dados de streaming com suporte para
replicação de tópicos , segmentação,
repositórios de estados e
APIs de streaming , e é muito flexível ao mesmo tempo. Portanto, além do Kafka, você pode implementar facilmente um sistema de registro de eventos. Além disso, como no contexto de tudo o que acontece, sempre teremos um tópico Kafka, ganharemos flexibilidade adicional, pois podemos trabalhar com APIs de streaming de alto nível ou com consumidores de baixo nível.