Une histoire simple sur la façon dont j'avais honte de constamment demander aux camarades de classe les informations manquantes et j'ai décidé de nous faciliter la vie un peu.

Je pense que beaucoup de mes pairs connaissent la situation lorsque dans une salle de chat générale, où des informations importantes sont souvent diffusées, il y a environ 30 interlocuteurs actifs qui chargent constamment les bases de données Vkontakte avec leurs messages. Dans de telles conditions, il est peu probable que tout le monde voie ces informations importantes. Ça m'arrive aussi. Il y a un an, il a été décidé de corriger ce malentendu.
Ceux qui sont prêts à ne pas s'indigner du prochain article sur le bot, je demande sous cat.
Étant donné que je suis un étudiant de premier niveau, des exemples seront liés à ce sujet.
Donc, il y a une tâche: rendre le transfert d'informations du chef d'établissement aux élèves commode pour le chef d'établissement et les élèves. Grâce aux fonctionnalités relativement nouvelles de Vkontakte (à savoir les messages personnels des communautés), la décision a tout de suite attiré mon attention. Un bot assis dans un groupe doit recevoir des messages de l'aîné (aînés, s'il y a de nombreux groupes sur le flux) et les envoyer aux parties intéressées (étudiants).
La tâche est définie, continuez.
Nous aurons besoin de:
- bibliothèque vk_api pour utiliser vk api
- peewee orm pour travailler avec la base de données
- et modules intégrés python
Aussi, avant de lire, je propose de rafraîchir les schémas de "Observer" ( Habr , Wiki ) et "Facade" ( Habr , Wiki )
Partie 1. "Ravi de vous rencontrer, camarade Bot."
Vous devez d'abord apprendre à notre robot à se comprendre en tant que communauté. Créez une classe appelée Groupe. En tant qu'arguments, laissez-le accepter un objet de session et un objet représentatif de base de données (Proxy).
class Group(BaseCommunicateVK): def __init__(self, vksession, storage): super().__init__(vksession) self.storage = storage
BaseCommunicateVK? Qu'y a-t-il?La décision de placer cette fonctionnalité dans une classe distincte s'explique par le fait qu'à l'avenir, certains d'entre vous décideront peut-être de compléter le bot par d'autres fonctionnalités de Vkontakte.
Eh bien, pour décharger l'abstraction de la communauté, bien sûr.
class BaseCommunicateVK: longpoll = None def __init__(self, vksession): self.session = vksession self.api = vksession.get_api() if BaseCommunicateVK.longpoll is None: BaseCommunicateVK.longpoll = VkLongPoll(self.session) def get_api(self): return self.api def get_longpoll(self): return self.longpoll def method(self, func, args): return self.api.method(func, args) @staticmethod def create_session(token=None, login=None, password=None, api_v='5.85'): try: if token: session = vk_api.VkApi(token=token, api_version=api_v) elif login and password: session = vk_api.VkApi(login, password, api_version=api_v) else: raise vk_api.AuthError("Define login and password or token.") return session except vk_api.ApiError as error: logging.info(error) def get_last_message(self, user_id): return self.api.messages.getHistory( peer_id=user_id, count=1)["items"][0] @staticmethod def get_attachments(last_message): if not last_message or "attachments" not in last_message: return "" attachments = last_message["attachments"] attach_strings = [] for attach in attachments: attach_type = attach["type"] attach_info = attach[attach_type] attach_id = attach_info["id"] attach_owner_id = attach_info["owner_id"] if "access_key" in attach_info: access_key = attach_info["access_key"] attach_string = "{}{}_{}_{}".format(attach_type, attach_owner_id, attach_id, access_key) else: attach_string = "{}{}_{}".format(attach_type, attach_owner_id, attach_id) attach_strings.append(attach_string) return ",".join(attach_strings) @staticmethod def get_forwards(attachments, last_message): if not attachments or "fwd_count" not in attachments: return "" if len(last_message["fwd_messages"]) == int(attachments["fwd_count"]): return last_message["id"] def send(self, user_id, message, attachments=None, **kwargs): send_to = int(user_id) if "last_message" in kwargs: last_message = kwargs["last_message"] else: last_message = None p_attachments = self.get_attachments(last_message) p_forward = self.get_forwards(attachments, last_message) if message or p_attachments or p_forward: self.api.messages.send( user_id=send_to, message=message, attachment=p_attachments, forward_messages=p_forward) if destroy: accept_msg_id = self.api.messages \ .getHistory(peer_id=user_id, count=1) \ .get('items')[0].get('id') self.delete(accept_msg_id, destroy_type=destroy_type) def delete(self, msg_id, destroy_type=1): self.api.messages.delete(message_id=msg_id, delete_for_all=destroy_type)
Créez une méthode pour mettre à jour les membres de la communauté. Divisez-les immédiatement en administrateurs et participants et enregistrez-les dans la base de données.
- self.api est configuré lors de la création de la classe de base Groupe (BaseCommunicateVK)
def update_members(self): fields = 'domain, sex' admins = self.api.groups.getMembers(group_id=self.group_id, fields=fields, filter='managers') self.save_members(self._configure_users(admins)) members = self.api.groups.getMembers(group_id=self.group_id, fields=fields) self.save_members(self._configure_users(members)) return self def save_members(self, members): self.storage.update(members) @staticmethod def _configure_users(items, exclude=None): if exclude is None: exclude = [] users = [] for user in items.get('items'): if user.get('id') not in exclude: member = User() member.configure(**user) users.append(member) return users
Cette classe devrait également être en mesure d'envoyer des messages aux destinataires, donc la méthode suivante dans le studio. Dans les paramètres: liste de diffusion, texte du message et application. Tout cela commence dans un fil séparé afin que le bot puisse recevoir des messages des autres participants.
Les messages sont reçus en mode synchrone, donc avec une augmentation du nombre de clients actifs, la vitesse de réponse va évidemment diminuer.
def broadcast(self, uids, message, attachments=None, **kwargs): report = BroadcastReport() def send_all(): users_ids = uids if not isinstance(users_ids, list): users_ids = list(users_ids) report.should_be_sent = len(users_ids) for user_id in users_ids: try: self.send(user_id, message, attachments, **kwargs) if message or attachments: report.sent += 1 except vk_api.VkApiError as error: report.errors.append('vk.com/id{}: {}'.format(user_id, error)) except ValueError: continue for uid in self.get_member_ids(admins=True, moders=True): self.send(uid, str(report)) broadcast_thread = Thread(target=send_all) broadcast_thread.start() broadcast_thread.join()
BroadcastReport - classe de rapport class BroadcastReport: def __init__(self): self.should_be_sent = 0 self.sent = 0 self.errors = [] def __str__(self): res = "# #" res += "\n: {} ".format(self.should_be_sent) res += "\n: {} ".format(self.sent) if self.errors: res += "\n:" for i in self.errors: res += "\n- {}".format(i) return res
Sur ce point, semble-t-il, l'abstraction du groupe est terminée. Nous avons rencontré tous les membres de la communauté, nous devons maintenant apprendre à les comprendre.
Partie 2. "Psh ... bienvenue .."
Laissez le bot écouter tous les messages des membres de notre communauté.
Pour ce faire, créez une classe ChatHandler, qui le fera
Dans les paramètres:
- group_manager est une instance de la classe communautaire que nous venons d'écrire
- command_observer reconnaît les commandes connectées (mais plus à ce sujet dans la troisième partie)
class ChatHandler(Handler): def __init__(self, group_manager, command_observer): super().__init__() self.longpoll = group_manager.get_longpoll() self.group = group_manager self.api = group_manager.get_api() self.command_observer = command_observer
De plus, en fait, nous écoutons les messages des utilisateurs et reconnaissons les commandes.
def listen(self): try: for event in self.longpoll.listen(): if event.user_id and event.type == VkEventType.MESSAGE_NEW and event.to_me: self.group.api.messages.markAsRead(peer_id=event.user_id) self.handle(event.user_id, event.text, event.attachments, message_id=event.message_id) except ConnectionError: logging.error("I HAVE BEEN DOWNED AT {}".format(datetime.datetime.today())) self.longpoll.update_longpoll_server() def handle(self, user_id, message, attachments, **kwargs): member = self.group.get_member(user_id) self.group.update_members() self.command_observer.execute(member, message, attachments, self.group, **kwargs) def run(self): self.listen()
Partie 3. "Qu'avez-vous écrit sur le mien ..?"
La reconnaissance des commandes est gérée par un sous-système distinct implémenté via le modèle Observer.
Attention CommandObserver:
class CommandObserver(AbstractObserver): def execute(self, member, message, attachments, group, **kwargs): for command in self.commands: for trigger in command.triggers: body = command.get_body(trigger, message) if body is not None: group.api.messages.setActivity(user_id=member.id, type="typing") if command.system: kwargs.update({"trigger": trigger, "commands": self.commands}) else: kwargs.update({"trigger": trigger}) return command.proceed(member, body, attachments, group, **kwargs)
AbstractObserverEncore une fois, le rendu est fait pour une éventuelle expansion future.
class AbstractObserver(metaclass=ABCMeta): def __init__(self): self.commands = [] def add(self, *args): for arg in args: self.commands.append(arg) @abstractmethod def execute(self, *args, **kwargs): pass
Mais que reconnaîtra cet observateur?
Nous sommes donc arrivés à la partie la plus intéressante - l'équipe.
Chaque équipe est une classe indépendante, descendante de la classe Command de base.
Tout ce qui est requis de la commande est d'exécuter la méthode procéder () si son mot-clé est trouvé au début du message de l'utilisateur. Les mots-clés de commande sont définis dans la variable triggers de la classe de commande (chaîne ou liste de chaînes)
class Command(metaclass=ABCMeta): def __init__(self): self.triggers = [] self.description = "Empty description." self.system = False self.privilege = False self.activate_times = [] self.activate_days = set() self.autostart_func = self.proceed def proceed(self, member, message, attachments, group, **kwargs): raise NotImplementedError() @staticmethod def get_body(kw, message): if not isinstance(kw, list): kw = [kw, ] for i in kw: reg = '^ *(\\{}) *'.format(i) if re.search(reg, message): return re.sub(reg, '', message).strip(' ')
Comme le montre la signature de la méthode procéder (), chaque commande reçoit un lien vers une instance d'un membre du groupe, son message (sans mot-clé), des applications et un lien vers une instance de groupe. C'est-à-dire que toute interaction avec un membre du groupe appartient à l'équipe. Je pense que c'est la solution la plus correcte, car il est ainsi possible de créer un shell (Shell) pour une plus grande interactivité.
(En vérité, pour ce faire, vous devrez soit ajouter asynchrone, car le traitement est synchrone, soit chaque message reçu doit être traité dans un nouveau thread, ce qui n'est pas du tout rentable)
Exemples d'implémentation de commandes:
BroadcastCommand class BroadcastCommand(Command): def __init__(self): super().__init__() self.triggers = ['.mb'] self.privilege = True self.description = " ." def proceed(self, member, message, attachments, group, **kwargs): if member.id not in group.get_member_ids(admins=True, editors=True): group.send(member.id, "You cannot do this ^_^") return True last_message = group.get_last_message(member.id) group.broadcast(group.get_member_ids(), message, attachments, last_message=last_message, **kwargs) return True
AideCommande class HelpCommand(Command): def __init__(self): super().__init__() self.commands = [] self.triggers = ['.h', '.help'] self.system = True self.description = " ." def proceed(self, member, message, attachments, group, **kwargs): commands = kwargs["commands"] help = " :\n\n" admins = group.get_member_ids(admins=True, moders=True) i = 0 for command in commands: if command.privilege and member.id not in admins: continue help += "{}) {}\n\n".format(i + 1, command.name()) i += 1 group.send(member.id, help) return True
Partie 4. "Nous sommes une grande équipe."
Maintenant, tous ces modules et gestionnaires doivent être combinés et configurés.
Une autre classe s'il vous plaît!
Créez une façade qui personnalisera notre bot.
class VKManage: def __init__(self, token=None, login=None, password=None): self.session = BaseCommunicateVK.create_session(token, login, password, api_version) self.storage = DBProxy(DatabaseORM) self.group = Group(self.session, self.storage).setup().update_members() self.chat = ChatHandler(self.group, CommandObserver.get_observer()) def start(self): self.chat.run() def get_command(self, command_name): return { " ": BroadcastCommand(), " ": AdminBroadcastCommand(), "": HelpCommand(), " ": SkippedLectionsCommand(), "": TopicTimetableCommand().setup_account(self.bot.api), }.get(command_name) def connect_command(self, command_name): command = self.get_command(str(command_name).lower()) if command: self.chat.command_observer.add(command) return self def connect_commands(self, command_names): for i in command_names.split(','): self.connect_command(i.strip()) return self
La dernière étape est le lancement. Toujours le plus méchant, car une sorte de surprise peut surgir. Pas cette fois.
- ConfigParser est importé de core.settings.ConfigParser. En fait, il lit simplement la configuration.
project_path est importé du module de paramètres à la racine du projet.
if __name__ == '__main__': config = ConfigParser(project_path) VKManage(token=config['token'], login=config['login'], password=config['password'])\ .connect_commands(", , , ")\ .start()
Cela semble être tout.
Pour le moment, ce programme a bénéficié à au moins trois groupes et, je l'espère, vous apportera aussi.
Vous pouvez le déployer gratuitement sur Heroku, mais c'est une autre histoire.
Références: