通过SSH与远程应用程序进行异步数据交换

朋友和同事,美好的一天。 我的名字仍然是Dmitry Smirnov,而我仍然非常高兴的是ISPsystem的开发人员。 前一段时间,我开始从事一个全新的项目,这给了我很大的启发,因为在我们的案例中,新项目缺乏遗留代码和对旧编译器的支持。 您好,Boost,C ++ 17和现代开发的所有其他乐趣。

碰巧我过去的所有项目都是多线程的,我对异步解决方案的经验很少。 除了现代强大的工具之外,这对于本次开发对我来说也是最愉快的。

最后相关的任务之一是需要使用Boost.Asio在异步应用程序的现实中为libssh2库编写一个包装,并且能够产生不超过两个线程。 我会告诉你的。



注意:作者假定读者熟悉异步开发和boost :: asio的基础知识。

挑战赛


一般而言,任务如下:使用rsa密钥或用户名和密码连接到远程服务器; 将脚本上传到远程计算机并运行它; 阅读他的答案并通过相同的连接向他发送命令。 当然,在这种情况下,不会阻塞流(可能是总池的一半)。

免责声明 :我知道Poco可与SSH一起使用,但是我找不到与Asio结婚的方法,写我自己的东西:-)更有趣。

初始化


为了初始化和最小化库,我决定使用通常的单例:

初始化()
class LibSSH2 { public: static void Init() { static LibSSH2 instance; } private: explicit LibSSH2() { if (libssh2_init(0) != 0) { throw std::runtime_error("libssh2 initialization failed"); } } ~LibSSH2() { std::cout << "shutdown libssh2" << std::endl; libssh2_exit(); } }; 



根据我最喜欢的手册“用C ++射击腿的千种方法”,这个决定当然会有陷阱。 如果某人产生了他们忘记戳的流,并且主要流较早结束,则很有可能会产生有趣的特殊效果。 但是在这种情况下,我不会考虑这种可能性。

关键实体


在分析了示例之后 ,很明显,对于我们的小型库,我们需要三个简单的实体:套接字,会话和通道。 拥有同步工具非常好,我们暂时将Asio放在一旁。

让我们从一个简单的套接字开始:

插座
 class Socket { public: explicit Socket() : m_sock(socket(AF_INET, SOCK_STREAM, 0)) { if (m_sock == -1) { throw std::runtime_error("failed to create socket"); } } ~Socket() { close(m_sock); } private: int m_sock = -1; } 


现在会话:

会议
 class Session { public: explicit Session(const bool enable_compression) : m_session(libssh2_session_init()) { if (m_session == nullptr) { throw std::runtime_error("failed to create libssh2 session"); } libssh2_session_set_blocking(m_session, 0); if (enable_compression) { libssh2_session_flag(m_session, LIBSSH2_FLAG_COMPRESS, 1); } } ~Session() { const std::string desc = "Shutting down libssh2 session"; libssh2_session_disconnect(m_session, desc.c_str()); libssh2_session_free(m_session); } private: LIBSSH2_SESSION *m_session; } 


由于我们现在有了一个套接字和一个会话,因此在libssh2的现实中为套接字编写一个等待函数会很好:

等待插座
 int WaitSocket() const { pollfd fds{}; fds.fd = sock; fds.events = 0; if ((libssh2_session_block_directions(session) & LIBSSH2_SESSION_BLOCK_INBOUND) != 0) { fds.events |= POLLIN; } if ((libssh2_session_block_directions(session) & LIBSSH2_SESSION_BLOCK_OUTBOUND) != 0) { fds.events |= POLLOUT; } return poll(&fds, 1, 10); } 


实际上,这实际上与上面的示例没有什么不同,只是它使用select而不是poll。

通道仍然存在。 libssh2中有几种类型的通道:简单,SCP,直接tcp。 我们对最简单的基本渠道感兴趣:

频道
 class SimpleChannel { public: explicit SimpleChannel(session) { while ((m_channel = libssh2_channel_open_session(session) == nullptr && GetSessionLastError() == LIBSSH2_ERROR_EAGAIN) { WaitSocket(); } if (m_channel == nullptr) { throw std::runtime_error("Critical error while opening simple channel"); } } void SendEof() { while (libssh2_channel_send_eof(m_channel) == LIBSSH2_ERROR_EAGAIN) { WaitSocket(); } while (libssh2_channel_wait_eof(m_channel) == LIBSSH2_ERROR_EAGAIN) { WaitSocket(); } } ~SimpleChannel() { CloseChannel(); } private: void CloseChannel() { int rc; while ((rc = libssh2_channel_close(m_channel)) == LIBSSH2_ERROR_EAGAIN) { WaitSocket(); } libssh2_channel_free(m_channel); } LIBSSH2_CHANNEL *m_channel; }; 


现在已经准备好所有基本工具,现在仍然可以与主机建立连接并执行必要的操作。 向通道的异步记录和同步记录当然会有很大的不同,但是建立连接的过程却没有。

因此,我们编写基类:

基本连接
 class BaseConnectionImpl { protected: explicit BaseConnectionImpl(const SshConnectData &connect_data) ///<    ,     : m_session(connect_data.enable_compression) , m_connect_data(connect_data) { LibSSH2::Init(); ConnectSocket(); HandShake(); ProcessKnownHosts(); Auth(); } ///       bool CheckSocket(int type) const { pollfd fds{}; fds.fd = m_sock; fds.events = type; return poll(&fds, 1, 0) == 1; } bool WantRead() const { return CheckSocket(POLLIN); } bool WantWrite() const { return CheckSocket(POLLOUT); } /*   ,   ,       *  - . */ void ConnectSocket() {...} void HandShake() {...} void Auth() {...} class Socket m_sock; class Session m_session; class SimpleChannel; SshConnectData m_connect_data; }; 


现在,我们准备编写最简单的类以连接到远程主机并在其上执行任何命令:

同步连接
 class Connection::Impl : public BaseConnectionImpl { public: explicit Impl(const SshConnectData &connect_data) : BaseConnectionImpl(connect_data) {} template <typename Begin> void WriteToChannel(LIBSSH2_CHANNEL *channel, Begin ptr, size_t size) { do { int rc; while ((rc = libssh2_channel_write(channel, ptr, size)) == LIBSSH2_ERROR_EAGAIN) { WaitSocket(); } if (rc < 0) { break; } size -= rc; ptr += rc; } while (size != 0); } void ExecuteCommand(const std::string &command, const std::string &in = "") { SimpleChannel channel(*this); int return_code = libssh2_channel_exec(channel, command.c_str()); if (return_code != 0 && return_code != LIBSSH2_ERROR_EAGAIN) { throw std::runtime_error("Critical error while executing ssh command"); } if (!in.empty()) { WriteToChannel(channel, in.c_str(), in.size()); channel.SendEof(); } std::string response; for (;;) { int rc; do { std::array<char, 4096> buffer{}; rc = libssh2_channel_read(channel, buffer.data(), buffer.size()); if (rc > 0) { boost::range::copy(boost::adaptors::slice(buffer, 0, rc), std::back_inserter(response)); } else if (rc < 0 && rc != LIBSSH2_ERROR_EAGAIN) { throw std::runtime_error("libssh2_channel_read error (" + std::to_string(rc) + ")"); } } while (rc > 0); if (rc == LIBSSH2_ERROR_EAGAIN) { WaitSocket(); } else { break; } } } }; 


到目前为止,我们编写的所有内容都只是将libssh2示例简化为更文明的形式。 但是现在,有了所有用于将数据同步写入通道的简单工具,我们就可以继续使用Asio。

拥有一个标准套接字是很好的,但是如果您需要在处理自己的业务时异步等待它读/写,那么它就不太实用。 这里boost :: asio :: ip:tcp :: tcp :: socket来了,有一个很棒的方法:

 async_wait(wait_type, WaitHandler) 

它是从常规套接字中奇妙地构造而成的,我们已经为其预先建立了连接,并提升了:: asio :: io_context-应用程序的执行上下文。

异步连接构造函数
 class AsyncConnection::Impl : public BaseConnectionImpl, public std::enable_shared_from_this<AsyncConnection::Impl> { public: Impl(boost::asio::io_context &context, const SshConnectData &connect_data) : BaseConnectionImpl(connect_data) , m_tcp_socket(context, tcp::v4(), m_sock.GetSocket()) { m_tcp_socket.non_blocking(true); } }; 



现在,我们需要在远程主机上开始执行某些命令,并且一旦其中的数据到达,就将其发送到某个回调。

 void AsyncRun(const std::string &command, CallbackType &&callback) { m_read_callback = std::move(callback); auto ec = libssh2_channel_exec(*m_channel, command.c_str()); TryRead(); } 

因此,通过运行命令,我们将控制权转移到TryRead()方法。

 void TryRead() { if (m_read_in_progress) { return; } m_tcp_socket.async_wait(tcp::socket::wait_read, [this, self = shared_from_this()](auto ec) { if (WantRead()) { ReadHandler(ec); } if (m_complete) { return; } TryRead(); }); } 

首先,我们通过先前的调用来检查读取过程是否已经在运行。 如果不是,那么我们开始期望套接字已准备好进行读取。 捕获了shared_from_this()的常规lambda用作等待处理程序。

注意对WantRead()的调用。 事实证明,Async_wait也有其缺陷,可以通过超时简单地返回。 为了避免在这种情况下采取不必要的操作,我决定通过轮询来检查套接字,而不会超时-套接字真的要立即读取吗? 如果没有,那么我们只需再次运行TryRead()并等待。 否则,我们将立即开始读取数据并将其传输到回调。

 void ReadHandler(const boost::system::error_code &error) { if (error != boost::system::errc::success) { return; } m_read_in_progress = true; int ec = LIBSSH2_ERROR_EAGAIN; std::array<char, 4096> buffer {}; while ((ec = libssh2_channel_read(*m_channel, buffer.data(), buffer.size())) > 0) { std::string tmp; boost::range::copy(boost::adaptors::slice(buffer, 0, ec), std::back_inserter(tmp)); if (m_read_callback != nullptr) { m_read_callback(tmp); } } m_read_in_progress = false; } 

因此,从正在运行的应用程序开始一个无限的异步读取周期。 对我们来说,下一步将向该应用程序发送指令:

 void AsyncWrite(const std::string &data, WriteCallbackType &&callback) { m_input += data; m_write_callback = std::move(callback); TryWrite(); } 

传输到异步记录的数据和回调将保存在连接内部。 并运行下一个周期,仅这次输入:

记录周期
 void TryWrite() { if (m_input.empty() || m_write_in_progress) { return; } m_tcp_socket.async_wait(tcp::socket::wait_write, [this, self = shared_from_this()](auto ec) { if (WantWrite()) { WriteHandler(ec); } if (m_complete) { return; } TryWrite(); }); } void WriteHandler(const boost::system::error_code &error) { if (error != boost::system::errc::success) { return; } m_write_in_progress = true; int ec = LIBSSH2_ERROR_EAGAIN; while (!m_input.empty()) { auto ptr = m_input.c_str(); auto read_size = m_input.size(); while ((ec = libssh2_channel_write(*m_channel, ptr, read_size)) > 0) { read_size -= ec; ptr += ec; } AssertResult(ec); m_input.erase(0, m_input.size() - read_size); if (ec == LIBSSH2_ERROR_EAGAIN) { break; } } if (m_input.empty() && m_write_callback != nullptr) { m_write_callback(); } m_write_in_progress = false; } 


因此,我们将数据写入通道,直到它们全部成功传输为止。 然后,我们将控制权返回给调用者,以便可以传输新的数据。 这样,您不仅可以将指令发送到主机上的某个应用程序,而且还可以在不阻塞线程的情况下,小规模上传任何大小的文件,这很重要。

使用该库,我能够在远程服务器上成功运行脚本,该脚本跟踪文件系统的更改,同时读取其输出并发送各种命令。 总的来说:在使用Boost使si样式库适应现代C ++项目方面非常宝贵的经验。

我将很高兴阅读经验丰富的Boost.Asio用户的提示,以了解更多并改进我的解决方案:-)。

Source: https://habr.com/ru/post/zh-CN430488/


All Articles