Intégration continue dans Yandex. 2e partie

Dans l' article précédent , nous avons parlé du transfert du développement vers un référentiel unique avec une approche de développement basée sur des troncs, avec des systèmes unifiés pour l'assemblage, les tests, le déploiement et la surveillance, sur les tâches qu'un système d'intégration continue doit résoudre pour fonctionner efficacement dans de telles conditions.


Aujourd'hui, nous parlerons aux lecteurs Habr du dispositif du système d'intégration continue.


image


Un système d'intégration continue doit fonctionner de manière fiable et rapide. Le système doit réagir rapidement aux événements entrants et ne doit pas entraîner de retards supplémentaires dans le processus de livraison des résultats de test à l'utilisateur. Les résultats de l'assemblage et des tests doivent être fournis à l'utilisateur en temps réel.


Le système d'intégration continue est un système de traitement de données en continu avec des retards minimaux.


Après avoir envoyé tous les résultats à un certain stade (configuration, génération, style, petits tests, tests moyens, etc.), le système de génération le signale au système d'intégration continue («ferme» le stade), et l'utilisateur voit que pour cette vérification et A ce stade, tous les résultats sont connus. Chaque étape se ferme indépendamment. L'utilisateur reçoit plus rapidement un signal utile. Après avoir fermé toutes les étapes, la vérification est considérée comme terminée.


Pour implémenter le système, nous avons choisi l'architecture Kappa . Le système se compose de 2 sous-systèmes:


  • Le traitement des événements et des données a lieu dans un circuit en temps réel. Toutes les données d'entrée sont traitées comme des flux de données (flux). Tout d'abord, les événements sont enregistrés dans le flux et ce n'est qu'ensuite qu'ils sont traités.
  • Les résultats du traitement des données sont continuellement écrits dans la base de données, où passent ensuite les appels via l'API. Dans l'architecture Kappa, cela s'appelle la couche de desserte.

Toutes les demandes de modification de données doivent passer par le circuit en temps réel, car là, vous devez toujours avoir l'état actuel du système. Les demandes de lecture vont uniquement à la base de données.




Dans la mesure du possible, nous suivons la règle de l'ajout uniquement. Aucune modification ou suppression d'objets, à l'exception de la suppression d'anciennes données inutiles.


Plus de 2 To de données brutes transitent par le service par jour.


Avantages:


  • Les flux contiennent tous les événements et messages. Nous pouvons toujours comprendre ce qui s'est passé et quand. Le flux peut être perçu comme un gros journal.
  • Haute efficacité et surcharge minimale. Il s'agit d'un système entièrement orienté événement, sans aucune perte d'interrogation. Il n'y a aucun événement - nous ne faisons rien de plus.
  • Le code d'application ne traite pratiquement pas les primitives de synchronisation des threads et la mémoire partagée entre les threads. Cela rend le système plus fiable.
  • Les processeurs sont bien isolés les uns des autres, car n'interagissent pas directement, uniquement via des flux. Une bonne couverture de test peut être fournie.

Mais le traitement des données en streaming n'est pas si simple:


  • Une bonne compréhension du modèle de calcul est requise. Vous devrez repenser les algorithmes de traitement des données existants. Tous les algorithmes ne tombent pas immédiatement dans le modèle de flux et vous devez vous casser la tête un peu.
  • Il est nécessaire de garantir la conservation de l'ordre de réception et le traitement des événements.
  • Vous devez être capable de gérer des événements interdépendants, c'est-à-dire avoir un accès rapide à toutes les données nécessaires lors du traitement d'un nouveau message.
  • Vous devez également pouvoir gérer les événements en double.

Traitement de flux


Tout en travaillant sur le projet, la bibliothèque Stream Processor a été écrite, ce qui nous a aidés à implémenter et à lancer rapidement des algorithmes de traitement de données en streaming en production.


Stream Processor est une bibliothèque pour la construction de systèmes de traitement de données en streaming. Le flux est une séquence potentiellement infinie de données (messages) dans laquelle seul l'ajout de nouveaux messages est possible; les messages déjà enregistrés ne sont pas modifiés et ne sont pas supprimés du flux. Les convertisseurs d'un flux à un autre (processeurs de flux) se composent fonctionnellement de trois parties: un fournisseur de messages entrants, qui lit généralement les messages d'un ou plusieurs flux et les place dans une file d'attente de traitement, un processeur de messages qui convertit les messages entrants en messages sortants et les place dans une file d'attente à l'enregistrement et au rédacteur, où les messages sortants regroupés dans la fenêtre de temps tombent dans le flux de sortie. Les messages de données générés par un processeur de flux peuvent être utilisés par d'autres ultérieurement. Ainsi, les flux et les processeurs forment un graphe orienté dans lequel des boucles sont possibles, en particulier, un processeur de flux peut même générer des messages dans le même flux d'où il reçoit des données.


Il est garanti que chaque message du flux d'entrée sera traité par chaque processeur qui lui est associé au moins une fois (sémantique au moins une fois). Il est également garanti que tous les messages seront traités dans l'ordre dans lequel ils sont arrivés dans ce flux. Pour ce faire, les processeurs de flux sont répartis sur tous les nœuds de service actifs, de sorte qu'à un moment donné, pas plus d'une instance de chaque processeur enregistré ne fonctionne.


Le traitement des événements interdépendants est l'un des principaux problèmes rencontrés dans la construction de systèmes de traitement de données en continu. En règle générale, lors du streaming de messages, les processeurs de flux créent progressivement un certain état qui était valide au moment où le message en cours a été traité. De tels objets d'état sont généralement associés non pas à l'ensemble du flux dans son ensemble, mais à un certain sous-ensemble de messages, qui est déterminé par la valeur de clé dans ce flux. Un stockage efficace de la richesse est la clé du succès. Lors du traitement du message suivant, il est important que le processeur puisse obtenir rapidement cet état et, sur la base de celui-ci et du message actuel, générer des messages sortants. Ces objets d'état sont accessibles aux processeurs en L1 (veuillez ne pas confondre avec le cache CPU) LRU cache, qui est situé en mémoire. Dans le cas où il n'y avait aucun état dans le cache L1, il est restauré à partir du cache L2 situé dans le même stockage où les flux sont stockés et où il est périodiquement stocké pendant le fonctionnement du processeur. S'il n'y avait aucun état dans le cache L2, il est restauré à partir des messages de flux d'origine, comme si le processeur avait traité tous les messages d'origine associés à la clé de message actuelle. La technique de mise en cache vous permet également de traiter le problème de la latence élevée du stockage, car souvent le traitement séquentiel ne repose pas sur les performances du serveur, mais sur le retard des demandes et des réponses lors de la communication avec l'entrepôt de données.




Pour stocker efficacement les données dans les caches L1 et les données de message en mémoire, en plus des structures économes en mémoire, nous utilisons des pools d'objets qui vous permettent de n'avoir qu'une seule copie d'un objet (ou même des parties de celui-ci) en mémoire. Cette technique est déjà utilisée dans le JDK pour les chaînes internes aux chaînes et s'étend également aux autres types d'objets, qui devraient être immuables.


Pour un stockage compact des données dans le stockage de flux, certaines données sont normalisées avant d'écrire dans le flux, c'est-à-dire transformer en chiffres. Des algorithmes de compression efficaces peuvent ensuite être appliqués aux nombres (identificateurs d'objet). Les nombres sont triés, les deltas sont comptés, puis encodés avec le codage ZigZag puis compressés par l'archiveur. La normalisation n'est pas une technique très standard pour diffuser des systèmes de traitement de données. Mais cette technique de compression est très efficace et la quantité de données dans le flux le plus chargé est réduite d'environ 1 000 fois.




Pour chaque flux et processeur, nous suivons le cycle de vie du traitement des messages: l'apparition de nouveaux messages dans le flux d'entrée, la taille de la file d'attente des messages non traités, la taille de la file d'attente pour l'écriture dans le flux résultant, le temps de traitement des messages et la répartition du temps par étapes de traitement des messages:




Entrepôt de données


Les résultats du traitement en continu des données doivent être mis à la disposition de l'utilisateur dès que possible. Les données traitées des flux doivent être enregistrées en continu dans la base de données, où vous pouvez ensuite aller chercher les données (par exemple, afficher un rapport avec les résultats du test, afficher l'historique du test).


Caractéristiques des données et requêtes stockées.
La plupart des données sont des tests. Sur un mois, plus de 1,5 milliard de builds et de tests sont lancés. Une quantité assez importante d'informations est stockée pour chaque lancement: le résultat et le type d'erreur, une brève description de l'erreur (extrait), plusieurs liens vers les journaux, la durée du test, un ensemble de valeurs numériques, des métriques, au format nom = valeur, etc. Certaines de ces données - par exemple, les mesures et la durée - sont très difficiles à compresser, car il s'agit en fait de valeurs aléatoires. L'autre partie - par exemple, le résultat, le type d'erreur, les journaux - peut être enregistrée plus efficacement, car ils ne changent presque pas dans le même test d'une exécution à l'autre.


Auparavant, nous utilisions MySQL pour stocker des données traitées. Nous avons progressivement commencé à nous reposer sur les capacités de la base de données:


  • La quantité de données traitées double tous les six mois.
  • Nous ne pouvions stocker les données que pour les 2 derniers mois, mais nous voulions stocker les données pendant au moins un an.
  • Problèmes avec la vitesse d'exécution de quelques requêtes lourdes (proches de l'analyse).
  • Schéma de base de données compliqué. Nombreuses tables (normalisation), ce qui complique l'écriture dans la base de données. Le schéma de base est très différent du schéma des objets utilisés dans le circuit en temps réel.
  • Ne rencontrant pas d'arrêt du serveur. La défaillance d'un serveur distinct ou l'arrêt du centre de données peut entraîner une défaillance du système.
  • Opération assez compliquée.

En tant que candidats au nouvel entrepôt de données, nous avons envisagé plusieurs options: PostgreSQL, MongoDB et plusieurs solutions internes, y compris ClickHouse .


Certaines solutions ne nous permettent pas de stocker nos données plus efficacement que l'ancienne solution basée sur MySQL. D'autres ne permettent pas l'implémentation de requêtes rapides et complexes (presque analytiques). Par exemple, nous avons une demande assez lourde qui montre les commits qui affectent un projet spécifique (un ensemble de tests). Dans tous les cas où nous ne pouvons pas exécuter de requêtes SQL rapides, nous devons obliger l'utilisateur à attendre longtemps ou à faire des calculs à l'avance avec une perte de flexibilité. Si vous comptez quelque chose à l'avance, vous devez écrire plus de code et en même temps perdre de la flexibilité - il n'y a aucun moyen de changer rapidement le comportement et de raconter quoi que ce soit. Il est beaucoup plus pratique et plus rapide d'écrire une requête SQL qui renverra les données dont l'utilisateur a besoin et pourra les modifier rapidement si vous voulez changer le comportement du système.


Clickhouse


Nous avons opté pour ClickHouse . ClickHouse est un système de gestion de base de données en colonnes (SGBD) pour le traitement des requêtes analytiques en ligne (OLAP).


En passant à ClickHouse, nous avons délibérément abandonné certaines des opportunités offertes par d'autres SGBD, en recevant une compensation plus que digne sous forme de requêtes analytiques très rapides et d'un entrepôt de données compact.


Dans les SGBD relationnels, les valeurs liées à une ligne sont physiquement stockées côte à côte. Dans ClickHouse, les valeurs de différentes colonnes sont stockées séparément et les données d'une colonne sont stockées ensemble. Cet ordre de stockage des données vous permet de fournir un degré élevé de compression des données avec le bon choix de clé primaire. Cela affecte également dans quels scénarios le SGBD fonctionnera bien. ClickHouse fonctionne mieux avec les requêtes, où un petit nombre de colonnes sont lues et la requête utilise une grande table et les autres tables sont petites. Mais même dans les requêtes non analytiques, ClickHouse peut afficher de bons résultats.


Les données des tableaux sont triées par clé primaire. Le tri est effectué en arrière-plan. Cela vous permet de créer un index clairsemé d'un petit volume, ce qui vous permet de trouver rapidement des données. ClickHouse n'a aucun index secondaire. À strictement parler, il existe un index secondaire - la clé de partition (ClickHouse coupe les données de partition là où la clé de partition est spécifiée dans la demande). Plus de détails .


Le schéma de données avec normalisation n'est pas fonctionnel, au contraire, il est préférable de dénormaliser les données en fonction des demandes qui lui sont adressées. Il est préférable de créer des tableaux "larges" avec un grand nombre de colonnes. Cet élément est également lié au précédent, car l'absence d'index secondaires crée parfois des copies de tables à l'aide d'une clé primaire différente.


ClickHouse n'a pas UPDATE et DELETE au sens classique, mais il est possible de les émuler.


Les données doivent être insérées dans de grands blocs et pas trop souvent (une fois toutes les quelques secondes). Le chargement de données ligne par ligne est pratiquement inopérant sur des volumes de données réels.


ClickHouse ne prend pas en charge les transactions; le système devient finalement cohérent .


Néanmoins, certaines fonctionnalités de ClickHouse, similaires à d'autres SGBD, facilitent le transfert de systèmes existants vers celui-ci.


  • ClickHouse utilise SQL, mais avec de légères différences, utile pour les requêtes typiques des systèmes OLAP. Il existe un puissant système de fonctions d'agrégation, ALL / ANY JOIN, des expressions lambda dans les fonctions et d'autres extensions SQL qui vous permettent d'écrire presque n'importe quelle requête analytique.
  • ClickHouse prend en charge la réplication, l' enregistrement du quorum et la lecture du quorum. Une écriture de quorum est nécessaire pour un stockage fiable des données: INSERT ne réussit que si ClickHouse a pu écrire des données dans un nombre donné de répliques sans erreur.

Vous pouvez en savoir plus sur les fonctionnalités de ClickHouse dans la documentation .


Caractéristiques de travailler avec ClickHouse


Choix de la clé primaire et de la clé de partition.


Comment choisir une clé primaire et une clé de partition? C'est peut-être la première question qui se pose lors de la création d'une nouvelle table. Le choix de la clé primaire et de la clé de partition est généralement dicté par les requêtes qui seront effectuées sur les données. En même temps, les requêtes qui utilisent les deux conditions s'avèrent être les plus efficaces: à la fois par la clé primaire et par la clé de partition.


Dans notre cas, les tableaux principaux sont les matrices d'exécution des tests. Il est logique de supposer qu'avec cette structure de données, les clés doivent être sélectionnées de manière à ce que l'ordre de contournement de l'une passe dans l'ordre d'augmentation du numéro de ligne et dans l'ordre de contournement de l'autre - dans l'ordre d'augmentation du numéro de colonne.


Il est également important de garder à l'esprit que le choix de la clé primaire peut considérablement affecter la compacité du stockage de données, car des valeurs identiques dans le contournement de la clé primaire dans d'autres colonnes n'occupent presque pas d'espace dans le tableau. Ainsi, dans notre cas, par exemple, les états des tests changent peu de commit à commit. Ce fait a prédéterminé essentiellement le choix de la clé primaire - une paire d'identifiant de test et de numéro de validation. De plus, dans cet ordre.




La clé de partition a deux objectifs. D'une part, il permet aux partitions d'être «archivées» afin de pouvoir être définitivement supprimées du stockage, car les données qu'elles contiennent sont déjà obsolètes. D'un autre côté, la clé de partition est un index secondaire, ce qui signifie qu'elle vous permet d'accélérer les requêtes si une expression y est présente.


Pour nos matrices, le choix du numéro de commit comme clé de partition semble assez naturel. Mais si vous définissez la valeur de révision dans l'expression de la clé de partition, il y aura trop de partitions dans une telle table, ce qui dégradera les performances des requêtes. Par conséquent, dans l'expression de la clé de partition, la valeur de révision peut être divisée en un grand nombre pour réduire le nombre de partitions, par exemple, PARTITION BY intDiv (révision, 2000). Ce nombre doit être suffisamment grand pour que le nombre de partitions ne dépasse pas les valeurs recommandées, tandis qu'il doit être suffisamment petit pour que peu de données ne tombent dans une partition et que la base de données n'ait pas à lire trop de données.


Comment implémenter UPDATE et DELETE?


Dans le sens habituel, UPDATE et DELETE ne sont pas pris en charge dans ClickHouse. Cependant, au lieu de UPDATE et DELETE, vous pouvez ajouter une colonne avec une version à la table et utiliser le moteur ReplacingMergeTree spécial (supprime les enregistrements en double avec la même valeur de clé primaire). Dans certains cas, la version sera naturellement présente dans la table dès le début: par exemple, si nous voulons créer une table pour l'état actuel du test, la version dans cette table sera le numéro de commit.


CREATE TABLE current_tests ( test_id UInt64, value Nullable(String), version UInt64 ) ENGINE = ReplacingMergeTree(version) ORDER BY test_id 

Dans le cas d'un changement d'enregistrement, nous ajoutons la version avec une nouvelle valeur, dans le cas de la suppression, avec une valeur NULL (ou une autre valeur spéciale qui ne peut pas être trouvée dans les données).


Qu'avez-vous réalisé avec le nouveau stockage?


L'un des principaux objectifs du passage à ClickHouse était de pouvoir stocker l'historique des tests sur une longue période (plusieurs années, ou au moins un an dans le pire des cas). Déjà au stade du prototype, il est devenu clair que nous pouvions contourner les SSD existants dans nos serveurs pour stocker au moins une histoire de trois ans. Les requêtes analytiques se sont considérablement accélérées, nous pouvons désormais extraire des informations beaucoup plus utiles de nos données. La marge RPS a augmenté. De plus, cette valeur est mise à l'échelle presque linéairement par l'ajout de nouveaux serveurs au cluster ClickHouse. La création d'un nouvel entrepôt de données pour la base de données ClickHouse n'est qu'une étape à peine perceptible pour l'utilisateur final vers un objectif plus important - l'ajout de nouvelles fonctionnalités, l'accélération et la simplification du développement, grâce à la possibilité de stocker et de traiter de grandes quantités de données.


Venez chez nous


Notre département est en constante expansion. Visitez-nous si vous souhaitez travailler sur des tâches et des algorithmes complexes et intéressants. Si vous avez des questions, vous pouvez me les poser directement en PM.


Liens utiles


Traitement de flux



Architecture Kappa



ClickHouse:


Source: https://habr.com/ru/post/fr429956/


All Articles