ClickHouse Product Analytics VKontakte



Bei der Entwicklung eines Produkts, sei es ein Videodienst oder ein Band, Geschichten oder Artikel, möchte ich das bedingte "Glück" des Benutzers messen können. Zu verstehen, ob wir unsere Änderungen verbessern oder verschlechtern, die Richtung der Produktentwicklung anzupassen und uns dabei nicht auf die Intuition und unsere eigenen Gefühle zu verlassen, sondern auf Metriken und Zahlen, an die Sie glauben können.

In diesem Artikel werde ich Ihnen erläutern, wie wir es geschafft haben, Produktstatistiken und -analysen für einen Service mit einer monatlichen Zielgruppe von 97 Millionen zu starten und dabei äußerst leistungsstarke analytische Anfragen zu erhalten. Wir werden über ClickHouse, die verwendeten Engines und die Funktionen der Abfragen sprechen. Ich werde einen Ansatz zur Datenaggregation beschreiben, mit dem wir in Sekundenbruchteilen komplexe Metriken erhalten und über Datenkonvertierung und -tests sprechen können.

Jetzt haben wir ungefähr 6 Milliarden Lebensmittelereignisse pro Tag, in naher Zukunft werden wir 20 bis 25 Milliarden erreichen. Und dann - nicht so schnell - werden wir bis Ende des Jahres auf 40-50 Milliarden steigen, wenn wir alle für uns interessanten Lebensmittelereignisse beschreiben.

1 Zeilen im Set. Verstrichen: 0,287 Sek. Verarbeitete 59,85 Milliarden Zeilen, 59,85 GB (208,16 Milliarden Zeilen / s, 208,16 GB / s)

Details unter dem Schnitt.

Vorwort


Analytische Tools waren zuvor VKontakte. Es wurden eindeutige Benutzer berücksichtigt. Es war möglich, Ereignispläne anhand von Slices zu erstellen und dadurch in die Tiefe des Dienstes zu fallen. Es ging jedoch um feste Slices im Voraus, um aggregierte Daten, um HLL für eindeutige, um eine gewisse Steifheit und die Unfähigkeit, Fragen schnell zu beantworten, die etwas komplizierter waren als „wie viel?“.

Natürlich gab, gibt und wird Hadoop, es wurde auch geschrieben, geschrieben und wird viel geschrieben, viele Protokolle über die Nutzung von Diensten. Leider wurde hdfs nur von einigen Teams verwendet, um ihre eigenen Aufgaben zu implementieren. Noch trauriger ist, dass es bei hdfs nicht um schnelle analytische Fragen geht: Es gab Fragen zu vielen Feldern, deren Antworten im Code und nicht in der Dokumentation zu finden waren, die jedem zugänglich war.

Wir sind zu dem Schluss gekommen, dass es nicht mehr möglich ist, so zu leben. Jedes Team sollte Daten haben, Abfragen sollten schnell sein und die Daten selbst sollten genau und reich an nützlichen Parametern sein.

Daher haben wir klare Anforderungen an das neue Statistik- / Analysesystem formuliert:

  • analytische Abfragen sollten schnell sein;
  • Die Daten sind ziemlich genau. Im Idealfall handelt es sich dabei um unformatierte Benutzerinteraktionsereignisse mit dem Dienst.
  • Die Struktur der Ereignisse sollte beschrieben, verstanden und zugänglich sein.
  • zuverlässige Datenspeicherung, einmalige Liefergarantie;
  • Es ist möglich, die Unikate, die Zielgruppe (täglich, wöchentlich, monatlich), die Aufbewahrungsmetriken, die vom Benutzer im Dienst verbrachte Zeit, die quantifizierten Aktionen für eindeutige und andere Metriken anhand der Slices zu zählen.
  • Tests, Datenkonvertierung und Visualisierung werden durchgeführt.

In der Küche


Die Erfahrung hat gezeigt, dass wir zwei Datenbanken benötigen: eine langsame, in der wir die Daten aggregieren und anreichern, und eine schnelle, in der wir mit diesen Daten arbeiten und darauf Diagramme erstellen können. Dies ist einer der häufigsten Ansätze, bei denen in einer langsamen Datenbank, beispielsweise in HDFS, unterschiedliche Projektionen erstellt werden - auf eindeutigen und auf der Anzahl der Ereignisse durch Slices für einen bestimmten Zeitraum.

An einem warmen Septembertag hatten wir bei einer Tasse Tee in der Küche mit Blick auf die Kasaner Kathedrale die Idee, ClickHouse als schnelle Basis zu nutzen - zu diesem Zeitpunkt haben wir es bereits zum Speichern technischer Protokolle verwendet. Es gab viele Zweifel, die hauptsächlich mit Geschwindigkeit und Zuverlässigkeit verbunden waren: Die deklarierten Leistungstests schienen unrealistisch, und neue Datenbankversionen brachen regelmäßig vorhandene Funktionen. Daher war der Vorschlag einfach - zu versuchen.

Erste Proben


Wir haben einen Cluster von zwei Computern mit dieser Konfiguration bereitgestellt:
2xE5-2620 v4 (insgesamt 32 Kerne), 256 G RAM, 28 T Plätze (raid10 mit ext4).

Anfangs war es nahes Layout, aber dann haben wir zu weit gewechselt. ClickHouse verfügt über viele verschiedene Tabellen-Engines, die wichtigsten stammen jedoch aus der MergeTree-Familie. Wir haben ReplicatedReplacingMergeTree mit ungefähr den folgenden Einstellungen ausgewählt:

PARTITION BY dt ORDER BY (toStartOfHour(time), cityHash64(user_id), event_microsec, event_id) SAMPLE BY cityHash64(user_id) SETTINGS index_granularity = 8192; 

Repliziert - bedeutet, dass die Tabelle repliziert wird, wodurch eine unserer Zuverlässigkeitsanforderungen gelöst wird.

Ersetzen - Die Tabelle unterstützt die Deduplizierung des Primärschlüssels: Standardmäßig stimmt der Primärschlüssel mit dem Sortierschlüssel überein, sodass im Abschnitt ORDER BY nur angegeben wird, um welchen Primärschlüssel es sich handelt.

SAMPLE BY - Ich wollte auch Sampling ausprobieren: sample gibt eine einheitlich pseudozufällige Stichprobe zurück.

index_granularity = 8192 ist die magische Anzahl von Datenzeilen zwischen Indexserifen (ja, es ist spärlich), die standardmäßig verwendet wird. Wir haben es nicht geändert.

Die Partitionierung erfolgte nach Tag (standardmäßig jedoch nach Monat). Viele Datenanfragen sollten im Tagesverlauf erfolgen. Erstellen Sie beispielsweise ein Minutendiagramm mit Videoansichten für einen bestimmten Tag.

Als nächstes nahmen wir ein Stück technische Protokolle und füllten die Tabelle mit ungefähr einer Milliarde Zeilen. Hervorragende Komprimierung, Gruppierung nach Spaltentyp Int *, Zählen eindeutiger Werte - alles hat unglaublich schnell funktioniert!

Apropos Geschwindigkeit, ich meine, dass keine einzige Anfrage länger als 500 ms dauerte und die meisten von ihnen in 50-100 ms passen. Und das auf zwei Maschinen - und tatsächlich war nur eine an den Berechnungen beteiligt.

Wir haben uns alles angesehen und uns vorgestellt, dass anstelle der UInt8-Spalte eine Länder-ID angezeigt wird und die Int8-Spalte durch Daten ersetzt wird, beispielsweise über das Alter des Benutzers. Und sie haben erkannt, dass ClickHouse für uns völlig geeignet ist, wenn alles richtig gemacht wird.

Starke Datentypisierung


Der Vorteil von ClickHouse beginnt genau dann, wenn das richtige Datenschema erstellt wird. Beispiel: Plattform String - schlecht, Plattform Int8 + Wörterbuch - gut, LowCardinality (String) - praktisch und gut (ich werde etwas später über LowCardinality sprechen).

Wir haben eine spezielle Generatorklasse in PHP erstellt, die auf Anfrage Wrapper-Klassen über Ereignisse basierend auf Tabellen in ClickHouse und einen einzelnen Einstiegspunkt für die Protokollierung erstellt. Ich werde das Beispiel des Schemas erklären, das sich herausstellte:

  1. Analyst / Dateningenieur / Entwickler beschreibt die Dokumentation: Welche Felder, möglichen Werte und Ereignisse müssen protokolliert werden?
  2. In ClickHouse wird eine Tabelle gemäß der Datenstruktur aus dem vorherigen Absatz erstellt.
  3. Umbruchklassen für Ereignisse, die auf einer Tabelle basieren, werden generiert.
  4. Das Produktteam implementiert das Ausfüllen der Felder eines Objekts dieser Klasse und das Senden.

Das Ändern des Schemas auf PHP-Ebene und des Typs der protokollierten Daten funktioniert nicht, ohne zuerst die Tabelle in ClickHouse zu ändern. Dies kann wiederum nicht ohne Abstimmung mit dem Team, Änderungen in der Dokumentation und Beschreibung der Ereignisse geschehen.

Für jedes Ereignis können Sie zwei Einstellungen festlegen, die den Prozentsatz der an ClickHouse bzw. Hadoop gesendeten Ereignisse steuern. Einstellungen werden hauptsächlich für das schrittweise Rollen benötigt, mit der Möglichkeit, die Protokollierung zu reduzieren, wenn etwas schief geht. Vor Hadoop werden die Daten standardmäßig mit Kafka geliefert. Und in ClickHouse fliegen sie mit KittenHouse im permanenten Modus durch ein Schema , das mindestens eine einzelne Ereignisübermittlung garantiert.

Das Ereignis wird an die Puffertabelle an den gewünschten Shard übergeben, basierend auf dem Rest der Division eines Hash von user_id durch die Anzahl der Shards im Cluster. Als Nächstes löscht die Puffertabelle die Daten in den lokalen ReplicatedReplacingMergeTree. Zusätzlich zu den lokalen Tabellen wird mit der Distributed Engine eine verteilte Tabelle abgerufen, mit der Sie auf Daten von allen Shards zugreifen können.

Denormalisierung


ClickHouse ist ein säulenförmiges DBMS. Es geht nicht um normale Formulare, was bedeutet, dass es besser ist, alle Informationen für die Veranstaltung richtig zu haben, als sich anzumelden. Es gibt auch Join, aber wenn die richtige Tabelle nicht in den Speicher passt, beginnt der Schmerz. Daher haben wir eine willensstarke Entscheidung getroffen: Alle Informationen, an denen wir interessiert sind, sollten in der Veranstaltung selbst gespeichert werden. Zum Beispiel Geschlecht, Alter des Benutzers, Land, Stadt, Geburtstag - all dies sind öffentliche Informationen, die für die Analyse des Publikums nützlich sein können, sowie alle nützlichen Informationen über das Objekt der Interaktion. Wenn es sich beispielsweise um Video handelt, handelt es sich um video_id, video_owner_id, Datum des Video-Uploads, Länge, Qualität zum Zeitpunkt des Ereignisses, maximale Qualität usw.

Insgesamt haben wir in jeder Tabelle 50 bis 200 Spalten, während in allen Tabellen Servicefelder vorhanden sind. Das Fehlerprotokoll lautet beispielsweise error_log. Tatsächlich rufen wir einen Fehler außerhalb des Bereichs des Typs auf. Falls seltsame Werte mit dem Alter über die Größe des Feldtyps hinausgehen.

Typ LowCardinality (T)


ClickHouse kann externe Wörterbücher verwenden. Sie werden im Speicher gespeichert, regelmäßig aktualisiert und können in verschiedenen Szenarien effektiv verwendet werden, auch als klassische Nachschlagewerke. Sie möchten beispielsweise das Betriebssystem protokollieren und haben zwei Alternativen: eine Zeichenfolge oder eine Zahl + ein Verzeichnis. Bei großen Datenmengen und bei analytischen Hochleistungsabfragen ist es natürlich logisch, eine Zahl zu schreiben und bei Bedarf eine Zeichenfolgendarstellung aus dem Wörterbuch abzurufen:

 dictGetString('os', 'os_name', toUInt64(os_id)) 

Es gibt jedoch eine viel bequemere Möglichkeit, den Typ LowCardinality (String) zu verwenden, mit dem automatisch ein Wörterbuch erstellt wird. Die Leistung mit LowCardinality unter der Bedingung einer geringen Kardinalität des Wertesatzes ist radikal höher als mit String.

Zum Beispiel verwenden wir LowCardinality (String) für die Ereignistypen 'play', 'pause', 'rewind'. Oder für die Plattform: "Web", "Android", "iPhone":

 SELECT vk_platform, count() FROM t WHERE dt = yesterday() GROUP BY vk_platform Elapsed: 0.145 sec. Processed 1.98 billion rows, 5.96 GB (13.65 billion rows/s., 41.04 GB/s.) 

Die Funktion ist noch experimentell. Um sie zu verwenden, müssen Sie Folgendes ausführen:

 SET allow_experimental_low_cardinality_type = 1; 

Aber es gibt das Gefühl, dass sie nach einiger Zeit nicht mehr unter der Kulisse sein wird.

VKontakte-Datenaggregation


Da es viele Säulen und viele Ereignisse gibt, besteht der natürliche Wunsch darin, die „alten“ Trennwände zu schneiden, aber zuerst die Einheiten zusammenzubauen. Gelegentlich ist es notwendig, rohe Ereignisse (vor einem Monat oder einem Jahr) zu analysieren, damit wir die Daten nicht in HDFS schneiden - jeder Analyst kann das gewünschte Parkett für ein beliebiges Datum kontaktieren.

Wenn wir in einem Zeitintervall aggregieren, ruhen wir uns in der Regel immer darauf aus, dass die Anzahl der Zeilen pro Zeiteinheit gleich dem Produkt der Schnittleistung ist. Dies führt zu Einschränkungen: Länder beginnen, sich in Gruppen wie "Russland", "Asien", "Europa", "Der Rest der Welt" und dem Alter zu sammeln - in Intervallen, um die Dimension auf eine bedingte Million Zeilen pro Datum zu reduzieren.

Aggregation nach dt, user_id


Aber wir haben ein reaktives ClickHouse! Können wir an einem Datum auf 50 bis 100 Millionen Leitungen beschleunigen?
Schnelle Tests haben gezeigt, dass wir es können, und in diesem Moment entstand eine einfache Idee - den Benutzer in der Maschine zu lassen. Nicht nach "Datum, Slices" mit Funkenwerkzeugen zu aggregieren, sondern nach "Datum, Benutzer" bedeutet ClickHouse, während Daten "transponiert" werden.

Mit diesem Ansatz speichern wir Benutzer in aggregierten Daten, was bedeutet, dass wir weiterhin Zielgruppenindikatoren, Aufbewahrungs- und Frequenzmetriken berücksichtigen können. Wir können Einheiten verbinden und die gemeinsame Zielgruppe mehrerer Dienste bis zur gesamten VKontakte-Zielgruppe zählen. All dies kann von jedem Slice durchgeführt werden, das zur gleichen Zeit in der Tabelle vorhanden ist.

Ich werde mit einem Beispiel veranschaulichen:



Nach der Aggregation (viele weitere Spalten rechts):



In diesem Fall erfolgt die Aggregation genau nach (dt, user_id). Für Felder mit Benutzerinformationen können Sie mit einer solchen Aggregation die Funktionen any, anyHeavy verwenden (wählt einen häufig vorkommenden Wert aus). Sie können beispielsweise anyHeavy (Plattform) in einem Aggregat sammeln, um anhand von Videoereignissen zu ermitteln, welche Plattform der Benutzer zum größten Teil verwendet. Bei Bedarf können Sie groupUniqArray (Plattform) verwenden und ein Array aller Plattformen speichern, von denen der Benutzer das Ereignis ausgelöst hat. Wenn dies nicht ausreicht, können Sie separate Spalten für die Plattform erstellen und beispielsweise die Anzahl der eindeutigen Videos speichern, die von einer bestimmten Plattform auf die Hälfte angezeigt werden:

 uniqCombinedIf(cityHash64(video_owner_id, video_id), (platform = 'android') AND (event = '50p')) as uniq_videos_50p_android 

Mit diesem Ansatz wird ein ziemlich breites Aggregat erhalten, in dem jede Zeile ein eindeutiger Benutzer ist und jede Spalte Informationen entweder über den Benutzer oder über seine Interaktion mit dem Dienst enthält.

Es stellt sich heraus, dass es zur Berechnung der DAU eines Dienstes ausreicht, eine solche Anforderung zusätzlich zu ihrem Aggregat auszuführen:

 SELECT dt, count() as DAU FROM agg GROUP BY dt Elapsed: 0.078 sec. 

Oder berechnen Sie, wie viele Tage Benutzer für die Woche im Dienst waren:

 SELECT days_in_service, count() AS uniques FROM ( SELECT uniqUpTo(7)(dt) AS days_in_service FROM agg2 WHERE dt > (yesterday() - 7) GROUP BY user_id ) GROUP BY days_in_service ORDER BY days_in_service ASC 7 rows in set. Elapsed: 2.922 sec. 

Wir können durch Abtasten beschleunigen, ohne dabei an Genauigkeit zu verlieren:

 SELECT days_in_service, 10 * count() AS uniques FROM ( SELECT uniqUpTo(7)(dt) AS days_in_service FROM agg2 SAMPLE 1 / 10 WHERE dt > (yesterday() - 7) GROUP BY user_id ) GROUP BY days_in_service ORDER BY days_in_service ASC 7 rows in set. Elapsed: 0.454 sec. 

Es sollte sofort beachtet werden, dass die Stichprobe nicht nach dem Prozentsatz der Ereignisse, sondern nach dem Prozentsatz der Benutzer erfolgt - und als Ergebnis wird sie zu einem unglaublich leistungsfähigen Werkzeug.

Oder das gleiche für 4 Wochen mit 1/100 Probenahme - etwa 1% weniger genaue Ergebnisse werden erhalten.

 SELECT days_in_service, 100 * count() AS uniques FROM ( SELECT uniqUpTo(7)(dt) AS days_in_service FROM agg2 SAMPLE 1 / 100 WHERE dt > (yesterday() - 28) GROUP BY user_id ) GROUP BY days_in_service ORDER BY days_in_service ASC 28 rows in set. Elapsed: 0.287 sec. 

Aggregation auf der anderen Seite


Bei der Aggregation nach (dt, user_id) verlieren wir den Benutzer nicht, wir verpassen keine Informationen über seine Interaktion mit dem Dienst, aber natürlich verlieren wir die Metriken für ein bestimmtes Interaktionsobjekt. Aber Sie können dies auch nicht verlieren - bauen wir die Einheit durch
(dt, video_owner_id, video_id), wobei die gleichen Ideen eingehalten werden. Wir behalten die Informationen über das Video so weit wie möglich bei, verpassen keine Daten über die Interaktion des Videos mit dem Benutzer und vermissen die Informationen über den bestimmten Benutzer vollständig.

 SELECT starts FROM agg3 WHERE (dt = yesterday()) AND (video_id = ...) AND (video_owner_id = ...) 1 rows in set. Elapsed: 0.030 sec 

Oder die Top 10 Videoaufrufe gestern:

 SELECT video_id, video_owner_id, watches FROM video_agg_video_d1 WHERE dt = yesterday() ORDER BY watches DESC LIMIT 10 10 rows in set. Elapsed: 0.035 sec. 

Als Ergebnis haben wir ein Schema von Einheiten der Form:

  • Aggregation nach "Datum, Benutzer" innerhalb des Produkts;
  • Aggregation nach „Datum, Interaktionsobjekt“ innerhalb des Produkts;
  • manchmal entstehen andere Projektionen.

Askaban und TeamCity


Zum Schluss noch ein paar Worte zur Infrastruktur. Unsere Gesamtsammlung beginnt nachts und beginnt mit OPTIMIZE für jede der Tabellen mit Rohdaten, um eine außergewöhnliche Datenzusammenführung in ReplicatedReplacingMergeTree auszulösen. Der Vorgang kann lange genug dauern, es ist jedoch erforderlich, Takes zu entfernen, falls sie auftreten. Es ist erwähnenswert, dass ich bisher noch nie auf Duplikate gestoßen bin, aber es gibt keine Garantie dafür, dass sie in Zukunft nicht mehr erscheinen werden.

Der nächste Schritt ist die Erstellung von Aggregaten. Dies sind Bash-Skripte, in denen Folgendes vorkommt:

  • Zuerst erhalten wir die Anzahl der Scherben und einige Hosts von der Scherbe:

     SELECT shard_num, any(host_name) AS host FROM system.clusters GROUP BY shard_num 
  • Anschließend führt das Skript nacheinander für jeden Shard (clickhouse-client -h $ host) eine Anforderung des Formulars aus (für Aggregate von Benutzern):

     INSERT INTO ... SELECT ... FROM ... SAMPLE 1/$shards_count OFFSET 1/$shard_num 

Dies ist nicht ganz optimal und kann viel Netzwerkinteraktion zwischen Hosts erzeugen. Wenn Sie jedoch neue Shards hinzufügen, funktioniert alles sofort weiter. Die Lokalität der Daten für die Einheiten bleibt erhalten, sodass wir uns entschlossen haben, uns darüber keine großen Sorgen zu machen.

Wir haben Askaban als Aufgabenplaner. Ich würde nicht sagen, dass dies ein sehr praktisches Tool ist, aber es erfüllt seine Aufgabe perfekt, auch wenn es darum geht, etwas komplexere Pipelines zu erstellen und wenn ein Skript warten muss, bis mehrere andere abgeschlossen sind.

Die Gesamtzeit für die Konvertierung der jetzt vorhandenen Ereignisse in Aggregate beträgt 15 Minuten.

Testen


Jeden Morgen führen wir automatisierte Tests durch, die Fragen zu Rohdaten sowie zur Bereitschaft und Qualität von Aggregaten beantworten: „Stellen Sie sicher, dass gestern nicht mehr als ein halbes Prozent weniger Daten oder eindeutige Daten zu Rohdaten oder in Aggregaten vorhanden waren im Vergleich zum selben Tag vor einer Woche. "

Technologisch gesehen sind dies gewöhnliche Komponententests mit JUnit und der Implementierung des JDBC-Treibers für ClickHouse. Die Ausführung aller Tests wird in TeamCity gestartet und dauert in einem Thread etwa 30 Sekunden. Bei Fehlern erhalten wir VKontakte-Benachrichtigungen von unserem wunderbaren TeamCity-Bot.

Fazit


Verwenden Sie nur stabile Versionen von ClickHouse und Ihr Haar wird weich und seidig. Es ist erwähnenswert, dass ClickHouse nicht langsamer wird .

Source: https://habr.com/ru/post/de445284/


All Articles