Guten Tag, Freunde und Kollegen. Mein Name ist immer noch Dmitry Smirnov und ich bin zu meiner großen Freude immer noch der Entwickler von ISPsystem. Vor einiger Zeit habe ich angefangen, an einem völlig neuen Projekt zu arbeiten, was mich sehr inspiriert hat, da das neue in unserem Fall der Mangel an Legacy-Code und die Unterstützung für alte Compiler ist. Hallo, Boost, C ++ 17 und all die anderen Freuden der modernen Entwicklung.
So kam es, dass alle meine früheren Projekte Multithread-Projekte waren und ich nur sehr wenig Erfahrung mit asynchronen Lösungen hatte. Dies wurde für mich in dieser Entwicklung neben modernen leistungsstarken Werkzeugen am angenehmsten.
Eine der letzten verwandten Aufgaben war die Notwendigkeit, einen Wrapper über die
libssh2- Bibliothek in der Realität einer asynchronen Anwendung mit
Boost.Asio zu schreiben , der nicht mehr als zwei Threads
erzeugen kann. Ich werde dir davon erzählen.

Hinweis: Der Autor geht davon aus, dass der Leser mit den Grundlagen der asynchronen Entwicklung und boost :: asio vertraut ist.
Herausforderung
Im Allgemeinen lautete die Aufgabe wie folgt: Stellen Sie mit einem RSA-Schlüssel oder einem Benutzernamen und einem Kennwort eine Verbindung zu einem Remote-Server her. Laden Sie ein Skript auf den Remotecomputer hoch und führen Sie es aus. Lesen Sie seine Antworten und senden Sie ihm Befehle über dieselbe Verbindung. In diesem Fall natürlich ohne Blockierung des Flusses (was die Hälfte des gesamten möglichen Pools ist).
Haftungsausschluss : Ich weiß, dass Poco mit SSH zusammenarbeitet, aber ich habe keinen Weg gefunden, ihn mit Asio zu heiraten, und es war interessanter, etwas Eigenes zu schreiben :-).
Initialisierung
Um die Bibliothek zu initialisieren und zu minimieren, habe ich mich für den üblichen Singleton entschieden:
Init ()class LibSSH2 { public: static void Init() { static LibSSH2 instance; } private: explicit LibSSH2() { if (libssh2_init(0) != 0) { throw std::runtime_error("libssh2 initialization failed"); } } ~LibSSH2() { std::cout << "shutdown libssh2" << std::endl; libssh2_exit(); } };
Laut meinem Lieblingshandbuch „Tausendundeiner Weg, um in C ++ auf das Bein zu schießen“ gibt es bei dieser Entscheidung natürlich Fallstricke. Wenn jemand einen Stream generiert, den er vergessen hat zu stecken, und der Hauptstrom früher endet, können durchaus interessante Spezialeffekte auftreten. In diesem Fall werde ich diese Möglichkeit jedoch nicht berücksichtigen.
Schlüsselentitäten
Nach der Analyse des
Beispiels wird klar, dass wir für unsere kleine Bibliothek drei einfache Entitäten benötigen: Socket, Sitzung und Kanal. Da es schön ist, synchrone Werkzeuge zu haben, werden wir Asio vorerst beiseite lassen.
Beginnen wir mit einem einfachen Socket:
Steckdose class Socket { public: explicit Socket() : m_sock(socket(AF_INET, SOCK_STREAM, 0)) { if (m_sock == -1) { throw std::runtime_error("failed to create socket"); } } ~Socket() { close(m_sock); } private: int m_sock = -1; }
Jetzt Sitzung:
Die Sitzung class Session { public: explicit Session(const bool enable_compression) : m_session(libssh2_session_init()) { if (m_session == nullptr) { throw std::runtime_error("failed to create libssh2 session"); } libssh2_session_set_blocking(m_session, 0); if (enable_compression) { libssh2_session_flag(m_session, LIBSSH2_FLAG_COMPRESS, 1); } } ~Session() { const std::string desc = "Shutting down libssh2 session"; libssh2_session_disconnect(m_session, desc.c_str()); libssh2_session_free(m_session); } private: LIBSSH2_SESSION *m_session; }
Da wir jetzt einen Socket und eine Sitzung haben, wäre es schön, eine Wartefunktion für einen Socket in der Realität von libssh2 zu schreiben:
Wartebuchse int WaitSocket() const { pollfd fds{}; fds.fd = sock; fds.events = 0; if ((libssh2_session_block_directions(session) & LIBSSH2_SESSION_BLOCK_INBOUND) != 0) { fds.events |= POLLIN; } if ((libssh2_session_block_directions(session) & LIBSSH2_SESSION_BLOCK_OUTBOUND) != 0) { fds.events |= POLLOUT; } return poll(&fds, 1, 10); }
Tatsächlich unterscheidet sich dies praktisch nicht vom obigen Beispiel, außer dass es select anstelle von poll verwendet.
Der Kanal bleibt. In libssh2 gibt es verschiedene Arten von Kanälen: simple, SCP, direct tcp. Wir interessieren uns für den einfachsten, einfachsten Kanal:
Kanal class SimpleChannel { public: explicit SimpleChannel(session) { while ((m_channel = libssh2_channel_open_session(session) == nullptr && GetSessionLastError() == LIBSSH2_ERROR_EAGAIN) { WaitSocket(); } if (m_channel == nullptr) { throw std::runtime_error("Critical error while opening simple channel"); } } void SendEof() { while (libssh2_channel_send_eof(m_channel) == LIBSSH2_ERROR_EAGAIN) { WaitSocket(); } while (libssh2_channel_wait_eof(m_channel) == LIBSSH2_ERROR_EAGAIN) { WaitSocket(); } } ~SimpleChannel() { CloseChannel(); } private: void CloseChannel() { int rc; while ((rc = libssh2_channel_close(m_channel)) == LIBSSH2_ERROR_EAGAIN) { WaitSocket(); } libssh2_channel_free(m_channel); } LIBSSH2_CHANNEL *m_channel; };
Nachdem alle grundlegenden Tools bereit sind, müssen Sie noch eine Verbindung zum Host herstellen und die erforderlichen Manipulationen durchführen. Die asynchrone Aufzeichnung auf dem Kanal und die synchrone Aufnahme sind natürlich sehr unterschiedlich, der Prozess des Verbindungsaufbaus jedoch nicht.
Deshalb schreiben wir die Basisklasse:
Grundverbindung class BaseConnectionImpl { protected: explicit BaseConnectionImpl(const SshConnectData &connect_data)
Jetzt können wir die einfachste Klasse schreiben, um eine Verbindung zum Remote-Host herzustellen und einen beliebigen Befehl darauf auszuführen:
Synchrone Verbindung class Connection::Impl : public BaseConnectionImpl { public: explicit Impl(const SshConnectData &connect_data) : BaseConnectionImpl(connect_data) {} template <typename Begin> void WriteToChannel(LIBSSH2_CHANNEL *channel, Begin ptr, size_t size) { do { int rc; while ((rc = libssh2_channel_write(channel, ptr, size)) == LIBSSH2_ERROR_EAGAIN) { WaitSocket(); } if (rc < 0) { break; } size -= rc; ptr += rc; } while (size != 0); } void ExecuteCommand(const std::string &command, const std::string &in = "") { SimpleChannel channel(*this); int return_code = libssh2_channel_exec(channel, command.c_str()); if (return_code != 0 && return_code != LIBSSH2_ERROR_EAGAIN) { throw std::runtime_error("Critical error while executing ssh command"); } if (!in.empty()) { WriteToChannel(channel, in.c_str(), in.size()); channel.SendEof(); } std::string response; for (;;) { int rc; do { std::array<char, 4096> buffer{}; rc = libssh2_channel_read(channel, buffer.data(), buffer.size()); if (rc > 0) { boost::range::copy(boost::adaptors::slice(buffer, 0, rc), std::back_inserter(response)); } else if (rc < 0 && rc != LIBSSH2_ERROR_EAGAIN) { throw std::runtime_error("libssh2_channel_read error (" + std::to_string(rc) + ")"); } } while (rc > 0); if (rc == LIBSSH2_ERROR_EAGAIN) { WaitSocket(); } else { break; } } } };
Bisher war alles, was wir geschrieben haben, eine einfache Reduktion von libssh2-Beispielen auf eine zivilisiertere Form. Mit all den einfachen Tools zum synchronen Schreiben von Daten in den Kanal können wir nun zu Asio übergehen.
Ein Standard-Socket ist gut, aber nicht zu praktisch, wenn Sie asynchron warten müssen, bis er gelesen / geschrieben wird, während Sie Ihr eigenes Geschäft betreiben. Hier kommt boost :: asio :: ip :: tcp :: socket zur Rettung und hat eine wunderbare Methode:
async_wait(wait_type, WaitHandler)
Es ist wunderbar aus einem regulären Socket aufgebaut, für den wir die Verbindung bereits im Voraus eingerichtet haben und boost :: asio :: io_context - den Ausführungskontext unserer Anwendung.
Asynchroner Verbindungskonstruktor class AsyncConnection::Impl : public BaseConnectionImpl, public std::enable_shared_from_this<AsyncConnection::Impl> { public: Impl(boost::asio::io_context &context, const SshConnectData &connect_data) : BaseConnectionImpl(connect_data) , m_tcp_socket(context, tcp::v4(), m_sock.GetSocket()) { m_tcp_socket.non_blocking(true); } };
Jetzt müssen wir die Ausführung eines Befehls auf dem Remote-Host starten und ihn, sobald Daten von ihm eintreffen, an einen Rückruf senden.
void AsyncRun(const std::string &command, CallbackType &&callback) { m_read_callback = std::move(callback); auto ec = libssh2_channel_exec(*m_channel, command.c_str()); TryRead(); }
Durch Ausführen des Befehls übertragen wir die Steuerung an die TryRead () -Methode.
void TryRead() { if (m_read_in_progress) { return; } m_tcp_socket.async_wait(tcp::socket::wait_read, [this, self = shared_from_this()](auto ec) { if (WantRead()) { ReadHandler(ec); } if (m_complete) { return; } TryRead(); }); }
Zunächst prüfen wir, ob der Lesevorgang bereits durch einen vorherigen Aufruf ausgeführt wurde. Wenn nicht, erwarten wir die Lesebereitschaft der Steckdose. Ein normales Lambda mit der Erfassung von shared_from_this () wird als Wartehandler verwendet.
Achten Sie auf den Aufruf von WantRead (). Wie sich herausstellte, hat Async_wait auch seine Fehler und kann einfach per Timeout zurückkehren. Um unnötige Aktionen in diesem Fall zu vermeiden, habe ich beschlossen, den Socket durch Abfrage ohne Zeitüberschreitung zu überprüfen - möchte der Socket jetzt wirklich lesen? Wenn nicht, führen wir TryRead () erneut aus und warten. Andernfalls beginnen wir sofort mit dem Lesen und Übertragen von Daten zum Rückruf.
void ReadHandler(const boost::system::error_code &error) { if (error != boost::system::errc::success) { return; } m_read_in_progress = true; int ec = LIBSSH2_ERROR_EAGAIN; std::array<char, 4096> buffer {}; while ((ec = libssh2_channel_read(*m_channel, buffer.data(), buffer.size())) > 0) { std::string tmp; boost::range::copy(boost::adaptors::slice(buffer, 0, ec), std::back_inserter(tmp)); if (m_read_callback != nullptr) { m_read_callback(tmp); } } m_read_in_progress = false; }
Somit wird ein endloser asynchroner Lesezyklus von der laufenden Anwendung gestartet. Der nächste Schritt für uns ist das Senden von Anweisungen an die Anwendung:
void AsyncWrite(const std::string &data, WriteCallbackType &&callback) { m_input += data; m_write_callback = std::move(callback); TryWrite(); }
Die an die asynchrone Aufzeichnung übertragenen Daten und Rückrufe werden in der Verbindung gespeichert. Und führen Sie den nächsten Zyklus aus, nur diesmal die Einträge:
Aufnahmezyklus void TryWrite() { if (m_input.empty() || m_write_in_progress) { return; } m_tcp_socket.async_wait(tcp::socket::wait_write, [this, self = shared_from_this()](auto ec) { if (WantWrite()) { WriteHandler(ec); } if (m_complete) { return; } TryWrite(); }); } void WriteHandler(const boost::system::error_code &error) { if (error != boost::system::errc::success) { return; } m_write_in_progress = true; int ec = LIBSSH2_ERROR_EAGAIN; while (!m_input.empty()) { auto ptr = m_input.c_str(); auto read_size = m_input.size(); while ((ec = libssh2_channel_write(*m_channel, ptr, read_size)) > 0) { read_size -= ec; ptr += ec; } AssertResult(ec); m_input.erase(0, m_input.size() - read_size); if (ec == LIBSSH2_ERROR_EAGAIN) { break; } } if (m_input.empty() && m_write_callback != nullptr) { m_write_callback(); } m_write_in_progress = false; }
Daher schreiben wir Daten in den Kanal, bis sie alle erfolgreich übertragen wurden. Dann geben wir die Kontrolle an den Anrufer zurück, damit ein neues Datenelement übertragen werden kann. Auf diese Weise können Sie nicht nur Anweisungen an eine Anwendung auf dem Host senden, sondern beispielsweise auch Dateien beliebiger Größe in kleinen Portionen hochladen, ohne den Thread zu blockieren, was wichtig ist.
Mit dieser Bibliothek konnte ich erfolgreich ein Skript auf einem Remote-Server ausführen, das Dateisystemänderungen verfolgt, gleichzeitig die Ausgabe liest und verschiedene Befehle sendet. Im Allgemeinen: Eine sehr wertvolle Erfahrung bei der Anpassung der si-style-Bibliothek für ein modernes C ++ - Projekt mit Boost.
Ich werde gerne die Tipps erfahrener Boost.Asio-Benutzer lesen, um mehr zu erfahren und meine Lösung zu verbessern :-).