El libro "Apache Kafka. Procesamiento de flujo y análisis de datos "

imagen Durante el trabajo de cualquier aplicación empresarial, se generan datos: estos son archivos de registro, métricas, información sobre la actividad del usuario, mensajes salientes, etc. La manipulación adecuada de todos estos datos no es menos importante que los datos en sí. Si eres un arquitecto, desarrollador o ingeniero graduado que quiere resolver tales problemas, pero aún no estás familiarizado con Apache Kafka, entonces de este maravilloso libro aprenderás cómo trabajar con esta plataforma de transmisión gratuita que te permite procesar colas de datos en tiempo real.

¿Para quién es este libro?


“Apache Kafka. Procesamiento de flujo y análisis de datos ”fue escrito para desarrolladores que usan la API de Kafka en su trabajo, así como para ingenieros de procesos (también llamados SRE, DevOps o administradores de sistemas) que están involucrados en la instalación, configuración, configuración y monitoreo de su operación durante la operación industrial. Tampoco nos olvidamos de los arquitectos de datos e ingenieros analíticos, los responsables del diseño y la creación de toda la infraestructura de datos de la empresa. Algunos capítulos, en particular 3, 4 y 11, están dirigidos a desarrolladores de Java. Para comprenderlos, es importante que el lector esté familiarizado con los conceptos básicos del lenguaje de programación Java, incluidos temas como el manejo de excepciones y la competencia.

Otros capítulos, especialmente 2, 8, 9 y 10, suponen que el lector tiene experiencia con Linux y está familiarizado con la configuración de la red y el almacenamiento de Linux. El resto del libro de Kafka y las arquitecturas de software se discuten en términos más generales, por lo que no se requiere ningún conocimiento especial de los lectores.

Otra categoría de personas que pueden estar interesadas en este libro son los gerentes y arquitectos que trabajan no directamente con Kafka, sino con aquellos que trabajan con él. No es menos importante que entiendan cuáles son las garantías de la plataforma y qué compromisos tendrán que hacer sus subordinados y colegas al crear sistemas basados ​​en Kafka. Este libro será útil para aquellos gerentes que deseen capacitar a sus empleados para trabajar con Kafka o para asegurarse de que el equipo de desarrollo posea la información necesaria.

Capítulo 2. Instalando Kafka


Apache Kafka es una aplicación Java que puede ejecutarse en muchos sistemas operativos, incluidos Windows, MacOS, Linux y otros. En este capítulo, nos centraremos en instalar Kafka en Linux, ya que es la plataforma que se instala con mayor frecuencia en este sistema operativo. Linux también es el sistema operativo recomendado para la implementación de Kafka de uso general. Para obtener información sobre cómo instalar Kafka en Windows y MacOS, consulte el Apéndice A.

Instalar java

Antes de instalar ZooKeeper o Kafka, debe instalar y configurar el entorno Java. Se recomienda que use Java 8, y esta puede ser una versión, incluida en su sistema operativo o descargada directamente desde java.com. Aunque ZooKeeper y Kafka trabajarán con Java Runtime Edition, es más conveniente utilizar el Kit de desarrollo de Java (JDK) completo al desarrollar utilidades y aplicaciones. Estos pasos de instalación suponen que tiene instalada la versión 8.0.51 de JDK en el directorio /usr/java/jdk1.8.0_51.

Instalar ZooKeeper

Apache Kafka usa ZooKeeper para almacenar metadatos sobre el clúster de Kafka, así como detalles sobre clientes consumidores (Fig. 2.1). Aunque ZooKeeper también se puede iniciar utilizando scripts incluidos en la distribución Kafka, instalar la versión completa del repositorio ZooKeeper desde la distribución es muy simple.

imagen

Kafka ha sido probado exhaustivamente con la versión estable 3.4.6 del repositorio ZooKeeper, que se puede descargar desde apache.org.

Servidor independiente

El siguiente ejemplo muestra cómo instalar ZooKeeper con configuraciones básicas en el directorio / usr / local / zookeeper y guardar los datos en el directorio / var / lib / zookeeper:

# tar -zxf zookeeper-3.4.6.tar.gz # mv zookeeper-3.4.6 /usr/local/zookeeper # mkdir -p /var/lib/zookeeper # cat > /usr/local/zookeeper/conf/zoo.cfg << EOF > tickTime=2000 > dataDir=/var/lib/zookeeper > clientPort=2181 > EOF # /usr/local/zookeeper/bin/zkServer.sh start JMX enabled by default Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg Starting zookeeper ... STARTED # export JAVA_HOME=/usr/java/jdk1.8.0_51 # /usr/local/zookeeper/bin/zkServer.sh start JMX enabled by default Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg Starting zookeeper ... STARTED # 

Ahora puede verificar que se supone que ZooKeeper funciona sin conexión conectándose al puerto del cliente y enviando el comando srvr de cuatro letras:

 # telnet localhost 2181 Trying ::1... Connected to localhost. Escape character is '^]'. srvr Zookeeper version: 3.4.6-1569965, built on 02/20/2014 09:09 GMT Latency min/avg/max: 0/0/0 Received: 1 Sent: 0 Connections: 1 Outstanding: 0 Zxid: 0x0 Mode: standalone Node count: 4 Connection closed by foreign host. # 

ZooKeeper Ensemble

El clúster ZooKeeper se llama conjunto. Debido a la naturaleza del algoritmo en sí, se recomienda que el conjunto incluya un número impar de servidores, por ejemplo, 3, 5, etc., ya que para que ZooKeeper pueda responder a las solicitudes, la mayoría de los miembros del conjunto deben funcionar (quórum). Esto significa que un conjunto de tres nodos puede funcionar con un nodo inactivo. Si el conjunto tiene tres nodos, puede haber dos.

Para configurar el funcionamiento de los servidores ZooKeeper en el conjunto, deben tener una configuración única con una lista de todos los servidores, y cada servidor en el directorio de datos debe tener un archivo myid con el identificador de este servidor. Si los hosts en el conjunto se llaman zoo1.example.com, zoo2.example.com y zoo3.example.com, entonces el archivo de configuración puede verse así:

 tickTime=2000 dataDir=/var/lib/zookeeper clientPort=2181 initLimit=20 syncLimit=5 server.1=zoo1.example.com:2888:3888 server.2=zoo2.example.com:2888:3888 server.3=zoo3.example.com:2888:3888 

En esta configuración, initLimit es la cantidad de tiempo que los nodos esclavos pueden conectarse al maestro. El valor syncLimit limita el retraso de los nodos esclavos del maestro. Ambos valores se especifican en unidades tickTime, es decir, initLimit = 20 · 2000 ms = 40 s. La configuración también enumera todos los servidores de conjunto. Están en el formato server.X = hostname: peerPort: leaderPort con los siguientes parámetros:

  • X es el identificador del servidor. Debe ser un número entero, pero el recuento puede no ser cero y no ser secuencial;
  • hostname: nombre de host o dirección IP del servidor;
  • peerPort: puerto TCP a través del cual los servidores de conjunto se comunican entre sí;
  • leaderPort: puerto TCP a través del cual se selecciona el host.

Es suficiente que los clientes puedan conectarse al conjunto a través del puerto clientPort, pero los miembros del conjunto deben poder intercambiar mensajes entre ellos en los tres puertos.

Además de un único archivo de configuración, cada servidor en el directorio dataDir debe tener un archivo myid. Debe contener el identificador del servidor correspondiente al que figura en el archivo de configuración. Después de completar estos pasos, puede iniciar los servidores e interactuarán entre sí en el conjunto.

Instalación de Kafka Broker


Después de completar la configuración de Java y ZooKeeper, puede continuar con la instalación de Apache Kafka. La última versión de Apache Kafka se puede descargar en kafka.apache.org/downloads.html .

En el siguiente ejemplo, instale la plataforma Kafka en el directorio / usr / local / kafka, configúrela para usar el servidor ZooKeeper lanzado anteriormente y guarde los segmentos de registro de mensajes en el directorio / tmp / kafka-logs:

 # tar -zxf kafka_2.11-0.9.0.1.tgz # mv kafka_2.11-0.9.0.1 /usr/local/kafka # mkdir /tmp/kafka-logs # export JAVA_HOME=/usr/java/jdk1.8.0_51 # /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties # 

Después de iniciar el agente Kafka, puede probar su funcionamiento realizando operaciones simples con el clúster, incluida la creación de un tema de prueba, la generación de mensajes y su consumo.

Crear y verificar hilos:

 # /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test Created topic "test". # /usr/local/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test Topic:test PartitionCount:1 ReplicationFactor:1 Configs: Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0 # 

Generando mensajes para el tema de prueba:

 # /usr/local/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test Test Message 1 Test Message 2 ^D # 

Consumir mensajes del tema de prueba:

 # /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning Test Message 1 Test Message 2 ^C Consumed 2 messages # 

Configuración de corredor


El ejemplo de configuración de intermediario suministrado con la distribución Kafka es bastante adecuado para una ejecución de prueba de un servidor independiente, pero para la mayoría de las instalaciones no será suficiente. Hay muchas opciones de configuración de Kafka que rigen todos los aspectos de la instalación y la configuración. Puede dejar los valores predeterminados para muchos de ellos, ya que se relacionan con los matices de configurar un agente Kafka que no son aplicables hasta que trabaje con un escenario específico que requiera su uso.

Configuración básica del agente


Hay varias configuraciones del agente Kafka que debe tener en cuenta al implementar la plataforma en cualquier entorno, a excepción de un agente independiente en un servidor separado. Estos parámetros se relacionan con la configuración principal del intermediario, y la mayoría de ellos deben cambiarse para que el intermediario pueda trabajar en un clúster con otros intermediarios.

broker.id

Cada agente de Kafka debe tener un identificador entero especificado por el parámetro broker.id. Por defecto, este valor es 0, pero puede ser cualquier número. Lo principal es que no se repite dentro del mismo clúster de Kafka. La elección del número puede ser arbitraria y, si es necesario, para la comodidad del mantenimiento, puede transferirse de un agente a otro. Es deseable que este número esté de alguna manera conectado con el host, luego la correspondencia de los identificadores de intermediario con los hosts con seguimiento será más transparente. Por ejemplo, si sus nombres de host contienen números únicos (por ejemplo, host1.example.com, host2.example.com, etc.), estos números serían una buena opción para los valores de broker.id.

puerto

Un archivo de configuración típico inicia Kafka con un escucha en el puerto TCP 9092. Este puerto se puede cambiar a cualquier otro disponible cambiando el puerto del parámetro de configuración. Tenga en cuenta que al elegir un puerto con un número inferior a 1024, Kafka debe ejecutarse como root. No se recomienda ejecutar Kafka como root.

zookeeper.connect

La ruta que utiliza ZooKeeper para almacenar los metadatos del intermediario se establece mediante el parámetro de configuración zookeeper.connect. En la configuración de muestra, ZooKeeper se ejecuta en el puerto 2181 en el host local, que se indica como localhost: 2181. El formato de este parámetro es una lista de líneas separadas por punto y coma con el nombre de host del formulario: puerto / ruta, que incluye:

  • hostname: nombre de host o dirección IP del servidor ZooKeeper;
  • puerto: número de puerto del cliente para el servidor;
  • / ruta: una ruta opcional de ZooKeeper utilizada como la nueva ruta raíz (chroot) del clúster Kafka. Si no se especifica, se utiliza la ruta raíz.

Si la ruta de chroot especificada no existe, se creará cuando se inicie el intermediario.

log.dirs

Kafka guarda todos los mensajes en el disco duro, y estos segmentos del registro se almacenan en los directorios especificados en la configuración log.dirs. Es una lista de rutas separadas por comas en el sistema local. Si se especifican varias rutas, el intermediario guardará secciones en ellas de acuerdo con el principio de las menos utilizadas, con la preservación de los segmentos de registro de una sección a lo largo de una ruta. Tenga en cuenta que el intermediario colocará la nueva sección en el directorio en el que actualmente se almacenan la menor cantidad de particiones y no se utiliza el menor espacio, de modo que no se garantiza la distribución uniforme de datos entre las secciones.

num.recovery.threads.per.data.dir

Kafka utiliza un grupo de subprocesos personalizado para procesar segmentos de registro. Actualmente se aplica:

  • durante el inicio normal: para abrir los segmentos de registro de cada sección;
  • comenzar después de una falla: para verificar y truncar los segmentos de registro de cada sección;
  • Parar: para cerrar suavemente los segmentos de registro.

Por defecto, solo se usa un hilo por directorio de registro. Dado que esto solo ocurre al iniciar y detener, tiene sentido usar más para paralelizar las operaciones. ¡Al recuperarse de un apagado incorrecto, los beneficios de usar este enfoque pueden alcanzar varias horas si se reinicia el corredor con una gran cantidad de particiones! Recuerde que el valor de este parámetro se determina en función de un directorio de registro a partir del número especificado utilizando log.dirs. Es decir, si el valor del parámetro num.recovery.threads.per.data.dir es 8 y se especifican tres rutas en log.dirs, entonces el número total de subprocesos es 24.

auto.create.topics.enable

Según la configuración predeterminada de Kafka, el corredor debería crear automáticamente un tema cuando:

  • el fabricante comienza a escribir en la línea de asunto;
  • el consumidor comienza a leer el tema del mensaje;
  • cualquier cliente solicita metadatos del tema.

En muchos casos, este comportamiento puede ser indeseable, especialmente debido al hecho de que no hay forma de verificar la existencia de un tema utilizando el protocolo Kafka sin hacer que se cree. Si controla la creación de eso explícitamente, manualmente o mediante el sistema de inicialización, puede establecer el parámetro auto.create.topics.enable en falso.

Configuración de tema predeterminada


La configuración del servidor Kafka establece una gran cantidad de configuraciones predeterminadas para los temas creados. Algunos de estos parámetros, incluido el número de secciones y los parámetros de guardado de mensajes, se pueden configurar para cada tema por separado utilizando las herramientas de administrador (discutidas en el Capítulo 9). Los valores predeterminados en la configuración del servidor deben establecerse iguales a los valores de referencia que son adecuados para la mayoría de los temas del clúster.

número de particiones

El parámetro num.partitions determina con cuántas secciones se crea un nuevo tema, principalmente cuando la creación automática por temas está habilitada (que es el comportamiento predeterminado). El valor predeterminado de este parámetro es 1. Tenga en cuenta que el número de secciones para un tema solo puede aumentarse, pero no reducirse. Esto significa que si requiere menos particiones que las indicadas en número de particiones, tendrá que crearlo con cuidado manualmente (esto se trata en el Capítulo 9).

Como se discutió en el Capítulo 1, las secciones son una forma de escalar temas en un clúster de Kafka, por lo que es importante que tenga tantos como necesite para equilibrar la carga de los mensajes en todo el clúster a medida que se agregan los corredores. Muchos usuarios prefieren que el número de particiones sea igual o el número de intermediarios en el clúster. Esto hace posible distribuir uniformemente las secciones entre los corredores, lo que conducirá a una distribución uniforme de la carga entre los mensajes. Sin embargo, este no es un requisito obligatorio, porque la presencia de varios temas le permite equilibrar la carga.

log.retention.ms

La mayoría de las veces, el almacenamiento de mensajes en Kafka es limitado en el tiempo. El valor predeterminado se especifica en el archivo de configuración utilizando el parámetro log.retention.hours y es igual a 168 horas, o 1 semana. Sin embargo, puede usar otros dos parámetros: log.retention.minutes y log.retention.ms. Los tres parámetros determinan lo mismo: el período de tiempo después del cual se eliminan los mensajes. Pero se recomienda usar el parámetro log.retention.ms, porque si se especifican varios parámetros, la prioridad pertenece a la unidad de medida más pequeña, por lo que siempre se usará el valor de log.retention.ms.

log.retention.bytes

Otra forma de limitar la validez de los mensajes se basa en el tamaño total (en bytes) de los mensajes almacenados. El valor se establece utilizando el parámetro log.retention.bytes y se aplica por separado. Esto significa que en el caso de un tema de ocho secciones e igual a 1 GB del valor de log.retention.bytes, la cantidad máxima de datos almacenados para este tema será de 8 GB. Tenga en cuenta que la cantidad de almacenamiento depende de las secciones individuales y no del tema. Esto significa que si aumenta el número de secciones para el tema, la cantidad máxima de datos guardados al usar log.retention.bytes también aumentará.

log.segment.bytes

La configuración de registro mencionada se refiere a segmentos de registro, no a mensajes individuales. A medida que el agente Kafka genera mensajes, se agregan al final del segmento de diario actual de la sección correspondiente. Cuando el segmento de registro alcanza el tamaño especificado por el parámetro log.segment.bytes y es igual a 1 GB de forma predeterminada, este segmento se cierra y se abre uno nuevo. Después del cierre, el segmento de diario se puede retirar. Cuanto menor sea el tamaño de los segmentos de registro, más a menudo tendrá que cerrar archivos y crear nuevos, lo que reduce la eficiencia general de las escrituras en disco.

El dimensionamiento de segmentos de registro es importante cuando los temas se caracterizan por una baja frecuencia de generación de mensajes. Por ejemplo, si un tema recibe solo 100 MB de mensajes por día, y el parámetro log.segment.bytes se establece en el valor predeterminado, tarda 10 días en completar un segmento. Y dado que los mensajes no pueden declararse inválidos hasta que se cierre el segmento de registro, entonces con el valor de 604.8 millones (1 semana) del parámetro log.retention.ms, los mensajes pueden acumularse en 17 días antes de que el segmento de registro cerrado se retire de la circulación. Esto se debe a que cuando cierra un segmento con mensajes que se han acumulado durante 10 días, debe almacenarlo durante otros 7 días antes de poder retirarlo de acuerdo con las reglas temporales adoptadas, ya que el segmento no se puede eliminar antes de que caduque el último mensaje. .

log.segment.ms

Otra forma de controlar el cierre de segmentos de registro es mediante el uso del parámetro log.segment.ms, que especifica el período de tiempo después del cual se cierra el segmento de registro. Al igual que los parámetros log.retention.bytes y log.retention.ms, los parámetros log.segment.bytes y log.segment.ms no son mutuamente excluyentes. Kafka cierra el segmento de registro cuando se agota el tiempo o se alcanza el límite de tamaño especificado, dependiendo de cuál de estos eventos ocurra primero. De manera predeterminada, el valor del parámetro log.segment.ms no está establecido, como resultado de lo cual el cierre de los segmentos de registro está determinado por su tamaño.

message.max.bytes

El agente de Kafka permite utilizar el parámetro message.max.bytes para limitar el tamaño máximo de los mensajes generados. El valor predeterminado para este parámetro es 1,000,000 (1 MB). Un fabricante que intente enviar un mensaje más grande recibirá una notificación de error del agente, pero el mensaje no será aceptado. Como en el caso de todos los demás tamaños en bytes especificados en la configuración del agente, estamos hablando del tamaño del mensaje comprimido, por lo que los fabricantes pueden enviar mensajes, cuyo tamaño sin comprimir es mucho mayor si pueden comprimirse a los límites especificados por message.max.bytes .

Aumentar el tamaño del mensaje puede afectar seriamente el rendimiento. Un tamaño de mensaje mayor significa que los subprocesos de intermediario que procesan las conexiones de red y las solicitudes tardarán más en cada solicitud. Los mensajes más grandes también aumentan la cantidad de datos escritos en el disco, lo que afecta el rendimiento de E / S.

Selección de hardware


Elegir el hardware adecuado para el corredor Kafka es más un arte que una ciencia. La plataforma Kafka en sí misma no tiene requisitos estrictos de hardware; funcionará sin problemas en ningún sistema. Pero si hablamos de rendimiento, entonces está influenciado por varios factores: capacidad y rendimiento de los discos, RAM, red y CPU.

Primero debe decidir qué tipos de rendimiento son más importantes para su sistema, después de lo cual puede elegir la configuración de hardware óptima que se ajuste al presupuesto.

Rendimiento de disco


El rendimiento de los discos de los intermediarios, que se utilizan para almacenar segmentos de registro, afecta directamente el rendimiento de los clientes de fabricación. Los mensajes de Kafka deben enviarse al almacenamiento local que confirme su grabación. Solo entonces la operación de envío puede considerarse exitosa. Esto significa que cuanto más rápido se realicen las operaciones de escritura en el disco, menor será el retraso en la generación de mensajes.

La acción obvia en caso de problemas con el ancho de banda de los discos es usar discos duros con placas giratorias (HDD) o unidades de estado sólido (SSD). Los SSD tienen órdenes de magnitud de menor tiempo de búsqueda / acceso y mayor rendimiento. Los discos duros son más económicos y tienen una mayor capacidad relativa. El rendimiento del HDD se puede mejorar debido a su mayor número en el intermediario, o mediante el uso de varios directorios de datos, o mediante la instalación de discos en una matriz de discos independientes con redundancia (matriz redundante de discos independientes, RAID). Otros factores influyen en el rendimiento, por ejemplo, la tecnología de fabricación de un disco duro (por ejemplo, SAS o SATA), así como las características del controlador del disco duro.

Capacidad de disco


La capacidad es otro aspecto del almacenamiento. La cantidad requerida de espacio en disco está determinada por la cantidad de mensajes que deben almacenarse al mismo tiempo. Si se espera que el corredor reciba 1 TB de tráfico por día, entonces con 7 días de almacenamiento, necesitará almacenamiento disponible para segmentos de registro de al menos 7 TB. También debe considerar una saturación de al menos el 10% para otros archivos, sin contar el búfer para posibles fluctuaciones de tráfico o su crecimiento con el tiempo.

La capacidad de almacenamiento es uno de los factores que deben considerarse al determinar el tamaño óptimo del clúster de Kafka y decidir su expansión. El tráfico total del clúster puede equilibrarse en varias secciones para cada tema, lo que le permite utilizar intermediarios adicionales para aumentar la capacidad disponible en los casos en que la densidad de datos por intermediario no sea suficiente. La decisión sobre cuánto espacio en disco se necesita también está determinada por la estrategia de replicación seleccionada para el clúster (discutido con más detalle en el Capítulo 6).

El recuerdo


En el modo normal de operación, el consumidor Kafka lee desde el final de la sección, y el consumidor compensa constantemente el tiempo perdido y solo ligeramente por detrás de los fabricantes, si es que lo hace.Al mismo tiempo, los mensajes leídos por el consumidor se almacenan de manera óptima en el caché de páginas del sistema, de modo que las operaciones de lectura son más rápidas que si el intermediario tuviera que volver a leerlas desde el disco. Por lo tanto, cuanto mayor sea la cantidad de RAM disponible para la memoria caché de la página, mayor será el rendimiento de los clientes consumidores.

Kafka en sí no necesita asignar grandes cantidades de RAM en el montón para la JVM. Incluso un agente que procesa X mensajes por segundo con una velocidad de transferencia de datos de X megabits por segundo puede funcionar con un montón de 5 GB. La RAM restante del sistema se utilizará para la memoria caché de la página y beneficiará a Kafka debido a la capacidad de almacenar en caché los segmentos de registro utilizados. Es por eso que no se recomienda colocar Kafka en un sistema donde ya se estén ejecutando otras aplicaciones importantes, ya que tendrán que compartir el caché de la página, lo que reducirá la productividad de los consumidores de Kafka.

Datos de red


La cantidad máxima de tráfico que Kafka puede manejar está determinada por el ancho de banda de red disponible. A menudo, este es un factor clave (junto con la cantidad de almacenamiento en disco) para elegir un tamaño de clúster. Esta elección se dificulta por el desequilibrio inherente de Kafka (debido al apoyo de varios consumidores) entre el tráfico de red entrante y saliente. Un productor puede generar 1 MB de mensajes por segundo para un tema determinado, pero el número de consumidores puede llegar a ser cualquier cosa, agregando un factor apropiado para el tráfico saliente. Otras operaciones de red, como la replicación de clúster (ver capítulo 6) y la duplicación (discutida en el capítulo 8), aumentan los requisitos de red. Con el uso intensivo de la interfaz de red, el retraso de replicación del clúster es bastante posible, lo que causará la inestabilidad de su estado.

CPU


El poder de cálculo no es tan importante como el espacio en disco y la RAM, pero también afecta en cierta medida el rendimiento general del agente. Idealmente, los clientes deberían comprimir los mensajes para optimizar el uso de la red y el disco. Sin embargo, el agente de Kafka debe descomprimir todos los paquetes de mensajes para verificar las sumas de verificación de mensajes individuales y asignar compensaciones. Luego necesita comprimir el paquete de mensajes nuevamente para guardarlo en el disco. Para eso es que Kafka necesita la mayor parte de su potencia informática. Sin embargo, esto no debe considerarse como el factor principal en la elección del hardware.

Kafka en la nube


Kafka a menudo se instala en un entorno de computación en la nube como Amazon Web Services (AWS). AWS proporciona muchos nodos informáticos virtuales, todos con varias combinaciones de CPU, RAM y espacio en disco. Para seleccionar la configuración de host virtual adecuada, primero debe considerar los factores de rendimiento de Kafka. Puede comenzar con la cantidad requerida de almacenamiento de datos y luego tener en cuenta el rendimiento requerido de los generadores. Si necesita una latencia muy baja, es posible que necesite nodos virtuales optimizados para E / S con almacenamiento local basado en SSD. De lo contrario, puede haber suficiente almacenamiento remoto (por ejemplo, AWS Elastic Block Store). Después de tomar estas decisiones, puede elegir entre las opciones disponibles para la CPU y la RAM.
En la práctica, esto significa que si AWS está habilitado, puede seleccionar nodos virtuales de los tipos m4 o r3. Un nodo virtual de tipo m4 permite un almacenamiento más largo, pero con menos ancho de banda para escribir en el disco, ya que se basa en el almacenamiento de bloques adaptativo. El rendimiento de un nodo virtual como r3 es mucho mayor debido al uso de SSD locales, pero este último limita la cantidad de datos disponibles para el almacenamiento. Las ventajas de ambas opciones combinan tipos significativamente más caros de nodos virtuales i2 y d2.

Kafka Clusters


Un servidor Kafka separado es muy adecuado para el desarrollo local o la creación de prototipos de sistemas, pero la configuración de varios intermediarios para trabajar juntos como un clúster es mucho más rentable (Fig. 2.2). El principal beneficio de esto es la capacidad de escalar la carga en múltiples servidores. El segundo más importante es la capacidad de usar la replicación para proteger contra la pérdida de datos debido a fallas de sistemas individuales. La replicación también brinda la capacidad de realizar trabajos de mantenimiento en un Kafka o sistema subyacente mientras se mantiene la accesibilidad del cliente. En esta sección, solo consideraremos configurar el clúster de Kafka. Para obtener más información sobre la replicación de datos, consulte el Capítulo 6.

imagen


¿Cuántos corredores deberían ser?


El tamaño del grupo Kafka está determinado por varios factores. El primero de ellos es la cantidad de espacio en disco requerido para almacenar mensajes y la cantidad de espacio disponible en un intermediario separado. Si un clúster necesita almacenar 10 TB de datos y un agente independiente puede almacenar 2 TB, entonces el tamaño mínimo del grupo es de cinco agentes. Además, el uso de la replicación puede aumentar los requisitos de almacenamiento en al menos un 100% (dependiendo de su relación) (consulte el Capítulo 6). Esto significa que cuando se usa la replicación, el mismo clúster tendrá que contener al menos diez intermediarios.

Otro factor a considerar es la capacidad del clúster para procesar solicitudes. Por ejemplo, cuáles son las capacidades de las interfaces de red y si pueden hacer frente al tráfico de clientes con múltiples consumidores de datos o fluctuaciones de tráfico durante el almacenamiento de datos (es decir, en caso de picos de tráfico durante los períodos pico). Si la interfaz de red de un agente individual se utiliza al 80% en la carga máxima, y ​​hay dos consumidores de datos, entonces no podrán hacer frente al tráfico máximo con menos de dos agentes. Si se usa la replicación en un clúster, desempeña el papel de un consumidor de datos adicional que debe considerarse. Puede ser útil aumentar el número de intermediarios en el clúster para lidiar con problemas de rendimiento causados ​​por un menor rendimiento del disco o RAM disponible.

Configuración de corredores


Solo hay dos requisitos de configuración para los corredores cuando trabajan como parte de un solo clúster de Kafka. Primero, la configuración de todos los corredores debe tener el mismo valor para el parámetro zookeeper.connect. Define el conjunto de ZooKeeper y la ruta de almacenamiento para el clúster de metadatos. En segundo lugar, cada uno de los corredores de clúster debe tener un valor único para broker.id. Si dos intermediarios con el mismo valor de broker.id intentan unirse al clúster, el segundo intermediario escribirá un mensaje de error en el registro y no se iniciará. Hay otros parámetros de configuración de intermediarios utilizados durante la operación del clúster, a saber, los parámetros para la gestión de replicación descritos en los capítulos posteriores.

Afina el sistema operativo


Aunque la mayoría de las distribuciones de Linux tienen configuraciones de configuración de kernel preconfiguradas que son bastante buenas para la mayoría de las aplicaciones, puede hacer algunos cambios para mejorar el rendimiento del agente Kafka. Básicamente, se relacionan con los subsistemas de memoria virtual y la red, así como con puntos específicos con respecto al punto de montaje del disco para guardar segmentos de los registros. Estos parámetros generalmente se configuran en el archivo /etc/sysctl.conf, pero es mejor consultar la documentación de una distribución de Linux específica para conocer todos los matices de ajustar la configuración del kernel.

Memoria virtual


Por lo general, el sistema de memoria virtual de Linux se ajusta a la carga del sistema. Pero puede hacer algunos ajustes al trabajo tanto con el área de intercambio como con las páginas de memoria "sucias", para adaptarlo mejor a los detalles de la carga de Kafka.
Al igual que con la mayoría de las aplicaciones, especialmente aquellas en las que el ancho de banda es importante, es mejor evitar el intercambio (casi) a toda costa. El costo de intercambiar páginas de memoria al disco afecta significativamente todos los aspectos del rendimiento de Kafka. Además, Kafka usa activamente la memoria caché de la página del sistema, y ​​si el subsistema de memoria virtual se está intercambiando en el disco, entonces la memoria caché de la página no tiene suficiente memoria.

Una forma de evitar el intercambio es no asignarle espacio en la configuración. La búsqueda no es un requisito obligatorio, sino un seguro en caso de cualquier accidente en el sistema. Puede evitar que el sistema interrumpa inesperadamente la ejecución del proceso debido a la falta de memoria. Por lo tanto, se recomienda que el valor del parámetro vm.swappiness sea muy pequeño, por ejemplo 1. Este parámetro representa la probabilidad (en porcentaje) de que el subsistema de memoria virtual use el intercambio en lugar de eliminar páginas del caché de páginas. Es mejor reducir el tamaño de la memoria caché de la página que usar el intercambio.

También tiene sentido corregir lo que hace el núcleo del sistema con páginas sucias que deben vaciarse en el disco. La capacidad de respuesta de Kafka a los fabricantes depende del rendimiento de las E / S de disco. Es por eso que los segmentos de registro generalmente se encuentran en discos rápidos: discos separados con tiempos de respuesta rápidos (por ejemplo, SSD) o subsistemas de disco con una gran cantidad de NVRAM para el almacenamiento en caché (por ejemplo, RAID). Como resultado, es posible reducir el número de páginas "sucias", al llegar a las cuales se inicia un volcado de fondo de ellas en el disco. Para hacer esto, establezca el parámetro vm.dirty_background_ratio en un valor menor que el valor predeterminado (igual a 10). Significa una fracción de la memoria total del sistema (en porcentaje), y en muchos casos se puede establecer en 5. Sin embargo, no debe ser igual a 0,dado que en este caso el kernel vaciará continuamente las páginas al disco y, por lo tanto, perderá la capacidad de almacenar temporalmente las operaciones de escritura del disco con fluctuaciones temporales de rendimiento de los componentes de hardware subyacentes.

El número total de páginas "sucias", cuando se excede, el núcleo del sistema inicia por la fuerza el lanzamiento de operaciones síncronas para volcarlas en el disco, puede aumentarse aumentando el parámetro vm.dirty_ratio a un valor que excede el valor predeterminado de 20 (también un porcentaje de la memoria total del sistema ) Existe un amplio rango de valores posibles para este parámetro, pero los más razonables son entre 60 y 80. Cambiar este parámetro es algo arriesgado en términos tanto del volumen de acciones que no se transfieren al disco como de la probabilidad de pausas de E / S prolongadas en caso de un inicio forzado de operaciones de reinicio sincrónico. Al elegir valores más altos para el parámetro vm.dirty_ratio, se recomienda encarecidamente que utilice la replicación en el clúster Kafka para protegerse contra fallas del sistema.

Al elegir los valores de estos parámetros, tiene sentido controlar el número de páginas "sucias" durante la operación del clúster Kafka bajo carga durante la operación industrial o la simulación. Puede determinarlo mirando el archivo / proc / vmstat:

 # cat /proc/vmstat | egrep "dirty|writeback" nr_dirty 3875 nr_writeback 29 nr_writeback_temp 0 # 

Conducir


Además de la elección del hardware para el subsistema del disco duro, así como la configuración de la matriz RAID si se usa, el sistema de archivos utilizado para estas unidades es el más afectado. Hay muchos sistemas de archivos diferentes, pero EXT4 (cuarto sistema de archivos extendido - el cuarto sistema de archivos extendido) o XFS (Extents File System - el sistema de archivos basado en extensiones) se usa con mayor frecuencia como el local. EXT4 funciona bastante bien, pero requiere opciones de ajuste fino potencialmente inseguras. Entre ellos, establecer un intervalo de fijación más largo que el valor predeterminado (5), para reducir la frecuencia de descarga al disco. EXT4 también introdujo la asignación de bloques retrasados, lo que aumenta la probabilidad de pérdida de datos y daños en el sistema de archivos en caso de falla del sistema.El sistema de archivos XFS también utiliza un algoritmo de asignación diferido, pero es más seguro que EXT4. El rendimiento de XFS para la carga típica de Kafka también es mayor, y no hay necesidad de ajustarlo más allá del automático realizado por el propio sistema de archivos. También es más eficiente con las escrituras de disco por lotes combinadas para aumentar el rendimiento de E / S.

Independientemente del sistema de archivos seleccionado como punto de montaje para los segmentos de registro, se recomienda que especifique la opción de montaje noatime. Los metadatos del archivo contienen tres marcas de fecha / hora: hora de creación (ctime), última hora de modificación (mtime) y último archivo accedido (atime). Por defecto, el valor del atributo atime se actualiza cada vez que se lee un archivo. Esto aumenta significativamente el número de escrituras en el disco. El atributo atime generalmente no es muy útil, a menos que la aplicación necesite información sobre si se accedió al archivo después de su último cambio (en este caso, se puede aplicar el parámetro en tiempo real). Kafka no utiliza el atributo atime en absoluto, por lo que puede deshabilitarlo de forma segura. Establecer el parámetro noatime en un punto de montaje evita las actualizaciones de las marcas de fecha / hora,pero no afecta el manejo correcto de los atributos ctime y mtime.


Ajustar la configuración predeterminada de la pila de red de Linux es algo común para cualquier aplicación que genere mucho tráfico de red, ya que el núcleo predeterminado no es adecuado para la transmisión a alta velocidad de grandes cantidades de datos. De hecho, los cambios recomendados para Kafka no son diferentes de los recomendados para la mayoría de los servidores web y otras aplicaciones de red. Primero, debe cambiar el tamaño (predeterminado y máximo) de la memoria asignada para las memorias intermedias de envío y recepción para cada socket. Esto aumentará significativamente la productividad en caso de transferir grandes cantidades de datos. Los parámetros correspondientes para las memorias intermedias de envío y recepción predeterminadas para cada socket se denominan net.core.wmem_default y net.core.rmem_default, respectivamente, y un valor razonable es 2 097 152 (2 MB). Tener en cuentaque el tamaño máximo no significa asignar dicho espacio para cada búfer, sino que solo le permite hacerlo si es necesario.

Además de configurar sockets, debe establecer por separado los tamaños de los buffers de envío y recepción para los sockets TCP utilizando los parámetros net.ipv4.tcp_wmem y net.ipv4.tcp_rmem. Incluyen tres enteros separados por espacios que definen el tamaño mínimo, el tamaño predeterminado y el tamaño máximo, respectivamente. Un ejemplo de estos parámetros, 4096 65536 2048000, significa que el tamaño mínimo del búfer es de 4 KB, el tamaño predeterminado es de 64 KB y el máximo es de 2 MB. El tamaño máximo no puede exceder los valores especificados para todos los sockets por los parámetros net.core.wmem_max y net.core.rmem_max. Dependiendo de la carga real de sus corredores, Kafka puede necesitar aumentar los valores máximos para aumentar el almacenamiento en búfer de las conexiones de red.

Hay varios otros parámetros de red útiles. Puede habilitar el escalado de la ventana TCP configurando el parámetro net.ipv4.tcp_window_scaling en 1, lo que permitirá a los clientes transferir datos de manera más eficiente y proporcionará la capacidad de almacenar estos datos en el lado del intermediario. El valor del parámetro net.ipv4.tcp_max_syn_backlog es mayor que el valor predeterminado de 1024, lo que permite aumentar el número de conexiones simultáneas. Un valor de net.core.netdev_max_backlog que excede el valor predeterminado de 1000 puede ayudar en caso de ráfagas de tráfico de red, especialmente a velocidades de conexión de red del orden de gigabits, debido a un aumento en el número de paquetes en cola para su posterior procesamiento por el núcleo.

Explotación industrial


Cuando llegue el momento de llevar a Kafka de la prueba a la producción, solo hay algunas cosas más para encargarse de establecer un servicio de mensajería confiable.

Opciones de recolección de basura


El ajuste fino de la recolección de basura de Java para una aplicación siempre ha sido una especie de arte, que requiere información detallada sobre el uso de la memoria de la aplicación y una cantidad considerable de observaciones, prueba y error. Afortunadamente, esto ha cambiado desde el lanzamiento de Java 7 y el advenimiento del recolector de basura Garbage First (G1). G1 puede adaptarse automáticamente a diferentes tipos de carga y garantizar la coherencia de las pausas para la recolección de basura durante todo el ciclo de vida de la aplicación. También maneja fácilmente una gran pila, ya que la divide en pequeñas zonas, en lugar de recolectar basura en todo el montón con cada pausa.

En funcionamiento normal, todo este G1 requiere una configuración mínima. Para ajustar su rendimiento, se utilizan dos parámetros.

  • MaxGCPauseMillis. . — G1 . 200 . , G1 , , , , 200 .
  • InitiatingHeapOccupancyPercent. , . 45. , G1 , 45 % , (Eden), .

El agente de Kafka utiliza la memoria de almacenamiento dinámico de manera muy eficiente y crea objetos, por lo que puede establecer valores más bajos para estos parámetros. Los parámetros de recolección de basura dados en esta sección se consideran bastante adecuados para un servidor con 64 GB de RAM, donde Kafka trabajó con un montón de 5 GB. Este intermediario podría funcionar con el valor 20 del parámetro MaxGCPauseMillis. Y el valor del parámetro InitiatingHeapOccupancyPercent se establece en 35, de modo que la recolección de basura se inicia un poco antes que en el valor predeterminado.

El script de inicio de Kafka no usa el recolector de basura G1 de manera predeterminada, sino un nuevo recolector de basura paralelo y un recolector de basura de etiquetado y limpieza competitivo. Esto se puede cambiar fácilmente a través de variables de entorno. Modificamos el comando de ejecución anterior de la siguiente manera:

 # export JAVA_HOME=/usr/java/jdk1.8.0_51 # export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true" # /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties # 

Diseño del centro de datos


Cuando se utilizan sistemas orientados al desarrollo, la ubicación física de los corredores de Kafka en el centro de datos no importa mucho, ya que la inaccesibilidad parcial o total del clúster por períodos cortos de tiempo no afecta mucho el trabajo. Sin embargo, en la operación industrial, un simple proceso de salida de datos significa la pérdida de dinero debido a la incapacidad de atender a los usuarios o recibir telemetría de sus acciones. Al mismo tiempo, la importancia de utilizar la replicación en el clúster de Kafka (ver Capítulo 6), así como la ubicación física de los corredores en los bastidores del centro de datos, está creciendo. Si no se ocupa de esto antes de implementar Kafka, puede requerir un costoso trabajo de reubicación del servidor.

El corredor de Kafka no sabe nada acerca de la colocación del bastidor durante la asignación de nuevas particiones a los corredores, lo que significa que no puede tener en cuenta la posible ubicación de dos corredores en el mismo bastidor físico o en la misma zona de disponibilidad (cuando trabaja en un servicio en la nube, por ejemplo, AWS) Como resultado, puede poner accidentalmente todas las réplicas de una sección en correspondencia con los corredores que usan la misma red y conexiones de alimentación en el mismo bastidor. En caso de falla de este bastidor, las secciones serán inaccesibles para los clientes. Además, como resultado de elecciones "no limpias" del nodo maestro, esto puede conducir a una pérdida de datos adicional para la recuperación (ver detalles en el Capítulo 6).

Práctica recomendada: instalar cada agente Kafka en un clúster en un bastidor separado, o al menos usar varios puntos críticos de servicios de infraestructura, como energía y red. Por lo general, esto significa al menos el uso de servidores de energía de respaldo para los corredores (que se conectan a dos circuitos de suministro de energía diferentes) y conmutadores de red duales con una interfaz unificada a los servidores para cambiar a otra interfaz sin interrupciones. De vez en cuando, puede ser necesario realizar mantenimiento en el hardware del bastidor o gabinete y apagarlos, por ejemplo, mover el servidor o reemplazar el cableado.

Hospedar aplicaciones en ZooKeeper


Kafka usa ZooKeeper para almacenar metadatos sobre corredores, temas y secciones. Escribir en ZooKeeper se realiza solo cuando se cambian las listas de miembros de grupos de consumidores o los cambios en el clúster Kafka. El volumen de tráfico es mínimo, por lo que el uso de un conjunto ZooKeeper dedicado para un grupo Kafka no está justificado. De hecho, un conjunto de ZooKeeper a menudo se usa para varios grupos de Kafka (usando la nueva ruta raíz de ZooKeeper para cada grupo, como se describió anteriormente en este capítulo).

Sin embargo, cuando los consumidores y ZooKeeper trabajan con ciertas configuraciones, hay un matiz. Para corregir las compensaciones, los consumidores pueden usar ZooKeeper o Kafka, y el intervalo entre las fijaciones se puede ajustar. Si los consumidores usan ZooKeeper para las compensaciones, cada consumidor realizará una operación de escritura ZooKeeper después de un tiempo específico para cada sección que consume. El período de tiempo habitual para corregir las compensaciones es de 1 minuto, ya que es después de este tiempo que un grupo de consumidores lee mensajes duplicados en caso de una falla del consumidor. Estas confirmaciones pueden constituir una parte significativa del tráfico de ZooKeeper, especialmente en un clúster con muchos consumidores, por lo que deben considerarse. Si el conjunto de ZooKeeper no puede manejar esta cantidad de tráfico, es posible que deba aumentar el intervalo de confirmación. Sin embargo recomendadopara que los consumidores que trabajan con las bibliotecas actuales de Kafka utilicen Kafka para corregir las compensaciones y no dependan de ZooKeeper.

Además de usar un conjunto para varios grupos de Kafka, no se recomienda compartir el conjunto con otras aplicaciones si se puede evitar. Kafka es muy sensible a la duración del retraso y la latencia de ZooKeeper, y la interrupción de la comunicación con el conjunto puede causar un comportamiento impredecible de los corredores. Como resultado, varios corredores pueden desconectarse al mismo tiempo en caso de pérdida de conexiones a ZooKeeper, lo que conducirá a la desconexión de las particiones. Esto también creará una carga adicional en el administrador de clúster, lo que puede causar errores no obvios durante mucho tiempo después de una falla de comunicación, por ejemplo, cuando se intenta detener el intermediario de forma controlada. Otras aplicaciones que crean una carga en el administrador de clúster como resultado del uso activo o el funcionamiento incorrecto deben trasladarse a conjuntos separados.

Resumen


En este capítulo, hablamos sobre cómo instalar y ejecutar Apache Kafka. Analizamos cómo elegir el hardware adecuado para los corredores y descubrimos los problemas específicos de la configuración para la operación industrial. Ahora que tenemos un clúster de Kafka, podemos repasar los problemas básicos de las aplicaciones cliente de Kafka. Los próximos dos capítulos se dedicarán a crear clientes tanto para generar mensajes para Kafka (capítulo 3) como para su consumo posterior (capítulo 4).

»Se puede encontrar más información sobre el libro en el sitio web del editor
» Contenidos
» Extracto

Cupón de 20% de descuento para Habrozhitelami - Apache Kafka

Source: https://habr.com/ru/post/es421519/


All Articles