Programmation asynchrone avec exemples: reconstruction des méthodes java.util.concurrent.CompletableFuture

Pourquoi la reconstruction est-elle nécessaire si le code source de cette classe est ouvert?


Ne serait-ce que parce que sous le capot, il existe un code très optimisé, difficile à lire, dont l'étude donne peu de pédagogie.


Par conséquent, nous recréerons la sémantique des opérations en fonction de leurs spécifications et rédigerons du code fonctionnellement équivalent, compréhensible et lisible, bien qu'il ne soit pas le plus économique en termes de consommation de mémoire et de temps processeur.


Commençons par une méthode relativement simple:


public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) Returns a new CompletableFuture that is asynchronously completed by a task running in the given executor with the value obtained by calling the given Supplier. Type Parameters: U - the function's return type Parameters: supplier - a function returning the value to be used to complete the returned CompletableFuture executor - the executor to use for asynchronous execution Returns: the new CompletableFuture 

Lisez attentivement les spécifications:


 Returns a new CompletableFuture 

Autrement dit, un objet de type CompletableFuture ou sa sous-classe est créé et renvoyé en conséquence.


 that is asynchronously completed by a task running in the given executor` 

De plus, une tâche est exécutée qui est exécutée sur Executor 'e.
Comme nous le savons, Executor accepte uniquement les objets Runnable .
Runnable est une interface, et le premier objet pourrait bien l'implémenter - nous allons donc combiner deux fonctions en un seul objet.


  completed ... with the value obtained by calling the given Supplier. 

Ce Runnable doit appeler le Supplier donné et avec la valeur reçue compléter le CompletableFuture créé.


Supplier est une fonction sans paramètre, donc tout encoder est très simple:


  class CompletableFutureForSupplyAsync<U> extends CompletableFuture<U> implements Runnable { Supplier<U> supplier; public CompletableFutureForSupplyAsync(Supplier<U> supplier) { this.supplier = supplier; } public void run() { try { U result = supplier.get(); super.complete(result); } catch (Throwable e) { super.completeExceptionally(e); } } } public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) { CompletableFutureForSupplyAsync<U> task = new CompletableFutureForSupplyAsync<>(supplier); executor.execute(task); return task; } 

L'exemple suivant est un peu plus compliqué:


 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) Returns a new CompletionStage that, when this stage completes normally, is executed using the supplied Executor, with this stage's result as the argument to the supplied function. See the CompletionStage documentation for rules covering exceptional completion. Specified by: thenApplyAsync in interface CompletionStage<T> Type Parameters: U - the function's return type Parameters: fn - the function to use to compute the value of the returned CompletionStage executor - the executor to use for asynchronous execution Returns: the new CompletionStage 

Returns a new CompletionStage that... is executed using the supplied Executor


Ici, on nous propose directement d'organiser l'objet créé à émettre sous la forme de Runnable .


... with this stage's result as the argument to the supplied function.


Mais c'est déjà plus intéressant. La fonction qui nous est transmise a un paramètre, et la valeur de ce paramètre est la valeur qui termine le CompletionStage actuel. Au moment d'appeler thenApplyAsync cette valeur n'est peut-être pas connue, nous ne pouvons donc pas exécuter immédiatement la tâche sur Executor . Au lieu de cela, nous devons négocier avec le CompletionStage actuel,
de sorte qu'au moment de son achèvement, il transfère sa valeur à la tâche. Parmi les nombreuses méthodes de CompletionStage il y en a une qui convient parfaitement à cet effet, whenComplete :


 public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action) Returns a new CompletionStage with the same result or exception as this stage, that executes the given action when this stage completes. 

Autrement dit, dans l'objet de tâche nouvellement créé, il suffit d'implémenter l'interface BiConsumer pour recevoir un argument:


  class CompletableFutureForApplyAsync<T, U> extends CompletableFuture<U> implements Runnable, BiConsumer<T,Throwable> { Function<? super T,? extends U> fn; Executor executor; T arg; Throwable throwable; public CompletableFutureForApplyAsync(Function<? super T,? extends U> fn, Executor executor) { this.fn = fn; this.executor = executor; } @Override // implementation of BiConsumer interface public void accept(T argument, Throwable throwable) { if (throwable != null) { this.throwable = throwable; } else { this.arg = argument; } executor.execute(this); } @Override public void run() { if (throwable == null) { try { U result = fn.apply(arg); super.complete(result); } catch (Throwable e) { super.completeExceptionally(e); } } else { super.completeExceptionally(throwable); } } } public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor ) { CompletableFutureForApplyAsync<T,U> task = new CompletableFutureForApplyAsync<>(fn, executor); this.whenComplete(task); return task; } } 

Cet exemple est très important pour comprendre la nature de la programmation asynchrone, donc encore une fois nous énumérons ses principales étapes:


1) une procédure asynchrone est créée:


  CompletableFutureForApplyAsync<T,U> task = new CompletableFutureForApplyAsync<>(fn, executor); 

2) il n'est pas encore prêt à être exécuté, nous demandons donc au fournisseur de l'argument manquant de nous transmettre cet argument à l'avenir, en appelant la méthode que nous avons soumise:


  this.whenComplete(task); 

3) dans cette méthode, nous enregistrons non seulement l'argument reçu, mais nous exécutons également la tâche pour l'exécution (voir la méthode accept ()).


4) l'exécution de la tâche est réduite à l'accomplissement de la fonction qui nous est confiée et à la sauvegarde du résultat.
Ce résultat peut être exactement comme demandé par d'autres procédures utilisant la méthode whenComplete () appliquée à notre objet nouvellement construit, afin que nous puissions construire une chaîne de procédures asynchrones de longueur arbitraire. Mais cette chaîne sera remplie strictement séquentiellement, sans aucun parallélisme.


Mais comment représenter un diagramme de calcul plus complexe contenant des branches parallèles?
Pour ce faire, utilisez la méthode thenCombineAsync .


Si dans l'exemple précédent, nous avons commencé la procédure asynchrone avec un argument, alors dans celui-ci - avec deux.


Dans ce cas, le calcul des deux arguments peut se produire en parallèle.


 ublic <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor) Description copied from interface: CompletionStage Returns a new CompletionStage that, when this and the other given stage complete normally, is executed using the supplied executor, with the two results as arguments to the supplied function. 

Ici, tout est le même que dans l'exemple précédent avec thenApplyAsync , mais le paramètre de fonction a déjà deux arguments et le paramètre CompletionStage<? extends U> other Est ajouté CompletionStage<? extends U> other , qui est le fournisseur asynchrone du deuxième argument.


Comment s'assurer que le deuxième argument est traité?


Eh bien, premièrement, au lieu d'une variable T arg décrivez-en deux: T arg1; U arg2; T arg1; U arg2; , une au lieu d'une méthode public void accept(T argument, Throwable throwable) décrit deux - accept1 et accept2 ,
dont chacun fonctionne avec son propre argument.


Dans le même temps, notre objet en construction BiConsumer<T,Throwable> plus l' BiConsumer<T,Throwable> et nous ne pouvons plus écrire de phrase clé pour relier les nœuds du graphe de calcul asynchrone


  this.whenComplete(task); 

Heureusement, un objet d'interface fonctionnelle peut être représenté par une référence à une méthode, sans le conclure dans une classe distincte:


  this.whenComplete(task::accept1); other.whenComplete(task::accept2); 

Autrement dit, le courant de this objet fournit le premier argument, et l' other objet le second.


Voici juste les codes des méthodes qui devront être modifiées pour qu'ils ne démarrent pas la tâche immédiatement à l'arrivée de leur argument, mais vérifient également la réception du second:


  public synchronized void accept1(T argument, Throwable throwable) { if (throwable != null) { this.throwable = throwable; executor.execute(this); } else { this.arg1 = argument; if (arg2 != null) { executor.execute(this); } } } 

La méthode accept2 est décrite de la même manière.


Notez que:


  • les méthodes se synchronisent (nous travaillons avec des données communes)
  • en cas de transmission d'erreur, attendre le second argument n'est pas nécessaire.
  • vérifier que l'argument arrive par comparaison avec null n'est pas le meilleur moyen, peut-être devrez-vous ajouter une variable booléenne pour chaque argument.

De cette façon, vous pouvez créer des procédures asynchrones à partir d'un plus grand nombre d'arguments que deux, mais l'idée vient immédiatement - pouvez-vous toujours créer une classe distincte pour les paramètres, afin de ne pas écrire votre propre méthode pour recevoir chaque paramètre, mais le faire en créant dynamiquement les paramètres?


  Parameter<Integer> arg1 = new Parameter<>(); Parameter<Float> arg2 = new Parameter<>(); ... future1.whenComplete(arg1); future2.whenComplete(arg2); 

Oui, vous pouvez créer une telle classe, mais plus sur la prochaine fois.


Un bref résumé de ce qui précède:


  • un programme asynchrone est un réseau de procédures asynchrones interconnectées,
    tout comme un programme multithread est un réseau de threads d'exécution interconnectés (threads).

Mais les moyens de communication et les procédures asynchrones sont fondamentalement différents.


Les flux sont connectés à l'aide de sémaphores, de files d'attente de blocage et d'autres objets similaires,
qui bloquent le flux destinataire si les informations ne sont pas encore arrivées, mais le flux tente déjà de les extraire à l'aide de l'opération basée sur l'extraction.


Procédures asynchrones - les destinataires n'entrent simplement dans l'exécution que lorsque toutes les informations dont ils ont besoin sont prêtes.
Ils attendent passivement que les fournisseurs d'informations eux-mêmes la transmettent par une opération push.
Pour cette raison, ils ne dépensent pas de mémoire sur la pile pendant l'attente et, par conséquent, occupent beaucoup moins de mémoire que les threads d'exécution.


  • la construction d'un réseau de procédures asynchrones se réduit à créer des objets et à les lier entre eux, plus précisément à lier leurs sous-objets - paramètres: l'adresse du paramètre d'entrée du destinataire est transmise au paramètre de sortie du fournisseur d'informations.

L'ensemble des méthodes CompletableFuture fait exactement cela, et en principe, vous pouvez vous passer de ces méthodes en créant des objets explicitement, comme indiqué dans les exemples ci-dessus.
Mais pour cela, il est nécessaire d'avoir des classes similaires à celles décrites dans ces exemples.
Pour une raison quelconque, les créateurs de java.util.concurrent choisi de ne pas donner aux utilisateurs l'accès à ces classes et de les cacher dans les profondeurs du code CompletableFuture .


Ceux qui veulent avoir une représentation visuelle du réseau asynchrone en cours de création peuvent reconstruire ces classes en poursuivant les exemples ci-dessus. Le code source des exemples est disponible sur Github .

Source: https://habr.com/ru/post/fr418547/


All Articles