RESTinio é um servidor HTTP assíncrono. Um exemplo simples da prática: retornando uma grande quantidade de dados em resposta


Recentemente, trabalhei em um aplicativo que deveria controlar a velocidade de suas conexões de saída. Por exemplo, ao conectar-se a um URL, o aplicativo deve se limitar a, digamos, 200KiB / s. E conectando-se a outro URL - apenas 30KiB / s.


O ponto mais interessante aqui foi testar essas mesmas limitações. Eu precisava de um servidor HTTP que desse tráfego a uma determinada velocidade, por exemplo, 512KiB / s. Então eu pude ver se o aplicativo realmente suporta a velocidade de 200KiB / s ou se ele quebra em velocidades mais altas.


Mas onde conseguir um servidor HTTP?


Como tenho algo a ver com o servidor HTTP RESTinio incorporado em aplicativos C ++, não consegui nada melhor do que jogar rapidamente um simples servidor de teste HTTP no meu joelho, que pode enviar um longo fluxo de dados de saída para o cliente.


Sobre como seria simples e gostaria de dizer no artigo. Ao mesmo tempo, descubra nos comentários se isso é realmente simples ou se estou me enganando. Em princípio, este artigo pode ser considerado uma continuação do artigo anterior sobre o RESTinio chamado "RESTinio é um servidor HTTP assíncrono. Assíncrono" . Portanto, se alguém estiver interessado em ler sobre a aplicação real, embora não muito séria do RESTinio, você será bem-vindo ao gato.


Ideia geral


A idéia geral do servidor de teste mencionado acima é muito simples: quando o cliente se conecta ao servidor e executa uma solicitação HTTP GET, é ativado um timer que é executado uma vez por segundo. Quando o timer é acionado, o próximo bloco de dados de um determinado tamanho é enviado ao cliente.


Mas tudo é um pouco mais complicado


Se o cliente ler os dados em uma taxa mais lenta do que o servidor envia, apenas enviar N kilobytes uma vez por segundo não é uma boa ideia. Como os dados começarão a se acumular no soquete e isso não levará a nada de bom.


Portanto, ao enviar dados, é aconselhável controlar a prontidão do soquete para gravar no lado do servidor HTTP. Enquanto o soquete estiver pronto (ou seja, muitos dados ainda não foram acumulados), você poderá enviar uma nova parte. Mas se não estiver pronto, é necessário aguardar até que o soquete entre em um estado de prontidão para a gravação.


Parece razoável, mas as operações de E / S estão ocultas nos grupos do RESTinio ... Como descobrir se a próxima parte dos dados pode ser gravada ou não?


Você pode sair dessa situação se usar notificadores pós-gravação , que estão no RESTinio. Por exemplo, podemos escrever isso:


void request_handler(restinio::request_handle_t req) { req->create_response() //   . ... //   . .done([](const auto & ec) { ... //         . }); } 

O lambda passado para o método done() será chamado quando o RESTinio concluir a gravação dos dados de saída. Portanto, se o soquete não estiver pronto para gravação por algum tempo, o lambda não será chamado imediatamente, mas depois que o soquete chegar ao seu estado adequado e aceitar todos os dados de saída.


Com o uso de notificadores pós-gravação, a lógica do servidor de teste será a seguinte:


  • envie o próximo lote de dados, calcule o tempo em que precisaríamos enviar o próximo lote no curso normal dos eventos;
  • penduramos depois de escrever o notificador na próxima porção de dados;
  • quando a notificação de pós-gravação é chamada, verificamos se o próximo lote chegou. Se isso acontecer, inicie imediatamente o envio da próxima parte. Caso contrário, acione o cronômetro.

Como resultado, acontece que, assim que a gravação começar a desacelerar, o envio de novos dados será pausado. E continue quando o soquete estiver pronto para aceitar novos dados de saída.


E um pouco mais complicado: chunked_output


O RESTinio suporta três maneiras de gerar uma resposta a uma solicitação HTTP . O método mais simples, usado por padrão, não é adequado neste caso, porque Preciso de um fluxo quase interminável de dados enviados. E esse fluxo, é claro, não pode ser set_body para uma única chamada para o método set_body .


Portanto, o servidor de teste descrito usa o chamado chunked_output . I.e. ao criar uma resposta, indico ao RESTinio que a resposta será formada em partes. Então, periodicamente, chamo os métodos append_chunk para adicionar a próxima parte à resposta e flush para escrever as partes acumuladas no soquete.


E vamos olhar para o código!


Talvez seja suficiente que as palavras de abertura sejam suficientes e esteja na hora de seguir para o próprio código, que pode ser encontrado neste repositório . Vamos começar com a função request_processor , chamada para processar cada solicitação HTTP válida. Ao mesmo tempo, vamos nos aprofundar nas funções que são chamadas de request_processor . Bem, veremos como exatamente o request_processor é mapeado para uma ou outra solicitação HTTP recebida.


Função Request_processor e seus auxiliares


A função request_processor é chamada para processar as solicitações HTTP GET que eu preciso. É passado como argumentos:


  • Asio-shny io_context no qual todo o trabalho é realizado (será necessário, por exemplo, para temporizadores de armar);
  • o tamanho de uma parte da resposta. I.e. se eu precisar fornecer um fluxo de saída a uma taxa de 512KiB / s, o valor 512KiB será passado como esse parâmetro;
  • número de peças em resposta. Caso o fluxo tenha um comprimento limitado. Por exemplo, se você deseja fornecer um fluxo a uma taxa de 512 KiB / s por 5 minutos, o valor 300 será passado como esse parâmetro (60 blocos por minuto por 5 minutos);
  • Bem, a própria solicitação de entrada para processamento.

Dentro de request_processor , um objeto é criado com informações sobre a solicitação e seus parâmetros de processamento, após o qual esse mesmo processamento começa:


 void request_processor( asio_ns::io_context & ctx, std::size_t chunk_size, std::size_t count, restinio::request_handle_t req) { auto data = std::make_shared<response_data>( ctx, chunk_size, req->create_response<output_t>(), count); data->response_ .append_header(restinio::http_field::server, "RESTinio") .append_header_date_field() .append_header( restinio::http_field::content_type, "text/plain; charset=utf-8") .flush(); send_next_portion(data); } 

O tipo response_data , contendo todos os parâmetros relacionados à solicitação, tem a seguinte aparência:


 struct response_data { asio_ns::io_context & io_ctx_; std::size_t chunk_size_; response_t response_; std::size_t counter_; response_data( asio_ns::io_context & io_ctx, std::size_t chunk_size, response_t response, std::size_t counter) : io_ctx_{io_ctx} , chunk_size_{chunk_size} , response_{std::move(response)} , counter_{counter} {} }; 

Deve-se observar aqui que uma das razões para a aparência da estrutura response_data é que um objeto do tipo restinio::response_builder_t<restinio::chunked_output_t> (ou seja, esse tipo está oculto por trás do apelido curto response_t ) é um tipo móvel, mas não um tipo copiável (por analogias com std::unique_ptr ). Portanto, esse objeto não pode ser capturado apenas em uma função lambda, que se std::function na std::function . Mas se você colocar o objeto de resposta em uma instância criada dinamicamente de response_data , o ponteiro inteligente para a instância reponse_datareponse_data ser capturado nas funções lambda sem problemas e salve esse lambda na std::function .


Função Send_next_portion


A função send_next_portion chamada sempre que é necessário enviar a próxima parte da resposta ao cliente. Nada de complicado acontece nele, então parece bastante simples e conciso:


 void send_next_portion(response_data_shptr data) { data->response_.append_chunk(make_buffer(data->chunk_size_)); if(1u == data->counter_) { data->response_.flush(); data->response_.done(); } else { data->counter_ -= 1u; data->response_.flush(make_done_handler(data)); } } 

I.e. envie a próxima parte. E, se essa parte foi a última, concluímos o processamento da solicitação. E, se não o último, um flush é enviado ao método flush , que é criado, talvez, pela função mais complexa deste exemplo.


Função make_done_handler


A função make_done_handler responsável por criar um lambda que será passado para o RESTinio como um notificador pós-gravação. Este notificador deve verificar se a gravação da próxima parte da resposta foi concluída com êxito. Se sim, você precisa descobrir se a próxima parte deve ser enviada imediatamente (ou seja, houve "freios" no soquete e a taxa de envio não pode ser mantida) ou após uma pausa. Se você precisar de uma pausa, ela será fornecida através de um temporizador de armar.


Em geral, ações simples, mas no código, você obtém o lambda dentro do lambda, o que pode confundir pessoas que não estão acostumadas ao C ++ "moderno". O que não é tão poucos anos para ser chamado de moderno;)


 auto make_done_handler(response_data_shptr data) { const auto next_timepoint = steady_clock::now() + 1s; return [=](const auto & ec) { if(!ec) { const auto now = steady_clock::now(); if(now < next_timepoint) { auto timer = std::make_shared<asio_ns::steady_timer>(data->io_ctx_); timer->expires_after(next_timepoint - now); timer->async_wait([timer, data](const auto & ec) { if(!ec) send_next_portion(data); }); } else data->io_ctx_.post([data] { send_next_portion(data); }); } }; } 

Na minha opinião, a principal dificuldade desse código decorre das peculiaridades da criação e pelotão de temporizadores no Asio. Na minha opinião, acontece de alguma maneira muito detalhado. Mas realmente existe. Mas você não precisa atrair nenhuma biblioteca adicional.


Conectando um roteador do tipo expresso


O request_processor , send_next_portion e make_done_handler mostrados acima, send_next_portion make_done_handler a primeira versão do meu servidor de teste, escrita literalmente em 15 ou 20 minutos.


Porém, após alguns dias de uso desse servidor de teste, houve uma séria desvantagem: sempre retornava o fluxo de resposta na mesma velocidade. Compilado a uma velocidade de 512 KiB / s - fornece todos os 512 KiB / s. Recompilado a uma velocidade de 20KiB / s - dará a todos 20KiB / s e nada mais. O que foi inconveniente, porque tornou-se necessário poder receber respostas de diferentes "espessuras".


Então surgiu a ideia: e se a velocidade de retorno for solicitada diretamente no URL? Por exemplo, eles fizeram uma solicitação ao localhost:8080/ e receberam uma resposta em uma velocidade predeterminada. E se você fez uma solicitação ao localhost:8080/128K , eles começaram a receber uma resposta a uma velocidade de 128KiB / s.


Depois, o pensamento foi ainda mais longe: na URL, você também pode especificar o número de partes individuais na resposta. I.e. solicitação localhost:8080/128K/3000 produzirá um fluxo de 3000 partes a uma velocidade de 128KiB / s.


Não tem problema O RESTinio tem a capacidade de usar um roteador de consulta feito sob a influência do ExpressJS . Como resultado, havia uma função para descrever manipuladores para solicitações HTTP recebidas:


 auto make_router(asio_ns::io_context & ctx) { auto router = std::make_unique<router_t>(); router->http_get("/", [&ctx](auto req, auto) { request_processor(ctx, 100u*1024u, 10000u, std::move(req)); return restinio::request_accepted(); }); router->http_get( R"(/:value(\d+):multiplier([MmKkBb]?))", [&ctx](auto req, auto params) { const auto chunk_size = extract_chunk_size(params); if(0u != chunk_size) { request_processor(ctx, chunk_size, 10000u, std::move(req)); return restinio::request_accepted(); } else return restinio::request_rejected(); }); router->http_get( R"(/:value(\d+):multiplier([MmKkBb]?)/:count(\d+))", [&ctx](auto req, auto params) { const auto chunk_size = extract_chunk_size(params); const auto count = restinio::cast_to<std::size_t>(params["count"]); if(0u != chunk_size && 0u != count) { request_processor(ctx, chunk_size, count, std::move(req)); return restinio::request_accepted(); } else return restinio::request_rejected(); }); return router; } 

Aqui, os manipuladores de solicitação HTTP GET são formados para três tipos de URLs:


  • no formato http://localhost/ ;
  • no formato http://localhost/<speed>[<U>]/ ;
  • no formato http://localhost/<speed>[<U>]/<count>/

Onde speed é um número que define a velocidade e U é um multiplicador opcional que indica em quais unidades a velocidade está definida. Então 128 ou 128b significa uma velocidade de 128 bytes por segundo. E 128k são 128 kilobytes por segundo.


Cada URL tem sua própria função lambda, que entende os parâmetros recebidos; se tudo estiver bem, chama a função request_processor mostrada acima.


A função auxiliar extract_chunk_size seguinte:


 std::size_t extract_chunk_size(const restinio::router::route_params_t & params) { const auto multiplier = [](const auto sv) noexcept -> std::size_t { if(sv.empty() || "B" == sv || "b" == sv) return 1u; else if("K" == sv || "k" == sv) return 1024u; else return 1024u*1024u; }; return restinio::cast_to<std::size_t>(params["value"]) * multiplier(params["multiplier"]); } 

Aqui, o C ++ lambda é usado para emular funções locais de outras linguagens de programação.


Função principal


Resta ver como tudo isso é executado na função principal:


 using router_t = restinio::router::express_router_t<>; ... int main() { struct traits_t : public restinio::default_single_thread_traits_t { using logger_t = restinio::single_threaded_ostream_logger_t; using request_handler_t = router_t; }; asio_ns::io_context io_ctx; restinio::run( io_ctx, restinio::on_this_thread<traits_t>() .port(8080) .address("localhost") .write_http_response_timelimit(60s) .request_handler(make_router(io_ctx))); return 0; } 

O que está acontecendo aqui:


  1. Como não preciso de um roteador comum comum de solicitações (que não pode fazer nada e coloca todo o trabalho sobre o ombro do programador), defino novas propriedades para o meu servidor HTTP. Para fazer isso, pego as propriedades padrão de um servidor HTTP de thread único (digite restinio::default_single_thread_traits_t ) e indico que uma instância de roteador expresso será usada como manipulador de solicitações. Ao mesmo tempo, para controlar o que está acontecendo lá dentro, indico que o servidor HTTP usa um logger real (por padrão, null_logger_t usado e não registra nada).
  2. Como eu preciso ativar os temporizadores dentro dos notificadores pós-gravação, preciso de uma instância io_context com a qual eu possa trabalhar. Portanto, eu mesmo o crio. Isso me dá a oportunidade de passar um link para o meu io_context na função make_router .
  3. Resta apenas iniciar o servidor HTTP em uma versão de thread único no io_context que eu criei anteriormente. A função restinio::run retornará o controle somente quando o servidor HTTP concluir seu trabalho.

Conclusão


O artigo não mostrou o código completo do meu servidor de teste, apenas seus pontos principais. O código completo, que é um pouco maior devido a typedefs adicionais e funções auxiliares, é um pouco mais autêntico. Você pode vê-lo aqui . No momento da redação deste artigo, são 185 linhas, incluindo linhas em branco e comentários. Bem, essas 185 linhas são escritas em algumas abordagens com uma duração total de pouco mais de uma hora.


Gostei desse resultado e a tarefa foi interessante. Em termos práticos, a ferramenta auxiliar de que eu precisava foi rapidamente obtida. E em termos do desenvolvimento do RESTinio, alguns pensamentos apareceram.


Em geral, se alguém não tentou o RESTinio, convido você a tentar. O projeto em si vive no GitHub . Você pode fazer uma pergunta ou expressar suas sugestões no grupo do Google ou aqui nos comentários.

Source: https://habr.com/ru/post/pt462349/


All Articles