Hola Habr!
Descubrimos las últimas reservas del libro "
Apache Kafka. Procesamiento de flujo y análisis de datos " y lo enviamos a la preimpresión. Además, hemos recibido un contrato para el libro "
Kafka Streams in Action " y comenzamos a traducirlo literalmente la próxima semana.

Para mostrar el interesante caso de usar la biblioteca Kafka Streams, decidimos traducir el artículo sobre el paradigma de Abastecimiento de eventos en Kafka del mismo Adam Worski, cuyo
artículo sobre el lenguaje Scala se publicó hace dos semanas. Es aún más interesante que la opinión de Adam Worski no sea innegable:
aquí , por ejemplo, se argumenta que este paradigma definitivamente no es adecuado para Kafka. Aún más memorable, esperamos, tenemos la impresión del artículo.
El término "Abastecimiento de eventos" se traduce como "Registro de eventos" tanto en nuestra publicación de
Clean Architecture por Robert Martin como en este artículo. Si alguien está impresionado por la traducción de "eventos de bombeo", hágamelo saber.
Al crear un sistema que proporciona registro de eventos (fuente de eventos), tarde o temprano nos enfrentamos al problema de la persistencia (persistencia), y aquí tenemos un par de opciones. En primer lugar, está
EventStore , una implementación madura reforzada en la batalla. Alternativamente, puede usar
akka-persistence para aprovechar al máximo
la escalabilidad de
Cassandra , así como confiar en el rendimiento del modelo de actor. Otra opción es la buena y antigua
base de datos relacional , donde el enfoque
CRUD
se combina con el uso de eventos y el máximo beneficio se exprime de las transacciones.
Además de estas (y, quizás, muchas otras) oportunidades que han surgido gracias a varias cosas implementadas recientemente, hoy se ha vuelto bastante simple organizar el registro de eventos sobre
Kafka . Veamos cómo.
¿Qué es el registro de eventos?Hay una serie de
excelentes artículos introductorios sobre este tema, por lo que me limitaré a la introducción más concisa. Al registrar eventos, no guardamos el estado "actual" de las entidades utilizadas en nuestro sistema, sino el flujo de eventos relacionados con estas entidades. Cada
evento es un
hecho que describe un cambio de estado (¡ya!) Que ha
ocurrido con el objeto. Como saben, los hechos no se discuten ni se
modifican .
Cuando tenemos una secuencia de tales eventos, el estado actual de una entidad puede aclararse minimizando todos los eventos relacionados con ella; sin embargo, tenga en cuenta que lo contrario no es posible: preservando solo el estado "actual", descartamos mucha información cronológica valiosa.
El registro de eventos puede
coexistir pacíficamente con formas más tradicionales de almacenar el estado. Como regla, el sistema procesa varios tipos de entidades (por ejemplo: usuarios, pedidos, bienes, ...) y es muy posible que el registro de eventos sea útil solo para algunas de estas categorías. Es importante tener en cuenta que aquí no nos enfrentamos a la elección de "todo o nada"; se trata solo de la función de administración de estado adicional en nuestra aplicación.
Almacenamiento de eventos en KafkaEl primer problema a resolver: ¿cómo almacenar eventos en Kafka? Hay tres estrategias posibles:
- Almacene todos los eventos para todo tipo de entidades en un solo tema (con muchos segmentos)
- Por tema por tipo de entidad, es decir, eliminamos todos los eventos relacionados con el usuario en un tema separado, en un tema separado, todos relacionados con el producto, etc.
- Por tema por esencia, es decir, por un tema separado para cada usuario específico y cada nombre de producto
La tercera estrategia (tema por esencia) es prácticamente impracticable. Si, cuando cada nuevo usuario apareciera en el sistema, tuviera que comenzar un tema separado, pronto el número de temas sería ilimitado. Cualquier agregación en este caso sería muy difícil, por ejemplo, sería difícil indexar a todos los usuarios en un motor de búsqueda; no solo tendría que consumir una gran cantidad de temas, sino que no todos se conocían de antemano.
Por lo tanto, queda elegir entre 1 y 2. Ambas opciones tienen sus ventajas y desventajas. Tener un solo tema hace que sea más fácil obtener una
visión global de todos los eventos. Por otro lado, al resaltar el tema para cada tipo de entidad, puede escalar y segmentar el flujo de cada entidad individualmente. La elección de una de dos estrategias depende del caso de uso específico.
Además, puede implementar ambas estrategias a la vez, si tiene espacio de almacenamiento adicional: produzca temas por tipo de entidad a partir de un tema completo.

En el resto del artículo, trabajaremos con un solo tipo de entidad y con un solo tema, aunque el material presentado puede extrapolarse fácilmente y aplicarse para trabajar con muchos temas o tipos de entidad.
(EDITAR: como señaló
Chris Hunt , hay
un excelente artículo de Martin Kleppman , que examinó en detalle cómo distribuir eventos por tema y segmento).
Las operaciones de almacenamiento más simples en el paradigma de registro de eventos.La operación más simple, que es lógico esperar de una tienda que admite el registro de eventos, es leer el estado "actual" (minimizado) de una entidad particular. Como regla general, cada entidad tiene una u otra
id
. En consecuencia, conociendo esta
id
, nuestro sistema de almacenamiento debería devolver el estado actual del objeto.
La verdad en el último recurso será el registro de eventos: el estado actual siempre se puede deducir de la secuencia de eventos asociados con una entidad en particular. Para esto, el motor de la base de datos necesitará una función pura (sin efectos secundarios) que acepte el evento y el estado inicial, y devuelva el estado modificado:
Event = > State => State
. En presencia de dicha función y el
valor del estado inicial, el estado actual es una
convolución del flujo de eventos (la función de cambio de estado debe estar
limpia para que pueda aplicarse libremente repetidamente a los mismos eventos).
Una implementación simplificada de la operación "leer el estado actual" en Kafka recopila una secuencia de
todos los eventos del tema, los filtra, dejando solo eventos con la
id
dada y colapsa usando la función especificada. Si hay muchos eventos (y con el tiempo el número de eventos solo aumenta), esta operación puede ser lenta y consumir muchos recursos. Incluso si su resultado se almacenará en la memoria caché y se almacenará en el nodo de servicio, esta información deberá volver a crearse periódicamente, por ejemplo, debido a fallas en los nodos o debido a la exclusión de los datos de la memoria caché.

Por lo tanto, se necesita una forma más racional. Aquí es donde los flujos de kafka y los repositorios de estado son útiles. Las aplicaciones Kafka-streams se ejecutan en un grupo completo de nodos que consumen ciertos temas juntos. A cada nodo se le asigna una serie de segmentos de temas consumidos, al igual que con el consumidor habitual de Kafka. Sin embargo, kafka-streams proporciona operaciones de datos de nivel superior que hacen que sea mucho más fácil crear flujos derivados.
Una de esas operaciones en
kafka-streams es la convolución de un flujo en el almacenamiento local. Cada almacenamiento local contiene datos de solo aquellos segmentos que son consumidos por un nodo dado. Fuera de la caja, hay dos implementaciones de almacenamiento local disponibles:
en RAM y basadas en
RocksDB .
Volviendo al tema del registro de eventos, observamos que es posible contraer la secuencia de eventos en
la tienda de estado manteniendo en el nodo local el "estado actual" de cada entidad de los segmentos asignados al nodo. Si utilizamos la implementación del almacén de estado basado en RocksDB, entonces cuántas entidades podemos rastrear en un solo nodo solo depende de la cantidad de espacio en disco.
Así es como se ve la convolución de eventos en el almacenamiento local cuando se usa la API de Java (serde significa "serializador / deserializador"):
KStreamBuilder builder = new KStreamBuilder(); builder.stream(keySerde, valueSerde, "my_entity_events") .groupByKey(keySerde, valueSerde)
Un ejemplo completo
de procesamiento de pedidos basado en microservicios está disponible en el sitio web de Confluent.
(EDITAR: como señalaron
Sergei Egorov y
Nikita Salnikov en Twitter, para un sistema con registro de eventos, probablemente deba cambiar la configuración predeterminada de almacenamiento de datos en Kafka para que no funcionen los límites de tiempo o tamaño, y también, opcionalmente , habilite la compresión de datos).
Ver estado actualHemos creado un repositorio de estados donde se encuentran los estados actuales de todas las entidades que provienen de los segmentos asignados al nodo, pero ¿cómo solicitar este repositorio ahora? Si la solicitud es local (es decir, proviene del mismo nodo donde se encuentra el repositorio), entonces todo es bastante simple:
streams .store("my_entity_store", QueryableStoreTypes.keyValueStore()); .get(entityId);
Pero, ¿qué pasa si queremos solicitar datos ubicados en otro nodo? ¿Y cómo averiguar qué es este nodo? Aquí, otra característica recientemente introducida en Kafka es útil:
consultas interactivas . Con su ayuda, puede acceder a los metadatos de Kafka y descubrir qué nodo procesa el segmento del tema con la
id
dada (en este caso, la herramienta para la segmentación del tema se usa implícitamente):
metadataService .streamsMetadataForStoreAndKey("my_entity_store", entityId, keySerde)
A continuación, debe redirigir de alguna manera la solicitud al nodo correcto. Tenga en cuenta: la forma específica en que se implementa y maneja la comunicación entre sitios, ya sea REST, akka-remote o cualquier otro, no pertenece al área de responsabilidad de kafka-streams. Kafka simplemente proporciona acceso a la tienda de estado y proporciona información sobre el nodo en el que se encuentra la tienda de estado para la
id
dada.
Recuperación ante desastresLas tiendas estatales se ven bien, pero ¿qué sucede cuando falla un nodo? Reconstruir una tienda estatal local para un segmento dado también puede ser una operación costosa. Puede provocar mayores demoras o pérdida de solicitudes durante mucho tiempo, ya que será necesario reequilibrar los flujos de kafka (después de agregar o eliminar un nodo).
Es por eso que, de forma predeterminada, los almacenes de estado a largo plazo se registran: es decir, todos los cambios realizados en la tienda se escriben adicionalmente en el tema changelog-topic. Este tema está comprimido (porque para cada
id
solo nos interesa el último registro, sin un historial de cambios, ya que el historial se almacena en los eventos mismos), por lo tanto, es lo más pequeño posible. Es por eso que la recreación del almacenamiento en otro nodo puede ocurrir mucho más rápido.
Sin embargo, con el reequilibrio en este caso, las demoras aún son posibles. Para reducirlos aún más, kafka-streams brinda la capacidad de contener múltiples
réplicas de respaldo (
num.standby.replicas
) para cada repositorio. Estas réplicas aplican todas las actualizaciones recuperadas de los temas con registros de cambios a medida que están disponibles, y están listas para cambiar al modo de almacén de estado principal para un segmento determinado tan pronto como falle el almacén principal actual.
CoherenciaCon la configuración predeterminada, Kafka proporciona al menos una entrega única. Es decir, en caso de falla de un nodo, algunos mensajes pueden entregarse varias veces. Por ejemplo, es posible que un evento en particular se aplique dos veces al almacén de estado si el sistema se bloquea después de que el almacén de estado cambie al registro, pero antes de que se realice el desplazamiento para este evento en particular. Quizás esto no cause ninguna dificultad: nuestra función de actualización de estado (
Event = > State => State
) normalmente puede hacer frente a tales situaciones. Sin embargo, es posible que no pueda hacer frente: en tal caso, se pueden utilizar las garantías de
entrega estrictamente única proporcionadas por Kafka. Dichas garantías solo se aplican al leer y escribir temas de Kafka, pero esto es lo que estamos haciendo aquí:
en el fondo, todas las entradas en los temas de Kafka se reducen a actualizar el registro de cambios para la tienda estatal y realizar compensaciones. Todo esto se puede hacer
en forma de transacciones .
Por lo tanto, si nuestra función de actualizar el estado lo requiere, podemos habilitar la semántica del procesamiento de flujos "estrictamente de una sola vez" utilizando una única opción de configuración:
processing.guarantee
. Debido a esto, el rendimiento cae, pero nada es en vano.
Escucha del eventoAhora que hemos cubierto los conceptos básicos: consultar el "estado actual" y actualizarlo para cada entidad, ¿qué pasa con la activación de
los efectos secundarios ? En algún momento, esto será necesario, por ejemplo, para:
- Enviar correos electrónicos de notificación
- Indexación de entidades del motor de búsqueda
- Llamar a servicios externos a través de REST (o SOAP, CORBA, etc.)
Todas estas tareas son, en un grado u otro, bloqueantes y relacionadas con las operaciones de E / S (esto es natural para los efectos secundarios), por lo que probablemente no sea una buena idea ejecutarlas dentro del marco de la lógica de actualización de estado: como resultado, la frecuencia de fallas en el bucle principal puede aumentar eventos, y en términos de rendimiento habrá un cuello de botella.
Además, una función con lógica de actualización de estado (E
Event = > State => State
) se puede ejecutar varias veces (en caso de fallas o reinicios), y la mayoría de las veces queremos minimizar el número de casos en los que los efectos secundarios para un evento en particular se ejecutan varias veces.
Afortunadamente, dado que trabajamos con temas de Kafka, tenemos bastante flexibilidad. En la etapa de flujos, donde se actualiza el almacén de estado, los eventos se pueden emitir sin cambios (o, si es necesario, también en una forma modificada), y la secuencia / tema resultante (en Kafka, estos conceptos son equivalentes) se pueden consumir a su gusto. Además, se puede consumir antes o después de la etapa de actualización de estado. Finalmente, podemos controlar cómo lanzamos los efectos secundarios: al menos una vez o máximo una vez. La primera opción se proporciona si realiza el desplazamiento del tema-evento consumido solo después de que todos los efectos secundarios se hayan completado con éxito. Por el contrario, con un máximo de una carrera, realizamos turnos hasta que comiencen los efectos secundarios.
Hay varias opciones para desencadenar efectos secundarios, que dependen de la situación práctica específica. En primer lugar, puede definir la etapa Kafka-streams donde se activan los efectos secundarios para cada evento como parte de la función de procesamiento de stream.
Configurar dicho mecanismo es bastante simple, pero esta solución no es flexible cuando tiene que lidiar con reintentos, controlar compensaciones y competir con compensaciones para muchos eventos a la vez. En estos casos más complejos, puede ser más apropiado determinar el procesamiento utilizando, por ejemplo,
reactivo-kafka u otro mecanismo que consume los temas de Kafka "directamente".
También es posible que un evento
desencadene otros eventos ; por ejemplo, el evento "pedido" puede desencadenar los eventos de "preparación para el envío" y "notificación al cliente". Esto también se puede implementar en la etapa kafka-streams.
Finalmente, si quisiéramos almacenar eventos o algunos datos extraídos de eventos en una base de datos o motor de búsqueda, por ejemplo, en ElasticSearch o PostgreSQL, podríamos usar el conector
Kafka Connect , que procesará para nosotros todos los detalles relacionados con el consumo de temas.
Crear vistas y proyeccionesPor lo general, los requisitos del sistema no se limitan a consultar y procesar solo flujos de entidades individuales. Agregación, combinación de múltiples secuencias de eventos también debe ser compatible. Tales flujos combinados a menudo se denominan
proyecciones y, cuando se colapsan, se pueden usar para crear
representaciones de datos . ¿Es posible implementarlos con Kafka?

De nuevo, si! Recuerde que, en principio, estamos tratando simplemente con el tema de Kafka, donde se almacenan nuestros eventos; por lo tanto, tenemos todo el poder de los Consumidores / Productores de Kafka sin procesar, el combinador de flujos de kafka e incluso
KSQL ; todo esto será útil para que definamos las proyecciones. Por ejemplo, usando kafka-streams puede filtrar un flujo, mostrar, agrupar por clave, agregar en ventanas temporales o de sesión, etc. ya sea a nivel de código o usando KSQL similar a SQL.
Dichos flujos pueden almacenarse y proporcionarse para consultas durante mucho tiempo utilizando almacenes de estado y consultas interactivas, tal como lo hicimos con los flujos de entidades individuales.
Que siguePara evitar el flujo infinito de eventos a medida que se desarrolla el sistema, puede ser útil una opción de compresión como guardar
instantáneas del "estado actual". Por lo tanto, podemos limitarnos a almacenar solo unas pocas instantáneas recientes y aquellos eventos que ocurrieron después de su creación.
Aunque Kafka no tiene soporte directo para instantáneas (y en algunos otros sistemas que operan según el principio de grabación de eventos, sí lo es), definitivamente puede agregar este tipo de funcionalidad usted mismo, utilizando algunos de los mecanismos anteriores, como transmisiones, consumidores, tiendas estatales, etc. d.
ResumenAunque, inicialmente, Kafka no se diseñó teniendo en cuenta el paradigma de registro de eventos, de hecho es un motor de transmisión de datos con soporte para
replicación de temas , segmentación,
repositorios de estado y
API de transmisión , y es muy flexible al mismo tiempo. Por lo tanto, además de Kafka, puede implementar fácilmente un sistema de registro de eventos. Además, dado que en el contexto de todo lo que sucede, siempre tendremos un tema de Kafka, obtendremos flexibilidad adicional, ya que podemos trabajar con API de transmisión de alto nivel o consumidores de bajo nivel.