Kontinuierliche Integration in Yandex. Teil 2

Im vorherigen Artikel haben wir über die Übertragung der Entwicklung auf ein einzelnes Repository mit einem stammbasierten Entwicklungsansatz mit einheitlichen Systemen für Montage, Test, Bereitstellung und Überwachung gesprochen und darüber, welche Aufgaben ein kontinuierliches Integrationssystem lösen muss, um unter solchen Bedingungen effektiv zu arbeiten.


Heute werden wir Habr-Lesern über das Gerät des kontinuierlichen Integrationssystems berichten.


Bild


Ein kontinuierliches Integrationssystem muss zuverlässig und schnell funktionieren. Das System sollte schnell auf eingehende Ereignisse reagieren und keine zusätzlichen Verzögerungen bei der Übermittlung der Testergebnisse an den Benutzer verursachen. Die Ergebnisse der Montage und Prüfung müssen dem Benutzer in Echtzeit übermittelt werden.


Das kontinuierliche Integrationssystem ist ein Streaming-Datenverarbeitungssystem mit minimalen Verzögerungen.


Nachdem alle Ergebnisse zu einem bestimmten Zeitpunkt gesendet wurden (konfigurieren, erstellen, formatieren, kleine Tests, mittlere Tests usw.), signalisiert das Build-System dies dem kontinuierlichen Integrationssystem („schließt“ die Phase), und der Benutzer sieht dies für diese Prüfung und Zu diesem Zeitpunkt sind alle Ergebnisse bekannt. Jede Stufe wird unabhängig geschlossen. Der Benutzer erhält schneller ein nützliches Signal. Nach Abschluss aller Phasen gilt die Prüfung als abgeschlossen.


Um das System zu implementieren, haben wir uns für die Kappa- Architektur entschieden. Das System besteht aus 2 Subsystemen:


  • Die Ereignis- und Datenverarbeitung findet in einer Echtzeitschaltung statt. Alle Eingabedaten werden als Datenströme (Streams) behandelt. Zuerst werden die Ereignisse im Stream aufgezeichnet und erst dann verarbeitet.
  • Die Ergebnisse der Datenverarbeitung werden kontinuierlich in die Datenbank geschrieben, wo dann Aufrufe über die API ausgeführt werden. In der Kappa-Architektur wird dies als Serving Layer bezeichnet.

Alle Anforderungen zur Datenänderung müssen die Echtzeitschaltung durchlaufen, da Sie dort immer den aktuellen Status des Systems haben müssen. Leseanforderungen werden nur an die Datenbank gesendet.




Wo immer möglich, befolgen wir die Nur-Anhängen-Regel. Keine Änderungen oder Löschungen von Objekten, mit Ausnahme des Löschens alter, unnötiger Daten.


Pro Tag werden mehr als 2 TB Rohdaten durch den Dienst geleitet.


Vorteile:


  • Streams enthalten alle Ereignisse und Nachrichten. Wir können immer verstehen, was und wann passiert ist. Stream kann als großes Protokoll wahrgenommen werden.
  • Hohe Effizienz und minimaler Overhead. Es stellt sich heraus, dass es sich um ein vollständig ereignisorientiertes System handelt, bei dem keine Abfragen verloren gehen. Es gibt keine Veranstaltung - wir machen nichts extra.
  • Der Anwendungscode behandelt praktisch nicht die Grundelemente der Thread-Synchronisation und den von Threads gemeinsam genutzten Speicher. Dies macht das System zuverlässiger.
  • Die Prozessoren sind gut voneinander isoliert, weil Interagiere nicht direkt, nur über Streams. Eine gute Testabdeckung kann bereitgestellt werden.

Die Verarbeitung von Streaming-Daten ist jedoch nicht so einfach:


  • Ein gutes Verständnis des Rechenmodells ist erforderlich. Sie müssen vorhandene Datenverarbeitungsalgorithmen überdenken. Nicht alle Algorithmen fallen sofort in das Stream-Modell und Sie müssen Ihren Kopf ein wenig zerschlagen.
  • Es muss sichergestellt werden, dass die Reihenfolge des Eingangs und der Verarbeitung von Ereignissen eingehalten wird.
  • Sie müssen in der Lage sein, miteinander verbundene Ereignisse zu verarbeiten, d. H. Sie haben schnellen Zugriff auf alle erforderlichen Daten, während Sie eine neue Nachricht verarbeiten.
  • Sie müssen auch in der Lage sein, doppelte Ereignisse zu verarbeiten.

Stream-Verarbeitung


Während der Arbeit an dem Projekt wurde die Stream Processor-Bibliothek geschrieben, mit deren Hilfe wir Streaming-Datenverarbeitungsalgorithmen in der Produktion schnell implementieren und starten konnten.


Stream Processor ist eine Bibliothek zum Erstellen von Streaming-Datenverarbeitungssystemen. Stream ist eine potenziell endlose Folge von Daten (Nachrichten), in die nur das Hinzufügen neuer Nachrichten möglich ist. Bereits aufgezeichnete Nachrichten werden nicht geändert oder aus dem Stream gelöscht. Konverter eines Streams in einen anderen (Stream-Prozessoren) bestehen funktional aus drei Teilen: einem Anbieter eingehender Nachrichten, der normalerweise Nachrichten aus einem oder mehreren Streams liest und in eine Verarbeitungswarteschlange stellt, einem Nachrichtenprozessor, der eingehende Nachrichten in ausgehende Nachrichten konvertiert und in eine Warteschlange stellt zum Datensatz und zum Schreiber, wo ausgehende Nachrichten, die innerhalb des Zeitfensters gruppiert sind, in den Ausgabestream fallen. Von einem Stream-Prozessor generierte Datennachrichten können später von anderen verwendet werden. Somit bilden Streams und Prozessoren einen gerichteten Graphen, in dem Schleifen möglich sind, insbesondere kann ein Stream-Prozessor sogar Nachrichten in demselben Stream erzeugen, von dem er Daten empfängt.


Es wird garantiert, dass jede Nachricht des Eingabestreams von jedem ihm zugeordneten Prozessor mindestens einmal verarbeitet wird (Semantik mindestens einmal). Es wird auch garantiert, dass alle Nachrichten in der Reihenfolge verarbeitet werden, in der sie in diesem Stream angekommen sind. Zu diesem Zweck werden Stream-Prozessoren auf alle Arbeitsdienstknoten verteilt, sodass zu einem Zeitpunkt nicht mehr als eine Instanz jedes registrierten Prozessors arbeitet.


Die Verarbeitung miteinander verbundener Ereignisse ist eines der Hauptprobleme beim Erstellen von Systemen für die Streaming-Datenverarbeitung. In der Regel erstellen Stream-Prozessoren beim Streaming von Nachrichten schrittweise einen bestimmten Status, der zum Zeitpunkt der Verarbeitung der aktuellen Nachricht gültig war. Solche Statusobjekte sind normalerweise nicht dem gesamten Stream als Ganzes zugeordnet, sondern einer bestimmten Teilmenge von Nachrichten, die durch den Schlüsselwert in diesem Stream bestimmt wird. Effiziente Speicherung von Wohlstand ist der Schlüssel zum Erfolg. Bei der Verarbeitung der nächsten Nachricht ist es wichtig, dass der Prozessor diesen Status schnell abrufen und basierend auf dieser und der aktuellen Nachricht ausgehende Nachrichten generieren kann. Auf diese Statusobjekte können Prozessoren in L1 zugreifen (bitte nicht mit dem CPU-Cache verwechseln). Der LRU-Cache befindet sich im Speicher. Falls im L1-Cache kein Status vorhanden war, wird er aus dem L2-Cache wiederhergestellt, der sich in demselben Speicher befindet, in dem die Streams gespeichert sind, und in dem er während des Prozessorbetriebs regelmäßig gespeichert wird. Wenn im L2-Cache kein Status vorhanden war, wird er aus den ursprünglichen Stream-Nachrichten wiederhergestellt, als hätte der Prozessor alle ursprünglichen Nachrichten verarbeitet, die dem aktuellen Nachrichtenschlüssel zugeordnet sind. Mit der Caching-Technik können Sie auch das Problem der hohen Latenz des Speichers lösen, da die sequentielle Verarbeitung häufig nicht von der Leistung des Servers abhängt, sondern von der Verzögerung von Anforderungen und Antworten bei der Kommunikation mit dem Data Warehouse.




Um Daten in L1-Caches und Nachrichtendaten effektiv im Speicher zu speichern, verwenden wir zusätzlich zu speichereffizienten Strukturen Objektpools, mit denen Sie nur eine Kopie eines Objekts (oder sogar Teile davon) im Speicher haben können. Diese Technik wird bereits im JDK zum String-Internieren von Strings verwendet und erstreckt sich in ähnlicher Weise auf andere Objekttypen, die unveränderlich sein sollten.


Für eine kompakte Speicherung von Daten im Stream-Speicher werden einige Daten vor dem Schreiben in den Stream normalisiert, d. H. in Zahlen verwandeln. Effektive Komprimierungsalgorithmen können dann auf Zahlen (Objektkennungen) angewendet werden. Zahlen werden sortiert, Deltas werden gezählt, dann mit ZigZag-Codierung codiert und dann vom Archivierer komprimiert. Die Normalisierung ist keine Standardtechnik für das Streaming von Datenverarbeitungssystemen. Diese Komprimierungstechnik ist jedoch sehr effektiv und die Datenmenge im am meisten geladenen Stream wird um das 1000-fache reduziert.




Für jeden Stream und Prozessor verfolgen wir den Lebenszyklus der Nachrichtenverarbeitung: das Auftreten neuer Nachrichten im Eingabestream, die Größe der Warteschlange für nicht verarbeitete Nachrichten, die Größe der Warteschlange zum Schreiben in den resultierenden Stream, die Nachrichtenverarbeitungszeit und die Verteilung der Zeit nach Nachrichtenverarbeitungsstufen:




Data Warehouse


Die Ergebnisse der Streaming-Datenverarbeitung sollten dem Benutzer so bald wie möglich zur Verfügung stehen. Die verarbeiteten Daten aus den Streams sollten kontinuierlich in der Datenbank aufgezeichnet werden, wo Sie dann die Daten abrufen können (z. B. einen Bericht mit den Testergebnissen anzeigen, den Verlauf des Tests anzeigen).


Eigenschaften gespeicherter Daten und Abfragen.
Die meisten Daten sind Testläufe. Über einen Monat werden mehr als 1,5 Milliarden Builds und Tests gestartet. Bei jedem Start wird eine relativ große Menge an Informationen gespeichert: Ergebnis und Art des Fehlers, eine kurze Beschreibung des Fehlers (Snippet), mehrere Links zu Protokollen, Testdauer, eine Reihe von numerischen Werten, Metriken im Format name = value usw. Einige dieser Daten - zum Beispiel Metriken und Dauer - sind sehr schwer zu komprimieren, da es sich tatsächlich um Zufallswerte handelt. Der andere Teil - zum Beispiel das Ergebnis, die Art des Fehlers, die Protokolle - kann effizienter gespeichert werden, da sie sich im selben Test von Lauf zu Lauf fast nicht ändern.


Zuvor haben wir MySQL zum Speichern verarbeiteter Daten verwendet. Wir begannen uns allmählich gegen die Fähigkeiten der Datenbank auszuruhen:


  • Die verarbeitete Datenmenge verdoppelt sich alle sechs Monate.
  • Wir konnten nur Daten für die letzten 2 Monate speichern, aber wir wollten Daten für mindestens ein Jahr speichern.
  • Probleme mit der Ausführungsgeschwindigkeit einiger schwerer (fast analytischer) Abfragen.
  • Kompliziertes Datenbankschema. Viele Tabellen (Normalisierung), was das Schreiben in die Datenbank erschwert. Das Basisschema unterscheidet sich stark von dem Schema der Objekte, die in der Echtzeitschaltung verwendet werden.
  • Server wird nicht heruntergefahren. Der Ausfall eines separaten Servers oder das Herunterfahren des Rechenzentrums kann zu einem Systemausfall führen.
  • Ziemlich komplizierte Bedienung.

Als Kandidaten für das neue Data Warehouse haben wir verschiedene Optionen in Betracht gezogen: PostgreSQL, MongoDB und verschiedene interne Lösungen, einschließlich ClickHouse .


Bei einigen Lösungen können wir unsere Daten nicht effizienter speichern als bei der alten MySQL-basierten Lösung. Andere erlauben nicht die Implementierung schneller und komplexer (fast analytischer) Abfragen. Zum Beispiel haben wir eine ziemlich schwere Anfrage, die Commits anzeigt, die sich auf ein bestimmtes Projekt auswirken (einige Tests). In allen Fällen, in denen wir keine schnellen SQL-Abfragen ausführen können, müssten wir den Benutzer zwingen, lange zu warten oder einige Berechnungen im Voraus durchzuführen, was zu einem Verlust an Flexibilität führt. Wenn Sie etwas im Voraus zählen, müssen Sie mehr Code schreiben und gleichzeitig die Flexibilität verlieren - es gibt keine Möglichkeit, das Verhalten schnell zu ändern und etwas zu erzählen. Es ist viel bequemer und schneller, eine SQL-Abfrage zu schreiben, die die vom Benutzer benötigten Daten zurückgibt und diese schnell ändern kann, wenn Sie das Verhalten des Systems ändern möchten.


Clickhouse


Wir haben uns für ClickHouse entschieden. ClickHouse ist ein säulenförmiges Datenbankverwaltungssystem (DBMS) für die Online-Verarbeitung analytischer Abfragen (OLAP).


Bei der Umstellung auf ClickHouse haben wir bewusst auf einige der Möglichkeiten verzichtet, die andere DBMS bieten, und dafür eine mehr als angemessene Entschädigung in Form von sehr schnellen analytischen Abfragen und einem kompakten Data Warehouse erhalten.


In relationalen DBMS werden Werte, die sich auf eine Zeile beziehen, physisch nebeneinander gespeichert. In ClickHouse werden Werte aus verschiedenen Spalten separat gespeichert und Daten aus einer Spalte werden zusammen gespeichert. Diese Reihenfolge der Datenspeicherung ermöglicht es Ihnen, ein hohes Maß an Datenkomprimierung mit der richtigen Wahl des Primärschlüssels bereitzustellen. Dies wirkt sich auch darauf aus, in welchen Szenarien das DBMS gut funktioniert. ClickHouse funktioniert besser bei Abfragen, bei denen eine kleine Anzahl von Spalten gelesen wird und die Abfrage eine große Tabelle verwendet und der Rest der Tabellen klein ist. Aber auch bei nicht analytischen Abfragen kann ClickHouse gute Ergebnisse erzielen.


Die Daten in den Tabellen sind nach Primärschlüssel sortiert. Die Sortierung erfolgt im Hintergrund. Auf diese Weise können Sie einen spärlichen Index für ein kleines Volume erstellen, mit dem Sie schnell Daten finden können. ClickHouse hat keine Sekundärindizes. Genau genommen gibt es einen sekundären Index - den Partitionsschlüssel (ClickHouse schneidet Partitionsdaten ab, bei denen der Partitionsschlüssel in der Anforderung angegeben ist). Weitere Details .


Das Datenschema mit Normalisierung ist nicht funktionsfähig, im Gegenteil, es ist vorzuziehen, die Daten abhängig von den Anforderungen an sie zu denormalisieren. Es ist vorzuziehen, "breite" Tabellen mit einer großen Anzahl von Spalten zu erstellen. Dieses Element ist auch mit dem vorherigen verwandt, da das Fehlen von Sekundärindizes manchmal Kopien von Tabellen mit einem anderen Primärschlüssel erstellt.


ClickHouse verfügt nicht über UPDATE und DELETE im klassischen Sinne, es besteht jedoch die Möglichkeit, diese zu emulieren.


Daten müssen in großen Blöcken und nicht zu oft (alle paar Sekunden) eingefügt werden. Das zeilenweise Laden von Daten ist bei realen Datenmengen praktisch nicht funktionsfähig.


ClickHouse unterstützt keine Transaktionen, das System wird schließlich konsistent .


Einige Funktionen von ClickHouse, ähnlich wie bei anderen DBMS, erleichtern jedoch die Übertragung vorhandener Systeme.


  • ClickHouse verwendet SQL, jedoch mit geringfügigen Unterschieden, das für für OLAP-Systeme typische Abfragen nützlich ist. Es gibt ein leistungsstarkes System von Aggregatfunktionen, ALL / ANY JOIN, Lambda-Ausdrücken in Funktionen und anderen SQL-Erweiterungen, mit denen Sie fast jede analytische Abfrage schreiben können.
  • ClickHouse unterstützt Replikation, Quorumaufzeichnung und Quorumlesung. Für eine zuverlässige Datenspeicherung ist ein Quorum-Schreibvorgang erforderlich: INSERT ist nur erfolgreich, wenn ClickHouse fehlerfrei Daten auf eine bestimmte Anzahl von Replikaten schreiben konnte.

Weitere Informationen zu ClickHouse-Funktionen finden Sie in der Dokumentation .


Funktionen für die Arbeit mit ClickHouse


Wahl des Primärschlüssels und des Partitionsschlüssels.


Wie wähle ich einen Primärschlüssel und einen Partitionsschlüssel aus? Vielleicht ist dies die erste Frage, die sich beim Erstellen einer neuen Tabelle stellt. Die Auswahl des Primärschlüssels und des Partitionsschlüssels wird normalerweise von den Abfragen bestimmt, die für die Daten ausgeführt werden. Gleichzeitig erweisen sich Abfragen, die beide Bedingungen verwenden, als am effektivsten: sowohl nach dem Primärschlüssel als auch nach dem Partitionsschlüssel.


In unserem Fall sind die Haupttabellen die Matrizen zum Ausführen der Tests. Es ist logisch anzunehmen, dass bei dieser Datenstruktur die Schlüssel so ausgewählt werden müssen, dass die Umgehungsreihenfolge eines von ihnen in der Reihenfolge der Erhöhung der Zeilennummer und die Umgehungsreihenfolge des anderen - in der Reihenfolge der Erhöhung der Spaltennummer - erfolgt.


Es ist auch wichtig zu beachten, dass die Auswahl des Primärschlüssels die Kompaktheit der Datenspeicherung erheblich beeinträchtigen kann, da identische Werte bei der Umgehung des Primärschlüssels in anderen Spalten fast keinen Platz in der Tabelle beanspruchen. In unserem Fall ändern sich beispielsweise die Status der Tests von Commit zu Commit kaum. Diese Tatsache bestimmte im Wesentlichen die Wahl des Primärschlüssels - ein Paar aus Testkennung und Festschreibungsnummer. Außerdem in dieser Reihenfolge.




Der Partitionsschlüssel hat zwei Zwecke. Zum einen können Partitionen so „archiviert“ werden, dass sie dauerhaft aus dem Speicher gelöscht werden können, da die darin enthaltenen Daten bereits veraltet sind. Andererseits ist der Partitionsschlüssel ein sekundärer Index, was bedeutet, dass Sie Abfragen beschleunigen können, wenn ein Ausdruck dafür in ihnen vorhanden ist.


Für unsere Matrizen erscheint es ganz natürlich, die Festschreibungsnummer als Partitionsschlüssel zu wählen. Wenn Sie jedoch den Revisionswert im Ausdruck für den Partitionsschlüssel festlegen, enthält eine solche Tabelle unangemessen viele Partitionen, wodurch die Leistung von Abfragen an diese Partition beeinträchtigt wird. Daher kann im Ausdruck für den Partitionsschlüssel der Revisionswert in eine große Anzahl unterteilt werden, um die Anzahl der Partitionen zu verringern, z. B. PARTITION BY intDiv (Revision, 2000). Diese Anzahl sollte groß genug sein, damit die Anzahl der Partitionen die empfohlenen Werte nicht überschreitet, während sie klein genug sein sollte, damit nicht viele Daten in eine Partition fallen und die Datenbank nicht zu viele Daten lesen muss.


Wie implementiere ich UPDATE und DELETE?


Im üblichen Sinne werden UPDATE und DELETE in ClickHouse nicht unterstützt. Anstelle von UPDATE und DELETE können Sie der Tabelle jedoch eine Spalte mit der Version hinzufügen und die spezielle ReplacingMergeTree- Engine verwenden (entfernt doppelte Datensätze mit demselben Primärschlüsselwert). In einigen Fällen ist die Version natürlich von Anfang an in der Tabelle vorhanden. Wenn Sie beispielsweise eine Tabelle für den aktuellen Teststatus erstellen möchten, ist die Version in dieser Tabelle die Festschreibungsnummer.


CREATE TABLE current_tests ( test_id UInt64, value Nullable(String), version UInt64 ) ENGINE = ReplacingMergeTree(version) ORDER BY test_id 

Im Falle einer Datensatzänderung fügen wir die Version mit einem neuen Wert hinzu, im Falle eines Löschens mit einem NULL-Wert (oder einem anderen speziellen Wert, der in den Daten nicht gefunden werden kann).


Was haben Sie mit dem neuen Speicher erreicht?


Eines der Hauptziele des Wechsels zu ClickHouse war die Möglichkeit, den Testverlauf über einen langen Zeitraum (mehrere Jahre oder im schlimmsten Fall mindestens ein Jahr) zu speichern. Bereits im Prototypenstadium wurde klar, dass wir die vorhandenen SSDs auf unseren Servern umgehen können, um eine mindestens dreijährige Historie zu speichern. Analytische Abfragen haben sich erheblich beschleunigt, jetzt können wir viel nützlichere Informationen aus unseren Daten extrahieren. Die RPS-Marge hat zugenommen. Darüber hinaus wird dieser Wert durch Hinzufügen neuer Server zum ClickHouse-Cluster nahezu linear skaliert. Das Erstellen eines neuen Data Warehouse für die ClickHouse-Datenbank ist für den Endbenutzer nur ein kaum wahrnehmbarer Schritt in Richtung eines wichtigeren Ziels: Hinzufügen neuer Funktionen, Beschleunigen und Vereinfachen der Entwicklung dank der Möglichkeit, große Datenmengen zu speichern und zu verarbeiten.


Komm zu uns


Unsere Abteilung wird ständig erweitert. Besuchen Sie uns, wenn Sie an komplexen und interessanten Aufgaben und Algorithmen arbeiten möchten. Wenn Sie Fragen haben, können Sie mich direkt in PM fragen.


Nützliche Links


Stream-Verarbeitung



Kappa Architektur



ClickHouse:


Source: https://habr.com/ru/post/de429956/


All Articles