Was ist in Asyncio

In diesem Artikel lade ich den Leser ein, eine mäßig aufregende Reise in die Eingeweide von Asyncio mit mir zu unternehmen , um zu verstehen, wie es die asynchrone Codeausführung implementiert. Wir satteln Rückrufe und stürzen uns durch eine Reihe von Ereignissen durch ein paar wichtige Abstraktionen direkt in die Coroutine. Wenn Ihre Python-Karte diese Attraktionen noch nicht enthält, sind Sie bei cat willkommen.


Für den Anfang - eine kurze Referenz über das Gebiet, das vor uns ausgebreitet ist


asyncio ist eine asynchrone E / A-Bibliothek, die laut pep3153 erstellt wurde, um ein standardisiertes Framework zum Erstellen asynchroner Frameworks bereitzustellen. pep3156 schreibt ihr auch die Notwendigkeit zu, eine äußerst einfache Integration in bereits vorhandene asynchrone Frameworks (Twisted, Tornado, Gevent) bereitzustellen. Wie wir jetzt beobachten können, wurden diese Ziele erfolgreich erreicht - ein neues asyncio-basiertes Framework wurde veröffentlicht: aiohttp ist in Tornado AsyncioMainLoop die Standardereignisschleife ab Version 5.0, in Twisted ist asyncioreactor ab Version 16.5.0 verfügbar, und für Gevent gibt es eine Aiogevent-Bibliothek eines Drittanbieters .


asyncio ist eine Hybridbibliothek, die gleichzeitig zwei Ansätze zur Implementierung der asynchronen Codeausführung verwendet: klassisch bei Rückrufen und relativ neu (zumindest für Python) bei Coroutinen. Es basiert auf drei Hauptabstraktionen, die Analoga von Abstraktionen sind, die in Frameworks von Drittanbietern existieren:


  • Steckbare Ereignisschleife
    Steckbare Ereignisschleife. Pluggable bedeutet, dass es in zwei Codezeilen durch eine andere ersetzt werden kann, die dieselbe Schnittstelle implementiert. Jetzt gibt es Cython-Implementierungen über libuv ( uvloop ) und in Rust ( asyncio-tokio ) .
  • Zukunft
    Das Ergebnis der Operation, die in Zukunft verfügbar sein wird. Es ist erforderlich, das Ergebnis der Rückrufausführung in Coroutinen abzurufen.
  • Aufgabe
    Eine spezielle Unterklasse von Future zum Ausführen von Coroutine in einer Ereignisschleife.

Lass uns gehen!


Der Ereigniszyklus ist die Hauptkomponente der Bibliothek. Auf den durch sie verlaufenden Straßen werden Daten an eine ihrer Komponenten geliefert. Es ist groß und komplex, also betrachten Sie zuerst seine abgespeckte Version.


# ~/inside_asyncio/base_loop.py import collections import random class Loop: def __init__(self): #     self.ready = collections.deque() def call_soon(self, callback, *args): #          self.ready.append((callback, args)) def run_until_complete(self, callback, *args): #         self.call_soon(callback, *args) #    -   #       while self.ready: ntodo = len(self.ready) #      #          for _ in range(ntodo): #       #        callback, args = self.ready.popleft() callback(*args) def callback(loop): print('') loop.call_soon(print, '') loop = Loop() loop.run_until_complete(callback, loop) 

Mit unserem kleinen Rückruf machen wir uns über call_soon auf den Weg, stellen uns in die Warteschlange und nach einer kurzen Wartezeit werden wir angezeigt.


Schlechte Rückruf-Episode


Es ist erwähnenswert, dass Rückrufe gefährliche Pferde sind - wenn sie Sie mitten auf die Straße werfen, kann der Python-Interpreter nicht helfen, zu verstehen, wo dies passiert ist. Wenn Sie mir nicht glauben, fahren Sie auf dem Rückruf von Maybe_print, der ungefähr die Hälfte der Zeit zum Ziel kommt, auf die gleiche Weise.


 # ~/inside_asyncio/base_loop.py def maybe_print(msg): if random.randint(0, 1): raise Exception(msg) else: print(msg) def starting_point(loop): #   print('') loop.call_soon(maybe_print, '') def main(loop): loop.call_soon(starting_point, loop) loop.call_soon(starting_point, loop) loop = Loop() loop.run_until_complete(main, loop) 

Unten finden Sie den vollständigen Traceback des vorherigen Beispiels. Aufgrund der Tatsache, dass die Funktion " Maybe_print" von einer Ereignisschleife und nicht direkt vom Startpunkt aus gestartet wurde, endet der Traceback in der Methode " run_until_complete " darauf. Mit einem solchen Traceback kann nicht festgestellt werden, wo sich der Startpunkt im Code befindet. Dies erschwert das Debuggen erheblich, wenn sich der Startpunkt an mehreren Stellen in der Codebasis befindet.


 $: python3 base_loop.py >>  #    >>  #    >>  #    >> Traceback (most recent call last): >> File "base_loop.py", line 42, in <module> >> loop.run_until_complete(main, loop) >> File "base_loop.py", line 17, in run_until_complete >> callback(*args) >> File "base_loop.py", line 29, in maybe_print >> raise Exception(msg) >> Exception:  #     

Ein kontinuierlicher Aufrufstapel wird nicht nur benötigt, um den vollständigen Traceback anzuzeigen, sondern auch um andere Sprachfunktionen zu implementieren. Die Ausnahmebehandlung basiert beispielsweise darauf. Das folgende Beispiel funktioniert nicht, da zum Zeitpunkt des Starts von launch_point die Hauptfunktion bereits ausgeführt wird:


 # ~/inside_asyncio/base_loop.py def main(loop): try: loop.call_soon(starting_point, loop) loop.call_soon(starting_point, loop) except: pass Loop().run_until_complete(main, loop) 

Das folgende Beispiel funktioniert auch nicht. Der Kontextmanager in der Hauptfunktion öffnet und schließt die Datei, bevor die Verarbeitung gestartet wird.


 # ~/inside_asyncio/base_loop.py def main(loop): with open('file.txt', 'rb') as f: loop.call_soon(process_file, f) Loop().run_until_complete(main, loop) #       ,    =( 

Das Fehlen eines kontinuierlichen Aufrufstapels schränkt die Verwendung vertrauter Sprachfunktionen ein. Um diesen Nachteil in Asyncio teilweise zu umgehen, musste ich viel zusätzlichen Code hinzufügen, der nicht direkt mit der zu lösenden Aufgabe zusammenhängt. Dieser Code fehlt größtenteils in den Beispielen - ohne ihn sind sie ziemlich kompliziert.


Vom Zyklus der Ereignisse nach außen und zurück


Die Ereignisschleife kommuniziert über Ereignisse über das Betriebssystem mit der Außenwelt. Code, der weiß, wie man damit arbeitet, wird von einem Standardbibliotheksmodul namens Selektoren bereitgestellt. Sie können dem Betriebssystem mitteilen, dass wir auf ein Ereignis warten, und dann fragen, ob es passiert ist. Im folgenden Beispiel wird als erwartetes Ereignis die Verfügbarkeit des Sockets gelesen.


Ereignisschleife
 # ~/inside_asyncio/event_loop.py import selectors import socket import collections from future import Future from handle import Handle from task import Task class EventLoop: def __init__(self): self.ready = collections.deque() #   self.selector = selectors.DefaultSelector() def add_reader(self, sock, callback): #       # : # , #         #           self.selector.register( sock, socket.EVENT_READ, (self._accept_conn, sock, callback) ) def _accept_conn(self, sock, callback): #    conn, addr = sock.accept() conn.setblocking(False) #      self.selector.register( conn, socket.EVENT_READ, (callback, conn) ) def run_until_complete(self, callback, *args): self.call_soon(callback, *args) #           -  while self.ready or self.selector.get_map(): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self.ready.popleft() callback(*args) #       for key, events in self.selector.select(timeout=0): #          callback, *args = key.data #       self.call_soon(callback, *args) def call_soon(self, callback, *args): self.ready.append((callback, args)) def print_data(conn): print(conn.recv(1000)) def main(loop): #   sock = socket.socket() #     8086  sock.bind(('localhost', 8086)) sock.listen(100) sock.setblocking(False) #      loop.add_reader(sock, print_data) loop = EventLoop() #    loop.run_until_complete(main, loop) 

Ein Bote aus der Außenwelt hinterlässt seine Nachricht oder sein Paket im Selektor und der Selektor gibt es an den Empfänger weiter. Jetzt ist es möglich, mit einer Ereignisschleife aus dem Socket zu lesen. Wenn Sie diesen Code ausführen und eine Verbindung mit netcat herstellen, wird alles, was an ihn gesendet wird, originalgetreu angezeigt.


 $: nc localhost 8086 $: python3 event_loop.py "Hi there!" b'"Hi there!"\n' "Hello!" b'"Hello!"\n' "Answer me, please!" b'"Answer me, please!"\n' 

Zu Beginn des Artikels wurde gesagt, dass Asyncio eine Hybridbibliothek ist, in der Coroutinen zusätzlich zu Rückrufen arbeiten. Um diese Funktionalität zu implementieren, werden zwei verbleibende Hauptabstraktionen verwendet: Task und Future . Als nächstes wird der Code dieser Abstraktionen gezeigt, und dann werden unter Verwendung ihres Ereigniszyklus Coroutinen ausgeführt.


Zukunft


Unten finden Sie den Code für die Future-Klasse. Es wird benötigt, damit Sie in corutin warten können, bis der Rückruf abgeschlossen ist, und das Ergebnis erhalten.


Zukunft
 # ~/inside_asyncio/future.py import sys from asyncio import events, CancelledError class Future: #        _state = 'PENDING' # FINISHED, CANCELLED #          Future #          _source_traceback = None #            _callbacks = [] #           _exception = None #           _loop = None #     _result = None def __init__(self, loop): self._loop = loop self._source_traceback = events.extract_stack(sys._getframe(1)) def add_done_callback(self, callback): #        self._callbacks.append(callback) def _schedule_callbacks(self): #         for callback in self._callbacks: self._loop.call_soon(callback, self) self._callbacks[:] = [] #            Future #    -     def set_exception(self, exception): #       self._exception = exception #   self._state = 'FINISHED' #      self._schedule_callbacks() def set_result(self, result): #         self._result = result self._state = 'FINISHED' self._schedule_callbacks() def cancel(self): #       self._state = 'CANCELLED' self._schedule_callbacks() def result(self): #     #         if self._state == 'CANCELLED': raise CancelledError #         if self._exception is not None: raise self._exception #    return self._result def __await__(self): #  ,    await #        if self._state == 'PENDING': yield self #     return self.result() 

Aufgabe


Dies ist eine spezielle Unterklasse von Future . Es wird benötigt, um Coroutinen in der Callback-Ereignisschleife auszuführen.


Aufgabe
 # ~/inside_asyncio/task.py from asyncio import futures from future import Future class Task(Future): def __init__(self, coro, *, loop=None): super().__init__(loop=loop) #    self._coro = coro def _step(self, exc=None): #    ,     try: if exc is None: #        None #        result = self._coro.send(None) else: #        self._coro.throw(exc) except StopIteration: result = None except Exception as exc: self.set_exception(exc) else: #   Future      # wakeup      if isinstance(result, Future): result.add_done_callback(self._wakeup) #     step     elif result is None: self._loop.call_soon(self._step) def _wakeup(self, future): #     Future       Task #   try: future.result() except Exception as exc: self._step(exc) #        Future else: self._step() 

Ein Zyklus von Ereignissen, die mit Future zusammenarbeiten können


EventLoop mit Futures
 # ~/inside_asyncio/future_event_loop.py import selectors from selectors import EVENT_READ, EVENT_WRITE import socket import collections from future import Future from task import Task class EventLoop: def __init__(self): self._ready = collections.deque() self.selector = selectors.DefaultSelector() def run_until_complete(self, callback, *args): self.call_soon(callback, *args) while self._ready or self.selector.get_map(): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self._ready.popleft() callback(*args) for key, events in self.selector.select(timeout=0): callback, *args = key.data self.call_soon(callback, *args) def call_soon(self, callback, *args): self._ready.append((callback, args)) #      Future def sock_accept(self, sock, fut=None): #       #  Future     fut = fut if fut else Future(loop=self) try: #     conn, address = sock.accept() conn.setblocking(False) except (BlockingIOError, InterruptedError): #     #      #   Future    self.selector.register( sock, EVENT_READ, (self.sock_accept, sock, fut) ) except Exception as exc: fut.set_exception(exc) self.selector.unregister(sock) else: #    #   Future    fut.set_result((conn, address)) self.selector.unregister(sock) return fut def sock_recv(self, sock, n, fut=None): #       #      , #     ,      fut = fut if fut else Future(loop=self) try: data = sock.recv(n) except (BlockingIOError, InterruptedError): self.selector.register( sock, EVENT_READ, (self.sock_recv, sock, n, fut) ) except Exception as exc: fut.set_exception(exc) self.selector.unregister(sock) else: fut.set_result(data) self.selector.unregister(sock) return fut async def main(loop): sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) #    conn, addr = await loop.sock_accept(sock) #     result = await loop.sock_recv(conn, 1000) print(result) loop = EventLoop() #    Task task = Task(coro=main(loop), loop=loop) #         loop.run_until_complete(task._step) 

Lass uns weitermachen


Nun wollen wir sehen, wie Coroutine Main ausgeführt wird:


Erfüllung
 __________________________________________________________________ class EventLoop: def run_until_complete(self, callback, *args): # task._step    self.call_soon(callback, *args) while self._ready or self.selector.get_map(): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self._ready.popleft() #     callback(*args) # task._step() ___________________________________________________________________ clsss Task: def _step(self, exc=None): try: if exc is None: #  None   result = self._coro.send(None) else: ___________________________________________________________________ async def main(loop): #      #   sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) #         conn, addr = await loop.sock_accept(sock) result = await loop.sock_recv(conn, 1000) print(result) ___________________________________________________________________ class EventLoop: def sock_accept(self, sock, fut=None): #   Future fut = fut if fut else Future(loop=self) try: #     conn, address = sock.accept() conn.setblocking(False) except (BlockingIOError, InterruptedError): #     #      #   Future    self.selector.register( sock, EVENT_READ, (self.sock_accept, sock, fut) ) except Exception as exc: -------------------------------------------- self.selector.unregister(sock) #  Future   return fut ___________________________________________________________________ async def main(loop): sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) #   await   __await__  Future conn, addr = await loop.sock_accept(sock) result = await loop.sock_recv(conn, 1000) print(result) ___________________________________________________________________ class Future: def __await__(self): #   Future         if self._state == 'PENDING': yield self return self.result() ___________________________________________________________________ class Task(Future): def _step(self, exc=None): try: if exc is None: #           None result = self._coro.send(None) # result = fut -------------------------------- else: #  Future      # wakeup      if isinstance(result, Future): result.add_done_callback(self._wakeup) elif result is None: self._loop.call_soon(self._step) #      -    Task  Future #    #               ___________________________________________________________________ class EventLoop: def run_until_complete(self, callback, *args): self.call_soon(callback, *args) while self._ready or self.selector.get_map(): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self._ready.popleft() callback(*args) for key, events in self.selector.select(timeout=0): #    callback, *args = key.data self.call_soon(callback, *args) # loop.sock_accept(sock, fut) ___________________________________________________________________ class EventLoop: def sock_accept(self, sock, fut=None): fut = fut if fut else Future(loop=self) try: #    conn, address = sock.accept() conn.setblocking(False) except (BlockingIOError, InterruptedError): -------------------------------- else: #   Future fut.set_result((conn, address)) self.selector.unregister(sock) return fut ___________________________________________________________________ class Future: def set_result(self, result): #   self._result = result #   self._state = 'FINISHED' #      self._schedule_callbacks() def _schedule_callbacks(self): for callback in self._callbacks: #         task.wakeup self._loop.call_soon(callback, self) # (task.wakeup, fut) self._callbacks[:] = [] ___________________________________________________________________ class EventLoop: def run_until_complete(self, callback, *args): self.call_soon(callback, *args) while self._ready or self.selector.get_map(): ntodo = len(self._ready) for _ in range(ntodo): callback, args = self._ready.popleft() #      #    task.wakeup callback(*args) # task.wakeup(fut) ___________________________________________________________________ class Task(Future): def _wakeup(self, future): try: future.result() except Exception as exc: self._step(exc) else: #   Future      task._step self._step() def _step(self, exc=None): try: if exc is None: #       None result = self._coro.send(None) else: ___________________________________________________________________ async def main(loop): sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) #   await   __awai__   conn, addr = await loop.sock_accept(sock) result = await loop.sock_recv(conn, 1000) print(result) ___________________________________________________________________ class Future: def __await__(self): if self._state == 'PENDING': yield self #   Future    return self.result() ___________________________________________________________________ async def main(loop): sock = socket.socket() sock.bind(('localhost', 8080)) sock.listen(100) sock.setblocking(False) #    Future    conn  addr conn, addr = await loop.sock_accept(sock) result = await loop.sock_recv(conn, 1000) print(result) 

Auf diese einfache Weise führt Asyncio Coroutinen durch.


Zusammenfassung


Das Ziel, Asyncio zu erstellen, wurde erfolgreich erreicht. Dies löste nicht nur das Kompatibilitätsproblem, sondern führte auch zu einem enormen Anstieg des Interesses an wettbewerbsfähiger Programmierung in der Community. Neue Artikel und Bibliotheken tauchten auf, wie Pilze nach dem Regen. Darüber hinaus hat Asyncio die Sprache selbst beeinflusst: Native Coroutinen und neue Schlüsselwörter für Async / Warten wurden hinzugefügt. Das letzte Mal, dass 2003 ein neues Keyword hinzugefügt wurde, war es das Yield- Keyword.


Eines der Ziele bei der Erstellung von Asyncio war die extrem einfache Integration in bereits vorhandene asynchrone Frameworks (Twisted, Tornado, Gevent). Die Auswahl der Tools folgt logischerweise aus diesem Ziel: Wenn keine Kompatibilitätsanforderungen bestehen würden, würden Coroutinen wahrscheinlich die Hauptrolle erhalten. Aufgrund der Tatsache, dass es beim Programmieren von Rückrufen unmöglich ist, einen kontinuierlichen Stapel von Anrufen aufrechtzuerhalten, musste ein zusätzliches System an der Grenze zwischen ihnen und den Coroutinen erstellt werden, um die darauf basierenden Sprachfunktionen zu unterstützen.


Nun die Hauptfrage. Warum sollte ein einfacher Benutzer der Bibliothek all dies wissen, was den Empfehlungen aus der Dokumentation folgt und nur Coroutinen und eine API auf hoher Ebene verwendet?
Hier ist eine Dokumentation zur StreamWriter-Klasse



Die Instanz wird von der Funktion asyncio.open_connection zurückgegeben und ist die async / await- API über der Callback-API. Und diese Rückrufe ragen heraus. Die Funktionen write und writelines sind synchron, sie versuchen, in den Socket zu schreiben, und wenn dies fehlschlägt, geben sie die Daten in den zugrunde liegenden Puffer aus und fügen dem Datensatz Rückrufe hinzu. Corutin Drain wird benötigt, um die Möglichkeit zu bieten, zu warten, bis die Datenmenge im Puffer auf den angegebenen Wert abfällt.


Wenn Sie vergessen, zwischen Schreibaufrufen Drain aufzurufen, kann der interne Puffer zu unanständigen Größen anwachsen. Wenn Sie dies jedoch berücksichtigen, bleiben einige unangenehme Momente übrig. Erstens: Wenn der Rückruf im Datensatz "unterbrochen" wird, weiß die Coroutine, die diese API verwendet, nichts davon und kann ihn dementsprechend nicht verarbeiten. Zweitens: Wenn die Coroutine "bricht", weiß der Rückruf zum Datensatz in keiner Weise davon und schreibt weiterhin Daten aus dem Puffer.


Seien Sie also auch mit Coroutinen darauf vorbereitet, dass Rückrufe an sich selbst erinnern.


In diesem Artikel unseres Unternehmensblogs Antida-Software erfahren Sie, wie Sie mit Datenbanken aus asynchronem Code arbeiten .


PS Vielen Dank für die Informationen zu Tippfehlern und Ungenauigkeiten an Benutzer von eirnym , kurb , rasswet

Source: https://habr.com/ru/post/de453348/


All Articles