
Halo, Habr!
Saya bekerja di tim Tinkoff, yang sedang mengembangkan pusat notifikasi sendiri. Sebagian besar, saya mengembangkan di Jawa menggunakan boot Spring dan menyelesaikan berbagai masalah teknis yang muncul dalam proyek.
Sebagian besar layanan microsoft kami berinteraksi secara asinkron dengan satu sama lain melalui perantara pesan. Sebelumnya, kami menggunakan IBM MQ sebagai broker, yang tidak lagi menangani beban, tetapi pada saat yang sama memiliki jaminan pengiriman yang tinggi.
Sebagai pengganti, kami ditawari Apache Kafka, yang memiliki skalabilitas tinggi, tetapi, sayangnya, membutuhkan pendekatan konfigurasi yang hampir terpisah untuk skenario yang berbeda. Selain itu, setidaknya mekanisme pengiriman sekali, yang bekerja di Kafka secara default, tidak memungkinkan mempertahankan tingkat konsistensi yang diperlukan di luar kotak. Selanjutnya, saya akan membagikan pengalaman kami dalam mengonfigurasi Kafka, khususnya, saya akan memberi tahu Anda cara mengkonfigurasi dan hidup dengan tepat sekali pengiriman.
Pengiriman terjamin dan banyak lagi
Parameter yang akan dibahas nanti akan membantu mencegah sejumlah masalah dengan pengaturan koneksi default. Tapi pertama-tama, saya ingin memperhatikan satu parameter yang akan memfasilitasi kemungkinan debug.
Client.id untuk Produser dan Konsumen akan membantu dalam hal ini. Sekilas, Anda dapat menggunakan nama aplikasi sebagai nilai, dan dalam kebanyakan kasus ini akan berfungsi. Meskipun situasi ketika beberapa Konsumen digunakan dalam aplikasi dan Anda memberi mereka client.id yang sama mengarah ke peringatan berikut:
org.apache.kafka.common.utils.AppInfoParser — Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=kafka.test-0
Jika Anda ingin menggunakan JMX dalam aplikasi dengan Kafka, maka ini bisa menjadi masalah. Untuk kasus ini, yang terbaik adalah menggunakan kombinasi nama aplikasi dan, misalnya, nama topik, sebagai nilai client.id. Hasil dari konfigurasi kami dapat dilihat pada output dari perintah kafka-consumer-groups dari utilitas dari Confluent:

Sekarang kita akan menganalisis skenario pengiriman pesan yang dijamin. Kafka Produser memiliki parameter acks yang memungkinkan Anda untuk mengonfigurasi setelah berapa banyak yang mengakui pemimpin gugus perlu mempertimbangkan pesan berhasil direkam. Parameter ini dapat mengambil nilai-nilai berikut:
- 0 - mengakui tidak akan dipertimbangkan.
- 1 - parameter default, hanya diperlukan 1 replika.
- −1 - diperlukan pengakuan dari semua replika yang disinkronkan ( konfigurasi kluster min.insync.replicas ).
Dapat dilihat dari nilai-nilai di atas bahwa acks sama dengan −1 memberikan jaminan terkuat bahwa pesan tidak akan hilang.
Seperti kita ketahui, sistem terdistribusi tidak dapat diandalkan. Untuk melindungi dari kerusakan sementara, Produser Kafka menyediakan parameter coba lagi yang memungkinkan Anda untuk mengatur jumlah upaya coba lagi selama delivery.timeout.ms . Karena parameter coba lagi default ke Integer.MAX_VALUE (2147483647), jumlah pengiriman ulang pesan dapat disesuaikan dengan mengubah hanya delivery.timeout.ms.
Bergerak menuju tepat sekali pengiriman
Pengaturan ini memungkinkan Produser kami untuk mengirimkan pesan dengan jaminan tinggi. Sekarang mari kita bicara tentang bagaimana menjamin rekaman hanya satu salinan pesan dalam topik Kafka? Dalam kasus paling sederhana, untuk melakukan ini pada Produser, atur parameter enable.idempotence menjadi true. Idempotency menjamin perekaman hanya satu pesan di partisi tertentu dari satu topik. Prasyarat untuk mengaktifkan idempotency adalah acks = all, coba lagi> 0, max.in.flight.requests.per.connection ≤ 5 . Jika parameter ini tidak ditetapkan oleh pengembang, maka nilai-nilai di atas akan secara otomatis ditetapkan.
Ketika idempotensi diatur, perlu untuk memastikan bahwa pesan yang sama jatuh ke partisi yang sama setiap waktu. Ini dapat dilakukan dengan mengkonfigurasi kunci dan parameter partisi.class pada Produser. Mari kita mulai dengan kuncinya. Untuk setiap pengiriman, harus sama. Ini mudah dicapai menggunakan pengidentifikasi bisnis apa pun dari pesan aslinya. Parameter partisier.class memiliki nilai default dari DefaultPartitioner . Dengan strategi partisi ini, perilaku default adalah sebagai berikut:
- Jika partisi tersebut ditentukan secara eksplisit saat mengirim pesan, maka kami menggunakannya.
- Jika partisi tidak ditentukan, tetapi kunci ditentukan, pilih partisi dengan hash dari tombol.
- Jika partisi dan kunci tidak ditentukan, pilih partisi pada gilirannya (round-robin).
Selain itu, menggunakan kunci dan pengiriman idempoten dengan parameter max.in.flight.requests.per.connection = 1 memberi Anda pemrosesan pesan yang teratur pada Konsumen. Secara terpisah, perlu diingat bahwa jika kontrol akses dikonfigurasi pada cluster Anda, maka Anda akan memerlukan hak untuk menulis idempoten ke topik.
Jika Anda tiba-tiba tidak memiliki kemampuan pengiriman idempoten dengan kunci atau logika di sisi Produser membutuhkan pelestarian konsistensi data antara partisi yang berbeda, maka transaksi akan datang untuk menyelamatkan. Selain itu, menggunakan transaksi berantai, Anda dapat menyinkronkan catatan di Kafka secara kondisional, misalnya, dengan catatan di database. Untuk memungkinkan pengiriman transaksional ke Produser, perlu memiliki idempotensi, dan secara opsional mengatur transactional.id . Jika kontrol akses dikonfigurasi pada cluster Kafka Anda, maka untuk perekaman transaksional, serta idempoten, Anda akan memerlukan izin menulis, yang dapat diberikan oleh mask menggunakan nilai yang disimpan di transactional.id.
Secara formal, Anda dapat menggunakan string apa pun, misalnya, nama aplikasi, sebagai pengidentifikasi transaksi. Tetapi jika Anda menjalankan beberapa instance dari aplikasi yang sama dengan transactional.id yang sama, maka instance yang diluncurkan pertama akan dihentikan dengan kesalahan, karena Kafka akan menganggapnya sebagai proses zombie.
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
Untuk mengatasi masalah ini, kami menambahkan akhiran ke nama aplikasi dalam bentuk nama host, yang diperoleh dari variabel lingkungan.
Produser dikonfigurasi, tetapi transaksi pada Kafka hanya mengontrol ruang lingkup pesan. Terlepas dari status transaksi, pesan langsung masuk ke dalam topik, tetapi memiliki atribut sistem tambahan.
Untuk mencegah agar pesan tersebut tidak dibaca oleh Konsumen sebelumnya, ia perlu mengatur parameter isolation.level menjadi read_committed. Konsumen tersebut akan dapat membaca pesan non-transaksional seperti sebelumnya, dan pesan transaksional hanya setelah komit.
Jika Anda menginstal semua pengaturan yang tercantum di atas, maka Anda mengkonfigurasi pengiriman tepat satu kali. Selamat!
Namun ada satu lagi nuansa. Transactional.id, yang kami konfigurasikan di atas, sebenarnya adalah awalan transaksi. Pada manajer transaksi, nomor seri ditambahkan ke dalamnya. Identifier yang diterima dikeluarkan pada transactional.id.expiration.ms , yang dikonfigurasi pada cluster Kafka dan memiliki nilai default "7 hari". Jika selama ini aplikasi tidak menerima pesan apa pun, maka ketika Anda mencoba mengirim transaksi berikutnya, Anda akan menerima InvalidPidMappingException . Setelah itu, koordinator transaksi akan mengeluarkan nomor urut baru untuk transaksi selanjutnya. Namun, pesan tersebut dapat hilang jika InvalidPidMappingException tidak diproses dengan benar.
Alih-alih total
Seperti yang Anda lihat, mengirim pesan ke Kafka saja tidak cukup. Anda perlu memilih kombinasi parameter dan bersiap untuk membuat perubahan cepat. Dalam artikel ini saya mencoba untuk menunjukkan pengaturan pengiriman tepat satu kali secara rinci dan menjelaskan beberapa masalah konfigurasi client.id dan transactional.id yang kami temui. Ringkasan pengaturan Produser dan Konsumen dirangkum di bawah ini.
Produsen:
- acks = all
- coba lagi> 0
- enable.idempotence = true
- max.in.flight.requests.per.connection ≤ 5 (1 - untuk pengiriman tertib)
- transactional.id = $ {application-name} - $ {hostname}
Konsumen:
- isolation.level = read_committed
Untuk meminimalkan kesalahan dalam aplikasi masa depan, kami membuat wrapper kami di atas konfigurasi pegas, di mana nilai untuk beberapa parameter yang tercantum sudah ditetapkan.
Dan inilah beberapa bahan untuk studi independen: