
Redis Stream - ein neuer abstrakter Datentyp, der mit der Veröffentlichung von Version 5.0 in Redis eingeführt wurde
Konzeptionell ist Redis Stream eine Liste, zu der Sie Einträge hinzufügen können. Jeder Eintrag hat eine eindeutige Kennung. Standardmäßig wird automatisch eine Kennung generiert, die einen Zeitstempel enthält. Daher können Sie Aufzeichnungsbereiche nach Zeit anfordern oder neue Daten empfangen, sobald diese im Stream ankommen, da der Unix-Befehl tail -f die Protokolldatei liest und in Erwartung neuer Daten einfriert. Bitte beachten Sie, dass mehrere Clients gleichzeitig den Stream abhören können, da viele "tail-f" -Prozesse gleichzeitig eine Datei lesen können, ohne dass Konflikte auftreten.
Um alle Vorteile des neuen Datentyps zu verstehen, erinnern wir uns kurz an die seit langem vorhandenen Redis-Strukturen, die die Funktionalität von Redis Stream teilweise wiederholen.
Historischer Ausflug
Redis Pub / Sub
Redis Pub / Sub ist ein einfaches Messaging-System, das bereits in Ihren Schlüsselwertspeicher integriert ist. Der Einfachheit halber müssen Sie jedoch bezahlen:
- Wenn der Verlag aus irgendeinem Grund versagt, verliert er alle seine Abonnenten
- Der Verlag muss die genaue Adresse aller seiner Abonnenten kennen.
- Ein Herausgeber kann seine Abonnenten überlasten, wenn die Daten schneller veröffentlicht als verarbeitet werden
- Die Nachricht wird unmittelbar nach der Veröffentlichung aus dem Puffer des Herausgebers gelöscht, unabhängig davon, wie viele Abonnenten sie zugestellt hat und wie schnell sie diese Nachricht verarbeitet haben.
- Alle Abonnenten erhalten die Nachricht gleichzeitig. Die Abonnenten selbst müssen sich irgendwie darauf einigen, wie dieselbe Nachricht verarbeitet werden soll.
- Es gibt keinen eingebauten Mechanismus zum Bestätigen der erfolgreichen Verarbeitung einer Nachricht durch einen Teilnehmer. Wenn der Abonnent eine Nachricht erhalten hat und während der Verarbeitung abgefallen ist, weiß der Herausgeber nichts davon.
Redis Liste
Redis List ist eine Datenstruktur, die Lesebefehle für Sperren unterstützt. Sie können Nachrichten am Anfang oder Ende der Liste hinzufügen und lesen. Auf der Grundlage dieser Struktur können Sie einen guten Stapel oder eine gute Warteschlange für Ihr verteiltes System erstellen, was in den meisten Fällen ausreicht. Die Hauptunterschiede zu Redis Pub / Sub:
- Die Nachricht wird an einen Client übermittelt. Der erste durch Lesen blockierte Client erhält zuerst die Daten.
- Clint muss für jede Nachricht einen Lesevorgang einleiten. List weiß nichts über Kunden.
- Nachrichten werden gespeichert, bis jemand sie zählt oder explizit löscht. Wenn Sie einen Redis-Server einrichten, um Daten auf die Festplatte zu übertragen, erhöht sich die Zuverlässigkeit des Systems erheblich.
Einführung in Stream
Hinzufügen eines Datensatzes zu einem Stream
Der
XADD- Befehl fügt dem Stream einen neuen Datensatz hinzu. Ein Datensatz ist nicht nur eine Zeichenfolge, sondern besteht aus einem oder mehreren Schlüssel-Wert-Paaren. Somit ist jeder Datensatz bereits strukturiert und ähnelt der Struktur einer CSV-Datei.
> XADD mystream * sensor-id 1234 temperature 19.8 1518951480106-0
Im obigen Beispiel fügen wir dem Stream zwei Felder mit dem Namen (Schlüssel) "mystream" hinzu: "Sensor-ID" und "Temperatur" mit den Werten "1234" bzw. "19.8". Als zweites Argument akzeptiert der Befehl die Kennung, die dem Datensatz zugewiesen wird. Diese Kennung identifiziert jeden Datensatz im Stream eindeutig. In diesem Fall haben wir jedoch * bestanden, weil Redis eine neue Kennung für uns generieren soll. Jede neue Kennung wird erhöht. Daher hat jeder neue Datensatz eine größere Kennung im Vergleich zu vorherigen Datensätzen.
ID-Format
Die vom
XADD- Befehl zurückgegebene
Datensatzkennung besteht aus zwei Teilen:
{millisecondsTime}-{sequenceNumber}
millisecondsTime - Unix-Zeit in Millisekunden (Redis-Serverzeit). Wenn jedoch die aktuelle Zeit gleich oder kleiner als die Zeit des vorherigen Datensatzes ist, wird der Zeitstempel des vorherigen Datensatzes verwendet. Wenn die Serverzeit auf die Vergangenheit zurückgesetzt wird, behält der neue Bezeichner daher weiterhin die Eigenschaft zum Erhöhen bei.
sequenceNumber wird für Datensätze verwendet, die in derselben Millisekunde erstellt wurden.
sequenceNumber wird gegenüber dem vorherigen Datensatz um 1 erhöht. Da
sequenceNumber 64 Bit groß ist, sollten Sie in der Praxis nicht auf eine Begrenzung der Anzahl von Datensätzen stoßen, die innerhalb einer Millisekunde generiert werden können.
Das Format solcher Kennungen mag auf den ersten Blick seltsam erscheinen. Ein ungläubiger Leser mag sich fragen, warum Zeit Teil einer Kennung ist. Der Grund dafür ist, dass Redis-Streams Bereichsanforderungen nach Bezeichnern unterstützen. Da die Kennung der Zeit zugeordnet ist, zu der der Datensatz erstellt wurde, können Zeitbereiche angefordert werden. Wir werden uns ein konkretes Beispiel ansehen, wenn wir mit dem Studium des
XRANGE-Befehls fortfahren .
Wenn der Benutzer aus irgendeinem Grund seine eigene Kennung angeben muss, die beispielsweise einem externen System zugeordnet ist, können wir diese anstelle des * -Zeichens wie unten gezeigt an den
XADD-Befehl übergeben :
> XADD somestream 0-1 field value 0-1 > XADD somestream 0-2 foo bar 0-2
Bitte beachten Sie, dass Sie in diesem Fall die Zunahme der Kennung selbst überwachen müssen. In unserem Beispiel ist die minimale Kennung "0-1", sodass das Team keine andere Kennung akzeptiert, die gleich oder kleiner als "0-1" ist.
> XADD somestream 0-1 foo bar (error) ERR The ID specified in XADD is equal or smaller than the target stream top item
Die Anzahl der Datensätze im Stream
Sie können die Anzahl der Datensätze in einem Stream einfach mit dem Befehl
XLEN abrufen . In unserem Beispiel gibt dieser Befehl den folgenden Wert zurück:
> XLEN somestream (integer) 2
Bereichsanforderungen - XRANGE und XREVRANGE
Um Daten für einen Bereich anzufordern, müssen zwei Bezeichner angegeben werden - der Anfang und das Ende des Bereichs. Der zurückgegebene Bereich enthält alle Elemente, einschließlich Rahmen. Es gibt auch zwei spezielle Bezeichner "-" und "+", die den kleinsten (erster Datensatz) und den größten (letzter Datensatz) Bezeichner im Stream bedeuten. Im folgenden Beispiel werden alle Stream-Einträge angezeigt.
> XRANGE mystream - + 1) 1) 1518951480106-0 2) 1) "sensor-id" 2) "1234" 3) "temperature" 4) "19.8" 2) 1) 1518951482479-0 2) 1) "sensor-id" 2) "9999" 3) "temperature" 4) "18.2"
Jeder zurückgegebene Datensatz besteht aus zwei Elementen: einem Bezeichner und einer Liste von Schlüssel-Wert-Paaren. Wir haben bereits gesagt, dass Datensatzkennungen zeitbezogen sind. Daher können wir den Bereich eines bestimmten Zeitraums anfordern. In der Anforderung können wir jedoch nicht die vollständige Kennung, sondern nur die Unix-Zeit angeben, wobei der Teil, der sich auf
sequenceNumber bezieht, weggelassen
wird . Der ausgelassene Teil des Bezeichners ist am Anfang des Bereichs automatisch gleich Null und am Ende des Bereichs gleich dem maximal möglichen Wert. Das folgende Beispiel zeigt, wie ein Bereich von zwei Millisekunden angefordert wird.
> XRANGE mystream 1518951480106 1518951480107 1) 1) 1518951480106-0 2) 1) "sensor-id" 2) "1234" 3) "temperature" 4) "19.8"
Wir haben nur einen Datensatz in diesem Bereich, aber in realen Datensätzen kann das zurückgegebene Ergebnis sehr groß sein. Aus diesem Grund unterstützt
XRANGE die Option COUNT. Durch Angabe der Menge können wir einfach die ersten N Datensätze erhalten. Wenn wir die nächsten N Einträge erhalten müssen (Paginierung), können wir den zuletzt empfangenen Bezeichner verwenden, seine
Sequenznummer um eins erhöhen und erneut anfordern. Schauen wir uns das im folgenden Beispiel an. Wir
fangen an, 10 Elemente mit
XADD hinzuzufügen (angenommen, der Mystream-Stream wurde bereits mit 10 Elementen gefüllt). Um die Iteration zu starten und 2 Elemente pro Befehl zu erhalten, beginnen wir mit dem gesamten Bereich, jedoch mit COUNT gleich 2.
> XRANGE mystream - + COUNT 2 1) 1) 1519073278252-0 2) 1) "foo" 2) "value_1" 2) 1) 1519073279157-0 2) 1) "foo" 2) "value_2"
Um die Iteration mit den folgenden zwei Elementen fortzusetzen, müssen wir den zuletzt empfangenen Bezeichner auswählen, nämlich 1519073279157-0, und der
Sequenznummer 1 hinzufügen.
Der resultierende Bezeichner, in diesem Fall 1519073279157-1, kann jetzt als neues Argument am Anfang des Bereichs für den nächsten
XRANGE- Aufruf verwendet werden:
> XRANGE mystream 1519073279157-1 + COUNT 2 1) 1) 1519073280281-0 2) 1) "foo" 2) "value_3" 2) 1) 1519073281432-0 2) 1) "foo" 2) "value_4"
Usw. Da die Komplexität von
XRANGE O (log (N)) für die Suche und dann O (M) für die Rückgabe von M Elementen ist, ist jeder Schritt der Iteration schnell. Mit
XRANGE ist es daher möglich, Flüsse effizient zu
iterieren .
Der Befehl
XREVRANGE entspricht
XRANGE , gibt jedoch die Elemente in umgekehrter Reihenfolge zurück:
> XREVRANGE mystream + - COUNT 1 1) 1) 1519073287312-0 2) 1) "foo" 2) "value_10"
Beachten Sie, dass der Befehl
XREVRANGE die Argumente des Start- und
Stoppbereichs in umgekehrter Reihenfolge verwendet.
Lesen neuer Datensätze mit XREAD
Oft gibt es eine Aufgabe, den Stream zu abonnieren und nur neue Nachrichten zu empfangen. Dieses Konzept mag wie ein Redis Pub / Sub erscheinen oder Redis List blockieren, aber es gibt grundlegende Unterschiede bei der Verwendung von Redis Stream:
- Jede neue Nachricht wird standardmäßig an jeden Teilnehmer übermittelt. Dieses Verhalten unterscheidet sich vom Blockieren der Redis-Liste, bei der eine neue Nachricht nur von einem Teilnehmer gelesen wird.
- Während in Redis Pub / Sub alle Nachrichten vergessen und nie gespeichert werden, werden in Stream alle Nachrichten auf unbestimmte Zeit gespeichert (es sei denn, der Client fordert ausdrücklich zum Löschen auf).
- Mit Redis Stream können Sie den Zugriff auf Nachrichten innerhalb eines Streams unterscheiden. Ein bestimmter Abonnent kann nur seinen persönlichen Nachrichtenverlauf sehen.
Mit dem Befehl
XREAD können Sie den Stream abonnieren und neue Nachrichten
empfangen . Dies ist etwas komplizierter als
XRANGE , daher beginnen wir zunächst mit einfacheren Beispielen.
> XREAD COUNT 2 STREAMS mystream 0 1) 1) "mystream" 2) 1) 1) 1519073278252-0 2) 1) "foo" 2) "value_1" 2) 1) 1519073279157-0 2) 1) "foo" 2) "value_2"
Im obigen Beispiel wird ein nicht blockierendes
XREAD- Formular
angegeben . Bitte beachten Sie, dass die Option COUNT optional ist. Tatsächlich ist die einzige erforderliche Befehlsoption die Option STREAMS, mit der die Liste der Streams zusammen mit der entsprechenden maximalen Kennung festgelegt wird. Wir haben "STREAMS mystream 0" geschrieben - wir möchten alle Datensätze des mystream-Streams mit einer Kennung größer als "0-0" abrufen. Wie Sie dem Beispiel entnehmen können, gibt der Befehl den Namen des Streams zurück, da mehrere Threads gleichzeitig abonniert werden können. Wir könnten zum Beispiel "STREAMS mystream otherstream 0 0" schreiben. Bitte beachten Sie, dass wir nach der Option STREAMS zuerst die Namen aller erforderlichen Streams und erst dann eine Liste der Bezeichner angeben müssen.
In dieser einfachen Form macht der Befehl im Vergleich zu
XRANGE nichts Besonderes. Das Interessante ist jedoch, dass wir
XREAD leicht in einen Blockierungsbefehl verwandeln können, indem wir das BLOCK-Argument
angeben :
> XREAD BLOCK 0 STREAMS mystream $
Im obigen Beispiel wird eine neue BLOCK-Option mit einer Zeitüberschreitung von 0 Millisekunden angegeben (dies bedeutet endloses Warten). Darüber hinaus wurde anstelle der üblichen Kennung für den Mystream-Stream die spezielle Kennung $ übergeben. Diese spezielle Kennung bedeutet, dass
XREAD die maximale Kennung im Mystream-Stream als Kennung verwenden sollte. Wir erhalten also erst ab dem Moment, in dem wir zuhören, neue Nachrichten. In gewisser Weise ähnelt dies dem Unix-Befehl tail -f.
Bitte beachten Sie, dass wir bei Verwendung der Option BLOCK nicht die spezielle Kennung $ verwenden müssen. Wir können jeden im Stream vorhandenen Bezeichner verwenden. Wenn das Team unsere Anfrage sofort bearbeiten kann, ohne sie zu blockieren, wird es dies tun, andernfalls wird sie blockiert.
Durch das Blockieren von
XREAD können auch mehrere Streams gleichzeitig
abgehört werden. Sie müssen lediglich deren Namen angeben. In diesem Fall gibt der Befehl einen Datensatz des ersten Streams zurück, in den die Daten eingegangen sind. Der erste für diesen Stream blockierte Teilnehmer empfängt zuerst die Daten.
Verbrauchergruppen
Bei bestimmten Aufgaben möchten wir den Zugriff von Abonnenten auf Nachrichten innerhalb desselben Threads unterscheiden. Ein Beispiel, bei dem dies nützlich sein kann, ist eine Nachrichtenwarteschlange mit Mitarbeitern, die unterschiedliche Nachrichten vom Stream empfangen, sodass Sie die Nachrichtenverarbeitung skalieren können.
Wenn wir uns vorstellen, dass wir drei Teilnehmer C1, C2, C3 und einen Stream haben, der die Nachrichten 1, 2, 3, 4, 5, 6, 7 enthält, erfolgt der Nachrichtendienst wie in der folgenden Abbildung:
1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1
Um diesen Effekt zu erzielen, verwendet Redis Stream ein Konzept namens Consumer Group. Dieses Konzept ähnelt einem Pseudo-Teilnehmer, der Daten von einem Stream empfängt, jedoch tatsächlich von mehreren Teilnehmern innerhalb einer Gruppe bedient wird und bestimmte Garantien bietet:
- Jede Nachricht wird an verschiedene Teilnehmer innerhalb der Gruppe gesendet.
- Innerhalb einer Gruppe werden Abonnenten anhand des Namens identifiziert, bei dem zwischen Groß- und Kleinschreibung unterschieden wird. Wenn ein Abonnent vorübergehend aus der Gruppe ausscheidet, kann er unter seinem eigenen eindeutigen Namen in der Gruppe wiederhergestellt werden.
- Jede Verbrauchergruppe folgt dem Konzept der "ersten ungelesenen Nachricht". Wenn ein Abonnent neue Nachrichten anfordert, kann er nur Nachrichten empfangen, die noch nie an einen Abonnenten innerhalb einer Gruppe gesendet wurden.
- Es gibt einen Befehl, um die erfolgreiche Verarbeitung der Nachricht durch den Teilnehmer explizit zu bestätigen. Bis dieser Befehl aufgerufen wird, bleibt die angeforderte Nachricht im Status "Ausstehend".
- Innerhalb der Verbrauchergruppe kann jeder Abonnent einen Verlauf von Nachrichten anfordern, die ihm zugestellt, aber noch nicht verarbeitet wurden (im Status "Ausstehend").
In gewissem Sinne kann der Zustand einer Gruppe wie folgt dargestellt werden:
+----------------------------------------+ | consumer_group_name: mygroup | consumer_group_stream: somekey | last_delivered_id: 1292309234234-92 | | consumers: | "consumer-1" with pending messages | 1292309234234-4 | 1292309234232-8 | "consumer-42" with pending messages | ... (and so forth) +----------------------------------------+
Jetzt ist es Zeit, sich mit den Hauptteams der Verbrauchergruppe vertraut zu machen, nämlich:
- XGROUP wird zum Erstellen, Zerstören und Verwalten von Gruppen verwendet.
- XREADGROUP wird verwendet, um einen Stream durch eine Gruppe zu lesen.
- XACK - Mit diesem Befehl kann der Teilnehmer die Nachricht als erfolgreich verarbeitet markieren
Schaffung einer Verbrauchergruppe
Angenommen, ein Mystream-Stream ist bereits vorhanden. Dann sieht der Befehl zur Gruppenerstellung folgendermaßen aus:
> XGROUP CREATE mystream mygroup $
OK
Beim Erstellen einer Gruppe müssen wir eine Kennung übergeben, ab der die Gruppe Nachrichten empfängt. Wenn wir nur alle neuen Nachrichten empfangen möchten, können wir den speziellen Bezeichner $ verwenden (wie in unserem obigen Beispiel). Wenn Sie anstelle einer speziellen Kennung 0 angeben, stehen der Gruppe alle Nachrichten des Streams zur Verfügung.
Nachdem die Gruppe erstellt wurde, können wir sofort mit dem Lesen des
Befehls XREADGROUP Nachrichten
lesen . Dieser Befehl ist
XREAD sehr ähnlich und unterstützt die optionale Option BLOCK. Es gibt jedoch eine obligatorische GROUP-Option, die immer mit zwei Argumenten angegeben werden muss: dem Namen der Gruppe und dem Namen des Abonnenten. Die COUNT-Option wird ebenfalls unterstützt.
Bevor wir den Stream lesen, lassen Sie uns dort einige Nachrichten einfügen:
> XADD mystream * message apple 1526569495631-0 > XADD mystream * message orange 1526569498055-0 > XADD mystream * message strawberry 1526569506935-0 > XADD mystream * message apricot 1526569535168-0 > XADD mystream * message banana 1526569544280-0
Versuchen wir nun, diesen Stream durch die Gruppe zu lesen:
> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream > 1) 1) "mystream" 2) 1) 1) 1526569495631-0 2) 1) "message" 2) "apple"
Der obige Befehl lautet wörtlich wie folgt:
"Ich, Alices Abonnent, ein Mitglied von mygroup, möchte eine Nachricht aus dem mystream lesen, die noch nie zuvor an jemanden gesendet wurde."
Jedes Mal, wenn ein Teilnehmer eine Operation mit einer Gruppe ausführt, muss er seinen Namen angeben und sich innerhalb der Gruppe eindeutig identifizieren. Der obige Befehl enthält ein weiteres sehr wichtiges Detail - die spezielle Kennung ">". Diese spezielle Kennung filtert Nachrichten und hinterlässt nur diejenigen, die bisher noch nie zugestellt wurden.
In besonderen Fällen können Sie auch eine echte Kennung angeben, z. B. 0 oder eine andere gültige Kennung. In diesem Fall gibt der Befehl
XREADGROUP den Verlauf von Nachrichten mit dem Status "Ausstehend" an Sie zurück, die an den angegebenen Teilnehmer (Alice)
gesendet wurden , aber noch nicht mit dem
Befehl XACK bestätigt wurden.
Wir können dieses Verhalten überprüfen, indem wir sofort die Kennung 0 ohne die Option
COUNT angeben. Wir sehen nur die einzige ausstehende Nachricht, d. H. Die Nachricht mit dem Apfel:
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0 1) 1) "mystream" 2) 1) 1) 1526569495631-0 2) 1) "message" 2) "apple"
Wenn wir jedoch bestätigen, dass die Nachricht erfolgreich verarbeitet wurde, wird sie nicht mehr angezeigt:
> XACK mystream mygroup 1526569495631-0 (integer) 1 > XREADGROUP GROUP mygroup Alice STREAMS mystream 0 1) 1) "mystream" 2) (empty list or set)
Jetzt ist Bob an der Reihe, etwas zu lesen:
> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream > 1) 1) "mystream" 2) 1) 1) 1526569498055-0 2) 1) "message" 2) "orange" 2) 1) 1526569506935-0 2) 1) "message" 2) "strawberry"
Bob, ein Mitglied von mygroup, bat um nicht mehr als zwei Nachrichten. Der Befehl meldet nur nicht zugestellte Nachrichten aufgrund der speziellen Kennung ">". Wie Sie sehen können, wird die Meldung "Apfel" nicht angezeigt, da sie bereits an Alice übermittelt wurde, sodass Bob "Orange" und "Erdbeere" erhält.
Somit können Alice, Bob und jeder andere Gruppenabonnent verschiedene Nachrichten aus demselben Stream lesen. Sie können auch ihren Rohnachrichtenverlauf lesen oder Nachrichten als verarbeitet markieren.
Es gibt einige Dinge zu beachten:
- Sobald der Teilnehmer die Nachricht als XREADGROUP- Befehl betrachtet, wechselt diese Nachricht in den Status "Ausstehend" und wird diesem bestimmten Teilnehmer zugewiesen. Andere Gruppenabonnenten können diese Nachricht nicht lesen.
- Abonnenten werden bei der ersten Erwähnung automatisch erstellt, ihre explizite Erstellung ist nicht erforderlich.
- Mit XREADGROUP können Sie Nachrichten aus mehreren verschiedenen Streams gleichzeitig lesen. Damit dies jedoch funktioniert, müssen Sie zunächst mit XGROUP für jeden Stream Gruppen mit demselben Namen erstellen
Crash Recovery
Der Teilnehmer kann den Fehler beheben und seine Nachrichtenliste mit dem Status "Ausstehend" erneut lesen. In der realen Welt können Abonnenten jedoch letztendlich scheitern. Was passiert mit der baumelnden Nachricht eines Abonnenten, wenn er sich nach einem Fehler nicht erholen kann?
Die Verbrauchergruppe bietet eine Funktion, die speziell für solche Fälle verwendet wird - wenn Sie den Eigentümer von Nachrichten ändern müssen.
Zunächst müssen Sie den Befehl
XPENDING aufrufen , der alle Nachrichten der Gruppe mit dem Status "Ausstehend" anzeigt. In seiner einfachsten Form wird ein Befehl mit nur zwei Argumenten aufgerufen: dem Namen des Streams und dem Namen der Gruppe:
> XPENDING mystream mygroup 1) (integer) 2 2) 1526569498055-0 3) 1526569506935-0 4) 1) 1) "Bob" 2) "2"
Das Team druckte die Anzahl der unverarbeiteten Nachrichten für die gesamte Gruppe und für jeden Teilnehmer. Wir haben nur Bob mit zwei unverarbeiteten Nachrichten, da die einzige von Alice angeforderte Nachricht mit
XACK bestätigt wurde.
Wir können zusätzliche Informationen mit mehr Argumenten anfordern:
XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{Start-ID} {End-ID} - Bereich von Bezeichnern (Sie können "-" und "+" verwenden)
{count} - Die Anzahl der Zustellversuche
{Verbrauchername} - Gruppenname
> XPENDING mystream mygroup - + 10 1) 1) 1526569498055-0 2) "Bob" 3) (integer) 74170458 4) (integer) 1 2) 1) 1526569506935-0 2) "Bob" 3) (integer) 74170458 4) (integer) 1
Jetzt haben wir die Details für jede Nachricht: Kennung, Teilnehmername, Ausfallzeit in Millisekunden und schließlich die Anzahl der Zustellversuche. Wir haben zwei Nachrichten von Bob, die 74170458 Millisekunden lang ungefähr 20 Stunden lang inaktiv sind.
Bitte beachten Sie, dass uns niemand daran
hindert, den Inhalt der Nachricht nur mithilfe von
XRANGE zu überprüfen .
> XRANGE mystream 1526569498055-0 1526569498055-0 1) 1) 1526569498055-0 2) 1) "message" 2) "orange"
Wir müssen denselben Bezeichner nur zweimal in den Argumenten wiederholen. Nachdem wir eine Idee haben, kann Alice entscheiden, dass Bob sich nach 20 Stunden Inaktivität wahrscheinlich nicht erholen wird. Es ist Zeit, diese Nachrichten anzufordern und die Verarbeitung anstelle von Bob fortzusetzen. Dazu verwenden wir den Befehl
XCLAIM :
XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}
Mit diesem Befehl können wir eine "fremde" Nachricht erhalten, die noch nicht verarbeitet wurde, indem wir den Eigentümer in {consumer} ändern. Wir können jedoch auch eine minimale Ausfallzeit (minimale Leerlaufzeit) angeben. Dies hilft zu vermeiden, dass zwei Clients gleichzeitig versuchen, den Eigentümer derselben Nachrichten zu ändern:
Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0
Der erste Kunde setzt Ausfallzeiten zurück und erhöht den Zähler für die Anzahl der Lieferungen. Der zweite Client kann es also nicht anfordern.
> XCLAIM mystream mygroup Alice 3600000 1526569498055-0 1) 1) 1526569498055-0 2) 1) "message" 2) "orange"
Die Nachricht wurde erfolgreich von Alice beansprucht, die die Nachricht nun verarbeiten und bestätigen kann.
Aus dem obigen Beispiel geht hervor, dass die erfolgreiche Ausführung der Anforderung den Inhalt der Nachricht selbst zurückgibt. Dies ist jedoch nicht erforderlich. Mit der Option JUSTID können nur Nachrichtenkennungen zurückgegeben werden. Dies ist nützlich, wenn Sie nicht an den Details der Nachricht interessiert sind und die Systemleistung steigern möchten.
Lieferschalter
Der Zähler, den Sie in der
XPENDING- Ausgabe beobachten, ist die Anzahl der Zustellungen jeder Nachricht. Ein solcher Zähler wird auf zwei Arten inkrementiert: wenn die Nachricht erfolgreich über
XCLAIM angefordert wurde oder wenn der
XREADGROUP- Aufruf
verwendet wird .
Es ist normal, dass einige Nachrichten mehrmals zugestellt werden. Die Hauptsache ist, dass dadurch alle Nachrichten verarbeitet werden. Manchmal treten bei der Verarbeitung einer Nachricht Probleme auf, die auf eine Beschädigung der Nachricht selbst zurückzuführen sind, oder die Verarbeitung der Nachricht führt zu einem Fehler im Handlercode.
In diesem Fall kann sich herausstellen, dass niemand diese Nachricht verarbeiten kann. Da wir einen Zähler für Zustellversuche haben, können wir diesen Zähler verwenden, um solche Situationen zu erkennen. Sobald der Zustellungszähler eine von Ihnen angegebene große Anzahl erreicht, ist es wahrscheinlich sinnvoller, eine solche Nachricht in einem anderen Stream zu platzieren und eine Benachrichtigung an den Systemadministrator zu senden.Thread-Status
Mit dem Befehl XINFO werden verschiedene Informationen zu einem Stream und seinen Gruppen angefordert . Die Grundform des Befehls lautet beispielsweise wie folgt: > XINFO STREAM mystream 1) length 2) (integer) 13 3) radix-tree-keys 4) (integer) 1 5) radix-tree-nodes 6) (integer) 2 7) groups 8) (integer) 2 9) first-entry 10) 1) 1524494395530-0 2) 1) "a" 2) "1" 3) "b" 4) "2" 11) last-entry 12) 1) 1526569544280-0 2) 1) "message" 2) "banana"
Der obige Befehl zeigt allgemeine Informationen zum angegebenen Stream an. Nun ein etwas komplexeres Beispiel: > XINFO GROUPS mystream 1) 1) name 2) "mygroup" 3) consumers 4) (integer) 2 5) pending 6) (integer) 2 2) 1) name 2) "some-other-group" 3) consumers 4) (integer) 1 5) pending 6) (integer) 0
Der obige Befehl zeigt allgemeine Informationen für alle Gruppen des angegebenen Streams an > XINFO CONSUMERS mystream mygroup 1) 1) name 2) "Alice" 3) pending 4) (integer) 1 5) idle 6) (integer) 9104628 2) 1) name 2) "Bob" 3) pending 4) (integer) 1 5) idle 6) (integer) 83841983
Der obige Befehl zeigt Informationen zu allen Abonnenten des angegebenen Streams und der angegebenen Gruppe an.Wenn Sie die Befehlssyntax vergessen haben, wenden Sie sich an den Befehl, um Hilfe zu erhalten: > XINFO HELP 1) XINFO {subcommand} arg arg ... arg. Subcommands are: 2) CONSUMERS {key} {groupname} -- Show consumer groups of group {groupname}. 3) GROUPS {key} -- Show the stream consumer groups. 4) STREAM {key} -- Show information about the stream. 5) HELP -- Print this help.
Stream-Größenbeschränkung
Viele Anwendungen möchten nicht für immer Daten im Stream sammeln. Es ist oft nützlich, die maximale Anzahl von Nachrichten im Stream zu haben. In anderen Fällen ist es hilfreich, alle Nachrichten vom Stream in einen anderen dauerhaften Speicher zu übertragen, wenn die angegebene Streamgröße erreicht ist. Sie können die Größe des Streams mit dem Parameter MAXLEN im Befehl XADD begrenzen : > XADD mystream MAXLEN 2 * value 1 1526654998691-0 > XADD mystream MAXLEN 2 * value 2 1526654999635-0 > XADD mystream MAXLEN 2 * value 3 1526655000369-0 > XLEN mystream (integer) 2 > XRANGE mystream - + 1) 1) 1526654999635-0 2) 1) "value" 2) "2" 2) 1) 1526655000369-0 2) 1) "value" 2) "3"
Bei Verwendung von MAXLEN werden alte Datensätze automatisch gelöscht, wenn die angegebene Länge erreicht ist, sodass der Stream eine konstante Größe hat. In diesem Fall erfolgt das Trimmen im Redis-Speicher jedoch nicht auf die produktivste Weise. Die Situation kann wie folgt verbessert werden: Das Argument ~ im obigen Beispiel bedeutet, dass wir die Länge des Streams nicht auf einen bestimmten Wert beschränken müssen. In unserem Beispiel kann dies eine beliebige Zahl größer oder gleich 1000 sein (z. B. 1000, 1010 oder 1030). Wir haben nur ausdrücklich angegeben, dass in unserem Stream mindestens 1000 Datensätze gespeichert werden sollen. Dies macht das Arbeiten mit Speicher in Redis viel effizienter. Es gibt auch einen separaten XTRIM- Befehl , der dasselbe tut:XADD mystream MAXLEN ~ 1000 * ... entry fields here ...
> XTRIM mystream MAXLEN 10
> XTRIM mystream MAXLEN ~ 10
Permanente Speicherung und Replikation
Redis Stream wird asynchron auf Slave-Knoten repliziert und in Dateien wie AOF (Snapshot aller Daten) und RDB (Protokoll aller Schreibvorgänge) gespeichert. Die Statusreplikation für Verbrauchergruppen wird ebenfalls unterstützt. Befindet sich die Nachricht auf dem Masterknoten im Status "Ausstehend", hat diese Nachricht auf den Slaveknoten denselben Status.Einzelne Elemente aus einem Stream entfernen
Zum Löschen von Nachrichten gibt es einen speziellen XDEL- Befehl . Der Befehl erhält den Namen des Streams, gefolgt von den Bezeichnern der Nachricht, die gelöscht werden muss: > XRANGE mystream - + COUNT 2 1) 1) 1526654999635-0 2) 1) "value" 2) "2" 2) 1) 1526655000369-0 2) 1) "value" 2) "3" > XDEL mystream 1526654999635-0 (integer) 1 > XRANGE mystream - + COUNT 2 1) 1) 1526655000369-0 2) 1) "value" 2) "3"
Wenn Sie diesen Befehl verwenden, müssen Sie berücksichtigen, dass der Speicher nicht sofort freigegeben wird.Streams ohne Länge
Der Unterschied zwischen Streams und anderen Redis-Datenstrukturen besteht darin, dass die Datenstruktur selbst aus dem Speicher gelöscht wird, wenn andere Datenstrukturen keine Elemente mehr in sich haben. So wird beispielsweise die sortierte Menge vollständig gelöscht, wenn der ZREM-Aufruf das letzte Element entfernt. Stattdessen dürfen Threads im Speicher bleiben, ohne dass sich auch nur ein einziges Element darin befindet.Fazit
Redis Stream ist ideal zum Erstellen von Nachrichtenbrokern, Nachrichtenwarteschlangen, einheitlichen Protokollen und Chat-Systemen, in denen der Verlauf gespeichert wird.Wie Nicklaus Wirth einmal sagte , sind Programme Algorithmen plus Datenstrukturen, und Redis bietet Ihnen bereits beides.