البرمجة غير المتزامنة مع أمثلة: إعادة بناء java.util.concurrent.CompletableFuture أساليب

لماذا تكون إعادة الإعمار ضرورية إذا كانت شفرة المصدر لهذه الفئة مفتوحة؟


إذا فقط لأنه تحت غطاء المحرك يوجد رمز محسن للغاية ، ويصعب قراءته ، ولا تعطي دراسته سوى القليل من الناحية التربوية.


لذلك ، سنعيد إنشاء دلالات العمليات وفقًا لمواصفاتها ، ونكتب رمزًا مكافئًا وظيفيًا ومفهومًا وقابلًا للقراءة ، على الرغم من أنه قد لا يكون الأكثر اقتصادية من حيث استهلاك الذاكرة ووقت المعالج.


لنبدأ بطريقة بسيطة نسبيًا:


public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) Returns a new CompletableFuture that is asynchronously completed by a task running in the given executor with the value obtained by calling the given Supplier. Type Parameters: U - the function's return type Parameters: supplier - a function returning the value to be used to complete the returned CompletableFuture executor - the executor to use for asynchronous execution Returns: the new CompletableFuture 

اقرأ المواصفات بعناية:


 Returns a new CompletableFuture 

بمعنى ، يتم إنشاء كائن من نوع CompletableFuture أو الفئة الفرعية الخاصة به وإعادته نتيجة لذلك.


 that is asynchronously completed by a task running in the given executor` 

بالإضافة إلى ذلك ، يتم تنفيذ المهمة التي يتم تنفيذها على البريد Executor .
كما نعلم ، يقبل Executor الكائنات Runnable فقط.
Runnable هي واجهة ، وقد ينفذها الكائن الأول بشكل جيد - لذلك سنجمع وظيفتين في كائن واحد.


  completed ... with the value obtained by calling the given Supplier. 

يجب أن يقوم Runnable باستدعاء Supplier المعين ومع القيمة المستلمة ، أكمل CompletableFuture تم إنشاؤه.


Supplier هو وظيفة بدون معلمات ، لذا فإن ترميزها كله بسيط للغاية:


  class CompletableFutureForSupplyAsync<U> extends CompletableFuture<U> implements Runnable { Supplier<U> supplier; public CompletableFutureForSupplyAsync(Supplier<U> supplier) { this.supplier = supplier; } public void run() { try { U result = supplier.get(); super.complete(result); } catch (Throwable e) { super.completeExceptionally(e); } } } public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) { CompletableFutureForSupplyAsync<U> task = new CompletableFutureForSupplyAsync<>(supplier); executor.execute(task); return task; } 

المثال التالي أكثر تعقيدًا بعض الشيء:


 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) Returns a new CompletionStage that, when this stage completes normally, is executed using the supplied Executor, with this stage's result as the argument to the supplied function. See the CompletionStage documentation for rules covering exceptional completion. Specified by: thenApplyAsync in interface CompletionStage<T> Type Parameters: U - the function's return type Parameters: fn - the function to use to compute the value of the returned CompletionStage executor - the executor to use for asynchronous execution Returns: the new CompletionStage 

Returns a new CompletionStage that... is executed using the supplied Executor


هنا يُعرض علينا مباشرة ترتيب الكائن الذي تم إنشاؤه Runnable في شكل Runnable .


... with this stage's result as the argument to the supplied function.


لكن هذا بالفعل أكثر إثارة للاهتمام. تحتوي الوظيفة التي تم تمريرها إلينا على معلمة ، وقيمة هذه المعلمة هي القيمة التي تكمل CompletionStage الحالية. في وقت استدعاء thenApplyAsync قد لا تكون هذه القيمة معروفة ، لذلك لا يمكننا تشغيل المهمة على الفور على Executor . بدلاً من ذلك ، يجب علينا التفاوض مع CompletionStage الحالية ،
بحيث أنه في وقت الانتهاء ، ينقل قيمته إلى المهمة. من بين الطرق العديدة لـ CompletionStage هناك طريقة مناسبة تمامًا لهذا الغرض ، whenComplete :


 public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action) Returns a new CompletionStage with the same result or exception as this stage, that executes the given action when this stage completes. 

أي أنه في كائن المهمة الذي تم إنشاؤه حديثًا ، يكفي تنفيذ واجهة BiConsumer لتلقي حجة:


  class CompletableFutureForApplyAsync<T, U> extends CompletableFuture<U> implements Runnable, BiConsumer<T,Throwable> { Function<? super T,? extends U> fn; Executor executor; T arg; Throwable throwable; public CompletableFutureForApplyAsync(Function<? super T,? extends U> fn, Executor executor) { this.fn = fn; this.executor = executor; } @Override // implementation of BiConsumer interface public void accept(T argument, Throwable throwable) { if (throwable != null) { this.throwable = throwable; } else { this.arg = argument; } executor.execute(this); } @Override public void run() { if (throwable == null) { try { U result = fn.apply(arg); super.complete(result); } catch (Throwable e) { super.completeExceptionally(e); } } else { super.completeExceptionally(throwable); } } } public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor ) { CompletableFutureForApplyAsync<T,U> task = new CompletableFutureForApplyAsync<>(fn, executor); this.whenComplete(task); return task; } } 

هذا المثال مهم جدًا لفهم طبيعة البرمجة غير المتزامنة ، لذلك مرة أخرى نسرد خطواتها الرئيسية:


1) يتم إنشاء إجراء غير متزامن:


  CompletableFutureForApplyAsync<T,U> task = new CompletableFutureForApplyAsync<>(fn, executor); 

2) لم يكن جاهزًا بعد للتنفيذ ، لذلك نطلب من مقدم الحجة المفقودة تمرير هذه الحجة إلينا في المستقبل ، من خلال استدعاء الطريقة التي قدمناها:


  this.whenComplete(task); 

3) في هذه الطريقة ، لا نقوم بحفظ الوسيطة المستلمة فحسب ، بل نقوم أيضًا بتشغيل مهمة التنفيذ (انظر طريقة accept ()).


4) يتم تقليل تنفيذ المهمة إلى تحقيق الوظيفة الموكلة إلينا والحفاظ على النتيجة.
يمكن أن تكون هذه النتيجة كما هو مطلوب في الإجراءات الأخرى باستخدام طريقة whenComplete () المطبقة على whenComplete الذي تم إنشاؤه حديثًا ، حتى نتمكن من بناء سلسلة من الإجراءات غير المتزامنة ذات الطول التعسفي. ولكن سيتم تحقيق هذه السلسلة بشكل متسلسل ، دون أي توازٍ.


ولكن كيف يتم تصوير مخطط حسابي أكثر تعقيدًا يحتوي على فروع متوازية؟
للقيام بذلك ، استخدم أسلوب thenCombineAsync .


إذا بدأنا في المثال السابق الإجراء غير المتزامن بحجة واحدة ، فعندئذٍ - مع اثنتين.


في هذه الحالة ، يمكن أن يحدث حساب كلتا الوسيطتين بالتوازي.


 ublic <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor) Description copied from interface: CompletionStage Returns a new CompletionStage that, when this and the other given stage complete normally, is executed using the supplied executor, with the two results as arguments to the supplied function. 

هنا ، كل شيء هو نفسه كما في المثال السابق مع thenApplyAsync ، لكن معلمة الدالة تحتوي بالفعل على thenApplyAsync ، وتتم إضافة المعلمة CompletionStage<? extends U> other CompletionStage<? extends U> other ، وهو الموفر غير المتزامن للوسيطة الثانية.


كيف نضمن معالجة الحجة الثانية؟


حسنًا ، أولاً ، بدلاً من متغير T arg اثنين: T arg1; U arg2; T arg1; U arg2; ، بدلاً من طريقة واحدة ، void accept(T argument, Throwable throwable) العام void accept(T argument, Throwable throwable) وصف اثنين - accept1 و accept2 ،
كل منها يعمل بحجته الخاصة.


في الوقت نفسه ، لم يعد BiConsumer<T,Throwable> قيد الإنشاء ينفذ BiConsumer<T,Throwable> ولم يعد بإمكاننا كتابة جملة رئيسية لربط عقد الرسم البياني للحساب غير المتزامن


  this.whenComplete(task); 

لحسن الحظ ، يمكن تمثيل كائن واجهة وظيفية بمرجع إلى أسلوب ، دون اختتامه في فئة منفصلة:


  this.whenComplete(task::accept1); other.whenComplete(task::accept2); 

أي أن التيار الحالي this الكائن يوفر الوسيطة الأولى ، والكائن other الثاني.


إليك فقط رموز الطرق التي يجب تغييرها حتى لا تبدأ المهمة فور وصول حجتها ، ولكن أيضًا تحقق من استلام الطريقة الثانية:


  public synchronized void accept1(T argument, Throwable throwable) { if (throwable != null) { this.throwable = throwable; executor.execute(this); } else { this.arg1 = argument; if (arg2 != null) { executor.execute(this); } } } 

يتم وصف طريقة Accept2 بالمثل.


لاحظ أن:


  • تصبح الأساليب متزامنة (نحن نعمل مع البيانات المشتركة)
  • في حالة إرسال الخطأ ، انتظر الوسيطة الثانية ليست ضرورية.
  • التحقق من وصول الوسيطة عن طريق المقارنة null ليست أفضل طريقة ، ربما تحتاج إلى إضافة متغير منطقي لكل وسيطة.

بهذه الطريقة ، يمكنك إجراء إجراءات غير متزامنة من عدد أكبر من الحجج من اثنين ، ولكن الفكرة تأتي على الفور - هل لا يزال بإمكانك إنشاء فصل منفصل للمعلمات ، حتى لا تكتب طريقتك الخاصة لاستقبال كل معلمة ، ولكن تفعل ذلك عن طريق إنشاء المعلمات ديناميكيًا؟


  Parameter<Integer> arg1 = new Parameter<>(); Parameter<Float> arg2 = new Parameter<>(); ... future1.whenComplete(arg1); future2.whenComplete(arg2); 

نعم ، يمكنك إنشاء مثل هذا الفصل ، ولكن المزيد عن ذلك في المرة القادمة.


ملخص موجز لما سبق:


  • البرنامج غير المتزامن هو شبكة من الإجراءات غير المتزامنة المترابطة ،
    تمامًا مثل البرنامج متعدد الخيوط هو شبكة من خيوط التنفيذ المترابطة (الخيوط).

لكن وسائل تدفق الاتصالات والإجراءات غير المتزامنة تختلف اختلافًا جوهريًا.


يتم ربط التدفقات باستخدام الإشارات وحجب الطوابير والكائنات المماثلة الأخرى ،
الذي يحظر تدفق المستلم إذا لم تصل المعلومات بعد ، لكن الدفق يحاول بالفعل استخراجه باستخدام العملية القائمة على السحب.


إجراءات غير متزامنة - ببساطة لا يدخل المستلمون في التنفيذ حتى تصبح جميع المعلومات التي يحتاجونها جاهزة.
إنهم ينتظرون بشكل سلبي حتى يمررها مقدمو المعلومات أنفسهم من خلال عملية تعتمد على الدفع.
وبسبب هذا ، لا ينفقون ذاكرة على المكدس أثناء الانتظار ، وبالتالي ، يستهلكون ذاكرة أقل بكثير من خيوط التنفيذ.


  • يتم تقليل بناء شبكة من الإجراءات غير المتزامنة لإنشاء كائنات وربطها معًا ، بشكل أكثر دقة ، لربط الكائنات الفرعية - المعلمات: يتم إرسال عنوان معلمة إدخال المستلم إلى معلمة الإخراج الخاصة بموفر المعلومات.

تقوم مجموعة أساليب CompletableFuture تمامًا ، ومن حيث المبدأ ، يمكنك الاستغناء عن هذه الأساليب من خلال إنشاء كائنات بشكل صريح ، كما هو موضح في الأمثلة أعلاه.
ولكن من الضروري لهذا أن يكون لدينا فئات مماثلة لتلك التي تم وصفها في هذه الأمثلة.
لسبب ما ، اختار منشئو java.util.concurrent عدم منح المستخدمين حق الوصول إلى هذه الفئات وإخفائهم في أعماق كود CompletableFuture .


يمكن لأولئك الذين يرغبون في الحصول على تمثيل مرئي للشبكة غير المتزامنة التي يتم إنشاؤها إعادة بناء هذه الفئات من خلال متابعة الأمثلة المذكورة أعلاه. كود المصدر للأمثلة متاح في جيثب .

Source: https://habr.com/ru/post/ar418547/


All Articles