Redis Stream - Confiabilidade e escalabilidade dos seus sistemas de mensagens

imagem

Redis Stream - um novo tipo de dado abstrato introduzido no Redis com o lançamento da versão 5.0
Conceitualmente, o Redis Stream é uma lista à qual você pode adicionar entradas. Cada entrada possui um identificador exclusivo. Por padrão, um identificador é gerado automaticamente e inclui um carimbo de data / hora. Portanto, você pode solicitar intervalos de gravação por tempo ou receber novos dados conforme eles chegam no fluxo, pois o comando tail -f do Unix lê o arquivo de log e congela antecipando novos dados. Observe que vários clientes podem ouvir o fluxo ao mesmo tempo, pois muitos processos “tail -f” podem ler um arquivo ao mesmo tempo, sem conflitar entre si.

Para entender todas as vantagens do novo tipo de dados, vamos relembrar brevemente as estruturas Redis existentes há muito tempo que repetem parcialmente a funcionalidade do Redis Stream.

Excursão histórica


Redis pub / sub


O Redis Pub / Sub é um sistema simples de mensagens já incorporado ao seu armazenamento de valores-chave. No entanto, para simplificar, você deve pagar:

  • Se, por algum motivo, o editor falhar, ele perderá todos os seus assinantes
  • O editor precisa saber o endereço exato de todos os seus assinantes.
  • Um editor pode sobrecarregar seus assinantes se os dados forem publicados mais rapidamente do que processados
  • A mensagem é excluída do buffer do editor imediatamente após a publicação, independentemente de quantos assinantes ele entregou e com que rapidez eles conseguiram processar essa mensagem.
  • Todos os assinantes receberão a mensagem ao mesmo tempo. Os próprios assinantes devem, de alguma forma, concordar entre si sobre como processar a mesma mensagem.
  • Não há mecanismo interno para confirmar o processamento bem-sucedido de uma mensagem por um assinante. Se o assinante recebeu uma mensagem e caiu durante o processamento, o editor não a saberá.

Lista de Redis


Redis List é uma estrutura de dados que suporta comandos de leitura de bloqueio. Você pode adicionar e ler mensagens do início ou do fim da lista. Com base nessa estrutura, você pode criar uma boa pilha ou fila para o seu sistema distribuído e isso, na maioria dos casos, será suficiente. As principais diferenças de Redis Pub / Sub:

  • A mensagem é entregue a um cliente. O primeiro cliente bloqueado pela leitura receberá os dados primeiro.
  • Clint deve iniciar uma operação de leitura para cada mensagem. Lista não sabe nada sobre clientes.
  • As mensagens são armazenadas até que alguém as conte ou as exclua explicitamente. Se você configurar um servidor Redis para liberar dados no disco, a confiabilidade do sistema aumentará drasticamente.

Introdução ao Stream


Adicionando um registro a um fluxo


O comando XADD adiciona um novo registro ao fluxo. Um registro não é apenas uma sequência, consiste em um ou mais pares de valores-chave. Assim, cada registro já está estruturado e se assemelha à estrutura de um arquivo CSV.

> XADD mystream * sensor-id 1234 temperature 19.8 1518951480106-0 

No exemplo acima, adicionamos dois campos ao fluxo com o nome (chave) "mystream": "sensor-id" e "temperature" com os valores "1234" e "19,8", respectivamente. Como segundo argumento, o comando aceita o identificador que será atribuído ao registro - esse identificador identifica exclusivamente cada registro no fluxo. No entanto, nesse caso, passamos * porque queremos que o Redis gere um novo identificador para nós. Cada novo identificador aumentará. Portanto, cada novo registro terá um identificador maior em relação aos registros anteriores.

Formato de identificação


O identificador de registro retornado pelo comando XADD consiste em duas partes:

{millisecondsTime}-{sequenceNumber}

millisecondsTime - hora do Unix em milissegundos (hora do servidor Redis). No entanto, se a hora atual for igual ou menor que a hora do registro anterior, o carimbo de hora do registro anterior será usado. Portanto, se a hora do servidor retornar ao passado, o novo identificador ainda manterá a propriedade de aumento.

sequenceNumber é usado para registros criados no mesmo milissegundo. sequenceNumber será aumentado em 1 em relação ao registro anterior. Como sequenceNumber tem tamanho de 64 bits, na prática você não deve ter um limite no número de registros que podem ser gerados em um milissegundo.

O formato desses identificadores à primeira vista pode parecer estranho. Um leitor incrédulo pode se perguntar por que o tempo faz parte de um identificador. O motivo é que os fluxos Redis suportam solicitações de intervalo por identificadores. Como o identificador está associado à hora em que o registro foi criado, isso possibilita solicitar intervalos de tempo. Veremos um exemplo concreto quando passarmos ao estudo do comando XRANGE .

Se, por qualquer motivo, o usuário precisar especificar seu próprio identificador, que, por exemplo, está associado a algum sistema externo, podemos transmiti-lo ao comando XADD em vez do sinal *, como mostrado abaixo:

 > XADD somestream 0-1 field value 0-1 > XADD somestream 0-2 foo bar 0-2 

Observe que, nesse caso, você deve monitorar o aumento no identificador. No nosso exemplo, o identificador mínimo é "0-1", portanto a equipe não aceitará outro identificador igual ou menor que "0-1".

 > XADD somestream 0-1 foo bar (error) ERR The ID specified in XADD is equal or smaller than the target stream top item 

O número de registros no fluxo


Você pode obter o número de registros em um fluxo simplesmente usando o comando XLEN . Para o nosso exemplo, este comando retornará o seguinte valor:

 > XLEN somestream (integer) 2 

Solicitações de intervalo - XRANGE e XREVRANGE


Para solicitar dados para um intervalo, precisamos especificar dois identificadores - o início e o fim do intervalo. O intervalo retornado incluirá todos os elementos, incluindo bordas. Há também dois identificadores especiais "-" e "+", respectivamente, significando o menor (primeiro registro) e o maior (último registro) identificador no fluxo. O exemplo abaixo exibirá todas as entradas do fluxo.

 > XRANGE mystream - + 1) 1) 1518951480106-0 2) 1) "sensor-id" 2) "1234" 3) "temperature" 4) "19.8" 2) 1) 1518951482479-0 2) 1) "sensor-id" 2) "9999" 3) "temperature" 4) "18.2" 

Cada registro retornado é uma matriz de dois elementos: um identificador e uma lista de pares de valores-chave. Já dissemos que os identificadores de registro estão relacionados ao tempo. Portanto, podemos solicitar o intervalo de um período específico de tempo. No entanto, podemos especificar na solicitação não o identificador completo, mas apenas o tempo Unix, omitindo a parte relacionada ao sequenceNumber . A parte omitida do identificador é automaticamente igual a zero no início do intervalo e ao valor máximo possível no final do intervalo. A seguir, é apresentado um exemplo de como solicitar um intervalo de dois milissegundos.

 > XRANGE mystream 1518951480106 1518951480107 1) 1) 1518951480106-0 2) 1) "sensor-id" 2) "1234" 3) "temperature" 4) "19.8" 

Temos apenas um registro nesse intervalo, no entanto, em conjuntos de dados reais, o resultado retornado pode ser enorme. Por esse motivo, o XRANGE suporta a opção COUNT. Ao especificar a quantidade, podemos simplesmente obter os primeiros N registros. Se precisarmos obter as próximas N entradas (paginação), podemos usar o último identificador recebido, aumentar seu sequenceNumber em um e solicitar novamente. Vejamos isso no exemplo a seguir. Estamos começando a adicionar 10 elementos usando o XADD (suponha que o fluxo mystream já tenha sido preenchido com 10 elementos). Para iniciar a iteração, obtendo 2 elementos por comando, começamos com o intervalo completo, mas com COUNT igual a 2.

 > XRANGE mystream - + COUNT 2 1) 1) 1519073278252-0 2) 1) "foo" 2) "value_1" 2) 1) 1519073279157-0 2) 1) "foo" 2) "value_2" 

Para continuar a iteração com os dois elementos a seguir, precisamos selecionar o último identificador recebido, que é 1519073279157-0, e adicionar 1 ao sequenceNumber .
O identificador resultante, neste caso 1519073279157-1, agora pode ser usado como um novo argumento para o início do intervalo para a próxima chamada XRANGE :

 > XRANGE mystream 1519073279157-1 + COUNT 2 1) 1) 1519073280281-0 2) 1) "foo" 2) "value_3" 2) 1) 1519073281432-0 2) 1) "foo" 2) "value_4" 

E assim por diante Como a complexidade do XRANGE é O (log (N)) para pesquisar e, em seguida, O (M) para retornar elementos M, cada etapa da iteração é rápida. Assim, usando o XRANGE, é possível iterar os fluxos com eficiência.

O comando XREVRANGE é equivalente a XRANGE , mas retorna os elementos na ordem inversa:

 > XREVRANGE mystream + - COUNT 1 1) 1) 1519073287312-0 2) 1) "foo" 2) "value_10" 

Observe que o comando XREVRANGE aceita os argumentos do intervalo de início e parada na ordem inversa.

Lendo novos registros com o XREAD


Geralmente, há uma tarefa de se inscrever no fluxo e receber apenas novas mensagens. Esse conceito pode parecer um Pub / Sub Redis ou uma Lista Redis bloqueadora, mas existem diferenças fundamentais em como usar o Redis Stream:

  1. Cada nova mensagem é entregue a cada assinante por padrão. Esse comportamento é diferente de bloquear a lista Redis, onde uma nova mensagem será lida por apenas um assinante.
  2. Enquanto no Redis Pub / Sub todas as mensagens são esquecidas e nunca são salvas, no Stream todas as mensagens são armazenadas por um período indeterminado (a menos que o cliente solicite explicitamente a exclusão).
  3. O Redis Stream permite diferenciar o acesso às mensagens em um stream. Um assinante específico pode ver apenas seu histórico de mensagens pessoais.

Você pode se inscrever no fluxo e receber novas mensagens usando o comando XREAD . Isso é um pouco mais complicado que o XRANGE , portanto, começaremos com exemplos mais simples primeiro.

 > XREAD COUNT 2 STREAMS mystream 0 1) 1) "mystream" 2) 1) 1) 1519073278252-0 2) 1) "foo" 2) "value_1" 2) 1) 1519073279157-0 2) 1) "foo" 2) "value_2" 

No exemplo acima, um formulário XREAD sem bloqueio é especificado . Observe que a opção COUNT é opcional. De fato, a única opção de comando necessária é a opção STREAMS, que define a lista de fluxos junto com o identificador máximo correspondente. Escrevemos "STREAMS mystream 0" - queremos obter todos os registros do fluxo mystream com um identificador maior que "0-0". Como você pode ver no exemplo, o comando retorna o nome do fluxo, porque podemos assinar vários threads ao mesmo tempo. Poderíamos escrever, por exemplo, “STREAMS mystream otherstream 0 0”. Observe que, após a opção STREAMS, primeiro precisamos fornecer os nomes de todos os fluxos necessários e somente uma lista de identificadores.

Nesta forma simples, o comando não faz nada de especial em comparação com o XRANGE . No entanto, o interessante é que podemos facilmente transformar o XREAD em um comando de bloqueio especificando o argumento BLOCK:

> XREAD BLOCK 0 STREAMS mystream $

No exemplo acima, uma nova opção BLOCK é especificada com um tempo limite de 0 milissegundos (isso significa espera sem fim). Além disso, em vez de passar o identificador usual para o fluxo mystream, o identificador especial $ foi passado. Esse identificador especial significa que o XREAD deve usar o identificador máximo no fluxo mystream como identificador. Portanto, receberemos apenas novas mensagens, a partir do momento em que começamos a ouvir. De certa forma, isso é semelhante ao comando tail -f do Unix.

Observe que, ao usar a opção BLOCK, não precisamos usar o identificador especial $. Podemos usar qualquer identificador existente no fluxo. Se a equipe puder atender a nossa solicitação imediatamente, sem bloquear, isso será feito, caso contrário, será bloqueada.

O bloqueio do XREAD também pode ouvir vários fluxos de uma só vez, basta especificar seus nomes. Nesse caso, o comando retornará um registro do primeiro fluxo no qual os dados chegaram. O primeiro assinante bloqueado para este fluxo receberá os dados primeiro.

Grupos de consumidores


Em determinadas tarefas, queremos diferenciar o acesso de assinantes a mensagens dentro do mesmo encadeamento. Um exemplo em que isso pode ser útil é uma fila de mensagens com trabalhadores que receberão mensagens diferentes do fluxo, permitindo escalar o processamento de mensagens.

Se imaginarmos que temos três assinantes C1, C2, C3 e um fluxo que contém as mensagens 1, 2, 3, 4, 5, 6, 7, o serviço de mensagens ocorrerá como no diagrama abaixo:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

Para obter esse efeito, o Redis Stream usa um conceito chamado Grupo de Consumidores. Esse conceito é semelhante a um pseudo-assinante que recebe dados de um fluxo, mas na verdade é atendido por vários assinantes dentro de um grupo, fornecendo certas garantias:

  1. Cada mensagem é entregue a diferentes assinantes dentro do grupo.
  2. Dentro de um grupo, os assinantes são identificados pelo nome, que é uma sequência que diferencia maiúsculas de minúsculas. Se algum assinante sair temporariamente do grupo, ele poderá ser restaurado no grupo por seu próprio nome exclusivo.
  3. Cada grupo de consumidores segue o conceito de "primeira mensagem não lida". Quando um assinante solicita novas mensagens, ele pode receber apenas mensagens que nunca foram entregues a nenhum assinante dentro de um grupo.
  4. Existe um comando para confirmar explicitamente o processamento bem-sucedido da mensagem pelo assinante. Até que este comando seja chamado, a mensagem solicitada permanecerá no status "pendente".
  5. Dentro do Grupo de Consumidores, cada assinante pode solicitar um histórico de mensagens entregues a ele, mas ainda não foram processadas (no status "pendente")

Em certo sentido, o estado de um grupo pode ser representado da seguinte maneira:

 +----------------------------------------+ | consumer_group_name: mygroup | consumer_group_stream: somekey | last_delivered_id: 1292309234234-92 | | consumers: | "consumer-1" with pending messages | 1292309234234-4 | 1292309234232-8 | "consumer-42" with pending messages | ... (and so forth) +----------------------------------------+ 

Agora é hora de se familiarizar com as principais equipes do Grupo de Consumidores, a saber:

  • XGROUP é usado para criar, destruir e gerenciar grupos.
  • XREADGROUP é usado para ler um fluxo através de um grupo.
  • XACK - este comando permite que o assinante marque a mensagem como processada com êxito

Criação de grupo de consumidores


Suponha que um fluxo mystream já exista. Em seguida, o comando de criação de grupo será semelhante a:

> XGROUP CREATE mystream mygroup $
OK

Ao criar um grupo, devemos passar um identificador começando com o qual o grupo receberá mensagens. Se queremos apenas receber todas as novas mensagens, podemos usar o identificador especial $ (como no nosso exemplo acima). Se você especificar 0 em vez de um identificador especial, todas as mensagens do fluxo estarão disponíveis para o grupo.

Agora que o grupo foi criado, podemos começar imediatamente a ler as mensagens usando o comando XREADGROUP . Este comando é muito semelhante ao XREAD e suporta a opção opcional BLOCK. No entanto, existe uma opção obrigatória GROUP, que sempre deve ser especificada com dois argumentos: o nome do grupo e o nome do assinante. A opção COUNT também é suportada.

Antes de ler o fluxo, vamos colocar algumas mensagens lá:

 > XADD mystream * message apple 1526569495631-0 > XADD mystream * message orange 1526569498055-0 > XADD mystream * message strawberry 1526569506935-0 > XADD mystream * message apricot 1526569535168-0 > XADD mystream * message banana 1526569544280-0 

Agora vamos tentar ler este fluxo através do grupo:

 > XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream > 1) 1) "mystream" 2) 1) 1) 1526569495631-0 2) 1) "message" 2) "apple" 

O comando acima, literalmente, é o seguinte:

"Eu, o assinante de Alice, um membro do meu grupo, quero ler uma mensagem do mystream que nunca foi entregue a ninguém antes."

Cada vez que um assinante realiza uma operação com um grupo, ele deve indicar seu nome, identificando-se exclusivamente dentro do grupo. Há outro detalhe muito importante no comando acima - o identificador especial ">". Esse identificador especial filtra as mensagens, deixando apenas as que até agora nunca foram entregues.

Além disso, em casos especiais, você pode especificar um identificador real, como 0 ou qualquer outro identificador válido. Nesse caso, o comando XREADGROUP retornará para você o histórico de mensagens com o status "pendente", que foram entregues ao assinante especificado (Alice), mas ainda não foram confirmadas usando o comando XACK .

Podemos verificar esse comportamento especificando imediatamente o identificador 0, sem a opção COUNT . Apenas vemos a única mensagem pendente, ou seja, a mensagem com a maçã:

 > XREADGROUP GROUP mygroup Alice STREAMS mystream 0 1) 1) "mystream" 2) 1) 1) 1526569495631-0 2) 1) "message" 2) "apple" 

No entanto, se confirmarmos a mensagem como processada com êxito, ela não será mais exibida:

 > XACK mystream mygroup 1526569495631-0 (integer) 1 > XREADGROUP GROUP mygroup Alice STREAMS mystream 0 1) 1) "mystream" 2) (empty list or set) 

Agora é a vez de Bob ler algo:

 > XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream > 1) 1) "mystream" 2) 1) 1) 1526569498055-0 2) 1) "message" 2) "orange" 2) 1) 1526569506935-0 2) 1) "message" 2) "strawberry" 

Bob, um membro do meu grupo, pediu não mais que duas mensagens. O comando relata apenas mensagens não entregues devido ao identificador especial ">". Como você pode ver, a mensagem "maçã" não é exibida, pois já foi entregue a Alice, então Bob recebe "laranja" e "morango".

Assim, Alice, Bob e qualquer outro assinante do grupo podem ler mensagens diferentes do mesmo fluxo. Eles também podem ler o histórico de mensagens brutas ou marcar as mensagens como processadas.

Há algumas coisas a serem lembradas:

  • Assim que o assinante considerar a mensagem como o comando XREADGROUP , essa mensagem entrará no estado "pendente" e será atribuída a esse assinante específico. Outros assinantes do grupo não poderão ler esta mensagem.
  • Os assinantes são criados automaticamente na primeira menção, não há necessidade de criação explícita.
  • Com o XREADGROUP, você pode ler mensagens de vários fluxos diferentes ao mesmo tempo; no entanto, para que isso funcione, é necessário primeiro criar grupos com o mesmo nome para cada fluxo usando XGROUP

Recuperação de falha


O assinante pode se recuperar da falha e reler sua lista de mensagens com o status "pendente". No entanto, no mundo real, os assinantes podem falhar. O que acontece com a mensagem pendente de um assinante se ele não pode se recuperar após uma falha?
O Grupo de consumidores oferece um recurso usado especificamente para esses casos - quando você precisa alterar o proprietário das mensagens.

Primeiro, você precisa chamar o comando XPENDING , que exibe todas as mensagens do grupo com o status "pendente". Na sua forma mais simples, um comando é chamado com apenas dois argumentos: o nome do fluxo e o nome do grupo:

 > XPENDING mystream mygroup 1) (integer) 2 2) 1526569498055-0 3) 1526569506935-0 4) 1) 1) "Bob" 2) "2" 

A equipe imprimiu o número de mensagens não processadas para todo o grupo e para cada assinante. Temos apenas Bob com duas mensagens não processadas, porque a única mensagem solicitada por Alice foi confirmada com o XACK .

Podemos solicitar informações adicionais usando mais argumentos:

XPENDING {key} {groupname} [{start-id} {end-id} {count} [{consumer-name}]]

{start-id} {end-id} - intervalo de identificadores (você pode usar "-" e "+")
{count} - o número de tentativas de entrega
{nome do consumidor} - nome do grupo

 > XPENDING mystream mygroup - + 10 1) 1) 1526569498055-0 2) "Bob" 3) (integer) 74170458 4) (integer) 1 2) 1) 1526569506935-0 2) "Bob" 3) (integer) 74170458 4) (integer) 1 

Agora, temos os detalhes de cada mensagem: identificador, nome do assinante, tempo de inatividade em milissegundos e, finalmente, o número de tentativas de entrega. Temos duas mensagens de Bob e elas ficam inativas por 74170458 milissegundos, cerca de 20 horas.

Observe que ninguém está nos impedindo de verificar qual era o conteúdo da mensagem usando XRANGE .

 > XRANGE mystream 1526569498055-0 1526569498055-0 1) 1) 1526569498055-0 2) 1) "message" 2) "orange" 

Nós apenas temos que repetir o mesmo identificador duas vezes nos argumentos. Agora que temos alguma idéia, Alice pode decidir que Bob provavelmente não se recuperará após 20 horas de inatividade, e é hora de solicitar essas mensagens e continuar processando-as em vez de Bob. Para fazer isso, usamos o comando XCLAIM :

XCLAIM {key} {group} {consumer} {min-idle-time} {ID-1} {ID-2} ... {ID-N}

Usando este comando, podemos obter uma mensagem "estrangeira" que ainda não foi processada, alterando o proprietário para {consumidor}. No entanto, também podemos fornecer um tempo de inatividade mínimo {min-idle-time}. Isso ajuda a evitar uma situação em que dois clientes tentam alterar simultaneamente o proprietário das mesmas mensagens:

Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Clinet 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0

O primeiro cliente redefinirá o tempo de inatividade e aumentará o contador do número de entregas. Portanto, o segundo cliente não poderá solicitá-lo.

 > XCLAIM mystream mygroup Alice 3600000 1526569498055-0 1) 1) 1526569498055-0 2) 1) "message" 2) "orange" 

A mensagem foi reivindicada com sucesso por Alice, que agora pode processar a mensagem e reconhecê-la.

A partir do exemplo acima, é claro que a execução bem-sucedida da solicitação retorna o conteúdo da própria mensagem. No entanto, isso não é necessário. A opção JUSTID pode ser usada para retornar apenas identificadores de mensagem. Isso é útil se você não estiver interessado nos detalhes da mensagem e desejar aumentar o desempenho do sistema.

Balcão de entrega


O contador que você observa na saída XPENDING é o número de entregas de cada mensagem. Esse contador é incrementado de duas maneiras: quando a mensagem é solicitada com êxito por meio do XCLAIM ou quando a chamada XREADGROUP é usada .

É normal que algumas mensagens sejam entregues várias vezes. O principal é que, como resultado, todas as mensagens são processadas. Às vezes, ao processar uma mensagem, há problemas devido a danos à própria mensagem ou ao processar a mensagem causa um erro no código do manipulador.Nesse caso, pode acontecer que ninguém consiga processar esta mensagem. Como temos um contador de tentativas de entrega, podemos usá-lo para detectar essas situações. Portanto, assim que o contador de entrega atingir um grande número especificado por você, provavelmente será mais razoável colocar essa mensagem em outro fluxo e enviar uma notificação ao administrador do sistema.

Status do segmento


O comando XINFO é usado para solicitar várias informações sobre um fluxo e seus grupos. Por exemplo, a forma básica do comando é a seguinte:

 > XINFO STREAM mystream 1) length 2) (integer) 13 3) radix-tree-keys 4) (integer) 1 5) radix-tree-nodes 6) (integer) 2 7) groups 8) (integer) 2 9) first-entry 10) 1) 1524494395530-0 2) 1) "a" 2) "1" 3) "b" 4) "2" 11) last-entry 12) 1) 1526569544280-0 2) 1) "message" 2) "banana" 

O comando acima exibe informações gerais no fluxo especificado. Agora, um exemplo um pouco mais complexo:

 > XINFO GROUPS mystream 1) 1) name 2) "mygroup" 3) consumers 4) (integer) 2 5) pending 6) (integer) 2 2) 1) name 2) "some-other-group" 3) consumers 4) (integer) 1 5) pending 6) (integer) 0 

O comando acima exibe informações gerais para todos os grupos do fluxo especificado

 > XINFO CONSUMERS mystream mygroup 1) 1) name 2) "Alice" 3) pending 4) (integer) 1 5) idle 6) (integer) 9104628 2) 1) name 2) "Bob" 3) pending 4) (integer) 1 5) idle 6) (integer) 83841983 

O comando acima exibe informações sobre todos os assinantes do fluxo e grupo especificados.
Se você esquecer a sintaxe do comando, entre em contato com o comando para obter ajuda:

 > XINFO HELP 1) XINFO {subcommand} arg arg ... arg. Subcommands are: 2) CONSUMERS {key} {groupname} -- Show consumer groups of group {groupname}. 3) GROUPS {key} -- Show the stream consumer groups. 4) STREAM {key} -- Show information about the stream. 5) HELP -- Print this help. 

Limite de tamanho do fluxo


Muitos aplicativos não desejam coletar dados no fluxo para sempre. Geralmente, é útil ter o número máximo de mensagens no fluxo. Em outros casos, é útil transferir todas as mensagens do fluxo para outro armazenamento persistente quando o tamanho do fluxo especificado for atingido. Você pode limitar o tamanho do fluxo usando o parâmetro MAXLEN no comando XADD :

 > XADD mystream MAXLEN 2 * value 1 1526654998691-0 > XADD mystream MAXLEN 2 * value 2 1526654999635-0 > XADD mystream MAXLEN 2 * value 3 1526655000369-0 > XLEN mystream (integer) 2 > XRANGE mystream - + 1) 1) 1526654999635-0 2) 1) "value" 2) "2" 2) 1) 1526655000369-0 2) 1) "value" 2) "3" 

Ao usar o MAXLEN, os registros antigos são excluídos automaticamente quando o comprimento especificado é atingido; portanto, o fluxo tem um tamanho constante. No entanto, o corte nesse caso não ocorre da maneira mais produtiva na memória do Redis. A situação pode ser melhorada da seguinte maneira: O argumento ~ no exemplo acima significa que não precisamos limitar o comprimento do fluxo a um valor específico. No nosso exemplo, esse número pode ser maior ou igual a 1000 (por exemplo, 1000, 1010 ou 1030). Apenas indicamos explicitamente que queremos que nosso fluxo armazene pelo menos 1000 registros. Isso torna o trabalho com memória muito mais eficiente dentro do Redis. Há também um comando XTRIM separado que faz a mesma coisa:

XADD mystream MAXLEN ~ 1000 * ... entry fields here ...





> XTRIM mystream MAXLEN 10

> XTRIM mystream MAXLEN ~ 10

Armazenamento e replicação persistentes


O Redis Stream é replicado de forma assíncrona nos nós escravos e salvo em arquivos como AOF (instantâneo de todos os dados) e RDB (log de todas as operações de gravação). A replicação de estado de Grupos de Consumidores também é suportada. Portanto, se a mensagem estiver no status “pendente” no nó principal, nos nós escravos essa mensagem terá o mesmo status.

Removendo itens individuais de um fluxo


Para excluir mensagens, existe um comando XDEL especial . O comando obtém o nome do fluxo, seguido pelos identificadores da mensagem que precisa ser excluída:

 > XRANGE mystream - + COUNT 2 1) 1) 1526654999635-0 2) 1) "value" 2) "2" 2) 1) 1526655000369-0 2) 1) "value" 2) "3" > XDEL mystream 1526654999635-0 (integer) 1 > XRANGE mystream - + COUNT 2 1) 1) 1526655000369-0 2) 1) "value" 2) "3" 

Ao usar este comando, é necessário considerar que, de fato, a memória não será liberada imediatamente.

Fluxos de comprimento zero


A diferença entre fluxos e outras estruturas de dados Redis é que, quando outras estruturas de dados não tiverem mais elementos em si mesmas, como efeito colateral, a própria estrutura de dados será excluída da memória. Portanto, por exemplo, o conjunto classificado será completamente excluído quando a chamada do ZREM remover o último item. Em vez disso, é permitido que os threads permaneçam na memória sem ter um único elemento dentro.

Conclusão


O Redis Stream é ideal para criar intermediários de mensagens, filas de mensagens, logs unificados e sistemas de bate-papo que armazenam histórico.

Como Nicklaus Wirth disse uma vez , os programas são algoritmos e estruturas de dados, e o Redis já oferece os dois.

Source: https://habr.com/ru/post/pt456270/


All Articles