Halo, Habr!
Kami menemukan cadangan terakhir buku "
Apache Kafka. Pemrosesan Streaming dan Analisis Data " dan mengirimkannya ke prepress. Selain itu, kami telah menerima kontrak untuk buku "
Kafka Streams in Action " dan mulai menerjemahkannya secara harfiah minggu depan.

Untuk menunjukkan kasus menarik menggunakan perpustakaan Kafka Streams, kami memutuskan untuk menerjemahkan artikel tentang paradigma Event Sourcing di Kafka dari Adam Worski, yang
artikelnya tentang bahasa Scala diterbitkan dua minggu lalu. Lebih menarik lagi bahwa pendapat Adam Worski tidak dapat disangkal: di
sini , misalnya, dikemukakan bahwa paradigma ini jelas tidak cocok untuk Kafka. Semua yang lebih berkesan, kami harap, kami mendapatkan kesan artikel.
Istilah "Sourcing Acara" diterjemahkan sebagai "Logging Peristiwa" baik dalam publikasi
Arsitektur Bersih kami oleh Robert Martin maupun dalam artikel ini. Jika seseorang terkesan dengan terjemahan "acara memompa" - tolong beri tahu saya.
Menciptakan sistem yang menyediakan pendaftaran acara (sumber acara), cepat atau lambat kita dihadapkan dengan masalah kegigihan (kegigihan) - dan di sini kita memiliki beberapa pilihan. Pertama, ada
EventStore , implementasi yang matang yang diperkeras dalam pertempuran. Atau, Anda dapat menggunakan
akka-persistence untuk mengambil keuntungan penuh dari skalabilitas
Cassandra , serta mengandalkan kinerja model aktor. Pilihan lain adalah
database relasional lama yang baik, di mana pendekatan
CRUD
dikombinasikan dengan penggunaan peristiwa dan manfaat maksimal diperas dari transaksi.
Selain peluang (dan, mungkin, banyak lainnya) yang telah muncul berkat beberapa hal yang baru-baru ini dilaksanakan, hari ini menjadi sangat mudah untuk mengatur pendaftaran acara di atas
Kafka . Mari kita lihat caranya.
Apa itu event logging?Ada sejumlah
artikel pengantar yang sangat baik tentang hal ini, jadi saya akan membatasi diri pada pengantar yang paling ringkas. Saat mendaftarkan acara, kami tidak menyimpan status "saat ini" dari entitas yang digunakan dalam sistem kami, tetapi aliran peristiwa yang terkait dengan entitas ini. Setiap
peristiwa adalah
fakta yang menggambarkan perubahan status (sudah!) Itu telah
terjadi dengan objek. Seperti yang Anda tahu, fakta tidak dibahas dan
tidak berubah .
Ketika kita memiliki aliran peristiwa semacam itu, keadaan entitas saat ini dapat diklarifikasi dengan meminimalkan semua peristiwa yang terkait dengannya; namun, perlu diingat bahwa yang sebaliknya tidak mungkin - hanya dengan mempertahankan keadaan "saat ini", kami membuang banyak informasi kronologis yang berharga.
Event logging dapat
hidup berdampingan secara damai dengan cara penyimpanan keadaan yang lebih tradisional. Sebagai aturan, sistem memproses sejumlah jenis entitas (misalnya: pengguna, pesanan, barang, ...) dan sangat mungkin bahwa pendaftaran acara hanya akan berguna untuk beberapa kategori ini. Penting untuk dicatat bahwa di sini kita tidak dihadapkan dengan pilihan "semua atau tidak sama sekali"; ini hanya tentang fitur manajemen negara tambahan dalam aplikasi kita.
Penyimpanan acara di KafkaMasalah pertama yang harus dipecahkan: bagaimana cara menyimpan acara di Kafka? Ada tiga strategi yang mungkin:
- Simpan semua acara untuk semua jenis entitas dalam satu topik (dengan banyak segmen)
- Menurut topik-per-pada-setiap-entitas-jenis, yaitu, kami mengambil semua peristiwa yang terkait dengan pengguna dalam topik yang terpisah, dalam terpisah - semua yang terkait dengan produk, dll.
- Berdasarkan topik-oleh-esensi, yaitu dengan topik terpisah untuk setiap pengguna spesifik dan setiap nama produk
Strategi ketiga (topik-oleh-esensi) praktis tidak praktis. Jika, ketika setiap pengguna baru muncul dalam sistem, ia harus memulai topik yang terpisah, segera jumlah topik akan menjadi tidak terbatas. Agregasi apa pun dalam kasus ini akan sangat sulit, misalnya, akan sulit untuk mengindeks semua pengguna di mesin pencari; Anda tidak hanya harus mengkonsumsi sejumlah besar topik - tetapi masih belum semuanya diketahui sebelumnya.
Oleh karena itu, tetap memilih antara 1 dan 2. Kedua opsi memiliki kelebihan dan kekurangan. Memiliki satu topik membuatnya lebih mudah untuk mendapatkan pandangan
global dari semua acara. Di sisi lain, dengan menyoroti topik untuk setiap jenis entitas, Anda dapat mengatur skala dan segmen aliran masing-masing entitas secara individual. Pilihan salah satu dari dua strategi tergantung pada kasus penggunaan khusus.
Selain itu, Anda dapat menerapkan kedua strategi sekaligus, jika Anda memiliki ruang penyimpanan tambahan: buat topik berdasarkan jenis entitas dari satu topik komprehensif.

Di sisa artikel, kami akan bekerja dengan hanya satu jenis entitas dan dengan satu topik, meskipun materi yang disajikan dapat dengan mudah diekstrapolasi dan diterapkan untuk bekerja dengan banyak topik atau jenis entitas.
(EDIT: seperti yang dicatat oleh
Chris Hunt , ada
artikel yang sangat bagus oleh Martin Kleppman , yang membahas secara rinci bagaimana mendistribusikan acara berdasarkan topik dan segmen).
Operasi penyimpanan paling sederhana dalam paradigma pencatatan peristiwaOperasi paling sederhana, yang logis untuk diharapkan dari toko yang mendukung event logging, adalah membaca status "saat ini" (diminimalkan) dari entitas tertentu. Sebagai aturan, setiap entitas memiliki satu atau yang lain
id
. Karenanya, mengetahui
id
ini, sistem penyimpanan kami harus mengembalikan keadaan objek saat ini.
Kebenaran dalam upaya terakhir adalah log peristiwa: keadaan saat ini selalu dapat disimpulkan dari aliran peristiwa yang terkait dengan entitas tertentu. Untuk ini, mesin basis data akan membutuhkan fungsi murni (tanpa efek samping) yang menerima acara dan keadaan awal, dan mengembalikan keadaan yang diubah:
Event = > State => State
. Di hadapan fungsi seperti itu dan
nilai kondisi awal, kondisi saat ini adalah
konvolusi dari aliran peristiwa (fungsi perubahan kondisi harus
bersih sehingga dapat diterapkan secara bebas berulang kali ke acara yang sama.)
Implementasi yang disederhanakan dari operasi "baca kondisi saat ini" di Kafka mengumpulkan aliran
semua peristiwa dari topik, menyaringnya, hanya menyisakan peristiwa dengan
id
diberikan dan runtuh menggunakan fungsi yang ditentukan. Jika ada banyak peristiwa (dan seiring waktu jumlah peristiwa hanya tumbuh), operasi ini dapat menjadi lambat dan menghabiskan banyak sumber daya. Bahkan jika hasilnya akan di-cache dalam memori dan disimpan pada node layanan, informasi ini masih harus dibuat kembali secara berkala, misalnya, karena kegagalan node atau karena crowding out dari data cache.

Karena itu, diperlukan cara yang lebih rasional. Di sinilah aliran kafka dan repositori negara berguna. Aplikasi Kafka-stream dijalankan pada sekelompok node yang menggunakan topik tertentu bersama-sama. Setiap node diberi serangkaian segmen topik yang dikonsumsi, seperti halnya dengan konsumen Kafka biasa. Namun, aliran kafka menyediakan operasi data tingkat tinggi yang membuatnya lebih mudah untuk membuat aliran yang diperoleh.
Salah satu operasi seperti itu di
aliran kafka adalah konvolusi aliran dalam penyimpanan lokal. Setiap penyimpanan lokal berisi data hanya dari segmen yang dikonsumsi oleh node yang diberikan. Di luar kotak, dua implementasi penyimpanan lokal tersedia:
dalam RAM dan berdasarkan
RocksDB .
Kembali ke topik pendaftaran acara, kami mencatat bahwa adalah mungkin untuk menutup aliran acara di
toko negara dengan memegang "status saat ini" dari setiap entitas dari segmen yang ditugaskan ke node. Jika kita menggunakan implementasi state store berdasarkan RocksDB, maka berapa banyak entitas yang dapat kita lacak pada satu node hanya tergantung pada jumlah ruang disk.
Begini konvolusi peristiwa dalam penyimpanan lokal saat menggunakan Java API (serde berarti "serializer / deserializer"):
KStreamBuilder builder = new KStreamBuilder(); builder.stream(keySerde, valueSerde, "my_entity_events") .groupByKey(keySerde, valueSerde)
Contoh lengkap
pemrosesan pesanan berdasarkan layanan microser tersedia di situs web Confluent.
(EDIT: seperti dicatat oleh
Sergei Egorov dan
Nikita Salnikov di Twitter, untuk sistem dengan event logging, Anda mungkin perlu mengubah pengaturan penyimpanan data default di Kafka sehingga tidak ada batas waktu atau ukuran yang berfungsi, dan juga, secara opsional, , aktifkan kompresi data.)
Lihat status saat iniKami telah membuat repositori keadaan di mana status saat ini dari semua entitas yang berasal dari segmen yang ditugaskan ke node berada, tetapi bagaimana cara meminta repositori ini sekarang? Jika permintaan lokal (yaitu, itu berasal dari node yang sama di mana repositori berada), maka semuanya cukup sederhana:
streams .store("my_entity_store", QueryableStoreTypes.keyValueStore()); .get(entityId);
Tetapi bagaimana jika kita ingin meminta data yang terletak di node lain? Dan bagaimana cara mengetahui apa simpul ini? Di sini, fitur lain yang baru-baru ini diperkenalkan di Kafka sangat berguna:
pertanyaan interaktif . Dengan bantuan mereka, Anda dapat mengakses metadata Kafka dan mencari tahu simpul mana yang memproses segmen topik dengan
id
diberikan (dalam hal ini, alat untuk segmentasi topik secara implisit digunakan):
metadataService .streamsMetadataForStoreAndKey("my_entity_store", entityId, keySerde)
Selanjutnya, Anda perlu mengarahkan permintaan entah bagaimana ke simpul yang benar. Harap dicatat: cara spesifik di mana komunikasi lintas-situs dilaksanakan dan ditangani - apakah itu REST, akka-remote atau lainnya - tidak termasuk dalam area tanggung jawab aliran kafka. Kafka hanya menyediakan akses ke toko negara dan memberikan informasi di mana simpul toko negara terletak untuk
id
diberikan.
Pemulihan BencanaState store terlihat bagus, tetapi apa yang terjadi ketika sebuah simpul gagal? Merekonstruksi toko negara lokal untuk segmen tertentu juga bisa menjadi operasi yang mahal. Ini dapat memicu peningkatan penundaan atau kehilangan permintaan untuk waktu yang lama, karena kafka-stream perlu diseimbangkan kembali (setelah menambah atau menghapus simpul).
Itulah sebabnya, secara default, penyimpanan negara jangka panjang dicatat: yaitu, semua perubahan yang dibuat ke toko juga ditulis ke topik changelog. Topik ini dikompresi (karena untuk setiap
id
kami hanya tertarik pada catatan terakhir, tanpa riwayat perubahan, karena sejarah disimpan dalam acara itu sendiri) - oleh karena itu, sekecil mungkin. Itu sebabnya rekreasi penyimpanan pada node lain dapat terjadi jauh lebih cepat.
Namun, dengan penyeimbangan kembali dalam kasus ini, penundaan masih dimungkinkan. Untuk lebih mengurangi mereka, kafka-stream menyediakan kemampuan untuk menyimpan beberapa
replika cadangan (
num.standby.replicas
) untuk setiap repositori. Replika ini menerapkan semua pembaruan yang diambil dari topik dengan log perubahan saat tersedia, dan siap untuk beralih ke mode toko keadaan utama untuk segmen tertentu segera setelah toko utama saat ini gagal.
KoherensiDengan pengaturan default, Kafka menyediakan setidaknya satu kali pengiriman. Artinya, jika terjadi kegagalan simpul, beberapa pesan dapat dikirim beberapa kali. Misalnya, ada kemungkinan bahwa peristiwa tertentu akan diterapkan dua kali ke toko negara jika sistem macet setelah toko negara berubah ke log, tetapi sebelum offset untuk acara tertentu ini dilakukan. Mungkin ini tidak akan menyebabkan kesulitan: fungsi pembaruan status kami (
Event = > State => State
) secara normal dapat mengatasi situasi seperti itu. Namun, itu mungkin tidak dapat mengatasi: dalam kasus seperti itu, jaminan
pengiriman satu kali yang disediakan oleh Kafka dapat digunakan. Jaminan tersebut hanya berlaku ketika membaca dan menulis topik Kafka, tapi inilah yang kami lakukan di sini:
di latar belakang, semua entri dalam topik Kafka dikurangi menjadi memperbarui log perubahan untuk toko negara dan melakukan penyeimbangan. Semua ini bisa dilakukan
dalam bentuk transaksi .
Oleh karena itu, jika fungsi kami memperbarui keadaan memerlukan ini, kami dapat mengaktifkan semantik pemrosesan aliran “pengiriman satu kali secara ketat” menggunakan opsi konfigurasi tunggal:
processing.guarantee
. Karena ini, kinerja turun, tetapi tidak ada yang sia-sia.
Acara mendengarkanSekarang kita telah membahas dasar-dasarnya - menanyakan "kondisi saat ini" dan memperbaruinya untuk setiap entitas - bagaimana dengan memicu
efek samping ? Pada titik tertentu, ini akan menjadi perlu, misalnya, untuk:
- Mengirim email notifikasi
- Pengindeksan Entitas Mesin Pencari
- Memanggil layanan eksternal melalui REST (atau SOAP, CORBA, dll.)
Semua tugas ini, sampai taraf tertentu, memblokir dan terkait dengan operasi I / O (ini alami untuk efek samping), jadi mungkin bukan ide yang baik untuk mengeksekusinya dalam kerangka logika pembaruan negara: akibatnya, frekuensi kegagalan dalam loop utama dapat meningkat acara, dan dalam hal kinerja akan ada hambatan.
Selain itu, fungsi dengan logika pembaruan keadaan (E
Event = > State => State
) dapat dijalankan beberapa kali (jika terjadi kegagalan atau restart), dan paling sering kami ingin meminimalkan jumlah kasus di mana efek samping untuk peristiwa tertentu dijalankan beberapa kali.
Untungnya, karena kami bekerja dengan topik Kafka, kami memiliki cukup banyak fleksibilitas. Pada tahap flow, di mana state store diperbarui, peristiwa dapat dipancarkan tidak berubah (atau, jika perlu, juga dalam bentuk yang dimodifikasi), dan aliran / topik yang dihasilkan (dalam Kafka konsep-konsep ini setara) dapat dikonsumsi sesuka Anda. Selain itu, dapat dikonsumsi baik sebelum atau setelah tahap pembaruan negara. Akhirnya, kita dapat mengontrol bagaimana kita meluncurkan efek samping: setidaknya sekali atau maksimal sekali. Opsi pertama diberikan jika Anda melakukan offset pada topik-topik yang dikonsumsi hanya setelah semua efek samping berhasil diselesaikan. Sebaliknya, dengan maksimum satu putaran, kami melakukan shift hingga efek samping dipicu.
Ada beberapa opsi untuk memicu efek samping, mereka bergantung pada situasi praktis tertentu. Pertama-tama, Anda dapat menentukan tahap Kafka-stream di mana efek samping untuk setiap peristiwa dipicu sebagai bagian dari fungsi pemrosesan aliran.
Menyiapkan mekanisme semacam itu cukup sederhana, tetapi solusi ini tidak fleksibel ketika Anda harus berurusan dengan percobaan ulang, mengontrol offset, dan bersaing dengan offset untuk banyak acara sekaligus. Dalam kasus yang lebih kompleks seperti itu, mungkin lebih tepat untuk menentukan pemrosesan menggunakan, katakanlah,
kafka reaktif atau mekanisme lain yang mengkonsumsi topik Kafka "secara langsung".
Ada juga kemungkinan bahwa satu peristiwa akan
memicu acara lain - misalnya, acara "pesanan" dapat memicu acara "persiapan untuk pengiriman" dan "pemberitahuan pelanggan". Ini juga dapat diterapkan pada tahap aliran kafka.
Akhirnya, jika kita ingin menyimpan acara atau beberapa data yang diekstraksi dari peristiwa dalam database atau mesin pencari, katakanlah, di ElasticSearch atau PostgreSQL, maka kita dapat menggunakan konektor
Kafka Connect , yang akan memproses bagi kita semua detail terkait dengan konsumsi topik.
Membuat Tampilan dan ProyeksiBiasanya, persyaratan sistem tidak terbatas pada permintaan dan pemrosesan hanya aliran entitas tunggal. Agregasi, kombinasi dari beberapa aliran acara juga harus didukung. Aliran gabungan seperti itu sering disebut sebagai
proyeksi , dan ketika diciutkan, mereka dapat digunakan untuk membuat
representasi data . Apakah mungkin menerapkannya dengan Kafka?

Sekali lagi ya! Ingatlah bahwa pada prinsipnya kita hanya berurusan dengan topik Kafka, tempat acara kita disimpan; oleh karena itu, kami memiliki semua kekuatan dari Konsumen / Produsen Kafka mentah, penggabung aliran-kafka, dan bahkan
KSQL - semua ini berguna bagi kami untuk mendefinisikan proyeksi. Misalnya, menggunakan aliran kafka Anda dapat memfilter aliran, tampilan, grup dengan kunci, agregat di jendela sementara atau sesi, dll. baik di tingkat kode, atau menggunakan KSQL seperti SQL.
Aliran semacam itu dapat disimpan dan disediakan untuk kueri untuk waktu yang lama menggunakan toko negara dan kueri interaktif, seperti yang kami lakukan dengan aliran entitas individual.
Apa selanjutnyaUntuk mencegah aliran peristiwa tanpa batas saat sistem berkembang, opsi kompresi seperti menyimpan
snapshot dari "kondisi saat ini" mungkin berguna. Dengan demikian, kita dapat membatasi diri untuk menyimpan hanya beberapa foto terbaru dan peristiwa yang terjadi setelah pembuatannya.
Meskipun Kafka tidak memiliki dukungan langsung untuk snapshot (dan dalam beberapa sistem lain yang beroperasi berdasarkan prinsip peristiwa perekaman, itu adalah), Anda pasti dapat menambahkan fungsi semacam ini sendiri, menggunakan beberapa mekanisme di atas, seperti stream, konsumen, toko negara, dll. d.
RingkasanMeskipun, pada awalnya, Kafka tidak dirancang dengan mata pada paradigma registrasi acara, pada kenyataannya itu adalah mesin data streaming dengan dukungan untuk
replikasi topik , segmentasi,
repositori negara, dan
streaming API , dan sangat fleksibel pada saat yang sama. Karena itu, di atas Kafka, Anda dapat dengan mudah menerapkan sistem pendaftaran acara. Selain itu, karena dengan latar belakang segala sesuatu yang terjadi, kami akan selalu memiliki topik Kafka, kami akan mendapatkan fleksibilitas tambahan, karena kami dapat bekerja dengan API streaming level tinggi atau konsumen level rendah.