In diesem Artikel lade ich den Leser ein, eine mäßig aufregende Reise in die Eingeweide von Asyncio mit mir zu unternehmen , um zu verstehen, wie es die asynchrone Codeausführung implementiert. Wir satteln Rückrufe und stürzen uns durch eine Reihe von Ereignissen durch ein paar wichtige Abstraktionen direkt in die Coroutine. Wenn Ihre Python-Karte diese Attraktionen noch nicht enthält, sind Sie bei cat willkommen.
Für den Anfang - eine kurze Referenz über das Gebiet, das vor uns ausgebreitet ist
asyncio ist eine asynchrone E / A-Bibliothek, die laut pep3153 erstellt wurde, um ein standardisiertes Framework zum Erstellen asynchroner Frameworks bereitzustellen. pep3156 schreibt ihr auch die Notwendigkeit zu, eine äußerst einfache Integration in bereits vorhandene asynchrone Frameworks (Twisted, Tornado, Gevent) bereitzustellen. Wie wir jetzt beobachten können, wurden diese Ziele erfolgreich erreicht - ein neues asyncio-basiertes Framework wurde veröffentlicht: aiohttp ist in Tornado AsyncioMainLoop die Standardereignisschleife ab Version 5.0, in Twisted ist asyncioreactor ab Version 16.5.0 verfügbar, und für Gevent gibt es eine Aiogevent-Bibliothek eines Drittanbieters .
asyncio ist eine Hybridbibliothek, die gleichzeitig zwei Ansätze zur Implementierung der asynchronen Codeausführung verwendet: klassisch bei Rückrufen und relativ neu (zumindest für Python) bei Coroutinen. Es basiert auf drei Hauptabstraktionen, die Analoga von Abstraktionen sind, die in Frameworks von Drittanbietern existieren:
- Steckbare Ereignisschleife
Steckbare Ereignisschleife. Pluggable bedeutet, dass es in zwei Codezeilen durch eine andere ersetzt werden kann, die dieselbe Schnittstelle implementiert. Jetzt gibt es Cython-Implementierungen über libuv ( uvloop ) und in Rust ( asyncio-tokio ) . - Zukunft
Das Ergebnis der Operation, die in Zukunft verfügbar sein wird. Es ist erforderlich, das Ergebnis der Rückrufausführung in Coroutinen abzurufen. - Aufgabe
Eine spezielle Unterklasse von Future zum Ausführen von Coroutine in einer Ereignisschleife.
Lass uns gehen!
Der Ereigniszyklus ist die Hauptkomponente der Bibliothek. Auf den durch sie verlaufenden Straßen werden Daten an eine ihrer Komponenten geliefert. Es ist groß und komplex, also betrachten Sie zuerst seine abgespeckte Version.
Mit unserem kleinen Rückruf machen wir uns über call_soon auf den Weg, stellen uns in die Warteschlange und nach einer kurzen Wartezeit werden wir angezeigt.
Schlechte Rückruf-Episode
Es ist erwähnenswert, dass Rückrufe gefährliche Pferde sind - wenn sie Sie mitten auf die Straße werfen, kann der Python-Interpreter nicht helfen, zu verstehen, wo dies passiert ist. Wenn Sie mir nicht glauben, fahren Sie auf dem Rückruf von Maybe_print, der ungefähr die Hälfte der Zeit zum Ziel kommt, auf die gleiche Weise.
Unten finden Sie den vollständigen Traceback des vorherigen Beispiels. Aufgrund der Tatsache, dass die Funktion " Maybe_print" von einer Ereignisschleife und nicht direkt vom Startpunkt aus gestartet wurde, endet der Traceback in der Methode " run_until_complete " darauf. Mit einem solchen Traceback kann nicht festgestellt werden, wo sich der Startpunkt im Code befindet. Dies erschwert das Debuggen erheblich, wenn sich der Startpunkt an mehreren Stellen in der Codebasis befindet.
$: python3 base_loop.py >>
Ein kontinuierlicher Aufrufstapel wird nicht nur benötigt, um den vollständigen Traceback anzuzeigen, sondern auch um andere Sprachfunktionen zu implementieren. Die Ausnahmebehandlung basiert beispielsweise darauf. Das folgende Beispiel funktioniert nicht, da zum Zeitpunkt des Starts von launch_point die Hauptfunktion bereits ausgeführt wird:
Das folgende Beispiel funktioniert auch nicht. Der Kontextmanager in der Hauptfunktion öffnet und schließt die Datei, bevor die Verarbeitung gestartet wird.
Das Fehlen eines kontinuierlichen Aufrufstapels schränkt die Verwendung vertrauter Sprachfunktionen ein. Um diesen Nachteil in Asyncio teilweise zu umgehen, musste ich viel zusätzlichen Code hinzufügen, der nicht direkt mit der zu lösenden Aufgabe zusammenhängt. Dieser Code fehlt größtenteils in den Beispielen - ohne ihn sind sie ziemlich kompliziert.
Vom Zyklus der Ereignisse nach außen und zurück
Die Ereignisschleife kommuniziert über Ereignisse über das Betriebssystem mit der Außenwelt. Code, der weiß, wie man damit arbeitet, wird von einem Standardbibliotheksmodul namens Selektoren bereitgestellt. Sie können dem Betriebssystem mitteilen, dass wir auf ein Ereignis warten, und dann fragen, ob es passiert ist. Im folgenden Beispiel wird als erwartetes Ereignis die Verfügbarkeit des Sockets gelesen.
Ein Bote aus der Außenwelt hinterlässt seine Nachricht oder sein Paket im Selektor und der Selektor gibt es an den Empfänger weiter. Jetzt ist es möglich, mit einer Ereignisschleife aus dem Socket zu lesen. Wenn Sie diesen Code ausführen und eine Verbindung mit netcat herstellen, wird alles, was an ihn gesendet wird, originalgetreu angezeigt.
$: nc localhost 8086 $: python3 event_loop.py "Hi there!" b'"Hi there!"\n' "Hello!" b'"Hello!"\n' "Answer me, please!" b'"Answer me, please!"\n'
Zu Beginn des Artikels wurde gesagt, dass Asyncio eine Hybridbibliothek ist, in der Coroutinen zusätzlich zu Rückrufen arbeiten. Um diese Funktionalität zu implementieren, werden zwei verbleibende Hauptabstraktionen verwendet: Task und Future . Als nächstes wird der Code dieser Abstraktionen gezeigt, und dann werden unter Verwendung ihres Ereigniszyklus Coroutinen ausgeführt.
Zukunft
Unten finden Sie den Code für die Future-Klasse. Es wird benötigt, damit Sie in corutin warten können, bis der Rückruf abgeschlossen ist, und das Ergebnis erhalten.
Aufgabe
Dies ist eine spezielle Unterklasse von Future . Es wird benötigt, um Coroutinen in der Callback-Ereignisschleife auszuführen.
Ein Zyklus von Ereignissen, die mit Future zusammenarbeiten können
Lass uns weitermachen
Nun wollen wir sehen, wie Coroutine Main ausgeführt wird:
Erfüllung __________________________________________________________________ class EventLoop: def run_until_complete(self, callback, *args): # task._step self.call_soon(callback, *args) while self._ready or self.selector.get_map(): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self._ready.popleft() # callback(*args) # task._step() ___________________________________________________________________ clsss Task: def _step(self, exc=None): try: if exc is None: # None result = self._coro.send(None) else: ___________________________________________________________________ async def main(loop): # # sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) # conn, addr = await loop.sock_accept(sock) result = await loop.sock_recv(conn, 1000) print(result) ___________________________________________________________________ class EventLoop: def sock_accept(self, sock, fut=None): # Future fut = fut if fut else Future(loop=self) try: # conn, address = sock.accept() conn.setblocking(False) except (BlockingIOError, InterruptedError): # # # Future self.selector.register( sock, EVENT_READ, (self.sock_accept, sock, fut) ) except Exception as exc: -------------------------------------------- self.selector.unregister(sock) # Future return fut ___________________________________________________________________ async def main(loop): sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) # await __await__ Future conn, addr = await loop.sock_accept(sock) result = await loop.sock_recv(conn, 1000) print(result) ___________________________________________________________________ class Future: def __await__(self): # Future if self._state == 'PENDING': yield self return self.result() ___________________________________________________________________ class Task(Future): def _step(self, exc=None): try: if exc is None: # None result = self._coro.send(None) # result = fut -------------------------------- else: # Future # wakeup if isinstance(result, Future): result.add_done_callback(self._wakeup) elif result is None: self._loop.call_soon(self._step) # - Task Future # # ___________________________________________________________________ class EventLoop: def run_until_complete(self, callback, *args): self.call_soon(callback, *args) while self._ready or self.selector.get_map(): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self._ready.popleft() callback(*args) for key, events in self.selector.select(timeout=0): # callback, *args = key.data self.call_soon(callback, *args) # loop.sock_accept(sock, fut) ___________________________________________________________________ class EventLoop: def sock_accept(self, sock, fut=None): fut = fut if fut else Future(loop=self) try: # conn, address = sock.accept() conn.setblocking(False) except (BlockingIOError, InterruptedError): -------------------------------- else: # Future fut.set_result((conn, address)) self.selector.unregister(sock) return fut ___________________________________________________________________ class Future: def set_result(self, result): # self._result = result # self._state = 'FINISHED' # self._schedule_callbacks() def _schedule_callbacks(self): for callback in self._callbacks: # task.wakeup self._loop.call_soon(callback, self) # (task.wakeup, fut) self._callbacks[:] = [] ___________________________________________________________________ class EventLoop: def run_until_complete(self, callback, *args): self.call_soon(callback, *args) while self._ready or self.selector.get_map(): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self._ready.popleft() # # task.wakeup callback(*args) # task.wakeup(fut) ___________________________________________________________________ class Task(Future): def _wakeup(self, future): try: future.result() except Exception as exc: self._step(exc) else: # Future task._step self._step() def _step(self, exc=None): try: if exc is None: # None result = self._coro.send(None) else: ___________________________________________________________________ async def main(loop): sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) # await __awai__ conn, addr = await loop.sock_accept(sock) result = await loop.sock_recv(conn, 1000) print(result) ___________________________________________________________________ class Future: def __await__(self): if self._state == 'PENDING': yield self # Future return self.result() ___________________________________________________________________ async def main(loop): sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) # Future conn addr conn, addr = await loop.sock_accept(sock) result = await loop.sock_recv(conn, 1000) print(result)
Auf diese einfache Weise führt Asyncio Coroutinen durch.
Zusammenfassung
Das Ziel, Asyncio zu erstellen, wurde erfolgreich erreicht. Dies löste nicht nur das Kompatibilitätsproblem, sondern führte auch zu einem enormen Anstieg des Interesses an wettbewerbsfähiger Programmierung in der Community. Neue Artikel und Bibliotheken tauchten auf, wie Pilze nach dem Regen. Darüber hinaus hat Asyncio die Sprache selbst beeinflusst: Native Coroutinen und neue Schlüsselwörter für Async / Warten wurden hinzugefügt. Das letzte Mal, dass 2003 ein neues Keyword hinzugefügt wurde, war es das Yield- Keyword.
Eines der Ziele bei der Erstellung von Asyncio war die extrem einfache Integration in bereits vorhandene asynchrone Frameworks (Twisted, Tornado, Gevent). Die Auswahl der Tools folgt logischerweise aus diesem Ziel: Wenn keine Kompatibilitätsanforderungen bestehen würden, würden Coroutinen wahrscheinlich die Hauptrolle erhalten. Aufgrund der Tatsache, dass es beim Programmieren von Rückrufen unmöglich ist, einen kontinuierlichen Stapel von Anrufen aufrechtzuerhalten, musste ein zusätzliches System an der Grenze zwischen ihnen und den Coroutinen erstellt werden, um die darauf basierenden Sprachfunktionen zu unterstützen.
Nun die Hauptfrage. Warum sollte ein einfacher Benutzer der Bibliothek all dies wissen, was den Empfehlungen aus der Dokumentation folgt und nur Coroutinen und eine API auf hoher Ebene verwendet?
Hier ist eine Dokumentation zur StreamWriter-Klasse

Die Instanz wird von der Funktion asyncio.open_connection zurückgegeben und ist die async / await- API über der Callback-API. Und diese Rückrufe ragen heraus. Die Funktionen write und writelines sind synchron, sie versuchen, in den Socket zu schreiben, und wenn dies fehlschlägt, geben sie die Daten in den zugrunde liegenden Puffer aus und fügen dem Datensatz Rückrufe hinzu. Corutin Drain wird benötigt, um die Möglichkeit zu bieten, zu warten, bis die Datenmenge im Puffer auf den angegebenen Wert abfällt.
Wenn Sie vergessen, zwischen Schreibaufrufen Drain aufzurufen, kann der interne Puffer zu unanständigen Größen anwachsen. Wenn Sie dies jedoch berücksichtigen, bleiben einige unangenehme Momente übrig. Erstens: Wenn der Rückruf im Datensatz "unterbrochen" wird, weiß die Coroutine, die diese API verwendet, nichts davon und kann ihn dementsprechend nicht verarbeiten. Zweitens: Wenn die Coroutine "bricht", weiß der Rückruf zum Datensatz in keiner Weise davon und schreibt weiterhin Daten aus dem Puffer.
Seien Sie also auch mit Coroutinen darauf vorbereitet, dass Rückrufe an sich selbst erinnern.
In diesem Artikel unseres Unternehmensblogs Antida-Software erfahren Sie, wie Sie mit Datenbanken aus asynchronem Code arbeiten .
PS Vielen Dank für die Informationen zu Tippfehlern und Ungenauigkeiten an Benutzer von eirnym , kurb , rasswet