Troca de dados assíncrona com um aplicativo remoto via SSH

Bom dia, amigos e colegas. Meu nome ainda é Dmitry Smirnov e, para minha alegria, sou o desenvolvedor do ISPsystem. Há algum tempo, comecei a trabalhar em um projeto completamente novo, o que me inspirou muito, pois o novo é, no nosso caso, a falta de código legado e suporte para compiladores antigos. Olá, Boost, C ++ 17 e todas as outras alegrias do desenvolvimento moderno.

Aconteceu que todos os meus projetos anteriores eram multithread, respectivamente, eu tinha muito pouca experiência com soluções assíncronas. Foi isso que me tornou mais agradável nesse desenvolvimento, além das ferramentas poderosas e modernas.

Uma das últimas tarefas relacionadas foi a necessidade de escrever um wrapper na biblioteca libssh2 nas realidades de um aplicativo assíncrono usando o Boost.Asio , e capaz de gerar no máximo dois encadeamentos. Eu vou falar sobre isso.



Nota: o autor assume que o leitor esteja familiarizado com os conceitos básicos de desenvolvimento assíncrono e boost :: asio.

Desafio


Em termos gerais, a tarefa foi a seguinte: conectar-se a um servidor remoto usando uma chave rsa ou nome de usuário e senha; faça o upload de um script para a máquina remota e execute-o; leia suas respostas e envie comandos através da mesma conexão. Nesse caso, é claro, sem bloquear o fluxo (que é metade da piscina total possível).

Disclaimer : Eu sei que Poco trabalha com SSH, mas não encontrei uma maneira de me casar com ele com Asio, e foi mais interessante escrever algo meu :-).

Inicialização


Para inicializar e minimizar a biblioteca, decidi usar o singleton usual:

Init ()
class LibSSH2 { public: static void Init() { static LibSSH2 instance; } private: explicit LibSSH2() { if (libssh2_init(0) != 0) { throw std::runtime_error("libssh2 initialization failed"); } } ~LibSSH2() { std::cout << "shutdown libssh2" << std::endl; libssh2_exit(); } }; 



É claro que existem armadilhas nessa decisão, de acordo com meu manual favorito: “Mil e uma maneiras de fotografar sua perna em C ++”. Se alguém gerar um fluxo que eles esquecem de cutucar, e o principal terminar mais cedo, efeitos especiais interessantes poderão surgir. Mas, neste caso, não levarei em conta essa possibilidade.

Entidades-chave


Após analisar o exemplo , fica claro que, para nossa pequena biblioteca, precisamos de três entidades simples: soquete, sessão e canal. Como é bom ter ferramentas síncronas, deixaremos o Asio de lado por enquanto.

Vamos começar com um soquete simples:

Soquete
 class Socket { public: explicit Socket() : m_sock(socket(AF_INET, SOCK_STREAM, 0)) { if (m_sock == -1) { throw std::runtime_error("failed to create socket"); } } ~Socket() { close(m_sock); } private: int m_sock = -1; } 


Agora sessão:

A sessão
 class Session { public: explicit Session(const bool enable_compression) : m_session(libssh2_session_init()) { if (m_session == nullptr) { throw std::runtime_error("failed to create libssh2 session"); } libssh2_session_set_blocking(m_session, 0); if (enable_compression) { libssh2_session_flag(m_session, LIBSSH2_FLAG_COMPRESS, 1); } } ~Session() { const std::string desc = "Shutting down libssh2 session"; libssh2_session_disconnect(m_session, desc.c_str()); libssh2_session_free(m_session); } private: LIBSSH2_SESSION *m_session; } 


Como agora temos um soquete e uma sessão, seria bom escrever uma função de espera para um soquete nas realidades do libssh2:

Soquete de espera
 int WaitSocket() const { pollfd fds{}; fds.fd = sock; fds.events = 0; if ((libssh2_session_block_directions(session) & LIBSSH2_SESSION_BLOCK_INBOUND) != 0) { fds.events |= POLLIN; } if ((libssh2_session_block_directions(session) & LIBSSH2_SESSION_BLOCK_OUTBOUND) != 0) { fds.events |= POLLOUT; } return poll(&fds, 1, 10); } 


Na verdade, isso praticamente não é diferente do exemplo acima, exceto pelo fato de que ele usa select em vez de poll.

O canal permanece. Existem vários tipos de canais no libssh2: simple, SCP, tcp direto. Estamos interessados ​​no canal mais simples e básico:

Canal
 class SimpleChannel { public: explicit SimpleChannel(session) { while ((m_channel = libssh2_channel_open_session(session) == nullptr && GetSessionLastError() == LIBSSH2_ERROR_EAGAIN) { WaitSocket(); } if (m_channel == nullptr) { throw std::runtime_error("Critical error while opening simple channel"); } } void SendEof() { while (libssh2_channel_send_eof(m_channel) == LIBSSH2_ERROR_EAGAIN) { WaitSocket(); } while (libssh2_channel_wait_eof(m_channel) == LIBSSH2_ERROR_EAGAIN) { WaitSocket(); } } ~SimpleChannel() { CloseChannel(); } private: void CloseChannel() { int rc; while ((rc = libssh2_channel_close(m_channel)) == LIBSSH2_ERROR_EAGAIN) { WaitSocket(); } libssh2_channel_free(m_channel); } LIBSSH2_CHANNEL *m_channel; }; 


Agora que todas as ferramentas básicas estão prontas, resta estabelecer uma conexão com o host e executar as manipulações necessárias. A gravação assíncrona no canal e síncrona, é claro, será muito diferente, mas o processo de estabelecer uma conexão não é.

Portanto, escrevemos a classe base:

Conexão básica
 class BaseConnectionImpl { protected: explicit BaseConnectionImpl(const SshConnectData &connect_data) ///<    ,     : m_session(connect_data.enable_compression) , m_connect_data(connect_data) { LibSSH2::Init(); ConnectSocket(); HandShake(); ProcessKnownHosts(); Auth(); } ///       bool CheckSocket(int type) const { pollfd fds{}; fds.fd = m_sock; fds.events = type; return poll(&fds, 1, 0) == 1; } bool WantRead() const { return CheckSocket(POLLIN); } bool WantWrite() const { return CheckSocket(POLLOUT); } /*   ,   ,       *  - . */ void ConnectSocket() {...} void HandShake() {...} void Auth() {...} class Socket m_sock; class Session m_session; class SimpleChannel; SshConnectData m_connect_data; }; 


Agora estamos prontos para escrever a classe mais simples para conectar-se ao host remoto e executar qualquer comando nele:

Conexão síncrona
 class Connection::Impl : public BaseConnectionImpl { public: explicit Impl(const SshConnectData &connect_data) : BaseConnectionImpl(connect_data) {} template <typename Begin> void WriteToChannel(LIBSSH2_CHANNEL *channel, Begin ptr, size_t size) { do { int rc; while ((rc = libssh2_channel_write(channel, ptr, size)) == LIBSSH2_ERROR_EAGAIN) { WaitSocket(); } if (rc < 0) { break; } size -= rc; ptr += rc; } while (size != 0); } void ExecuteCommand(const std::string &command, const std::string &in = "") { SimpleChannel channel(*this); int return_code = libssh2_channel_exec(channel, command.c_str()); if (return_code != 0 && return_code != LIBSSH2_ERROR_EAGAIN) { throw std::runtime_error("Critical error while executing ssh command"); } if (!in.empty()) { WriteToChannel(channel, in.c_str(), in.size()); channel.SendEof(); } std::string response; for (;;) { int rc; do { std::array<char, 4096> buffer{}; rc = libssh2_channel_read(channel, buffer.data(), buffer.size()); if (rc > 0) { boost::range::copy(boost::adaptors::slice(buffer, 0, rc), std::back_inserter(response)); } else if (rc < 0 && rc != LIBSSH2_ERROR_EAGAIN) { throw std::runtime_error("libssh2_channel_read error (" + std::to_string(rc) + ")"); } } while (rc > 0); if (rc == LIBSSH2_ERROR_EAGAIN) { WaitSocket(); } else { break; } } } }; 


Até agora, tudo o que escrevemos foi uma simples redução dos exemplos da libssh2 para uma forma mais civilizada. Mas agora, com todas as ferramentas simples para gravar dados no canal de forma síncrona, podemos seguir para o Asio.

Ter um soquete padrão é bom, mas não é muito prático se você precisar esperar assincronamente para ler / gravar enquanto faz seus próprios negócios no processo. Aqui o boost :: asio :: ip :: tcp :: socket vem em socorro, com um método maravilhoso:

 async_wait(wait_type, WaitHandler) 

É maravilhosamente construído a partir de um soquete regular, para o qual já configuramos a conexão com antecedência e boost :: asio :: io_context - o contexto de execução do nosso aplicativo.

Construtor de conexão assíncrona
 class AsyncConnection::Impl : public BaseConnectionImpl, public std::enable_shared_from_this<AsyncConnection::Impl> { public: Impl(boost::asio::io_context &context, const SshConnectData &connect_data) : BaseConnectionImpl(connect_data) , m_tcp_socket(context, tcp::v4(), m_sock.GetSocket()) { m_tcp_socket.non_blocking(true); } }; 



Agora precisamos iniciar a execução de algum comando no host remoto e, assim que os dados chegarem, enviá-los para algum retorno de chamada.

 void AsyncRun(const std::string &command, CallbackType &&callback) { m_read_callback = std::move(callback); auto ec = libssh2_channel_exec(*m_channel, command.c_str()); TryRead(); } 

Assim, executando o comando, transferimos o controle para o método TryRead ().

 void TryRead() { if (m_read_in_progress) { return; } m_tcp_socket.async_wait(tcp::socket::wait_read, [this, self = shared_from_this()](auto ec) { if (WantRead()) { ReadHandler(ec); } if (m_complete) { return; } TryRead(); }); } 

Antes de tudo, verificamos se o processo de leitura já está sendo executado por alguma chamada anterior. Caso contrário, começamos a esperar a disponibilidade do soquete para leitura. Um lambda regular com a captura de shared_from_this () é usado como um manipulador de espera.

Preste atenção na chamada para WantRead (). O Async_wait, como se viu, também tem suas falhas e pode simplesmente retornar por tempo limite. Para evitar ações desnecessárias nesse caso, decidi verificar o soquete através da pesquisa sem tempo limite - o soquete realmente deseja ler agora. Caso contrário, basta executar o TryRead () novamente e aguardar. Caso contrário, começamos imediatamente a ler e transferir dados para o retorno de chamada.

 void ReadHandler(const boost::system::error_code &error) { if (error != boost::system::errc::success) { return; } m_read_in_progress = true; int ec = LIBSSH2_ERROR_EAGAIN; std::array<char, 4096> buffer {}; while ((ec = libssh2_channel_read(*m_channel, buffer.data(), buffer.size())) > 0) { std::string tmp; boost::range::copy(boost::adaptors::slice(buffer, 0, ec), std::back_inserter(tmp)); if (m_read_callback != nullptr) { m_read_callback(tmp); } } m_read_in_progress = false; } 

Assim, um ciclo de leitura assíncrono interminável é iniciado a partir do aplicativo em execução. O próximo passo será enviar instruções para o aplicativo:

 void AsyncWrite(const std::string &data, WriteCallbackType &&callback) { m_input += data; m_write_callback = std::move(callback); TryWrite(); } 

Os dados e o retorno de chamada transferidos para a gravação assíncrona serão salvos dentro da conexão. E execute o próximo ciclo, somente desta vez as entradas:

Ciclo de gravação
 void TryWrite() { if (m_input.empty() || m_write_in_progress) { return; } m_tcp_socket.async_wait(tcp::socket::wait_write, [this, self = shared_from_this()](auto ec) { if (WantWrite()) { WriteHandler(ec); } if (m_complete) { return; } TryWrite(); }); } void WriteHandler(const boost::system::error_code &error) { if (error != boost::system::errc::success) { return; } m_write_in_progress = true; int ec = LIBSSH2_ERROR_EAGAIN; while (!m_input.empty()) { auto ptr = m_input.c_str(); auto read_size = m_input.size(); while ((ec = libssh2_channel_write(*m_channel, ptr, read_size)) > 0) { read_size -= ec; ptr += ec; } AssertResult(ec); m_input.erase(0, m_input.size() - read_size); if (ec == LIBSSH2_ERROR_EAGAIN) { break; } } if (m_input.empty() && m_write_callback != nullptr) { m_write_callback(); } m_write_in_progress = false; } 


Assim, gravaremos dados no canal até que todos sejam transferidos com sucesso. Em seguida, retornaremos o controle ao chamador para que um novo dado possa ser transferido. Dessa forma, você pode não apenas enviar instruções para algum aplicativo no host, mas também, por exemplo, enviar arquivos de qualquer tamanho em pequenas porções, sem bloquear o encadeamento, o que é importante.

Usando esta biblioteca, consegui executar com êxito um script em um servidor remoto que rastreia as alterações do sistema de arquivos, ao mesmo tempo que lê sua saída e envia vários comandos. Em geral: uma experiência muito valiosa na adaptação da biblioteca de estilo si para um projeto C ++ moderno usando o Boost.

Ficarei feliz em ler as dicas de usuários mais experientes do Boost.Asio para aprender mais e melhorar minha solução :-).

Source: https://habr.com/ru/post/pt430488/


All Articles