
Hace algún tiempo, enfrentamos el problema de limpiar tuplas en espacios de tarantool. La limpieza tuvo que comenzar no cuando el tarantool ya se estaba quedando sin memoria, sino de antemano y con cierta frecuencia. Tarantool tiene un módulo escrito en Lua para esta tarea llamado caducado . Después de un breve uso de este módulo, nos dimos cuenta de que no era adecuado para nosotros: en la limpieza constante de grandes volúmenes de datos, Lua se quedó en el GC. Por lo tanto, pensamos en desarrollar nuestro módulo de caducidad con límite, esperando que el código escrito en el lenguaje de programación nativo resuelva nuestros problemas de la mejor manera.
Un buen ejemplo fue el módulo tarantool llamado memcached . El enfoque utilizado en él se basa en el hecho de que se ingresa un campo separado en el espacio en el que se indica la vida útil de la tupla, en otras palabras, ttl. El módulo en segundo plano escanea el espacio, compara ttl con la hora actual y decide si eliminar la tupla o no. El código del módulo memcached es simple y elegante, pero demasiado general. En primer lugar, no tiene en cuenta el tipo de índice utilizado para rastrear y eliminar. En segundo lugar, en cada pasada, se escanean todas las tuplas, cuyo número puede ser bastante grande. Y si en el módulo vencido se resolvió el primer problema (el índice de árbol se asigna en una clase separada), entonces al segundo todavía no se le presta atención. Esto predeterminó la elección a favor de escribir su propio código.
Descripción
La documentación para tarantool tiene un muy buen tutorial sobre cómo escribir sus procedimientos almacenados en C. En primer lugar, le sugiero que se familiarice con él para comprender las inserciones con los comandos y el código que se encontrarán a continuación. También vale la pena prestar atención a la referencia a los objetos que están disponibles al escribir su propio módulo limitado, a saber, box , fiber , index y txn .
Comencemos desde lejos y veamos cómo se ve el módulo caducado limitado desde el exterior:
fiber = require('fiber') net_box = require('net.box') box.cfg{listen = 3300} box.schema.func.create('libcapped-expirationd.start', {language = 'C'}) box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start') box.schema.func.create('libcapped-expirationd.kill', {language = 'C'}) box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill') box.schema.space.create('tester') box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}}) capped_connection = net_box:new(3300)
Para simplificar, ejecute tarantool en el directorio donde se encuentra nuestra biblioteca libcapped-expirationd.so. Dos funciones se exportan desde la biblioteca: iniciar y matar. Primero debe hacer que estas funciones estén disponibles desde Lua usando box.schema.func.create y box.schema.user.grant. Luego cree un espacio cuyas tuplas contendrán solo tres campos: el primero es un identificador único, el segundo es el correo electrónico, el tercero es la vida útil de la tupla. Encima del primer campo, cree un índice de árbol y llámelo primario. A continuación, obtenemos el objeto para conectarnos a nuestra biblioteca nativa.
Después del trabajo preparatorio, comenzamos la función de inicio:
capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})
Este ejemplo funcionará en el escaneo al igual que el módulo caducado, que está escrito en Lua. El primer argumento para la función de inicio es el nombre único de la tarea. El segundo es el identificador de espacio. El tercero es un índice único por el cual se eliminarán las tuplas. El cuarto es el índice por el cual se omitirán las tuplas. Quinto: el número del campo de tupla con el tiempo de vida (¡la numeración va de 1, no de 0!). Sexto y séptimo - ajustes de escaneo. 1024 es el número máximo de tuplas que se pueden ver en una sola transacción. 3600: tiempo de exploración completo en segundos.
Tenga en cuenta que el mismo índice se utiliza para el rastreo y la eliminación en el ejemplo. Si se trata de un índice de árbol, el rastreo es desde la clave más pequeña a la más grande. Si algún otro, por ejemplo, el índice hash, entonces el recorrido generalmente se realiza en orden aleatorio. En una exploración, se ven todas las tuplas espaciales.
Insertemos varias tuplas en el espacio con una vida útil de 60 segundos:
box.space.tester:insert{0, 'user0@tarantool.io', math.floor(fiber.time()) + 60} box.space.tester:insert{1, 'user1@tarantool.io', math.floor(fiber.time()) + 60} box.space.tester:insert{2, 'user2@tarantool.io', math.floor(fiber.time()) + 60}
Verifique que la inserción haya sido exitosa:
tarantool> box.space.tester.index.primary:select()
Repita la selección después de más de 60 segundos (cuente desde el comienzo de la inserción de la primera tupla) y vea que el módulo caducado con límite ya ha funcionado:
tarantool> box.space.tester.index.primary:select()
Detén la tarea:
capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})
Veamos un segundo ejemplo donde se usa un índice separado para el rastreo:
fiber = require('fiber') net_box = require('net.box') box.cfg{listen = 3300} box.schema.func.create('libcapped-expirationd.start', {language = 'C'}) box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start') box.schema.func.create('libcapped-expirationd.kill', {language = 'C'}) box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill') box.schema.space.create('tester') box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}}) box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}}) capped_connection = net_box:new(3300)
Aquí todo es igual que en el primer ejemplo, con algunas excepciones. En la parte superior del tercer campo construimos un índice de árbol y lo llamamos exp. Este índice no tiene que ser único, a diferencia de un índice llamado primario. La omisión se realizará en el índice exp y la eliminación en primaria. Recordamos que anteriormente, ambos se hicieron solo usando el índice primario.
Después del trabajo preparatorio, comenzamos la función de inicio con nuevos argumentos:
capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})
Nuevamente, insertaremos varias tuplas en el espacio con una vida útil de 60 segundos:
box.space.tester:insert{0, 'user0@tarantool.io', math.floor(fiber.time()) + 60} box.space.tester:insert{1, 'user1@tarantool.io', math.floor(fiber.time()) + 60} box.space.tester:insert{2, 'user2@tarantool.io', math.floor(fiber.time()) + 60}
Después de 30 segundos, por analogía, agregue algunas tuplas más:
box.space.tester:insert{3, 'user3@tarantool.io', math.floor(fiber.time()) + 60} box.space.tester:insert{4, 'user4@tarantool.io', math.floor(fiber.time()) + 60} box.space.tester:insert{5, 'user5@tarantool.io', math.floor(fiber.time()) + 60}
Verifique que la inserción haya sido exitosa:
tarantool> box.space.tester.index.primary:select()
Repita la selección después de más de 60 segundos (cuente desde el comienzo de la inserción de la primera tupla) y vea que el módulo caducado con límite ya ha funcionado:
tarantool> box.space.tester.index.primary:select()
Las tuplas permanecieron en el espacio, que viven durante unos 30 segundos. Además, la exploración se detuvo al cambiar de una tupla con identificador 2 y una vida útil de 1576421257 a una tupla con identificador 3 y una vida útil de 1576421287. Las tuplas con una vida útil de 1576421287 y más no se vieron debido al orden de las teclas de índice exp. Este es el ahorro que queríamos lograr desde el principio.
Detén la tarea:
capped_connection:call('libcapped-expirationd.kill', {'indexed'})
Implementación
¡Lo mejor de todo es que todas las características del proyecto siempre dirán su código fuente ! Como parte de la publicación, nos centraremos solo en los puntos más importantes, a saber, los algoritmos de derivación espacial.
Los argumentos que pasamos a la función de inicio se almacenan en una estructura llamada expirationd_task:
struct expirationd_task { char name[256]; uint32_t space_id; uint32_t rm_index_id; uint32_t it_index_id; uint32_t it_index_type; uint32_t field_no; uint32_t scan_size; uint32_t scan_time; };
El atributo de nombre es el nombre de la tarea. El atributo space_id es el identificador del espacio. El atributo rm_index_id es el identificador del índice único por el cual se eliminarán las tuplas. El atributo it_index_id es el identificador del índice por el cual se rastrearán las tuplas. El atributo it_index_type es el tipo de índice por el que se atravesarán las tuplas. El atributo archive_no es el número de campo de tupla con una vida útil. El atributo scan_size es el número máximo de tuplas que se ven en una sola transacción. El atributo scan_time es el tiempo para una exploración completa en segundos.
No consideraremos analizar los argumentos. Este es un trabajo minucioso pero sencillo en el que la biblioteca msgpuck lo ayudará. Las dificultades pueden surgir solo con índices que se transfieren desde Lua como una estructura de datos compleja con el tipo mp_map, y no con la ayuda de los tipos simples mp_bool, mp_double, mp_int, mp_uint y mp_array. Pero no se analiza todo el índice. Es suficiente verificar su singularidad, calcular el tipo y extraer el identificador.
Enumeramos los prototipos de todas las funciones que se utilizan para analizar:
bool expirationd_parse_name(struct expirationd_task *task, const char **pos); bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos); bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos); bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos); bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos); bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos); bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos); bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos); bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos); bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos); bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);
Y ahora pasemos a lo más importante: la lógica de evitar el espacio y eliminar las tuplas. Cada bloque de tuplas no mayor que scan_size se visualiza y cambia en una sola transacción. Si tiene éxito, esta transacción se confirma; en caso de error, se revierte. El último argumento para la función expirationd_iterate es un puntero al iterador desde el que comienza o continúa el escaneo. Este iterador se incrementa en el interior hasta que se produce un error, el espacio finaliza o no hay oportunidad de detener el proceso por adelantado. La función expirationd_expired verifica la vida útil de la tupla, expirationd_delete - elimina la tupla, expirationd_breakable - verifica si necesitamos seguir adelante.
Código de función expirado_iterate:
static bool expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp) { box_iterator_t *iter = *iterp; box_txn_begin(); for (uint32_t i = 0; i < task->scan_size; ++i) { box_tuple_t *tuple = NULL; if (box_iterator_next(iter, &tuple) < 0) { box_iterator_free(iter); *iterp = NULL; box_txn_rollback(); return false; } if (!tuple) { box_iterator_free(iter); *iterp = NULL; box_txn_commit(); return true; } if (expirationd_expired(task, tuple)) expirationd_delete(task, tuple); else if (expirationd_breakable(task)) break; } box_txn_commit(); return true; }
Código de función expirado_ expirado:
static bool expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple) { const char *buf = box_tuple_field(tuple, task->field_no - 1); if (!buf || mp_typeof(*buf) != MP_UINT) return false; uint64_t val = mp_decode_uint(&buf); if (val > fiber_time64() / 1000000) return false; return true; }
Código de función expirationd_delete:
static void expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple) { uint32_t len; const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len); box_delete(task->space_id, task->rm_index_id, str, str + len, NULL); }
Código de función expirado_breakable:
static bool expirationd_breakable(struct expirationd_task *task) { return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT; }
App
¡Mira el código fuente aquí !