
Il y a quelque temps, nous avons été confrontés au problème du nettoyage des tuples dans les espaces tarantool. Le nettoyage devait être commencé non pas lorsque tarantool manquait déjà de mémoire, mais à l'avance et à intervalles réguliers. Tarantool a un module écrit en Lua pour cette tâche appelé expirationd . Après une courte utilisation de ce module, nous avons réalisé qu'il ne nous convenait pas: lors du nettoyage constant de grandes quantités de données, Lua s'est accroché au GC. Par conséquent, nous avons pensé à développer notre module d'expiration plafonné, en espérant que le code écrit dans le langage de programmation natif résoudrait nos problèmes de la meilleure façon.
Un bon exemple était le module tarantool appelé memcached . L'approche utilisée est basée sur le fait qu'un champ séparé est entré dans l'espace dans lequel la durée de vie du tuple est indiquée, en d'autres termes, ttl. Le module en arrière-plan analyse l'espace, compare ttl à l'heure actuelle et décide de supprimer ou non le tuple. Le code du module memcached est simple et élégant, mais trop général. Premièrement, il ne prend pas en compte le type d'index utilisé pour explorer et supprimer. Deuxièmement, à chaque passage, tous les tuples sont scannés, dont le nombre peut être assez important. Et si dans le module expirationd le premier problème a été résolu (l'index de l'arbre est alloué dans une classe distincte), alors le second ne reçoit toujours aucune attention. Cela a prédéterminé le choix en faveur de l'écriture de votre propre code.
La description
La documentation de tarantool a un très bon tutoriel sur la façon d'écrire vos procédures stockées en C. Tout d'abord, je vous suggère de vous familiariser avec lui afin de comprendre les insertions avec les commandes et le code qui se trouvent ci-dessous. Il convient également de prêter attention à la référence aux objets disponibles lors de l'écriture de votre propre module plafonné, à savoir box , fibre , index et txn .
Commençons de loin et regardons à quoi ressemble le module expirationd plafonné de l'extérieur:
fiber = require('fiber') net_box = require('net.box') box.cfg{listen = 3300} box.schema.func.create('libcapped-expirationd.start', {language = 'C'}) box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start') box.schema.func.create('libcapped-expirationd.kill', {language = 'C'}) box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill') box.schema.space.create('tester') box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}}) capped_connection = net_box:new(3300)
Pour plus de simplicité, exécutez tarantool dans le répertoire où se trouve notre bibliothèque libcapped-expirationd.so. Deux fonctions sont exportées de la bibliothèque: démarrer et tuer. Vous devez d'abord rendre ces fonctions disponibles à partir de Lua en utilisant box.schema.func.create et box.schema.user.grant. Créez ensuite un espace dont les tuples ne contiendront que trois champs: le premier est un identifiant unique, le second est email, le troisième est la durée de vie du tuple. En haut du premier champ, créez un arbre-index et appelez-le primaire. Ensuite, nous obtenons l'objet pour se connecter à notre bibliothèque native.
Après les travaux préparatoires, nous commençons la fonction de démarrage:
capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})
Cet exemple fonctionnera sur la numérisation tout comme le module expirationd, qui est écrit en Lua. Le premier argument de la fonction de démarrage est le nom unique de la tâche. Le second est l'identifiant de l'espace. Le troisième est un index unique par lequel les tuples seront supprimés. Le quatrième est l'indice par lequel les tuples seront contournés. Cinquièmement - le numéro du champ de tuple avec la durée de vie (la numérotation va de 1, pas 0!). Sixième et septième - paramètres de numérisation. 1024 est le nombre maximal de tuples pouvant être visualisés en une seule transaction. 3600 - temps de scan complet en secondes.
Notez que le même index est utilisé pour l'analyse et la suppression dans l'exemple. S'il s'agit d'un index arborescent, l'analyse va de la clé la plus petite à la plus grande. Si un autre, par exemple, un index de hachage, alors la traversée est effectuée, en règle générale, dans un ordre arbitraire. En une seule analyse, tous les tuples d'espace sont affichés.
Insérons plusieurs tuples dans l'espace avec une durée de vie de 60 secondes:
box.space.tester:insert{0, 'user0@tarantool.io', math.floor(fiber.time()) + 60} box.space.tester:insert{1, 'user1@tarantool.io', math.floor(fiber.time()) + 60} box.space.tester:insert{2, 'user2@tarantool.io', math.floor(fiber.time()) + 60}
Vérifiez que l'insertion a réussi:
tarantool> box.space.tester.index.primary:select()
Répétez la sélection après plus de 60 secondes (comptez depuis le début de l'insertion du premier tuple) et vérifiez que le module d'expiration plafonné a déjà fonctionné:
tarantool> box.space.tester.index.primary:select()
Arrêtez la tâche:
capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})
Regardons un deuxième exemple où un index séparé est utilisé pour l'exploration:
fiber = require('fiber') net_box = require('net.box') box.cfg{listen = 3300} box.schema.func.create('libcapped-expirationd.start', {language = 'C'}) box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start') box.schema.func.create('libcapped-expirationd.kill', {language = 'C'}) box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill') box.schema.space.create('tester') box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}}) box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}}) capped_connection = net_box:new(3300)
Ici, tout est le même que dans le premier exemple, à quelques exceptions près. Au-dessus du troisième champ, nous construisons un arbre-index et l'appelons exp. Cet index n'a pas à être unique, contrairement à un index appelé primaire. Le contournement se fera à l'index exp et la suppression au primaire. Nous nous souvenons que précédemment, les deux étaient effectués uniquement à l'aide de l'index primaire.
Après le travail préparatoire, nous démarrons la fonction de démarrage avec de nouveaux arguments:
capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})
Encore une fois, nous allons insérer plusieurs tuples dans l'espace avec une durée de vie de 60 secondes:
box.space.tester:insert{0, 'user0@tarantool.io', math.floor(fiber.time()) + 60} box.space.tester:insert{1, 'user1@tarantool.io', math.floor(fiber.time()) + 60} box.space.tester:insert{2, 'user2@tarantool.io', math.floor(fiber.time()) + 60}
Après 30 secondes, par analogie, ajoutez quelques tuples supplémentaires:
box.space.tester:insert{3, 'user3@tarantool.io', math.floor(fiber.time()) + 60} box.space.tester:insert{4, 'user4@tarantool.io', math.floor(fiber.time()) + 60} box.space.tester:insert{5, 'user5@tarantool.io', math.floor(fiber.time()) + 60}
Vérifiez que l'insertion a réussi:
tarantool> box.space.tester.index.primary:select()
Répétez la sélection après plus de 60 secondes (comptez depuis le début de l'insertion du premier tuple) et vérifiez que le module d'expiration plafonné a déjà fonctionné:
tarantool> box.space.tester.index.primary:select()
Des tuples sont restés dans l'espace, qui vivent pendant environ 30 secondes. De plus, l'analyse s'est arrêtée lors du passage d'un tuple avec l'identifiant 2 et une durée de vie de 1576421257 à un tuple avec l'identifiant 3 et une durée de vie de 1576421287. Les tuples avec une durée de vie de 1576421287 et plus n'ont pas été visualisés en raison de l'ordre des clés d'index exp. Ce sont les économies que nous voulions réaliser au tout début.
Arrêtez la tâche:
capped_connection:call('libcapped-expirationd.kill', {'indexed'})
Implémentation
Mieux encore, toutes les fonctionnalités du projet indiqueront toujours son code source! Dans le cadre de la publication, nous nous concentrerons uniquement sur les points les plus importants, à savoir les algorithmes de contournement spatial.
Les arguments que nous transmettons à la fonction de démarrage sont stockés dans une structure appelée expirationd_task:
struct expirationd_task { char name[256]; uint32_t space_id; uint32_t rm_index_id; uint32_t it_index_id; uint32_t it_index_type; uint32_t field_no; uint32_t scan_size; uint32_t scan_time; };
L'attribut name est le nom de la tâche. L'attribut space_id est l'identifiant de l'espace. L'attribut rm_index_id est l'identifiant de l'index unique par lequel les tuples seront supprimés. L'attribut it_index_id est l'identifiant de l'index par lequel les tuples seront analysés. L'attribut it_index_type est le type d'index par lequel les tuples seront traversés. L'attribut deposited_no est le numéro de champ de tuple avec une durée de vie. L'attribut scan_size est le nombre maximal de tuples qui sont affichés dans une seule transaction. L'attribut scan_time est le temps pour une analyse complète en secondes.
Nous ne considérerons pas l'analyse des arguments. Il s'agit d'un travail laborieux mais simple dans lequel la bibliothèque msgpuck vous aidera. Des difficultés ne peuvent survenir qu'avec des index qui sont transférés de Lua en tant que structure de données complexe avec le type mp_map, et non à l'aide de types simples mp_bool, mp_double, mp_int, mp_uint et mp_array. Mais l'ensemble de l'indice n'est pas analysé. Il suffit de vérifier son unicité, de calculer le type et d'extraire l'identifiant.
Nous listons les prototypes de toutes les fonctions utilisées pour l'analyse:
bool expirationd_parse_name(struct expirationd_task *task, const char **pos); bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos); bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos); bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos); bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos); bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos); bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos); bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos); bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos); bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos); bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);
Et maintenant passons à la chose la plus importante - la logique de contourner l'espace et de supprimer les tuples. Chaque bloc de tuples ne dépassant pas scan_size est affiché et modifié sous une seule transaction. En cas de succès, cette transaction est validée; en cas d'erreur, elle est annulée. Le dernier argument de la fonction expirationd_iterate est un pointeur vers l'itérateur à partir duquel l'analyse commence ou se poursuit. Cet itérateur est incrémenté à l'intérieur jusqu'à ce qu'une erreur se produise, que l'espace se termine ou qu'il n'y ait aucune possibilité d'arrêter le processus à l'avance. La fonction expirationd_expired vérifie la durée de vie du tuple, expirationd_delete - supprime le tuple, expirationd_breakable - vérifie si nous devons continuer.
Code de fonction expirationd_iterate:
static bool expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp) { box_iterator_t *iter = *iterp; box_txn_begin(); for (uint32_t i = 0; i < task->scan_size; ++i) { box_tuple_t *tuple = NULL; if (box_iterator_next(iter, &tuple) < 0) { box_iterator_free(iter); *iterp = NULL; box_txn_rollback(); return false; } if (!tuple) { box_iterator_free(iter); *iterp = NULL; box_txn_commit(); return true; } if (expirationd_expired(task, tuple)) expirationd_delete(task, tuple); else if (expirationd_breakable(task)) break; } box_txn_commit(); return true; }
Code de fonction expirationd_expiré:
static bool expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple) { const char *buf = box_tuple_field(tuple, task->field_no - 1); if (!buf || mp_typeof(*buf) != MP_UINT) return false; uint64_t val = mp_decode_uint(&buf); if (val > fiber_time64() / 1000000) return false; return true; }
Expiration du code de fonctiond_delete:
static void expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple) { uint32_t len; const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len); box_delete(task->space_id, task->rm_index_id, str, str + len, NULL); }
Code de fonction expirationd_breakable:
static bool expirationd_breakable(struct expirationd_task *task) { return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT; }
App
Découvrez le code source ici !