Warum ist eine Rekonstruktion erforderlich, wenn der Quellcode für diese Klasse geöffnet ist?
Schon allein deshalb, weil sich unter der Haube ein hochoptimierter, schwer lesbarer Code befindet, dessen Studium pädagogisch wenig aussagt.
Daher werden wir die Semantik von Operationen gemäß ihren Spezifikationen neu erstellen und funktional äquivalenten, verständlichen und lesbaren Code schreiben, obwohl dies hinsichtlich Speicherverbrauch und Prozessorzeit möglicherweise nicht der wirtschaftlichste ist.
Beginnen wir mit einer relativ einfachen Methode:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) Returns a new CompletableFuture that is asynchronously completed by a task running in the given executor with the value obtained by calling the given Supplier. Type Parameters: U - the function's return type Parameters: supplier - a function returning the value to be used to complete the returned CompletableFuture executor - the executor to use for asynchronous execution Returns: the new CompletableFuture
Lesen Sie die Spezifikation sorgfältig durch:
Returns a new CompletableFuture
Das heißt, ein Objekt vom Typ CompletableFuture
oder seine Unterklasse wird erstellt und als Ergebnis zurückgegeben.
that is asynchronously completed by a task running in the given executor`
Zusätzlich wird eine Aufgabe ausgeführt, die auf Executor
'e ausgeführt wird.
Wie wir wissen, akzeptiert Executor
nur Runnable
Objekte.
Runnable ist eine Schnittstelle, die möglicherweise vom ersten Objekt implementiert wird. Daher werden zwei Funktionen in einem Objekt kombiniert.
completed ... with the value obtained by calling the given Supplier.
Diese Runnable
sollte den angegebenen Supplier
aufrufen und mit dem empfangenen Wert die erstellte CompletableFuture
vervollständigen.
Supplier
ist eine parameterlose Funktion, daher ist die Codierung sehr einfach:
class CompletableFutureForSupplyAsync<U> extends CompletableFuture<U> implements Runnable { Supplier<U> supplier; public CompletableFutureForSupplyAsync(Supplier<U> supplier) { this.supplier = supplier; } public void run() { try { U result = supplier.get(); super.complete(result); } catch (Throwable e) { super.completeExceptionally(e); } } } public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) { CompletableFutureForSupplyAsync<U> task = new CompletableFutureForSupplyAsync<>(supplier); executor.execute(task); return task; }
Das folgende Beispiel ist etwas komplizierter:
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) Returns a new CompletionStage that, when this stage completes normally, is executed using the supplied Executor, with this stage's result as the argument to the supplied function. See the CompletionStage documentation for rules covering exceptional completion. Specified by: thenApplyAsync in interface CompletionStage<T> Type Parameters: U - the function's return type Parameters: fn - the function to use to compute the value of the returned CompletionStage executor - the executor to use for asynchronous execution Returns: the new CompletionStage
Returns a new CompletionStage that... is executed using the supplied Executor
Hier wird uns direkt angeboten, das erstellte Objekt in Form von Runnable
.
... with this stage's result as the argument to the supplied function.
Das ist aber schon interessanter. Die an uns übergebene Funktion hat einen Parameter, und der Wert dieses Parameters ist der Wert, der die aktuelle CompletionStage
vervollständigt. Zum Zeitpunkt des Aufrufs von thenApplyAsync
dieser Wert möglicherweise nicht bekannt, sodass wir die Aufgabe nicht sofort auf Executor
. Stattdessen müssen wir mit der aktuellen CompletionStage
verhandeln.
so dass es zum Zeitpunkt seiner Fertigstellung seinen Wert auf die Aufgabe überträgt. Unter den zahlreichen Methoden von CompletionStage
gibt es eine, die genau für diesen Zweck geeignet ist, whenComplete
:
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action) Returns a new CompletionStage with the same result or exception as this stage, that executes the given action when this stage completes.
Das heißt, im neu erstellten BiConsumer
reicht es aus, die BiConsumer
Schnittstelle zu implementieren, um ein Argument zu erhalten:
class CompletableFutureForApplyAsync<T, U> extends CompletableFuture<U> implements Runnable, BiConsumer<T,Throwable> { Function<? super T,? extends U> fn; Executor executor; T arg; Throwable throwable; public CompletableFutureForApplyAsync(Function<? super T,? extends U> fn, Executor executor) { this.fn = fn; this.executor = executor; } @Override
Dieses Beispiel ist sehr wichtig, um die Natur der asynchronen Programmierung zu verstehen. Daher listen wir noch einmal die wichtigsten Schritte auf:
1) Eine asynchrone Prozedur wird erstellt:
CompletableFutureForApplyAsync<T,U> task = new CompletableFutureForApplyAsync<>(fn, executor);
2) es ist noch nicht zur Ausführung bereit, daher bitten wir den Anbieter des fehlenden Arguments, dieses Argument in Zukunft an uns weiterzuleiten, indem wir die von uns eingereichte Methode aufrufen:
this.whenComplete(task);
3) In dieser Methode speichern wir nicht nur das empfangene Argument, sondern führen auch die Task zur Ausführung aus (siehe die Methode accept
()).
4) Die Ausführung der Aufgabe reduziert sich auf die Erfüllung der uns übertragenen Funktion und das Speichern des Ergebnisses.
Dieses Ergebnis kann genauso wie von anderen Prozeduren mithilfe der whenComplete
() -Methode angefordert werden, die auf unser neu erstelltes Objekt angewendet wird, sodass wir eine Kette von asynchronen Prozeduren beliebiger Länge erstellen können. Diese Kette wird jedoch streng sequentiell ohne Parallelität erfüllt.
Aber wie kann man ein komplexeres Berechnungsdiagramm mit parallelen Zweigen darstellen?
Verwenden Sie dazu die thenCombineAsync
Methode.
Wenn wir im vorherigen Beispiel die asynchrone Prozedur mit einem Argument gestartet haben, dann in diesem - mit zwei.
In diesem Fall kann die Berechnung beider Argumente parallel erfolgen.
ublic <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor) Description copied from interface: CompletionStage Returns a new CompletionStage that, when this and the other given stage complete normally, is executed using the supplied executor, with the two results as arguments to the supplied function.
Hier ist alles dasselbe wie im vorherigen Beispiel mit thenApplyAsync
, aber der Funktionsparameter hat bereits zwei Argumente, und der CompletionStage<? extends U> other
-Parameter wird hinzugefügt CompletionStage<? extends U> other
, was der asynchrone Anbieter des zweiten Arguments ist.
Wie stellen wir sicher, dass das zweite Argument verarbeitet wird?
Beschreiben Sie zunächst anstelle einer Variablen T arg
zwei: T arg1; U arg2;
T arg1; U arg2;
, a anstelle einer Methode public void accept(T argument, Throwable throwable)
beschreiben zwei - accept1
und accept2
,
Jedes davon arbeitet mit einem eigenen Argument.
Gleichzeitig implementiert unser im Aufbau befindliches Objekt die BiConsumer<T,Throwable>
nicht mehr und wir können keinen Schlüsselsatz mehr zum Verknüpfen der Knoten des asynchronen Berechnungsgraphen schreiben
this.whenComplete(task);
Glücklicherweise kann ein funktionales Schnittstellenobjekt durch einen Verweis auf eine Methode dargestellt werden, ohne es in einer separaten Klasse abzuschließen:
this.whenComplete(task::accept1); other.whenComplete(task::accept2);
Das heißt, das aktuelle Objekt liefert das erste Argument und das other
Objekt das zweite.
Hier sind nur die Codes der Methoden, die geändert werden müssen, damit sie die Aufgabe nicht sofort nach Eingang ihres Arguments starten, sondern auch den Empfang der zweiten überprüfen:
public synchronized void accept1(T argument, Throwable throwable) { if (throwable != null) { this.throwable = throwable; executor.execute(this); } else { this.arg1 = argument; if (arg2 != null) { executor.execute(this); } } }
Die accept2-Methode wird ähnlich beschrieben.
Beachten Sie Folgendes:
- Methoden werden synchronisiert (wir arbeiten mit gemeinsamen Daten)
- Bei einer Fehlerübertragung ist es nicht erforderlich, auf das zweite Argument zu warten.
- Es ist nicht der beste Weg, zu überprüfen, ob das Argument durch Vergleich mit
null
eintrifft. Möglicherweise müssen Sie für jedes Argument eine boolesche Variable hinzufügen.
Auf diese Weise können Sie asynchrone Prozeduren aus einer größeren Anzahl von Argumenten als zwei erstellen, aber der Gedanke kommt sofort auf - können Sie immer noch eine separate Klasse für die Parameter erstellen, um nicht Ihre eigene Methode zum Empfangen jedes Parameters zu schreiben, sondern indem Sie die Parameter dynamisch erstellen?
Parameter<Integer> arg1 = new Parameter<>(); Parameter<Float> arg2 = new Parameter<>(); ... future1.whenComplete(arg1); future2.whenComplete(arg2);
Ja, Sie können eine solche Klasse erstellen, aber dazu beim nächsten Mal mehr.
Eine kurze Zusammenfassung des Vorstehenden:
- Ein asynchrones Programm ist ein Netzwerk miteinander verbundener asynchroner Prozeduren.
Genau wie ein Multithread-Programm ist ein Netzwerk von miteinander verbundenen Ausführungsthreads (Threads).
Die Mittel für Kommunikationsflüsse und asynchrone Prozeduren unterscheiden sich jedoch grundlegend.
Streams werden mithilfe von Semaphoren, blockierenden Warteschlangen und anderen ähnlichen Objekten verbunden.
Diese blockieren den Empfänger-Stream, wenn die Informationen noch nicht eingetroffen sind, der Stream jedoch bereits versucht, sie mithilfe der Pull-basierten Operation zu extrahieren.
Asynchrone Prozeduren - Empfänger geben die Ausführung erst ein, wenn alle benötigten Informationen bereit sind.
Sie warten passiv, bis die Informationsanbieter selbst eine Push-basierte Operation durchlaufen.
Aus diesem Grund verbrauchen sie während des Wartens keinen Speicher auf dem Stapel und belegen daher viel weniger Speicher als Ausführungsthreads.
- Der Aufbau eines Netzwerks asynchroner Prozeduren reduziert sich auf das Erstellen und Verknüpfen von Objekten, genauer gesagt auf das Verknüpfen ihrer Unterobjekte - Parameter: Die Adresse des Eingabeparameters des Empfängers wird an den Ausgabeparameter des Informationsanbieters übertragen.
Der Satz von CompletableFuture
Methoden macht genau das, und im Prinzip können Sie auf diese Methoden verzichten, indem Sie Objekte explizit erstellen, wie in den obigen Beispielen gezeigt.
Dafür ist es jedoch erforderlich, Klassen zu haben, die denen ähneln, die in diesen Beispielen beschrieben wurden.
Aus irgendeinem Grund haben die Ersteller von java.util.concurrent
beschlossen, Benutzern keinen Zugriff auf diese Klassen zu gewähren und sie in den Tiefen des CompletableFuture
Codes zu verstecken.
Diejenigen, die eine visuelle Darstellung des erstellten asynchronen Netzwerks wünschen, können diese Klassen rekonstruieren, indem sie die obigen Beispiele fortsetzen. Der Quellcode für die Beispiele ist bei Github erhältlich .