
En développant n'importe quel produit, qu'il s'agisse d'un service vidéo ou d'une bande, d'histoires ou d'articles, je veux pouvoir mesurer le "bonheur" conditionnel de l'utilisateur. Pour comprendre si nous apportons des changements meilleurs ou pires, pour ajuster la direction du développement du produit, en fonction non pas de l'intuition et de nos propres sentiments, mais des métriques et des chiffres auxquels vous pouvez croire.
Dans cet article, je vais vous expliquer comment nous avons réussi à lancer des statistiques et des analyses de produits sur un service avec une audience mensuelle de 97 millions de dollars, tout en obtenant des requêtes analytiques extrêmement performantes. Nous parlerons de ClickHouse, des moteurs utilisés et des fonctionnalités des requêtes. Je vais décrire une approche de l'agrégation de données, qui nous permet d'obtenir des métriques complexes en une fraction de seconde, et parler de la conversion et des tests de données.
Maintenant, nous avons environ 6 milliards d'événements alimentaires par jour, dans un avenir proche, nous atteindrons 20 à 25 milliards. Et puis - pas à un rythme aussi rapide, nous atteindrons 40 à 50 milliards d'ici la fin de l'année, lorsque nous décrirons tous les événements alimentaires qui nous intéressent.
1 rangs en jeu. Écoulé: 0,287 s. 59,85 milliards de lignes traitées, 59,85 Go (208,16 milliards de lignes / s., 208,16 Go / s.)Détails sous la coupe.
Préface
Les outils analytiques étaient VKontakte avant. Des utilisateurs uniques ont été pris en compte, il a été possible de construire des plannings d'événements par tranches et ainsi de tomber dans les profondeurs du service. Il s'agissait cependant de tranches fixes d'avance, de données agrégées, de HLL pour les uniques, d'une certaine rigidité et incapacité à répondre rapidement à des questions un peu plus compliquées que "combien?"
Bien sûr, il y avait, il y a et aura hadoop, il a également été écrit, écrit et sera écrit beaucoup, beaucoup de journaux d'utilisation des services. Malheureusement, hdfs n'a été utilisé que par certaines équipes pour implémenter leurs propres tâches. Encore plus tristement, hdfs ne concerne pas les requêtes analytiques rapides: il y avait des questions dans de nombreux domaines, dont les réponses devaient être trouvées dans le code, et non dans la documentation accessible à tous.
Nous sommes arrivés à la conclusion qu'il n'est plus possible de vivre comme ça. Chaque équipe doit disposer de données, les requêtes doivent être rapides et les données elles-mêmes doivent être précises et riches en paramètres utiles.
Par conséquent, nous avons formulé des exigences claires pour le nouveau système de statistiques / analytiques:
- les requêtes analytiques doivent être rapides;
- les données sont assez précises, idéalement ce sont des événements d'interaction utilisateur bruts avec le service;
- la structure des événements doit être décrite, comprise et accessible;
- stockage fiable des données, garantie de livraison unique;
- il est possible de compter les uniques, l'audience (quotidienne, hebdomadaire, mensuelle), les mesures de rétention, le temps passé par l'utilisateur dans le service, les actions quantifiées sur les mesures uniques et autres par l'ensemble de tranches;
- les tests, la conversion des données et la visualisation sont en cours.
Dans la cuisine
L'expérience a suggéré que nous avions besoin de deux bases de données: une lente, où nous agrégerions et enrichirions les données, et une rapide, où nous pourrions travailler avec ces données et construire des graphiques par-dessus. C'est l'une des approches les plus courantes, dans laquelle dans une base de données lente, par exemple, dans hdfs, différentes projections sont construites - sur des uniques et sur le nombre d'événements par tranches pendant une certaine période de temps.
Par une chaude journée de septembre, en discutant autour d'une tasse de thé dans la cuisine donnant sur la cathédrale de Kazan, nous avons eu l'idée d'essayer ClickHouse comme base rapide - à cette époque, nous l'utilisions déjà pour stocker des journaux techniques. De nombreux doutes étaient liés principalement à la vitesse et à la fiabilité: les tests de performances déclarés semblaient irréalistes et les nouvelles versions de la base de données interrompaient périodiquement les fonctionnalités existantes. Par conséquent, la proposition était simple - à essayer.
Premiers échantillons
Nous avons déployé un cluster de deux machines avec cette configuration:
2xE5-2620 v4 (32 cœurs au total), 256 Go de RAM, 28 places (raid10 avec ext4).
Au départ, c'était proche de la mise en page, mais nous sommes passés à loin. ClickHouse possède de nombreux moteurs de table différents, mais les principaux sont de la famille MergeTree. Nous avons choisi ReplicatedReplacingMergeTree avec à peu près les paramètres suivants:
PARTITION BY dt ORDER BY (toStartOfHour(time), cityHash64(user_id), event_microsec, event_id) SAMPLE BY cityHash64(user_id) SETTINGS index_granularity = 8192;
Répliqué - signifie que la table est répliquée, ce qui résout l'une de nos exigences de fiabilité.
Remplacement - le tableau prend en charge la déduplication par la clé primaire: par défaut, la clé primaire correspond à la clé de tri, donc la section ORDER BY vous indique simplement quelle est la clé primaire.
SAMPLE BY - Je voulais également essayer d'échantillonner: sample renvoie un échantillon uniformément pseudo-aléatoire.
index_granularity = 8192 est le nombre magique de lignes de données entre les empattements d'index (oui, c'est rare), qui est utilisé par défaut. Nous ne l'avons pas changé.
Le partitionnement a été effectué le jour (bien que par défaut - par mois). De nombreuses demandes de données étaient censées être intrajournalières - par exemple, créer un graphique minute des vues vidéo pour un jour donné.
Ensuite, nous avons pris un morceau de journaux techniques et rempli la table avec environ un milliard de lignes. Excellente compression, regroupement par type de colonne Int *, comptage de valeurs uniques - tout a fonctionné incroyablement vite!
En parlant de vitesse, je veux dire que pas une seule requête n'a duré plus de 500 ms, et la plupart d'entre elles tiennent dans 50-100 ms. Et cela se fait sur deux machines - et, en fait, une seule a été impliquée dans les calculs.
Nous avons examiné tout cela et imaginé qu'au lieu de la colonne UInt8, il y aurait un identifiant du pays, et la colonne Int8 serait remplacée par des données, par exemple, sur l'âge de l'utilisateur. Et ils ont réalisé que ClickHouse nous convenait parfaitement, si tout était fait correctement.
Saisie forte des données
L'avantage de ClickHouse commence exactement lorsque le schéma de données correct est formé. Exemple: plateforme String - mauvaise, plateforme Int8 + dictionnaire - bonne, LowCardinality (String) - pratique et bonne (je parlerai de LowCardinality un peu plus tard).
Nous avons créé une classe de générateur spéciale en php, qui, sur demande, crée des classes wrapper sur des événements basés sur des tables dans ClickHouse, et un point d'entrée unique pour la journalisation. Je vais expliquer l'exemple du schéma qui s'est avéré:
- L'analyste / ingénieur / développeur de données décrit la documentation: quels champs, valeurs possibles, événements doivent être enregistrés.
- Un tableau est créé dans ClickHouse conformément à la structure de données du paragraphe précédent.
- Des classes d'habillage pour les événements basés sur une table sont générées.
- L'équipe produit implémente le remplissage des champs d'un objet de cette classe, l'envoi.
Changer le schéma au niveau php et le type de données enregistrées ne fonctionnera pas sans d'abord changer la table dans ClickHouse. Et cela, à son tour, ne peut se faire sans coordination avec l'équipe, modifications de la documentation et description des événements.
Pour chaque événement, vous pouvez définir deux paramètres qui contrôlent respectivement le pourcentage d'événements envoyés à ClickHouse et hadoop. Les paramètres sont nécessaires principalement pour un roulement progressif avec la possibilité de réduire la journalisation en cas de problème. Avant hadoop, les données sont livrées de manière standard à l'aide de Kafka. Et dans ClickHouse, ils volent à travers un
schéma avec KittenHouse en mode persistant, ce qui garantit au moins une livraison d'événement unique.
L'événement est remis à la table tampon dans le fragment souhaité, sur la base du reste de la division du hachage de user_id par le nombre de fragments dans le cluster. Ensuite, la table tampon vide les données dans le ReplicatedReplacingMergeTree local. Et au-dessus des tables locales, une table distribuée est extraite avec le moteur distribué, qui vous permet d'accéder aux données de tous les fragments.
Dénormalisation
ClickHouse est un SGBD en colonnes. Il ne s'agit pas de formulaires normaux, ce qui signifie qu'il vaut mieux avoir toutes les informations sur l'événement que de se joindre. Il y a aussi Join, mais si la bonne table ne tient pas en mémoire, la douleur commence. Par conséquent, nous avons pris une décision ferme: toutes les informations qui nous intéressent doivent être stockées dans l'événement lui-même. Par exemple, le sexe, l'âge de l'utilisateur, le pays, la ville, l'anniversaire - toutes ces informations publiques peuvent être utiles pour l'analyse d'audience, ainsi que toutes les informations utiles sur l'objet d'interaction. Si, par exemple, nous parlons de vidéo, il s'agit de video_id, video_owner_id, la date de mise en ligne de la vidéo, la durée, la qualité au moment de l'événement, la qualité maximale, etc.
Au total, dans chaque table, nous avons de 50 à 200 colonnes, tandis que dans toutes les tables, il y a des champs de service. Par exemple, le journal des erreurs est error_log - en fait, nous appelons une erreur hors de portée du type. Au cas où des valeurs étranges dépasseraient la taille du type dans le champ avec l'âge.
Type LowCardinality (T)
ClickHouse a la possibilité d'utiliser des dictionnaires externes. Ils sont stockés en mémoire, mis à jour périodiquement, peuvent être efficacement utilisés dans divers scénarios, y compris comme des ouvrages de référence classiques. Par exemple, vous souhaitez enregistrer le système d'exploitation et vous avez deux alternatives: une chaîne ou un nombre + un répertoire. Bien sûr, sur de grandes quantités de données et pour les requêtes analytiques hautes performances, il est logique d'écrire un nombre et d'obtenir une représentation sous forme de chaîne à partir du dictionnaire lorsque vous avez besoin:
dictGetString('os', 'os_name', toUInt64(os_id))
Mais il existe un moyen beaucoup plus pratique - d'utiliser le type LowCardinality (String), qui crée automatiquement un dictionnaire. La performance avec LowCardinality sous la condition de faible cardinalité de l'ensemble de valeurs est radicalement plus élevée qu'avec String.
Par exemple, nous utilisons LowCardinality (String) pour les types d'événements 'play', 'pause', 'rewind'. Ou pour la plateforme: 'web', 'android', 'iphone':
SELECT vk_platform, count() FROM t WHERE dt = yesterday() GROUP BY vk_platform Elapsed: 0.145 sec. Processed 1.98 billion rows, 5.96 GB (13.65 billion rows/s., 41.04 GB/s.)
La fonctionnalité est encore expérimentale, donc pour l'utiliser, vous devez effectuer:
SET allow_experimental_low_cardinality_type = 1;
Mais on a le sentiment qu’après un certain temps, elle ne sera plus sous le décor.
Agrégation de données VKontakte
Puisqu'il y a beaucoup de colonnes et qu'il y a beaucoup d'événements, le désir naturel est de couper les «vieilles» partitions, mais d'abord - d'assembler les unités. Parfois, il est nécessaire d'analyser les événements bruts (il y a un mois ou un an), afin de ne pas couper les données en hdfs - tout analyste peut contacter le parquet souhaité pour n'importe quelle date.
En règle générale, lors de l'agrégation dans un intervalle de temps, nous nous appuyons toujours sur le fait que le nombre de lignes par unité de temps est égal au produit de la puissance de coupure. Cela impose des restrictions: les pays commencent à se regrouper en groupes tels que `` Russie '', `` Asie '', `` Europe '', `` Le reste du monde '', et les âges - à intervalles pour réduire la dimension à un million de lignes conditionnelles par date.
Agrégation par dt, user_id
Mais nous avons un ClickHouse réactif! Pouvons-nous accélérer à 50-100 millions de lignes à une date?
Des tests rapides ont montré que nous pouvions, et à ce moment une idée simple a surgi - laisser l'utilisateur dans la machine. À savoir, pour agréger non pas par «date, tranches» à l'aide d'outils spark, mais par «date, utilisateur» signifie par ClickHouse, tout en faisant une «transposition» des données.
Avec cette approche, nous stockons les utilisateurs dans des données agrégées, ce qui signifie que nous pouvons toujours prendre en compte les indicateurs d'audience, la rétention et les mesures de fréquence. Nous pouvons connecter des unités, en comptant les audiences communes de plusieurs services jusqu'à l'audience VKontakte entière. Tout cela peut être fait par n'importe quelle tranche qui est présente dans le tableau pour le même temps conditionnellement.
Je vais illustrer avec un exemple:

Après agrégation (beaucoup plus de colonnes à droite):

Dans ce cas, l'agrégation se produit précisément par (dt, user_id). Pour les champs contenant des informations utilisateur, avec une telle agrégation, vous pouvez utiliser les fonctions any, anyHeavy (sélectionne une valeur fréquente). Vous pouvez, par exemple, collecter anyHeavy (plateforme) dans un agrégat pour savoir quelle plateforme l'utilisateur utilise pour la plupart à partir d'événements vidéo. Si vous le souhaitez, vous pouvez utiliser groupUniqArray (plateforme) et stocker un tableau de toutes les plateformes à partir desquelles l'utilisateur a déclenché l'événement. Si cela ne suffit pas, vous pouvez créer des colonnes distinctes pour la plateforme et stocker, par exemple, le nombre de vidéos uniques visionnées à la moitié à partir d'une plateforme spécifique:
uniqCombinedIf(cityHash64(video_owner_id, video_id), (platform = 'android') AND (event = '50p')) as uniq_videos_50p_android
Avec cette approche, on obtient un agrégat assez large dans lequel chaque ligne est un utilisateur unique, et chaque colonne contient des informations soit sur l'utilisateur, soit sur son interaction avec le service.
Il s'avère que pour calculer la DAU d'un service, il suffit d'exécuter une telle demande au-dessus de son agrégat:
SELECT dt, count() as DAU FROM agg GROUP BY dt Elapsed: 0.078 sec.
Ou calculez combien de jours les utilisateurs ont été dans le service pour la semaine:
SELECT days_in_service, count() AS uniques FROM ( SELECT uniqUpTo(7)(dt) AS days_in_service FROM agg2 WHERE dt > (yesterday() - 7) GROUP BY user_id ) GROUP BY days_in_service ORDER BY days_in_service ASC 7 rows in set. Elapsed: 2.922 sec.
Nous pouvons accélérer par échantillonnage, tout en perdant presque toute précision:
SELECT days_in_service, 10 * count() AS uniques FROM ( SELECT uniqUpTo(7)(dt) AS days_in_service FROM agg2 SAMPLE 1 / 10 WHERE dt > (yesterday() - 7) GROUP BY user_id ) GROUP BY days_in_service ORDER BY days_in_service ASC 7 rows in set. Elapsed: 0.454 sec.
Il convient de noter tout de suite que l'échantillonnage n'est pas basé sur le pourcentage d'événements, mais sur le pourcentage d'utilisateurs - et en conséquence, il devient un outil incroyablement puissant.
Ou la même chose pendant 4 semaines avec 1/100 d'échantillonnage - environ 1% de résultats moins précis sont obtenus.
SELECT days_in_service, 100 * count() AS uniques FROM ( SELECT uniqUpTo(7)(dt) AS days_in_service FROM agg2 SAMPLE 1 / 100 WHERE dt > (yesterday() - 28) GROUP BY user_id ) GROUP BY days_in_service ORDER BY days_in_service ASC 28 rows in set. Elapsed: 0.287 sec.
Agrégation d'autre part
Lors de l'agrégation par (dt, user_id), nous ne perdons pas l'utilisateur, nous ne manquons pas d'informations sur son interaction avec le service, mais, bien sûr, nous perdons les métriques sur un objet d'interaction spécifique. Mais vous ne pouvez pas perdre cela non plus - construisons l'unité en
(dt, video_owner_id, video_id), en adhérant aux mêmes idées. Nous conservons autant que possible les informations sur la vidéo, nous ne manquons pas de données sur l'interaction de la vidéo avec l'utilisateur et nous manquons complètement les informations sur l'utilisateur spécifique.
SELECT starts FROM agg3 WHERE (dt = yesterday()) AND (video_id = ...) AND (video_owner_id = ...) 1 rows in set. Elapsed: 0.030 sec
Ou le top 10 des vidéos vues hier:
SELECT video_id, video_owner_id, watches FROM video_agg_video_d1 WHERE dt = yesterday() ORDER BY watches DESC LIMIT 10 10 rows in set. Elapsed: 0.035 sec.
En conséquence, nous avons un schéma d'agrégats de la forme:
- agrégation par «date, utilisateur» dans le produit;
- agrégation par «date, objet d'interaction» au sein du produit;
- parfois d'autres projections surgissent.
Azkaban et TeamCity
Enfin, quelques mots sur l'infrastructure. Notre collecte d'agrégats commence la nuit, en commençant par OPTIMISER sur chacune des tables avec des données brutes pour déclencher une fusion de données extraordinaire dans ReplicatedReplacingMergeTree. L'opération peut durer assez longtemps, cependant, il est nécessaire de retirer les prises, si elles se produisent. Il convient de noter que, jusqu'à présent, je n'ai jamais rencontré de doublons, mais rien ne garantit qu'ils n'apparaîtront pas à l'avenir.
L'étape suivante est la création d'agrégats. Ce sont des scripts bash dans lesquels les événements suivants se produisent:
- nous obtenons d'abord le nombre de fragments et un hôte du fragment:
SELECT shard_num, any(host_name) AS host FROM system.clusters GROUP BY shard_num
- puis le script exécute séquentiellement pour chaque fragment (clickhouse-client -h $ host) une requête du formulaire (pour les agrégats par utilisateurs):
INSERT INTO ... SELECT ... FROM ... SAMPLE 1/$shards_count OFFSET 1/$shard_num
Ce n'est pas entièrement optimal et peut générer de nombreuses interactions réseau entre les hôtes. Cependant, lors de l'ajout de nouveaux fragments, tout continue à fonctionner, la localité des données pour les unités est conservée, nous avons donc décidé de ne pas trop nous en préoccuper.
Nous avons Azkaban comme planificateur de tâches. Je ne dirais pas que c'est un outil super pratique, mais il fait parfaitement face à sa tâche, y compris lorsqu'il s'agit de construire des pipelines légèrement plus complexes et lorsqu'un script doit attendre que plusieurs autres se terminent.
Le temps total consacré à la conversion des événements existants en agrégats est de 15 minutes.
Test
Chaque matin, nous effectuons des tests automatisés qui répondent aux questions concernant les données brutes, ainsi que la disponibilité et la qualité des agrégats: «Vérifiez que pour hier il n'y avait pas plus d'un demi pour cent de données en moins ou des données uniques sur les données brutes ou dans les agrégats par rapport au même jour il y a une semaine. "
Technologiquement, ce sont des tests unitaires ordinaires utilisant JUnit et implémentant le pilote jdbc pour ClickHouse. L'exécution de tous les tests est lancée dans TeamCity et prend environ 30 secondes dans 1 thread, et en cas d'échecs, nous recevons des notifications VKontakte de notre merveilleux bot TeamCity.
Conclusion
Utilisez uniquement des versions stables de ClickHouse et vos cheveux seront doux et soyeux. Il convient d'ajouter que
ClickHouse ne ralentit pas .