
Há algum tempo, enfrentamos o problema de limpar tuplas em espaços tarantool. A limpeza teve que ser iniciada não quando o tarantool já estava ficando sem memória, mas com antecedência e com certa frequência. O Tarantool possui um módulo escrito em Lua para esta tarefa chamada expirationd . Após um breve uso deste módulo, percebemos que ele não era adequado para nós: na limpeza constante de grandes volumes de dados, Lua ficou no GC. Portanto, pensamos em desenvolver nosso módulo expirationd limitado, esperando que o código escrito na linguagem de programação nativa resolvesse nossos problemas da melhor maneira.
Um bom exemplo foi o módulo tarantool chamado memcached . A abordagem usada é baseada no fato de que um campo separado é inserido no espaço em que a vida útil da tupla é indicada, ou seja, ttl. O módulo em segundo plano varre o espaço, compara ttl com o tempo atual e decide se deseja excluir a tupla ou não. O código do módulo memcached é simples e elegante, mas muito geral. Em primeiro lugar, ele não leva em consideração o tipo de índice usado para rastrear e excluir. Em segundo lugar, a cada passagem, todas as tuplas são digitalizadas, cujo número pode ser bastante grande. E se no módulo expirationd o primeiro problema foi resolvido (o índice da árvore é alocado em uma classe separada), o segundo ainda não recebe atenção. Isso predeterminou a escolha em favor de escrever seu próprio código.
Descrição do produto
A documentação do tarantool possui um excelente tutorial sobre como escrever seus procedimentos armazenados em C. Primeiro, sugiro que você se familiarize com ele para entender os detalhes com comandos e códigos que serão encontrados abaixo. Também vale a pena prestar atenção à referência aos objetos disponíveis ao escrever seu próprio módulo limitado, como box , fiber , index e txn .
Vamos começar de longe e ver como o módulo expirationd limitado parece de fora:
fiber = require('fiber') net_box = require('net.box') box.cfg{listen = 3300} box.schema.func.create('libcapped-expirationd.start', {language = 'C'}) box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start') box.schema.func.create('libcapped-expirationd.kill', {language = 'C'}) box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill') box.schema.space.create('tester') box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}}) capped_connection = net_box:new(3300)
Para simplificar, execute o tarantool no diretório em que nossa biblioteca libcapped-expirationd.so está localizada. Duas funções são exportadas da biblioteca: start and kill. Primeiro, você precisa disponibilizar essas funções em Lua usando box.schema.func.create e box.schema.user.grant. Em seguida, crie um espaço cujas tuplas conterão apenas três campos: o primeiro é um identificador exclusivo, o segundo é o email e o terceiro é o tempo de vida da tupla. No topo do primeiro campo, crie um índice de árvore e chame-o de primário. Em seguida, obtemos o objeto para conectar-se à nossa biblioteca nativa.
Após o trabalho preparatório, iniciamos a função de início:
capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})
Este exemplo funcionará na verificação exatamente como o módulo expirationd, escrito em Lua. O primeiro argumento para a função start é o nome exclusivo da tarefa. O segundo é o identificador de espaço. O terceiro é um índice exclusivo pelo qual as tuplas serão excluídas. O quarto é o índice pelo qual as tuplas serão ignoradas. Quinto - o número do campo da tupla com a vida útil (a numeração passa de 1, não de 0!). Configurações de sexta e sétima digitalização. 1024 é o número máximo de tuplas que podem ser visualizadas em uma única transação. 3600 - tempo de varredura completo em segundos.
Observe que o mesmo índice é usado para rastreamento e exclusão no exemplo. Se for um índice em árvore, o rastreamento será da chave menor para a maior. Se algum outro, por exemplo, índice de hash, a travessia é realizada, em regra, em uma ordem arbitrária. Em uma varredura, todas as tuplas de espaço são exibidas.
Vamos inserir várias tuplas no espaço com uma vida útil de 60 segundos:
box.space.tester:insert{0, 'user0@tarantool.io', math.floor(fiber.time()) + 60} box.space.tester:insert{1, 'user1@tarantool.io', math.floor(fiber.time()) + 60} box.space.tester:insert{2, 'user2@tarantool.io', math.floor(fiber.time()) + 60}
Verifique se a inserção foi bem-sucedida:
tarantool> box.space.tester.index.primary:select()
Repita a seleção após mais de 60 segundos (conte desde o início da inserção da primeira tupla) e verifique se o módulo expirationd limitado já funcionou:
tarantool> box.space.tester.index.primary:select()
Pare a tarefa:
capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})
Vejamos um segundo exemplo em que um índice separado é usado para rastreamento:
fiber = require('fiber') net_box = require('net.box') box.cfg{listen = 3300} box.schema.func.create('libcapped-expirationd.start', {language = 'C'}) box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start') box.schema.func.create('libcapped-expirationd.kill', {language = 'C'}) box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill') box.schema.space.create('tester') box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}}) box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}}) capped_connection = net_box:new(3300)
Aqui tudo é o mesmo que no primeiro exemplo, com algumas exceções. No topo do terceiro campo, criamos um índice de árvore e chamamos de exp. Esse índice não precisa ser exclusivo, diferentemente de um índice chamado primário. O desvio será feito no índice exp e a exclusão no primário. Lembramos que anteriormente, ambos eram feitos apenas usando o índice primário.
Após o trabalho preparatório, iniciamos a função start com novos argumentos:
capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})
Novamente, inseriremos várias tuplas no espaço com uma vida útil de 60 segundos:
box.space.tester:insert{0, 'user0@tarantool.io', math.floor(fiber.time()) + 60} box.space.tester:insert{1, 'user1@tarantool.io', math.floor(fiber.time()) + 60} box.space.tester:insert{2, 'user2@tarantool.io', math.floor(fiber.time()) + 60}
Após 30 segundos, por analogia, adicione mais algumas tuplas:
box.space.tester:insert{3, 'user3@tarantool.io', math.floor(fiber.time()) + 60} box.space.tester:insert{4, 'user4@tarantool.io', math.floor(fiber.time()) + 60} box.space.tester:insert{5, 'user5@tarantool.io', math.floor(fiber.time()) + 60}
Verifique se a inserção foi bem-sucedida:
tarantool> box.space.tester.index.primary:select()
Repita a seleção após mais de 60 segundos (conte desde o início da inserção da primeira tupla) e verifique se o módulo expirationd limitado já funcionou:
tarantool> box.space.tester.index.primary:select()
As tuplas permaneceram no espaço, que vive por cerca de 30 segundos. Além disso, a varredura parou ao alternar de uma tupla com identificador 2 e uma vida útil de 1576421257 para uma tupla com identificador 3 e uma vida útil de 1576421287. As tuplas com vida útil de 1576421287 e mais não foram visualizadas devido à ordem das chaves de índice exp. Essa é a economia que queríamos obter desde o início.
Pare a tarefa:
capped_connection:call('libcapped-expirationd.kill', {'indexed'})
Implementação
O melhor de tudo: todos os recursos do projeto sempre informarão seu código fonte! Como parte da publicação, focaremos apenas os pontos mais importantes, a saber, algoritmos de desvio de espaço.
Os argumentos que passamos para a função start são armazenados em uma estrutura chamada expirationd_task:
struct expirationd_task { char name[256]; uint32_t space_id; uint32_t rm_index_id; uint32_t it_index_id; uint32_t it_index_type; uint32_t field_no; uint32_t scan_size; uint32_t scan_time; };
O atributo name é o nome da tarefa. O atributo space_id é o identificador do espaço. O atributo rm_index_id é o identificador do índice exclusivo pelo qual as tuplas serão excluídas. O atributo it_index_id é o identificador do índice pelo qual as tuplas serão rastreadas. O atributo it_index_type é o tipo de índice pelo qual as tuplas serão percorridas. O atributo filed_no é o número do campo da tupla com uma vida útil. O atributo scan_size é o número máximo de tuplas exibidas em uma única transação. O atributo scan_time é o tempo para uma verificação completa em segundos.
Não consideraremos a análise de argumentos. Este é um trabalho minucioso, mas descomplicado, no qual a biblioteca msgpuck o ajudará. As dificuldades podem surgir apenas com índices transferidos de Lua como uma estrutura de dados complexa com o tipo mp_map, e não com a ajuda dos tipos simples mp_bool, mp_double, mp_int, mp_uint e mp_array. Mas todo o índice não é analisado. Basta verificar sua singularidade, calcular o tipo e extrair o identificador.
Listamos os protótipos de todas as funções usadas para análise:
bool expirationd_parse_name(struct expirationd_task *task, const char **pos); bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos); bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos); bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos); bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos); bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos); bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos); bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos); bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos); bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos); bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);
E agora vamos para a coisa mais importante - a lógica de ignorar o espaço e remover as tuplas. Cada bloco de tuplas não maiores que scan_size é exibido e alterado em uma transação. Se bem-sucedida, essa transação é confirmada; em caso de erro, é revertida. O último argumento para a função expirationd_iterate é um ponteiro para o iterador a partir do qual a verificação inicia ou continua. Esse iterador é incrementado internamente até ocorrer um erro, o espaço termina ou não há oportunidade para interromper o processo antecipadamente. A função expirationd_expired verifica a vida útil da tupla, expirationd_delete - exclui a tupla, expirationd_breakable - verifica se precisamos seguir em frente.
Código da função Expirationd_iterate:
static bool expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp) { box_iterator_t *iter = *iterp; box_txn_begin(); for (uint32_t i = 0; i < task->scan_size; ++i) { box_tuple_t *tuple = NULL; if (box_iterator_next(iter, &tuple) < 0) { box_iterator_free(iter); *iterp = NULL; box_txn_rollback(); return false; } if (!tuple) { box_iterator_free(iter); *iterp = NULL; box_txn_commit(); return true; } if (expirationd_expired(task, tuple)) expirationd_delete(task, tuple); else if (expirationd_breakable(task)) break; } box_txn_commit(); return true; }
Código da função Expirationd_expired:
static bool expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple) { const char *buf = box_tuple_field(tuple, task->field_no - 1); if (!buf || mp_typeof(*buf) != MP_UINT) return false; uint64_t val = mp_decode_uint(&buf); if (val > fiber_time64() / 1000000) return false; return true; }
Código da função expirationd_delete:
static void expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple) { uint32_t len; const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len); box_delete(task->space_id, task->rm_index_id, str, str + len, NULL); }
Código da função Expirationd_breakable:
static bool expirationd_breakable(struct expirationd_task *task) { return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT; }
App
Confira o código fonte aqui !