一个简单的故事,让我感到羞耻,不断向同学询问缺少的信息,我决定让我们的生活更轻松一些。

我相信我的许多同龄人都熟悉在普通聊天室中经常闪现重要信息的情况,大约有30位活跃的对话者不断向Vkontakte数据库加载其消息。 在这种情况下,每个人都不太可能看到此重要信息。 我也是。 一年前,决定纠正这种误解。
我问那些准备好对下一篇有关该机器人的文章不屑一顾的人。
由于我是一年级的学生,因此示例将与此主题相关。
因此,有一个任务:使校长和学生之间的信息从校长到学生的传送都变得方便。 得益于Vkontakte的相对较新的功能(即社区的个人信息),这一决定立即引起了我的注意。 坐在一个小组中的漫游器应从长者(如果有很多小组,则是老人)接收消息,并将其发送给感兴趣的团体(学生)。
任务已设置,继续。
我们将需要:
- vk_api库,用于使用vk api
- peewee orm使用数据库
- 和python内置模块
另外,在阅读之前,我建议刷新模式“ Observer”( Habr , Wiki )和“ Facade”( Habr , Wiki )的内存
第1部分:“很高兴认识您,博特同志。”
首先,您需要教我们的机器人理解自己作为一个社区。 创建一个名为Group的类。 作为参数,让它接受会话对象和数据库代表对象(代理)。
class Group(BaseCommunicateVK): def __init__(self, vksession, storage): super().__init__(vksession) self.storage = storage
BaseCommunicateVK? 那里有什么?之所以决定将此功能放在单独的类中,是因为将来,也许你们中的某些人会决定用其他一些Vkontakte功能来补充该机器人。
好吧,当然要卸载社区的抽象了。
class BaseCommunicateVK: longpoll = None def __init__(self, vksession): self.session = vksession self.api = vksession.get_api() if BaseCommunicateVK.longpoll is None: BaseCommunicateVK.longpoll = VkLongPoll(self.session) def get_api(self): return self.api def get_longpoll(self): return self.longpoll def method(self, func, args): return self.api.method(func, args) @staticmethod def create_session(token=None, login=None, password=None, api_v='5.85'): try: if token: session = vk_api.VkApi(token=token, api_version=api_v) elif login and password: session = vk_api.VkApi(login, password, api_version=api_v) else: raise vk_api.AuthError("Define login and password or token.") return session except vk_api.ApiError as error: logging.info(error) def get_last_message(self, user_id): return self.api.messages.getHistory( peer_id=user_id, count=1)["items"][0] @staticmethod def get_attachments(last_message): if not last_message or "attachments" not in last_message: return "" attachments = last_message["attachments"] attach_strings = [] for attach in attachments: attach_type = attach["type"] attach_info = attach[attach_type] attach_id = attach_info["id"] attach_owner_id = attach_info["owner_id"] if "access_key" in attach_info: access_key = attach_info["access_key"] attach_string = "{}{}_{}_{}".format(attach_type, attach_owner_id, attach_id, access_key) else: attach_string = "{}{}_{}".format(attach_type, attach_owner_id, attach_id) attach_strings.append(attach_string) return ",".join(attach_strings) @staticmethod def get_forwards(attachments, last_message): if not attachments or "fwd_count" not in attachments: return "" if len(last_message["fwd_messages"]) == int(attachments["fwd_count"]): return last_message["id"] def send(self, user_id, message, attachments=None, **kwargs): send_to = int(user_id) if "last_message" in kwargs: last_message = kwargs["last_message"] else: last_message = None p_attachments = self.get_attachments(last_message) p_forward = self.get_forwards(attachments, last_message) if message or p_attachments or p_forward: self.api.messages.send( user_id=send_to, message=message, attachment=p_attachments, forward_messages=p_forward) if destroy: accept_msg_id = self.api.messages \ .getHistory(peer_id=user_id, count=1) \ .get('items')[0].get('id') self.delete(accept_msg_id, destroy_type=destroy_type) def delete(self, msg_id, destroy_type=1): self.api.messages.delete(message_id=msg_id, delete_for_all=destroy_type)
创建一种更新社区成员的方法。 立即将他们分为管理员和参与者,并将其保存在数据库中。
- 创建组基类(BaseCommunicateVK)时配置了self.api
def update_members(self): fields = 'domain, sex' admins = self.api.groups.getMembers(group_id=self.group_id, fields=fields, filter='managers') self.save_members(self._configure_users(admins)) members = self.api.groups.getMembers(group_id=self.group_id, fields=fields) self.save_members(self._configure_users(members)) return self def save_members(self, members): self.storage.update(members) @staticmethod def _configure_users(items, exclude=None): if exclude is None: exclude = [] users = [] for user in items.get('items'): if user.get('id') not in exclude: member = User() member.configure(**user) users.append(member) return users
该类还应该能够将消息发送给收件人,因此是Studio中的下一个方法。 在参数中:邮件列表,消息文本和应用程序。 整个过程从一个单独的线程开始,因此该漫游器可以接收来自其他参与者的消息。
以同步方式接收消息,因此,随着活动客户端数量的增加,响应速度将明显降低。
def broadcast(self, uids, message, attachments=None, **kwargs): report = BroadcastReport() def send_all(): users_ids = uids if not isinstance(users_ids, list): users_ids = list(users_ids) report.should_be_sent = len(users_ids) for user_id in users_ids: try: self.send(user_id, message, attachments, **kwargs) if message or attachments: report.sent += 1 except vk_api.VkApiError as error: report.errors.append('vk.com/id{}: {}'.format(user_id, error)) except ValueError: continue for uid in self.get_member_ids(admins=True, moders=True): self.send(uid, str(report)) broadcast_thread = Thread(target=send_all) broadcast_thread.start() broadcast_thread.join()
BroadcastReport-报告类别 class BroadcastReport: def __init__(self): self.should_be_sent = 0 self.sent = 0 self.errors = [] def __str__(self): res = "# #" res += "\n: {} ".format(self.should_be_sent) res += "\n: {} ".format(self.sent) if self.errors: res += "\n:" for i in self.errors: res += "\n- {}".format(i) return res
至此,该组的抽象似乎结束了。 我们会见了社区的所有成员,现在我们需要学习如何理解他们。
第2部分。“ Psh ...欢迎..”
让该机器人收听来自社区成员的所有消息。
为此,创建一个类ChatHandler,它将执行此操作
在参数中:
- group_manager是我们刚刚编写的社区类的一个实例
- command_observer可以识别连接的命令(但在第三部分中有更多介绍)
class ChatHandler(Handler): def __init__(self, group_manager, command_observer): super().__init__() self.longpoll = group_manager.get_longpoll() self.group = group_manager self.api = group_manager.get_api() self.command_observer = command_observer
此外,实际上,我们会监听来自用户的消息并识别命令。
def listen(self): try: for event in self.longpoll.listen(): if event.user_id and event.type == VkEventType.MESSAGE_NEW and event.to_me: self.group.api.messages.markAsRead(peer_id=event.user_id) self.handle(event.user_id, event.text, event.attachments, message_id=event.message_id) except ConnectionError: logging.error("I HAVE BEEN DOWNED AT {}".format(datetime.datetime.today())) self.longpoll.update_longpoll_server() def handle(self, user_id, message, attachments, **kwargs): member = self.group.get_member(user_id) self.group.update_members() self.command_observer.execute(member, message, attachments, self.group, **kwargs) def run(self): self.listen()
第3部分:“您对我的..写什么?”
命令识别由通过观察者模式实现的单独子系统处理。
注意CommandObserver:
class CommandObserver(AbstractObserver): def execute(self, member, message, attachments, group, **kwargs): for command in self.commands: for trigger in command.triggers: body = command.get_body(trigger, message) if body is not None: group.api.messages.setActivity(user_id=member.id, type="typing") if command.system: kwargs.update({"trigger": trigger, "commands": self.commands}) else: kwargs.update({"trigger": trigger}) return command.proceed(member, body, attachments, group, **kwargs)
抽象观察者同样,渲染是为了将来可能的扩展。
class AbstractObserver(metaclass=ABCMeta): def __init__(self): self.commands = [] def add(self, *args): for arg in args: self.commands.append(arg) @abstractmethod def execute(self, *args, **kwargs): pass
但是这个观察者会认识到什么?
因此,我们进入了最有趣的部分-团队。
每个团队都是一个独立的类,是基本Command类的后代。
如果在用户消息的开头找到了关键字,则该命令所需的所有操作就是运行proce()方法。 命令关键字在命令类的triggers变量(字符串或字符串列表)中定义
class Command(metaclass=ABCMeta): def __init__(self): self.triggers = [] self.description = "Empty description." self.system = False self.privilege = False self.activate_times = [] self.activate_days = set() self.autostart_func = self.proceed def proceed(self, member, message, attachments, group, **kwargs): raise NotImplementedError() @staticmethod def get_body(kw, message): if not isinstance(kw, list): kw = [kw, ] for i in kw: reg = '^ *(\\{}) *'.format(i) if re.search(reg, message): return re.sub(reg, '', message).strip(' ')
从proceed()方法的签名中可以看到,每个命令都会收到一个指向组成员实例的链接,其消息(没有关键字),应用程序以及一个指向组实例的链接。 也就是说,与小组成员的所有互动都取决于团队。 我认为这是最正确的解决方案,因为通过这种方式可以创建一个外壳(Shell)以提高交互性。
(实际上,要做到这一点,您要么需要添加异步,因为处理是同步的,要么应该在一个新线程中处理每个收到的消息,这根本就不赚钱)
命令执行示例:
广播命令 class BroadcastCommand(Command): def __init__(self): super().__init__() self.triggers = ['.mb'] self.privilege = True self.description = " ." def proceed(self, member, message, attachments, group, **kwargs): if member.id not in group.get_member_ids(admins=True, editors=True): group.send(member.id, "You cannot do this ^_^") return True last_message = group.get_last_message(member.id) group.broadcast(group.get_member_ids(), message, attachments, last_message=last_message, **kwargs) return True
帮助命令 class HelpCommand(Command): def __init__(self): super().__init__() self.commands = [] self.triggers = ['.h', '.help'] self.system = True self.description = " ." def proceed(self, member, message, attachments, group, **kwargs): commands = kwargs["commands"] help = " :\n\n" admins = group.get_member_ids(admins=True, moders=True) i = 0 for command in commands: if command.privilege and member.id not in admins: continue help += "{}) {}\n\n".format(i + 1, command.name()) i += 1 group.send(member.id, help) return True
第4部分:“我们是一支大团队。”
现在,所有这些模块和处理程序都需要组合和配置。
请再上一堂课!
创建将自定义我们的机器人的外观。
class VKManage: def __init__(self, token=None, login=None, password=None): self.session = BaseCommunicateVK.create_session(token, login, password, api_version) self.storage = DBProxy(DatabaseORM) self.group = Group(self.session, self.storage).setup().update_members() self.chat = ChatHandler(self.group, CommandObserver.get_observer()) def start(self): self.chat.run() def get_command(self, command_name): return { " ": BroadcastCommand(), " ": AdminBroadcastCommand(), "": HelpCommand(), " ": SkippedLectionsCommand(), "": TopicTimetableCommand().setup_account(self.bot.api), }.get(command_name) def connect_command(self, command_name): command = self.get_command(str(command_name).lower()) if command: self.chat.command_observer.add(command) return self def connect_commands(self, command_names): for i in command_names.split(','): self.connect_command(i.strip()) return self
最后阶段是启动。 总是最讨厌的,因为可能会出现某种惊奇。 这次不行。
- ConfigParser是从core.settings.ConfigParser导入的。 实际上,它只是读取配置。
从项目根目录中的设置模块导入project_path。
if __name__ == '__main__': config = ConfigParser(project_path) VKManage(token=config['token'], login=config['login'], password=config['password'])\ .connect_commands(", , , ")\ .start()
好像就这些了。
目前,该计划至少使三个小组受益,我希望也能为您带来好处。
您可以在Heroku上免费部署它,但这是另一回事。
参考文献: