Redis Stream - Fiabilidad y escalabilidad de sus sistemas de mensajería

imagen

Redis Stream: un nuevo tipo de datos abstractos introducido en Redis con el lanzamiento de la versión 5.0
Conceptualmente, Redis Stream es una lista a la que puede agregar entradas. Cada entrada tiene un identificador único. Por defecto, un identificador se genera automáticamente e incluye una marca de tiempo. Por lo tanto, puede solicitar intervalos de grabación por tiempo o recibir nuevos datos a medida que llegan a la secuencia, ya que el comando Unix tail -f lee el archivo de registro y se congela en previsión de nuevos datos. Tenga en cuenta que varios clientes pueden escuchar la transmisión al mismo tiempo, ya que muchos procesos "tail-f" pueden leer un archivo al mismo tiempo, sin entrar en conflicto entre sí.

Para comprender todas las ventajas del nuevo tipo de datos, recordemos brevemente las estructuras Redis existentes desde hace mucho tiempo que repiten parcialmente la funcionalidad de Redis Stream.

Excursión histórica


Redis pub / sub


Redis Pub / Sub es un sistema de mensajería simple ya integrado en su almacenamiento de valor clave. Sin embargo, por simplicidad tienes que pagar:

  • Si el editor falla por algún motivo, pierde todos sus suscriptores
  • El editor necesita saber la dirección exacta de todos sus suscriptores.
  • Un editor puede sobrecargar a sus suscriptores si los datos se publican más rápido de lo que se procesan.
  • El mensaje se elimina del búfer del editor inmediatamente después de la publicación, independientemente de cuántos suscriptores entregó y qué tan rápido lograron procesar este mensaje.
  • Todos los suscriptores recibirán el mensaje al mismo tiempo. Los suscriptores mismos deben de alguna manera ponerse de acuerdo sobre cómo procesar el mismo mensaje.
  • No existe un mecanismo integrado para confirmar el procesamiento exitoso de un mensaje por un suscriptor Si el suscriptor recibió un mensaje y se cayó durante el procesamiento, el editor no lo sabrá.

Lista de Redis


Redis List es una estructura de datos que admite comandos de lectura de bloqueo. Puede agregar y leer mensajes desde el principio o el final de la lista. Sobre la base de esta estructura, puede hacer una buena pila o cola para su sistema distribuido y esto en la mayoría de los casos será suficiente. Las principales diferencias con Redis Pub / Sub:

  • El mensaje se entrega a un cliente. El primer cliente bloqueado por lectura recibirá primero los datos.
  • Clint debe iniciar una operación de lectura para cada mensaje. List no sabe nada sobre clientes.
  • Los mensajes se almacenan hasta que alguien los cuenta o los elimina explícitamente. Si configura un servidor Redis para vaciar datos al disco, entonces la confiabilidad del sistema aumenta dramáticamente.

Introducción a Stream


Agregar un registro a una secuencia


El comando XADD agrega un nuevo registro a la secuencia. Un registro no es solo una cadena, consta de uno o más pares clave-valor. Por lo tanto, cada registro ya está estructurado y se asemeja a la estructura de un archivo CSV.

> XADD mystream * sensor-id 1234 temperature 19.8 1518951480106-0 

En el ejemplo anterior, agregamos dos campos a la secuencia con el nombre (clave) "mystream": "sensor-id" y "temperatura" con los valores "1234" y "19.8", respectivamente. Como segundo argumento, el comando acepta el identificador que se asignará al registro; este identificador identifica de forma única cada registro en la secuencia. Sin embargo, en este caso, pasamos * porque queremos que Redis genere un nuevo identificador para nosotros. Cada nuevo identificador aumentará. Por lo tanto, cada nuevo registro tendrá un identificador más grande en relación con los registros anteriores.

Formato de identificación


El identificador de registro devuelto por el comando XADD consta de dos partes:

{millisecondsTime}-{sequenceNumber}

millisecondsTime : tiempo de Unix en milisegundos (tiempo del servidor Redis). Sin embargo, si la hora actual es igual o menor que la hora del registro anterior, se utiliza la marca de tiempo del registro anterior. Por lo tanto, si la hora del servidor se devuelve al pasado, el nuevo identificador conservará la propiedad de aumento.

SecuenciaNúmero se utiliza para registros creados en el mismo milisegundo. El número de secuencia aumentará en 1 en relación con el registro anterior. Como el número de secuencia tiene un tamaño de 64 bits, en la práctica no debe encontrarse con un límite en el número de registros que se pueden generar en un milisegundo.

El formato de tales identificadores a primera vista puede parecer extraño. Un lector incrédulo puede preguntarse por qué el tiempo es parte de un identificador. La razón es que las transmisiones de Redis admiten solicitudes de rango por identificadores. Dado que el identificador está asociado con la hora en que se creó el registro, esto permite solicitar rangos de tiempo. Veremos un ejemplo concreto cuando pasemos al estudio del comando XRANGE .

Si por alguna razón el usuario necesita especificar su propio identificador, que, por ejemplo, está asociado con algún sistema externo, entonces podemos pasarlo al comando XADD en lugar del signo * como se muestra a continuación:

 > XADD somestream 0-1 field value 0-1 > XADD somestream 0-2 foo bar 0-2 

Tenga en cuenta que, en este caso, debe controlar el aumento del identificador. En nuestro ejemplo, el identificador mínimo es "0-1", por lo que el equipo no aceptará otro identificador que sea igual o menor que "0-1".

 > XADD somestream 0-1 foo bar (error) ERR The ID specified in XADD is equal or smaller than the target stream top item 

El número de registros en la secuencia.


Puede obtener el número de registros en una secuencia simplemente usando el comando XLEN . Para nuestro ejemplo, este comando devolverá el siguiente valor:

 > XLEN somestream (integer) 2 

Solicitudes de rango - XRANGE y XREVRANGE


Para solicitar datos para un rango, necesitamos especificar dos identificadores: el principio y el final del rango. El rango devuelto incluirá todos los elementos, incluidos los bordes. También hay dos identificadores especiales "-" y "+", respectivamente, que significan el identificador más pequeño (primer registro) y el más grande (último registro) en la secuencia. El siguiente ejemplo mostrará todas las entradas de flujo.

 > XRANGE mystream - + 1) 1) 1518951480106-0 2) 1) "sensor-id" 2) "1234" 3) "temperature" 4) "19.8" 2) 1) 1518951482479-0 2) 1) "sensor-id" 2) "9999" 3) "temperature" 4) "18.2" 

Cada registro devuelto es una matriz de dos elementos: un identificador y una lista de pares clave-valor. Ya hemos dicho que los identificadores de registro están relacionados con el tiempo. Por lo tanto, podemos solicitar el rango de un período de tiempo específico. Sin embargo, podemos especificar en la solicitud no el identificador completo, sino solo el tiempo Unix, omitiendo la parte relacionada con el número de secuencia . La parte omitida del identificador es automáticamente igual a cero al comienzo del rango y al valor máximo posible al final del rango. El siguiente es un ejemplo de cómo solicitar un rango de dos milisegundos.

 > XRANGE mystream 1518951480106 1518951480107 1) 1) 1518951480106-0 2) 1) "sensor-id" 2) "1234" 3) "temperature" 4) "19.8" 

Solo tenemos un registro en este rango, sin embargo, en conjuntos de datos reales, el resultado devuelto puede ser enorme. Por este motivo, XRANGE admite la opción COUNT. Al especificar la cantidad, simplemente podemos obtener los primeros N registros. Si necesitamos obtener las siguientes N entradas (paginación), podemos usar el último identificador recibido, aumentar su número de secuencia en uno y solicitar nuevamente. Veamos esto en el siguiente ejemplo. Estamos comenzando a agregar 10 elementos usando XADD (supongamos que la secuencia mystream ya se ha llenado con 10 elementos). Para comenzar la iteración, obteniendo 2 elementos por comando, comenzamos con el rango completo, pero con COUNT igual a 2.

 > XRANGE mystream - + COUNT 2 1) 1) 1519073278252-0 2) 1) "foo" 2) "value_1" 2) 1) 1519073279157-0 2) 1) "foo" 2) "value_2" 

Para continuar la iteración con los dos elementos siguientes, debemos seleccionar el último identificador recibido, es decir, 1519073279157-0, y agregar 1 al número de secuencia .
El identificador resultante, en este caso 1519073279157-1, ahora se puede usar como un nuevo argumento para el comienzo del rango para la próxima llamada XRANGE :

 > XRANGE mystream 1519073279157-1 + COUNT 2 1) 1) 1519073280281-0 2) 1) "foo" 2) "value_3" 2) 1) 1519073281432-0 2) 1) "foo" 2) "value_4" 

Y así sucesivamente. Dado que la complejidad de XRANGE es O (log (N)) para buscar, y luego O (M) para devolver elementos M, cada paso de iteración es rápido. Por lo tanto, utilizando XRANGE, es posible iterar flujos de manera eficiente.

El comando XREVRANGE es el equivalente de XRANGE , pero devuelve los elementos en el orden inverso:

 > XREVRANGE mystream + - COUNT 1 1) 1) 1519073287312-0 2) 1) "foo" 2) "value_10" 

Tenga en cuenta que el comando XREVRANGE toma los argumentos del rango de inicio y detención en el orden inverso.

Lectura de nuevos registros con XREAD


A menudo hay una tarea para suscribirse a la transmisión y recibir solo mensajes nuevos. Este concepto puede parecer Redis Pub / Sub o bloquear Redis List, pero existen diferencias fundamentales en cómo usar Redis Stream:

  1. Cada nuevo mensaje se entrega a cada suscriptor de forma predeterminada. Este comportamiento es diferente de bloquear la Lista de Redis, donde solo un suscriptor leerá un nuevo mensaje.
  2. Mientras que en Redis Pub / Sub todos los mensajes se olvidan y nunca se guardan, en Stream todos los mensajes se almacenan indefinidamente (a menos que el cliente solicite explícitamente su eliminación).
  3. Redis Stream le permite diferenciar el acceso a los mensajes dentro de una transmisión. Un suscriptor específico solo puede ver su historial de mensajes personales.

Puede suscribirse a la transmisión y recibir nuevos mensajes con el comando XREAD . Esto es un poco más complicado que XRANGE , por lo que comenzaremos con ejemplos más simples primero.

 > XREAD COUNT 2 STREAMS mystream 0 1) 1) "mystream" 2) 1) 1) 1519073278252-0 2) 1) "foo" 2) "value_1" 2) 1) 1519073279157-0 2) 1) "foo" 2) "value_2" 

En el ejemplo anterior, se especifica un formulario XREAD sin bloqueo. Tenga en cuenta que la opción COUNT es opcional. De hecho, la única opción de comando requerida es la opción STREAMS, que establece la lista de secuencias junto con el identificador máximo correspondiente. Escribimos "STREAMS mystream 0": queremos obtener todos los registros de la secuencia mystream con un identificador mayor que "0-0". Como puede ver en el ejemplo, el comando devuelve el nombre de la secuencia, porque podemos suscribirnos a varias secuencias al mismo tiempo. Podríamos escribir, por ejemplo, "STREAMS mystream otherstream 0 0". Tenga en cuenta que después de la opción STREAMS, primero debemos proporcionar los nombres de todas las secuencias necesarias y solo luego una lista de identificadores.

En esta forma simple, el comando no hace nada especial en comparación con XRANGE . Sin embargo, lo interesante es que podemos convertir fácilmente XREAD en un comando de bloqueo especificando el argumento BLOCK:

> XREAD BLOCK 0 STREAMS mystream $

En el ejemplo anterior, se especifica una nueva opción BLOQUE con un tiempo de espera de 0 milisegundos (esto significa una espera sin fin). Además, en lugar de pasar el identificador habitual para la secuencia de mystream, se pasó el identificador especial $. Este identificador especial significa que XREAD debe usar el identificador máximo en el flujo de mystream como identificador. Por lo tanto, solo recibiremos mensajes nuevos, desde el momento en que comenzamos a escuchar. En cierto modo, esto es similar al comando Unix tail -f.

Tenga en cuenta que al usar la opción BLOQUEO, no necesitamos usar el identificador especial $. Podemos usar cualquier identificador existente en la secuencia. Si el equipo puede atender nuestra solicitud de inmediato, sin bloqueo, lo hará, de lo contrario se bloqueará.

Bloquear XREAD también puede escuchar varias transmisiones a la vez, solo necesita especificar sus nombres. En este caso, el comando devolverá un registro de la primera secuencia a la que llegaron los datos. El primer suscriptor bloqueado para esta transmisión recibirá primero los datos.

Grupos de consumidores


En ciertas tareas, queremos diferenciar el acceso de los suscriptores a los mensajes dentro del mismo hilo. Un ejemplo en el que esto puede ser útil es una cola de mensajes con trabajadores que recibirán diferentes mensajes de la transmisión, lo que le permite escalar el procesamiento de mensajes.

Si imaginamos que tenemos tres suscriptores C1, C2, C3 y una secuencia que contiene los mensajes 1, 2, 3, 4, 5, 6, 7, entonces el servicio de mensajes ocurrirá como en el siguiente diagrama:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

Para obtener este efecto, Redis Stream utiliza un concepto llamado Grupo de consumidores. Este concepto es similar a un pseudo-suscriptor que recibe datos de una secuencia, pero en realidad es atendido por varios suscriptores dentro de un grupo, proporcionando ciertas garantías:

  1. Cada mensaje se entrega a diferentes suscriptores dentro del grupo.
  2. Dentro de un grupo, los suscriptores se identifican por su nombre, que es una cadena que distingue entre mayúsculas y minúsculas. Si algún suscriptor abandona temporalmente el grupo, puede ser restaurado al grupo con su propio nombre único.
  3. Cada grupo de consumidores sigue el concepto de "primer mensaje no leído". Cuando un suscriptor solicita nuevos mensajes, solo puede recibir mensajes que nunca se han entregado a ningún suscriptor dentro de un grupo.
  4. Hay un comando para confirmar explícitamente el procesamiento exitoso del mensaje por parte del suscriptor. Hasta que se llame a este comando, el mensaje solicitado permanecerá en el estado "pendiente".
  5. Dentro del Grupo de consumidores, cada suscriptor puede solicitar un historial de mensajes que le fueron entregados, pero que aún no se han procesado (en el estado "pendiente")

En cierto sentido, el estado de un grupo se puede representar de la siguiente manera:

 +----------------------------------------+ | consumer_group_name: mygroup | consumer_group_stream: somekey | last_delivered_id: 1292309234234-92 | | consumers: | "consumer-1" with pending messages | 1292309234234-4 | 1292309234232-8 | "consumer-42" with pending messages | ... (and so forth) +----------------------------------------+ 

Ahora es el momento de familiarizarse con los principales equipos del Grupo de consumidores, a saber:

  • XGROUP se usa para crear, destruir y administrar grupos.
  • XREADGROUP se usa para leer una secuencia a través de un grupo.
  • XACK : este comando permite al suscriptor marcar el mensaje como procesado con éxito

Creación de grupo de consumidores


Supongamos que ya existe una secuencia de mystream. Entonces el comando de creación de grupo se verá así:

> XGROUP CREATE mystream mygroup $
OK

Al crear un grupo, debemos pasar un identificador a partir del cual el grupo recibirá los mensajes. Si solo queremos recibir todos los mensajes nuevos, entonces podemos usar el identificador especial $ (como en nuestro ejemplo anterior). Si especifica 0 en lugar de un identificador especial, todos los mensajes de la transmisión estarán disponibles para el grupo.

Ahora que el grupo está creado, podemos comenzar a leer mensajes de inmediato usando el comando XREADGROUP . Este comando es muy similar a XREAD y admite la opción BLOQUE opcional. Sin embargo, existe una opción de GRUPO obligatoria, que siempre debe especificarse con dos argumentos: el nombre del grupo y el nombre del suscriptor. La opción COUNT también es compatible.

Antes de leer la transmisión, pongamos algunos mensajes allí:

 > XADD mystream * message apple 1526569495631-0 > XADD mystream * message orange 1526569498055-0 > XADD mystream * message strawberry 1526569506935-0 > XADD mystream * message apricot 1526569535168-0 > XADD mystream * message banana 1526569544280-0 

Ahora intentemos leer esta secuencia a través del grupo:

 > XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream > 1) 1) "mystream" 2) 1) 1) 1526569495631-0 2) 1) "message" 2) "apple" 

El comando anterior literalmente dice lo siguiente:

"Yo, el suscriptor de Alice, miembro de mygroup, quiero leer un mensaje de mystream que nunca antes se había entregado a nadie".

Cada vez que un suscriptor realiza una operación con un grupo, debe indicar su nombre, identificándose únicamente dentro del grupo. Hay otro detalle muy importante en el comando anterior: el identificador especial ">". Este identificador especial filtra los mensajes, dejando solo aquellos que hasta ahora nunca se han entregado.

Además, en casos especiales, puede especificar un identificador real, como 0 o cualquier otro identificador válido. En este caso, el comando XREADGROUP le devolverá el historial de mensajes con el estado "pendiente", que se entregaron al suscriptor especificado (Alice), pero que aún no se han confirmado con el comando XACK .

Podemos verificar este comportamiento especificando inmediatamente el identificador 0, sin la opción COUNT . Solo vemos el único mensaje pendiente, es decir, el mensaje con la manzana:

 > XREADGROUP GROUP mygroup Alice STREAMS mystream 0 1) 1) "mystream" 2) 1) 1) 1526569495631-0 2) 1) "message" 2) "apple" 

Sin embargo, si confirmamos que el mensaje se procesó correctamente, ya no se mostrará:

 > XACK mystream mygroup 1526569495631-0 (integer) 1 > XREADGROUP GROUP mygroup Alice STREAMS mystream 0 1) 1) "mystream" 2) (empty list or set) 

Ahora le toca a Bob leer algo:

 > XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream > 1) 1) "mystream" 2) 1) 1) 1526569498055-0 2) 1) "message" 2) "orange" 2) 1) 1526569506935-0 2) 1) "message" 2) "strawberry" 

Bob, miembro de mygroup, solicitó no más de dos mensajes. El comando informa solo mensajes no entregados debido al identificador especial ">". Como puede ver, el mensaje "manzana" no se muestra, ya que ya se entregó a Alice, por lo que Bob recibe "naranja" y "fresa".

Por lo tanto, Alice, Bob y cualquier otro suscriptor del grupo pueden leer diferentes mensajes de la misma secuencia. También pueden leer su historial de mensajes sin procesar o marcar los mensajes como procesados.

Hay algunas cosas a tener en cuenta:

  • Tan pronto como el suscriptor considera que el mensaje es el comando XREADGROUP , este mensaje pasa al estado "pendiente" y se asigna a este suscriptor en particular. Otros suscriptores del grupo no podrán leer este mensaje.
  • Los suscriptores se crean automáticamente en la primera mención, no hay necesidad de su creación explícita.
  • Con XREADGROUP puede leer mensajes de varias secuencias diferentes al mismo tiempo, sin embargo, para que esto funcione, primero debe crear grupos con el mismo nombre para cada secuencia usando XGROUP

Crash Recovery


El suscriptor puede recuperarse de la falla y volver a leer su lista de mensajes con el estado de "pendiente". Sin embargo, en el mundo real, los suscriptores pueden finalmente fallar. ¿Qué sucede con el mensaje colgante de un suscriptor si no puede recuperarse después de una falla?
Consumer Group ofrece una función que se usa específicamente para tales casos, cuando necesita cambiar el propietario de los mensajes.

En primer lugar, debe llamar al comando XPENDING , que muestra todos los mensajes del grupo con el estado "pendiente". En su forma más simple, se llama a un comando con solo dos argumentos: el nombre de la secuencia y el nombre del grupo:

 > XPENDING mystream mygroup 1) (integer) 2 2) 1526569498055-0 3) 1526569506935-0 4) 1) 1) "Bob" 2) "2" 

El equipo imprimió la cantidad de mensajes no procesados ​​para todo el grupo y para cada suscriptor. Solo tenemos a Bob con dos mensajes sin procesar, porque el único mensaje solicitado por Alice fue confirmado con XACK .

Podemos solicitar información adicional utilizando más argumentos:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]

{start-id} {end-id} - rango de identificadores (puede usar "-" y "+")
{count} - el número de intentos de entrega
{nombre-consumidor} - nombre del grupo

 > XPENDING mystream mygroup - + 10 1) 1) 1526569498055-0 2) "Bob" 3) (integer) 74170458 4) (integer) 1 2) 1) 1526569506935-0 2) "Bob" 3) (integer) 74170458 4) (integer) 1 

Ahora tenemos los detalles de cada mensaje: identificador, nombre del suscriptor, tiempo de inactividad en milisegundos y, finalmente, el número de intentos de entrega. Tenemos dos mensajes de Bob, y están inactivos durante 74170458 milisegundos, aproximadamente 20 horas.

Tenga en cuenta que nadie nos impide comprobar cuál era el contenido del mensaje simplemente usando XRANGE .

 > XRANGE mystream 1526569498055-0 1526569498055-0 1) 1) 1526569498055-0 2) 1) "message" 2) "orange" 

Solo tenemos que repetir el mismo identificador dos veces en los argumentos. Ahora que tenemos alguna idea, Alice puede decidir que Bob probablemente no se recuperará después de 20 horas de inactividad, y es hora de solicitar estos mensajes y continuar procesándolos en lugar de Bob. Para hacer esto, usamos el comando XCLAIM :

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

Con este comando, podemos obtener un mensaje "extraño" que aún no se ha procesado cambiando el propietario a {consumidor}. Sin embargo, también podemos proporcionar un tiempo de inactividad mínimo {min-idle-time}. Esto ayuda a evitar una situación en la que dos clientes intentan cambiar simultáneamente el propietario de los mismos mensajes:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

El primer cliente restablecerá el tiempo de inactividad y aumentará el contador del número de entregas. Entonces el segundo cliente no podrá solicitarlo.

 > XCLAIM mystream mygroup Alice 3600000 1526569498055-0 1) 1) 1526569498055-0 2) 1) "message" 2) "orange" 

El mensaje fue reclamado con éxito por Alice, quien ahora puede procesar el mensaje y reconocerlo.

Del ejemplo anterior, está claro que la ejecución exitosa de la solicitud devuelve el contenido del mensaje en sí. Sin embargo, esto no es necesario. La opción JUSTID puede usarse para devolver solo identificadores de mensaje. Esto es útil si no está interesado en los detalles del mensaje y desea aumentar el rendimiento del sistema.

Mostrador de entrega


El contador que observa en la salida XPENDING es el número de entregas de cada mensaje. Dicho contador se incrementa de dos maneras: cuando el mensaje se solicita con éxito a través de XCLAIM o cuando se usa la llamada XREADGROUP .

Es normal que algunos mensajes se entreguen varias veces. Lo principal es que, como resultado, se procesan todos los mensajes. A veces, al procesar un mensaje, hay problemas debido a daños en el mensaje en sí o al procesar el mensaje se produce un error en el código del controlador.En este caso, puede resultar que nadie pueda procesar este mensaje. Como tenemos un contador de intentos de entrega, podemos usar este contador para detectar tales situaciones. Por lo tanto, tan pronto como el contador de entrega alcance un gran número especificado por usted, probablemente sea más razonable colocar dicho mensaje en otra secuencia y enviar una notificación al administrador del sistema.

Estado del hilo


El comando XINFO se utiliza para solicitar diversa información sobre una secuencia y sus grupos. Por ejemplo, la forma básica del comando es la siguiente:

 > XINFO STREAM mystream 1) length 2) (integer) 13 3) radix-tree-keys 4) (integer) 1 5) radix-tree-nodes 6) (integer) 2 7) groups 8) (integer) 2 9) first-entry 10) 1) 1524494395530-0 2) 1) "a" 2) "1" 3) "b" 4) "2" 11) last-entry 12) 1) 1526569544280-0 2) 1) "message" 2) "banana" 

El comando anterior muestra información general sobre la secuencia especificada. Ahora un ejemplo un poco más complejo:

 > XINFO GROUPS mystream 1) 1) name 2) "mygroup" 3) consumers 4) (integer) 2 5) pending 6) (integer) 2 2) 1) name 2) "some-other-group" 3) consumers 4) (integer) 1 5) pending 6) (integer) 0 

El comando anterior muestra información general para todos los grupos de la secuencia especificada

 > XINFO CONSUMERS mystream mygroup 1) 1) name 2) "Alice" 3) pending 4) (integer) 1 5) idle 6) (integer) 9104628 2) 1) name 2) "Bob" 3) pending 4) (integer) 1 5) idle 6) (integer) 83841983 

El comando anterior muestra información sobre todos los suscriptores de la secuencia y el grupo especificados.
Si olvida la sintaxis del comando, simplemente póngase en contacto con el comando para obtener ayuda:

 > XINFO HELP 1) XINFO {subcommand} arg arg ... arg. Subcommands are: 2) CONSUMERS {key} {groupname} -- Show consumer groups of group {groupname}. 3) GROUPS {key} -- Show the stream consumer groups. 4) STREAM {key} -- Show information about the stream. 5) HELP -- Print this help. 

Límite de tamaño de transmisión


Muchas aplicaciones no desean recopilar datos en la transmisión para siempre. A menudo es útil tener la cantidad máxima de mensajes en la transmisión. En otros casos, es útil transferir todos los mensajes de la secuencia a otro almacenamiento persistente cuando se alcanza el tamaño de secuencia especificado. Puede limitar el tamaño de la secuencia utilizando el parámetro MAXLEN en el comando XADD :

 > XADD mystream MAXLEN 2 * value 1 1526654998691-0 > XADD mystream MAXLEN 2 * value 2 1526654999635-0 > XADD mystream MAXLEN 2 * value 3 1526655000369-0 > XLEN mystream (integer) 2 > XRANGE mystream - + 1) 1) 1526654999635-0 2) 1) "value" 2) "2" 2) 1) 1526655000369-0 2) 1) "value" 2) "3" 

Cuando se usa MAXLEN, los registros antiguos se eliminan automáticamente cuando se alcanza la longitud especificada, por lo que la secuencia tiene un tamaño constante. Sin embargo, el recorte en este caso no ocurre de la manera más productiva en la memoria de Redis. La situación se puede mejorar de la siguiente manera: el argumento ~ en el ejemplo anterior significa que no necesitamos limitar la longitud de la secuencia a un valor específico. En nuestro ejemplo, este puede ser cualquier número mayor o igual a 1000 (por ejemplo, 1000, 1010 o 1030). Simplemente indicamos explícitamente que queremos que nuestra transmisión almacene al menos 1000 registros. Esto hace que trabajar con memoria sea mucho más eficiente dentro de Redis. También hay un comando XTRIM separado que hace lo mismo:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...





> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

Almacenamiento persistente y replicación


Redis Stream se replica asincrónicamente en nodos esclavos y se guarda en archivos como AOF (instantánea de todos los datos) y RDB (registro de todas las operaciones de escritura). La replicación de estado de Grupos de consumidores también es compatible. Por lo tanto, si el mensaje está en el estado "pendiente" en el nodo maestro, entonces en los nodos esclavos este mensaje tendrá el mismo estado.

Eliminar elementos individuales de una secuencia


Para eliminar mensajes hay un comando XDEL especial . El comando obtiene el nombre de la secuencia, seguido de los identificadores del mensaje que debe eliminarse:

 > XRANGE mystream - + COUNT 2 1) 1) 1526654999635-0 2) 1) "value" 2) "2" 2) 1) 1526655000369-0 2) 1) "value" 2) "3" > XDEL mystream 1526654999635-0 (integer) 1 > XRANGE mystream - + COUNT 2 1) 1) 1526655000369-0 2) 1) "value" 2) "3" 

Al usar este comando, debe tener en cuenta que, de hecho, la memoria no se liberará de inmediato.

Corrientes de longitud cero


La diferencia entre las secuencias y otras estructuras de datos de Redis es que cuando otras estructuras de datos ya no tienen elementos dentro de sí mismas, como efecto secundario, la estructura de datos se eliminará de la memoria. Entonces, por ejemplo, el conjunto ordenado se eliminará por completo cuando la llamada ZREM elimine el último elemento. En cambio, se permite que los hilos permanezcan en la memoria sin siquiera tener un solo elemento adentro.

Conclusión


Redis Stream es ideal para crear corredores de mensajes, colas de mensajes, registros unificados y sistemas de chat que almacenan el historial.

Como Nicklaus Wirth dijo una vez , los programas son algoritmos más estructuras de datos, y Redis ya te ofrece ambos.

Source: https://habr.com/ru/post/456270/


All Articles