
前一段时间,我们面临着在tarantool 空间清洁元组的问题。 不是必须在tarantool内存不足时开始清洁,而是提前并以一定频率开始清洁。 Tarantool有一个用Lua编写的用于完成此任务的模块,称为expirationd 。 短暂使用此模块后,我们意识到它不适合我们:在不断清理大量数据时,Lua挂在GC上。 因此,我们考虑开发有上限的过期模块,希望以本机编程语言编写的代码将以最佳方式解决我们的问题。
一个很好的例子是名为memcached的tarantool模块。 其中使用的方法基于以下事实:在表示元组寿命的空间(即ttl)中输入了单独的字段。 后台模块扫描空间,将ttl与当前时间进行比较,并决定是否删除元组。 memcached模块代码既简单又优雅,但是太笼统了。 首先,它没有考虑用于爬网和删除的索引类型。 其次,在每次通过时,将扫描所有元组,其数量可能非常大。 并且,如果在过期模块中解决了第一个问题(将树索引分配在单独的类中),则第二个问题仍然没有得到关注。 这预示了选择编写自己的代码的选择。
内容描述
tarantool的文档中有一个很好的教程 ,介绍如何用C语言编写存储过程。首先,我建议您熟悉一下它,以便理解带有命令和代码的插入内容,这些内容将在下面找到。 编写自己的加盖模块(即box , fibre , index和txn)时可用的对象的引用也应引起注意。
让我们从远处开始,看看从外面看,有上限的过期模块的外观:
fiber = require('fiber') net_box = require('net.box') box.cfg{listen = 3300} box.schema.func.create('libcapped-expirationd.start', {language = 'C'}) box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start') box.schema.func.create('libcapped-expirationd.kill', {language = 'C'}) box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill') box.schema.space.create('tester') box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}}) capped_connection = net_box:new(3300)
为简单起见,请在我们的库libcapped-expirationd.so所在的目录中运行tarantool。 从库中导出了两个函数:start和kill。 首先,您需要使用box.schema.func.create和box.schema.user.grant从Lua提供这些功能。 然后创建一个空间,其元组将仅包含三个字段:第一个是唯一标识符,第二个是电子邮件,第三个是元组的生存期。 在第一个字段的顶部,构建一个树索引并将其称为主索引。 接下来,我们获得对象以连接到我们的本机库。
准备工作结束后,我们启动启动功能:
capped_connection:call('libcapped-expirationd.start', {'non-indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.primary, 3, 1024, 3600})
该示例将像使用Lua编写的过期模块一样进行扫描。 启动函数的第一个参数是任务的唯一名称。 第二个是空间标识符。 第三个是唯一索引,通过该索引将删除元组。 第四个是元组将被绕过的索引。 第五-具有生存期的元组字段的编号(编号从1开始,而不是0!)。 第六和第七-扫描设置。 1024是单个事务中可以查看的最大元组数。 3600-完整扫描时间(以秒为单位)。
请注意,在示例中,相同的索引用于爬网和删除。 如果它是树索引,则爬网是从较小的键到较大的键。 如果是其他哈希索引,则通常按任意顺序执行遍历。 在一次扫描中,将查看所有空间元组。
让我们将几个元组插入空间,生命周期为60秒:
box.space.tester:insert{0, 'user0@tarantool.io', math.floor(fiber.time()) + 60} box.space.tester:insert{1, 'user1@tarantool.io', math.floor(fiber.time()) + 60} box.space.tester:insert{2, 'user2@tarantool.io', math.floor(fiber.time()) + 60}
检查插入是否成功:
tarantool> box.space.tester.index.primary:select()
60+秒后(从插入第一个元组的开始算起)重复执行select,然后查看已设置上限的过期模块已正常工作:
tarantool> box.space.tester.index.primary:select()
停止任务:
capped_connection:call('libcapped-expirationd.kill', {'non-indexed'})
让我们看第二个示例,其中使用单独的索引进行爬网:
fiber = require('fiber') net_box = require('net.box') box.cfg{listen = 3300} box.schema.func.create('libcapped-expirationd.start', {language = 'C'}) box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.start') box.schema.func.create('libcapped-expirationd.kill', {language = 'C'}) box.schema.user.grant('guest', 'execute', 'function', 'libcapped-expirationd.kill') box.schema.space.create('tester') box.space.tester:create_index('primary', {unique = true, parts = {1, 'unsigned'}}) box.space.tester:create_index('exp', {unique = false, parts = {3, 'unsigned'}}) capped_connection = net_box:new(3300)
此处的一切与第一个示例相同,但有一些例外。 在第三个字段的顶部,我们建立一个树索引并将其称为exp。 与称为主索引的索引不同,此索引不必是唯一的。 绕过将在exp索引处完成,并在主索引处进行删除。 我们记得以前,两者都仅使用主索引完成。
准备工作完成后,我们使用新的参数启动start函数:
capped_connection:call('libcapped-expirationd.start', {'indexed', box.space.tester.id, box.space.tester.index.primary, box.space.tester.index.exp, 3, 1024, 3600})
同样,我们将在空间中插入几个元组,生命周期为60秒:
box.space.tester:insert{0, 'user0@tarantool.io', math.floor(fiber.time()) + 60} box.space.tester:insert{1, 'user1@tarantool.io', math.floor(fiber.time()) + 60} box.space.tester:insert{2, 'user2@tarantool.io', math.floor(fiber.time()) + 60}
类似地,在30秒之后,添加一些元组:
box.space.tester:insert{3, 'user3@tarantool.io', math.floor(fiber.time()) + 60} box.space.tester:insert{4, 'user4@tarantool.io', math.floor(fiber.time()) + 60} box.space.tester:insert{5, 'user5@tarantool.io', math.floor(fiber.time()) + 60}
检查插入是否成功:
tarantool> box.space.tester.index.primary:select()
60+秒后(从插入第一个元组的开始算起)重复执行select,然后查看已设置上限的过期模块已正常工作:
tarantool> box.space.tester.index.primary:select()
元组保留在该空间中,该空间存活约30秒。 此外,当从标识符为2且寿命为1576421257的元组切换为标识符为3且寿命为1576421287的元组时,扫描停止。由于exp索引键的顺序,未查看寿命为1576421287及以上的元组。 这是我们一开始就希望实现的节省。
停止任务:
capped_connection:call('libcapped-expirationd.kill', {'indexed'})
实作
最好的是,项目的所有功能将始终告诉其源代码 ! 作为出版物的一部分,我们将仅关注最重要的方面,即空间旁路算法。
我们传递给start函数的参数存储在称为expirationd_task的结构中:
struct expirationd_task { char name[256]; uint32_t space_id; uint32_t rm_index_id; uint32_t it_index_id; uint32_t it_index_type; uint32_t field_no; uint32_t scan_size; uint32_t scan_time; };
name属性是任务的名称。 space_id属性是空间的标识符。 rm_index_id属性是将删除元组的唯一索引的标识符。 it_index_id属性是将对元组进行爬网的索引的标识符。 it_index_type属性是将遍历元组的索引的类型。 filed_no属性是具有生存期的元组字段号。 scan_size属性是在单个事务中查看的最大元组数。 scan_time属性是完整扫描的时间,以秒为单位。
我们将不考虑解析参数。 这是一项艰苦而简单的工作, msgpuck库将为您提供帮助。 仅当索引是从Lua作为mp_map类型的复杂数据结构传输的,而不是借助简单类型mp_bool,mp_double,mp_int,mp_uint和mp_array时,才可能出现困难。 但是整个索引没有被解析。 检查其唯一性,计算类型并提取标识符就足够了。
我们列出了用于解析的所有函数的原型:
bool expirationd_parse_name(struct expirationd_task *task, const char **pos); bool expirationd_parse_space_id(struct expirationd_task *task, const char **pos); bool expirationd_parse_rm_index_id(struct expirationd_task *task, const char **pos); bool expirationd_parse_rm_index_unique(struct expirationd_task *task, const char **pos); bool expirationd_parse_rm_index(struct expirationd_task *task, const char **pos); bool expirationd_parse_it_index_id(struct expirationd_task *task, const char **pos); bool expirationd_parse_it_index_type(struct expirationd_task *task, const char **pos); bool expirationd_parse_it_index(struct expirationd_task *task, const char **pos); bool expirationd_parse_field_no(struct expirationd_task *task, const char **pos); bool expirationd_parse_scan_size(struct expirationd_task *task, const char **pos); bool expirationd_parse_scan_time(struct expirationd_task *task, const char **pos);
现在让我们继续最重要的事情-绕过空间并删除元组的逻辑。 在一个事务下查看和更改不大于scan_size的每个元组块。 如果成功,则提交该事务;如果出错,则回滚。 expirationd_iterate函数的最后一个参数是指向迭代器的指针,从该迭代器开始或继续扫描。 该迭代器在内部递增,直到发生错误,空格结束或没有机会提前停止进程为止。 expirationd_expired函数检查元组的生命周期,expirationd_delete-删除元组,expirationd_breakable-检查我们是否需要继续。
Expirationd_iterate函数代码:
static bool expirationd_iterate(struct expirationd_task *task, box_iterator_t **iterp) { box_iterator_t *iter = *iterp; box_txn_begin(); for (uint32_t i = 0; i < task->scan_size; ++i) { box_tuple_t *tuple = NULL; if (box_iterator_next(iter, &tuple) < 0) { box_iterator_free(iter); *iterp = NULL; box_txn_rollback(); return false; } if (!tuple) { box_iterator_free(iter); *iterp = NULL; box_txn_commit(); return true; } if (expirationd_expired(task, tuple)) expirationd_delete(task, tuple); else if (expirationd_breakable(task)) break; } box_txn_commit(); return true; }
Expirationd_expired函数代码:
static bool expirationd_expired(struct expirationd_task *task, box_tuple_t *tuple) { const char *buf = box_tuple_field(tuple, task->field_no - 1); if (!buf || mp_typeof(*buf) != MP_UINT) return false; uint64_t val = mp_decode_uint(&buf); if (val > fiber_time64() / 1000000) return false; return true; }
功能代码expirationd_delete:
static void expirationd_delete(struct expirationd_task *task, box_tuple_t *tuple) { uint32_t len; const char *str = box_tuple_extract_key(tuple, task->space_id, task->rm_index_id, &len); box_delete(task->space_id, task->rm_index_id, str, str + len, NULL); }
Expirationd_breakable函数代码:
static bool expirationd_breakable(struct expirationd_task *task) { return task->it_index_id != task->rm_index_id && task->it_index_type == ITER_GT; }
应用程式
在此处查看源代码!