RESTinio是一个异步HTTP服务器。 实践中的一个简单示例:作为响应返回大量数据


最近,我碰巧在一个应该控制其传出连接速度的应用程序上工作。 例如,连接到一个URL,应用程序应将自身限制为200KiB /秒。 并连接到另一个URL-仅30KiB /秒。


这里最有趣的一点是测试这些限制。 我需要一个HTTP服务器,该服务器将以给定的速度提供流量,例如512KiB /秒。 然后,我可以查看该应用程序是否确实能够承受200KiB /秒的速度,或者是否可以分解为更高的速度。


但是在哪里可以得到这样的HTTP服务器呢?


由于我与C ++应用程序中嵌入的RESTinio HTTP服务器有关,因此,我没有比将一个简单的HTTP测试服务器快速投掷到膝盖上以向客户端发送大量传出数据的方法更好的了。


关于它的简单程度,并希望在本文中讲述。 同时,在评论中找出这是否真的很简单或者我是否在自欺欺人。 原则上,可以将本文视为上一篇有关RESTinio的文章的延续,该文章称为“ RESTinio是异步HTTP服务器。异步” 。 因此,如果有人有兴趣阅读有关RESTinio的实际应用(虽然不是很认真)的信息,那么欢迎您。


总体思路


上面提到的测试服务器的总体思路非常简单:当客户端连接到服务器并执行HTTP GET请求时,将激活一个计时器,该计时器每秒运行一次。 触发计时器后,将给定大小的下一个数据块发送到客户端。


但是一切都比较复杂


如果客户端读取数据的速度比服务器发送数据的速度慢,那么每秒仅发送N KB并不是一个好主意。 由于数据将开始在套接字中累积,因此不会导致任何问题。


因此,在发送数据时,建议控制套接字在HTTP服务器端进行写入的准备情况。 只要套接字已准备就绪(也就是说,尚未累积太多数据),就可以发送新部分。 但是,如果尚未准备好,则需要等到套接字进入准备记录的状态。


听起来很合理,但是I / O操作隐藏在RESTinio的内脏中。如何确定下一个数据是否可以写入?


如果使用RESTinio中的写后通知程序 ,则可以避免这种情况。 例如,我们可以这样写:


void request_handler(restinio::request_handle_t req) { req->create_response() //   . ... //   . .done([](const auto & ec) { ... //         . }); } 

RESTinio完成写出数据时,将调用传递给done()方法的lambda。 因此,如果套接字一段时间未准备好进行记录,则lambda不会立即被调用,而是在套接字进入其适当状态并接受所有传出数据之后。


由于使用了写后通知程序,因此测试服务器的逻辑如下:


  • 发送下一批数据,计算在正常事件过程中需要发送下一批数据的时间;
  • 我们将通知器挂在数据的下一部分上;
  • 当调用写后通知时,我们检查下一批是否到达。 如果是这样,则立即启动下一部分的发送。 如果不是,请旋动计时器。

结果,事实证明,一旦记录开始变慢,发送新数据就会暂停。 并在套接字准备好接受新的传出数据时恢复。


还有一点复杂:chunked_output


RESTinio支持三种生成HTTP请求响应的方式 。 默认情况下使用的最简单方法不适用于这种情况,因为 我需要几乎无休止的传出数据流。 这样的流当然不能set_body到对set_body方法的单个调用。


因此,所描述的测试服务器使用了所谓的 chunked_output 。 即 在创建答案时,我向RESTinio指示答案将分为部分。 然后,我只是定期调用append_chunk方法,将下一部分添加到答案中,然后flush以将累积的部分写入套接字。


让我们看一下代码!


开头的单词足够了,现在可以继续检查代码本身了,可以在此存储库中找到。 让我们从request_processor函数开始,该函数被调用以处理每个有效的HTTP请求。 同时,让我们深入研究那些从request_processor调用的函数。 好了,接下来我们将了解request_processor如何精确映射到一个或另一个传入的HTTP请求。


Request_processor函数及其助手


调用request_processor函数来处理我需要的HTTP GET请求。 它作为参数传递:


  • 在其上执行所有工作的asio-shny io_context(例如,对于翘起计时器,这是必需的);
  • 响应的一部分的大小。 即 如果我需要以512KiB / sec的速率提供传出流,则将值512KiB作为此参数传递;
  • 响应的零件数。 万一该流应具有有限的长度。 例如,如果您要以512KiB / sec的速率提供5分钟的流,则将值300作为该参数传递(每分钟60个块,持续5分钟);
  • 好了,传入请求本身进行处理。

request_processor内部,使用有关请求及其处理参数的信息创建一个对象,此后即开始处理:


 void request_processor( asio_ns::io_context & ctx, std::size_t chunk_size, std::size_t count, restinio::request_handle_t req) { auto data = std::make_shared<response_data>( ctx, chunk_size, req->create_response<output_t>(), count); data->response_ .append_header(restinio::http_field::server, "RESTinio") .append_header_date_field() .append_header( restinio::http_field::content_type, "text/plain; charset=utf-8") .flush(); send_next_portion(data); } 

包含与请求相关的所有参数的response_data类型如下所示:


 struct response_data { asio_ns::io_context & io_ctx_; std::size_t chunk_size_; response_t response_; std::size_t counter_; response_data( asio_ns::io_context & io_ctx, std::size_t chunk_size, response_t response, std::size_t counter) : io_ctx_{io_ctx} , chunk_size_{chunk_size} , response_{std::move(response)} , counter_{counter} {} }; 

这里应该注意,出现response_data结构的原因之一是类型为restinio::response_builder_t<restinio::chunked_output_t> (即,该类型隐藏在短别名response_t后面)是可移动类型,但不是可复制类型(通过与std::unique_ptr类似)。 因此,不能仅在lambda函数中捕获该对象,然后将其包装在std::function 。 但是,如果将响应对象放在动态创建的response_data实例中,则已经可以reponse_data地在lambda函数中捕获指向reponse_data实例的智能指针,然后将该lambda保存到std::function


Send_next_portion函数


每当需要将响应的下一部分发送给客户端时, send_next_portion调用send_next_portion函数。 它没有复杂的事情发生,因此看起来非常简洁:


 void send_next_portion(response_data_shptr data) { data->response_.append_chunk(make_buffer(data->chunk_size_)); if(1u == data->counter_) { data->response_.flush(); data->response_.done(); } else { data->counter_ -= 1u; data->response_.flush(make_done_handler(data)); } } 

即 发送下一部分。 并且,如果这部分是最后一部分,那么我们将完成请求的处理。 如果不是后者,则将flush程序发送到flush方法,该方法可能是由本示例中最复杂的函数创建的。


函数make_done_handler


make_done_handler函数负责创建一个lambda,该lambda将作为写后通知程序传递给RESTinio。 该通知者应检查响应的下一部分的记录是否成功完成。 如果是,那么您需要弄清楚是否应该立即发送下一部分(即,套接字中有“刹车”并且无法保持发送速率)或暂停之后。 如果您需要暂停,则可以通过翘起计时器来提供。


通常,操作很简单,但是代码会在lambda内部生成lambda,这会使那些不习惯“现代” C ++的人感到困惑。 可以这么短的几年才算是现代;)


 auto make_done_handler(response_data_shptr data) { const auto next_timepoint = steady_clock::now() + 1s; return [=](const auto & ec) { if(!ec) { const auto now = steady_clock::now(); if(now < next_timepoint) { auto timer = std::make_shared<asio_ns::steady_timer>(data->io_ctx_); timer->expires_after(next_timepoint - now); timer->async_wait([timer, data](const auto & ec) { if(!ec) send_next_portion(data); }); } else data->io_ctx_.post([data] { send_next_portion(data); }); } }; } 

我认为,该代码的主要困难在于Asio中计时器的创建和排的特殊性。 在我看来,事实证明它太冗长了。 但是确实存在。 但是您不需要吸引任何其他库。


连接类似快递的路由器


上面显示的request_processorsend_next_portionmake_done_handler send_next_portion构成了我的测试服务器的第一个版本,实际上是在15或20分钟内编写的。


但是使用该测试服务器几天后,发现其中存在一个严重的缺点:它始终以相同的速度返回响应流。 编译速度为512KiB /秒-提供所有512KiB /秒。 以20KiB /秒的速度重新编译-将为所有人提供20KiB /秒的速度。 什么不方便,因为 必须能够接收不同“厚度”的答案。


然后这个想法浮出水面:如果直接在URL中请求返回速度怎么办? 例如,他们向localhost:8080/发出请求localhost:8080/并以预定速度收到响应。 如果您向localhost:8080/128K发出请求localhost:8080/128K ,则它们开始以128KiB /秒的速度接收响应。


然后想法就更进一步了:在URL中,您还可以指定响应中各个部分的数量。 即 localhost:8080/128K/3000请求localhost:8080/128K/3000将以128KiB / sec的速度生成3000个部分的流。


没问题 RESTinio可以使用在ExpressJS的影响下制造查询路由器 。 结果,有这样一个函数来描述传入的HTTP请求的处理程序:


 auto make_router(asio_ns::io_context & ctx) { auto router = std::make_unique<router_t>(); router->http_get("/", [&ctx](auto req, auto) { request_processor(ctx, 100u*1024u, 10000u, std::move(req)); return restinio::request_accepted(); }); router->http_get( R"(/:value(\d+):multiplier([MmKkBb]?))", [&ctx](auto req, auto params) { const auto chunk_size = extract_chunk_size(params); if(0u != chunk_size) { request_processor(ctx, chunk_size, 10000u, std::move(req)); return restinio::request_accepted(); } else return restinio::request_rejected(); }); router->http_get( R"(/:value(\d+):multiplier([MmKkBb]?)/:count(\d+))", [&ctx](auto req, auto params) { const auto chunk_size = extract_chunk_size(params); const auto count = restinio::cast_to<std::size_t>(params["count"]); if(0u != chunk_size && 0u != count) { request_processor(ctx, chunk_size, count, std::move(req)); return restinio::request_accepted(); } else return restinio::request_rejected(); }); return router; } 

这里,HTTP GET请求处理程序针对三种类型的URL形成:


  • 形式为http://localhost/ ;
  • 格式为http://localhost/<speed>[<U>]/ ;
  • 形式为http://localhost/<speed>[<U>]/<count>/

其中speed是一个定义速度的数字, U是一个可选的乘数,用于指示以哪种单位设置速度。 因此128128b表示每秒128字节的速度。 128k是每秒128 KB。


每个URL都有自己的lambda函数,该函数可以理解接收的参数,如果一切正常,它将调用上面显示的request_processor函数。


辅助函数extract_chunk_size如下:


 std::size_t extract_chunk_size(const restinio::router::route_params_t & params) { const auto multiplier = [](const auto sv) noexcept -> std::size_t { if(sv.empty() || "B" == sv || "b" == sv) return 1u; else if("K" == sv || "k" == sv) return 1024u; else return 1024u*1024u; }; return restinio::cast_to<std::size_t>(params["value"]) * multiplier(params["multiplier"]); } 

在这里,C ++ lambda用于从其他编程语言模拟本地函数。


主要功能


剩下的要看所有这些如何在main函数中运行:


 using router_t = restinio::router::express_router_t<>; ... int main() { struct traits_t : public restinio::default_single_thread_traits_t { using logger_t = restinio::single_threaded_ostream_logger_t; using request_handler_t = router_t; }; asio_ns::io_context io_ctx; restinio::run( io_ctx, restinio::on_this_thread<traits_t>() .port(8080) .address("localhost") .write_http_response_timelimit(60s) .request_handler(make_router(io_ctx))); return 0; } 

这是怎么回事:


  1. 由于我不需要普通的常规请求路由器(根本无法执行任何操作,而将所有工作都交给了程序员),因此,我为HTTP服务器定义了新属性。 为此,我采用了单线程HTTP服务器的标准属性(类型restinio::default_single_thread_traits_t ),并指出将类似express的路由器的实例用作请求处理程序。 同时,为了控制内部发生的事情,我指出HTTP服务器使用了真正的记录器(默认情况下,使用null_logger_t根本不记录任何内容)。
  2. 由于我需要在写入后通知程序中设置计时器,因此需要一个可以使用的io_context实例。 因此,我自己创建它。 这使我有机会在make_router函数中传递指向我的io_context的链接。
  3. 仅保留在我先前创建的io_context上以单线程版本启动HTTP服务器。 restinio::run函数仅在HTTP服务器完成其工作时才返回控制。

结论


本文未显示测试服务器的完整代码,仅显示了其要点。 完整的代码,由于附加的typedef和辅助功能而稍大一些,因此更具真实性。 你可以在这里看到它。 在撰写本文时,这是185行,包括空白行和注释。 嗯,这185行是用几种方法编写的,总持续时间几乎不超过一个小时。


我喜欢这个结果,任务很有趣。 实际上,我所需要的辅助工具很快就得到了。 在RESTinio的进一步发展方面,出现了一些想法。


通常,如果其他人还没有尝试过RESTinio,那么我邀请您尝试。 该项目本身位于GitHub上 。 您可以在Google网上论坛或评论中提出问题或表达您的建议。

Source: https://habr.com/ru/post/zh-CN462349/


All Articles