Échange de données asynchrone avec une application distante via SSH

Bonjour mes amis et collègues. Je m'appelle toujours Dmitry Smirnov et je suis toujours, à mon grand plaisir, le développeur d'ISPsystem. Il y a quelque temps, j'ai commencé à travailler sur un tout nouveau projet, ce qui m'a beaucoup inspiré, car le nouveau est dans notre cas le manque de code hérité et de support pour les anciens compilateurs. Bonjour, Boost, C ++ 17 et toutes les autres joies du développement moderne.

Il se trouve que tous mes projets antérieurs étaient multi-threads, respectivement, j'avais très peu d'expérience avec les solutions asynchrones. C'est ce qui est devenu le plus agréable pour moi dans ce développement, en plus des outils puissants modernes.

L'une des dernières tâches connexes était la nécessité d'écrire un wrapper sur la bibliothèque libssh2 dans les réalités d'une application asynchrone utilisant Boost.Asio , et capable de générer pas plus de deux threads. Je vais en parler.



Remarque: l'auteur suppose que le lecteur est familiarisé avec les bases du développement asynchrone et boost :: asio.

DĂ©fi


En termes généraux, la tâche était la suivante: se connecter à un serveur distant à l'aide d'une clé rsa ou d'un nom d'utilisateur et d'un mot de passe; télécharger un script sur la machine distante et l'exécuter; lire ses réponses et lui envoyer des commandes via la même connexion. Dans ce cas, bien sûr, sans bloquer le débit (ce qui représente la moitié de la piscine totale possible).

Disclaimer : Je sais que Poco travaille avec SSH, mais je n'ai pas trouvé de moyen de l'épouser avec Asio, et c'était plus intéressant d'écrire moi-même quelque chose :-).

Initialisation


Pour initialiser et minimiser la bibliothèque, j'ai décidé d'utiliser le singleton habituel:

Init ()
class LibSSH2 { public: static void Init() { static LibSSH2 instance; } private: explicit LibSSH2() { if (libssh2_init(0) != 0) { throw std::runtime_error("libssh2 initialization failed"); } } ~LibSSH2() { std::cout << "shutdown libssh2" << std::endl; libssh2_exit(); } }; 



Il y a, bien sûr, des pièges dans cette décision, selon mon manuel préféré, «Mille et une façons de tirer votre jambe en C ++». Si quelqu'un génère un flux qu'il oublie de piquer et que le principal se termine plus tôt, des effets spéciaux intéressants peuvent bien survenir. Mais dans ce cas, je ne tiendrai pas compte de cette possibilité.

Entités clés


Après avoir analysé l' exemple , il devient clair que pour notre petite bibliothèque, nous avons besoin de trois entités simples: socket, session et channel. Puisqu'il est agréable d'avoir des outils synchrones, nous allons laisser Asio de côté pour l'instant.

Commençons par une simple socket:

Prise
 class Socket { public: explicit Socket() : m_sock(socket(AF_INET, SOCK_STREAM, 0)) { if (m_sock == -1) { throw std::runtime_error("failed to create socket"); } } ~Socket() { close(m_sock); } private: int m_sock = -1; } 


Maintenant session:

La session
 class Session { public: explicit Session(const bool enable_compression) : m_session(libssh2_session_init()) { if (m_session == nullptr) { throw std::runtime_error("failed to create libssh2 session"); } libssh2_session_set_blocking(m_session, 0); if (enable_compression) { libssh2_session_flag(m_session, LIBSSH2_FLAG_COMPRESS, 1); } } ~Session() { const std::string desc = "Shutting down libssh2 session"; libssh2_session_disconnect(m_session, desc.c_str()); libssh2_session_free(m_session); } private: LIBSSH2_SESSION *m_session; } 


Puisque nous avons maintenant un socket et une session, il serait bien d'écrire une fonction d'attente pour un socket dans les réalités de libssh2:

Prise d'attente
 int WaitSocket() const { pollfd fds{}; fds.fd = sock; fds.events = 0; if ((libssh2_session_block_directions(session) & LIBSSH2_SESSION_BLOCK_INBOUND) != 0) { fds.events |= POLLIN; } if ((libssh2_session_block_directions(session) & LIBSSH2_SESSION_BLOCK_OUTBOUND) != 0) { fds.events |= POLLOUT; } return poll(&fds, 1, 10); } 


En fait, cela n'est pratiquement pas différent de l'exemple ci-dessus, sauf qu'il utilise select au lieu de poll.

Le canal reste. Il existe plusieurs types de canaux dans libssh2: simple, SCP, tcp direct. Nous sommes intéressés par le canal de base le plus simple:

Chaîne
 class SimpleChannel { public: explicit SimpleChannel(session) { while ((m_channel = libssh2_channel_open_session(session) == nullptr && GetSessionLastError() == LIBSSH2_ERROR_EAGAIN) { WaitSocket(); } if (m_channel == nullptr) { throw std::runtime_error("Critical error while opening simple channel"); } } void SendEof() { while (libssh2_channel_send_eof(m_channel) == LIBSSH2_ERROR_EAGAIN) { WaitSocket(); } while (libssh2_channel_wait_eof(m_channel) == LIBSSH2_ERROR_EAGAIN) { WaitSocket(); } } ~SimpleChannel() { CloseChannel(); } private: void CloseChannel() { int rc; while ((rc = libssh2_channel_close(m_channel)) == LIBSSH2_ERROR_EAGAIN) { WaitSocket(); } libssh2_channel_free(m_channel); } LIBSSH2_CHANNEL *m_channel; }; 


Maintenant que tous les outils de base sont prêts, il reste à établir une connexion avec l'hôte et à effectuer les manipulations nécessaires. L'enregistrement asynchrone sur le canal et synchrone, bien sûr, sera très différent, mais le processus d'établissement d'une connexion ne l'est pas.

Par conséquent, nous écrivons la classe de base:

Connexion basique
 class BaseConnectionImpl { protected: explicit BaseConnectionImpl(const SshConnectData &connect_data) ///<    ,     : m_session(connect_data.enable_compression) , m_connect_data(connect_data) { LibSSH2::Init(); ConnectSocket(); HandShake(); ProcessKnownHosts(); Auth(); } ///       bool CheckSocket(int type) const { pollfd fds{}; fds.fd = m_sock; fds.events = type; return poll(&fds, 1, 0) == 1; } bool WantRead() const { return CheckSocket(POLLIN); } bool WantWrite() const { return CheckSocket(POLLOUT); } /*   ,   ,       *  - . */ void ConnectSocket() {...} void HandShake() {...} void Auth() {...} class Socket m_sock; class Session m_session; class SimpleChannel; SshConnectData m_connect_data; }; 


Nous sommes maintenant prêts à écrire la classe la plus simple pour se connecter à l'hôte distant et exécuter n'importe quelle commande dessus:

Connexion synchrone
 class Connection::Impl : public BaseConnectionImpl { public: explicit Impl(const SshConnectData &connect_data) : BaseConnectionImpl(connect_data) {} template <typename Begin> void WriteToChannel(LIBSSH2_CHANNEL *channel, Begin ptr, size_t size) { do { int rc; while ((rc = libssh2_channel_write(channel, ptr, size)) == LIBSSH2_ERROR_EAGAIN) { WaitSocket(); } if (rc < 0) { break; } size -= rc; ptr += rc; } while (size != 0); } void ExecuteCommand(const std::string &command, const std::string &in = "") { SimpleChannel channel(*this); int return_code = libssh2_channel_exec(channel, command.c_str()); if (return_code != 0 && return_code != LIBSSH2_ERROR_EAGAIN) { throw std::runtime_error("Critical error while executing ssh command"); } if (!in.empty()) { WriteToChannel(channel, in.c_str(), in.size()); channel.SendEof(); } std::string response; for (;;) { int rc; do { std::array<char, 4096> buffer{}; rc = libssh2_channel_read(channel, buffer.data(), buffer.size()); if (rc > 0) { boost::range::copy(boost::adaptors::slice(buffer, 0, rc), std::back_inserter(response)); } else if (rc < 0 && rc != LIBSSH2_ERROR_EAGAIN) { throw std::runtime_error("libssh2_channel_read error (" + std::to_string(rc) + ")"); } } while (rc > 0); if (rc == LIBSSH2_ERROR_EAGAIN) { WaitSocket(); } else { break; } } } }; 


Jusqu'à présent, tout ce que nous avons écrit a été une simple réduction d'exemples libssh2 à une forme plus civilisée. Mais maintenant, avec tous les outils simples pour écrire des données sur le canal de manière synchrone, nous pouvons passer à Asio.

Avoir un socket standard est bon, mais pas trop pratique si vous devez attendre de manière asynchrone qu'il lise / écrive tout en faisant votre propre entreprise dans le processus. Voici boost :: asio :: ip :: tcp :: socket vient à la rescousse, avec une merveilleuse méthode:

 async_wait(wait_type, WaitHandler) 

Il est merveilleusement construit à partir d'une socket régulière, pour laquelle nous avons déjà configuré la connexion à l'avance et boost :: asio :: io_context - le contexte d'exécution de notre application.

Constructeur de connexion asynchrone
 class AsyncConnection::Impl : public BaseConnectionImpl, public std::enable_shared_from_this<AsyncConnection::Impl> { public: Impl(boost::asio::io_context &context, const SshConnectData &connect_data) : BaseConnectionImpl(connect_data) , m_tcp_socket(context, tcp::v4(), m_sock.GetSocket()) { m_tcp_socket.non_blocking(true); } }; 



Maintenant, nous devons commencer l'exécution d'une commande sur l'hôte distant et, dès que les données en provenance arrivent, l'envoyer à un rappel.

 void AsyncRun(const std::string &command, CallbackType &&callback) { m_read_callback = std::move(callback); auto ec = libssh2_channel_exec(*m_channel, command.c_str()); TryRead(); } 

Ainsi, en exécutant la commande, nous transférons le contrôle à la méthode TryRead ().

 void TryRead() { if (m_read_in_progress) { return; } m_tcp_socket.async_wait(tcp::socket::wait_read, [this, self = shared_from_this()](auto ec) { if (WantRead()) { ReadHandler(ec); } if (m_complete) { return; } TryRead(); }); } 

Tout d'abord, nous vérifions si le processus de lecture est déjà en cours d'exécution par un appel précédent. Sinon, nous commençons à nous attendre à ce que le socket soit prêt pour la lecture. Un lambda régulier avec la capture de shared_from_this () est utilisé comme gestionnaire d'attente.

Faites attention à l'appel à WantRead (). Async_wait, comme il s'est avéré, a également ses défauts, et peut simplement revenir par timeout. Afin d'éviter des actions inutiles dans ce cas, j'ai décidé de vérifier le socket via le sondage sans timeout - le socket veut-il vraiment lire maintenant. Sinon, nous exécutons à nouveau TryRead () et attendons. Sinon, nous commençons immédiatement la lecture et le transfert des données vers le rappel.

 void ReadHandler(const boost::system::error_code &error) { if (error != boost::system::errc::success) { return; } m_read_in_progress = true; int ec = LIBSSH2_ERROR_EAGAIN; std::array<char, 4096> buffer {}; while ((ec = libssh2_channel_read(*m_channel, buffer.data(), buffer.size())) > 0) { std::string tmp; boost::range::copy(boost::adaptors::slice(buffer, 0, ec), std::back_inserter(tmp)); if (m_read_callback != nullptr) { m_read_callback(tmp); } } m_read_in_progress = false; } 

Ainsi, un cycle de lecture asynchrone sans fin est démarré à partir de l'application en cours d'exécution. La prochaine étape pour nous sera d'envoyer des instructions à l'application:

 void AsyncWrite(const std::string &data, WriteCallbackType &&callback) { m_input += data; m_write_callback = std::move(callback); TryWrite(); } 

Les données et le rappel transférés vers l'enregistrement asynchrone seront sauvegardés dans la connexion. Et exécutez le cycle suivant, mais cette fois, les entrées:

Cycle d'enregistrement
 void TryWrite() { if (m_input.empty() || m_write_in_progress) { return; } m_tcp_socket.async_wait(tcp::socket::wait_write, [this, self = shared_from_this()](auto ec) { if (WantWrite()) { WriteHandler(ec); } if (m_complete) { return; } TryWrite(); }); } void WriteHandler(const boost::system::error_code &error) { if (error != boost::system::errc::success) { return; } m_write_in_progress = true; int ec = LIBSSH2_ERROR_EAGAIN; while (!m_input.empty()) { auto ptr = m_input.c_str(); auto read_size = m_input.size(); while ((ec = libssh2_channel_write(*m_channel, ptr, read_size)) > 0) { read_size -= ec; ptr += ec; } AssertResult(ec); m_input.erase(0, m_input.size() - read_size); if (ec == LIBSSH2_ERROR_EAGAIN) { break; } } if (m_input.empty() && m_write_callback != nullptr) { m_write_callback(); } m_write_in_progress = false; } 


Ainsi, nous écrirons des données sur le canal jusqu'à ce qu'elles soient toutes transférées avec succès. Ensuite, nous rendrons le contrôle à l'appelant afin qu'une nouvelle donnée puisse être transférée. De cette façon, vous pouvez non seulement envoyer des instructions à une application sur l'hôte, mais également, par exemple, télécharger des fichiers de toute taille en petites portions, sans bloquer le fil, ce qui est important.

En utilisant cette bibliothèque, j'ai pu exécuter avec succès un script sur un serveur distant qui suit les modifications du système de fichiers, tout en lisant sa sortie et en envoyant diverses commandes. En général: une expérience très précieuse dans l'adaptation de la bibliothèque de style si pour un projet C ++ moderne utilisant Boost.

Je serai heureux de lire les conseils des utilisateurs plus expérimentés de Boost.Asio pour en savoir plus et améliorer ma solution :-).

Source: https://habr.com/ru/post/fr430488/


All Articles