Integración continua en Yandex. Parte 2

En el artículo anterior , hablamos sobre la transferencia del desarrollo a un único repositorio con un enfoque de desarrollo basado en troncales, con sistemas unificados para ensamblaje, prueba, implementación y monitoreo, sobre qué tareas debe resolver un sistema de integración continua para trabajar eficazmente en tales condiciones.


Hoy le diremos a los lectores de Habr sobre el dispositivo del sistema de integración continua.


imagen


Un sistema de integración continua debe funcionar de manera confiable y rápida. El sistema debe responder rápidamente a los eventos entrantes y no debe introducir demoras adicionales en el proceso de entrega de resultados de ejecución de prueba al usuario. Los resultados del montaje y las pruebas deben entregarse al usuario en tiempo real.


El sistema de integración continua es un sistema de procesamiento de datos de transmisión con retrasos mínimos.


Después de enviar todos los resultados en una determinada etapa (configuración, compilación, estilo, pruebas pequeñas, pruebas medianas, etc.), el sistema de compilación indica esto al sistema de integración continua ("cierra" la etapa), y el usuario lo ve para esta verificación y En esta etapa se conocen todos los resultados. Cada etapa se cierra de forma independiente. El usuario recibe una señal útil más rápido. Después de cerrar todas las etapas, la verificación se considera completa.


Para implementar el sistema, elegimos la arquitectura Kappa . El sistema consta de 2 subsistemas:


  • El procesamiento de eventos y datos tiene lugar en un circuito en tiempo real. Cualquier dato de entrada se trata como flujos de datos (flujos). Primero, los eventos se registran en la transmisión y solo luego se procesan.
  • Los resultados del procesamiento de datos se escriben continuamente en la base de datos, donde luego van las llamadas a través de la API. En la arquitectura Kappa, esto se llama la capa de servicio.

Todas las solicitudes de modificación de datos deben pasar por el circuito en tiempo real, porque siempre debe tener el estado actual del sistema. Las solicitudes de lectura van solo a la base de datos.




Siempre que sea posible, seguimos la regla de solo agregar. Sin modificaciones ni eliminaciones de objetos, con la excepción de eliminar datos antiguos e innecesarios.


Más de 2 Tb de datos sin procesar pasan por el servicio por día.


Ventajas:


  • Las transmisiones contienen todos los eventos y mensajes. Siempre podemos entender qué y cuándo sucedió. Stream puede ser percibido como un gran registro.
  • Alta eficiencia y gastos generales mínimos. Resulta un sistema totalmente orientado a eventos, sin ninguna pérdida en el sondeo. No hay evento, no estamos haciendo nada extra.
  • El código de la aplicación prácticamente no trata las primitivas de la sincronización de subprocesos y la memoria compartida entre subprocesos. Esto hace que el sistema sea más confiable.
  • Los procesadores están bien aislados entre sí, porque no interactúes directamente, solo a través de flujos. Se puede proporcionar una buena cobertura de prueba.

Pero el procesamiento de datos de transmisión no es tan simple:


  • Se requiere una buena comprensión del modelo computacional. Tendrá que repensar los algoritmos de procesamiento de datos existentes. No todos los algoritmos entran inmediatamente en el modelo de transmisión y tienes que aplastarte un poco.
  • Es necesario garantizar que se mantenga el orden de recepción y procesamiento de eventos.
  • Debe poder manejar eventos interrelacionados, es decir tener acceso rápido a todos los datos necesarios mientras procesa un nuevo mensaje.
  • También debe ser capaz de manejar eventos duplicados.

Procesamiento de flujo


Mientras trabajaba en el proyecto, se escribió la biblioteca Stream Processor, que nos ayudó a implementar y lanzar rápidamente algoritmos de procesamiento de datos de transmisión en producción.


Stream Processor es una biblioteca para construir sistemas de procesamiento de datos de transmisión. La secuencia es una secuencia de datos (mensajes) potencialmente interminable en la que solo es posible agregar mensajes nuevos; los mensajes ya grabados no se modifican ni se eliminan de la secuencia. Los convertidores de un flujo a otro (procesadores de flujo) constan funcionalmente de tres partes: un proveedor de mensajes entrantes, que generalmente lee mensajes de uno o más flujos y los coloca en una cola de procesamiento, un procesador de mensajes que convierte los mensajes entrantes en salientes y los coloca en una cola al registro y al escritor, donde los mensajes salientes agrupados dentro de la ventana de tiempo caen en la secuencia de salida. Los mensajes de datos generados por un procesador de flujo pueden ser utilizados por otros más adelante. Por lo tanto, las secuencias y los procesadores forman un gráfico dirigido en el que los bucles son posibles, en particular, un procesador de secuencias puede incluso generar mensajes en la misma secuencia desde donde recibe datos.


Se garantiza que cada mensaje de la secuencia de entrada será procesado por cada procesador asociado con él al menos una vez (semántica al menos una vez). También se garantiza que todos los mensajes se procesarán en el orden en que llegaron a esta secuencia. Para hacer esto, los procesadores de flujo se distribuyen en todos los nodos de servicio en funcionamiento, de modo que al mismo tiempo no funcione más de una instancia de cada procesador registrado.


El procesamiento de eventos interrelacionados es uno de los principales problemas en la construcción de sistemas para el procesamiento de datos en streaming. Como regla general, cuando se transmiten mensajes, los procesadores de flujo crean gradualmente un cierto estado que era válido en el momento en que se procesó el mensaje actual. Tales objetos de estado generalmente se asocian no con la secuencia completa en su conjunto, sino con un cierto subconjunto de mensajes, que está determinado por el valor clave en esta secuencia. El almacenamiento eficiente de la riqueza es la clave del éxito. Al procesar el siguiente mensaje, es importante que el procesador pueda obtener rápidamente este estado y, basándose en él y en el mensaje actual, generar mensajes salientes. Estos objetos de estado son accesibles para los procesadores en L1 (no confunda con el caché de la CPU) caché LRU, que se encuentra en la memoria. En el caso de que no haya estado en el caché L1, se restaura desde el caché L2 ubicado en el mismo almacenamiento donde se almacenan las secuencias y donde se almacena periódicamente durante el funcionamiento del procesador. Si no había estado en el caché L2, se restaura a partir de los mensajes de flujo originales, como si el procesador hubiera procesado todos los mensajes originales asociados con la clave de mensaje actual. La técnica de almacenamiento en caché también le permite lidiar con el problema de la alta latencia del almacenamiento, ya que a menudo el procesamiento secuencial no descansa en el rendimiento del servidor, sino en el retraso de las solicitudes y respuestas al comunicarse con el almacén de datos.




Para almacenar datos de manera efectiva en cachés L1 y datos de mensajes en la memoria, además de estructuras eficientes en la memoria, utilizamos grupos de objetos que le permiten tener solo una copia de un objeto (o incluso partes de él) en la memoria. Esta técnica ya se usa en el JDK para cadenas de pasadas de cadena y se extiende de manera similar a otros tipos de objetos, que deberían ser inmutables.


Para el almacenamiento compacto de datos en el almacenamiento de flujo, algunos datos se normalizan antes de escribir en el flujo, es decir Conviértete en números. Los algoritmos de compresión efectivos se pueden aplicar a los números (identificadores de objeto). Los números se ordenan, se cuentan los deltas, luego se codifica con ZigZag Encoding y luego se comprime con el archivador. La normalización no es una técnica muy estándar para la transmisión de sistemas de procesamiento de datos. Pero esta técnica de compresión es muy efectiva y la cantidad de datos en la secuencia más cargada se reduce en aproximadamente 1,000 veces.




Para cada secuencia y procesador, rastreamos el ciclo de vida del procesamiento de mensajes: la aparición de nuevos mensajes en la secuencia de entrada, el tamaño de la cola de mensajes no procesados, el tamaño de la cola para escribir en la secuencia resultante, el tiempo de procesamiento de mensajes y la distribución del tiempo por etapas de procesamiento de mensajes:




Almacén de datos


Los resultados del procesamiento de datos de transmisión deben estar disponibles para el usuario lo antes posible. Los datos procesados ​​de las transmisiones deben registrarse continuamente en la base de datos, donde puede solicitar datos (por ejemplo, mostrar un informe con los resultados de la prueba, mostrar el historial de la prueba).


Características de los datos almacenados y consultas.
La mayoría de los datos son ejecuciones de prueba. Durante un mes, se lanzan más de 1.500 millones de compilaciones y pruebas. Se almacena una cantidad bastante grande de información para cada lanzamiento: el resultado y el tipo de error, una breve descripción del error (fragmento), varios enlaces a los registros, la duración de la prueba, un conjunto de valores numéricos, métricas, en el formato nombre = valor, etc. Algunos de estos datos, por ejemplo, métricas y duración, son muy difíciles de comprimir, ya que de hecho son valores aleatorios. La otra parte, por ejemplo, el resultado, el tipo de error, los registros, se pueden guardar de manera más eficiente, ya que casi no cambian en la misma prueba de una ejecución a otra.


Anteriormente, usábamos MySQL para almacenar datos procesados. Poco a poco comenzamos a descansar contra las capacidades de la base de datos:


  • La cantidad de datos procesados ​​se duplica cada seis meses.
  • Solo pudimos almacenar datos durante los últimos 2 meses, pero queríamos almacenar datos durante al menos un año.
  • Problemas con la velocidad de ejecución de algunas consultas pesadas (cercanas a las analíticas).
  • Esquema de base de datos complicado. Muchas tablas (normalización), lo que complica la escritura en la base de datos. El esquema base es muy diferente del esquema de objetos utilizados en el circuito en tiempo real.
  • No experimentar un apagado del servidor. La falla de un servidor separado o el apagado del centro de datos puede conducir a una falla del sistema.
  • Operación bastante complicada.

Como candidatos para el nuevo almacén de datos, consideramos varias opciones: PostgreSQL, MongoDB y varias soluciones internas, incluida ClickHouse .


Algunas soluciones no nos permiten almacenar nuestros datos de manera más eficiente que la antigua solución basada en MySQL. Otros no permiten la implementación de consultas rápidas y complejas (casi analíticas). Por ejemplo, tenemos una solicitud bastante pesada que muestra confirmaciones que afectan a un proyecto específico (un conjunto de pruebas). En todos los casos en que no podamos ejecutar consultas SQL rápidas, tendríamos que obligar al usuario a esperar mucho tiempo o hacer algunos cálculos por adelantado con una pérdida de flexibilidad. Si cuenta algo por adelantado, debe escribir más código y al mismo tiempo perder flexibilidad: no hay forma de cambiar rápidamente el comportamiento y contar nada. Es mucho más conveniente y rápido escribir una consulta SQL que devolverá los datos que el usuario necesita y podrá modificarlos rápidamente si desea cambiar el comportamiento del sistema.


Clickhouse


Optamos por ClickHouse . ClickHouse es un sistema de gestión de bases de datos en columnas (DBMS) para el procesamiento de consultas analíticas en línea (OLAP).


Cambiando a ClickHouse, abandonamos deliberadamente algunas de las oportunidades proporcionadas por otros DBMS, recibiendo una compensación más que valiosa por esto en forma de consultas analíticas muy rápidas y un almacén de datos compacto.


En los DBMS relacionales, los valores relacionados con una fila se almacenan físicamente uno al lado del otro. En ClickHouse, los valores de diferentes columnas se almacenan por separado, y los datos de una columna se almacenan juntos. Este orden de almacenamiento de datos le permite proporcionar un alto grado de compresión de datos con la elección correcta de la clave primaria. También afecta en qué escenarios el DBMS funcionará bien. ClickHouse funciona mejor con consultas, donde se lee una pequeña cantidad de columnas y la consulta usa una tabla grande y el resto de las tablas son pequeñas. Pero incluso en consultas no analíticas, ClickHouse puede mostrar buenos resultados.


Los datos en las tablas se ordenan por clave primaria. La clasificación se realiza en segundo plano. Esto le permite crear un índice disperso de un volumen pequeño, lo que le permite encontrar datos rápidamente. ClickHouse no tiene índices secundarios. Hablando estrictamente, hay un índice secundario: la clave de partición (ClickHouse corta los datos de partición donde la clave de partición se especifica en la solicitud). Más detalles


El esquema de datos con normalización no es funcional, por el contrario, es preferible desnormalizar los datos en función de las solicitudes. Es preferible crear tablas "anchas" con una gran cantidad de columnas. Este elemento también está relacionado con el anterior, porque la ausencia de índices secundarios a veces hace copias de las tablas con una clave primaria diferente.


ClickHouse no tiene UPDATE y DELETE en el sentido clásico, pero existe la posibilidad de emularlos.


Los datos deben insertarse en bloques grandes y no con demasiada frecuencia (una vez cada pocos segundos). La carga de datos línea por línea es prácticamente inoperante en volúmenes de datos reales.


ClickHouse no admite transacciones; el sistema eventualmente se vuelve consistente .


Sin embargo, algunas características de ClickHouse, similares a otros DBMS, hacen que sea más fácil transferirle los sistemas existentes.


  • ClickHouse usa SQL, pero con ligeras diferencias, útil para consultas típicas de los sistemas OLAP. Existe un poderoso sistema de funciones agregadas, ALL / ANY JOIN, expresiones lambda en funciones y otras extensiones SQL que le permiten escribir casi cualquier consulta analítica.
  • ClickHouse admite replicación, grabación de quórum, lectura de quórum. Es necesaria una escritura de quórum para un almacenamiento de datos confiable: INSERT es exitoso solo si ClickHouse pudo escribir datos en un número dado de réplicas sin error.

Puede leer más sobre las características de ClickHouse en la documentación .


Características de trabajar con ClickHouse


Elección de clave principal y clave de partición.


¿Cómo elegir una clave principal y una clave de partición? Quizás esta es la primera pregunta que surge al crear una nueva tabla. La elección de la clave principal y la clave de partición generalmente está dictada por las consultas que se realizarán en los datos. Al mismo tiempo, las consultas que usan ambas condiciones resultan ser las más efectivas: tanto por la clave primaria como por la clave de partición.


En nuestro caso, las tablas principales son las matrices para ejecutar las pruebas. Es lógico suponer que con esta estructura de datos, las claves deben seleccionarse de modo que el orden de anulación de una de ellas vaya en el orden de aumentar el número de fila y el orden de anulación de la otra, en el orden de aumentar el número de columna.


También es importante tener en cuenta que la elección de la clave primaria puede afectar drásticamente la compacidad del almacenamiento de datos, ya que los valores idénticos en la omisión de la clave primaria en otras columnas casi no ocupan espacio en la tabla. Entonces, en nuestro caso, por ejemplo, los estados de las pruebas cambian poco de commit a commit. Este hecho esencialmente predeterminó la elección de la clave primaria: un par de identificador de prueba y número de confirmación. Además, en ese orden.




La clave de partición tiene dos propósitos. Por un lado, permite que las particiones se "archiven" para que puedan eliminarse permanentemente del almacenamiento, ya que los datos en ellas ya están desactualizados. Por otro lado, la clave de partición es un índice secundario, lo que significa que le permite acelerar las consultas si hay una expresión para ello.


Para nuestras matrices, elegir el número de confirmación como clave de partición parece bastante natural. Pero si establece el valor de revisión en la expresión para la clave de partición, habrá una gran cantidad de particiones en dicha tabla, lo que degradará el rendimiento de las consultas. Por lo tanto, en la expresión para la clave de partición, el valor de revisión puede dividirse en un número grande para reducir el número de particiones, por ejemplo, PARTITION BY intDiv (revisión, 2000). Este número debe ser lo suficientemente grande como para que el número de particiones no exceda los valores recomendados, mientras que debe ser lo suficientemente pequeño como para que no caigan muchos datos en una partición y la base de datos no tenga que leer demasiados datos.


¿Cómo implementar ACTUALIZAR y ELIMINAR?


En el sentido habitual, UPDATE y DELETE no son compatibles con ClickHouse. Sin embargo, en lugar de ACTUALIZAR y ELIMINAR, puede agregar una columna con la versión a la tabla y usar el motor especial ReplacingMergeTree (elimina los registros duplicados con el mismo valor de clave principal). En algunos casos, la versión estará naturalmente presente en la tabla desde el principio: por ejemplo, si queremos crear una tabla para el estado actual de la prueba, la versión en esta tabla será el número de confirmación.


CREATE TABLE current_tests ( test_id UInt64, value Nullable(String), version UInt64 ) ENGINE = ReplacingMergeTree(version) ORDER BY test_id 

En el caso de un cambio de registro, agregamos la versión con un nuevo valor, en el caso de eliminación, con un valor NULL (o algún otro valor especial que no se puede encontrar en los datos).


¿Qué lograste con el nuevo almacenamiento?


Uno de los principales objetivos de cambiar a ClickHouse era la capacidad de almacenar el historial de pruebas durante un largo período de tiempo (varios años o al menos un año en el peor de los casos). Ya en la etapa de prototipo, quedó claro que podemos sortear los SSD existentes en nuestros servidores para almacenar al menos una historia de tres años. Las consultas analíticas se han acelerado significativamente, ahora podemos extraer mucha más información útil de nuestros datos. El margen RPS ha aumentado. Además, este valor se escala casi linealmente mediante la adición de nuevos servidores al clúster ClickHouse. Crear un nuevo almacén de datos para la base de datos ClickHouse es solo un paso apenas perceptible para el usuario final hacia un objetivo más importante: agregar nuevas funciones, acelerar y simplificar el desarrollo, gracias a la capacidad de almacenar y procesar grandes cantidades de datos.


Ven a nosotros


Nuestro departamento está en constante expansión. Visítanos si quieres trabajar en tareas y algoritmos complejos e interesantes. Si tiene preguntas, puede preguntarme directamente en PM.


Enlaces utiles


Procesamiento de flujo



Arquitectura Kappa



ClickHouse:


Source: https://habr.com/ru/post/es429956/


All Articles