
Al desarrollar cualquier producto, ya sea un servicio de video o una cinta, historias o artículos, quiero poder medir la "felicidad" condicional del usuario. Para comprender si estamos haciendo nuestros cambios mejores o peores, para ajustar la dirección del desarrollo del producto, confiando no en la intuición y nuestros propios sentimientos, sino en métricas y números en los que puede creer.
En este artículo, le contaré cómo logramos lanzar estadísticas y análisis de productos en un servicio con una audiencia mensual de 97 millones, mientras recibimos consultas analíticas de rendimiento extremadamente alto. Hablaremos sobre ClickHouse, los motores utilizados y las características de las consultas. Describiré un enfoque para la agregación de datos, que nos permite obtener métricas complejas en una fracción de segundo y hablar sobre la conversión y las pruebas de datos.
Ahora tenemos alrededor de 6 mil millones de eventos alimenticios por día, en un futuro cercano alcanzaremos 20-25 mil millones. Y luego, no a un ritmo tan rápido, aumentaremos a 40-50 mil millones para fin de año, cuando describamos todos los eventos de alimentos que nos interesan.
1 filas en conjunto. Transcurrido: 0.287 seg. Procesado 59.85 mil millones de filas, 59.85 GB (208.16 mil millones de filas / s., 208.16 GB / s.)Detalles debajo del corte.
Prólogo
Las herramientas analíticas fueron VKontakte antes. Se consideraron usuarios únicos, fue posible construir cronogramas de eventos por sectores y, por lo tanto, caer en las profundidades del servicio. Sin embargo, se trataba de cortes fijos por adelantado, de datos agregados, de HLL para los únicos, de cierta rigidez e incapacidad para responder rápidamente preguntas un poco más complicadas que "¿cuánto?"
Por supuesto, hubo, hay y habrá hadoop, también fue escrito, escrito y se escribirá mucho, muchos registros de uso de servicios. Desafortunadamente, hdfs fue utilizado solo por algunos equipos para implementar sus propias tareas. Aún más triste, hdfs no se trata de consultas analíticas rápidas: hubo preguntas en muchos campos, las respuestas se tuvieron que encontrar en el código y no en la documentación accesible para todos.
Llegamos a la conclusión de que ya no es posible vivir así. Cada equipo debe tener datos, las consultas sobre ellos deben ser rápidas y los datos en sí mismos deben ser precisos y ricos en parámetros útiles.
Por lo tanto, formulamos requisitos claros para el nuevo sistema de estadísticas / análisis:
- las consultas analíticas deben ser rápidas;
- los datos son bastante precisos, idealmente estos son eventos de interacción del usuario sin procesar con el servicio;
- la estructura de los eventos debe ser descrita, entendida y accesible;
- almacenamiento de datos confiable, garantía de entrega única;
- es posible contar los únicos, la audiencia (diaria, semanal, mensual), las métricas de retención, el tiempo que el usuario pasa en el servicio, las acciones cuantificadas en métricas únicas y otras por el conjunto de sectores;
- Las pruebas, la conversión de datos y la visualización están en progreso.
En la cocina
La experiencia sugirió que necesitábamos dos bases de datos: una lenta, donde agregaríamos y enriqueceríamos los datos, y una rápida, donde podríamos trabajar con estos datos y construir gráficos sobre ellos. Este es uno de los enfoques más comunes, en el que, en una base de datos lenta, por ejemplo, en hdfs, se construyen diferentes proyecciones, sobre las únicas y sobre el número de eventos por sectores durante un cierto período de tiempo.
En un cálido día de septiembre, mientras hablamos con una taza de té en la cocina con vista a la Catedral de Kazan, tuvimos la idea de probar ClickHouse como base rápida; en ese momento ya lo usábamos para almacenar registros técnicos. Hubo muchas dudas asociadas principalmente con la velocidad y la confiabilidad: las pruebas de rendimiento declaradas parecían poco realistas, y las nuevas versiones de la base de datos rompían periódicamente la funcionalidad existente. Por lo tanto, la propuesta era simple: probar.
Primeras muestras
Implementamos un clúster de dos máquinas con esta configuración:
2xE5-2620 v4 (32 núcleos en total), 256G ram, 28T lugares (raid10 con ext4).
Inicialmente, estaba cerca del diseño, pero luego cambiamos a lejos. ClickHouse tiene muchos motores de mesa diferentes, pero los principales son de la familia MergeTree. Elegimos ReplicatedReplacingMergeTree con aproximadamente la siguiente configuración:
PARTITION BY dt ORDER BY (toStartOfHour(time), cityHash64(user_id), event_microsec, event_id) SAMPLE BY cityHash64(user_id) SETTINGS index_granularity = 8192;
Replicado : significa que la tabla se replica y esto resuelve uno de nuestros requisitos de confiabilidad.
Reemplazo : la tabla admite la deduplicación por la clave primaria: de forma predeterminada, la clave primaria coincide con la clave de clasificación, por lo que la sección ORDER BY simplemente le indica cuál es la clave primaria.
SAMPLE BY - También quería probar el muestreo: la muestra devuelve una muestra pseudoaleatoria uniforme.
index_granularity = 8192 es el número mágico de filas de datos entre serif de índice (sí, es escaso), que se usa por defecto. No lo cambiamos.
El particionamiento se realizó por día (aunque por defecto, por mes). Se suponía que muchas solicitudes de datos eran intradía, por ejemplo, crear un gráfico de minutos de vistas de video para un día determinado.
Luego, tomamos un trozo de registros técnicos y llenamos la mesa con aproximadamente mil millones de filas. Excelente compresión, agrupación por tipo de columna Int *, contando valores únicos: ¡todo funcionó increíblemente rápido!
Hablando de velocidad, quiero decir que ni una sola solicitud duró más de 500 ms, y la mayoría de ellos se ajustan a 50-100 ms. Y esto está en dos máquinas, y, de hecho, solo una estuvo involucrada en los cálculos.
Analizamos todo esto e imaginamos que en lugar de la columna UInt8 habrá una identificación del país, y la columna Int8 será reemplazada por datos, por ejemplo, sobre la edad del usuario. Y se dieron cuenta de que ClickHouse es completamente adecuado para nosotros, si todo se hace correctamente.
Fuerte tipificación de datos
El beneficio de ClickHouse comienza exactamente cuando se forma el esquema de datos correcto. Ejemplo: plataforma String - mala, plataforma Int8 + diccionario - buena, LowCardinality (String) - conveniente y buena (hablaré de LowCardinality un poco más adelante).
Creamos una clase de generador especial en php, que, a pedido, crea clases de envoltura sobre eventos basados en tablas en ClickHouse, y un único punto de entrada al registro. Explicaré el ejemplo del esquema que resultó:
- El analista / ingeniero de datos / desarrollador describe la documentación: qué campos, posibles valores, eventos deben registrarse.
- Se crea una tabla en ClickHouse de acuerdo con la estructura de datos del párrafo anterior.
- Se generan clases de envoltura para eventos basados en una tabla.
- El equipo de producto implementa rellenando los campos de un objeto de esta clase, enviando.
Cambiar el esquema a nivel de php y el tipo de datos registrados no funcionará sin cambiar primero la tabla en ClickHouse. Y esto, a su vez, no puede hacerse sin coordinación con el equipo, cambios en la documentación y descripción de los eventos.
Para cada evento, puede establecer dos configuraciones que controlan el porcentaje de eventos enviados a ClickHouse y hadoop, respectivamente. La configuración es necesaria principalmente para un avance gradual con la capacidad de reducir el registro si algo sale mal. Antes de hadoop, los datos se entregan de manera estándar utilizando Kafka. Y en ClickHouse, vuelan a través de un
esquema con KittenHouse en modo persistente, lo que garantiza al menos la entrega de un solo evento.
El evento se entrega a la tabla de búfer en el fragmento deseado, en función del resto de dividir algún hash de user_id por el número de fragmentos en el clúster. A continuación, la tabla de búfer vacía los datos al ReplicatedReplacingMergeTree local. Y encima de las tablas locales, se extrae una tabla distribuida con el motor Distribuido, que le permite acceder a los datos de todos los fragmentos.
Desnormalización
ClickHouse es un DBMS columnar. No se trata de formas normales, lo que significa que es mejor tener toda la información correcta en el evento que unirse. También hay Join, pero si la tabla correcta no cabe en la memoria, comienza el dolor. Por lo tanto, tomamos una decisión decidida: toda la información que nos interesa debe almacenarse en el evento en sí. Por ejemplo, género, edad del usuario, país, ciudad, fecha de nacimiento: toda esa información pública puede ser útil para el análisis de la audiencia, así como toda la información útil sobre el objeto de interacción. Si, por ejemplo, estamos hablando de video, es video_id, video_owner_id, fecha de carga del video, duración, calidad en el momento del evento, calidad máxima, etc.
En total, en cada tabla tenemos de 50 a 200 columnas, mientras que en todas las tablas hay campos de servicio. Por ejemplo, el registro de errores es error_log; de hecho, llamamos a un error fuera del rango del tipo. En caso de que los valores extraños vayan más allá del tamaño del tipo en el campo con la edad.
Tipo de baja cardinalidad (T)
ClickHouse tiene la capacidad de usar diccionarios externos. Se almacenan en la memoria, se actualizan periódicamente, se pueden usar de manera efectiva en varios escenarios, incluso como libros de referencia clásicos. Por ejemplo, desea registrar el sistema operativo y tiene dos alternativas: una cadena o un número + un directorio. Por supuesto, en grandes cantidades de datos y para consultas analíticas de alto rendimiento, es lógico escribir un número y obtener una representación de cadena del diccionario cuando necesite:
dictGetString('os', 'os_name', toUInt64(os_id))
Pero hay una forma mucho más conveniente: usar el tipo LowCardinality (String), que crea automáticamente un diccionario. El rendimiento con LowCardinality bajo la condición de baja cardinalidad del conjunto de valores es radicalmente mayor que con String.
Por ejemplo, usamos LowCardinality (String) para los tipos de evento 'play', 'pause', 'rewind'. O para la plataforma: 'web', 'android', 'iphone':
SELECT vk_platform, count() FROM t WHERE dt = yesterday() GROUP BY vk_platform Elapsed: 0.145 sec. Processed 1.98 billion rows, 5.96 GB (13.65 billion rows/s., 41.04 GB/s.)
La característica aún es experimental, por lo que para usarla debe realizar:
SET allow_experimental_low_cardinality_type = 1;
Pero existe la sensación de que después de un tiempo ya no estará en el entorno.
Agregación de datos de VKontakte
Dado que hay muchas columnas, y hay muchos eventos, el deseo natural es cortar las particiones "viejas", pero primero, ensamblar las unidades. Ocasionalmente, es necesario analizar eventos sin procesar (hace un mes o un año), por lo que no recortamos los datos en hdf; cualquier analista puede comunicarse con el parquet deseado para cualquier fecha.
Como regla general, cuando se agrega en un intervalo de tiempo, siempre descansamos en el hecho de que el número de filas por unidad de tiempo es igual al producto de la potencia de corte. Esto impone restricciones: los países comienzan a agruparse en grupos como 'Rusia', 'Asia', 'Europa', 'El resto del mundo' y edades, a intervalos, para reducir la dimensión a un millón condicional de líneas por fecha.
Agregación por dt, user_id
¡Pero tenemos un ClickHouse reactivo! ¿Podemos acelerar a 50-100 millones de líneas en una fecha?
Las pruebas rápidas demostraron que podemos, y en ese momento surgió una idea simple: dejar al usuario en la máquina. Es decir, agregar no por "fecha, cortes" usando herramientas de chispa, sino por "fecha, usuario" significa por ClickHouse, mientras se hace una "transposición" de datos.
Con este enfoque, almacenamos a los usuarios en datos agregados, lo que significa que aún podemos considerar indicadores de audiencia, retención y métricas de frecuencia. Podemos conectar unidades, contando las audiencias comunes de varios servicios hasta toda la audiencia de VKontakte. Todo esto puede hacerse por cualquier segmento que esté presente en la tabla condicionalmente al mismo tiempo.
Ilustraré con un ejemplo:

Después de la agregación (muchas más columnas a la derecha):

En este caso, la agregación ocurre precisamente por (dt, user_id). Para los campos con información del usuario, con dicha agregación, puede usar las funciones any, anyHeavy (selecciona un valor frecuente). Puede, por ejemplo, recopilar anyHeavy (plataforma) en conjunto para saber qué plataforma está utilizando el usuario en su mayor parte de los eventos de video. Si lo desea, puede usar groupUniqArray (plataforma) y almacenar una matriz de todas las plataformas desde las cuales el usuario generó el evento. Si esto no es suficiente, puede crear columnas separadas para la plataforma y almacenar, por ejemplo, la cantidad de videos únicos proyectados a la mitad desde una plataforma específica:
uniqCombinedIf(cityHash64(video_owner_id, video_id), (platform = 'android') AND (event = '50p')) as uniq_videos_50p_android
Con este enfoque, se obtiene un agregado bastante amplio en el que cada fila es un usuario único y cada columna contiene información sobre el usuario o sobre su interacción con el servicio.
Resulta que para calcular la DAU de un servicio, es suficiente ejecutar dicha solicitud además de su agregado:
SELECT dt, count() as DAU FROM agg GROUP BY dt Elapsed: 0.078 sec.
O calcule cuántos días los usuarios estuvieron en el servicio durante la semana:
SELECT days_in_service, count() AS uniques FROM ( SELECT uniqUpTo(7)(dt) AS days_in_service FROM agg2 WHERE dt > (yesterday() - 7) GROUP BY user_id ) GROUP BY days_in_service ORDER BY days_in_service ASC 7 rows in set. Elapsed: 2.922 sec.
Podemos acelerar por muestreo, mientras casi sin perder precisión:
SELECT days_in_service, 10 * count() AS uniques FROM ( SELECT uniqUpTo(7)(dt) AS days_in_service FROM agg2 SAMPLE 1 / 10 WHERE dt > (yesterday() - 7) GROUP BY user_id ) GROUP BY days_in_service ORDER BY days_in_service ASC 7 rows in set. Elapsed: 0.454 sec.
Cabe señalar de inmediato que el muestreo no es por el porcentaje de eventos, sino por el porcentaje de usuarios, y como resultado se convierte en una herramienta increíblemente poderosa.
O lo mismo durante 4 semanas con 1/100 de muestreo: se obtienen resultados 1% menos precisos.
SELECT days_in_service, 100 * count() AS uniques FROM ( SELECT uniqUpTo(7)(dt) AS days_in_service FROM agg2 SAMPLE 1 / 100 WHERE dt > (yesterday() - 28) GROUP BY user_id ) GROUP BY days_in_service ORDER BY days_in_service ASC 28 rows in set. Elapsed: 0.287 sec.
Agregación por otro lado
Al agregar por (dt, user_id), no perdemos al usuario, no perdemos información sobre su interacción con el servicio, pero, por supuesto, perdemos las métricas sobre un objeto de interacción específico. Pero tampoco puedes perder esto: construyamos la unidad
(dt, video_owner_id, video_id), adhiriéndose a las mismas ideas. Mantenemos la información sobre el video tanto como sea posible, no nos perdemos los datos sobre la interacción del video con el usuario, y extrañamos por completo la información sobre el usuario específico.
SELECT starts FROM agg3 WHERE (dt = yesterday()) AND (video_id = ...) AND (video_owner_id = ...) 1 rows in set. Elapsed: 0.030 sec
O las 10 mejores vistas de video de ayer:
SELECT video_id, video_owner_id, watches FROM video_agg_video_d1 WHERE dt = yesterday() ORDER BY watches DESC LIMIT 10 10 rows in set. Elapsed: 0.035 sec.
Como resultado, tenemos un esquema de unidades de la forma:
- agregación por "fecha, usuario" dentro del producto;
- agregación por "fecha, objeto de interacción" dentro del producto;
- a veces surgen otras proyecciones.
Azkaban y TeamCity
Finalmente, algunas palabras sobre la infraestructura. Nuestra colección agregada comienza por la noche, comenzando con OPTIMIZE en cada una de las tablas con datos sin procesar para desencadenar una fusión de datos extraordinaria en ReplicatedReplacingMergeTree. La operación puede durar lo suficiente, sin embargo, si es necesario, es necesario eliminar las tomas. Vale la pena señalar que hasta ahora nunca he encontrado duplicados, pero no hay garantías de que no aparezcan en el futuro.
El siguiente paso es la creación de agregados. Estos son scripts de bash en los que ocurre lo siguiente:
- primero obtenemos el número de fragmentos y algún host del fragmento:
SELECT shard_num, any(host_name) AS host FROM system.clusters GROUP BY shard_num
- entonces el script ejecuta secuencialmente para cada fragmento (clickhouse-client -h $ host) una solicitud del formulario (para agregados de usuarios):
INSERT INTO ... SELECT ... FROM ... SAMPLE 1/$shards_count OFFSET 1/$shard_num
Esto no es del todo óptimo y puede generar mucha interacción de red entre hosts. Sin embargo, al agregar nuevos fragmentos, todo sigue funcionando de forma inmediata, se mantiene la localidad de los datos de las unidades, por lo que decidimos no preocuparnos mucho por eso.
Tenemos a Azkaban como el programador de tareas. No diría que esta es una herramienta súper conveniente, pero hace frente a su tarea perfectamente, incluso cuando se trata de construir tuberías un poco más complejas y cuando un script necesita esperar a que se completen varios otros.
El tiempo total que se dedica a convertir los eventos que ahora existen en agregados es de 15 minutos.
Prueba
Todas las mañanas realizamos pruebas automatizadas que responden preguntas sobre datos sin procesar, así como la disponibilidad y la calidad de los agregados: "Compruebe que para ayer no había más de medio por ciento menos de datos o datos únicos sobre datos sin procesar o en agregados en comparación con el mismo día hace una semana ".
Tecnológicamente, estas son pruebas unitarias comunes que usan JUnit e implementan el controlador jdbc para ClickHouse. La ejecución de todas las pruebas se inicia en TeamCity y dura aproximadamente 30 segundos en 1 subproceso, y en caso de fallas, recibimos notificaciones VKontakte de nuestro maravilloso bot TeamCity.
Conclusión
Use solo versiones estables de ClickHouse y su cabello será suave y sedoso. Vale la pena agregar que
ClickHouse no se ralentiza .