
Redis Stream - un nouveau type de données abstrait introduit dans Redis avec la sortie de la version 5.0
Conceptuellement, Redis Stream est une liste à laquelle vous pouvez ajouter des entrées. Chaque entrée a un identifiant unique. Par défaut, un identifiant est généré automatiquement et inclut un horodatage. Par conséquent, vous pouvez demander des plages d'enregistrement par heure ou recevoir de nouvelles données à mesure qu'elles arrivent dans le flux, car la commande Unix tail -f lit le fichier journal et se fige en prévision de nouvelles données. Veuillez noter que plusieurs clients peuvent écouter le flux en même temps, car de nombreux processus «tail -f» peuvent lire un fichier en même temps sans entrer en conflit les uns avec les autres.
Pour comprendre tous les avantages du nouveau type de données, rappelons brièvement les structures Redis existantes qui répètent partiellement les fonctionnalités de Redis Stream.
Excursion historique
Redis pub / sub
Redis Pub / Sub est un système de messagerie simple déjà intégré à votre stockage de valeurs-clés. Cependant, pour plus de simplicité, vous devez payer:
- Si l'éditeur pour une raison quelconque échoue, il perd tous ses abonnés
- L'éditeur doit connaître l'adresse exacte de tous ses abonnés.
- Un éditeur peut surcharger ses abonnés si les données sont publiées plus rapidement qu'elles ne sont traitées
- Le message est supprimé du tampon de l'éditeur immédiatement après sa publication, quel que soit le nombre d'abonnés qu'il a transmis et la rapidité avec laquelle ils ont réussi à traiter ce message.
- Tous les abonnés recevront le message en même temps. Les abonnés eux-mêmes doivent en quelque sorte s'entendre sur la façon de traiter le même message.
- Il n'y a pas de mécanisme intégré pour confirmer le succès du traitement d'un message par un abonné Si l'abonné a reçu un message et est tombé pendant le traitement, l'éditeur n'en sera pas informé.
Liste Redis
Redis List est une structure de données qui prend en charge les commandes de lecture de verrouillage. Vous pouvez ajouter et lire des messages depuis le début ou la fin de la liste. Sur la base de cette structure, vous pouvez créer une bonne pile ou file d'attente pour votre système distribué et cela suffira dans la plupart des cas. Les principales différences avec Redis Pub / Sub:
- Le message est remis à un client. Le premier client bloqué par la lecture recevra d'abord les données.
- Clint doit lancer une opération de lecture pour chaque message. List ne sait rien des clients.
- Les messages sont stockés jusqu'à ce que quelqu'un les compte ou les supprime explicitement. Si vous configurez un serveur Redis pour vider les données sur le disque, la fiabilité du système augmente considérablement.
Introduction à Stream
Ajout d'un enregistrement à un flux
La commande
XADD ajoute un nouvel enregistrement au flux. Un enregistrement n'est pas seulement une chaîne, il se compose d'une ou plusieurs paires clé-valeur. Ainsi, chaque enregistrement est déjà structuré et ressemble à la structure d'un fichier CSV.
> XADD mystream * sensor-id 1234 temperature 19.8 1518951480106-0
Dans l'exemple ci-dessus, nous ajoutons deux champs au flux avec le nom (clé) "mystream": "sensor-id" et "temperature" avec les valeurs "1234" et "19.8", respectivement. Comme deuxième argument, la commande accepte l'identifiant qui sera affecté à l'enregistrement - cet identifiant identifie de manière unique chaque enregistrement du flux. Cependant, dans ce cas, nous avons passé * car nous voulons que Redis génère un nouvel identifiant pour nous. Chaque nouvel identifiant augmentera. Par conséquent, chaque nouvel enregistrement aura un identifiant plus grand par rapport aux enregistrements précédents.
Format d'ID
L'identifiant d'enregistrement renvoyé par la commande
XADD se compose de deux parties:
{millisecondsTime}-{sequenceNumber}
millisecondsTime - Temps Unix en millisecondes (temps du serveur Redis). Toutefois, si l'heure actuelle est identique ou inférieure à l'heure de l'enregistrement précédent, l'horodatage de l'enregistrement précédent est utilisé. Par conséquent, si l'heure du serveur est revenue au passé, le nouvel identifiant conservera toujours la propriété d'augmentation.
sequenceNumber est utilisé pour les enregistrements créés dans la même milliseconde.
sequenceNumber sera augmenté de 1 par rapport à l'enregistrement précédent. Étant donné que
sequenceNumber a une taille de 64 bits, en pratique, vous ne devez pas rencontrer de limite sur le nombre d'enregistrements pouvant être générés en une milliseconde.
Le format de ces identifiants à première vue peut sembler étrange. Un lecteur incrédule peut se demander pourquoi le temps fait partie d'un identifiant. La raison en est que les flux Redis prennent en charge les demandes de plage par des identifiants. L'identifiant étant associé à l'heure de création de l'enregistrement, cela permet de demander des plages horaires. Nous examinerons un exemple concret lorsque nous passerons à l'étude de la
commande XRANGE .
Si, pour une raison quelconque, l'utilisateur doit spécifier son propre identifiant, qui, par exemple, est associé à un système externe, nous pouvons le transmettre à la
commande XADD au lieu du signe * comme indiqué ci-dessous:
> XADD somestream 0-1 field value 0-1 > XADD somestream 0-2 foo bar 0-2
Veuillez noter que dans ce cas, vous devez surveiller vous-même l'augmentation de l'identifiant. Dans notre exemple, l'identifiant minimum est «0-1», donc l'équipe n'acceptera pas un autre identifiant égal ou inférieur à «0-1».
> XADD somestream 0-1 foo bar (error) ERR The ID specified in XADD is equal or smaller than the target stream top item
Le nombre d'enregistrements dans le flux
Vous pouvez obtenir le nombre d'enregistrements dans un flux simplement en utilisant la commande
XLEN . Pour notre exemple, cette commande renverra la valeur suivante:
> XLEN somestream (integer) 2
Demandes de plage - XRANGE et XREVRANGE
Pour demander des données pour une plage, nous devons spécifier deux identifiants - le début et la fin de la plage. La plage renvoyée comprendra tous les éléments, y compris les bordures. Il y a également deux identificateurs spéciaux "-" et "+", respectivement, ce qui signifie le plus petit (premier enregistrement) et le plus grand (dernier enregistrement) identificateur dans le flux. L'exemple ci-dessous affiche toutes les entrées de flux.
> XRANGE mystream - + 1) 1) 1518951480106-0 2) 1) "sensor-id" 2) "1234" 3) "temperature" 4) "19.8" 2) 1) 1518951482479-0 2) 1) "sensor-id" 2) "9999" 3) "temperature" 4) "18.2"
Chaque enregistrement renvoyé est un tableau de deux éléments: un identifiant et une liste de paires clé-valeur. Nous avons déjà dit que les identifiants d'enregistrement sont liés au temps. Par conséquent, nous pouvons demander la plage d'une période de temps spécifique. Cependant, nous pouvons spécifier dans la requête non pas l'identifiant complet, mais uniquement le temps Unix, en omettant la partie liée à
sequenceNumber . La partie omise de l'identifiant est automatiquement égale à zéro au début de la plage et à la valeur maximale possible à la fin de la plage. Voici un exemple de la façon de demander une plage de deux millisecondes.
> XRANGE mystream 1518951480106 1518951480107 1) 1) 1518951480106-0 2) 1) "sensor-id" 2) "1234" 3) "temperature" 4) "19.8"
Nous n'avons qu'un seul enregistrement dans cette plage, mais dans les ensembles de données réels, le résultat renvoyé peut être énorme. Pour cette raison,
XRANGE prend en charge l'option COUNT. En spécifiant la quantité, nous pouvons simplement obtenir les N premiers enregistrements. Si nous devons obtenir les N entrées suivantes (pagination), nous pouvons utiliser le dernier identifiant reçu, augmenter son
numéro de
séquence de un et demander à nouveau. Voyons cela dans l'exemple suivant. Nous commençons à ajouter 10 éléments en utilisant
XADD (supposons que le flux mystream a déjà été rempli avec 10 éléments). Pour commencer l'itération, en obtenant 2 éléments par commande, nous commençons avec la plage complète, mais avec COUNT égal à 2.
> XRANGE mystream - + COUNT 2 1) 1) 1519073278252-0 2) 1) "foo" 2) "value_1" 2) 1) 1519073279157-0 2) 1) "foo" 2) "value_2"
Pour continuer l'itération avec les deux éléments suivants, nous devons sélectionner le dernier identifiant reçu, c'est-à-dire 1519073279157-0, et ajouter 1 à
sequenceNumber .
L'identifiant résultant, dans ce cas 1519073279157-1, peut maintenant être utilisé comme un nouvel argument au début de la plage pour le prochain appel
XRANGE :
> XRANGE mystream 1519073279157-1 + COUNT 2 1) 1) 1519073280281-0 2) 1) "foo" 2) "value_3" 2) 1) 1519073281432-0 2) 1) "foo" 2) "value_4"
Et ainsi de suite. Puisque la complexité de
XRANGE est O (log (N)) pour rechercher, puis O (M) pour renvoyer M éléments, chaque étape d'itération est rapide. Ainsi, en utilisant
XRANGE, il est possible d'
itérer efficacement les flux.
La commande
XREVRANGE est l'équivalent de
XRANGE , mais renvoie les éléments dans l'ordre inverse:
> XREVRANGE mystream + - COUNT 1 1) 1) 1519073287312-0 2) 1) "foo" 2) "value_10"
Notez que la commande
XREVRANGE prend les arguments de la plage de début et de fin dans l'ordre inverse.
Lecture de nouveaux enregistrements avec XREAD
Il est souvent nécessaire de s'abonner au flux et de ne recevoir que de nouveaux messages. Ce concept peut sembler être un Redis Pub / Sub ou bloquer une liste Redis, mais il existe des différences fondamentales dans la façon d'utiliser Redis Stream:
- Chaque nouveau message est remis par défaut à chaque abonné. Ce comportement est différent du blocage de la liste Redis, où un nouveau message sera lu par un seul abonné.
- Alors que dans Redis Pub / Sub tous les messages sont oubliés et jamais enregistrés, dans Stream tous les messages sont stockés indéfiniment (sauf si le client provoque explicitement la suppression).
- Redis Stream vous permet de différencier l'accès aux messages au sein d'un même flux. Un abonné spécifique ne peut voir que l'historique de ses messages personnels.
Vous pouvez vous abonner au flux et recevoir de nouveaux messages à l'aide de la commande
XREAD . C'est un peu plus compliqué que
XRANGE , nous allons donc commencer par des exemples plus simples.
> XREAD COUNT 2 STREAMS mystream 0 1) 1) "mystream" 2) 1) 1) 1519073278252-0 2) 1) "foo" 2) "value_1" 2) 1) 1519073279157-0 2) 1) "foo" 2) "value_2"
Dans l'exemple ci-dessus, un formulaire
XREAD non bloquant est
spécifié . Veuillez noter que l'option COUNT est facultative. En fait, la seule option de commande requise est l'option STREAMS, qui définit la liste des flux avec l'identifiant maximal correspondant. Nous avons écrit «STREAMS mystream 0» - nous voulons obtenir tous les enregistrements du flux mystream avec un identifiant supérieur à «0-0». Comme vous pouvez le voir dans l'exemple, la commande renvoie le nom du flux, car nous pouvons nous abonner à plusieurs flux en même temps. Nous pourrions écrire, par exemple, «STREAMS mystream otherstream 0 0». Veuillez noter qu'après l'option STREAMS, nous devons d'abord fournir les noms de tous les flux nécessaires, puis seulement une liste d'identifiants.
Sous cette forme simple, la commande n'a rien de spécial par rapport à
XRANGE . Cependant, la chose intéressante est que nous pouvons facilement transformer
XREAD en une commande de blocage en spécifiant l'argument BLOCK:
> XREAD BLOCK 0 STREAMS mystream $
Dans l'exemple ci-dessus, une nouvelle option BLOCK est spécifiée avec un délai d'expiration de 0 millisecondes (cela signifie une attente sans fin). De plus, au lieu de passer l'identifiant habituel pour le flux mystream, l'identifiant spécial $ a été passé. Cet identifiant spécial signifie que
XREAD doit utiliser l'identifiant maximum dans le flux mystream comme identifiant. Nous ne recevrons donc que de nouveaux messages, à partir du moment où nous avons commencé à écouter. D'une certaine manière, cela est similaire à la commande Unix tail -f.
Veuillez noter que lorsque vous utilisez l'option BLOC, nous n'avons pas besoin d'utiliser l'identifiant spécial $. Nous pouvons utiliser n'importe quel identifiant existant dans le flux. Si l'équipe peut répondre immédiatement à notre demande, sans bloquer, elle le fera, sinon elle sera bloquée.
Le blocage de
XREAD peut également écouter plusieurs flux à la fois, il vous suffit de spécifier leurs noms. Dans ce cas, la commande renverra un enregistrement du premier flux dans lequel les données sont arrivées. Le premier abonné bloqué pour ce flux recevra d'abord les données.
Groupes de consommateurs
Dans certaines tâches, nous voulons différencier l'accès des abonnés aux messages dans le même fil. Un exemple où cela peut être utile est une file d'attente de messages avec des travailleurs qui recevront différents messages du flux, vous permettant de faire évoluer le traitement des messages.
Si nous imaginons que nous avons trois abonnés C1, C2, C3 et un flux qui contient les messages 1, 2, 3, 4, 5, 6, 7, alors le service de messagerie se produira comme dans le diagramme ci-dessous:
1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1
Pour obtenir cet effet, Redis Stream utilise un concept appelé Consumer Group. Ce concept est similaire à un pseudo-abonné qui reçoit des données d'un flux, mais est en fait servi par plusieurs abonnés au sein d'un groupe, offrant certaines garanties:
- Chaque message est remis à différents abonnés du groupe.
- Au sein d'un groupe, les abonnés sont identifiés par leur nom, qui est une chaîne sensible à la casse. Si un abonné abandonne temporairement le groupe, il peut être restauré dans le groupe par son propre nom unique.
- Chaque groupe de consommateurs suit le concept de «premier message non lu». Lorsqu'un abonné demande de nouveaux messages, il ne peut recevoir que des messages qui n'ont jamais été remis à aucun abonné d'un groupe.
- Il existe une commande pour confirmer explicitement le succès du traitement du message par l'abonné. Jusqu'à ce que cette commande soit appelée, le message demandé restera dans l'état "en attente".
- Au sein du Consumer Group, chaque abonné peut demander un historique des messages qui lui ont été remis, mais qui n'ont pas encore été traités (dans l'état "en attente")
En un sens, l'état d'un groupe peut être représenté comme suit:
+----------------------------------------+ | consumer_group_name: mygroup | consumer_group_stream: somekey | last_delivered_id: 1292309234234-92 | | consumers: | "consumer-1" with pending messages | 1292309234234-4 | 1292309234232-8 | "consumer-42" with pending messages | ... (and so forth) +----------------------------------------+
Il est maintenant temps de se familiariser avec les principales équipes du Consumer Group, à savoir:
- XGROUP est utilisé pour créer, détruire et gérer des groupes.
- XREADGROUP est utilisé pour lire un flux à travers un groupe.
- XACK - cette commande permet à l'abonné de marquer le message comme traité avec succès
Création d'un groupe de consommateurs
Supposons qu'un flux mystream existe déjà. La commande de création de groupe ressemblera alors à:
> XGROUP CREATE mystream mygroup $
OK
Lors de la création d'un groupe, nous devons transmettre un identifiant commençant par lequel le groupe recevra les messages. Si nous voulons simplement recevoir tous les nouveaux messages, nous pouvons utiliser l'identifiant spécial $ (comme dans notre exemple ci-dessus). Si vous spécifiez 0 au lieu d'un identifiant spécial, tous les messages du flux seront disponibles pour le groupe.
Maintenant que le groupe est créé, nous pouvons immédiatement commencer à lire les messages à l'aide de la
commande XREADGROUP . Cette commande est très similaire à
XREAD et prend en charge l'option BLOC facultative. Cependant, il existe une option GROUP obligatoire, qui doit toujours être spécifiée avec deux arguments: le nom du groupe et le nom de l'abonné. L'option COUNT est également prise en charge.
Avant de lire le flux, mettons-y quelques messages:
> XADD mystream * message apple 1526569495631-0 > XADD mystream * message orange 1526569498055-0 > XADD mystream * message strawberry 1526569506935-0 > XADD mystream * message apricot 1526569535168-0 > XADD mystream * message banana 1526569544280-0
Essayons maintenant de lire ce flux à travers le groupe:
> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream > 1) 1) "mystream" 2) 1) 1) 1526569495631-0 2) 1) "message" 2) "apple"
La commande ci-dessus textuellement se lit comme suit:
"Moi, abonné d'Alice, membre de mon groupe, je veux lire un message du mystream qui n'a jamais été livré à personne auparavant."
Chaque fois qu'un abonné effectue une opération avec un groupe, il doit indiquer son nom, s'identifiant de manière unique au sein du groupe. Il y a un autre détail très important dans la commande ci-dessus - l'identifiant spécial ">". Cet identifiant spécial filtre les messages, ne laissant que ceux qui n'ont jusqu'à présent jamais été remis.
De plus, dans des cas particuliers, vous pouvez spécifier un identifiant réel, tel que 0 ou tout autre identifiant valide. Dans ce cas, la commande
XREADGROUP vous renverra l'historique des messages avec le statut «en attente», qui ont été remis à l'abonné spécifié (Alice), mais qui n'ont pas encore été confirmés à l'aide de la
commande XACK .
Nous pouvons vérifier ce comportement en spécifiant immédiatement l'identifiant 0, sans l'option
COUNT . Nous voyons juste le seul message en attente, c'est-à-dire le message avec la pomme:
> XREADGROUP GROUP mygroup Alice STREAMS mystream 0 1) 1) "mystream" 2) 1) 1) 1526569495631-0 2) 1) "message" 2) "apple"
Cependant, si nous confirmons que le message a été traité avec succès, il ne s'affichera plus:
> XACK mystream mygroup 1526569495631-0 (integer) 1 > XREADGROUP GROUP mygroup Alice STREAMS mystream 0 1) 1) "mystream" 2) (empty list or set)
Maintenant, c'est au tour de Bob de lire quelque chose:
> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream > 1) 1) "mystream" 2) 1) 1) 1526569498055-0 2) 1) "message" 2) "orange" 2) 1) 1526569506935-0 2) 1) "message" 2) "strawberry"
Bob, membre de mon groupe, n'a pas demandé plus de deux messages. La commande signale uniquement les messages non livrés en raison de l'identifiant spécial ">". Comme vous pouvez le voir, le message «pomme» ne s'affiche pas, car il a déjà été livré à Alice, donc Bob reçoit «orange» et «fraise».
Ainsi, Alice, Bob et tout autre abonné du groupe peuvent lire différents messages du même flux. Ils peuvent également lire l'historique de leurs messages bruts ou marquer les messages comme traités.
Il y a quelques points à garder à l'esprit:
- Dès que l'abonné considère que le message est la commande XREADGROUP , ce message passe à l'état «en attente» et est attribué à cet abonné particulier. Les autres abonnés du groupe ne pourront pas lire ce message.
- Les abonnés sont automatiquement créés à la première mention, il n'y a pas besoin de leur création explicite.
- Avec XREADGROUP, vous pouvez lire les messages de plusieurs flux différents en même temps, cependant, pour que cela fonctionne, vous devez d'abord créer des groupes avec le même nom pour chaque flux à l'aide de XGROUP
Récupération après incident
L'abonné peut récupérer de l'échec et relire sa liste de messages avec le statut "en attente". Cependant, dans le monde réel, les abonnés peuvent finalement échouer. Qu'arrive-t-il au message suspendu d'un abonné s'il ne peut pas récupérer après un échec?
Le Consumer Group propose une fonctionnalité qui est utilisée spécifiquement pour de tels cas - lorsque vous devez changer le propriétaire des messages.
Tout d'abord, vous devez appeler la commande
XPENDING , qui affiche tous les messages du groupe avec le statut "en attente". Dans sa forme la plus simple, une commande est appelée avec seulement deux arguments: le nom du flux et le nom du groupe:
> XPENDING mystream mygroup 1) (integer) 2 2) 1526569498055-0 3) 1526569506935-0 4) 1) 1) "Bob" 2) "2"
L'équipe a imprimé le nombre de messages non traités pour l'ensemble du groupe et pour chaque abonné. Nous n'avons que Bob avec deux messages non traités, car le seul message demandé par Alice a été confirmé avec
XACK .
Nous pouvons demander des informations supplémentaires en utilisant plus d'arguments:
XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]
{start-id} {end-id} - plage d'identifiants (vous pouvez utiliser "-" et "+")
{count} - le nombre de tentatives de livraison
{consumer-name} - nom du groupe
> XPENDING mystream mygroup - + 10 1) 1) 1526569498055-0 2) "Bob" 3) (integer) 74170458 4) (integer) 1 2) 1) 1526569506935-0 2) "Bob" 3) (integer) 74170458 4) (integer) 1
Nous avons maintenant les détails de chaque message: identifiant, nom de l'abonné, temps d'arrêt en millisecondes, et enfin, le nombre de tentatives de livraison. Nous avons deux messages de Bob et ils sont inactifs pendant 74170458 millisecondes, environ 20 heures.
Veuillez noter que personne ne nous empêche de vérifier le contenu du message simplement en utilisant
XRANGE .
> XRANGE mystream 1526569498055-0 1526569498055-0 1) 1) 1526569498055-0 2) 1) "message" 2) "orange"
Il suffit de répéter deux fois le même identifiant dans les arguments. Maintenant que nous avons une idée, Alice peut décider que Bob ne se remettra probablement pas après 20 heures d'inactivité, et il est temps de demander ces messages et de reprendre leur traitement à la place de Bob. Pour ce faire, nous utilisons la commande
XCLAIM :
XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}
En utilisant cette commande, nous pouvons obtenir un message «étranger» qui n'a pas encore été traité en changeant le propriétaire en {consommateur}. Cependant, nous pouvons également fournir un temps d'arrêt minimum {min-idle-time}. Cela permet d'éviter une situation où deux clients essaient de changer simultanément le propriétaire des mêmes messages:
Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0
Le premier client réinitialisera les temps d'arrêt et augmentera le compteur du nombre de livraisons. Le deuxième client ne pourra donc pas le demander.
> XCLAIM mystream mygroup Alice 3600000 1526569498055-0 1) 1) 1526569498055-0 2) 1) "message" 2) "orange"
Le message a été revendiqué avec succès par Alice, qui peut maintenant traiter le message et le reconnaître.
D'après l'exemple ci-dessus, il est clair qu'une exécution réussie de la demande renvoie le contenu du message lui-même. Cependant, ce n'est pas nécessaire. L'option JUSTID peut être utilisée pour renvoyer uniquement des identificateurs de message. Ceci est utile si vous n'êtes pas intéressé par les détails du message et souhaitez augmenter les performances du système.
Comptoir de livraison
Le compteur que vous observez dans la sortie
XPENDING est le nombre de livraisons de chaque message. Un tel compteur est incrémenté de deux manières: lorsque le message est correctement demandé via
XCLAIM ou lorsque l'appel
XREADGROUP est
utilisé .
Il est normal que certains messages soient remis plusieurs fois. L'essentiel est qu'en conséquence, tous les messages soient traités. Parfois, lors du traitement d'un message, il y a des problèmes dus à des dommages au message lui-même ou le traitement du message provoque une erreur dans le code du gestionnaire.
Dans ce cas, il se peut que personne ne puisse traiter ce message. Comme nous avons un compteur de tentatives de livraison, nous pouvons utiliser ce compteur pour détecter de telles situations. Par conséquent, dès que le compteur de remise atteint un grand nombre spécifié par vous, il sera probablement plus raisonnable de placer un tel message dans un autre flux et d'envoyer une notification à l'administrateur système.Statut du fil
La commande XINFO est utilisée pour demander diverses informations sur un flux et ses groupes. Par exemple, la forme de base de la commande est la suivante: > XINFO STREAM mystream 1) length 2) (integer) 13 3) radix-tree-keys 4) (integer) 1 5) radix-tree-nodes 6) (integer) 2 7) groups 8) (integer) 2 9) first-entry 10) 1) 1524494395530-0 2) 1) "a" 2) "1" 3) "b" 4) "2" 11) last-entry 12) 1) 1526569544280-0 2) 1) "message" 2) "banana"
La commande ci-dessus affiche des informations générales sur le flux spécifié. Maintenant un exemple un peu plus complexe: > XINFO GROUPS mystream 1) 1) name 2) "mygroup" 3) consumers 4) (integer) 2 5) pending 6) (integer) 2 2) 1) name 2) "some-other-group" 3) consumers 4) (integer) 1 5) pending 6) (integer) 0
La commande ci-dessus affiche des informations générales pour tous les groupes du flux spécifié > XINFO CONSUMERS mystream mygroup 1) 1) name 2) "Alice" 3) pending 4) (integer) 1 5) idle 6) (integer) 9104628 2) 1) name 2) "Bob" 3) pending 4) (integer) 1 5) idle 6) (integer) 83841983
La commande ci-dessus affiche des informations sur tous les abonnés du flux et du groupe spécifiés.Si vous oubliez la syntaxe de la commande, contactez simplement la commande pour obtenir de l'aide: > XINFO HELP 1) XINFO {subcommand} arg arg ... arg. Subcommands are: 2) CONSUMERS {key} {groupname} -- Show consumer groups of group {groupname}. 3) GROUPS {key} -- Show the stream consumer groups. 4) STREAM {key} -- Show information about the stream. 5) HELP -- Print this help.
Limite de taille de flux
De nombreuses applications ne souhaitent pas collecter des données dans le flux pour toujours. Il est souvent utile d'avoir le nombre maximum de messages dans le flux. Dans d'autres cas, il est utile de transférer tous les messages du flux vers un autre stockage persistant lorsque la taille de flux spécifiée est atteinte. Vous pouvez limiter la taille du flux à l'aide du paramètre MAXLEN dans la commande XADD : > XADD mystream MAXLEN 2 * value 1 1526654998691-0 > XADD mystream MAXLEN 2 * value 2 1526654999635-0 > XADD mystream MAXLEN 2 * value 3 1526655000369-0 > XLEN mystream (integer) 2 > XRANGE mystream - + 1) 1) 1526654999635-0 2) 1) "value" 2) "2" 2) 1) 1526655000369-0 2) 1) "value" 2) "3"
Lorsque vous utilisez MAXLEN, les anciens enregistrements sont automatiquement supprimés lorsque la longueur spécifiée est atteinte, de sorte que le flux a une taille constante. Cependant, le découpage dans ce cas ne se produit pas de la manière la plus productive dans la mémoire Redis. La situation peut être améliorée comme suit: L' argument ~ dans l'exemple ci-dessus signifie que nous n'avons pas besoin de limiter la longueur du flux à une valeur spécifique. Dans notre exemple, cela peut être n'importe quel nombre supérieur ou égal à 1000 (par exemple, 1000, 1010 ou 1030). Nous avons simplement indiqué explicitement que nous souhaitons que notre flux stocke au moins 1 000 enregistrements. Cela rend le travail avec la mémoire beaucoup plus efficace dans Redis. Il existe également une commande XTRIM distincte qui fait la même chose:XADD mystream MAXLEN ~ 1000 * ... entry fields here ...
> XTRIM mystream MAXLEN 10
> XTRIM mystream MAXLEN ~ 10
Stockage et réplication persistants
Redis Stream est répliqué de manière asynchrone sur les nœuds esclaves et enregistré dans des fichiers tels que AOF (instantané de toutes les données) et RDB (journal de toutes les opérations d'écriture). La réplication d'état des groupes de consommateurs est également prise en charge. Par conséquent, si le message est dans l'état "en attente" sur le nœud maître, alors sur les nœuds esclaves, ce message aura le même état.Suppression d'éléments individuels d'un flux
Pour supprimer les messages, il existe une commande XDEL spéciale . La commande obtient le nom du flux, suivi des identifiants du message à supprimer: > XRANGE mystream - + COUNT 2 1) 1) 1526654999635-0 2) 1) "value" 2) "2" 2) 1) 1526655000369-0 2) 1) "value" 2) "3" > XDEL mystream 1526654999635-0 (integer) 1 > XRANGE mystream - + COUNT 2 1) 1) 1526655000369-0 2) 1) "value" 2) "3"
Lorsque vous utilisez cette commande, vous devez tenir compte du fait qu'en fait, la mémoire ne sera pas libérée immédiatement.Flux de longueur nulle
La différence entre les flux et les autres structures de données Redis est que lorsque d'autres structures de données n'ont plus d'éléments en elles-mêmes, comme effet secondaire, la structure de données elle-même sera supprimée de la mémoire. Ainsi, par exemple, l'ensemble trié sera complètement supprimé lorsque l'appel ZREM supprime le dernier élément. Au lieu de cela, les threads sont autorisés à rester en mémoire sans même avoir un seul élément à l'intérieur.Conclusion
Redis Stream est idéal pour créer des courtiers de messages, des files d'attente de messages, des journaux unifiés et des systèmes de discussion qui stockent l'historique.Comme l'a dit Nicklaus Wirth , les programmes sont des algorithmes et des structures de données, et Redis vous offre déjà les deux.