Cómo se hizo realidad Kafka


Hola Habr!


Trabajo en el equipo de Tinkoff, que está desarrollando su propio centro de notificaciones. En su mayor parte, desarrollo en Java usando Spring boot y resuelvo varios problemas técnicos que surgen en el proyecto.


La mayoría de nuestros microservicios interactúan de forma asíncrona entre sí a través de un intermediario de mensajes. Anteriormente, utilizamos IBM MQ como intermediario, que dejó de hacer frente a la carga, pero al mismo tiempo tenía altas garantías de entrega.


Como reemplazo, se nos ofreció Apache Kafka, que tiene una alta escalabilidad, pero, desafortunadamente, requiere un enfoque de configuración casi individual para diferentes escenarios. Además, el mecanismo de entrega de al menos una vez, que funciona por defecto en Kafka, no permitió mantener el nivel de consistencia requerido de fábrica. A continuación, compartiré nuestra experiencia en la configuración de Kafka, en particular, le diré cómo configurar y vivir exactamente con una sola entrega.


Entrega garantizada y más.


Los parámetros que se analizarán más adelante ayudarán a evitar una serie de problemas con la configuración de conexión predeterminada. Pero primero, quiero prestar atención a un parámetro que facilitará una posible depuración.


Client.id para Productor y Consumidor ayudará con esto. A primera vista, puede usar el nombre de la aplicación como valor, y en la mayoría de los casos esto funcionará. Aunque la situación en la que se usan varios consumidores en la aplicación y usted les da el mismo client.id lleva a la siguiente advertencia:


org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0 

Si desea usar JMX en una aplicación con Kafka, entonces esto puede ser un problema. Para este caso, es mejor usar una combinación del nombre de la aplicación y, por ejemplo, el nombre del tema, como el valor de client.id. El resultado de nuestra configuración se puede ver en la salida del comando kafka-consumer-groups de las utilidades de Confluent:



Ahora analizaremos el escenario de entrega garantizada de mensajes. Kafka Producer tiene un parámetro acks que le permite configurar después de cuántos acuses de recibo el líder del clúster debe considerar el mensaje grabado con éxito. Este parámetro puede tomar los siguientes valores:


  • 0 - el reconocimiento no será considerado.
  • 1 - parámetro predeterminado, se requiere confirmación de solo 1 réplica.
  • −1: se requiere confirmación de todas las réplicas sincronizadas ( configuración de clúster min.insync.replicas ).

Se puede ver a partir de los valores anteriores que acks igual a -1 da las garantías más fuertes de que el mensaje no se perderá.


Como todos sabemos, los sistemas distribuidos no son confiables. Para protegerse contra el mal funcionamiento temporal, Kafka Producer proporciona un parámetro de reintentos que le permite establecer el número de intentos de reintento durante delivery.timeout.ms . Dado que el parámetro de reintentos predeterminado es Integer.MAX_VALUE (2147483647), el número de retransmisiones de un mensaje puede controlarse cambiando solo delivery.timeout.ms.


Avanzando hacia exactamente una vez la entrega


Esta configuración permite a nuestro Productor entregar mensajes con una alta garantía. ¿Ahora hablemos sobre cómo garantizar la grabación de una sola copia de un mensaje en un tema de Kafka? En el caso más simple, para hacer esto en Producer, establezca el parámetro enable.idempotence en verdadero. La idempotencia garantiza la grabación de un solo mensaje en una partición particular de un tema. Un requisito previo para habilitar la idempotencia es acks = all, retry> 0, max.in.flight.requests.per.connection ≤ 5 . Si el desarrollador no establece estos parámetros, los valores anteriores se establecerán automáticamente.


Cuando se configura la idempotencia, es necesario asegurarse de que los mismos mensajes caigan en las mismas particiones cada vez. Esto se puede hacer configurando la clave y el parámetro divisioner.class en Producer. Comencemos con la llave. Para cada envío, debe ser el mismo. Esto se logra fácilmente utilizando cualquier identificador comercial del mensaje original. El parámetro divisioner.class tiene un valor predeterminado de DefaultPartitioner . Con esta estrategia de partición, el comportamiento predeterminado es el siguiente:


  • Si la partición se especifica explícitamente al enviar el mensaje, entonces la usamos.
  • Si no se especifica la partición, pero se especifica la clave, seleccione la partición por hash de la clave.
  • Si la partición y la clave no están especificadas, seleccione las particiones a su vez (round-robin).

Además, el uso de la clave y el envío idempotente con el parámetro max.in.flight.requests.per.connection = 1 le brinda un procesamiento ordenado de los mensajes en el consumidor. Por separado, vale la pena recordar que si el control de acceso está configurado en su clúster, necesitará los derechos para escribir de forma idempotente sobre el tema.


Si de repente carece de las capacidades de envío idempotente por clave o la lógica en el lado del productor requiere la preservación de la consistencia de los datos entre diferentes particiones, entonces las transacciones vendrán al rescate. Además, utilizando una transacción en cadena, puede sincronizar condicionalmente un registro en Kafka, por ejemplo, con un registro en la base de datos. Para habilitar el envío transaccional al Productor, es necesario que posea idempotencia y, opcionalmente, establezca transaccional.id . Si el control de acceso está configurado en su clúster Kafka, entonces para la grabación transaccional, así como idempotente, necesitará permisos de escritura, que pueden otorgarse mediante una máscara utilizando el valor almacenado en transaccional.id.


Formalmente, puede usar cualquier cadena, por ejemplo, el nombre de la aplicación, como un identificador de transacción. Pero si ejecuta varias instancias de la misma aplicación con el mismo transaccional.id, la primera instancia iniciada se detendrá con un error, ya que Kafka lo considerará un proceso zombie.


 org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker. 

Para resolver este problema, agregamos un sufijo al nombre de la aplicación en forma de nombre de host, que se obtiene de las variables de entorno.


El productor está configurado, pero las transacciones en Kafka controlan solo el alcance del mensaje. Independientemente del estado de la transacción, el mensaje cae inmediatamente en el tema, pero tiene atributos del sistema adicionales.


Para evitar que el consumidor lea dichos mensajes con anticipación, debe establecer el parámetro isolated.level en read_committed. Tal Consumidor podrá leer mensajes no transaccionales como antes, y los transaccionales solo después de una confirmación.
Si instaló todas las configuraciones enumeradas anteriormente, entonces configuró exactamente una vez la entrega. Felicidades


Pero hay un matiz más. Transactional.id, que configuramos anteriormente, es en realidad un prefijo de transacción. En el administrador de transacciones, se le agrega un número de serie. El identificador recibido se emite en transactional.id.expiration.ms , que se configura en el clúster de Kafka y tiene un valor predeterminado de "7 días". Si durante este tiempo la aplicación no recibió ningún mensaje, cuando intente el siguiente envío transaccional, recibirá una InvalidPidMappingException . Después de eso, el coordinador de transacciones emitirá un nuevo número de secuencia para la próxima transacción. Sin embargo, el mensaje se puede perder si InvalidPidMappingException no se procesa correctamente.


En lugar de totales


Como puede ver, solo enviar mensajes a Kafka no es suficiente. Debe elegir una combinación de parámetros y estar preparado para realizar cambios rápidos. En este artículo, intenté mostrar exactamente una vez las configuraciones de entrega en detalle y describí varios problemas de configuración de client.id y transactional.id que encontramos. El resumen de la configuración de Productor y Consumidor se resume a continuación.


Productor:


  1. acks = todos
  2. reintentos> 0
  3. enable.idempotence = true
  4. max.in.flight.requests.per.connection ≤ 5 (1 - para envío ordenado)
  5. transaccional.id = $ {nombre-aplicación} - $ {nombre de host}

Consumidor:


  1. isolated.level = read_committed

Para minimizar los errores en futuras aplicaciones, creamos nuestro contenedor sobre la configuración del resorte, donde los valores para algunos de los parámetros enumerados ya están establecidos.


Y aquí hay un par de materiales para estudio independiente:


Source: https://habr.com/ru/post/481784/


All Articles