
Hai, habrozhiteli! Buku ini cocok untuk setiap pengembang yang ingin memahami pemrosesan streaming. Memahami pemrograman terdistribusi akan membantu Anda lebih memahami Streaming Kafka dan Kafka. Akan menyenangkan mengetahui kerangka kerja Kafka itu sendiri, tetapi ini tidak perlu: Saya akan memberi tahu Anda semua yang Anda butuhkan. Berkat buku ini, pengembang Kafka yang berpengalaman, seperti pemula, akan belajar cara membuat aplikasi streaming yang menarik menggunakan perpustakaan Kafka Streams. Pengembang Java tingkat menengah dan tinggi yang sudah akrab dengan konsep-konsep seperti serialisasi akan belajar bagaimana menerapkan keterampilan mereka untuk membuat aplikasi Kafka Streams. Kode sumber buku ini ditulis dalam Java 8 dan pada dasarnya menggunakan sintaks ekspresi lambda Java 8, sehingga kemampuan untuk bekerja dengan fungsi lambda (bahkan dalam bahasa pemrograman lain) berguna bagi Anda.
Kutipan. 5.3. Operasi agregasi dan jendela
Di bagian ini, kami beralih ke bagian paling menjanjikan dari Streaming Kafka. Sejauh ini kami telah membahas aspek-aspek Aliran Kafka berikut:
- membuat topologi pemrosesan;
- penggunaan status dalam aplikasi streaming;
- membuat koneksi aliran data;
- perbedaan antara stream peristiwa (KStream) dan aliran pembaruan (KTable).
Dalam contoh berikut, kami akan menggabungkan semua elemen ini. Selain itu, Anda akan diperkenalkan dengan operasi jendela - fitur hebat lainnya dari aplikasi streaming. Contoh pertama kami adalah agregasi sederhana.
5.3.1. Agregasi penjualan saham berdasarkan industri
Agregasi dan pengelompokan adalah alat penting untuk bekerja dengan streaming data. Memeriksa catatan individual saat tersedia sering tidak cukup. Untuk mengekstrak informasi tambahan dari data, pengelompokan dan kombinasi mereka diperlukan.
Dalam contoh ini, Anda harus mencoba setelan pedagang intraday yang perlu melacak volume penjualan saham perusahaan di beberapa industri. Secara khusus, Anda tertarik pada lima perusahaan dengan penjualan saham terbesar di setiap industri.
Untuk agregasi semacam itu, Anda akan memerlukan beberapa langkah berikut untuk menerjemahkan data ke dalam formulir yang diinginkan (secara umum).
- Buat sumber berbasis topik yang menerbitkan informasi perdagangan stok mentah. Kita harus memetakan objek bertipe StockTransaction ke objek bertipe ShareVolume. Faktanya adalah bahwa objek StockTransaction berisi metadata penjualan, dan kami hanya perlu data tentang jumlah saham yang terjual.
- Group ShareVolume data dengan simbol saham. Setelah dikelompokkan berdasarkan simbol, Anda dapat menciutkan data ini ke subtotal penjualan saham. Perlu dicatat bahwa metode KStream.groupBy mengembalikan turunan tipe KGroupedStream. Dan Anda bisa mendapatkan contoh KTable dengan memanggil metode KGroupedStream.reduce nanti.
Apa itu antarmuka KGroupedStream
Metode KStream.groupBy dan KStream.groupByKey mengembalikan instance KGroupedStream. KGroupedStream adalah representasi perantara dari aliran acara setelah dikelompokkan berdasarkan kunci. Sama sekali tidak dimaksudkan untuk bekerja secara langsung dengannya. Sebagai gantinya, KGroupedStream digunakan untuk operasi agregasi, yang hasilnya selalu KTable. Dan karena hasil operasi agregasi adalah KTable dan mereka menggunakan penyimpanan negara, ada kemungkinan bahwa tidak semua pembaruan sebagai hasilnya dikirim lebih jauh ke dalam pipa.
Metode KTable.groupBy mengembalikan KGroupedTable yang serupa - representasi perantara dari aliran pembaruan yang dikelompokkan kembali dengan kunci.
Mari kita istirahat sejenak dan melihat ara. 5.9, yang menunjukkan apa yang telah kita capai. Topologi ini seharusnya sudah tidak asing lagi bagi Anda.
Sekarang mari kita lihat kode untuk topologi ini (dapat ditemukan di file src / main / java / bbejeck / chapter_5 / AggregationsAndReducingExample.java) (Listing 5.2).
Kode yang diberikan berbeda dalam singkat dan sejumlah besar tindakan dilakukan dalam beberapa baris. Dalam parameter pertama metode builder.stream, Anda dapat melihat sesuatu yang baru untuk diri Anda sendiri: nilai tipe AutoOffsetReset yang disebutkan .EARLIEST (ada juga TERBARU), diatur menggunakan metode Consumed.withOffsetResetPolicy. Dengan menggunakan tipe yang disebutkan ini, Anda dapat menentukan strategi untuk mengatur ulang offset untuk masing-masing KStream atau KTable, yang memiliki prioritas di atas parameter untuk mengatur ulang offset dari konfigurasi.
GroupByKey dan GroupBy
Antarmuka KStream memiliki dua metode untuk pengelompokan catatan: GroupByKey dan GroupBy. Keduanya mengembalikan KGroupedTable, jadi Anda mungkin memiliki pertanyaan yang sah: apa perbedaan antara mereka dan kapan harus menggunakan yang mana?
Metode GroupByKey digunakan ketika kunci di KStream sudah tidak kosong. Dan yang paling penting, bendera "membutuhkan partisi ulang" belum pernah ditetapkan.
Metode GroupBy mengasumsikan bahwa Anda mengubah kunci untuk pengelompokan, sehingga bendera re-partisi diatur ke true. Melakukan koneksi, agregasi, dll. Setelah metode GroupBy akan mengarah ke partisi ulang otomatis.
Ringkasan: Anda harus menggunakan GroupByKey daripada GroupBy bila memungkinkan.
Apa yang dilakukan metode mapValues dan groupBy dapat dipahami, jadi lihat metode sum () (dapat ditemukan di file src / main / java / bbejeck / model / ShareVolume.java) (Listing 5.3).
Metode ShareVolume.sum mengembalikan subtotal volume penjualan saham, dan hasil dari seluruh rantai perhitungan adalah objek <String, ShareVolume> KTable. Sekarang Anda mengerti apa peran yang dimainkan KTable. Ketika objek ShareVolume tiba, pembaruan terkini disimpan di KTable yang sesuai. Penting untuk tidak lupa bahwa semua pembaruan tercermin dalam shareVolumeKTable sebelumnya, tetapi tidak semua dikirim lebih lanjut.
Selanjutnya, dengan bantuan KTable ini, kami melakukan agregasi (berdasarkan jumlah saham yang terjual) untuk mendapatkan lima perusahaan dengan penjualan saham tertinggi di setiap industri. Tindakan kami dalam kasus ini akan serupa dengan tindakan selama agregasi pertama.
- Lakukan operasi groupBy lainnya untuk mengelompokkan objek ShareVolume individu berdasarkan industri.
- Lanjutkan untuk meringkas objek ShareVolume. Kali ini, objek agregasi adalah antrian prioritas dengan ukuran tetap. Hanya lima perusahaan dengan jumlah saham terbesar yang dijual disimpan dalam antrian dengan ukuran tetap.
- Tampilkan garis dari paragraf sebelumnya dalam nilai string dan kembalikan lima terlaris dengan jumlah saham berdasarkan industri.
- Tulis hasilnya dalam bentuk string ke topik.
Dalam gbr. 5.10 menunjukkan grafik topologi pergerakan data. Seperti yang Anda lihat, pemrosesan putaran kedua cukup sederhana.
Sekarang, setelah memahami dengan jelas struktur pemrosesan putaran kedua ini, Anda dapat merujuk ke kode sumbernya (Anda akan menemukannya di file src / main / java / bbejeck / chapter_5 / AggregationsAndReducingExample.java) (Listing 5.4).
Ada variabel fixedQueue di penginisialisasi ini. Ini adalah objek khusus - adaptor untuk java.util.TreeSet, yang digunakan untuk melacak hasil tertinggi N dalam mengurangi urutan jumlah saham yang terjual.
Anda telah menjumpai panggilan ke groupBy dan mapValues, jadi kami tidak akan menghentikannya (kami memanggil metode KTable.toStream, karena metode KTable.print sudah tidak digunakan lagi). Tetapi Anda belum melihat versi KTable dari metode agregat (), jadi kami akan meluangkan waktu untuk membahasnya.
Seperti yang Anda ingat, KTable dibedakan oleh fakta bahwa catatan dengan kunci yang sama dianggap pembaruan. KTable menggantikan rekor lama dengan yang baru. Agregasi terjadi dengan cara yang sama: catatan terakhir dengan satu kunci dikumpulkan. Ketika catatan tiba, ditambahkan ke turunan dari kelas FixedSizePriorityQueue menggunakan adder (parameter kedua dalam panggilan ke metode agregat), tetapi jika catatan lain dengan kunci yang sama sudah ada, catatan lama dihapus menggunakan subtracter (parameter ketiga dalam panggilan ke metode agregat).
Ini semua berarti bahwa agregator kami, FixedSizePriorityQueue, tidak menggabungkan semua nilai dengan satu kunci, tetapi menyimpan jumlah bergerak dari jumlah N dari jenis saham terlaris. Setiap entri berisi jumlah total saham yang terjual sejauh ini. KTable akan memberi Anda informasi tentang saham perusahaan mana yang saat ini paling banyak dijual, agregasi bergulir dari setiap pembaruan tidak diperlukan.
Kami belajar melakukan dua hal penting:
- nilai kelompok dalam KTable oleh kunci yang umum bagi mereka;
- Lakukan operasi yang bermanfaat seperti konvolusi dan agregasi pada nilai-nilai yang dikelompokkan ini.
Kemampuan untuk melakukan operasi ini penting untuk memahami arti dari data yang bergerak melalui aplikasi Kafka Streams dan mencari tahu informasi apa yang mereka bawa.
Kami juga telah menyatukan beberapa konsep kunci yang dibahas sebelumnya dalam buku ini. Dalam Bab 4, kami berbicara tentang betapa pentingnya keadaan gagal-aman, lokal untuk aplikasi streaming. Contoh pertama dalam bab ini menunjukkan mengapa negara bagian sangat penting - memungkinkan untuk melacak informasi apa yang sudah Anda lihat. Akses lokal menghindari penundaan jaringan, membuat aplikasi lebih produktif dan tahan kesalahan.
Saat melakukan operasi konvolusi atau agregasi, Anda harus menentukan nama toko negara. Operasi konvolusi dan agregasi mengembalikan instance KTable, dan KTable menggunakan store negara untuk mengganti hasil lama dengan yang baru. Seperti yang Anda lihat, tidak semua pembaruan dikirim lebih jauh ke dalam pipa, dan ini penting, karena operasi agregasi dirancang untuk mendapatkan informasi akhir. Jika negara bagian tidak diterapkan, KTable akan mengirimkan lebih lanjut semua hasil agregasi dan konvolusi.
Selanjutnya, kita melihat pelaksanaan operasi seperti agregasi, dalam periode waktu tertentu - yang disebut operasi windowing.
5.3.2. Operasi Jendela
Pada bagian sebelumnya, kami memperkenalkan konvolusi dan agregasi “bergulir”. Aplikasi ini melakukan konvolusi berkelanjutan penjualan saham dengan agregasi berikutnya dari lima saham terlaris.
Terkadang agregasi dan konvolusi hasil yang berkelanjutan seperti itu diperlukan. Dan kadang-kadang Anda perlu melakukan operasi hanya pada periode waktu tertentu. Misalnya, hitung berapa banyak transaksi bursa yang telah dilakukan dengan saham perusahaan tertentu dalam 10 menit terakhir. Atau berapa banyak pengguna yang mengklik iklan banner baru dalam 15 menit terakhir. Aplikasi dapat melakukan operasi seperti itu beberapa kali, tetapi dengan hasil yang terkait hanya dengan interval waktu yang ditentukan (jendela waktu).
Menghitung Transaksi Bursa oleh Pembeli
Dalam contoh berikut, kami akan terlibat dalam melacak transaksi pertukaran untuk beberapa pedagang - baik organisasi besar atau pemodal tunggal yang pintar.
Ada dua kemungkinan alasan untuk pelacakan ini. Salah satunya adalah kebutuhan untuk mengetahui apa yang pemimpin beli / jual. Jika pemain besar dan investor canggih ini melihat peluang untuk diri mereka sendiri, masuk akal untuk mengikuti strategi mereka. Alasan kedua adalah keinginan untuk melihat kemungkinan tanda-tanda transaksi ilegal menggunakan informasi orang dalam. Untuk melakukan ini, Anda perlu menganalisis korelasi lonjakan besar dalam penjualan dengan siaran pers penting.
Pelacakan tersebut terdiri dari langkah-langkah seperti:
- membuat aliran untuk membaca dari topik transaksi saham;
- pengelompokan catatan yang masuk dengan ID pelanggan dan simbol stok dari stok. Panggilan ke metode groupBy mengembalikan instance kelas KGroupedStream;
- KGroupedStream.windowedBy mengembalikan aliran data yang dibatasi oleh jendela sementara, yang memungkinkan agregasi jendela. Bergantung pada jenis jendela, TimeWindowedKStream atau SessionWindowedKStream dikembalikan;
- Menghitung transaksi untuk operasi agregasi. Aliran data jendela menentukan apakah catatan tertentu diperhitungkan dalam perhitungan ini;
- menulis hasil ke suatu topik atau mengeluarkannya ke konsol selama pengembangan.
Topologi aplikasi ini sederhana, tetapi gambar visualnya tidak ada salahnya. Lihatlah foto itu. 5.11.
Selanjutnya kami akan mempertimbangkan fungsionalitas operasi jendela dan kode yang sesuai.
Jenis jendela
Ada tiga jenis jendela di Kafka Streaming:
- sesi
- Tumbling (jatuh);
- sliding / "jumping" (sliding / melompat).
Yang mana yang akan dipilih tergantung pada persyaratan bisnis. Jendela “Tumbling” dan “jumping” terbatas waktunya, sementara pembatasan sesi dikaitkan dengan tindakan pengguna - durasi sesi ditentukan semata-mata oleh seberapa aktif perilaku pengguna. Hal utama adalah jangan lupa bahwa semua jenis windows didasarkan pada tanggal / waktu perangko catatan, dan bukan pada waktu sistem.
Selanjutnya, kami menerapkan topologi kami dengan masing-masing jenis jendela. Kode lengkap hanya akan diberikan pada contoh pertama, tidak ada yang akan berubah untuk tipe windows lainnya, kecuali untuk tipe operasi windows.
Jendela sesi
Jendela sesi sangat berbeda dari semua jenis jendela lainnya. Mereka tidak dibatasi oleh waktu seperti oleh aktivitas pengguna (atau aktivitas entitas yang ingin Anda lacak). Jendela sesi dibatasi oleh periode tidak aktif.
Gambar 5.12 menggambarkan konsep windows sesi. Sesi yang lebih kecil akan bergabung dengan sesi di sebelah kirinya. Dan sesi di sebelah kanan akan terpisah, karena mengikuti periode tidak aktif yang panjang. Jendela sesi didasarkan pada tindakan pengguna, tetapi menerapkan stempel tanggal / waktu dari catatan untuk menentukan sesi mana milik catatan itu.
Menggunakan Sesi Windows untuk Melacak Transaksi Bursa
Kami akan menggunakan jendela sesi untuk menangkap informasi tentang transaksi pertukaran. Implementasi windows sesi ditunjukkan pada Listing 5.5 (yang dapat ditemukan di src / main / java / bbejeck / chapter_5 / CountingWindowingAndKTableJoinExample.java).
Anda telah memenuhi sebagian besar operasi topologi ini, jadi tidak perlu mempertimbangkannya lagi di sini. Tetapi ada beberapa elemen baru yang akan kita bahas sekarang.
Untuk operasi grup apa pun, beberapa jenis operasi agregasi (agregasi, konvolusi, atau penghitungan) biasanya dilakukan. Anda dapat melakukan agregasi kumulatif dengan total kumulatif, atau agregasi jendela, di mana catatan diperhitungkan dalam rentang waktu tertentu.
Kode dalam Listing 5.5 menghitung jumlah transaksi dalam jendela sesi. Dalam gbr. 5.13 tindakan ini dianalisis langkah demi langkah.
Dengan memanggil windowedBy (SessionWindows.with (twentySeconds) .sampai (lima belas menit)) kami membuat jendela sesi dengan interval idle 20 detik dan interval retensi 15 menit. Interval tidak aktif selama 20 detik berarti bahwa aplikasi akan menyertakan catatan apa pun yang tiba dalam waktu 20 detik dari akhir atau awal sesi saat ini di sesi saat ini (aktif).
Selanjutnya, kami menunjukkan operasi agregasi mana yang harus dilakukan di jendela sesi - dalam hal ini, hitung. Jika catatan yang masuk berada di luar interval tidak aktif (di kedua sisi cap tanggal / waktu), aplikasi membuat sesi baru. Interval penyimpanan berarti mempertahankan sesi untuk waktu tertentu dan memungkinkan data terlambat yang melampaui periode tidak aktif sesi, tetapi masih dapat dilampirkan. Selain itu, awal dan akhir sesi baru yang dihasilkan dari gabungan terkait dengan cap tanggal / waktu paling awal dan terbaru.
Mari kita lihat beberapa entri dari metode penghitungan untuk melihat bagaimana sesi bekerja (Tabel 5.1).
Setelah menerima catatan, kami mencari sesi yang sudah ada dengan kunci yang sama, waktu akhir kurang dari cap tanggal / waktu saat ini - interval tidak aktif dan waktu mulai lebih lama dari cap tanggal / waktu saat ini + interval tidak aktif. Dengan mengingat hal ini, empat catatan dari tabel. 5.1 bergabung menjadi satu sesi sebagai berikut.
1. Rekam 1 lebih dulu, sehingga waktu mulai sama dengan waktu akhir dan 00:00:00.
2. Selanjutnya datang rekaman 2, dan kami mencari sesi yang berakhir tidak lebih awal dari 23:59:55 dan mulai paling lambat 00:00:35. Temukan catatan 1 dan gabungkan sesi 1 dan 2. Ambil waktu mulai sesi 1 (sebelumnya) dan waktu akhir sesi 2 (nanti), sehingga sesi baru kami mulai pukul 00:00:00 dan berakhir pada pukul 00:00:15.
3. Rekam 3 tiba, kami mencari sesi antara 00:00:30 dan 00:01:10 dan tidak menemukannya. Tambahkan sesi kedua untuk kunci 123-345-654, FFBE, mulai dan berakhir pada 00:00:50.
4. Rekam 4 tiba, dan kami mencari sesi antara 23:59:45 dan 00:00:25. Kali ini ada dua sesi - 1 dan 2. Ketiga sesi digabungkan menjadi satu, dengan waktu mulai 00:00:00 dan waktu akhir 00:00:15.
Dari apa yang dikatakan di bagian ini, ada baiknya mengingat nuansa penting berikut:
- Sesi bukan jendela ukuran tetap. Durasi sesi ditentukan oleh aktivitas dalam periode waktu tertentu;
- Prangko tanggal / waktu dalam data menentukan apakah suatu peristiwa termasuk dalam sesi yang ada atau dalam periode tidak aktif.
Selanjutnya kita akan membahas jenis windows berikut - jendela “jungkir balik”.
Tumbling windows
Jendela "Tumbling" menangkap peristiwa yang termasuk dalam periode waktu tertentu. Bayangkan Anda perlu menangkap semua transaksi pertukaran perusahaan setiap 20 detik, sehingga Anda mengumpulkan semua peristiwa untuk periode waktu ini. Pada akhir interval 20 detik, jendela "jatuh" dan beralih ke interval pengamatan baru 20 detik. Gambar 5.14 menggambarkan situasi ini.
Seperti yang Anda lihat, semua peristiwa yang diterima selama 20 detik terakhir termasuk dalam jendela. Pada akhir periode waktu ini, jendela baru dibuat.
Listing 5.6 menunjukkan kode yang menunjukkan menggunakan jendela jatuh untuk menangkap transaksi pertukaran setiap 20 detik (Anda dapat menemukannya di src / main / java / bbejeck / chapter_5 / CountingWindowingAndKtableJoinExample.java).
Berkat perubahan kecil pada panggilan ke metode TimeWindows.of ini, Anda dapat menggunakan jendela tumbling. Dalam contoh ini, tidak ada panggilan ke metode till (), akibatnya interval penyimpanan default 24 jam akan digunakan.
Akhirnya, saatnya untuk beralih ke opsi jendela terakhir - melompat jendela.
Jendela geser ("melompat")
Jendela geser / "melompat" mirip dengan "jatuh", tetapi dengan sedikit perbedaan. Jendela geser tidak menunggu akhir interval waktu sebelum membuat jendela baru untuk menangani peristiwa terbaru. Mereka memulai perhitungan baru setelah interval menunggu lebih pendek dari durasi jendela.
Untuk mengilustrasikan perbedaan antara jendela “jungkir balik” dan “lompat”, mari kita kembali ke contoh dengan perhitungan transaksi pertukaran. Tujuan kami, seperti sebelumnya, adalah untuk menghitung jumlah transaksi, tetapi kami tidak ingin menunggu sepanjang waktu sebelum memperbarui penghitung. Sebagai gantinya, kami akan memperbarui penghitung pada interval yang lebih pendek. Misalnya, kami akan terus menghitung jumlah transaksi setiap 20 detik, tetapi untuk memperbarui penghitung setiap 5 detik, seperti yang ditunjukkan pada Gambar. 5.15. Pada saat yang sama, kami memiliki tiga jendela hasil dengan data yang tumpang tindih.
Listing 5.7 memperlihatkan kode untuk menentukan jendela geser (dapat ditemukan di src / main / java / bbejeck / chapter_5 / CountingWindowingAndKtableJoinExample.java).
«» «» advanceBy(). 15 .
, . , , :
, KTable KStream .
5.3.3. KStream KTable
4 KStream. KTable KStream. . KStream — , KTable — , KTable.
. , .
- KTable KStream , , .
- KTable, . KTable .
- .
, .
KTable KStream
KTable KStream .
- KTable.toStream().
- KStream.map , Windowed TransactionSummary.
( src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) ( 5.8).
KStream.map, KStream .
, KTable .
KTable
, KTable ( src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) ( 5.9).
, Serde , Serde. EARLIEST .
— .
. , ( src/main/java/bbejeck/chapter_5/CountingWindowingAndKtableJoinExample.java) ( 5.10).
leftJoin . 4, JoinWindow , KStream-KTable KTable . : KTable, . : KTable KStream .
KStream.
5.3.4. GlobalKTable
, . 4 KStream, — KStream KTable. . , Kafka Streams . , , ( 4, « » 4.2.4).
— , ; . , , .
, , , . Kafka Streams GlobalKTable.
GlobalKTable , . , , . GlobalKTable . .
KStream GlobalKTable
5.3.2 . :
{customerId='074-09-3705', stockTicker='GUTM'}, 17 {customerId='037-34-5184', stockTicker='CORK'}, 16
Meskipun hasil ini konsisten dengan tujuan, akan lebih mudah jika nama klien dan nama lengkap perusahaan juga ditampilkan. Untuk menambahkan nama pelanggan dan nama perusahaan, Anda dapat melakukan koneksi normal, tetapi Anda harus melakukan dua pemetaan kunci dan pemartisian berulang. Dengan GlobalKTable Anda dapat menghindari biaya operasi semacam itu.
Untuk melakukan ini, kita akan menggunakan objek countStream dari Listing 5.11 (kode yang sesuai dapat ditemukan di file src / main / java / bbejeck / chapter_5 / GlobalKTableExample.java), menghubungkannya dengan dua objek GlobalKTable.
Kami sudah membahas ini sebelumnya, jadi saya tidak akan mengulanginya. Tapi saya perhatikan bahwa kode di toStream (). Fungsi peta diabstraksi menjadi objek fungsi demi keterbacaan alih-alih ekspresi lambda yang tertanam.
Langkah selanjutnya adalah mendeklarasikan dua instance GlobalKTable (kode yang ditampilkan dapat ditemukan di src / main / java / bbejeck / chapter_5 / GlobalKTableExample.java) (Listing 5.12).
Perhatikan bahwa nama topik dideskripsikan menggunakan tipe enumerasi.
Sekarang kita telah menyiapkan semua komponen, masih menulis kode untuk koneksi (yang dapat ditemukan di file src / main / java / bbejeck / chapter_5 / GlobalKTableExample.java) (Listing 5.13).
Meskipun ada dua senyawa dalam kode ini, mereka diatur dalam sebuah rantai, karena tidak ada hasil yang digunakan secara terpisah. Hasilnya ditampilkan di akhir seluruh operasi.
Ketika Anda memulai operasi koneksi di atas, Anda akan mendapatkan hasil berikut:
{customer='Barney, Smith' company="Exxon", transactions= 17}
Esensinya tidak berubah, tetapi hasil ini terlihat lebih jelas.
Menghitung Bab 4, Anda telah melihat beberapa jenis koneksi sedang beraksi. Mereka tercantum dalam tabel. 5.2. Tabel ini mencerminkan konektivitas yang relevan dengan versi 1.0.0 dari Stream Kafka; sesuatu akan berubah di rilis mendatang.
Sebagai kesimpulan, saya akan mengingatkan Anda hal utama: Anda dapat menghubungkan aliran acara (KStream) dan memperbarui aliran (KTable) menggunakan status lokal. Selain itu, jika ukuran data referensi tidak terlalu besar, Anda bisa menggunakan objek GlobalKTable. GlobalKTable mereplikasi semua bagian ke masing-masing node aplikasi Kafka Streams, sehingga memastikan ketersediaan semua data terlepas dari bagian mana yang terkait dengan kunci tersebut.
Selanjutnya kita akan melihat kemungkinan Streaming Kafka, berkat yang Anda dapat mengamati perubahan negara tanpa mengkonsumsi data dari topik Kafka.
5.3.5. Status Permintaan
Kami telah melakukan beberapa operasi yang melibatkan negara dan selalu menampilkan hasilnya ke konsol (untuk tujuan pengembangan) atau menuliskannya ke topik (untuk operasi industri). Saat menulis hasil ke suatu topik, Anda harus menggunakan konsumen Kafka untuk melihatnya.
Membaca data dari topik ini dapat dianggap semacam pandangan terwujud. Untuk tugas kami, kami dapat menggunakan definisi tampilan terwujud dari Wikipedia: “... objek basis data fisik yang berisi hasil kueri. Misalnya, itu bisa berupa salinan lokal dari data yang dihapus, atau bagian dari baris dan / atau kolom dari suatu tabel atau hasil gabungan, atau tabel pivot yang diperoleh dengan menggunakan agregasi ”(https://en.wikipedia.org/wiki/Materialized_view).
Kafka Streaming juga memungkinkan Anda untuk melakukan pertanyaan interaktif di toko-toko negara, yang memungkinkan Anda untuk langsung membaca pandangan terwujud ini. Penting untuk dicatat bahwa permintaan ke toko negara adalah dalam sifat operasi read-only. Berkat ini, Anda tidak dapat takut untuk secara tidak sengaja membuat status aplikasi tidak konsisten selama pemrosesan data.
Kemampuan untuk secara langsung menanyakan toko negara adalah penting. Ini berarti Anda dapat membuat aplikasi - dasbor tanpa harus menerima data dari konsumen Kafka terlebih dahulu. Ini meningkatkan efisiensi aplikasi, karena fakta bahwa tidak diperlukan untuk merekam data lagi:
- karena lokasi data, mereka dapat diakses dengan cepat;
- Duplikasi data dikecualikan, karena mereka tidak ditulis ke penyimpanan eksternal.
Hal utama yang saya ingin Anda ingat: Anda dapat langsung menjalankan permintaan negara dari aplikasi. Anda tidak bisa melebih-lebihkan peluang yang diberikan ini kepada Anda. Alih-alih mengkonsumsi data dari Kafka dan menyimpan catatan dalam database untuk aplikasi, Anda dapat meminta toko negara dengan hasil yang sama. Permintaan langsung ke toko negara berarti lebih sedikit kode (tidak ada konsumen) dan lebih sedikit perangkat lunak (tidak perlu tabel database untuk menyimpan hasilnya).
Kami telah membahas sejumlah besar informasi dalam bab ini, jadi kami akan menghentikan sementara diskusi kami tentang pertanyaan interaktif ke toko-toko resmi. Tapi jangan khawatir: di Bab 9 kita akan membuat aplikasi sederhana - panel informasi dengan pertanyaan interaktif. Untuk mendemonstrasikan kueri interaktif dan kemungkinan menambahkannya ke aplikasi Kafka Streams, ia akan menggunakan beberapa contoh dari bab ini dan bab-bab sebelumnya.
Ringkasan
- Objek KStream mewakili aliran peristiwa yang sebanding dengan sisipan basis data. Objek KTable mewakili aliran pembaruan, mereka lebih mirip dengan pembaruan dalam database. Ukuran objek KTable tidak tumbuh, catatan lama diganti dengan yang baru.
- Objek KTable diperlukan untuk operasi agregasi.
- Menggunakan operasi jendela, Anda dapat memecah data agregat menjadi keranjang waktu.
- Berkat objek GlobalKTable, Anda dapat mengakses data referensi di mana saja di aplikasi, apa pun bagiannya.
- Koneksi antara objek KStream, KTable dan GlobalKTable dimungkinkan.
Sejauh ini, kami telah fokus pada pembuatan aplikasi Kafka Streaming menggunakan KStream DSL tingkat tinggi. Meskipun pendekatan tingkat tinggi memungkinkan Anda untuk membuat program yang rapi dan ringkas, penggunaannya adalah kompromi yang pasti. Bekerja dengan DSL KStream berarti meningkatkan keringkasan kode dengan mengurangi tingkat kontrol. Pada bab selanjutnya, kita akan melihat API tingkat rendah dari node handler dan mencoba tradeoff lainnya. Program akan menjadi lebih lama daripada sebelumnya, tetapi kita akan memiliki kesempatan untuk membuat hampir semua simpul pemrosesan yang mungkin kita butuhkan.
→ Detail lebih lanjut tentang buku ini dapat ditemukan di
situs web penerbit→ Untuk Khabrozhiteley, diskon 25% untuk kupon -
Kafka Streaming→ Setelah pembayaran versi kertas buku, sebuah buku elektronik dikirim melalui email.