
Vor einiger Zeit hatten wir das Problem, Tupel in Tarentool- Räumen zu reinigen. Die Reinigung musste nicht gestartet werden, wenn der Speicher des Tarantools bereits erschöpft war, sondern im Voraus und mit einer bestimmten Häufigkeit. Tarantool hat ein in Lua geschriebenes Modul für diese Aufgabe namens expirationd . Nach einer kurzen Nutzung dieses Moduls stellten wir fest, dass es für uns nicht geeignet war: Nach einer ständigen Bereinigung großer Datenmengen hing Lua im GC. Aus diesem Grund haben wir uns überlegt, unser Capped Expirationd-Modul zu entwickeln, in der Hoffnung, dass der in der nativen Programmiersprache geschriebene Code unsere Probleme bestmöglich löst.
Ein gutes Beispiel war das Tarantool-Modul memcached . Der dabei verwendete Ansatz basiert auf der Tatsache, dass ein separates Feld in den Bereich eingegeben wird, in dem die Lebensdauer des Tupels angegeben ist, dh ttl. Das Modul im Hintergrund durchsucht den Raum, vergleicht ttl mit der aktuellen Zeit und entscheidet, ob das Tupel gelöscht werden soll oder nicht. Der zwischengespeicherte Modulcode ist einfach und elegant, aber zu allgemein. Erstens wird der zum Crawlen und Löschen verwendete Indextyp nicht berücksichtigt. Zweitens werden bei jedem Durchlauf alle Tupel gescannt, deren Anzahl ziemlich groß sein kann. Und wenn im expirationd-Modul das erste Problem gelöst wurde (der Baumindex wird in einer separaten Klasse zugeordnet), dann erhält das zweite noch keine Beachtung. Dies gab die Wahl vor, Ihren eigenen Code zu schreiben.
Beschreibung
Die Dokumentation zu tarantool enthält eine sehr gute Anleitung zum Schreiben Ihrer gespeicherten Prozeduren in C. Zunächst sollten Sie sich damit vertraut machen, um die Einfügungen mit Befehlen und Code zu verstehen, die unten aufgeführt sind. Beachten Sie auch den Verweis auf die Objekte, die verfügbar sind, wenn Sie Ihr eigenes Capped-Modul schreiben, nämlich Box , Fiber , Index und TXN .
Fangen wir von weitem an und schauen wir uns an, wie das abgedeckte Ablaufmodul von außen aussieht:
fiber = require('fiber') net_box = require('net.box') box.cfg{listen = 3300} box.schema.func.create('libcapped-expirationd.start', {language = 'C'}) box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start') box.schema.func.create('libcapped-expirationd.kill', {language = 'C'}) box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill') box.schema.space.create('tester') box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}}) capped_connection = net_box:new(3300)
Führen Sie zur Vereinfachung tarantool in dem Verzeichnis aus, in dem sich unsere Bibliothek libcapped-expirationd.so befindet. Aus der Bibliothek werden zwei Funktionen exportiert: start und kill. Zuerst müssen Sie diese Funktionen in Lua mit box.schema.func.create und box.schema.user.grant verfügbar machen. Erstellen Sie dann einen Bereich, dessen Tupel nur drei Felder enthalten: Das erste ist eine eindeutige Kennung, das zweite eine E-Mail und das dritte die Lebensdauer des Tupels. Erstellen Sie über dem ersten Feld einen Baumindex und nennen Sie ihn primär. Als nächstes erhalten wir das Objekt, um eine Verbindung zu unserer nativen Bibliothek herzustellen.
Nach den Vorarbeiten starten wir die Startfunktion:
capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})
Dieses Beispiel funktioniert beim Scannen genauso wie das in Lua geschriebene expirationd-Modul. Das erste Argument für die Startfunktion ist der eindeutige Name der Aufgabe. Die zweite ist die Leerzeichenkennung. Der dritte ist ein eindeutiger Index, mit dem Tupel gelöscht werden. Der vierte ist der Index, um den Tupel umgangen werden. Fünftens - die Nummer des Tupelfeldes mit der Lebensdauer (die Nummerierung geht von 1, nicht von 0!). Sechste und siebte Scaneinstellung. 1024 ist die maximale Anzahl von Tupeln, die in einer einzelnen Transaktion angezeigt werden können. 3600 - vollständige Scanzeit in Sekunden.
Beachten Sie, dass im Beispiel derselbe Index für das Crawlen und Löschen verwendet wird. Wenn es sich um einen Baumindex handelt, wechselt der Crawl vom kleineren zum größeren Schlüssel. Wenn es sich um einen anderen Hash-Index handelt, wird die Durchquerung in der Regel in einer beliebigen Reihenfolge durchgeführt. In einem Scan werden alle Space-Tupel angezeigt.
Fügen wir mehrere Tupel mit einer Lebensdauer von 60 Sekunden in den Space ein:
box.space.tester:insert{0, 'user0@tarantool.io', math.floor(fiber.time()) + 60} box.space.tester:insert{1, 'user1@tarantool.io', math.floor(fiber.time()) + 60} box.space.tester:insert{2, 'user2@tarantool.io', math.floor(fiber.time()) + 60}
Überprüfen Sie, ob die Einfügung erfolgreich war:
tarantool> box.space.tester.index.primary:select()
Wiederholen Sie die Auswahl nach mehr als 60 Sekunden (beginnend mit dem Beginn des Einfügens des ersten Tupels) und stellen Sie sicher, dass das begrenzte Ablaufmodul bereits funktioniert hat:
tarantool> box.space.tester.index.primary:select()
Stoppen Sie die Aufgabe:
capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})
Schauen wir uns ein zweites Beispiel an, in dem ein separater Index zum Crawlen verwendet wird:
fiber = require('fiber') net_box = require('net.box') box.cfg{listen = 3300} box.schema.func.create('libcapped-expirationd.start', {language = 'C'}) box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start') box.schema.func.create('libcapped-expirationd.kill', {language = 'C'}) box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill') box.schema.space.create('tester') box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}}) box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}}) capped_connection = net_box:new(3300)
Hier ist bis auf wenige Ausnahmen alles wie im ersten Beispiel. Über dem dritten Feld erstellen wir einen Baumindex und nennen ihn exp. Dieser Index muss im Gegensatz zu einem Primärindex nicht eindeutig sein. Die Umgehung erfolgt bei exp index und das Löschen bei primary. Wir erinnern uns, dass beide zuvor nur mit dem Primärindex durchgeführt wurden.
Nach den Vorarbeiten starten wir die Startfunktion mit neuen Argumenten:
capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})
Wieder werden wir mehrere Tupel mit einer Lebensdauer von 60 Sekunden in den Space einfügen:
box.space.tester:insert{0, 'user0@tarantool.io', math.floor(fiber.time()) + 60} box.space.tester:insert{1, 'user1@tarantool.io', math.floor(fiber.time()) + 60} box.space.tester:insert{2, 'user2@tarantool.io', math.floor(fiber.time()) + 60}
Fügen Sie nach 30 Sekunden analog ein paar weitere Tupel hinzu:
box.space.tester:insert{3, 'user3@tarantool.io', math.floor(fiber.time()) + 60} box.space.tester:insert{4, 'user4@tarantool.io', math.floor(fiber.time()) + 60} box.space.tester:insert{5, 'user5@tarantool.io', math.floor(fiber.time()) + 60}
Überprüfen Sie, ob die Einfügung erfolgreich war:
tarantool> box.space.tester.index.primary:select()
Wiederholen Sie die Auswahl nach mehr als 60 Sekunden (beginnend mit dem Beginn des Einfügens des ersten Tupels) und stellen Sie sicher, dass das begrenzte Ablaufmodul bereits funktioniert hat:
tarantool> box.space.tester.index.primary:select()
Die Tupel blieben in dem Raum, der etwa 30 Sekunden lebte. Darüber hinaus wurde der Scan beim Wechsel von einem Tupel mit dem Bezeichner 2 und einer Lebensdauer von 1576421257 zu einem Tupel mit dem Bezeichner 3 und einer Lebensdauer von 1576421287 abgebrochen. Tupel mit einer Lebensdauer von 1576421287 und mehr wurden aufgrund der Reihenfolge der exp-Indexschlüssel nicht angezeigt. Dies sind die Einsparungen, die wir von Anfang an erzielen wollten.
Stoppen Sie die Aufgabe:
capped_connection:call('libcapped-expirationd.kill', {'indexed'})
Implementierung
Das Beste ist, dass alle Funktionen des Projekts immer den Quellcode enthalten ! Im Rahmen der Veröffentlichung konzentrieren wir uns nur auf die wichtigsten Punkte, nämlich die Space-Bypass-Algorithmen.
Die Argumente, die wir an die Startfunktion übergeben, werden in einer Struktur namens expirationd_task gespeichert:
struct expirationd_task { char name[256]; uint32_t space_id; uint32_t rm_index_id; uint32_t it_index_id; uint32_t it_index_type; uint32_t field_no; uint32_t scan_size; uint32_t scan_time; };
Das Attribut name ist der Name der Aufgabe. Das space_id-Attribut ist der Bezeichner des Leerzeichens. Das Attribut rm_index_id ist die Kennung des eindeutigen Index, mit dem Tupel gelöscht werden. Das Attribut it_index_id ist der Bezeichner des Index, mit dem Tupel gecrawlt werden. Das Attribut it_index_type gibt den Indextyp an, mit dem Tupel durchlaufen werden. Das Attribut filed_no ist die Tupelfeldnummer mit einer Lebensdauer. Das scan_size-Attribut gibt die maximale Anzahl von Tupeln an, die in einer einzelnen Transaktion angezeigt werden. Das scan_time-Attribut gibt die Zeit für einen vollständigen Scan in Sekunden an.
Argumente werden nicht analysiert. Dies ist eine mühsame, aber unkomplizierte Aufgabe, bei der Ihnen die msgpuck- Bibliothek helfen wird. Schwierigkeiten können nur bei Indizes auftreten, die von Lua als komplexe Datenstruktur mit dem Typ mp_map übertragen werden, und nicht mit Hilfe der einfachen Typen mp_bool, mp_double, mp_int, mp_uint und mp_array. Der gesamte Index wird jedoch nicht analysiert. Es reicht aus, die Eindeutigkeit zu überprüfen, den Typ zu berechnen und die Kennung zu extrahieren.
Wir listen die Prototypen aller Funktionen auf, die zum Parsen verwendet werden:
bool expirationd_parse_name(struct expirationd_task *task, const char **pos); bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos); bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos); bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos); bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos); bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos); bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos); bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos); bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos); bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos); bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);
Und jetzt kommen wir zum wichtigsten Punkt - der Logik, das Leerzeichen zu umgehen und Tupel zu entfernen. Jeder Tupelblock, der nicht größer als scan_size ist, wird unter einer Transaktion angezeigt und geändert. Bei Erfolg wird diese Transaktion festgeschrieben und im Fehlerfall zurückgesetzt. Das letzte Argument für die Funktion expirationd_iterate ist ein Zeiger auf den Iterator, ab dem der Scanvorgang beginnt oder fortgesetzt wird. Dieser Iterator wird solange inkrementiert, bis ein Fehler auftritt, der Speicherplatz endet oder keine Möglichkeit mehr besteht, den Vorgang vorab abzubrechen. Die Funktion expirationd_expired überprüft die Lebensdauer des Tupels, expirationd_delete - löscht das Tupel, expirationd_breakable - prüft, ob wir weitermachen müssen.
Expirationd_iterate Funktionscode:
static bool expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp) { box_iterator_t *iter = *iterp; box_txn_begin(); for (uint32_t i = 0; i < task->scan_size; ++i) { box_tuple_t *tuple = NULL; if (box_iterator_next(iter, &tuple) < 0) { box_iterator_free(iter); *iterp = NULL; box_txn_rollback(); return false; } if (!tuple) { box_iterator_free(iter); *iterp = NULL; box_txn_commit(); return true; } if (expirationd_expired(task, tuple)) expirationd_delete(task, tuple); else if (expirationd_breakable(task)) break; } box_txn_commit(); return true; }
Expirationd_expired Funktionscode:
static bool expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple) { const char *buf = box_tuple_field(tuple, task->field_no - 1); if (!buf || mp_typeof(*buf) != MP_UINT) return false; uint64_t val = mp_decode_uint(&buf); if (val > fiber_time64() / 1000000) return false; return true; }
Funktionscode expirationd_delete:
static void expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple) { uint32_t len; const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len); box_delete(task->space_id, task->rm_index_id, str, str + len, NULL); }
Expirationd_breakable Funktionscode:
static bool expirationd_breakable(struct expirationd_task *task) { return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT; }
App
Hier geht es zum Quellcode!