
Mengembangkan produk apa pun, apakah itu layanan video atau rekaman, cerita atau artikel, saya ingin dapat mengukur "kebahagiaan" bersyarat dari pengguna. Untuk memahami apakah kita membuat perubahan kita lebih baik atau lebih buruk, untuk menyesuaikan arah pengembangan produk, berdasarkan bukan pada intuisi dan perasaan kita sendiri, tetapi pada metrik dan angka yang dapat Anda percayai.
Dalam artikel ini, saya akan memberi tahu Anda bagaimana kami berhasil meluncurkan statistik produk dan analitik pada layanan dengan audiens 97 juta bulanan, sambil mendapatkan pertanyaan analitis kinerja sangat tinggi. Kita akan berbicara tentang ClickHouse, mesin yang digunakan, dan fitur kueri. Saya akan menjelaskan pendekatan agregasi data, yang memungkinkan kami memperoleh metrik kompleks dalam sepersekian detik, dan berbicara tentang konversi dan pengujian data.
Sekarang kita memiliki sekitar 6 miliar acara makanan per hari, dalam waktu dekat kita akan mencapai 20โ25 miliar. Dan kemudian - dengan kecepatan yang tidak secepat itu, kita akan meningkat menjadi 40-50 miliar pada akhir tahun, ketika kita menggambarkan semua acara makanan yang menarik bagi kita.
1 baris diatur. Berlalu: 0,287 dtk. Memproses 59,85 miliar baris, 59,85 GB (208,16 miliar baris / dt., 208,16 GB / dt.)Detail di bawah potongan.
Kata Pengantar
Alat analitik adalah VKontakte sebelumnya. Pengguna unik dipertimbangkan, adalah mungkin untuk membuat jadwal acara dengan irisan dan dengan demikian jatuh ke kedalaman layanan. Namun, itu adalah pertanyaan irisan tetap di muka, data agregat, HLL untuk yang unik, beberapa kekakuan dan ketidakmampuan untuk dengan cepat menjawab pertanyaan yang sedikit lebih rumit daripada "berapa banyak?"
Tentu saja, ada, sedang dan akan hadoop, itu juga ditulis, ditulis dan akan banyak ditulis, banyak log menggunakan layanan. Sayangnya, HDFS hanya digunakan oleh beberapa tim untuk melaksanakan tugas mereka sendiri. Yang lebih menyedihkan, hdfs bukan tentang pertanyaan analitik cepat: ada banyak pertanyaan di berbagai bidang, jawaban yang harus ditemukan dalam kode, dan tidak dalam dokumentasi yang dapat diakses oleh semua orang.
Kami sampai pada kesimpulan bahwa tidak mungkin lagi hidup seperti ini. Setiap tim harus memiliki data, pertanyaan atas mereka harus cepat, dan data itu sendiri harus akurat dan kaya akan parameter yang berguna.
Karenanya, kami merumuskan persyaratan yang jelas untuk sistem statistik / analisis baru:
- pertanyaan analitik harus cepat;
- data cukup akurat, idealnya ini adalah peristiwa interaksi pengguna mentah dengan layanan;
- struktur acara harus dijelaskan, dipahami, dan dapat diakses;
- penyimpanan data yang andal, jaminan pengiriman satu kali;
- dimungkinkan untuk menghitung unik, audiens (harian, mingguan, bulanan), metrik retensi, waktu yang dihabiskan oleh pengguna dalam layanan, tindakan terukur pada metrik unik dan lainnya dengan serangkaian irisan;
- pengujian, konversi data, dan visualisasi sedang berlangsung.
Di dapur
Pengalaman menunjukkan bahwa kami membutuhkan dua database: yang lambat, di mana kami akan mengumpulkan dan memperkaya data, dan yang cepat, di mana kami dapat bekerja dengan data ini dan membuat grafik di atasnya. Ini adalah salah satu pendekatan yang paling umum, di mana dalam basis yang lambat, misalnya, dalam HDFS, proyeksi yang berbeda dibangun - di atas yang unik dan pada jumlah peristiwa dengan irisan untuk jangka waktu tertentu.
Pada hari yang hangat di bulan September, ketika berbicara tentang secangkir teh di dapur yang menghadap ke Katedral Kazan, kami mempunyai ide untuk mencoba ClickHouse sebagai basis cepat - pada waktu itu kami telah menggunakannya untuk menyimpan log teknis. Ada banyak keraguan terkait terutama dengan kecepatan dan keandalan: tes kinerja yang dinyatakan tampak tidak realistis, dan rilis database baru secara berkala merusak fungsi yang ada. Karena itu, proposal itu sederhana - untuk dicoba.
Sampel pertama
Kami mengerahkan sekelompok dua mesin dengan konfigurasi ini:
2xE5-2620 v4 (total 32 core), ram 256G, 28T tempat (raid10 dengan ext4).
Awalnya, itu dekat tata letak, tapi kemudian kami beralih ke jauh. ClickHouse memiliki banyak mesin tabel yang berbeda, tetapi yang utama berasal dari keluarga MergeTree. Kami memilih ReplicatedReplacingMergeTree dengan kira-kira pengaturan berikut:
PARTITION BY dt ORDER BY (toStartOfHour(time), cityHash64(user_id), event_microsec, event_id) SAMPLE BY cityHash64(user_id) SETTINGS index_granularity = 8192;
Digandakan - berarti bahwa tabel direplikasi, dan ini memecahkan salah satu persyaratan keandalan kami.
Mengganti - tabel mendukung deduplikasi pada kunci utama: secara default, kunci primer cocok dengan tombol sortir, jadi bagian ORDER BY hanya memberi tahu Anda apa kunci utama itu.
SAMPEL OLEH - Saya juga ingin mencoba pengambilan sampel: sampel mengembalikan sampel acak-semu yang seragam.
index_granularity = 8192 adalah jumlah baris data ajaib antara serif indeks (ya, jarang), yang digunakan secara default. Kami tidak mengubahnya.
Partisi dilakukan berdasarkan hari (meskipun secara default - berdasarkan bulan). Banyak permintaan data yang seharusnya intraday - misalnya, buat grafik menit penayangan video untuk hari tertentu.
Selanjutnya, kami mengambil sepotong log teknis dan mengisi meja dengan sekitar satu miliar baris. Kompresi luar biasa, pengelompokan berdasarkan tipe kolom Int *, menghitung nilai unik - semuanya bekerja sangat cepat!
Berbicara tentang kecepatan, maksud saya tidak ada satu permintaan yang bertahan lebih dari 500 ms, dan kebanyakan dari mereka masuk ke dalam 50-100 ms. Dan ini pada dua mesin - dan, pada kenyataannya, hanya satu yang terlibat dalam perhitungan.
Kami melihat semua ini dan membayangkan bahwa alih-alih kolom UInt8 akan ada id negara, dan kolom Int8 akan diganti oleh data, misalnya, tentang usia pengguna. Dan mereka menyadari bahwa ClickHouse sepenuhnya cocok untuk kita, jika semuanya dilakukan dengan benar.
Pengetikan data yang kuat
Manfaat ClickHouse dimulai tepat ketika skema data yang benar terbentuk. Contoh: platform String - bad, platform Int8 + dictionary - good, LowCardinality (String) - nyaman dan bagus (saya akan berbicara tentang LowCardinality sedikit kemudian).
Kami membuat kelas generator khusus di php, yang, atas permintaan, membuat kelas wrapper atas peristiwa berdasarkan tabel di ClickHouse, dan satu titik masuk tunggal untuk masuk. Saya akan menjelaskan contoh skema yang ternyata:
- Analis / insinyur data / pengembang menjelaskan dokumentasi: bidang mana, nilai yang mungkin, peristiwa perlu dicatat.
- Tabel dibuat di ClickHouse sesuai dengan struktur data dari paragraf sebelumnya.
- Kelas pembungkus untuk acara berdasarkan tabel dihasilkan.
- Tim produk mengimplementasikan mengisi bidang objek kelas ini, mengirim.
Mengubah skema di tingkat php dan jenis data yang dicatat tidak akan berfungsi tanpa terlebih dahulu mengubah tabel di ClickHouse. Dan ini, pada gilirannya, tidak dapat dilakukan tanpa koordinasi dengan tim, perubahan dalam dokumentasi dan deskripsi peristiwa.
Untuk setiap acara, Anda dapat mengatur dua pengaturan yang mengontrol persentase acara yang dikirim ke ClickHouse dan hadoop masing-masing. Pengaturan diperlukan terutama untuk pengguliran bertahap dengan kemampuan untuk mengurangi penebangan jika terjadi kesalahan. Sebelum hadoop, data dikirimkan dengan cara standar menggunakan Kafka. Dan di ClickHouse, mereka terbang melalui
skema dengan KittenHouse dalam mode persisten, yang menjamin setidaknya satu pengiriman acara tunggal.
Acara dikirim ke tabel buffer ke beling yang diinginkan, berdasarkan sisa membagi beberapa hash dari user_id dengan jumlah beling di cluster. Selanjutnya, tabel buffer menyiram data ke ReplicatedReplacingMergeTree lokal. Dan di atas tabel lokal, tabel terdistribusi ditarik dengan mesin Terdistribusi, yang memungkinkan Anda untuk mengakses data dari semua pecahan.
Denormalisasi
ClickHouse adalah DBMS berbentuk kolom. Ini bukan tentang bentuk normal, yang berarti lebih baik memiliki semua informasi dengan benar dalam acara tersebut daripada bergabung. Ada juga yang Gabung, tetapi jika tabel kanan tidak pas di memori, rasa sakit dimulai. Oleh karena itu, kami membuat keputusan dengan tekad kuat: semua informasi yang kami minati harus disimpan dalam acara itu sendiri. Misalnya, jenis kelamin, usia pengguna, negara, kota, ulang tahun - semua itu adalah informasi publik yang dapat berguna untuk analitik audiens, serta semua informasi yang berguna tentang objek interaksi. Jika, misalnya, kita berbicara tentang video, itu video_id, video_owner_id, tanggal unggah video, panjang, kualitas pada saat acara, kualitas maksimal, dan sebagainya.
Secara total, di setiap tabel kami memiliki 50 hingga 200 kolom, sementara di semua tabel ada bidang layanan. Misalnya, log kesalahan adalah error_log - pada kenyataannya, kami menyebut kesalahan di luar jangkauan tipe. Dalam kasus nilai-nilai aneh melampaui ukuran tipe di bidang dengan usia.
Type LowCardinality (T)
ClickHouse memiliki kemampuan untuk menggunakan kamus eksternal. Mereka disimpan dalam memori, diperbarui secara berkala, dapat secara efektif digunakan dalam berbagai skenario, termasuk sebagai buku referensi klasik. Misalnya, Anda ingin mencatat sistem operasi dan Anda memiliki dua alternatif: string atau angka + direktori. Tentu saja, pada sejumlah besar data, dan untuk permintaan analitik kinerja tinggi, adalah logis untuk menulis angka, dan mendapatkan representasi string dari kamus saat Anda membutuhkan:
dictGetString('os', 'os_name', toUInt64(os_id))
Tetapi ada cara yang jauh lebih nyaman - untuk menggunakan tipe LowCardinality (String), yang secara otomatis membuat kamus. Kinerja dengan LowCardinality dalam kondisi kardinalitas rendah dari himpunan nilai secara radikal lebih tinggi daripada dengan String.
Sebagai contoh, kami menggunakan LowCardinality (String) untuk jenis acara 'play', 'pause', 'rewind'. Atau untuk platform: 'web', 'android', 'iphone':
SELECT vk_platform, count() FROM t WHERE dt = yesterday() GROUP BY vk_platform Elapsed: 0.145 sec. Processed 1.98 billion rows, 5.96 GB (13.65 billion rows/s., 41.04 GB/s.)
Fitur ini masih eksperimental, jadi untuk menggunakannya Anda harus melakukan:
SET allow_experimental_low_cardinality_type = 1;
Tetapi ada perasaan bahwa setelah beberapa waktu dia tidak akan lagi berada di bawah pengaturan.
VKontakte agregasi data
Karena ada banyak kolom, dan ada banyak acara, keinginan alami adalah untuk memotong partisi "lama", tetapi pertama-tama - untuk merakit unit. Kadang-kadang, perlu untuk menganalisis peristiwa mentah (sebulan atau setahun yang lalu), jadi kami tidak memotong data dalam HDFS - analis mana pun dapat menghubungi parket yang diinginkan untuk tanggal apa pun.
Sebagai aturan, ketika mengumpulkan dalam interval waktu, kami selalu berpijak pada kenyataan bahwa jumlah baris per unit waktu sama dengan produk dari daya potong. Ini memberlakukan pembatasan: negara-negara mulai mengumpulkan dalam kelompok-kelompok seperti 'Rusia', 'Asia', 'Eropa', 'Sisa dunia', dan usia - dalam interval untuk mengurangi dimensi menjadi satu juta baris bersyarat per tanggal.
Agregasi berdasarkan dt, user_id
Tetapi kami memiliki ClickHouse reaktif! Bisakah kita berakselerasi ke 50-100 juta garis dalam satu kencan?
Tes cepat menunjukkan bahwa kami dapat, dan pada saat itu muncul ide sederhana - untuk meninggalkan pengguna di dalam mesin. Yaitu, untuk menggabungkan bukan dengan "date, slices" menggunakan alat percikan, tetapi dengan "date, user" berarti oleh ClickHouse, sambil melakukan beberapa "transposisi" data.
Dengan pendekatan ini, kami menyimpan pengguna dalam data agregat, yang berarti bahwa kami masih dapat mempertimbangkan indikator audiensi, retensi, dan metrik frekuensi. Kita dapat menghubungkan unit, menghitung audiens umum dari beberapa layanan hingga seluruh audiens VKontakte. Semua ini dapat dilakukan oleh setiap irisan yang ada dalam tabel untuk kondisi saat yang sama.
Saya akan mengilustrasikan dengan sebuah contoh:

Setelah agregasi (lebih banyak kolom di sebelah kanan):

Dalam hal ini, agregasi terjadi tepat oleh (dt, user_id). Untuk bidang dengan informasi pengguna, dengan agregasi seperti itu, Anda dapat menggunakan fungsi apa saja, anyHeavy (memilih nilai yang sering dijumpai). Anda dapat, misalnya, mengumpulkan AnyHeavy (platform) secara agregat untuk mengetahui platform mana yang paling sering digunakan pengguna dari peristiwa video. Jika diinginkan, Anda dapat menggunakan groupUniqArray (platform) dan menyimpan array semua platform dari mana pengguna mengangkat acara tersebut. Jika ini tidak cukup, Anda dapat membuat kolom terpisah untuk platform dan menyimpan, misalnya, jumlah video unik yang diputar hingga setengah dari platform tertentu:
uniqCombinedIf(cityHash64(video_owner_id, video_id), (platform = 'android') AND (event = '50p')) as uniq_videos_50p_android
Dengan pendekatan ini, agregat yang agak lebar diperoleh di mana setiap baris adalah pengguna yang unik, dan setiap kolom berisi informasi baik pada pengguna atau tentang interaksinya dengan layanan.
Ternyata untuk menghitung DAU suatu layanan, cukup melakukan permintaan seperti itu di atas agregatnya:
SELECT dt, count() as DAU FROM agg GROUP BY dt Elapsed: 0.078 sec.
Atau hitung berapa hari pengguna berada dalam layanan selama seminggu:
SELECT days_in_service, count() AS uniques FROM ( SELECT uniqUpTo(7)(dt) AS days_in_service FROM agg2 WHERE dt > (yesterday() - 7) GROUP BY user_id ) GROUP BY days_in_service ORDER BY days_in_service ASC 7 rows in set. Elapsed: 2.922 sec.
Kami dapat mempercepat dengan pengambilan sampel, sementara hampir tanpa kehilangan keakuratan:
SELECT days_in_service, 10 * count() AS uniques FROM ( SELECT uniqUpTo(7)(dt) AS days_in_service FROM agg2 SAMPLE 1 / 10 WHERE dt > (yesterday() - 7) GROUP BY user_id ) GROUP BY days_in_service ORDER BY days_in_service ASC 7 rows in set. Elapsed: 0.454 sec.
Harus segera dicatat bahwa pengambilan sampel tidak berdasarkan persentase peristiwa, tetapi oleh persentase pengguna - dan sebagai hasilnya, ini menjadi alat yang sangat kuat.
Atau sama untuk 4 minggu dengan 1/100 sampling - sekitar 1% hasil kurang akurat diperoleh.
SELECT days_in_service, 100 * count() AS uniques FROM ( SELECT uniqUpTo(7)(dt) AS days_in_service FROM agg2 SAMPLE 1 / 100 WHERE dt > (yesterday() - 28) GROUP BY user_id ) GROUP BY days_in_service ORDER BY days_in_service ASC 28 rows in set. Elapsed: 0.287 sec.
Agregasi di sisi lain
Saat digabungkan dengan (dt, user_id), kami tidak kehilangan pengguna, kami tidak ketinggalan informasi tentang interaksinya dengan layanan, tetapi, tentu saja, kami kehilangan metrik tentang objek interaksi tertentu. Tapi Anda tidak bisa kehilangan ini juga - mari kita membangun unit dengan
(dt, video_owner_id, video_id), mengikuti ide yang sama. Kami menyimpan informasi tentang video sebanyak mungkin, kami tidak ketinggalan data tentang interaksi video dengan pengguna, dan kami benar-benar kehilangan informasi tentang pengguna tertentu.
SELECT starts FROM agg3 WHERE (dt = yesterday()) AND (video_id = ...) AND (video_owner_id = ...) 1 rows in set. Elapsed: 0.030 sec
Atau 10 penayangan video teratas kemarin:
SELECT video_id, video_owner_id, watches FROM video_agg_video_d1 WHERE dt = yesterday() ORDER BY watches DESC LIMIT 10 10 rows in set. Elapsed: 0.035 sec.
Akibatnya, kami memiliki skema agregat bentuk:
- agregasi berdasarkan "tanggal, pengguna" di dalam produk;
- agregasi berdasarkan "tanggal, objek interaksi" dalam produk;
- terkadang proyeksi lain muncul.
Azkaban dan TeamCity
Akhirnya, beberapa kata tentang infrastruktur. Pengumpulan agregat kami dimulai pada malam hari, dimulai dengan MENGOPTIMASI pada setiap tabel dengan data mentah untuk memicu penggabungan data yang luar biasa di ReplicatedReplacingMergeTree. Operasi dapat bertahan cukup lama, namun, perlu untuk menghapus mengambil, jika terjadi. Perlu dicatat bahwa sejauh ini saya belum pernah menemukan duplikat, tetapi tidak ada jaminan bahwa mereka tidak akan muncul di masa depan.
Langkah selanjutnya adalah pembuatan agregat. Ini adalah skrip bash tempat terjadi hal-hal berikut:
- pertama kita mendapatkan jumlah pecahan dan beberapa host dari pecahan:
SELECT shard_num, any(host_name) AS host FROM system.clusters GROUP BY shard_num
- lalu skrip dieksekusi secara berurutan untuk setiap shard (clickhouse-client -h $ host) permintaan formulir (untuk agregat oleh pengguna):
INSERT INTO ... SELECT ... FROM ... SAMPLE 1/$shards_count OFFSET 1/$shard_num
Ini tidak sepenuhnya optimal dan dapat menghasilkan banyak interaksi jaringan antara host. Namun, ketika menambahkan pecahan baru, semuanya terus bekerja di luar kotak, lokalitas data untuk unit dipertahankan, jadi kami memutuskan untuk tidak terlalu khawatir tentang hal itu.
Kami memiliki Azkaban sebagai penjadwal tugas. Saya tidak akan mengatakan bahwa ini adalah alat yang sangat nyaman, tetapi ini mengatasi tugasnya dengan sempurna, termasuk ketika harus membangun pipa yang sedikit lebih rumit dan ketika satu skrip perlu menunggu beberapa skrip lainnya selesai.
Total waktu yang dihabiskan untuk mengubah acara yang sekarang ada menjadi agregat adalah 15 menit.
Pengujian
Setiap pagi kami menjalankan tes otomatis yang menjawab pertanyaan tentang data mentah, serta kesiapan dan kualitas agregat: โPeriksa bahwa untuk kemarin tidak ada lebih dari setengah persen lebih sedikit data atau data unik pada data mentah atau agregat dibandingkan dengan hari yang sama seminggu yang lalu. "
Secara teknologi, ini adalah tes unit biasa menggunakan JUnit dan mengimplementasikan driver jdbc untuk ClickHouse. Menjalankan semua tes diluncurkan di TeamCity dan membutuhkan waktu sekitar 30 detik dalam 1 utas, dan jika terjadi kegagalan, kami mendapatkan pemberitahuan VKontakte dari bot TeamCity kami yang luar biasa.
Kesimpulan
Gunakan hanya versi stabil ClickHouse dan rambut Anda akan lembut dan halus. Perlu ditambahkan bahwa
ClickHouse tidak melambat .