将聊天状态存储在堆栈中

一切新事物都被遗忘了!

现在,许多人正在编写各种与IM中的用户通信的机器人,并以某种方式帮助用户生活。



如果您查看许多机器人的代码,那么通常可以归结为相同的模式:


  • 消息到达
  • 它被传递给用户消息处理程序( callback

通常,这是编写机器人的通用方法。 它适合于单人聊天和与群组连接的漫游器。 使用这种方法,除了一种方法之外,其他一切都很好:即使是简单的bot的代码也常常令人困惑。


让我们尝试解开它。


我将从免责声明开始:


  1. 本文中描述的内容适用于 <-> 类型的 <->
  2. 本文中的代码是草图代码。 在15分钟内专门为本文撰写。 所以不要严格判断。
  3. 我在业务中使用了类似的方法:负载平衡。 但是,可惜,我的生产代码具有很多基础结构依赖性,因此很容易不发布它。 因此,本文中将使用此草图。 我将介绍范式开发问题(我将描述我们在哪里开发以及如何开发)。

好吧,现在开始吧。


作为支持,考虑使用航标图异步库python3.7 + 。 该链接包含一个简单的echo bot示例。


我将在这里复制:


查看代码
 """ This is a echo bot. It echoes any incoming text messages. """ import logging from aiogram import Bot, Dispatcher, executor, types API_TOKEN = 'BOT TOKEN HERE' # Configure logging logging.basicConfig(level=logging.INFO) # Initialize bot and dispatcher bot = Bot(token=API_TOKEN) dp = Dispatcher(bot) @dp.message_handler(regexp='(^cat[s]?$|puss)') async def cats(message: types.Message): with open('data/cats.jpg', 'rb') as photo: ''' # Old fashioned way: await bot.send_photo( message.chat.id, photo, caption='Cats are here ', reply_to_message_id=message.message_id, ) ''' await message.reply_photo(photo, caption='Cats are here ') @dp.message_handler() async def echo(message: types.Message): # old style: # await bot.send_message(message.chat.id, message.text) await message.answer(message.text) if __name__ == '__main__': executor.start_polling(dp, skip_updates=True) 

我们看到机器人的组织是传统的。 每次用户给我们写东西时,都会调用处理程序函数。


这个范例有什么问题?


实现复杂对话框的处理函数必须在每次调用时从某种存储中恢复其状态。


如果您看大多数支持某种业务(例如,雇用)的机器人,他们会向用户提出1..N个问题,然后它们会根据这些问题的结果进行处理(例如,将配置文件保存在数据库中)。


如果可以用传统样式(而不是环形样式)编写机器人,则可以将用户数据直接存储在堆栈中。


让我们尝试去做。


我绘制了该模块的草图,并将其与该库一起使用:


查看代码
 #  - chat_dispatcher.py import asyncio class ChatDispatcher: class Timeout(RuntimeError): def __init__(self, last_message): self.last_message = last_message super().__init__('timeout exceeded') def __init__(self, *, chatcb, shardcb = lambda message: message.from_user.id, inactive_timeout = 15 * 60): self.chatcb = chatcb self.shardcb = shardcb self.inactive_timeout = inactive_timeout self.chats = {} async def handle(self, message): shard = self.shardcb(message) loop = asyncio.get_event_loop() if shard not in self.chats: self.chats[shard] = { 'task': self.create_chat(loop, shard), 'messages': [], 'wait': asyncio.Event(), 'last_message': None, } self.chats[shard]['messages'].append(message) self.chats[shard]['wait'].set() def create_chat(self, loop, shard): async def _chat_wrapper(): try: await self.chatcb(self.get_message(shard)) finally: del self.chats[shard] return loop.create_task(_chat_wrapper()) def get_message(self, shard): async def _get_message(inactive_timeout=self.inactive_timeout): while True: if self.chats[shard]['messages']: last_message = self.chats[shard]['messages'].pop(0) self.chats[shard]['last_message'] = last_message return last_message try: await asyncio.wait_for(self.chats[shard]['wait'].wait(), timeout=inactive_timeout) except asyncio.TimeoutError: self.chats[shard]['wait'].set() raise self.Timeout(self.chats[shard]['last_message']) if not self.chats[shard]['messages']: self.chats[shard]['wait'].clear() return _get_message 

一点解释:


使用以下参数ChatDispatcher类:


  1. 传入消息的分片功能(为什么称为分片-稍后,当我们接触大负载时)。 该函数返回一个唯一的数字,指示对话框。 在示例中,它仅返回用户ID。
  2. 将执行聊天服务功能的功能。
  3. 用户不活动的超时值。

职位描述:


  1. 响应用户的第一条消息,创建了一个异步任务,将为对话提供服务。 该任务将一直持续到对话完成。
  2. 要接收来自用户的消息,我们会明确要求它。 echo聊天示例:
     async def chat(get_message): message = await get_message() await message.answer(message.text) 
  3. 当图书馆向我们提供消息时,我们会回复消息( message.answer )。

让我们尝试以这种范例编写一个机器人


完整的代码示例在这里
 #  bot.py import asyncio import re from .chat_dispatcher import ChatDispatcher import logging from aiogram import Bot, Dispatcher, executor, types API_TOKEN ='    ' logging.basicConfig(level=logging.INFO) bot = Bot(token=API_TOKEN) dp = Dispatcher(bot) async def chat(get_message): try: message = await get_message() await message.answer('  ,   ') first = await get_message() if not re.match('^\d+$', str(first.text)): await first.answer('  ,  : /start') return await first.answer('  ') second = await get_message() if not re.match('^\d+$', str(second.text)): await second.answer('  ,  : /start') return result = int(first.text) + int(second.text) await second.answer(' %s (/start - )' % result) except ChatDispatcher.Timeout as te: await te.last_message.answer('-   ,  ') await te.last_message.answer(' - /start') chat_dispatcher = ChatDispatcher(chatcb=chat, inactive_timeout=20) @dp.message_handler() async def message_handle(message: types.Message): await chat_dispatcher.handle(message) if __name__ == '__main__': executor.start_polling(dp, skip_updates=True) 

一个书面的机器人示例-简单地将几个数字相加并产生结果。


结果看起来像这样:



好了,现在让我们仔细看一下代码。 实例不应该提出问题。


与我们的草图集成是在标准处理程序中完成的,我们称为await chat_dispatcher.handle(message) 。 我们在chat功能中描述了chat ,我将在这里重复其代码:


 async def chat(get_message): try: message = await get_message() await message.answer('  ,   ') first = await get_message() if not re.match('^\d+$', str(first.text)): await first.answer('  ,  : /start') return await first.answer('  ') second = await get_message() if not re.match('^\d+$', str(second.text)): await second.answer('  ,  : /start') return result = int(first.text) + int(second.text) await second.answer(' %s (/start - )' % result) except ChatDispatcher.Timeout as te: await te.last_message.answer('-   ,  ') await te.last_message.answer(' - /start') 

聊天服务代码-只需从用户那里一一请求数据。 用户响应只是堆叠在堆栈上(变量firstsecondmessage )。


如果用户在设置的超时时间内未输入任何内容,则get_message函数可能会引发异常(您可以在同一位置将超时传递给它)。


对话框状态-与该函数内部的行号直接相关。 向下移动代码,我们沿着对话方案前进 。 更改对话线程并不容易,但非常简单!
因此,不需要状态机。 在这种范例中,您可以编写非常复杂的对话框,并了解它们的代码比带回调的代码要简单得多。


缺点


没有他们的地方。


  1. 对于每个活动用户,都有一个任务表。 平均而言,一个CPU通常为大约1000个用户提供服务,然后开始出现延迟。
  2. 重新启动整个守护程序-终止所有对话框(并重新启动它们)。
  3. [来自示例]的代码不适用于负载缩放和国际化。

如果第二个问题很清楚,该怎么办:截停信号并告诉用户“我在这里很紧急,起火,我待会儿再回来。” 最后一个问题可能会造成困难。 让我们看一下:


负载缩放


显然,已加载的漫游器必须同时在多个后端上启动。 因此,将使用webHook操作模式。


如果仅在两个后端之间平衡webHook ,那么显然您需要以某种方式确保同一用户使用与他交谈的同一个协程。


我们这样做如下。


  1. 在平衡器上,解析传入消息( message )的JSON
  2. 从中选择一个用户ID
  3. 使用标识符,我们计算后端号(==分片)。 例如,使用user_id % Nshards
  4. 我们将请求重定向到分片。

用户ID-成为在对话框的协程之间进行分片的关键,并且是计算平衡器中后端分片号的基础。


这种均衡器的代码很简单-可以在10分钟内用任何语言编写。 我不会带它。


结论


如果您以这种范例编写机器人,那么您可以非常简单地将对话框从一个重做到另一个。 此外,重要的是, 新程序员应易于理解某人在他之前进行的对话框的代码。


为什么大多数人都用环形架构编写机器人-我不知道。


他们曾经以这种范例写作。 在这种风格的聊天室中,IRC和机器人已被采用。 因此,我不假装是任何新奇事物。


还有更多。 如果您使用带有goto运算符的语言来使用此范例,那么这将只是使用goto一个漂亮示例(对话框中的循环在goto已完成)。 不幸的是,这与Python无关。

Source: https://habr.com/ru/post/zh-CN486006/


All Articles