Python 3.5 Implementieren der Parallelität mit asyncio

Übersetzung von Kapitel 13 Parallelität
aus dem Buch 'Expert Python Programming',
Zweite Auflage
Michał Jaworski & Tarek Ziadé, 2016

Asynchrone Programmierung


In den letzten Jahren hat die asynchrone Programmierung eine große Popularität erlangt. Python 3.5 hat endlich einige Syntaxfunktionen, die die Konzepte asynchroner Lösungen unterstützen. Dies bedeutet jedoch nicht, dass eine asynchrone Programmierung erst seit Python 3.5 möglich ist. Viele Bibliotheken und Frameworks wurden bereits viel früher bereitgestellt, und die meisten stammten aus älteren Versionen von Python 2. Es gibt sogar eine alternative Implementierung von Python namens Stackless (siehe Kapitel 1, „Der aktuelle Status von Python“), die sich auf diesen einzigen Programmieransatz konzentriert. Für einige Lösungen, wie Twisted, Tornado oder Eventlet , gibt es noch aktive Communities, die es wirklich wert sind, kennen zu lernen . In jedem Fall ist die asynchrone Programmierung seit Python 3.5 so einfach wie nie zuvor. Daher wird erwartet, dass die integrierten asynchronen Funktionen die meisten alten Tools ersetzen, oder dass externe Projekte allmählich zu einer Art übergeordneten Frameworks werden, die auf integriertem Python basieren.

Wenn Sie versuchen zu erklären, was asynchrone Programmierung ist, ist es am einfachsten, sich diesen Ansatz als Thread-ähnlichen Ansatz vorzustellen, jedoch ohne einen System-Scheduler. Dies bedeutet, dass ein asynchrones Programm gleichzeitig Aufgaben verarbeiten kann, sein Kontext jedoch intern und nicht vom System-Scheduler umgeschaltet wird.

Natürlich verwenden wir keine Threads für die parallele Verarbeitung von Aufgaben in einem asynchronen Programm. Die meisten Lösungen verwenden unterschiedliche Konzepte und werden je nach Implementierung unterschiedlich aufgerufen. Einige Beispiele für Namen, die zur Beschreibung solcher paralleler Programmobjekte verwendet werden, sind:

  • Grüne Fäden - Grüne Fäden (Greenlet-, Ereignis- oder Eventlet-Projekte)
  • Coroutinen - Coroutinen (reine asynchrone Programmierung in Python 3.5)
  • Tasklets (Stackless Python) Hierbei handelt es sich im Grunde genommen um dieselben Konzepte, die jedoch häufig auf leicht unterschiedliche Weise implementiert werden.

Aus offensichtlichen Gründen konzentrieren wir uns in diesem Abschnitt nur auf Coroutinen, die ab Version 3.5 von Python unterstützt werden.

Kollaboratives Multitasking und asynchrone E / A


Kollaboratives Multitasking ist der Kern der asynchronen Programmierung. In diesem Sinne ist Multitasking im Betriebssystem nicht erforderlich, um einen Kontextwechsel (zu einem anderen Prozess oder Thread) auszulösen, sondern jeder Prozess gibt freiwillig die Kontrolle frei, wenn er sich im Standby-Modus befindet, um die gleichzeitige Ausführung mehrerer Programme sicherzustellen. Deshalb nennt man es kollaborativ. Alle Prozesse müssen zusammenarbeiten, um sicherzustellen, dass Multitasking erfolgreich ist.

Das Multitasking-Modell wurde manchmal in Betriebssystemen verwendet, kann aber jetzt kaum noch als Lösung auf Systemebene gefunden werden. Dies liegt daran, dass die Gefahr besteht, dass ein schlecht konzipierter Dienst die Stabilität des gesamten Systems leicht stört. Das Planen von Threads und Prozessen mithilfe von Kontextwechseln, die direkt vom Betriebssystem gesteuert werden, ist derzeit der dominierende Ansatz für die Parallelität auf Systemebene. Kollaboratives Multitasking ist jedoch immer noch ein hervorragendes Tool für den gemeinsamen Zugriff auf Anwendungsebene.

Apropos gemeinsames Multitasking auf Anwendungsebene: Wir haben es nicht mit Threads oder Prozessen zu tun, die die Kontrolle freigeben müssen, da die gesamte Ausführung in einem Prozess und Thread enthalten ist. Stattdessen haben wir mehrere Aufgaben (Coroutinen, Tasklets und grüne Fäden), die die Kontrolle auf eine einzige Funktion übertragen, die die Koordination der Aufgaben steuert. Diese Funktion ist normalerweise eine Art Ereignisschleife.

Um (aufgrund der Python-Terminologie) Verwirrung zu vermeiden, werden wir jetzt solche parallelen Aufgaben Coroutinen nennen. Das wichtigste Problem beim kollaborativen Multitasking ist die Übertragung der Kontrolle. In den meisten asynchronen Anwendungen wird die Steuerung während der E / A-Vorgänge an den Scheduler oder die Ereignisschleife übergeben. Unabhängig davon, ob das Programm Daten aus dem Dateisystem liest oder über einen Socket kommuniziert, ist eine solche E / A-Operation immer mit einer Wartezeit verbunden, wenn der Prozess inaktiv wird. Die Latenz hängt von einer externen Ressource ab, daher ist dies eine gute Gelegenheit, die Kontrolle über andere Coroutinen freizugeben, bis diese darauf warten müssen, dass sich dieses Vorgehen in etwa so verhält, wie Multithreading in Python implementiert wird. Wir wissen, dass die GIL Python-Threads serialisiert, sie wird jedoch auch bei jeder E / A-Operation freigegeben. Der Hauptunterschied besteht darin, dass Threads in Python als Threads auf Systemebene implementiert sind, sodass das Betriebssystem den aktuell ausgeführten Thread jederzeit entladen und die Steuerung auf einen anderen übertragen kann.

Bei der asynchronen Programmierung werden Aufgaben niemals durch die Hauptereignisschleife unterbrochen. Aus diesem Grund wird dieser Multitasking-Stil auch als Multitasking ohne Priorität bezeichnet.

Natürlich läuft jede Python-Anwendung auf einem Betriebssystem, auf dem andere Prozesse um Ressourcen konkurrieren. Dies bedeutet, dass das Betriebssystem immer das Recht hat, den gesamten Prozess auszulagern und die Kontrolle auf einen anderen zu übertragen. Wenn unsere asynchrone Anwendung jedoch wieder gestartet wird, wird sie an der Stelle fortgesetzt, an der sie beim Eingreifen des System-Schedulers angehalten wurde. Aus diesem Grund gelten Koroutinen in diesem Zusammenhang als nicht überfüllt.

Python asynchrone und warten Schlüsselwörter


Die Schlüsselwörter async und await sind die Hauptbausteine ​​der asynchronen Python-Programmierung.

Das vor der def- Anweisung verwendete Schlüsselwort async definiert eine neue Coroutine. Eine Coroutine-Funktion kann unter genau definierten Umständen ausgesetzt und wieder aufgenommen werden. Die Syntax und das Verhalten sind den Generatoren sehr ähnlich (siehe Kapitel 2, „Syntaxempfehlungen“, unter der Klassenebene). Tatsächlich sollten Generatoren in älteren Versionen von Python verwendet werden, um Coroutinen zu implementieren. Hier ist eine Beispieldeklaration einer Funktion, die das Schlüsselwort async verwendet :

async def async_hello(): print("hello, world!") 

Funktionen, die mit dem Schlüsselwort async definiert wurden, sind speziell. Beim Aufruf führen sie keinen Code im Inneren aus, sondern geben stattdessen ein Coroutine-Objekt zurück:

 >>>> async def async_hello(): ... print("hello, world!") ... >>> async_hello() <coroutine object async_hello at 0x1014129e8> 

Das Coroutine-Objekt tut nichts, bis seine Ausführung in der Ereignisschleife geplant ist. Das Asyncio-Modul ist verfügbar, um eine grundlegende Implementierung der Ereignisschleife sowie viele andere asynchrone Dienstprogramme bereitzustellen:

 >>> import asyncio >>> async def async_hello(): ... print("hello, world!") ... >>> loop = asyncio.get_event_loop() >>> loop.run_until_complete(async_hello()) hello, world! >>> loop.close() 

Wenn wir nur eine einfache Koroutine erstellen, implementieren wir in unserem Programm natürlich keine Parallelität. Um etwas wirklich Paralleles zu sehen, müssen wir mehr Aufgaben erstellen, die von einer Ereignisschleife ausgeführt werden.

Neue Tasks können der Schleife hinzugefügt werden, indem die loop.create_task () -Methode aufgerufen oder ein anderes Objekt bereitgestellt wird, das auf die Verwendung der asyncio.wait () -Funktion wartet. Wir werden den letzteren Ansatz verwenden und versuchen, eine mit der range () -Funktion erzeugte Folge von Zahlen asynchron zu drucken:

 import asyncio async def print_number(number): print(number) if __name__ == "__main__": loop = asyncio.get_event_loop() loop.run_until_complete( asyncio.wait([ print_number(number) for number in range(10) ]) ) loop.close() 

Die Funktion asyncio.wait () akzeptiert eine Liste von Coroutine-Objekten und kehrt sofort zurück. Das Ergebnis ist ein Generator, der Objekte erzeugt, die zukünftige Ergebnisse darstellen (Futures). Wie der Name schon sagt, wird es verwendet, um auf den Abschluss aller bereitgestellten Coroutinen zu warten. Der Grund, warum ein Generator anstelle eines Coroutine-Objekts zurückgegeben wird, liegt darin, dass es mit früheren Versionen von Python abwärtskompatibel ist, was später erläutert wird. Das Ergebnis der Ausführung dieses Skripts kann wie folgt aussehen:

 $ python asyncprint.py 0 7 8 3 9 4 1 5 2 6 

Wie wir sehen können, werden die Zahlen nicht in der Reihenfolge gedruckt, in der wir unsere Koroutinen erstellt haben. Aber genau das wollten wir erreichen.

Das zweite wichtige Schlüsselwort, das in Python 3.5 hinzugefügt wurde, wird erwartet . Es wird verwendet, um auf die Ergebnisse einer Coroutine oder eines zukünftigen Ereignisses zu warten (wird später erläutert) und die Kontrolle über die Ausführung in der Ereignisschleife freizugeben. Um besser zu verstehen, wie dies funktioniert, müssen wir ein komplexeres Codebeispiel betrachten.

Angenommen, wir möchten zwei Coroutinen erstellen, die einige einfache Aufgaben in einer Schleife ausführen:

  • Warten Sie eine zufällige Anzahl von Sekunden
  • Geben Sie einen als Argument angegebenen Text und die Wartezeit aus. Beginnen wir mit einer einfachen Implementierung, die einige Parallelitätsprobleme aufweist, die wir später mit der zusätzlichen Verwendung von await verbessern wollen:

     import time import random import asyncio async def waiter(name): for _ in range(4): time_to_sleep = random.randint(1, 3) / 4 time.sleep(time_to_sleep) print( "{} waited {} seconds" "".format(name, time_to_sleep) ) async def main(): await asyncio.wait([waiter("foo"), waiter("bar")]) if __name__ == "__main__": loop = asyncio.get_event_loop() loop.run_until_complete(main()) loop.close() 

Bei der Ausführung im Terminal (mit dem Befehl time zum Messen der Zeit) sehen Sie Folgendes:

 $ time python corowait.py bar waited 0.25 seconds bar waited 0.25 seconds bar waited 0.5 seconds bar waited 0.5 seconds foo waited 0.75 seconds foo waited 0.75 seconds foo waited 0.25 seconds foo waited 0.25 seconds real 0m3.734s user 0m0.153s sys 0m0.028s 


Wie wir sehen können, haben beide Coroutinen ihre Ausführung abgeschlossen, jedoch nicht asynchron. Der Grund dafür ist, dass beide die Funktion time.sleep () verwenden, die das Steuerelement in der Ereignisschleife sperrt, aber nicht freigibt . Dies funktioniert in einer Multithread-Installation besser, wir möchten jedoch derzeit keine Streams verwenden. Wie können wir das beheben?

Die Antwort lautet: Verwenden Sie asyncio.sleep () , eine asynchrone Version von time.sleep (), und erwarten Sie das Ergebnis mit dem Schlüsselwort await . Wir haben diese Anweisung bereits in der ersten Version von main () verwendet , dies diente jedoch nur der besseren Übersichtlichkeit des Codes. Dies hat unsere Implementierung eindeutig nicht paralleler gemacht. Schauen wir uns eine verbesserte Version der waiter () - Coroutine an, die await asyncio.sleep () verwendet:

 async def waiter(name): for _ in range(4): time_to_sleep = random.randint(1, 3) / 4 await asyncio.sleep(time_to_sleep) print( "{} waited {} seconds" "".format(name, time_to_sleep) ) 


Wenn Sie das aktualisierte Skript ausführen, werden Sie sehen, wie sich die Ausgabe von zwei Funktionen abwechselt:

 $ time python corowait_improved.py bar waited 0.25 seconds foo waited 0.25 seconds bar waited 0.25 seconds foo waited 0.5 seconds foo waited 0.25 seconds bar waited 0.75 seconds foo waited 0.25 seconds bar waited 0.5 seconds real 0m1.953s user 0m0.149s sys 0m0.026s 


Ein zusätzlicher Vorteil dieser einfachen Verbesserung ist, dass der Code schneller ausgeführt wird. Die Gesamtausführungszeit war kürzer als die Summe aller Schlafzeiten, da die Koroutinen nacheinander die Kontrolle übernahmen.

Asyncio in früheren Versionen von Python


Das Asyncio-Modul erschien in Python 3.4. Dies ist also die einzige Version von Python, die ernsthafte Unterstützung für die asynchrone Programmierung vor Python 3.5 bietet. Leider scheinen diese beiden nachfolgenden Versionen ausreichend zu sein, um Kompatibilitätsprobleme aufzuzeigen.

Wie auch immer, der asynchrone Programmierkern in Python wurde früher eingeführt als die Syntaxelemente, die diese Vorlage unterstützen. Besser spät als nie, aber dies führte zu einer Situation, in der es zwei Syntaxen für die Arbeit mit Coroutinen gibt.

Ab Python 3.5 können Sie async verwenden und auf Folgendes warten :

 async def main (): await asyncio.sleep(0) 


In Python 3.4 müssen Sie jedoch zusätzlich den Dekorator asyncio.coroutine anwenden und im Coroutine-Text ausgeben:

 @asyncio.couroutine def main(): yield from asyncio.sleep(0) 


Eine weitere nützliche Tatsache ist, dass die Ausgabe von statement in Python 3.3 eingeführt wurde und PyPI einen asynchronen Backport hat. Dies bedeutet, dass Sie diese Implementierung des kollaborativen Multitasking auch mit Python 3.3 verwenden können.

Ein praktisches Beispiel für asynchrone Programmierung


Wie in diesem Kapitel oft erwähnt, ist die asynchrone Programmierung ein hervorragendes Werkzeug für die Handhabung von E / A. Es ist an der Zeit, etwas Praktischeres als nur das Drucken von Sequenzen oder das asynchrone Warten zu schaffen.

Um die Konsistenz zu gewährleisten, werden wir versuchen, dasselbe Problem zu lösen, das wir mit Hilfe von Multithreading und Multiprocessing gelöst haben. Aus diesem Grund werden wir versuchen, einige Daten über eine Netzwerkverbindung asynchron aus externen Ressourcen zu extrahieren. Es wäre großartig, wenn wir das gleiche python-gmaps- Paket wie in den vorherigen Abschnitten verwenden könnten. Leider können wir nicht.

Der Schöpfer von Python-Gmaps war etwas faul und nahm nur den Namen. Um die Entwicklung zu vereinfachen, wählte er das Anforderungspaket als seine HTTP-Client-Bibliothek. Leider unterstützen Anforderungen keine asynchrone E / A mit Async und warten . Es gibt einige andere Projekte, die darauf abzielen, eine gewisse Parallelität für das Abfrageprojekt bereitzustellen, aber sie basieren entweder auf Gevent ( grequests , siehe https://github.com/ kennethreitz / grequests ) oder führen einen Thread- / Prozesspool aus (query-futures) siehe github.com/ross/requests-futures ). Keiner von ihnen löst unser Problem.

Beruhige dich, bevor ich mich beschuldige, einen unschuldigen Open-Source-Entwickler beschimpft zu haben. Die Person hinter dem Paket python-gmaps bin ich. Eine schlechte Wahl der Abhängigkeiten ist eines der Probleme dieses Projekts. Ich kritisiere mich nur gerne von Zeit zu Zeit öffentlich. Dies wird eine bittere Lektion für mich sein, da Python-gmaps in der neuesten Version (0.3.1 zum Zeitpunkt des Schreibens) nicht einfach in Pythons asynchrones I / O integriert werden kann. In jedem Fall kann sich dies in Zukunft ändern, sodass nichts verloren geht.
Da wir die Einschränkungen der Bibliothek kennen, die in den vorherigen Beispielen so einfach zu verwenden war, müssen wir etwas erstellen, das diese Lücke füllt. Google MapsAPI ist sehr einfach zu bedienen. Wir werden daher zur Veranschaulichung ein asynchrones Hilfsprogramm entwickeln. In der Standardbibliothek von Python 3.5 fehlt noch eine Bibliothek, die asynchrone HTTP-Anforderungen so einfach ausführen kann wie das Aufrufen von urllib.urlopen () . Wir möchten definitiv keine vollständige Protokollunterstützung von Grund auf erstellen, daher werden wir eine kleine Hilfe aus dem in PyPI verfügbaren aiohttp- Paket verwenden. Dies ist eine vielversprechende Bibliothek, die sowohl Client- als auch Serverimplementierungen für asynchrones HTTP hinzufügt. Hier ist ein kleines Modul, das auf aiohttp aufbaut und eine Geocode () - Hilfsfunktion erstellt, mit der Geocodierungsanforderungen an den Google Maps-API-Dienst ausgeführt werden:

 import aiohttp session = aiohttp.ClientSession() async def geocode(place): params = { 'sensor': 'false', 'address': place } async with session.get( 'https://maps.googleapis.com/maps/api/geocode/json', params=params ) as response: result = await response.json() return result['results'] 


Nehmen wir an, dass dieser Code in einem Modul mit dem Namen asyncgmaps gespeichert ist, das wir später verwenden werden. Jetzt können wir das Beispiel für Multithreading und Multiprocessing neu schreiben. Bisher haben wir den gesamten Vorgang in zwei separate Phasen unterteilt:

  1. Erfüllen Sie alle Anfragen an den externen Dienst parallel mit der Funktion fetch_place () .
  2. Zeigen Sie mit der Funktion present_result () alle Ergebnisse in einer Schleife an.

Da sich kollaboratives Multitasking jedoch grundlegend von der Verwendung mehrerer Prozesse oder Threads unterscheidet, können wir unseren Ansatz leicht ändern. Die meisten Probleme, die bei der Verwendung eines einzelnen Threads pro Element auftreten, sind nicht länger unser Anliegen.
Coroutinen sind nicht präemptiv, daher können wir die Ergebnisse sofort nach Erhalt der HTTP-Antworten problemlos anzeigen. Dies wird unseren Code vereinfachen und verständlicher machen:

 import asyncio # note: local module introduced earlier from asyncgmaps import geocode, session PLACES = ( 'Reykjavik', 'Vien', 'Zadar', 'Venice', 'Wrocław', 'Bolognia', 'Berlin', 'Słubice', 'New York', 'Dehli', ) async def fetch_place(place): return (await geocode(place))[0] async def present_result(result): geocoded = await result print("{:>25s}, {:6.2f}, {:6.2f}".format( geocoded['formatted_address'], geocoded['geometry']['location']['lat'], geocoded['geometry']['location']['lng'], )) async def main(): await asyncio.wait([ present_result(fetch_place(place)) for place in PLACES ]) if __name__ == "__main__": loop = asyncio.get_event_loop() loop.run_until_complete(main()) # aiohttp will raise issue about unclosed # ClientSession so we perform cleanup manually loop.run_until_complete(session.close()) loop.close() 


Die asynchrone Programmierung eignet sich hervorragend für Back-End-Entwickler, die skalierbare Anwendungen erstellen möchten. In der Praxis ist dies eines der wichtigsten Tools für die Erstellung von Servern mit hohem Wettbewerbsdruck.

Aber die Realität ist traurig. Viele beliebte Pakete, die sich mit E / A-Problemen befassen, sind nicht für die Verwendung mit asynchronem Code vorgesehen. Die Hauptgründe dafür sind:

  • Noch immer wenig Implementierung von Python 3 und einigen seiner erweiterten Funktionen
  • Geringes Verständnis für verschiedene Nebenläufigkeitskonzepte bei Anfängern zum Erlernen von Python

Dies bedeutet, dass die Migration vorhandener synchroner Multithread-Anwendungen und -Pakete häufig entweder nicht möglich (aufgrund architektonischer Einschränkungen) oder zu teuer ist. Viele Projekte könnten von der Implementierung des asynchronen Multitasking-Stils stark profitieren, aber nur wenige werden dies letztendlich tun. Dies bedeutet, dass Sie von Anfang an Schwierigkeiten haben werden, asynchrone Anwendungen zu erstellen. In den meisten Fällen ähnelt dies dem im Abschnitt „Praktisches Beispiel für asynchrone Programmierung“ genannten Problem: Inkompatible Schnittstellen und nicht synchrones Blockieren von E / A-Vorgängen. Natürlich kann es manchmal vorkommen, dass Sie das Warten aufgeben, wenn eine solche Inkompatibilität auftritt, und nur synchron die erforderlichen Ressourcen abrufen. Dies verhindert jedoch, dass sich die Coroutine gegenseitig den Code ausführt, während Sie auf die Ergebnisse warten. Technisch funktioniert dies, zerstört aber auch alle Vorteile der asynchronen Programmierung. Daher ist die Kombination von asynchroner E / A mit synchroner E / A letztendlich keine Option. Dies ist ein Alles-oder-Nichts-Spiel.

Ein weiteres Problem sind langwierige prozessorgebundene Operationen. Wenn Sie eine E / A-Operation ausführen, ist es kein Problem, die Steuerung von einer Coroutine freizugeben. Wenn Sie von einem Dateisystem oder Socket aus schreiben / lesen, werden Sie eventuell warten, sodass ein Aufruf mit wait das Beste ist, was Sie tun können. Aber was ist, wenn Sie etwas berechnen müssen und wissen, dass es einige Zeit dauern wird? Natürlich können Sie das Problem in Teile aufteilen und die Kontrolle jedes Mal abbrechen, wenn Sie die Arbeit ein wenig vorantreiben. Aber bald werden Sie feststellen, dass dies kein sehr gutes Modell ist. So etwas kann den Code unordentlich machen und garantiert auch keine guten Ergebnisse.

Die zeitliche Bindung sollte in der Verantwortung des Interpreters oder des Betriebssystems liegen.

Kombination von asynchronem Code mit asynchronen Zukünften


Was ist also zu tun, wenn Sie Code haben, der lange synchrone E / A-Vorgänge ausführt, die Sie nicht umschreiben können oder wollen? Oder was tun, wenn Sie in einer Anwendung, die hauptsächlich für asynchrone E / A konzipiert ist, einige schwere Prozessoroperationen ausführen müssen? Nun ... Sie müssen eine Problemumgehung finden. Und damit meine ich Multithreading oder Multiprocessing.

Das hört sich vielleicht nicht sehr gut an, aber manchmal ist die beste Lösung das, wovon wir versucht haben wegzukommen. Die parallele Verarbeitung von ressourcenintensiven Aufgaben in Python wird aufgrund der Mehrfachverarbeitung immer besser ausgeführt. Multithreading kann E / A-Vorgänge gleichermaßen gut (schnell und ohne große Ressourcen) verarbeiten, da es asynchron ist und wartet, wenn es ordnungsgemäß konfiguriert und sorgfältig behandelt wird.

Wenn Sie also manchmal nicht wissen, was zu tun ist, wenn etwas nicht in Ihre asynchrone Anwendung passt, verwenden Sie einen Code, der es in einen separaten Thread oder Prozess stellt. Sie können so tun, als ob es sich um eine Koroutine handelt, die Steuerung für die Ereignisschleife freigeben und die Ergebnisse schließlich verarbeiten, wenn sie bereit sind.

Glücklicherweise bietet die Python-Standardbibliothek das concurrent.futures- Modul, das auch in das asyncio- Modul integriert ist. Zusammen ermöglichen diese beiden Module das Planen von Blockierungsfunktionen, die in Threads oder zusätzlichen Prozessen ausgeführt werden, als wären sie asynchrone, nicht blockierende Coroutinen.

Executors und Futures


Bevor wir uns mit dem Einbetten von Threads oder Prozessen in eine asynchrone Ereignisschleife befassen, schauen wir uns das concurrent.futures- Modul genauer an, das später die Hauptkomponente unserer so genannten Problemumgehung sein wird.

Die wichtigsten Klassen im Modul concurrent.futures sind Executor und Future .

Executor ist ein Ressourcenpool, der Workitems parallel verarbeiten kann. Es mag in seinem Zweck den Klassen des Multiprozessor-Moduls - Pool und dummy.Pool - sehr ähnlich erscheinen , hat aber eine völlig andere Schnittstelle und Semantik. Dies ist eine Basisklasse, die nicht implementiert werden soll und zwei spezifische Implementierungen aufweist:

  • ThreadPoolExecutor : repräsentiert einen Thread-Pool
  • ProcessPoolExecutor : repräsentiert einen Prozesspool

Jeder Executor stellt drei Methoden vor:

  • submit (fn, * args, ** kwargs) : Plant die Ausführung der Funktion fn im Ressourcenpool und gibt ein Future-Objekt zurück, das die Ausführung des aufgerufenen Objekts darstellt
  • map (func, * iterables, timeout = None, chunksize = 1) : Die Funktion func wird bei der Iteration ähnlich wie bei der Mehrfachverarbeitung ausgeführt. Pool.map () - Methode
  • shutdown (wait = True) : Hiermit wird der Executor heruntergefahren und alle seine Ressourcen freigegeben .

Die interessanteste Methode ist submit (), da das Future-Objekt zurückgegeben wird. Es repräsentiert die asynchrone Ausführung des aufgerufenen und nur indirekt dessen Ergebnis. Um den tatsächlichen Rückgabewert des versendeten aufgerufenen Objekts abzurufen , müssen Sie die Future.result () -Methode aufrufen . Und wenn das aufgerufene Objekt bereits abgeschlossen ist, blockiert die Methode result () es nicht und gibt einfach die Ausgabe der Funktion zurück. Ist dies nicht der Fall, blockiert er es, bis das Ergebnis fertig ist. Stellen Sie es sich als Versprechen eines Ergebnisses vor (es ist eigentlich dasselbe Konzept wie ein Versprechen in JavaScript). Sie müssen es nicht sofort nach Erhalt entpacken (mithilfe der result () - Methode), aber wenn Sie dies versuchen, wird es garantiert irgendwann etwas zurückgeben:

 >>> def loudy_return(): ... print("processing") ... return 42 ... >>> from concurrent.futures import ThreadPoolExecutor >>> with ThreadPoolExecutor(1) as executor: ... future = executor.submit(loudy_return) ... processing >>> future <Future at 0x33cbf98 state=finished returned int> >>> future.result() 42 


Wenn Sie die Executor.map () -Methode verwenden möchten, unterscheidet sie sich nicht von der Pool.map () -Methode der Pool- Klasse des Multiprozessor-Moduls:

 def main(): with ThreadPoolExecutor(POOL_SIZE) as pool: results = pool.map(fetch_place, PLACES) for result in results: present_result(result) 


Verwenden von Executor in einer Ereignisschleife


Die von der Executor.submit () -Methode zurückgegebenen Instanzen der Future-Klasse sind konzeptionell den bei der asynchronen Programmierung verwendeten Coroutinen sehr ähnlich. Aus diesem Grund können wir Künstler einsetzen, um eine Mischung aus kollaborativem Multitasking und Multiprocessing oder Multithreading zu erstellen.

Der Kern dieser Problemumgehung ist die BaseEventLoop.run_in_executor-Methode (executor, func, * args) der Ereignisschleifenklasse . Auf diese Weise können Sie die Ausführung der func-Funktion in einem durch das executor-Argument dargestellten Prozess oder Thread-Pool planen. Das Wichtigste an dieser Methode ist, dass sie das neue erwartete Objekt zurückgibt (das Objekt, das mit dem Operator await erwartet werden kann). Auf diese Weise können Sie eine Blockierungsfunktion ausführen, die nicht genau wie eine Coroutine ist, und die nicht blockiert, egal wie lange es dauert, bis sie abgeschlossen ist. Es wird nur die Funktion gestoppt, die Ergebnisse von einem solchen Aufruf erwartet, aber der gesamte Zyklus von Ereignissen wird fortgesetzt.

Und eine nützliche Tatsache ist, dass Sie nicht einmal Ihre eigene Executor-Instanz erstellen müssen. Wenn Sie None als Argument an executor übergeben , wird die ThreadPoolExecutor- Klasse mit der Standardanzahl von Threads verwendet (für Python 3.5 ist dies die Anzahl der Prozessoren multipliziert mit 5).

Nehmen wir also an, wir wollten den problematischen Teil des python-gmaps-Pakets, der unsere Kopfschmerzen verursacht hat, nicht umschreiben. Wir können einen blockierenden Aufruf an einen separaten Thread einfach verschieben, indem wir loop.run_in_executor () aufrufen und dabei die Funktion fetch_place () als erwartete Coroutine belassen:

 async def fetch_place(place): coro = loop.run_in_executor(None, api.geocode, place) result = await coro return result[0] 


Eine solche Lösung ist schlimmer als eine vollständig asynchrone Bibliothek, aber Sie wissen, dass zumindest etwas besser ist als nichts.

Nachdem wir erklärt hatten, was Parallelität wirklich ist, haben wir Maßnahmen ergriffen und eines der typischen Parallelprobleme mithilfe von Multithreading analysiert. Nachdem wir die Hauptmängel unseres Codes identifiziert und korrigiert hatten, wandten wir uns der Mehrfachverarbeitung zu, um zu sehen, wie dies in unserem Fall funktionieren würde.

Danach stellten wir fest, dass die Verwendung mehrerer Prozesse mit einem Multiprozessor-Modul viel einfacher ist als grundlegende Threads mit Multithreading. Erst danach wurde uns klar, dass wir dank multiprocessing.dummy dieselbe API für Threads verwenden können . , , , .

, - , , / , , . , , , !

Und das führt uns zum endgültigen Abschluss dieses Kapitels. Es gibt keine Lösung, die allen zusagt. Es gibt verschiedene Ansätze, die Sie bevorzugen oder bevorzugen. Es gibt einige Ansätze, die für diese Problematik besser geeignet sind, aber Sie müssen alle kennen, um erfolgreich zu sein. In realistischen Szenarien können Sie das gesamte Arsenal an Werkzeugen und Parallelitätsstilen in einer Anwendung verwenden. Dies ist keine Seltenheit.

Die vorherige Schlussfolgerung ist eine hervorragende Einführung in das Thema des nächsten Kapitels, Kapitel 14 „Nützliche Entwurfsmuster“. Da es keine einzige Vorlage gibt, die alle Ihre Probleme löst. Sie sollten so viel wie möglich wissen, denn letztendlich werden Sie sie jeden Tag verwenden.

Source: https://habr.com/ru/post/de484446/


All Articles