تقديم البرمجة التفاعلية في الربيع

مرحبا يا هبر!

نتوقع هذا الأسبوع كتاب ربيع جديد من المطبعة:


من بين السمات المثيرة للاهتمام في برنامج Spring 5 ، تستحق البرمجة التفاعلية الإشارة بشكل خاص ، والتي تم وصف تنفيذها بإيجاز في المقالة المقترحة بواسطة Matt Raible. في الكتاب المذكور أعلاه ، تمت مناقشة الأنماط التفاعلية في الفصل 11.

شارك في تأليف مات جوش لونج ، مؤلف كتاب آخر رائع عن جافا والربيع ، " جافا في الغيمة " ، صدر في الصيف الماضي.

البرمجة التفاعلية هي طريقتك لبناء أنظمة مقاومة للأحمال الكبيرة. لم تعد معالجة عدد كبير من الزيارات مشكلة ، حيث أن الخادم غير محظور ولا يتعين على عمليات العميل انتظار الردود. لا يمكن للعميل مراقبة كيفية تشغيل البرنامج مباشرة على الخادم والمزامنة معه. عندما تجد واجهة برمجة التطبيقات صعوبة في معالجة الطلبات ، يجب أن تقدم ردودًا معقولة. يجب ألا ترفض الرسائل وتتجاهلها بطريقة لا يمكن التحكم فيها. يجب أن تبلغ المكونات العليا بأنها تعمل تحت الحمل حتى يتمكنوا من تحريرها جزئيًا من هذا الحمل. وتسمى هذه التقنية الضغط الخلفي ، وهو جانب مهم في البرمجة التفاعلية.

شاركنا في تأليف هذا المقال مع جوش لونج . Josh هو بطل Java و Spring Developer Advocate وعموما رجل عالمي يعمل في Pivotal. لقد عملت مع Spring لفترة طويلة ، ولكن كان Josh هو الذي أطلعني على Boot Spring ، وكان في مؤتمر Devoxx في بلجيكا. منذ ذلك الحين ، أصبحنا أصدقاء أقوياء ، نحن مغرمون بجافا ونكتب تطبيقات رائعة.

البرمجة التفاعلية أو I / O ، I / O ، نذهب للعمل ...

البرمجة التفاعلية هي طريقة لإنشاء برنامج يستخدم الإدخال / الإخراج غير المتزامن بنشاط. الإدخال / الإخراج غير المتزامن هي فكرة صغيرة ، محفوفة بالتغييرات الكبيرة في البرمجة. الفكرة في حد ذاتها بسيطة: تصحيح الموقف من خلال تخصيص غير فعال للموارد ، وتحرير تلك الموارد التي كانت ستظل خامدة دون تدخلنا ، في انتظار استكمال الإدخال / الإخراج. يؤدي الإدخال / الإخراج غير المتزامن إلى عكس الطريقة المعتادة لمعالجة I / O: يتم تحرير العميل ويمكنه القيام بمهام أخرى ، في انتظار إعلامات جديدة.

فكر فيما هو شائع بين الإدخال / الإخراج المتزامن وغير المتزامن ، وما هي الاختلافات بينهما.

سنقوم بكتابة برنامج بسيط يقرأ البيانات من المصدر (على وجه التحديد ، نحن نتحدث عن رابط java.io.File ). لنبدأ بتنفيذ تطبيق يستخدم java.io.InputStream القديم الجيد:

مثال 1. قراءة البيانات من ملف بشكل متزامن

 package com.example.io; import lombok.extern.log4j.Log4j2; import org.springframework.util.FileCopyUtils; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.util.function.Consumer; @Log4j2 class Synchronous implements Reader { @Override public void read(File file, Consumer<BytesPayload> consumer) throws IOException { try (FileInputStream in = new FileInputStream(file)) { //1 byte[] data = new byte[FileCopyUtils.BUFFER_SIZE]; int res; while ((res = in.read(data, 0, data.length)) != -1) { //2 consumer.accept(BytesPayload.from(data, res)); //3 } } } } 

  1. نحن نقدم ملف للقراءة مع java.io.File المعتاد
  2. اسحب النتائج من مصدر سطر واحد في وقت واحد ...
  3. لقد كتبت هذا الرمز لاتخاذ Consumer<BytesPayloadgt; دعا عندما تصل البيانات الجديدة

بسيطة بما فيه الكفاية ، ماذا تقول؟ قم بتشغيل هذا الرمز وسترى في إخراج السجل (على يسار كل سطر) ، مشيرا إلى أن جميع الإجراءات تحدث في موضوع واحد.
نحن هنا نستخرج البايتات من بياناتنا المأخوذة في المصدر (في هذه الحالة ، نتحدث عن فئة فرعية من java.io.FileInputStream الموروثة من java.io.InputStream ). ما هو الخطأ في هذا المثال؟ في هذه الحالة ، نستخدم InputStream يشير إلى البيانات الموجودة على نظام الملفات لدينا. إذا كان الملف موجودًا ، وكان القرص الصلب يعمل ، فسيعمل هذا الرمز كما هو متوقع.

ولكن ، ماذا يحدث إذا قرأنا البيانات ليس من File ، ولكن من مقبس شبكة ، InputStream تطبيقًا آخر لـ InputStream ؟ لا شيء يدعو للقلق! بالطبع ، لن يكون هناك أي شيء يدعو للقلق مطلقًا إذا كانت سرعة الشبكة عالية للغاية. وإذا كانت قناة الشبكة بين هذه والعقدة الأخرى لا تفشل أبدا. إذا تم استيفاء هذه الشروط ، فإن الرمز سوف يعمل بشكل مثالي.

ولكن ماذا يحدث إذا تباطأت الشبكة أو تباطأت؟ في هذه الحالة ، أعني أننا in.read(…) الفترة حتى in.read(…) العملية in.read(…) . في الواقع ، قد لا تعود على الإطلاق! هذه مشكلة إذا حاولنا القيام بشيء آخر باستخدام الدفق الذي نقرأ منه البيانات. بالطبع ، يمكنك دائمًا إنشاء دفق آخر وقراءة البيانات من خلاله. يمكن القيام بذلك إلى حد ما ، لكن في النهاية ، سنصل إلى الحد الذي لم تعد فيه إضافة مؤشرات الترابط لمزيد من التوسع أكبر. لن يكون لدينا منافسة حقيقية تتجاوز عدد النوى الموجودة على الجهاز. طريق مسدود! في هذه الحالة ، يمكننا زيادة معالجة المدخلات / المخرجات (القراءة تعني هنا) فقط بسبب التدفقات الإضافية ، ولكننا سنصل عاجلاً أم آجلاً إلى هذا الحد.

في هذا المثال ، يكمن الجزء الرئيسي من العمل في القراءة - لا يحدث شيء تقريبًا على جبهات أخرى. نحن نعتمد على I / O. فكر في الطريقة التي يساعدنا بها الحل غير المتزامن في التغلب جزئيًا على احتكار تدفقاتنا.

مثال 2. قراءة البيانات بشكل غير متزامن من ملف

 package com.example.io; import lombok.extern.log4j.Log4j2; import org.springframework.util.FileCopyUtils; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousFileChannel; import java.nio.channels.CompletionHandler; import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.Collections; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.function.Consumer; @Log4j2 class Asynchronous implements Reader, CompletionHandler<Integer, ByteBuffer> { private int bytesRead; private long position; private AsynchronousFileChannel fileChannel; private Consumer<BytesPayload> consumer; private final ExecutorService executorService = Executors.newFixedThreadPool(10); public void read(File file, Consumer<BytesPayload> c) throws IOException { this.consumer = c; Path path = file.toPath(); // 1 this.fileChannel = AsynchronousFileChannel.open(path, Collections.singleton(StandardOpenOption.READ), this.executorService); //2 ByteBuffer buffer = ByteBuffer.allocate(FileCopyUtils.BUFFER_SIZE); this.fileChannel.read(buffer, position, buffer, this); //3 while (this.bytesRead > 0) { this.position = this.position + this.bytesRead; this.fileChannel.read(buffer, this.position, buffer, this); } } @Override public void completed(Integer result, ByteBuffer buffer) { //4 this.bytesRead = result; if (this.bytesRead < 0) return; buffer.flip(); byte[] data = new byte[buffer.limit()]; buffer.get(data); //5 consumer.accept(BytesPayload.from(data, data.length)); buffer.clear(); this.position = this.position + this.bytesRead; this.fileChannel.read(buffer, this.position, buffer, this); } @Override public void failed(Throwable exc, ByteBuffer attachment) { log.error(exc); } } 

  1. هذه المرة نقوم بتكييف java.io.File ، مما يجعل Java NIO java.nio.file.Path منه
  2. عند إنشاء Channel ، نحدد ، على وجه الخصوص ، خدمة java.util.concurrent.ExecutorService ، والتي سيتم استخدامها للاتصال CompletionHandler CompleteHandler عند ظهور البيانات اللازمة
  3. نبدأ القراءة بتمرير رابط إلى CompletionHandler<Integer, ByteBuffer> (this)
  4. في رد الاتصال ، اقرأ البايتات من ByteBuffer في سعة byte[]
  5. تمامًا كما في المثال Synchronous ، يتم تمرير بيانات byte[] للمستهلك.

سنقوم بالحجز على الفور: لقد أصبح هذا الرمز أصعب بكثير! هناك الكثير من الأشياء التي تحدث هنا بحيث يدور رأسك على الفور ، ولكن دعني أشير ... هذا الرمز يقرأ البيانات من Java NIO Channel ، ثم يعالج هذه البيانات في سلسلة رسائل منفصلة مسؤولة عن عمليات الاسترجاعات. وبالتالي ، لا يُحتكر الدفق الذي بدأت فيه القراءة. نعود على الفور تقريبًا بعد الاتصال .read(..) ، وعندما يكون لدينا أخيرًا البيانات المتوفرة لدينا ، يتم إجراء رد اتصال - موجود بالفعل في سلسلة رسائل أخرى. إذا كان هناك تأخير بين المكالمات إلى .read() يمكنك الانتقال إلى مسائل أخرى عن طريق تنفيذها في سلسلة رسائلنا. مدة عملية القراءة غير المتزامنة ، من البايت الأول إلى الأخير ، في أحسن الأحوال لم تعد أطول من عملية القراءة المتزامنة. عادةً ما تكون العملية غير المتزامنة ضئيلة لفترة أطول. ومع ذلك ، في مواجهة هذه الصعوبات الإضافية ، يمكننا معالجة تدفقاتنا بشكل أكثر فعالية. القيام بالمزيد من العمل ، قم بتعدد الإدخال / الإخراج في التجمع مع عدد محدود من مؤشرات الترابط.

أنا أعمل في شركة الحوسبة السحابية. نود بالنسبة لك الحصول على مثيلات جديدة للتطبيق لحل مشكلات القياس الأفقي! بالطبع ، أنا هنا مخادع بعض الشيء. الإدخال / الإخراج غير المتزامن يعقد الأشياء قليلاً ، لكن آمل أن يوضح هذا المثال مدى فعالية التعليمات البرمجية التفاعلية: فهو يتيح لك معالجة المزيد من الطلبات والقيام بمزيد من العمل على أجهزتك الحالية إذا كان الأداء يعتمد اعتمادًا كبيرًا على I / O. إذا كان الأداء يعتمد على استخدام المعالج (على سبيل المثال ، فنحن نتحدث عن العمليات على أرقام فيبوناتشي ، أو التنقيب عن عملات البيتكوين أو تشفيرها) ، ثم البرمجة التفاعلية لن تعطينا أي شيء.

في الوقت الحالي ، لا يستخدم معظمنا تطبيقات Channel أو InputStream في عملنا اليومي! علينا أن نفكر في المشاكل على مستوى التجريدات عالية المستوى. يتعلق الأمر بأشياء مثل المصفوفات ، أو بالأحرى التسلسل الهرمي java.util.Collection . يتم عرض مجموعة java.util.Collection بشكل جيد للغاية على InputStream: يفترض كلا الكيانين أنه يمكنك العمل على جميع البيانات في وقت واحد وعلى الفور تقريبًا. من المتوقع أن تتمكن من إنهاء القراءة من معظم InputStreams وقت سابق ، وليس في وقت لاحق. تصبح أنواع المجموعات غير مريحة قليلاً عند الانتقال إلى كميات أكبر من البيانات. ماذا لو كنت تتعامل مع شيء غير محدود (غير محدود) - على سبيل المثال ، مآخذ الويب أو أحداث الخادم؟ ماذا تفعل إذا كان هناك تأخير بين التسجيلات؟

نحن بحاجة إلى طريقة أفضل لوصف هذا النوع من البيانات. نحن نتحدث عن أحداث غير متزامنة ، من شأنها أن تحدث في النهاية. قد يبدو أن Future<T> أو CompletableFuture<T> مناسب تمامًا لهذا الغرض ، لكنهما يصفان شيئًا واحدًا فقط يحدث في النهاية. في الواقع ، لا توفر Java استعارة مناسبة لوصف هذا النوع من البيانات. قد لا يكون كلا النوعين Iterator و Stream من Java 8 مرتبطين ، ولكن كلاهما موجه للسحب ؛ أنت نفسك تطلب الإدخال التالي ، وليس النوع يجب أن يرسل رد اتصال إلى الكود. من المفترض أنه في حالة دعم المعالجة المستندة إلى الدفع في هذه الحالة ، مما سيتيح تحقيق المزيد على مستوى الخيط ، فإن API ستوفر أيضًا التحكم في عملية الجدولة والترابط. لا تذكر تطبيقات Iterator شيئًا عن الترابط ، وتتشارك جميع مؤشرات ترابط Java 8 في نفس تجمع الشوكة.

إذا كان Iterator و Stream يدعمان حقًا معالجة الدفع ، فسنواجه مشكلة أخرى تتصاعد حقًا في سياق I / O: سنحتاج إلى نوع من آلية اختراق الظهر! نظرًا لأن مستهلك البيانات تتم معالجته بشكل غير متزامن ، فليس لدينا أي فكرة عن متى ستكون البيانات قيد الإعداد وبأي كمية. لا نعرف مقدار البيانات التي ستحتاج إلى معالجتها في رد الاتصال التالي: بايت واحد أو تيرابايت واحد!

عند سحب البيانات من InputStream ، تقرأ قدر المعلومات التي تكون جاهزًا للمعالجة ، وليس أكثر. في الأمثلة السابقة ، نقرأ البيانات في المخزن المؤقت byte[] بطول ثابت ومعروف. في سياق غير متزامن ، نحتاج إلى طريقة لإخبار المزود مقدار البيانات التي نرغب في معالجتها.
نعم سيدي هناك شيء مفقود هنا بالتأكيد.

ابحث عن الاستعارة المفقودة

في هذه الحالة ، نبحث عن استعارة تعكس بشكل جميل جوهر الإدخال / الإخراج غير المتزامن ، وتدعم مثل هذه الآلية للنقل العكسي للبيانات وتتيح لنا التحكم في تدفق التنفيذ في الأنظمة الموزعة. في البرمجة التفاعلية ، تسمى قدرة العميل على الإشارة إلى الحمل الذي يمكنه التعامل معه "التدفق العكسي".

يوجد الآن عدد من المشاريع الجيدة - Vert.x و Akka Streams و RxJava - تدعم البرمجة التفاعلية. يدير فريق Spring أيضًا مشروعًا يسمى Reactor . بين هذه المعايير المختلفة ، يوجد مجال عام واسع إلى حد ما ، يتم تخصيصه فعليًا لمعيار مبادرة تدفقات التفاعلات . تعرّف مبادرة الجداول التفاعلية أربعة أنواع:

واجهة Publisher<T> ؛ تنتج القيم التي قد تصل في نهاية المطاف. واجهة Publisher<T> ؛ ينتج قيم النوع T Subscriber<T> .

مثال 3. التدفقات التفاعلية: واجهة Publisher<T> .

 package org.reactivestreams; public interface Publisher<T> { void subscribe(Subscriber<? super Tgt; s); } 

يشترك نوع المشترك في Publisher<T> ، ويتلقى إخطارات بأي قيم جديدة من النوع T خلال onNext(T) . في حالة حدوث أي أخطاء ، يتم onError(Throwable) أسلوب onError(Throwable) . عند اكتمال المعالجة بشكل طبيعي ، يتم onComplete طريقة onComplete للمشترك.

مثال 4. تدفقات Jet: واجهة Subscriber<T> .

 package org.reactivestreams; public interface Subscriber<T> { public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete(); } 

عندما يتصل Subscriber بـ Publisher لأول مرة ، فإنه يتلقى Subscriber#onSubscribe . الاشتراك هو ربما الجزء الأكثر أهمية في كامل المواصفات ؛ هي التي توفر تدفق العودة. يستخدم المشترك المشترك طريقة Subscription#request بيانات إضافية أو طريقة Subscription#cancel لإيقاف المعالجة.

مثال 5. التدفقات التفاعلية: واجهة Subscription<T> .

 package org.reactivestreams; public interface Subscription { public void request(long n); public void cancel(); } 

توفر مواصفات الدفق التفاعلي نوعًا آخر مفيدًا ، وإن كان واضحًا ،: Processor<A,B> Subscriber<A> ، Subscriber<A> هو مجرد واجهة ترث Subscriber<A> Publisher<B> .

مثال 6. تدفقات Jet: واجهة Processor<T> .

 package org.reactivestreams; public interface Processor<T, R> extends Subscriber&ltT>, Publisher<R> { } 

لا يتم تحديد المواصفة كوصفة للتنفيذ ؛ في الواقع ، الغرض منه هو تحديد أنواع لدعم التشغيل المتداخل. تتمثل الفائدة الواضحة للأنواع المرتبطة بالتدفقات التفاعلية في أنها مع ذلك وجدت مكانًا في إصدار Java 9 ، علاوة على ذلك ، فهي تعني "واحد إلى واحد" تتوافق مع الواجهات من فئة java.util.concurrent.Flow ، على سبيل المثال: java.util.concurrent.Flow.Publisher .

تلبية مفاعل

أنواع التدفقات التفاعلية وحدها ليست كافية ؛ هناك حاجة إلى تطبيقات أعلى ترتيب لدعم عمليات مثل التصفية والتحويل. على هذا النحو ، فإن مشروع Reactor مناسب ؛ يعتمد على مواصفات التدفقات التفاعلية ويوفر اثنين من تخصصات Publisher<T> .

أولاً ، Flux<T> هو Publisher ينتج قيمًا صفرية أو أكثر. الثاني ، Mono<T> ، هو Publisher<T> ، ينتج عنه صفر أو قيمة واحدة. ينشر كلاهما القيم ويمكنهما التعامل معها وفقًا لذلك ، ومع ذلك ، فإن قدراتهما أوسع بكثير من مواصفات التدفقات التفاعلية. كلاهما يوفر عوامل تشغيل تتيح لك معالجة تدفقات القيمة. يتم تكوين أنواع المفاعلات بشكل جيد - يمكن أن يعمل إخراج أحدها كمدخلات للآخر ، وإذا احتاج نوع ما إلى العمل مع تدفقات البيانات الأخرى ، فإنهم يعتمدون على مثيلات Publisher<T> .

تطبيق كل من Mono<T> و Flux<T> Publisher<T> ؛ نوصي بأن تقبل الأساليب الخاصة بك مثيلات Publisher<T> مع إرجاع Flux<T> أو Mono<T> ؛ هذا سيساعد العميل على التمييز بين نوع البيانات التي يتلقاها.

افترض أنك حصلت على Publisher<T> وطُلب منك عرض واجهة المستخدم لهذا Publisher<T> . هل ينبغي علي بعد ذلك عرض صفحة تحتوي على تفاصيل لسجل واحد ، حيث يمكنك الحصول على CompletableFuture<T> ؟ أو عرض صفحة نظرة عامة مع قائمة أو شبكة حيث يتم عرض جميع الإدخالات صفحة صفحة؟ من الصعب القول.

بدوره ، Flux<T> و Mono<T> محددة للغاية. أنت تعلم أنك بحاجة إلى عرض صفحة مراجعة إذا Flux<T> استلام Flux<T> ، وصفحة تحتوي على تفاصيل لسجل واحد (أو ليس واحد) عندما تتلقى Mono<T> .

مفاعل هو مشروع مفتوح المصدر أطلقته Pivotal. الآن أصبح يتمتع بشعبية كبيرة. يستخدمه Facebook في محركه النفاث لاستدعاء الإجراءات عن بُعد ، ويستخدمه أيضًا في Rsocket ، بقيادة RxJava Creator Ben Christensen. Salesforce يستخدمه في تنفيذ gRPC رد الفعل . يقوم مفاعل بتنفيذ أنواع Reactive Streams ، بحيث يمكنه التفاعل مع التقنيات الأخرى التي تدعم هذه الأنواع ، على سبيل المثال ، مع RxJava 2 من Netflix و Akka Streams من Lightbend ومع مشروع Vert.x من Eclipse Foundation. تعاون ديفيد Cairnock ، مدير RxJava 2 ، بنشاط مع Pivotal لتطوير Reactor ، مما جعل المشروع أفضل. بالإضافة إلى ذلك ، بالطبع ، إنه موجود بشكل أو بآخر في Spring Framework ، بدءًا من Spring Framework 4.0.

البرمجة التفاعلية مع Spring WebFlux

مع كل فائدته ، فإن Reactor هو الأساس فقط. يجب أن تتواصل تطبيقاتنا مع مصادر البيانات. يجب أن تدعم المصادقة والترخيص. الربيع يوفر كل هذا. إذا أعطانا مفاعل الاستعارة المفقودة ، فإن Spring يساعدنا جميعًا على التحدث بلغة مشتركة.

تم إصدار Spring Framework 5.0 في سبتمبر عام 2017. وهو يعتمد على مواصفات Reactive Stream. لديها وقت تشغيل تفاعلي جديد وطراز مكون يسمى Spring WebFlux .

Spring WebFlux مستقل عن واجهة برمجة تطبيقات Servlet ولا يتطلب منهم العمل. يأتي مزودًا بمحولات تتيح لك استخدامه أعلى محرك Servlet إذا لزم الأمر ، ولكن هذا ليس ضروريًا. كما يوفر وقت تشغيل يستند إلى Netty جديد يسمى Spring WebFlux. يعد Spring Framework 5 ، الذي يعمل مع Java 8 و Java EE 7 والإصدارات الأحدث ، بمثابة الأساس لمعظم النظام البيئي Spring ، بما في ذلك Spring Data Kay و Spring Security 5 و Spring Boot 2 و Spring Cloud Finchley.

Source: https://habr.com/ru/post/ar435972/


All Articles