Ada banyak cara untuk memproses pesan dari sistem Pub-Sub: menggunakan layanan terpisah, mengisolasi proses yang terisolasi, mengatur kumpulan proses / aliran, IPC kompleks, Poll-over-Http dan banyak lainnya. Hari ini saya ingin berbicara tentang cara menggunakan Pub-Sub melalui HTTP dan tentang layanan saya yang ditulis khusus untuk ini.
Dalam beberapa kasus, menggunakan backend layanan HTTP yang sudah jadi adalah solusi ideal untuk memproses antrian pesan:
- Seimbang di luar kotak. Biasanya, backend sudah di belakang penyeimbang dan memiliki infrastruktur siap-untuk-memuat, yang sangat menyederhanakan pekerjaan dengan pesan.
- Menggunakan pengontrol REST reguler (sumber daya HTTP apa pun). Mengkonsumsi pesan HTTP meminimalkan biaya penerapan compumer untuk berbagai bahasa jika backend tercampur.
- Penyederhanaan penggunaan kait Web dari layanan lain. Sekarang hampir setiap layanan (Jira, Gitlab, Mattermost, Slack ...) entah bagaimana mendukung kait Web untuk berinteraksi dengan dunia luar. Anda dapat membuat hidup lebih mudah jika Anda mengajarkan antrean untuk melakukan fungsi-fungsi pengirim HTTP.
Pendekatan ini juga memiliki kelemahan:
- Anda bisa melupakan ringannya solusinya. HTTP adalah protokol yang berat, dan penggunaan kerangka kerja di sisi konsumen akan secara instan meningkatkan latensi dan pemuatan.
- Kami kehilangan kekuatan dari pendekatan Polling, mendapatkan kelemahan dari Push.
- Memproses pesan dengan instance layanan yang sama yang memproses klien dapat memengaruhi responsif. Ini tidak signifikan, karena diperlakukan dengan penyeimbangan dan isolasi.
Saya menerapkan ide itu sebagai layanan Antrian-Over-Http, yang akan dibahas nanti. Proyek ini ditulis dalam Kotlin menggunakan Spring Boot 2.1. Sebagai broker, hanya Apache Kafka yang saat ini tersedia.
Lebih lanjut dalam artikel tersebut, diasumsikan bahwa pembaca telah mengenal Kafka dan mengetahui tentang komit (komit) dan offset (offset) pesan, prinsip-prinsip grup (grup) dan konsumen (konsumen), dan juga memahami bagaimana partisi (partisi) berbeda dari topik (topik) . Jika ada celah, saya sarankan Anda untuk membaca bagian dokumentasi Kafka ini sebelum melanjutkan.Isi
Ulasan
Queue-Over-Http adalah layanan yang bertindak sebagai perantara antara broker pesan dan pengguna HTTP akhir (layanan ini memudahkan implementasi dukungan untuk mengirim pesan ke konsumen dengan cara lain, misalnya, dalam berbagai * RPC). Saat ini, hanya berlangganan, berhenti berlangganan, dan melihat daftar konsumen yang tersedia. Mengirim pesan ke broker (produksi) melalui HTTP belum dilaksanakan karena ketidakmampuan untuk menjamin urutan pesan tanpa dukungan khusus dari produsen.
Tokoh utama dari layanan ini adalah konsumen, yang dapat berlangganan partisi tertentu atau hanya untuk topik (pola topik didukung). Dalam kasus pertama, keseimbangan otomatis partisi dimatikan. Setelah berlangganan, sumber daya HTTP yang ditentukan mulai menerima pesan dari partisi Kafka yang ditetapkan. Secara arsitektur, setiap pelanggan dikaitkan dengan klien Kafka Java asli.
cerita menghibur tentang KafkaConsumerKafka memiliki klien Java yang luar biasa yang dapat melakukan banyak hal. Saya menggunakannya dalam adaptor antrian untuk menerima pesan dari broker dan kemudian mengirimkannya ke antrian layanan lokal. Perlu disebutkan bahwa klien bekerja secara eksklusif dalam konteks utas tunggal.
Gagasan adaptornya sederhana. Kami mulai dalam satu utas, kami menulis penjadwal paling sederhana dari klien asli, dengan fokus pada pengurangan latensi. Yaitu, kami menulis sesuatu yang serupa:
while (!Thread.interrupted()) { var hasWork = false for (consumer in kafkaConsumers) { val queueGroup = consumers[consumer] ?: continue invalidateSubscription(consumer, queueGroup) val records = consumer.poll(Duration.ZERO) if (!records.isEmpty) { hasWork = true } } val committed = doCommit() if (!hasWork && committed == 0) {
Tampaknya semuanya indah, latensi minimal bahkan dengan puluhan konsumen. Dalam praktiknya, ternyata
KafkaConsumer
untuk mode operasi ini dan memberikan tingkat alokasi sekitar 1,5 MB / s dalam waktu idle. Dengan 100 kurir, tingkat alokasi mencapai 150 MB / s dan membuat GC sering memikirkan aplikasi. Tentu saja, semua sampah ini ada di area muda, GC cukup mampu menangani ini, tetapi tetap saja, solusinya tidak sempurna.
Jelas, Anda harus pergi cara khas untuk
KafkaConsumer
dan sekarang saya menempatkan setiap pelanggan di aliran saya. Ini memberikan overhead untuk memori dan penjadwalan, tetapi tidak ada cara lain.
Saya menulis ulang kode dari atas, menghapus lingkaran dalam dan mengubah
Duration.ZERO
ke
Duration.ofMillis(100)
. Ternyata dengan baik, tingkat alokasi turun menjadi 80-150 KB / s yang dapat diterima per konsumen. Namun, Polling dengan batas waktu 100ms menunda seluruh antrian komit ke 100ms yang sama, dan ini tidak dapat diterima banyak.
Dalam proses mencari solusi untuk masalah ini, saya ingat
KafkaConsumer::wakeup
, yang melempar
WakeupException
dan mengganggu operasi pemblokiran apa pun pada konsumen. Dengan metode ini, jalur menuju latensi rendah mudah: ketika permintaan baru untuk komit tiba, kami memasukkannya ke dalam antrian, dan pada konsumen asli yang kami sebut
wakeup
. Dalam siklus kerja, tangkap
WakeupException
dan lakukan apa yang telah terakumulasi. Untuk transfer kontrol dengan bantuan pengecualian, Anda harus segera memberikannya di tangan Anda, tetapi karena tidak ada yang lain ...
Ternyata opsi ini jauh dari sempurna, karena setiap operasi pada konsumen asli sekarang melempar
WakeupException
, termasuk komit itu sendiri. Memproses situasi ini akan mengacaukan kode dengan bendera yang memungkinkan
wakeup
dilakukan.
Saya sampai pada kesimpulan bahwa akan lebih baik untuk memodifikasi metode
KafkaConsumer::poll
sehingga dapat diinterupsi secara normal, sesuai dengan flag tambahan. Akibatnya,
Frankenstein lahir dari refleksi, yang persis menyalin metode polling asli, menambahkan jalan keluar dari lingkaran oleh bendera. Bendera ini diatur oleh metode interruptPoll terpisah, yang, selain itu, memanggil wakeup pada pemilih klien untuk melepaskan kunci utas pada operasi I / O.
Setelah menerapkan klien dengan cara ini, saya mendapatkan kecepatan reaksi dari saat permintaan untuk komit tiba hingga pemrosesan hingga 100 mikrodetik, dan latensi yang sangat baik untuk mengambil pesan dari broker, yang cukup baik.
Setiap partisi diwakili oleh antrian lokal yang terpisah, di mana adaptor menulis pesan dari broker. Pekerja mengambil pesan darinya dan mengirimkannya untuk dieksekusi, yaitu untuk dikirim melalui HTTP.
Layanan ini mendukung pemrosesan pesan batch untuk meningkatkan throughput. Saat berlangganan, Anda dapat menentukan
concurrencyFactor
setiap topik (berlaku untuk setiap partisi yang ditugaskan secara independen). Misalnya,
concurrencyFactor=1000
berarti bahwa 1000 pesan dalam bentuk permintaan HTTP dapat dikirim ke konsumen secara bersamaan. Segera setelah semua pesan dari paket tersebut dikerjakan dengan jelas oleh konsumen, layanan memutuskan komit berikutnya dari offset pesan terakhir di Kafka. Oleh karena itu nilai kedua
concurrencyFactor
adalah jumlah maksimum pesan yang diproses oleh konsumen jika terjadi kecelakaan Kafka atau Queue-Over-Http.
Untuk mengurangi penundaan, antrian
loadFactor = concurrencyFactor * 2
, yang memungkinkan Anda membaca pesan dua kali lebih banyak dari broker yang dapat dikirim. Karena autocommit dinonaktifkan pada klien asli, skema seperti itu tidak melanggar jaminan At-Least-Once.
Nilai
concurrencyFactor
tinggi meningkatkan throughput antrian dengan mengurangi jumlah komit yang memakan waktu hingga 10 ms dalam kasus terburuk. Pada saat yang sama, beban pada konsumen meningkat.
Urutan pengiriman pesan dalam bundel tidak dijamin, tetapi dapat dicapai dengan mengatur
concurrencyFactor=1
.
Berkomitmen
Komit adalah bagian penting dari layanan. Ketika paket data berikutnya siap, offset dari pesan terakhir dari paket tersebut langsung dikomit ke Kafka, dan hanya setelah komit yang berhasil barulah paket berikutnya tersedia untuk diproses. Seringkali ini tidak cukup dan komitmen otomatis diperlukan. Untuk melakukan ini, ada parameter
autoCommitPeriodMs
, yang tidak ada hubungannya dengan periode autocommit klasik untuk klien asli yang melakukan pesan terakhir yang dibaca dari partisi. Bayangkan
concurrencyFactor=10
. Layanan telah mengirim semua 10 pesan dan menunggu masing-masing untuk siap. Pemrosesan pesan 3 selesai terlebih dahulu, lalu pesan 1, dan kemudian pesan 10. Pada titik ini, saatnya untuk komit otomatis. Adalah penting untuk tidak melanggar semantik At-Least-Once. Oleh karena itu, Anda hanya dapat melakukan pesan pertama, yaitu, offset 2, karena hanya berhasil diproses pada saat itu. Lebih lanjut, hingga autocommit berikutnya, pesan 2, 5, 6, 4, dan 8. diproses. Sekarang Anda hanya perlu komit untuk mengimbangi 7, dan seterusnya. Autocommit hampir tidak berpengaruh pada throughput.
Menangani kesalahan
Dalam mode operasi normal, layanan mengirim pesan ke penyelia sekali. Jika karena alasan tertentu menyebabkan kesalahan 4xx atau 5xx, layanan akan mengirim ulang pesan, menunggu pemrosesan berhasil. Waktu antara upaya dapat dikonfigurasi sebagai parameter terpisah.
Dimungkinkan juga untuk mengatur jumlah upaya setelah mana pesan akan ditandai sebagai diproses, yang akan menghentikan pengiriman ulang terlepas dari status respons. Saya tidak menyarankan menggunakan ini untuk data sensitif, situasi kegagalan konsumen harus selalu disesuaikan secara manual. Pesan tempel dapat dipantau oleh log layanan dan pemantauan status respons konsumen.
tentang menempelBiasanya, server HTTP, memberikan status respons 4xx atau 5xx, juga mengirim Connection: close
header. Koneksi TCP yang ditutup dengan cara ini tetap dalam status TIME_WAITED
sampai dihapus oleh sistem operasi setelah beberapa waktu. Masalahnya adalah bahwa koneksi tersebut menempati seluruh port yang tidak dapat digunakan kembali sampai dirilis. Ini dapat menyebabkan tidak adanya port bebas pada mesin untuk membuat koneksi TCP dan layanan akan dibuang dengan pengecualian dalam log untuk setiap pengiriman. Dalam praktiknya, pada Windows 10, port berakhir setelah 10-20 ribu mengirim pesan yang salah dalam 1-2 menit. Dalam mode standar, ini bukan masalah.
Pesan
Setiap pesan yang diambil dari broker dikirim ke penasihat melalui HTTP ke sumber yang ditentukan selama berlangganan. Secara default, pesan dikirim oleh permintaan POST di badan. Perilaku ini dapat diubah dengan menentukan metode lain apa pun. Jika metode ini tidak mendukung pengiriman data dalam tubuh, Anda dapat menentukan nama parameter string di mana pesan akan dikirim. Selain itu, saat berlangganan, Anda dapat menentukan header tambahan yang akan ditambahkan ke setiap pesan, yang sesuai untuk otorisasi dasar menggunakan token. Header ditambahkan ke setiap pesan dengan pengidentifikasi konsumen, topik dan partisi, di mana pesan itu dibaca, nomor pesan, kunci partisi, jika berlaku, serta nama broker.
Performa
Untuk mengevaluasi kinerja, saya menggunakan PC (Windows 10, OpenJDK-11 (G1 tanpa tuning), i7-6700K, 16GB), yang menjalankan layanan dan laptop (Windows 10, i5-8250U, 8GB), di mana produsen pesan, HTTP berputar. Resource Consumer dan Kafka dengan pengaturan default. PC terhubung ke router melalui koneksi kabel 1Gb / s, laptop melalui 802.11ac. Produser menulis setiap 110 ms setiap 100 ms untuk 110 byte pesan ke topik yang ditentukan di mana pengikut berlangganan (
concurrencyFactor=500
, komit otomatis dimatikan) dari grup yang berbeda. Stand jauh dari ideal, tetapi Anda bisa mendapatkan beberapa gambar.
Parameter pengukuran utama adalah efek layanan pada latensi.
Biarkan:
- t
q - cap waktu layanan menerima pesan dari klien asli
- d
t0 adalah waktu antara t
q dan waktu pesan dikirim dari antrian lokal ke kumpulan eksekutif
- d
t adalah waktu antara t
q dan waktu permintaan HTTP dikirim. Itu adalah pengaruh layanan terhadap latensi pesan.
Selama pengukuran, hasil berikut diperoleh (konsumen - C, topik - T, pesan - M):

Dalam mode operasi standar, layanan itu sendiri hampir tidak mempengaruhi latensi, dan konsumsi memori minimal. Nilai maksimum dt (sekitar 60ms) tidak secara khusus ditunjukkan, karena mereka bergantung pada operasi GC, dan bukan pada layanan itu sendiri. Penyesuaian khusus GC atau mengganti G1 dengan Shenandoah dapat membantu memuluskan penyebaran nilai maksimum.
Semuanya berubah secara dramatis ketika konsumen tidak mengatasi aliran pesan dari antrian dan layanan mengaktifkan mode pelambatan. Dalam mode ini, konsumsi memori meningkat, karena waktu respons terhadap permintaan meningkat secara signifikan, yang mencegah pembersihan sumber daya secara tepat waktu. Efek pada latensi di sini tetap pada tingkat dengan hasil sebelumnya, dan nilai-nilai dt tinggi disebabkan oleh pesan preloading di antrian lokal.
Sayangnya, tidak mungkin untuk menguji pada beban yang lebih tinggi, karena laptop membungkuk sudah di 1.300 RPS. Jika seseorang dapat membantu pengaturan pengukuran pada beban tinggi, saya dengan senang hati akan menyediakan perakitan untuk pengujian.
Demonstrasi
Sekarang mari kita beralih ke demonstrasi. Untuk ini kita perlu:
- Kafka broker, siap berangkat. Saya akan mengambil contoh yang diangkat pada 192.168.99.100:9092 dari Bitnami.
- Sumber daya HTTP yang akan menerima pesan. Untuk lebih jelasnya, saya mengambil Web-hooks dari Slack.
Pertama-tama, Anda perlu meningkatkan layanan Queue-Over-Http itu sendiri. Untuk melakukan ini, buat konten berikut di direktori
application.yml
kosong:
spring: profiles: default logging: level: com: viirrtus: queueOverHttp: DEBUG app: persistence: file: storageDirectory: "persist" brokers: - name: "Kafka" origin: "kafka" config: bootstrap.servers: "192.168.99.100:9092"
Di sini kami menunjukkan kepada layanan parameter koneksi dari broker tertentu, serta tempat menyimpan pelanggan sehingga mereka tidak hilang di antara permulaan. Dalam `app.brokers []. Config`, Anda dapat menentukan parameter koneksi yang didukung oleh klien Kafka asli; daftar lengkap dapat ditemukan di
sini .
Karena file konfigurasi diproses oleh Spring, Anda dapat menulis banyak hal menarik di sana. Termasuk, mengkonfigurasi logging.
Sekarang jalankan layanan itu sendiri. Kami menggunakan cara termudah -
docker-compose.yml
:
version: "2" services: app: image: viirrtus/queue-over-http:0.1.3 restart: unless-stopped command: --debug ports: - "8080:8080" volumes: - ./application.yml:/application.yml - ./persist:/persist
Jika opsi ini tidak sesuai dengan Anda, Anda dapat mengkompilasi layanan dari sumbernya. Instruksi perakitan di proyek Readme, tautan yang diberikan di akhir artikel.Langkah selanjutnya adalah mendaftarkan pelanggan pertama. Untuk melakukan ini, Anda perlu melakukan permintaan HTTP ke layanan dengan deskripsi Konsumen:
POST localhost:8080/broker/subscription Content-Type: application/json { "id": "my-first-consumer", "group": { "id": "consumers" }, "broker": "Kafka", "topics": [ { "name": "slack.test", "config": { "concurrencyFactor": 10, "autoCommitPeriodMs": 100 } } ], "subscriptionMethod": { "type": "http", "delayOnErrorMs": 1000, "retryBeforeCommit": 10, "uri": "<slack-wh-uri>", "additionalHeaders": { "Content-Type": "application/json" } } }
Jika semuanya berjalan dengan baik, responsnya akan hampir sama dengan konten yang dikirim.
Mari kita lihat setiap parameter:
Consumer.id
- ID pelanggan kamiConsumer.group.id
- pengidentifikasi grupConsumer.broker
- menunjukkan broker layanan mana yang Anda butuhkanConsumer.topics[0].name
- nama topik dari mana kami ingin menerima pesanConsumer.topics[0].config. concurrencyFactor
Consumer.topics[0].config. concurrencyFactor
- jumlah pesan maksimum yang dikirim secara bersamaanConsumer.topics[0].config. autoCommitPeriodMs
Consumer.topics[0].config. autoCommitPeriodMs
- periode komit paksa untuk pesan siapConsumer.subscriptionMethod.type
- jenis langganan. Hanya HTTP yang saat ini tersedia.Consumer.subscriptionMethod.delayOnErrorMs
- waktu sebelum mengirim ulang pesan yang berakhir dengan kesalahanConsumer.subscriptionMethod.retryBeforeCommit
- jumlah upaya untuk mengirim ulang pesan kesalahan. Jika 0 - pesan akan berputar hingga pemrosesan berhasil. Dalam kasus kami, jaminan pengiriman penuh tidak sepenting keteguhan aliran.Consumer.subscriptionMethod.uri
- sumber daya tempat pesan akan dikirimConsumer.subscriptionMethod.additionalHeader
- header tambahan yang akan dikirim dengan setiap pesan. Perhatikan bahwa akan ada JSON di badan setiap pesan sehingga Slack dapat menginterpretasikan permintaan dengan benar.
Dalam permintaan ini, metode HTTP dihilangkan, karena standarnya, POST, Slack cukup baik.Mulai saat ini, layanan memonitor partisi yang ditugaskan untuk topik slack.test untuk pesan baru.
Untuk menulis pesan ke topik, saya akan menggunakan utilitas
/opt/bitnami/kafka/bin
di Kafka yang terletak di
/opt/bitnami/kafka/bin
gambar Kafka yang diluncurkan (lokasi utilitas dalam contoh Kafka lain mungkin berbeda):
kafka-console-producer.sh --broker-list localhost:9092 --topic slack.test > {βtextβ: βHello!β}
Pada saat yang sama, Slack akan memberi tahu Anda tentang pesan baru:
Untuk berhenti berlangganan konsumen, cukup membuat permintaan POST untuk `broker / berhenti berlangganan` dengan konten yang sama dengan yang selama berlangganan.Kesimpulan
Saat ini, hanya fungsi dasar yang diimplementasikan. Lebih lanjut direncanakan untuk meningkatkan batching, mencoba untuk menerapkan semantik sekali-tepat, menambahkan kemampuan untuk mengirim pesan ke broker melalui HTTP dan, yang paling penting, menambahkan dukungan untuk Pub-Sub populer lainnya.
Layanan Queue-Over-Http saat ini sedang dalam pengembangan aktif. Versi 0.1.3 cukup stabil untuk pengujian pada stan dev dan stage. Kinerja telah diuji pada Windows 10, Debian 9, dan Ubuntu 18.04. Anda dapat menggunakan prod dengan risiko Anda sendiri. Jika Anda ingin membantu pengembangan atau memberikan umpan balik tentang layanan - selamat datang di proyek
Github .