Dans cet article, j'invite le lecteur à faire un voyage modérément excitant dans les entrailles de l' asyncio avec moi pour comprendre comment il implémente l'exécution de code asynchrone. Nous passons en revue les rappels et nous précipitons à travers une série d'événements à travers quelques abstractions clés directement dans la corutine. Si votre carte Python n'a pas encore ces attractions, bienvenue à cat.
Pour commencer - une brève référence sur la zone étalée devant nous
asyncio est une bibliothèque d'E / S asynchrone qui, selon pep3153 , a été créée pour fournir un cadre normalisé pour la création de cadres asynchrones. pep3156 lui attribue également la nécessité de fournir une intégration extrêmement simple dans des frameworks asynchrones déjà existants (Twisted, Tornado, Gevent). Comme nous pouvons maintenant l'observer, ces objectifs ont été atteints avec succès - un nouveau cadre basé sur asyncio est apparu: aiohttp , dans Tornado AsyncioMainLoop est la boucle d'événements par défaut de la version 5.0, dans Twisted asyncioreactor est disponible à partir de la version 16.5.0, et il existe une bibliothèque aiogevent tierce pour Gevent .
asyncio est une bibliothèque hybride qui utilise simultanément deux approches pour implémenter l'exécution de code asynchrone: classique sur les rappels et, relativement nouvelle, (au moins pour python) sur les coroutines. Il est basé sur trois abstractions principales, qui sont des analogues d'abstractions qui existent dans des cadres tiers:
- Boucle d'événement enfichable
Boucle d'événement enfichable. Pluggable signifie qu'il peut être remplacé sur deux lignes de code par une autre qui implémente la même interface. Maintenant, il y a des implémentations de cython sur libuv ( uvloop ) et à Rust ( asyncio-tokio ) . - L'avenir
Le résultat de l'opération, qui sera disponible à l'avenir. Il est nécessaire d'obtenir le résultat de l'exécution des rappels dans les coroutines. - Tâche
Une sous-classe spéciale de Future pour exécuter la coroutine sur une boucle d'événement.
C'est parti!
Le cycle des événements est le composant principal de la bibliothèque, le long des routes qui le traversent, les données sont livrées à l'un de ses composants. Il est grand et complexe, alors considérez d'abord sa version allégée.
Surfant sur notre petit rappel, nous avons pris la route via call_soon , nous sommes dans la file d'attente et après une courte attente, nous serons affichés.
Épisode Bad Callback
Il convient de mentionner que les rappels sont des chevaux dangereux - s'ils vous jettent au milieu de la route, l'interpréteur python ne pourra pas aider à comprendre où cela s'est produit. Si vous ne me croyez pas , roulez de la même manière sur le rappel peut- être_print, qui arrive à la fin environ la moitié du temps.
Vous trouverez ci-dessous la trace complète de l'exemple précédent. En raison du fait que la fonction peut - être_print a été démarrée par une boucle d'événement, et non directement à partir de starting_point , le suivi se termine sur elle, dans la méthode run_until_complete . Il est impossible de déterminer où est le point de départ dans le code à l'aide d'un tel traçage, ce qui compliquera considérablement le débogage si le point de départ est situé à plusieurs endroits dans la base de code.
$: python3 base_loop.py >>
Une pile d'appels continue est nécessaire non seulement pour afficher le traçage complet, mais également pour implémenter d'autres fonctionnalités linguistiques. Par exemple, la gestion des exceptions est basée sur elle. L'exemple ci-dessous ne fonctionnera pas, car au début du point de départ, la fonction principale sera déjà exécutée:
L'exemple suivant ne fonctionnera pas non plus. Le gestionnaire de contexte de la fonction principale ouvrira et fermera le fichier avant le démarrage de son traitement.
L'absence d'une pile d'appels continue limite l'utilisation de fonctionnalités linguistiques familières. Pour contourner partiellement cet inconvénient dans asyncio, j'ai dû ajouter beaucoup de code supplémentaire qui n'était pas directement lié à la tâche qu'il résolvait. Ce code, pour la plupart, manque dans les exemples - ils sont assez compliqués sans lui.
Du cycle des événements au monde extérieur et retour
La boucle d'événements communique avec le monde extérieur via le système d'exploitation via des événements. Le code qui sait comment l'utiliser est fourni par un module de bibliothèque standard appelé sélecteurs . Il vous permet de dire au système d'exploitation que nous attendons une sorte d'événement, puis de demander s'il s'est produit. Dans l'exemple ci-dessous, l'événement attendu sera la disponibilité du socket lu.
Un messager du monde extérieur laisse son message ou son colis dans le sélecteur, et le sélecteur le transmet au destinataire. Il est maintenant possible de lire à partir du socket à l'aide d'une boucle d'événements. Si vous exécutez ce code et vous connectez à l'aide de netcat, il affichera fidèlement tout ce qui lui sera envoyé.
$: nc localhost 8086 $: python3 event_loop.py "Hi there!" b'"Hi there!"\n' "Hello!" b'"Hello!"\n' "Answer me, please!" b'"Answer me, please!"\n'
Au début de l'article, il a été dit que asyncio est une bibliothèque hybride dans laquelle les coroutines fonctionnent en plus des rappels. Pour implémenter cette fonctionnalité, deux abstractions principales restantes sont utilisées: Tâche et Futur . Ensuite, le code de ces abstractions sera affiché, puis, en utilisant leur cycle d'événements, des coroutines sont exécutées.
L'avenir
Voici le code de la classe Future. Il est nécessaire pour que dans corutin vous puissiez attendre que le rappel soit terminé et obtenir son résultat.
Tâche
Il s'agit d'une sous-classe spéciale de Future . Il est nécessaire d'exécuter des coroutines sur la boucle d'événement de rappel.
Un cycle d'événements qui peut fonctionner avec Future
Continuons
Voyons maintenant comment sera exécuté coroutine main :
Accomplissement __________________________________________________________________ class EventLoop: def run_until_complete(self, callback, *args): # task._step self.call_soon(callback, *args) while self._ready or self.selector.get_map(): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self._ready.popleft() # callback(*args) # task._step() ___________________________________________________________________ clsss Task: def _step(self, exc=None): try: if exc is None: # None result = self._coro.send(None) else: ___________________________________________________________________ async def main(loop): # # sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) # conn, addr = await loop.sock_accept(sock) result = await loop.sock_recv(conn, 1000) print(result) ___________________________________________________________________ class EventLoop: def sock_accept(self, sock, fut=None): # Future fut = fut if fut else Future(loop=self) try: # conn, address = sock.accept() conn.setblocking(False) except (BlockingIOError, InterruptedError): # # # Future self.selector.register( sock, EVENT_READ, (self.sock_accept, sock, fut) ) except Exception as exc: -------------------------------------------- self.selector.unregister(sock) # Future return fut ___________________________________________________________________ async def main(loop): sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) # await __await__ Future conn, addr = await loop.sock_accept(sock) result = await loop.sock_recv(conn, 1000) print(result) ___________________________________________________________________ class Future: def __await__(self): # Future if self._state == 'PENDING': yield self return self.result() ___________________________________________________________________ class Task(Future): def _step(self, exc=None): try: if exc is None: # None result = self._coro.send(None) # result = fut -------------------------------- else: # Future # wakeup if isinstance(result, Future): result.add_done_callback(self._wakeup) elif result is None: self._loop.call_soon(self._step) # - Task Future # # ___________________________________________________________________ class EventLoop: def run_until_complete(self, callback, *args): self.call_soon(callback, *args) while self._ready or self.selector.get_map(): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self._ready.popleft() callback(*args) for key, events in self.selector.select(timeout=0): # callback, *args = key.data self.call_soon(callback, *args) # loop.sock_accept(sock, fut) ___________________________________________________________________ class EventLoop: def sock_accept(self, sock, fut=None): fut = fut if fut else Future(loop=self) try: # conn, address = sock.accept() conn.setblocking(False) except (BlockingIOError, InterruptedError): -------------------------------- else: # Future fut.set_result((conn, address)) self.selector.unregister(sock) return fut ___________________________________________________________________ class Future: def set_result(self, result): # self._result = result # self._state = 'FINISHED' # self._schedule_callbacks() def _schedule_callbacks(self): for callback in self._callbacks: # task.wakeup self._loop.call_soon(callback, self) # (task.wakeup, fut) self._callbacks[:] = [] ___________________________________________________________________ class EventLoop: def run_until_complete(self, callback, *args): self.call_soon(callback, *args) while self._ready or self.selector.get_map(): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self._ready.popleft() # # task.wakeup callback(*args) # task.wakeup(fut) ___________________________________________________________________ class Task(Future): def _wakeup(self, future): try: future.result() except Exception as exc: self._step(exc) else: # Future task._step self._step() def _step(self, exc=None): try: if exc is None: # None result = self._coro.send(None) else: ___________________________________________________________________ async def main(loop): sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) # await __awai__ conn, addr = await loop.sock_accept(sock) result = await loop.sock_recv(conn, 1000) print(result) ___________________________________________________________________ class Future: def __await__(self): if self._state == 'PENDING': yield self # Future return self.result() ___________________________________________________________________ async def main(loop): sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) # Future conn addr conn, addr = await loop.sock_accept(sock) result = await loop.sock_recv(conn, 1000) print(result)
De cette manière simple, asyncio exécute des coroutines.
Résumé
L'objectif de créer asyncio a été atteint avec succès. Cela a non seulement résolu le problème de compatibilité, mais a également provoqué une énorme augmentation de l'intérêt pour une programmation compétitive dans la communauté. De nouveaux articles et bibliothèques ont commencé à apparaître, comme des champignons après la pluie. De plus, asyncio a influencé la langue elle-même: des coroutines natives et de nouveaux mots clés async / attente y ont été ajoutés. La dernière fois qu'un nouveau mot clé a été ajouté en 2003, c'était le mot clé yield .
L'un des objectifs de la création d'asyncio était de fournir une intégration extrêmement simple dans les frameworks asynchrones déjà existants (Twisted, Tornado, Gevent). Le choix des outils découle logiquement de cet objectif: s'il n'y avait pas d'exigence de compatibilité, les coroutines auraient probablement le rôle principal. Étant donné que lors de la programmation sur les rappels, il est impossible de maintenir une pile continue d'appels, un système supplémentaire a dû être créé à la frontière entre eux et les coroutines pour prendre en charge les fonctionnalités linguistiques basées sur celui-ci.
Maintenant, la question principale. Pourquoi un simple utilisateur de la bibliothèque devrait-il tout savoir, qui suit les recommandations de la documentation et n'utilise que des coroutines et une API de haut niveau?
Voici une partie de la documentation de la classe StreamWriter

Son instance est renvoyée par la fonction asyncio.open_connection et est l'API async / wait au-dessus de l'API de rappel. Et ces rappels s'en détachent. Les fonctions d' écriture et d' écriture sont synchrones, elles essaient d'écrire dans le socket et si cela échoue, elles vident les données dans le tampon sous-jacent et ajoutent des rappels à l'enregistrement. Le drainage de la corutine est nécessaire afin de permettre d'attendre que la quantité de données dans le tampon tombe à la valeur spécifiée.
Si vous oubliez d'appeler le drain entre les appels d' écriture , le tampon interne peut atteindre des tailles indécentes. Cependant, si vous gardez cela à l'esprit, il reste quelques moments désagréables. Premièrement: si le rappel sur l'enregistrement «casse», alors la coroutine utilisant cette API ne le saura d'aucune façon et, par conséquent, ne pourra pas le traiter. Deuxièmement: si la coroutine "casse", le rappel à l'enregistrement n'en sera pas informé et continuera d'écrire des données à partir du tampon.
Ainsi, même en utilisant uniquement des coroutines, préparez-vous au fait que les rappels se rappelleront d'eux-mêmes.
Vous pouvez lire comment travailler avec des bases de données à partir de code asynchrone dans cet article de notre blog d'entreprise Antida .
PS Merci pour les informations sur les fautes de frappe et les inexactitudes pour les utilisateurs de eirnym , kurb , rasswet