Redis Stream - Keandalan dan Skalabilitas Sistem Pesan Anda

gambar

Redis Stream - tipe data abstrak baru yang diperkenalkan di Redis dengan rilis versi 5.0
Secara konseptual, Redis Stream adalah Daftar yang dapat Anda tambahkan entri. Setiap entri memiliki pengidentifikasi unik. Secara default, pengenal dihasilkan secara otomatis dan menyertakan stempel waktu. Oleh karena itu, Anda dapat meminta rentang rekaman berdasarkan waktu atau menerima data baru saat tiba di aliran, karena perintah Unix tail -f membaca file log dan membeku untuk mengantisipasi data baru. Harap dicatat bahwa beberapa klien dapat mendengarkan streaming pada saat yang sama, karena banyak proses "tail -f" dapat membaca file pada saat yang sama, tanpa saling bertentangan.

Untuk memahami semua kelebihan dari tipe data baru, mari kita mengingat secara singkat struktur Redis yang sudah lama ada yang sebagian mengulangi fungsi Redis Stream.

Wisata sejarah


Redis pub / sub


Redis Pub / Sub adalah sistem pesan sederhana yang sudah ada dalam penyimpanan nilai kunci Anda. Namun, untuk kesederhanaan Anda harus membayar:

  • Jika penerbit karena alasan apa pun gagal, maka ia kehilangan semua pelanggannya
  • Penerbit perlu mengetahui alamat pasti semua pelanggannya.
  • Penerbit dapat membebani pelanggan berlebih jika data diterbitkan lebih cepat daripada yang diproses
  • Pesan dihapus dari buffer penerbit segera setelah publikasi, terlepas dari berapa banyak pelanggan yang dikirimkan dan seberapa cepat mereka berhasil memproses pesan ini.
  • Semua pelanggan akan menerima pesan secara bersamaan. Pelanggan sendiri harus entah bagaimana sepakat di antara mereka sendiri tentang cara memproses pesan yang sama.
  • Tidak ada mekanisme bawaan untuk mengonfirmasi keberhasilan pemrosesan pesan oleh pelanggan. Jika pelanggan menerima pesan dan jatuh selama pemrosesan, penerbit tidak akan mengetahuinya.

Daftar redis


Daftar Redis adalah struktur data yang mendukung perintah baca kunci. Anda dapat menambah dan membaca pesan dari awal atau akhir daftar. Atas dasar struktur ini, Anda dapat membuat tumpukan atau antrian yang bagus untuk sistem terdistribusi Anda dan ini dalam banyak kasus sudah cukup. Perbedaan utama dari Redis Pub / Sub:

  • Pesan dikirim ke satu klien. Klien pertama yang diblokir dengan membaca akan menerima data terlebih dahulu.
  • Clint harus memulai operasi baca untuk setiap pesan. Daftar tidak tahu apa-apa tentang klien.
  • Pesan disimpan sampai seseorang menghitungnya atau menghapusnya secara eksplisit. Jika Anda mengatur server Redis untuk menyiram data ke disk, maka keandalan sistem meningkat secara dramatis.

Pengantar Stream


Menambahkan catatan ke aliran


Perintah XADD menambahkan catatan baru ke aliran. Catatan bukan hanya string, itu terdiri dari satu atau lebih pasangan nilai kunci. Dengan demikian, setiap catatan sudah terstruktur dan menyerupai struktur file CSV.

> XADD mystream * sensor-id 1234 temperature 19.8 1518951480106-0 

Dalam contoh di atas, kami menambahkan dua bidang ke aliran dengan nama (kunci) "mystream": "sensor-id" dan "temperature" dengan nilai masing-masing "1234" dan "19,8". Sebagai argumen kedua, perintah menerima pengidentifikasi yang akan ditugaskan ke catatan - pengidentifikasi ini secara unik mengidentifikasi setiap catatan dalam aliran. Namun, dalam hal ini, kami melewati * karena kami ingin Redis membuat pengenal baru untuk kami. Setiap pengenal baru akan meningkat. Oleh karena itu, setiap catatan baru akan memiliki pengidentifikasi yang lebih besar dalam kaitannya dengan catatan sebelumnya.

Format ID


Pengidentifikasi catatan yang dikembalikan oleh perintah XADD terdiri dari dua bagian:

{millisecondsTime}-{sequenceNumber}

millisecondsTime - Waktu unix dalam milidetik (Redis waktu server). Namun, jika waktu saat ini sama atau kurang dari waktu catatan sebelumnya, maka cap waktu catatan sebelumnya digunakan. Oleh karena itu, jika waktu server dikembalikan ke masa lalu, pengidentifikasi baru akan tetap mempertahankan properti peningkatan.

sequenceNumber digunakan untuk catatan yang dibuat dalam milidetik yang sama. sequenceNumber akan ditingkatkan 1 relatif terhadap catatan sebelumnya. Karena sequenceNumber berukuran 64 bit, dalam praktiknya Anda seharusnya tidak mengalami batasan jumlah rekaman yang dapat dihasilkan dalam satu milidetik.

Format pengidentifikasi seperti itu pada pandangan pertama mungkin tampak aneh. Pembaca yang tidak percaya mungkin bertanya-tanya mengapa waktu adalah bagian dari pengidentifikasi. Alasannya adalah bahwa Redis streaming permintaan rentang dukungan oleh pengidentifikasi. Karena pengidentifikasi dikaitkan dengan waktu catatan itu dibuat, ini memungkinkan untuk meminta rentang waktu. Kita akan melihat contoh konkret ketika kita melanjutkan ke mempelajari perintah XRANGE .

Jika karena alasan apa pun pengguna perlu menentukan pengenal sendiri, yang, misalnya, dikaitkan dengan beberapa sistem eksternal, maka kita dapat meneruskannya ke perintah XADD alih-alih tanda * seperti yang ditunjukkan di bawah ini:

 > XADD somestream 0-1 field value 0-1 > XADD somestream 0-2 foo bar 0-2 

Harap perhatikan bahwa dalam hal ini, Anda harus memantau sendiri peningkatan pengidentifikasi. Dalam contoh kami, pengidentifikasi minimum adalah "0-1", sehingga tim tidak akan menerima pengidentifikasi lain yang sama dengan atau kurang dari "0-1".

 > XADD somestream 0-1 foo bar (error) ERR The ID specified in XADD is equal or smaller than the target stream top item 

Jumlah catatan dalam aliran


Anda bisa mendapatkan jumlah rekaman dalam aliran hanya dengan menggunakan perintah XLEN . Sebagai contoh kita, perintah ini akan mengembalikan nilai berikut:

 > XLEN somestream (integer) 2 

Permintaan Rentang - XRANGE dan XREVRANGE


Untuk meminta data rentang, kami perlu menentukan dua pengidentifikasi - awal dan akhir rentang. Rentang yang dikembalikan akan mencakup semua elemen, termasuk perbatasan. Ada juga dua pengidentifikasi khusus "-" dan "+", yang berarti pengidentifikasi terkecil (catatan pertama) dan terbesar (catatan terakhir) dalam aliran. Contoh di bawah ini akan menampilkan semua entri streaming.

 > XRANGE mystream - + 1) 1) 1518951480106-0 2) 1) "sensor-id" 2) "1234" 3) "temperature" 4) "19.8" 2) 1) 1518951482479-0 2) 1) "sensor-id" 2) "9999" 3) "temperature" 4) "18.2" 

Setiap record yang dikembalikan adalah array dari dua elemen: pengidentifikasi dan daftar pasangan nilai kunci. Kami telah mengatakan bahwa pengidentifikasi rekaman terkait waktu. Karena itu, kami dapat meminta rentang periode waktu tertentu. Namun, kami dapat menentukan dalam permintaan bukan pengidentifikasi lengkap, tetapi hanya waktu Unix, menghilangkan bagian yang terkait dengan sequenceNumber . Bagian yang dihilangkan dari pengidentifikasi secara otomatis sama dengan nol pada awal rentang dan ke nilai maksimum yang mungkin pada akhir rentang. Berikut ini adalah contoh cara meminta rentang dua milidetik.

 > XRANGE mystream 1518951480106 1518951480107 1) 1) 1518951480106-0 2) 1) "sensor-id" 2) "1234" 3) "temperature" 4) "19.8" 

Kami hanya memiliki satu catatan dalam kisaran ini, namun dalam set data nyata, hasil yang dikembalikan bisa sangat besar. Untuk alasan ini, XRANGE mendukung opsi COUNT. Dengan menentukan kuantitas, kita cukup mendapatkan catatan N pertama. Jika kita perlu mendapatkan entri N berikutnya (pagination), kita dapat menggunakan pengenal terakhir yang diterima, menambah sequenceNumber- nya dengan satu dan meminta lagi. Mari kita lihat ini dalam contoh berikut. Kami mulai menambahkan 10 elemen menggunakan XADD (misalkan aliran mystream telah diisi dengan 10 elemen). Untuk memulai iterasi, mendapatkan 2 elemen per perintah, kita mulai dengan rentang penuh, tetapi dengan COUNT sama dengan 2.

 > XRANGE mystream - + COUNT 2 1) 1) 1519073278252-0 2) 1) "foo" 2) "value_1" 2) 1) 1519073279157-0 2) 1) "foo" 2) "value_2" 

Untuk melanjutkan iterasi dengan dua elemen berikut, kita perlu memilih pengenal terakhir yang diterima, yaitu 1519073279157-0, dan menambahkan 1 ke sequenceNumber .
Pengidentifikasi yang dihasilkan, dalam hal ini 1519073279157-1, sekarang dapat digunakan sebagai argumen baru ke awal rentang untuk panggilan XRANGE berikutnya:

 > XRANGE mystream 1519073279157-1 + COUNT 2 1) 1) 1519073280281-0 2) 1) "foo" 2) "value_3" 2) 1) 1519073281432-0 2) 1) "foo" 2) "value_4" 

Dan sebagainya. Karena kompleksitas XRANGE adalah O (log (N)) untuk mencari, dan kemudian O (M) untuk mengembalikan elemen M, setiap langkah iterasi cepat. Dengan demikian, menggunakan XRANGE, dimungkinkan untuk beralih aliran secara efisien.

Perintah XREVRANGE sama dengan XRANGE , tetapi mengembalikan elemen dalam urutan terbalik:

 > XREVRANGE mystream + - COUNT 1 1) 1) 1519073287312-0 2) 1) "foo" 2) "value_10" 

Perhatikan bahwa perintah XREVRANGE mengambil argumen kisaran mulai dan berhenti dalam urutan terbalik.

Membaca catatan baru dengan XREAD


Seringkali ada tugas untuk berlangganan aliran dan hanya menerima pesan baru. Konsep ini mungkin tampak seperti Redis Pub / Sub atau memblokir Redis List, tetapi ada perbedaan mendasar dalam cara menggunakan Redis Stream:

  1. Setiap pesan baru dikirim ke setiap pelanggan secara default. Perilaku ini berbeda dari memblokir Daftar Redis, di mana pesan baru akan dibaca oleh hanya satu pelanggan.
  2. Sementara di Redis Pub / Sub semua pesan dilupakan dan tidak pernah disimpan, dalam Stream semua pesan disimpan untuk waktu yang tidak terbatas (kecuali klien secara eksplisit meminta penghapusan).
  3. Redis Stream memungkinkan Anda untuk membedakan akses ke pesan dalam satu aliran. Pelanggan tertentu hanya dapat melihat riwayat pesan pribadinya.

Anda dapat berlangganan aliran dan menerima pesan baru menggunakan perintah XREAD . Ini sedikit lebih rumit daripada XRANGE , jadi kita akan mulai dengan contoh sederhana terlebih dahulu.

 > XREAD COUNT 2 STREAMS mystream 0 1) 1) "mystream" 2) 1) 1) 1519073278252-0 2) 1) "foo" 2) "value_1" 2) 1) 1519073279157-0 2) 1) "foo" 2) "value_2" 

Dalam contoh di atas, formulir XREAD non-pemblokiran ditentukan . Harap perhatikan bahwa opsi COUNT adalah opsional. Bahkan, satu-satunya opsi perintah yang diperlukan adalah opsi STREAMS, yang menetapkan daftar aliran bersama dengan pengidentifikasi maksimum yang sesuai. Kami menulis "STREAMS mystream 0" - kami ingin mendapatkan semua catatan aliran mystream dengan pengenal yang lebih besar dari "0-0". Seperti yang Anda lihat dari contoh, perintah mengembalikan nama aliran, karena kita dapat berlangganan beberapa aliran pada saat yang sama. Kita bisa menulis, misalnya, "STREAMS mystream otherstream 0 0". Harap perhatikan bahwa setelah opsi STREAMS, pertama-tama kita perlu memberikan nama semua aliran yang diperlukan dan hanya kemudian daftar pengidentifikasi.

Dalam bentuk sederhana ini, perintah itu tidak melakukan apa-apa dibandingkan dengan XRANGE . Namun, yang menarik adalah kita dapat dengan mudah mengubah XREAD menjadi perintah pemblokiran dengan menentukan argumen BLOCK:

> XREAD BLOCK 0 STREAMS mystream $

Pada contoh di atas, opsi BLOCK baru ditentukan dengan batas waktu 0 milidetik (ini berarti menunggu tanpa akhir). Selain itu, alih-alih melewati pengidentifikasi biasa untuk aliran mystream, $ pengidentifikasi khusus berlalu. Pengidentifikasi khusus ini berarti bahwa XREAD harus menggunakan pengidentifikasi maksimum dalam aliran mystream sebagai pengidentifikasi. Jadi kami hanya akan menerima pesan baru, mulai dari saat kami mulai mendengarkan. Di satu sisi, ini mirip dengan perintah Unix tail -f.

Harap dicatat bahwa ketika menggunakan opsi BLOCK, kita tidak perlu menggunakan pengenal khusus $. Kami dapat menggunakan pengidentifikasi apa pun yang ada di aliran. Jika tim dapat segera melayani permintaan kami, tanpa memblokir, itu akan melakukannya, jika tidak maka akan diblokir.

Memblokir XREAD juga dapat mendengarkan beberapa aliran sekaligus, Anda hanya perlu menentukan nama mereka. Dalam hal ini, perintah akan mengembalikan catatan aliran pertama ke mana data tersebut tiba. Pelanggan pertama yang diblokir untuk aliran ini akan menerima data terlebih dahulu.

Kelompok konsumen


Dalam tugas-tugas tertentu, kami ingin membedakan akses pelanggan ke pesan dalam utas yang sama. Contoh di mana ini bisa bermanfaat adalah antrian pesan dengan pekerja yang akan menerima pesan berbeda dari aliran, yang memungkinkan Anda untuk skala pemrosesan pesan.

Jika kita membayangkan bahwa kita memiliki tiga pelanggan C1, C2, C3 dan aliran yang berisi pesan 1, 2, 3, 4, 5, 6, 7, maka layanan pesan akan terjadi seperti pada diagram di bawah ini:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

Untuk mendapatkan efek ini, Redis Stream menggunakan konsep yang disebut Consumer Group. Konsep ini mirip dengan pseudo-pelanggan yang menerima data dari aliran, tetapi sebenarnya dilayani oleh beberapa pelanggan dalam suatu kelompok, memberikan jaminan tertentu:

  1. Setiap pesan dikirim ke pelanggan yang berbeda dalam grup.
  2. Dalam sebuah grup, pelanggan diidentifikasi dengan nama, yang merupakan string peka huruf besar-kecil. Jika beberapa pelanggan keluar sementara dari grup, ia dapat dikembalikan ke grup dengan nama uniknya sendiri.
  3. Setiap Kelompok Konsumen mengikuti konsep "pesan pertama yang belum dibaca." Ketika pelanggan meminta pesan baru, ia hanya dapat menerima pesan yang belum pernah dikirimkan ke pelanggan mana pun dalam suatu grup.
  4. Ada perintah untuk secara eksplisit mengkonfirmasi bahwa pelanggan telah berhasil memproses pesan. Sampai perintah ini dipanggil, pesan yang diminta akan tetap dalam status "pending".
  5. Dalam Grup Konsumen, setiap pelanggan dapat meminta riwayat pesan yang dikirimkan kepadanya, tetapi belum diproses (dalam status "menunggu")

Dalam arti tertentu, keadaan suatu kelompok dapat direpresentasikan sebagai berikut:

 +----------------------------------------+ | consumer_group_name: mygroup | consumer_group_stream: somekey | last_delivered_id: 1292309234234-92 | | consumers: | "consumer-1" with pending messages | 1292309234234-4 | 1292309234232-8 | "consumer-42" with pending messages | ... (and so forth) +----------------------------------------+ 

Sekarang saatnya berkenalan dengan tim utama untuk Grup Konsumen, yaitu:

  • XGROUP digunakan untuk membuat, menghancurkan, dan mengelola grup.
  • XREADGROUP digunakan untuk membaca aliran melalui grup.
  • XACK - perintah ini memungkinkan pelanggan untuk menandai pesan sebagai berhasil diproses

Pembuatan Grup Konsumen


Misalkan aliran mistik sudah ada. Maka perintah pembuatan grup akan terlihat seperti:

> XGROUP CREATE mystream mygroup $
OK

Saat membuat grup, kita harus melewati pengenal yang dimulai dengan grup yang akan menerima pesan. Jika kami hanya ingin menerima semua pesan baru, maka kami dapat menggunakan pengenal khusus $ (seperti dalam contoh kami di atas). Jika Anda menentukan 0 alih-alih pengidentifikasi khusus, maka semua pesan aliran akan tersedia untuk grup.

Sekarang setelah grup dibuat, kita dapat segera mulai membaca pesan menggunakan perintah XREADGROUP . Perintah ini sangat mirip dengan XREAD dan mendukung opsi BLOCK opsional. Namun, ada opsi GROUP wajib, yang harus selalu ditentukan dengan dua argumen: nama grup dan nama pelanggan. Opsi COUNT didukung juga.

Sebelum membaca aliran, mari kita taruh beberapa pesan di sana:

 > XADD mystream * message apple 1526569495631-0 > XADD mystream * message orange 1526569498055-0 > XADD mystream * message strawberry 1526569506935-0 > XADD mystream * message apricot 1526569535168-0 > XADD mystream * message banana 1526569544280-0 

Sekarang mari kita coba membaca aliran ini melalui grup:

 > XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream > 1) 1) "mystream" 2) 1) 1) 1526569495631-0 2) 1) "message" 2) "apple" 

Perintah di atas kata demi kata berbunyi sebagai berikut:

"Aku, pelanggan Alice, anggota dari mygroup, ingin membaca satu pesan dari mistik yang belum pernah dikirimkan kepada siapa pun sebelumnya."

Setiap kali pelanggan melakukan operasi dengan grup, ia harus menunjukkan namanya, secara unik mengidentifikasi dirinya di dalam grup. Ada detail lain yang sangat penting dalam perintah di atas - pengidentifikasi khusus ">". Pengidentifikasi khusus ini memfilter pesan, hanya menyisakan pesan yang sejauh ini belum pernah dikirimkan.

Juga, dalam kasus khusus, Anda dapat menentukan pengidentifikasi nyata, seperti 0 atau pengidentifikasi sah lainnya. Dalam hal ini, perintah XREADGROUP akan mengembalikan kepada Anda riwayat pesan dengan status "pending", yang dikirimkan ke pelanggan yang ditentukan (Alice), tetapi belum dikonfirmasi menggunakan perintah XACK .

Kami dapat memverifikasi perilaku ini dengan segera menentukan pengidentifikasi 0, tanpa opsi COUNT . Kami hanya melihat satu-satunya pesan yang tertunda, yaitu pesan dengan apel:

 > XREADGROUP GROUP mygroup Alice STREAMS mystream 0 1) 1) "mystream" 2) 1) 1) 1526569495631-0 2) 1) "message" 2) "apple" 

Namun, jika kami mengkonfirmasi pesan tersebut telah berhasil diproses, pesan itu tidak akan ditampilkan lagi:

 > XACK mystream mygroup 1526569495631-0 (integer) 1 > XREADGROUP GROUP mygroup Alice STREAMS mystream 0 1) 1) "mystream" 2) (empty list or set) 

Sekarang giliran Bob untuk membaca sesuatu:

 > XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream > 1) 1) "mystream" 2) 1) 1) 1526569498055-0 2) 1) "message" 2) "orange" 2) 1) 1526569506935-0 2) 1) "message" 2) "strawberry" 

Bob, anggota mygroup, meminta tidak lebih dari dua pesan. Perintah hanya melaporkan pesan yang tidak terkirim karena pengenal khusus ">". Seperti yang Anda lihat, pesan "apel" tidak ditampilkan, karena sudah dikirim ke Alice, jadi Bob menerima "jeruk" dan "stroberi".

Dengan demikian, Alice, Bob dan pelanggan grup lainnya dapat membaca pesan berbeda dari aliran yang sama. Mereka juga dapat membaca riwayat pesan mentah mereka atau menandai pesan saat diproses.

Ada beberapa hal yang perlu diingat:

  • Segera setelah pelanggan menganggap pesan sebagai perintah XREADGROUP , pesan ini masuk ke status "tertunda" dan ditugaskan ke pelanggan khusus ini. Pelanggan grup lain tidak akan dapat membaca pesan ini.
  • Pelanggan secara otomatis dibuat pada penyebutan pertama, tidak perlu untuk penciptaan eksplisit mereka.
  • Dengan XREADGROUP Anda dapat membaca pesan dari beberapa aliran yang berbeda secara bersamaan, namun, agar ini berfungsi, Anda harus terlebih dahulu membuat grup dengan nama yang sama untuk setiap aliran menggunakan XGROUP

Pemulihan Kecelakaan


Pelanggan dapat pulih dari kegagalan dan membaca kembali daftar pesannya dengan status "menunggu keputusan". Namun, di dunia nyata, pelanggan pada akhirnya bisa gagal. Apa yang terjadi pada pesan menggantung pelanggan jika dia tidak bisa pulih setelah kegagalan?
Consumer Group menawarkan fitur yang digunakan khusus untuk kasus-kasus seperti itu - ketika Anda perlu mengubah pemilik pesan.

Pertama-tama, Anda perlu memanggil perintah XPENDING , yang menampilkan semua pesan grup dengan status "pending". Dalam bentuknya yang paling sederhana, sebuah perintah dipanggil hanya dengan dua argumen: nama aliran dan nama grup:

 > XPENDING mystream mygroup 1) (integer) 2 2) 1526569498055-0 3) 1526569506935-0 4) 1) 1) "Bob" 2) "2" 

Tim mencetak jumlah pesan yang belum diproses untuk seluruh grup dan untuk setiap pelanggan. Kami hanya memiliki Bob dengan dua pesan yang tidak diproses, karena satu-satunya pesan yang diminta oleh Alice telah dikonfirmasi dengan XACK .

Kami dapat meminta informasi tambahan menggunakan lebih banyak argumen:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]

{start-id} {end-id} - rentang pengidentifikasi (Anda dapat menggunakan "-" dan "+")
{count} - jumlah upaya pengiriman
{nama-konsumen} - nama grup

 > XPENDING mystream mygroup - + 10 1) 1) 1526569498055-0 2) "Bob" 3) (integer) 74170458 4) (integer) 1 2) 1) 1526569506935-0 2) "Bob" 3) (integer) 74170458 4) (integer) 1 

Sekarang kami memiliki rincian untuk setiap pesan: pengidentifikasi, nama pelanggan, waktu henti dalam milidetik, dan akhirnya, jumlah upaya pengiriman. Kami memiliki dua pesan dari Bob, dan tidak digunakan selama 74170458 milidetik, sekitar 20 jam.

Harap dicatat bahwa tidak ada yang menghentikan kami untuk memeriksa apa isi pesan hanya dengan menggunakan XRANGE .

 > XRANGE mystream 1526569498055-0 1526569498055-0 1) 1) 1526569498055-0 2) 1) "message" 2) "orange" 

Kami hanya perlu mengulangi pengidentifikasi yang sama dua kali dalam argumen. Sekarang kami memiliki beberapa ide, Alice dapat memutuskan bahwa Bob mungkin tidak akan pulih setelah 20 jam tidak aktif, dan sekarang saatnya untuk meminta pesan-pesan ini dan melanjutkan memprosesnya alih-alih Bob. Untuk melakukan ini, kami menggunakan perintah XCLAIM :

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

Dengan menggunakan perintah ini, kita bisa mendapatkan pesan "asing" yang belum diproses dengan mengubah pemilik ke {konsumen}. Namun, kami juga dapat memberikan waktu henti minimum {min-idle-time}. Ini membantu menghindari situasi di mana dua klien mencoba mengubah pemilik pesan yang sama secara bersamaan:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

Pelanggan pertama akan mengatur ulang waktu henti dan menambah penghitung jumlah pengiriman. Jadi klien kedua tidak akan dapat memintanya.

 > XCLAIM mystream mygroup Alice 3600000 1526569498055-0 1) 1) 1526569498055-0 2) 1) "message" 2) "orange" 

Pesan itu berhasil diklaim oleh Alice, yang sekarang dapat memproses pesan dan mengakuinya.

Dari contoh di atas, jelas bahwa eksekusi yang berhasil dari permintaan mengembalikan konten pesan itu sendiri. Namun, ini tidak perlu. Opsi JUSTID dapat digunakan untuk hanya mengembalikan pengidentifikasi pesan. Ini berguna jika Anda tidak tertarik dengan detail pesan dan ingin meningkatkan kinerja sistem.

Konter pengiriman


Penghitung yang Anda amati dalam output XPENDING adalah jumlah pengiriman setiap pesan. Penghitung seperti itu ditambahkan dalam dua cara: ketika pesan berhasil diminta melalui XCLAIM atau ketika panggilan XREADGROUP digunakan .

Adalah normal bahwa beberapa pesan dikirim beberapa kali. Yang utama adalah sebagai hasilnya, semua pesan diproses. Terkadang saat memproses pesan ada masalah karena kerusakan pada pesan itu sendiri atau pemrosesan pesan menyebabkan kesalahan dalam kode handler.Dalam hal ini, mungkin ternyata tidak ada yang dapat memproses pesan ini. Karena kami memiliki penghitung upaya pengiriman, kami dapat menggunakan penghitung ini untuk mendeteksi situasi seperti itu. Oleh karena itu, segera setelah penghitung pengiriman mencapai jumlah besar yang ditentukan oleh Anda, mungkin akan lebih masuk akal untuk menempatkan pesan seperti itu di aliran lain dan mengirim pemberitahuan ke administrator sistem.

Status utas


Perintah XINFO digunakan untuk meminta berbagai informasi tentang aliran dan grupnya. Sebagai contoh, bentuk dasar dari perintah adalah sebagai berikut:

 > XINFO STREAM mystream 1) length 2) (integer) 13 3) radix-tree-keys 4) (integer) 1 5) radix-tree-nodes 6) (integer) 2 7) groups 8) (integer) 2 9) first-entry 10) 1) 1524494395530-0 2) 1) "a" 2) "1" 3) "b" 4) "2" 11) last-entry 12) 1) 1526569544280-0 2) 1) "message" 2) "banana" 

Perintah di atas menampilkan informasi umum pada aliran yang ditentukan. Sekarang contoh yang sedikit lebih kompleks:

 > XINFO GROUPS mystream 1) 1) name 2) "mygroup" 3) consumers 4) (integer) 2 5) pending 6) (integer) 2 2) 1) name 2) "some-other-group" 3) consumers 4) (integer) 1 5) pending 6) (integer) 0 

Perintah di atas menampilkan informasi umum untuk semua grup dari aliran yang ditentukan

 > XINFO CONSUMERS mystream mygroup 1) 1) name 2) "Alice" 3) pending 4) (integer) 1 5) idle 6) (integer) 9104628 2) 1) name 2) "Bob" 3) pending 4) (integer) 1 5) idle 6) (integer) 83841983 

Perintah di atas menampilkan informasi tentang semua pelanggan dari aliran dan grup yang ditentukan.
Jika Anda lupa sintaks perintah, cukup hubungi perintah untuk bantuan:

 > XINFO HELP 1) XINFO {subcommand} arg arg ... arg. Subcommands are: 2) CONSUMERS {key} {groupname} -- Show consumer groups of group {groupname}. 3) GROUPS {key} -- Show the stream consumer groups. 4) STREAM {key} -- Show information about the stream. 5) HELP -- Print this help. 

Batas Ukuran Stream


Banyak aplikasi yang tidak ingin mengumpulkan data ke dalam aliran selamanya. Seringkali berguna untuk memiliki jumlah pesan maksimum dalam aliran. Dalam kasus lain, penting untuk mentransfer semua pesan dari aliran ke penyimpanan persisten lain ketika ukuran aliran yang ditentukan tercapai. Anda dapat membatasi ukuran aliran menggunakan parameter MAXLEN dalam perintah XADD :

 > XADD mystream MAXLEN 2 * value 1 1526654998691-0 > XADD mystream MAXLEN 2 * value 2 1526654999635-0 > XADD mystream MAXLEN 2 * value 3 1526655000369-0 > XLEN mystream (integer) 2 > XRANGE mystream - + 1) 1) 1526654999635-0 2) 1) "value" 2) "2" 2) 1) 1526655000369-0 2) 1) "value" 2) "3" 

Saat menggunakan MAXLEN, catatan lama secara otomatis dihapus ketika panjang yang ditentukan tercapai, sehingga aliran memiliki ukuran konstan. Namun, pemangkasan dalam hal ini tidak terjadi dalam cara yang paling produktif dalam memori Redis. Situasi dapat ditingkatkan sebagai berikut: Argumen ~ dalam contoh di atas berarti bahwa kita tidak perlu membatasi panjang aliran ke nilai tertentu. Dalam contoh kami, ini bisa berupa angka yang lebih besar dari atau sama dengan 1000 (misalnya, 1000, 1010 atau 1030). Kami hanya secara eksplisit menunjukkan bahwa kami ingin aliran kami menyimpan setidaknya 1000 catatan. Ini membuat bekerja dengan memori jauh lebih efisien di dalam Redis. Ada juga perintah XTRIM terpisah yang melakukan hal yang sama:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...





> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

Penyimpanan dan Replikasi Persisten


Redis Stream direplikasi secara asinkron ke node slave dan disimpan ke file seperti AOF (snapshot semua data) dan RDB (log semua operasi penulisan). Replikasi status Consumer Groups juga didukung. Oleh karena itu, jika pesan dalam status "pending" pada master node, maka pada slave node pesan ini akan memiliki status yang sama.

Menghapus item individual dari aliran


Untuk menghapus pesan ada perintah XDEL khusus . Perintah mendapatkan nama aliran, diikuti oleh pengidentifikasi pesan yang perlu dihapus:

 > XRANGE mystream - + COUNT 2 1) 1) 1526654999635-0 2) 1) "value" 2) "2" 2) 1) 1526655000369-0 2) 1) "value" 2) "3" > XDEL mystream 1526654999635-0 (integer) 1 > XRANGE mystream - + COUNT 2 1) 1) 1526655000369-0 2) 1) "value" 2) "3" 

Saat menggunakan perintah ini, Anda perlu mempertimbangkan bahwa sebenarnya memori tidak akan segera dirilis.

Streaming Tanpa Panjang


Perbedaan antara stream dan struktur data Redis lainnya adalah ketika struktur data lain tidak lagi memiliki elemen di dalam dirinya, sebagai efek samping, struktur data itu sendiri akan dihapus dari memori. Jadi, misalnya, set yang diurutkan akan sepenuhnya dihapus ketika panggilan ZREM menghapus item terakhir. Alih-alih, utas dibiarkan tetap dalam memori tanpa memiliki satu elemen pun di dalamnya.

Kesimpulan


Redis Stream sangat ideal untuk membuat perantara pesan, antrian pesan, log terpadu, dan sistem obrolan yang menyimpan riwayat.

Seperti yang pernah dikatakan Nicklaus Wirth , program adalah algoritma plus struktur data, dan Redis sudah memberi Anda berdua.

Source: https://habr.com/ru/post/id456270/


All Articles