كيف يمكنني طباعة دفق مستمر من الرسائل من Twitter مع بضعة أسطر من التعليمات البرمجية عن طريق إضافة بيانات الطقس إلى الأماكن التي يعيش فيها مؤلفوها؟ وكيف يمكنك تحديد سرعة الطلبات لمزود الطقس حتى لا تضعنا في القائمة السوداء؟
سنخبرك اليوم بكيفية القيام بذلك ، ولكننا سنتعرف أولاً على تقنية Akka Streams ، التي تجعل العمل مع تدفقات البيانات في الوقت الفعلي سهلاً مثل المبرمجين الذين يعملون مع تعبيرات LINQ دون الحاجة إلى تنفيذ الممثلين الفرديين أو واجهات التدفقات التفاعلية .
يستند المقال إلى نص
تقرير Vagif Abilov من مؤتمرنا في ديسمبر DotNext 2017 موسكو.
اسمي فاجيف أعمل في شركة Miles النرويجية. سنتحدث اليوم عن مكتبة Akka Streams.
Akka و Reactive Streams عبارة عن تقاطع بين مجموعات ضيقة إلى حد ما ، وقد يحصل المرء على انطباع بأن هذا هو المكان الذي تحتاج فيه إلى بعض المعرفة الرائعة للدخول ، ولكن العكس تمامًا. تهدف هذه المقالة إلى إظهار أنه باستخدام Akka Streams ، يمكنك تجنب البرمجة منخفضة المستوى المطلوبة عند استخدام Streamsive Streams و Akka.NET. بالنظر إلى المستقبل ، يمكنني القول على الفور: إذا كنا في بداية مشروعنا ، الذي نستخدم فيه Akka ، كنا نعرف عن وجود Akka Streams ، فسوف نكتب بشكل مختلف كثيرًا ، وسنوفر الوقت والرمز.
"ربما يكون أسوأ ما يمكنك فعله هو جعل الأشخاص الذين لا يشعرون بالألم لتناول الأسبرين".
ماكس كريمينسكي
"أبواب مغلقة وصداع واحتياجات فكرية"
قبل أن نتطرق إلى التفاصيل الفنية ، قليلاً عن ما قد يتحول إليه مسارك إلى Akka Streams وما الذي يمكن أن يقودك إلى هناك. في أحد الأيام ، صادفت مدونة ماكس كريمينسكي ، حيث سأل مثل هذا السؤال الفلسفي للمبرمجين: كيف أو لماذا من المستحيل للمبرمج أن يشرح ما هو الموناد. أوضح الأمر بهذه الطريقة: في كثير من الأحيان يذهب الناس على الفور إلى التفاصيل الفنية ، موضحًا كيف تعمل البرمجة بشكل جميل ومدى المعنى في monad ، دون عناء التساؤل عن سبب حاجة المبرمج لها على الإطلاق. من خلال رسم القياس ، يشبه محاولة بيع الأسبرين دون عناء لمعرفة ما إذا كان المريض يعاني من الألم.
باستخدام هذا القياس ، أود أن أطرح السؤال التالي: إذا كان Akka Streams هو الأسبرين ، فماذا سيكون الألم الذي سيقودك إليه؟
تدفقات البيانات
أولاً ، لنتحدث عن تدفقات البيانات. يمكن أن يكون التدفق بسيطًا جدًا ، خطيًا.
هنا لدينا مستهلك بيانات معين (أرنب في الفيديو). تستهلك البيانات بسرعة تناسبها. هذا هو التفاعل المثالي للمستهلك مع الدفق: فهو يحدد عرض النطاق الترددي ، وتتدفق البيانات إليه بهدوء. يمكن أن يكون دفق البيانات البسيط هذا غير محدود ، أو يمكن أن ينتهي.
لكن التدفق قد يكون أكثر تعقيدًا. إذا قمت بزراعة العديد من الأرانب جنبًا إلى جنب ، سيكون لدينا بالفعل موازاة للتدفقات. ما تحاول التدفقات التفاعلية حله هو فقط كيف يمكننا التواصل مع التدفقات على مستوى أكثر مفاهيمية ، أي بغض النظر عما إذا كنا نتحدث فقط عن نوع من قياس مستشعر درجة الحرارة ، حيث تأتي القياسات الخطية ، أو لدينا قياسات مستمرة لآلاف أجهزة استشعار درجة الحرارة التي تدخل النظام من خلال قوائم انتظار RabbitMQ والمخزنة في سجلات النظام. كل ما سبق يمكن اعتباره تيارًا مركبًا واحدًا. إذا ذهبت إلى أبعد من ذلك ، فيمكن أيضًا إدارة الإنتاج المؤتمتة (على سبيل المثال ، بعض المتاجر عبر الإنترنت) إلى دفق بيانات ، وسيكون من الرائع أن نتحدث عن تخطيط مثل هذا الدفق ، بغض النظر عن مدى تعقيده.

بالنسبة للمشاريع الحديثة ، فإن دعم مؤشر الترابط ليس جيدًا جدًا. إذا كنت أتذكر بشكل صحيح ، أراد آرون ستانارد ، الذي تراه تغريدة في الصورة ، الحصول على دفق من ملف متعدد غيغابايت يحتوي على ملف CSV ، أي النص ، وتبين أنه لا يوجد شيء يمكنك تنفيذه واستخدامه على الفور ، دون مجموعة من الإجراءات الإضافية. لكنه ببساطة لم يستطع الحصول على تيار من قيم CSV ، مما أحزنه. هناك عدد قليل من الحلول (باستثناء بعض المجالات الخاصة) ، يتم تنفيذ الكثير من الأشياء بالطرق القديمة ، عندما نفتح كل هذا ، نبدأ القراءة ، التخزين المؤقت ، في أسوأ الأحوال ، نحصل على شيء مثل المفكرة يقول أن الملف كبير جدًا.
على مستوى مفاهيمي عالٍ ، نشارك جميعًا في معالجة تدفقات البيانات ، وستساعدك Akka Streams إذا:
- أنت على دراية بـ Akka ، لكنك تريد أن توفر لنفسك التفاصيل المرتبطة بكتابة كود الممثل وتنسيقها ؛
- أنت على دراية بالتدفقات التفاعلية وتريد استخدام تطبيق جاهز لمواصفاتها ؛
- عناصر كتلة Akka Streams للمراحل مناسبة لنمذجة العملية الخاصة بك ؛
- تريد الاستفادة من Akka Streams الضغط الخلفي (الضغط الخلفي) لإدارة وتنقيح مراحل الإنتاجية في سير عملك بشكل ديناميكي.
من الممثلين إلى عكا تيارات

الطريقة الأولى هي من الممثلين إلى Akka Streams ، طريقي.
تظهر الصورة لماذا بدأنا في استخدام نموذج الممثل. لقد استنفدنا التحكم اليدوي في التدفقات ، الحالة المشتركة ، هذا كل شيء. كل من عمل مع أنظمة كبيرة ، مع تعدد خيوط ، يفهم مدى الوقت الذي يستغرقه الأمر ومدى سهولة ارتكاب خطأ فيه ، والذي يمكن أن يكون قاتلاً طوال العملية. هذا قادنا إلى نموذج الممثلين. نحن لا نأسف على الاختيار الذي تم اتخاذه ، ولكن ، بالطبع ، عندما تبدأ في العمل والبرمجة أكثر ، لا يعني أن الحماس الأولي يفسح المجال لشيء آخر ، ولكنك تبدأ في إدراك أنه يمكن القيام بشيء أكثر فعالية.
"بشكل افتراضي ، يتم إدخال مستلمي رسائلهم في رمز الممثلين. إذا قمت بإنشاء ممثل أ يرسل رسالة إلى الممثل ب ، وترغب في استبدال المتلقي بالممثل ج ، في الحالة العامة ، لن يعمل هذا بالنسبة لك "
نويل ولش (underscore.io)
انتقد الممثلون لعدم تأليفهم. كان نويل ويلش ، أحد مطوري Underscore ، من أوائل الذين كتبوا عن هذا في مدونته. لاحظ أن نظام الممثلين يبدو مثل هذا:

إذا لم تستخدم أي أشياء إضافية ، مثل حقن التبعية ، يتم خياطة عنوان المستلم في الفاعل.

عندما يبدأون في إرسال الرسائل إلى بعضهم البعض ، كل هذا الذي قمت بتعيينه مسبقًا ، الجهات الفاعلة في البرمجة. وبدون حيل إضافية ، يتم الحصول على مثل هذا النظام الجامد.
شرح أحد مطوري Akka ، Roland Kuhn ، ما هو المقصود عمومًا بالتصميم الضعيف. أساس إرسال رسائل الفاعل هو الأسلوب tell ، أي الرسائل أحادية الاتجاه: فهي من النوع باطل ، أي أنها لا تُرجع أي شيء (أو وحدة ، اعتمادًا على اللغة). لذلك ، من المستحيل إنشاء وصف للعملية من سلسلة من الجهات الفاعلة. لذا أرسلت قل ، ثم ماذا؟ توقف لدينا فراغ. يمكنك مقارنتها ، على سبيل المثال ، مع تعبيرات LINQ ، حيث يُرجع كل عنصر من عناصر التعبير IQueryable و IEnumerable ، ويمكن تجميع كل هذا بسهولة. الفاعلون لا يعطون مثل هذه الفرصة. في الوقت نفسه ، اعترض Roland Kuhn على حقيقة أنهم يفترض أنهم لا يؤلفون من حيث المبدأ ، قائلين إنهم في الواقع يتم تجميعهم بطرق أخرى ، بنفس المعنى الذي يجعل المجتمع البشري نفسه للتخطيط. يبدو الأمر كحجة فلسفية ، ولكن إذا فكرت في الأمر ، فإن القياس منطقي - نعم ، يرسل الممثلون بعضهم البعض رسائل أحادية الاتجاه ، لكننا نتواصل أيضًا مع بعضنا البعض ، وننطق رسائل أحادية الاتجاه ، ولكن في نفس الوقت نتفاعل بشكل فعال تمامًا ، أي أننا ننشئ أنظمة معقدة. ومع ذلك ، يوجد مثل هذا النقد للجهات الفاعلة.
public class SampleActor : ReceiveActor { public SampleActor() { Idle(); } protected override void PreStart() { } private void Idle() { Receive<Job>(job => ); } private void Working() { Receive<Cancel>(job => ); } }
بالإضافة إلى ذلك ، يتطلب تنفيذ الممثل على الأقل كتابة فصل إذا كنت تعمل في C # ، أو وظائف إذا كنت تعمل في F #. في المثال أعلاه - رمز مرجعي ، والذي يجب عليك كتابته على أي حال. على الرغم من أنها ليست كبيرة جدًا ، إلا أنه يجب عليك كتابة عدد معين من الأسطر دائمًا عند هذا المستوى المنخفض. تقريبا كل الكود الموجود هنا هو نوع من المراسم. ما يحدث عندما يتلقى الممثل رسالة مباشرة لا يظهر هنا على الإطلاق. وكل هذا يحتاج إلى أن يكتب. هذا ، بالطبع ، ليس كثيرًا جدًا ، ولكن هذا دليل على أننا نعمل مع الممثلين على مستوى منخفض ، نخلق مثل هذه الأساليب الفارغة.
ماذا لو تمكنا من الانتقال إلى مستوى مختلف وأعلى ، وطرح على أنفسنا أسئلة حول نمذجة عمليتنا ، والتي تشمل معالجة البيانات من مصادر مختلفة يتم خلطها وتحويلها ونقلها؟
var results = db.Companies .Join(db.People, c => c.CompanyID, p => p.PersonID, (c, p) => new { c, p }) .Where(z => zcCreated >= fromDate) .OrderByDescending(z => zcCreated) .Select(z => zp) .ToList();
يمكن أن يكون هذا النهج هو ما اعتدنا عليه جميعًا للعمل مع LINQ لمدة عشر سنوات. لا نتساءل كيف يعمل الانضمام. نحن نعلم أن هناك مزود LINQ من شأنه أن يفعل كل هذا من أجلنا ، ونحن مهتمون بمستوى أعلى في تلبية الطلب. ويمكننا بشكل عام مزج قواعد البيانات هنا ، يمكننا إرسال طلبات التوزيع. ماذا لو كنت تستطيع وصف العملية بهذه الطريقة؟
HttpGet pageUrl |> fun s -> Regex.Replace(s, "[^A-Za-z']", " ") |> fun s -> Regex.Split(s, " +") |> Set.ofArray |> Set.filter (fun word -> not (Spellcheck word)) |> Set.iter (fun word -> printfn " %s" word)
(المصدر)أو ، على سبيل المثال ، التحولات الوظيفية. ما يحبه الكثير من الناس في البرمجة الوظيفية هو أنه يمكنك تمرير البيانات من خلال سلسلة من التحولات ، وستحصل على رمز مضغوط واضح إلى حد ما ، بغض النظر عن اللغة التي تكتبها بها. من السهل القراءة. الرمز في الصورة مكتوب بشكل خاص في F # ، ولكن بشكل عام ، على الأرجح ، الجميع يفهم ما يحدث هنا.
val in = Source(1 to 10) val out = Sink.ignore val bcast = builder.add(Broadcast[Int](2)) val merge = builder.add(Merge[Int](2)) val f1,f2,f3,f4 = Flow[Int].map(_ + 10) source ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> sink bcast ~> f4 ~> merge ~>
(المصدر)ماذا عن هذا بعد ذلك؟ في المثال أعلاه ، لدينا مصدر بيانات مصدر يتكون من أعداد صحيحة من 1 إلى 10. وهذا ما يسمى DSL الرسومية (لغة خاصة بالمجال). عناصر لغة المجال في المثال أعلاه هي رموز أسهم أحادية الاتجاه - وهي عوامل تشغيل إضافية يتم تحديدها بواسطة أدوات اللغة التي تعرض اتجاه التدفق بشكل رسومي. نقوم بتمرير Source من خلال سلسلة من التحولات - لسهولة العرض ، فإنها تضيف فقط عشرة إلى الرقم. بعد ذلك يأتي البث: نضرب القنوات ، أي أن كل رقم يدخل قناتين. ثم نضيف 10 مرة أخرى ، نمزج تدفقات البيانات الخاصة بنا ، نحصل على دفق جديد ، نضيف 10 فيه أيضًا ، وكل هذا يذهب إلى دفق البيانات لدينا ، والذي لا يحدث فيه شيء. هذا هو الرمز الحقيقي المكتوب في Scala ، جزء من Akka Streams ، المنفذة بهذه اللغة. أي أنك تحدد مراحل تحويل بياناتك ، وتشير إلى ما يجب فعله بها ، وتحدد المصدر ، والمخزون ، وبعض نقاط التفتيش ، ثم تشكل مثل هذا الرسم البياني باستخدام DSL رسومي. هذا هو كل رمز لبرنامج واحد. تُظهر بضعة أسطر من التعليمات البرمجية ما يجري في هذه العملية.
دعونا ننسى كيفية كتابة رمز التعريف للجهات الفاعلة الفردية وبدلاً من ذلك نتعرف على بدائل التخطيط عالية المستوى التي ستخلق وتربط الجهات المطلوبة داخل أنفسهم. عندما نقوم بتشغيل مثل هذا الرسم البياني ، فإن النظام الذي يوفر Akka Streams سيقوم بإنشاء الممثل المطلوب من تلقاء نفسه ، وإرسال كل هذه البيانات هناك ، ومعالجتها كما ينبغي ، وإعطائها في النهاية للمستلم النهائي.
var runnable = Source .From(Enumerable.Range(1, 1000)) .Via(Flow.Create<int>().Select(x => x * 2) .To(Sink.ForEach<int>(x => Console.Write(x.ToString));
يوضح المثال أعلاه كيف قد يبدو هذا في C #. أبسط طريقة: لدينا مصدر بيانات واحد - هذه أرقام من 1 إلى 1000 (كما ترون ، في أي Akka Streams يمكن لأي IEnumerable أن يصبح مصدرًا لتدفق البيانات ، وهو أمر مريح للغاية). نقوم ببعض الحسابات البسيطة ، على سبيل المثال ، نضرب في اثنين ، ثم في تدفق البيانات يتم عرض كل هذا على الشاشة.
var graph = GraphDsl.Create(builder => { var bcast = builder.Add(new Broadcast<int>(2)); var merge = builder.Add(new Merge<int, int>(2)); var count = Flow.FromFunction(new Func<int, int>(x => 1)); var sum = Flow.Create<int>().Sum((x, y) => x + y); builder.From(bcast.Out(0)).To(merge.In(0)); builder.From(bcast.Out(1)).Via(count).Via(sum).To(merge.In(1)); return new FlowShape<int, int>(bcast.In, merge.Out); });
ما هو موضح في المثال أعلاه يسمى "DSL الرسومية في C #". في الواقع ، لا توجد رسومات هنا ، إنه منفذ مع Scala ، ولكن في C # لا توجد طريقة لتحديد المشغلين بهذه الطريقة ، لذلك يبدو أكثر تعقيدًا بعض الشيء ، ولكنه لا يزال مضغوطًا بما يكفي لفهم ما يحدث هنا. لذلك ، نقوم بإنشاء رسم بياني معين (هناك أنواع مختلفة من الرسم البياني ، هنا يسمى FlowShape) من مكونات مختلفة ، حيث يوجد مصدر بيانات وهناك بعض التحولات. نرسل البيانات إلى قناة واحدة ننشئ فيها حسابًا ، أي عدد عناصر البيانات المراد إرسالها ، وفي الأخرى ننشئ المجموع ثم نمزجها جميعًا. بعد ذلك سنرى أمثلة أكثر إثارة للاهتمام من معالجة الأعداد الصحيحة فقط.
هذا هو المسار الأول الذي يمكن أن يقودك إلى Akka Streams ، إذا كانت لديك خبرة في العمل مع نموذج ممثل وفكرت في ما إذا كنت تريد الكتابة يدويًا لكل منها ، حتى أبسط ممثل. الطريقة الثانية التي تأتي بها Akka Streams هي من خلال الجداول التفاعلية.
من تيارات رد الفعل إلى تيارات عكا
ما هي
التيارات التفاعلية ؟ هذه مبادرة مشتركة لتطوير معيار للمعالجة غير المتزامنة لتدفقات البيانات. وهي تحدد الحد الأدنى من الواجهات والأساليب والبروتوكولات التي تصف العمليات والكيانات اللازمة لتحقيق الهدف - المعالجة غير المتزامنة للبيانات في الوقت الحقيقي مع الضغط الخلفي غير المسدود (الضغط الخلفي). يسمح بتنفيذ مختلف باستخدام لغات برمجة مختلفة.
تسمح لك التدفقات التفاعلية بمعالجة عدد غير محدود من العناصر في تسلسل ونقل العناصر بشكل غير متزامن بين المكونات ذات الضغط الخلفي غير المحجوب.
إن قائمة المبادرين بإنشاء تيارات تفاعلية مثيرة للإعجاب تمامًا: إليك Netflix و Oracle و Twitter.
المواصفات بسيطة للغاية لجعل التنفيذ بلغات ومنصات مختلفة ممكن الوصول إليه قدر الإمكان. المكونات الرئيسية لواجهة برمجة التطبيقات للتفاعل التفاعلي:
- الناشر
- مشترك
- اشتراك
- المعالج
بشكل أساسي ، لا تعني هذه المواصفات أنك ستبدأ يدويًا في تنفيذ هذه الواجهات. من المفهوم أن هناك بعض مطوري المكتبات سيقومون بذلك نيابة عنك. و Akka Streams هو أحد تطبيقات هذه المواصفات.
public interface IPublisher<out T> { void Subscribe(ISubscriber<T> subscriber); } public interface ISubscriber<in T> { void OnSubscribe(ISubscription subscription); void OnNext(T element); void OnError(Exception cause); void OnComplete(); }
الواجهات ، كما ترون من المثال ، بسيطة جدًا حقًا: على سبيل المثال ، يحتوي الناشر على طريقة واحدة فقط - "الاشتراك". يحتوي المشترك ، المشترك ، على عدد قليل من ردود الفعل على الحدث.
public interface ISubscription { void Request(long n); void Cancel(); } public interface IProcessor<in T1, out T2> : ISubscriber<T1>, IPublisher<T2> { }
وأخيرًا ، يحتوي الاشتراك على طريقتين - "ابدأ" و "رفض". لا يحدد المعالج أي طرق جديدة على الإطلاق ؛ فهو يجمع بين الناشر والمشترك.
ما الذي يميز التدفقات التفاعلية عن تطبيقات البث الأخرى؟ تدفقات تيارات رد الفعل تجمع بين نماذج الدفع والسحب. للدعم ، هذا هو سيناريو الأداء الأكثر كفاءة. لنفترض أن لديك مشتركًا بطيئًا في البيانات. في هذه الحالة ، يمكن أن يكون الدفع له قاتلاً: إذا قمت بإرسال كمية كبيرة من البيانات إليه ، فلن يتمكن من معالجتها. من الأفضل استخدام السحب حتى يتمكن المشترك من سحب البيانات من الناشر. ولكن إذا كان الناشر بطيئًا ، فقد اتضح أن المشترك محظور طوال الوقت ، وينتظر طوال الوقت. قد يكون الحل الوسيط هو التكوين: لدينا ملف التكوين الذي نحدد فيه أيهما أسرع. وإذا تغيرت سرعاتها؟
لذا ، فإن التنفيذ الأكثر أناقة هو الذي يمكننا فيه تغيير نماذج الدفع والسحب ديناميكيًا.
(المصدر (Apache Flink))يوضح الرسم البياني كيف يمكن أن يحدث هذا. يستخدم هذا العرض الأباتشي Flink. يلو ناشر ، منتج بيانات ، تم تعيينه على حوالي 50 ٪ من قدرته. يحاول المشترك اختيار أفضل إستراتيجية - اتضح أنه دفع. ثم نعيد تعيين المشترك بسرعة تبلغ حوالي 20٪ ، ثم يتحول إلى سحب. ثم نذهب 100٪ ، ونعود مرة أخرى 20٪ ، إلى نموذج السحب ، وما إلى ذلك. كل هذا يحدث في الديناميكيات ، ولا تحتاج إلى إيقاف الخدمة ، وإدخال شيء ما في التكوين. هذا توضيح لكيفية عمل الضغط الخلفي في Akka Streams.
مبادئ عكا تيارات
بالطبع ، لن تكتسب Streams شعبية كبيرة إذا لم يكن هناك كتل مدمجة سهلة الاستخدام. هناك الكثير منهم. وهي مقسمة إلى ثلاث مجموعات رئيسية:
- مصدر البيانات (المصدر) - مرحلة المعالجة بمخرج واحد.
- يعد الحوض خطوة معالجة أحادية الإدخال.
- Checkpoint (Flow) - مرحلة المعالجة بإدخال واحد وإخراج واحد. تحدث التحولات الوظيفية هنا ، وليس بالضرورة في الذاكرة: يمكن أن تكون ، على سبيل المثال ، استدعاء لخدمة ويب ، لبعض عناصر التوازي ، متعددة الخيوط.
من هذه الأنواع الثلاثة ، يمكن تشكيل الرسوم البيانية. هذه بالفعل مراحل معالجة أكثر تعقيدًا ، والتي يتم بناؤها من المصادر والمصارف ونقاط التفتيش. ولكن لا يمكن تنفيذ كل رسم بياني: إذا كانت هناك ثقوب فيه ، أي فتح المدخلات والمخرجات ، فلن يتم تشغيل هذا الرسم البياني.
الرسم البياني هو رسم بياني قابل للتشغيل ، إذا تم إغلاقه ، أي أن هناك مخرجات لكل إدخال: إذا تم إدخال البيانات ، فيجب أن تكون قد ذهبت إلى مكان ما.

يحتوي Akka Streams على مصادر مدمجة: في الصورة ترى كم منها. أسمائهم هي من شخص لآخر وتعكس ما لدى Scala أو JVM ، باستثناء بعض المصادر المفيدة الخاصة بـ .NET. يعد الأولين (FromEnumerator و From) من الأهم: أي ترقيم ، أي رقم يمكن تحويله إلى مصدر دفق.

هناك مصارف مدمجة: بعضها يشبه طرق LINQ ، على سبيل المثال ، First ، Last ، FirstOrDefault. بالطبع ، كل ما تحصل عليه ، يمكنك التفريغ في الملفات ، في التدفقات ، ليس في تدفقات Akka ، ولكن في تدفقات .NET. ومرة أخرى ، إذا كان لديك أي ممثلين في نظامك ، يمكنك استخدامها عند كل من مدخلات ومخرجات النظام ، أي ، إذا كنت ترغب في ذلك ، قم بتضمين ذلك في نظامك النهائي.

وهناك عدد كبير من نقاط التفتيش المضمنة ، والتي ربما تذكرنا أكثر بـ LINQ ، لأنه هنا يوجد Select و SelectMany و GroupBy ، أي كل ما اعتدنا على العمل معه في LINQ.
على سبيل المثال ، يُدعى Select in Scala SelectAsync: إنه قوي بما فيه الكفاية لأنه يأخذ مستوى التوازي كإحدى الوسيطات. بمعنى ، يمكنك الإشارة إلى أنه ، على سبيل المثال ، يرسل Select البيانات إلى بعض خدمات الويب بالتوازي في عشرة سلاسل رسائل ، ثم يتم جمعها جميعًا وتمريرها. في الواقع ، يمكنك تحديد درجة قياس نقطة التفتيش بسطر واحد من التعليمات البرمجية.
إعلان التدفق هو خطة التنفيذ الخاصة به ، أي أنه لا يمكن تنفيذ رسم بياني ، حتى مخطط تشغيل مثل هذا - يجب أن يتحقق. يجب أن يكون هناك نظام فوري ، نظام ممثل ، يجب أن تعطيه تيارًا ، هذه الخطة للتنفيذ ، ومن ثم سيتم تنفيذها. علاوة على ذلك ، في وقت التشغيل ، يتم تحسينها إلى حد كبير ، تمامًا مثل عندما ترسل تعبير LINQ إلى قاعدة بيانات: يمكن للمزود تحسين SQL الخاص بك للحصول على إخراج بيانات أكثر كفاءة ، واستبدال أمر الاستعلام بأخرى بشكل أساسي. الأمر نفسه مع Akka Streams: بدءًا من الإصدار 2.0 ، يمكنك تعيين عدد معين من نقاط التفتيش ، وسيفهم النظام أنه يمكن دمج بعضها بحيث يتم تنفيذها من قبل ممثل واحد (دمج المشغل). تحتفظ نقاط التفتيش ، كقاعدة عامة ، بترتيب عناصر المعالجة.
var results = db.Companies .Join(db.People, c => c.CompanyID, p => p.PersonID, (c, p) => new { c, p }) .Where(z => zcCreated >= fromDate) .OrderByDescending(z => zcCreated) .Select(z => zp) .ToList();
يمكن مقارنة تجسيم الدفق بعنصر ToList الأخير في تعبير LINQ في المثال أعلاه. إذا لم نكتب ToList ، فإننا نحصل على تعبير LINQ غير مادي لن يؤدي إلى نقل البيانات إلى خادم SQL أو Oracle ، لأن معظم موفري LINQ يدعمون ما يسمى تنفيذ الاستعلام المؤجل (تنفيذ الاستعلام المؤجل) ، t أي ، يتم تنفيذ الطلب فقط عندما يتم إعطاء أمر لإعطاء بعض النتائج. بناءً على ما هو مطلوب - قائمة أو النتيجة الأولى - سيتم تشكيل الفريق الأكثر فعالية. عندما نقول ToList ، فإننا نطلب من مزود LINQ أن يعطينا النتيجة النهائية.
var runnable = Source .From(Enumerable.Range(1, 1000)) .Via(Flow.Create<int>().Select(x => x * 2) .To(Sink.ForEach<int>(x => Console.Write(x.ToString));
يعمل Akka Streams بنفس الطريقة. في الصورة يوجد الرسم البياني الذي تم إطلاقه ، والذي يتكون من مصدر نقاط التفتيش والجريان السطحي ، ونريد الآن تشغيله.
var runnable = Source .From(Enumerable.Range(1, 1000)) .Via(Flow.Create<int>().Select(x => x * 2) .To(Sink.ForEach<int>(x => Console.Write(x.ToString)); var system = ActorSystem.Create("MyActorSystem"); using (var materializer = ActorMaterializer.Create(system)) { await runnable.Run(materializer); }
لكي يحدث هذا ، نحتاج إلى إنشاء نظام من الممثلين ، حيث يوجد هناك مادة ، ونمرر الرسم البياني له ، وسوف يقوم بتنفيذه. إذا قمنا بإعادة إنشائه ، فسيتم تنفيذه مرة أخرى ، ويمكن الحصول على نتائج أخرى.
بالإضافة إلى تجسيد التدفق ، بالحديث عن الجزء المادي من Akka Streams ، تجدر الإشارة إلى القيم المادية.
var output = new List<int>(); var source1 = Source.From(Enumerable.Range(1, 1000)); var sink1 = Sink.ForEach<int>(output.Add); IRunnableGraph<NotUsed> runnable1 = source1.To(sink1); var source2 = Source.From(Enumerable.Range(1, 1000)); var sink2 = Sink.Sum<int>((x,y) => x + y); IRunnableGraph<Task<int>> runnable2 = source2.ToMaterialized(sink2, Keep.Right);
عندما يكون لدينا تيار ينتقل من المصدر عبر نقاط التفتيش إلى البالوعة ، إذا لم نطلب أي قيم وسيطة ، فلن تكون متاحة لنا ، حيث سيتم تنفيذه بأكثر الطرق فعالية. إنه مثل صندوق أسود. ولكن قد يكون من المثير للاهتمام بالنسبة لنا سحب بعض القيم الوسيطة ، لأنه في كل نقطة على اليسار تأتي بعض القيم ، والقيم الأخرى تظهر على اليمين ، ويمكنك تحديد رسم بياني للإشارة إلى ما أنت مهتم به. في المثال أعلاه ، رسم بياني سريع يتم فيه الإشارة إلى NotUsed ، أي أنه لا توجد قيم ملموسة تهمنا. أدناه نقوم بإنشائه مع الإشارة إلى أنه على الجانب الأيمن من الجريان السطحي ، أي بعد اكتمال جميع التحولات ، نحتاج إلى إعطاء قيم مادية. نحصل على مهمة الرسم البياني - وهي مهمة ، عند الانتهاء نحصل على عدد صحيح ، وهذا ما يحدث في نهاية هذا الرسم البياني. يمكنك أن تشير في كل فقرة إلى أنك بحاجة إلى نوع من القيم المادية ، سيتم جمع كل هذا تدريجيًا.
لنقل البيانات إلى تدفقات Akka Streams أو إخراجها من هناك ، بالطبع ، هناك حاجة إلى نوع من التفاعل مع العالم الخارجي. تحتوي مراحل المصدر المضمنة على نطاق واسع من تدفقات البيانات التفاعلية:
- Source.FromEnumerator و Source.From تسمح لك بنقل البيانات من أي مصدر ينفذ IEnumerable ؛
- يقوم Unfold و UnfoldAsync بتوليد نتائج حسابات الدوال بشرط أن تُرجع قيمًا غير صفرية ؛
- FromInputStream يحول تيار ؛
- FromFile يوزع محتويات الملف في تيار رد الفعل.
- يقوم ActorPublisher بتحويل رسائل الممثل.
كما سبق أن قلت ، بالنسبة لمطوري .NET ، من المفيد جدًا استخدام Enumerator أو IEnumerable ، ولكن في بعض الأحيان يكون الأمر بدائيًا جدًا ، وغير فعال للغاية للوصول إلى البيانات. تتطلب المصادر الأكثر تعقيدًا التي تحتوي على كمية كبيرة من البيانات موصلات خاصة. تتم كتابة هذه الروابط. هناك مشروع مفتوح المصدر Alpakka ، والذي ظهر أصلاً في Scala وهو الآن في .NET. بالإضافة إلى ذلك ، لدى Akka ما يسمى الممثلين الدائمين ، ولديهم تيارات خاصة بهم يمكن استخدامها (على سبيل المثال ، يشكل الاستعلام Akka المستمر استمرار تدفق محتوى Akka Event Journal).

إذا كنت تعمل مع Scala ، فإن أسهل طريقة لك هي: هناك عدد كبير من الموصلات ، وستجد بالتأكيد شيئًا يناسب ذوقك. للعلم ، كافكا هو ما يسمى كافكا رد الفعل ، وليس تيارات كافكا. تيارات كافكا ، على حد علمي ، لا تدعم الضغط الخلفي. رد الفعل كافكا هو تطبيق دفق من كافكا يدعم تيارات رد الفعل.

قائمة موصلات Alpakka .NET أكثر تواضعا ، ولكن تم تجديدها ، وهناك عنصر من المنافسة. هناك تغريدة تبلغ من العمر نصف عام من ديفيد فاولر من Microsoft ، قال إن SignalR يمكنه الآن تبادل البيانات مع الامتدادات التفاعلية ، وأجاب أحد مطوري Akka أنها كانت بالفعل في Akka Streams لبعض الوقت. يدعم Akka خدمات متنوعة من Microsoft Azure. CSV هو نتيجة إحباط آرون ستانارد عندما اكتشف أنه لا يوجد دفق جيد لـ CSV: الآن لدى Akka دفقها الخاص لـ CSV XML. يوجد AMQP (في الواقع ، RabbitMQ) ، قيد التطوير ، ولكنه متاح للاستخدام ، وهو يعمل. كافكا أيضا قيد التطوير. ستستمر هذه القائمة في التوسع.
بضع كلمات حول البدائل ، لأنه إذا كنت تعمل مع تدفقات البيانات ، فإن Akka Streams ، بالطبع ، ليست الطريقة الوحيدة للتعامل مع هذه التدفقات. على الأرجح ، في مشروعك ، يعتمد اختيار كيفية تنفيذ سلاسل العمليات على العديد من العوامل الأخرى التي قد تصبح أساسية. على سبيل المثال ، إذا كنت تعمل كثيرًا مع Microsoft Azure و Orleans مدمجًا بشكل طبيعي في احتياجات مشروعك بدعمهم للجهات الفاعلة الافتراضية ، أو كما يطلقون عليها ، الحبوب ، عندها يكون لديهم تنفيذ خاص بهم لا يفي بمواصفات التدفقات التفاعلية - Orleans Streams ، والتي سيكون الأقرب إليك ، ومن المنطقي أن تنتبه إليه. إذا كنت تعمل كثيرًا مع TPL ، فهناك TPL DataFlow - قد يكون هذا أقرب تشابه لتدفقات Akka: فهو يحتوي أيضًا على بدائل لإنشاء تدفقات البيانات ، بالإضافة إلى أدوات التخزين المؤقت والحد من النطاق الترددي (BoundedCapacity ، MaxMessagePerTask). إذا كانت أفكار نموذج الممثل قريبة منك ، فإن Akka Streams هي طريقة لمعالجة هذا الأمر وتوفير قدر كبير من الوقت دون الحاجة إلى كتابة كل ممثل يدويًا.
مثال على التنفيذ: دفق سجل الأحداث
دعونا نلقي نظرة على بعض أمثلة التنفيذ.
المثال الأول هو عدم تنفيذ الدفق مباشرة ، بل كيفية استخدام الدفق. كانت هذه تجربتنا الأولى مع Akka Streams ، عندما اكتشفنا أنه في الواقع يمكننا الاشتراك في بعض البث الذي سيبسط الكثير بالنسبة لنا.
نقوم بتحميل ملفات وسائط مختلفة إلى السحابة. كانت هذه مرحلة مبكرة من المشروع: هنا في آخر 15 دقيقة 23 ملفًا ، منها 7 أخطاء. الآن لا توجد أخطاء عمليًا وعدد الملفات أكبر بكثير - يمر المئات كل بضع دقائق. كل هذا موجود في لوحة معلومات كيبانا.تقرأ كيبانا البيانات من Elasticsearch ، وبما أن البيانات الثانوية بدلاً من البيانات الأولية يتم تخزينها في Elasticsearch ، فقد تطلب تنفيذ هذا المفهرس حذفها وإصدار أمر لملئها مرة أخرى. نظرًا لأن المشروع قيد التطوير ، فإن هذا يسمح لنا بتغيير تنسيقات البيانات ، وتوسيعها بقيم جديدة ، أي يجب تحديث الفهرس باستمرار. يتم تجديده بمحتويات مجلة الحدث Akka ، المخزنة في قاعدة بيانات Microsoft SQL Server. يجب عرض كل من الأحداث المحفوظة مسبقًا والأحداث في الوقت الفعلي في لوحة العمليات الحالية. CREATE TABLE EventJournal ( Ordering BIGINT IDENTITY(1,1) PRIMARY KEY NOT NULL, PersistenceID NVARCHAR(255) NOT NULL, SequenceNr BIGINT NOT NULL, Timestamp BIGINT NOT NULL, IsDeleted BIT NOT NULL, Manifest NVARCHAR(500) NOT NULL, Payload VARBINARY(MAX) NOT NULL, Tags NVARCHAR(100) NULL CONSTRAINT QU_EventJournal UNIQUE (PersistenceID, SequenceNr) )
لتحقيق ذلك ، نحتاج ، من ناحية ، إلى إعادة كتابة البيانات المأخوذة من SQL Server ، والتي تحتوي على بعض الجهات الفاعلة المستمرة في متجر الأحداث Akka ، eventJournal. تُظهر الصورة متجرًا نموذجيًا.
ومن ناحية أخرى ، تأتي البيانات في الوقت الفعلي. وتبين أنه من أجل كتابة فهرس ، نحتاج إلى قراءة البيانات من قاعدة البيانات ، بالإضافة إلى وصول البيانات في الوقت الفعلي ، وفي مرحلة ما نحتاج إلى فهم: هنا انتهت البيانات من هنا ، هذا جديد. تتطلب هذه اللحظة الحدية التحقق الإضافي حتى لا تفقد أي شيء ولا تسجل أي شيء مرتين. أي أنها اتضح أنها معقدة إلى حد ما. أنا وزميلي لم نكن سعداء بما يجري. إنه ليس رمزًا معقدًا للغاية ، بل مجرد كئيب. حتى نتذكر أن الممثلين الدائمين في عكا يدعمون الاستعلام المستمر.
هذه مجرد فرصة للحصول عليها في شكل دفق بيانات مجردة فوق المصدر ، أو أنها تأتي من قاعدة البيانات أو يتم الحصول عليها في الوقت الحقيقي.استعلامات مضمنة (استعلامات مثابرة):- Allpersistencelds
- CurrentPersistencelds
- الفعاليات ByPersistenceld
- CurrentEventsByPersistenceld
- الأحداث
- CurrentEventsByTag
وهناك عدد من الطرق التي يمكننا استخدامها ، على سبيل المثال ، هناك الطريقة الحالية - هذه لقطة ، بيانات تاريخيًا تصل إلى نقطة زمنية معينة. وبدون هذه البادئة ، أولاً بما في ذلك البادئات الحقيقية. نحن بحاجة إلى EventsByTag. let system = mailbox.Context.System let queries = PersistenceQuery.Get(system) .ReadJournalFor<SqlReadJournal>(SqlReadJournal.Identifier) let mat = ActorMaterializer.Create(system) let offset = getCurrentOffset client config let ks = KillSwitches.Shared "persistence-elastic" let task = queries.EventsByTag(PersistenceUtils.anyEventTag, offset) .Select(fun e -> ElasticTypes.EventEnvelope.FromAkka e) .GroupedWithin(config.BatchSize, config.BatchTimeout) .Via(ks.Flow()) .RunForeach((fun batch -> processItems client batch), mat) .ContinueWith(handleStreamError mailbox, TaskContinuationOptions.OnlyOnFaulted) |> Async.AwaitTaskVoid
واتضح أن لدينا ما يكفي من التعليمات البرمجية. تم كتابته في F # ، ولكن في C # كان حول نفس التعاقد. نحصل على EventsByTag ، ونستخدم كتل Akka Streams المضمنة ، ومن كل هذا نحصل على البيانات التي نجمعها في Elasticsearch. أي أننا استفدنا من تنفيذ شخص آخر لدفق البيانات ، وهذا سمح لنا بنسيان مكان بياناتنا ، ومن أين تأتي - من قاعدة البيانات أو يحدث في الوقت الحقيقي. أعطانا هذا التنفيذ كل هذا بطلب واحد.ولكن هنا كنا مستهلكين لهذه البيانات. في الحالة التي نريد فيها إنتاج مثل هذه البيانات بأنفسنا ، يصبح المثال أكثر إثارة للاهتمام ، وننظر إليها في بيانات حقيقية ، حيث أن Twitter كان أحد المبادرين بهذه المواصفات ، والتغريدات هي شيء في متناول الجميع ، ونحن نفهم جميعًا . هذا مثال قياسي لكيفية عمل Akka Streams.مثال على التنفيذ: تغريدات جت
هناك مثال لـ Akka for Scala ، لـ Akka.NET ، لكني وجدت هذه الأمثلة غير كافية ، لأنها تظهر مثالًا محددًا واحدًا على كيفية سحب البيانات وما يتم فعله معها ، لكنني أردت أن ننظر إلى تعقيدات تدريجية ، أي البدء بدفق بسيط والاستمرار في إضافة بعض التصميمات الجديدة إليها. للقيام بذلك ، سنستخدم مكتبة Tweetinvi - هذه مكتبة مفتوحة المصدر توفر بيانات من Twitter ، فهي تدعم فقط إخراج البيانات في شكل دفق. لا يفي هذا الدفق بمواصفات التدفقات التفاعلية ، أي أنه لا يمكننا أخذها على الفور ، بل إنها جيدة ، لأنها ستسمح لنا بإظهار كيف يمكننا ، باستخدام عكا البدائية بشكل عام ، كتابة دفقنا الخاص بناءً على هذا ، التقى بهذه المواصفات.
الآن سيكون لدينا مصدر معين للتغريدات ، والذي سنوازيه في قناتين ، أي أن هذا هو البث البدائي. في القناة الأولى ، سنقوم ببساطة بتنسيق التغريدات ، وحدد اسم مؤلف التغريدات ، ثم نمزج مع بيانات القناة الثانية. وفي القناة الثانية ، سنفعل شيئًا أكثر تعقيدًا: سنحد من عرض النطاق الترددي لهذا الدفق ، ثم سنقوم بتوسيع بيانات التغريدات ببيانات الطقس في الأماكن التي تمت كتابة هذه التغريدات فيها ، وتهيئتها جميعًا مع درجة الحرارة ، وخلطها مع القناة الأولى وطباعتها كلها على الشاشة.كل هذا في حسابي في GitHub في AkkaStreamsDemo . افتح وشاهد (أو يمكنك بدء مشاهدة تسجيل التقرير من الآن فصاعدًا ).لنبدأ بواحد بسيط. أولاً ، أريد أن أقرأ البيانات مباشرة من Twitter: في ملف Program.cs var useCachedTweets = false
في حال تم حظري من تويتر ، لدي تغريدات مخزنة مؤقتًا ، فهي أسرع. للبدء ، قمنا بإنشاء بعض RunnableGraph. public static IRunnableGraph<IActorRef> CreateRunnableGraph() { var tweetSource = Source.ActorRef<ITweet>(100, OverflowStrategy.DropHead); var formatFlow = Flow.Create<ITweet>().Select(Utils.FormatTweet); var writeSink = Sink.ForEach<string>(Console.WriteLine); return tweetSource.Via(formatFlow).To(writeSink); }
( المصدر )لدينا مصدر للتغريدات هنا ، يأتي من ممثل. سأوضح لك كيفية سحب هذه التغريدات هناك ، تنسيقها (يمنح تنسيق التغريدة المؤلف فقط تغريدة) ثم نكتبها على الشاشة.StartTweetStream - هنا سنستخدم مكتبة Tweetinvi. public static void StartTweetStream(IActorRef actor) { var stream = Stream.CreateSampleStream(); stream.TweetReceived += (_, arg) => { arg.Tweet.Text = arg.Tweet.Text.Replace("\r", " ").Replace("\n", " "); var json = JsonConvert.SerializeObject(arg.Tweet); File.AppendAllText("tweets.txt", $"{json}\r\n"); actor.Tell(arg.Tweet); }; stream.StartStream(); }
( المصدر ) منخلال CreateSampleStream نحصل على عينة من التغريدات ، ويتم إصدارها بسرعة عالية جدًا. من كل هذا ، نختار ما نحتاج إليه وننشئ ممثلًا يقول: "اقبل هذه التغريدة". بعد ذلك ، نحتاج إلى الحصول على IEnumerable ، بحيث نحصل على المصدر في النهاية.A TweetEnumerator هو بسيط جدا: لدينا مجموعة من تويت، ونحن بحاجة إلى أن ندرك التيار، MOVENEXT، إعادة تعيين، حسنا، تخلص، ليكونوا مواطنين صالحين. إذا قمنا بتشغيل هذا ، سنرى مثالًا في الوقت الفعلي. هناك الكثير من المواد غير المطبوعة ، لأنها من بلدان غير لاتينية مختلفة. هذه هي أسهل نسخة من برنامجنا.الآن نقوم بتغيير قيمة useCachedTweets إلى true ، وهنا تبدأ المضاعفات. CashedTweets هو نفس الشيء ، فقط لديّ ملف يحتوي على 50000 تغريدة قمت بالفعل بتحديدها وحفظها وسوف نستخدمها. لقد حاولت اختيار التغريدات التي تحتوي على بيانات حول الإحداثيات الجغرافية لمؤلفيها ، والتي سنحتاجها. الخطوة التالية هي أننا نريد موازنة التغريدات. بعد التنفيذ ، سيكون لدينا أولاً مالك التغريدة في القائمة ، ثم الإحداثيات.مع البث المباشر: var graph = GraphDsl.Create(b => { var broadcast = b.Add(new Broadcast<ITweet>(2)); var merge = b.Add(new Merge<string>(2)); b.From(broadcast.Out(0)) .Via(Flow.Create<ITweet>().Select(tweet => tweet.CreatedBy)) .Via(formatUser) .To(merge.In(0)); b.From(broadcast.Out(1)) .Via(Flow.Create<ITweet>().Select(tweet => tweet.Coordinates)) .Via(formatCoordinates) .To(merge.In(1)); return new FlowShape<ITweet, string>(broadcast.In, merge.Out); });
( المصدر )إذا كانت Scala ، فستبدو حقًا مثل DSL الرسومي. ننشئ هنا بثًا بقناتين - خارج (0) ، خارج (1) - وفي حالة واحدة نطبع CreatedBy ، وفي الحالة الأخرى نطبع الإحداثيات ، ثم نمزجها كلها ونرسلها إلى المخزون. أيضا بسيطة بما يكفي في الوقت الحالي.الخطوة التالية في العرض التوضيحي لدينا هي تعقيد الأمور قليلاً. لنبدأ في تغيير النطاق الترددي. var graph = GraphDsl.Create(b => { var broadcast = b.Add(new Broadcast<ITweet>(2)); var merge = b.Add(new Merge<string>(2)); b.From(broadcast.Out(0)) .Via(Flow.Create<ITweet>().Select(tweet => tweet.CreatedBy) .Throttle(10, TimeSpan.FromSeconds(1), 1, ThrottleMode.Shaping)) .Via(formatUser) .To(merge.In(0)); b.From(broadcast.Out(1)) .Via(Flow.Create<ITweet>().Select(tweet => tweet.Coordinates) .Buffer(10, OverflowStrategy.DropNew) .Throttle(1, TimeSpan.FromSeconds(1), 10, ThrottleMode.Shaping)) .Via(formatCoordinates) .To(merge.In(1)); return new FlowShape<ITweet, string>(broadcast.In, merge.Out); });}
( المصدر )في القناة الأولى ، لدينا حد عرض النطاق الترددي يصل إلى 10 تغريدة في الثانية ، في القناة الثانية لدينا ما يصل إلى تغريدة واحدة في الثانية مع مخزن مؤقت من 10. عندما نحدد المخزن المؤقت ، يجب أن نشير إلى استراتيجية ما يجب فعله إذا كان المخزن المؤقت ممتلئًا. هذا ، بالمناسبة ، يميز تيارات Akka وتيارات رد الفعل بشكل عام: اختيار هذه الاستراتيجية أمر لا بد منه. في كثير من الحالات ، عندما نعمل مع سلاسل الرسائل ، لا نعرف ما سيحدث إذا بدأ شيء ما في التدفق معنا. هنا يمكننا أن نختار ، على سبيل المثال ، إذا كانت هذه البيانات حاسمة ، فقد يعطي الدفق بأكمله رسالة خطأ وينتهي. يمكنك إزالة أحدث البيانات ، يمكنك البدء في إزالة البيانات من النهاية. هذا هو خيارنا ، لكنه عقد معين أبرمناه هنا. ها هو في المخزن المؤقت (10 ، OverFlowStrategy.DropHead). إذا قمنا بتشغيل هذا البرنامج الآن ،سنحصل على تغريدات هذه القنوات بسرعات مختلفة. لدينا هنا حوالي 10 تغريدات باسم المالك ، وتغريدة واحدة مع الإحداثيات ، حيث قمنا بتعيين مثل هذا النطاق الترددي. بالطبع ، أريد أن أفعل شيئًا مع الإحداثيات ، أي يمكنك محاولة تحميلها إلى بعض الخدمات التي ستعطينا ، على سبيل المثال ، ما هو الطقس ، أي في الحالة المزاجية لمؤلف التغريدات بحسب الطقس. شاهد مدى سهولة التنفيذ الآن.في أي حالة مزاجية ، اعتمادًا على الطقس ، كان مؤلف التغريدة. شاهد مدى سهولة التنفيذ الآن.في أي حالة مزاجية ، اعتمادًا على الطقس ، كان مؤلف التغريدة. شاهد مدى سهولة التنفيذ الآن. var graph = GraphDsl.Create(b => { var broadcast = b.Add(new Broadcast<ITweet>(2)); var merge = b.Add(new Merge<string>(2)); b.From(broadcast.Out(0)) .Via(Flow.Create<ITweet>().Select(tweet => tweet.CreatedBy) .Throttle(10, TimeSpan.FromSeconds(1), 1, ThrottleMode.Shaping)) .Via(formatUser) .To(merge.In(0)); b.From(broadcast.Out(1)) .Via(Flow.Create<ITweet>().Select(tweet => tweet.Coordinates) .Buffer(10, OverflowStrategy.DropNew) .Throttle(1, TimeSpan.FromSeconds(1), 10, ThrottleMode.Shaping)) .Via(Flow.Create<ICoordinates>().SelectAsync(5, Utils.GetWeatherAsync)) .Via(formatTemperature) .To(merge.In(1)); return new FlowShape<ITweet, string>(broadcast.In, merge.Out); });
( المصدر )هنا لدينا القناة الثانية ، ولها SelectAsync ، حيث نحصل على الطقس. نحن لا نرسل هذا فقط إلى خدمة توصيل الطقس ، بل نقول أيضًا أن هذا الرمز يتم تنفيذه بمستوى موازٍ يبلغ 5: وهذا يعني أنه سيتم إنشاء 5 سلاسل موازية إذا كانت هذه الخدمة بطيئة بما فيه الكفاية حيث ستطلب هذه الخدمة الطقس. يتم تنفيذ الخدمة نفسها هنا ، ومن المنطقي أيضًا إظهار مدى بساطة هذا الرمز. public static async Task<decimal> GetWeatherAsync(ICoordinates coordinates) { var httpClient = new HttpClient(); var requestUrl = $"http://api.met.no/weatherapi/locationforecast/1.9/?lat={coordinates.Latitude};lon={coordinates.Latitude}"; var result = await httpClient.GetStringAsync(requestUrl); var doc = XDocument.Parse(result); var temp = doc.Root.Descendants("temperature").First().Attribute("value").Value; return decimal.Parse(temp); }
( المصدر )كل شيء بسيط هنا. لقد وجدت أول خدمة ويب ظهرت والتي أظهرت حالة الطقس الحالية من خلال الإحداثيات ، وكانت نوعًا ما من موفر الأرصاد الجوية النرويجي ، إنه مجرد طلب تم إرساله هنا عبر HttpClient ، ويمكن استخراج بيانات الطقس من XML الذي أحصل عليه.والآن ، إذا قمنا بتشغيل العرض التوضيحي الخاص بنا ، فعند تشغيل هذه الخدمة الآن ، ستبدأ بيانات درجة الحرارة في الظهور في وقت متأخر. حوالي مرة واحدة في 10 تغريدات ، لدينا 10 رسائل باسم المالك ، وبمجرد الحصول على درجة الحرارة بالدرجات المئوية حيث تم كتابة التغريدات.إنه لأمر مثير للإعجاب بما فيه الكفاية كم هو بسيط لوصف مثل هذه العملية ، بما في ذلك الإشارة إلى مستوى التوازي. هذه ليست سوى عدد قليل من الكتل التي يمكن استخدامها في Akka Streams ، لقد سبق أن قلت أن هناك الكثير منها. إن فرص الاستفادة من العديد منها عالية جدًا.إذا استخدمت نموذج الممثل ، ربما قبل عام ، عندما لم أكن على دراية بـ Akka Streams ، سأكتب كل ممثل على حدة لهذا الغرض. كما ترون ، لا تحتاج إلى كتابة كود لكل نقطة تفتيش ، كل هذا يمكن القيام به باستخدام أدوات Akka Streams ، لذا في المجموع C # يستغرق عدة عشرات من الأسطر البرمجية ، مما يسمح لنا بتركيز سيطرتنا ، واهتمامنا على مستوى أعلى من تنظيم العملية ، وليس على التفاصيل الدقيقة ، من داخل دفق البيانات.الاعتبارات النهائية
ما الأفكار حول Akka Streams التي أود منك أن تصنعها لنفسك بعد قراءة هذه المقالة؟ في DotNext 2017 موسكو ، كنت على عرض تقديمي من أليكس تيسنحول وظائف Azure. بمعنى ما ، هذا تغيير في فكرة كيفية كتابة التعليمات البرمجية للنشر ، بدلاً من التركيز على تكوين الأجهزة ، نقوم بتثبيت مثل هذه البرامج على هذا الجهاز التي تتحدث إلى هذه الخدمات وتتلقى هذه البيانات) ، نركز مباشرة على الجزء الوظيفي وهذه الخوذة الوظيفية في السحابة. لا نفكر في أي عُقد آلة ستنفذ هذا الرمز بالضبط ، بل نفكر في كيفية تعاون وظائفنا مع بعضها البعض. يمكن رسم تشابه مماثل بين نظام مكتوب باستخدام نماذج الممثل ، ولكن يدويًا و Akka Streams ، أي ننسى كيفية كتابة الممثلين يدويًا وبدلاً من ذلك نركز على وصف العملية بالكامل.في جزء كبير من السيناريوهات ، تمكنا من البقاء على مستوى عالٍ إلى حد ما مع الحفاظ على قابلية النظام وسرعته.نظرًا لأن Akka Streams ليس البديل الوحيد لهذا النهج ، عندما تفكر في كيفية محاكاة العملية الخاصة بك ، فكر في ما إذا كان يمكنك الصعود إلى مستوى واحد. لكل المزايا التي توفرها لنا الخدمات الدقيقة مقارنة بالنهج المتآلف ، هناك مخاوف معينة من أننا نركز بشكل كبير على الخدمات الصغيرة ، ونحصل على مهام صغيرة ، وعلى الرغم من كل هذا لا نرى الغابة بأكملها. والآن تعد Akka Streams طريقة ، بدون العودة إلى المستوى المتآلف ، ومع ذلك تعود إلى مستوى الفكرة العامة للعملية.أخيرًا ، لدي لك أغنية صغيرة تتحدث عن بعض كتل Akka Streams المضمنة ، والتي تسمى "Akka Stream Rap". هناك كلمات تحت الفيديو ، يمكنك تشغيلها والغناء.This is the Akka Stream.
This is the Source that feeds the Akka Stream.
This is the MapAsync that maps from the Source that feeds the Akka Stream.
This is the Broadcast that forks the MapAsync that maps from the Source that feeds the Akka Stream.
This is the Merge that collects from the Broadcast that forks the MapAsync that maps from the Source that feeds the Akka Stream.
This is the FilterNot that selects from the Merge that collects from the Broadcast that forks the MapAsync that maps from the Source that feeds the Akka Streams.
This is the Balance that splits the FilterNot that selects from the Merge that collects from the Broadcast that forks the MapAsync that maps from the Source that feeds the Akka Stream.
This is the Zip that combines from the Balance that splits the FilterNot that selects from the Merge that collects from the Broadcast that forks the MapAsync that maps from the Source that feeds the Akka Stream.
This is the Drop that removes from the Zip that combines from the Balance that splits the FilterNot that selects from the Merge that collects from the Broadcast that forks the MapAsync that maps from the Source that feeds the Akka Stream.
This is TakeWhile that pulls from the Drop that removes from the Zip that combines from the Balance that splits the FilterNot that selects from the Merge that collects from the Broadcast that forks the MapAsync that maps from the Source that feeds the Akka Stream.
This is the Throttle that speeds down the TakeWhile that pulls from the Drop that removes from the Zip that combines from the Balance that splits the FilterNot that selects from the Merge that collects from the Broadcast that forks the MapAsync that maps from the Source that feeds the Akka Stream.
This is the Bidiflow that turns back the Throttle that speeds down the TakeWhile that pulls from the Drop that removes from the Zip that combines from the Balance that splits the FilterNot that selects from the Merge that collects from the Broadcast that forks the MapAsync that maps from the source that feeds the Akka Streams.
هذا هو الحوض المملوء من Bidiflow الذي يعيد الخانق الذي يسرع TakeWhile الذي ينسحب من Drop الذي يزيل من Zip الذي يجمع من الرصيد الذي يقسم FilterNot الذي يختار من الدمج الذي يجمع من البث الذي يقوم بتطبيق MapAsync الذي يقوم بتعيين الخرائط من المصدر الذي يغذي Akka Stream.
دقيقة من الدعاية. إذا أعجبك التقرير وترغب في شيء آخر مثل هذا ، فستعقد DotNext 2018 Moscow التالية في موسكو يومي 22 و 23 نوفمبر ، ولن يكون أقل إثارة للاهتمام بالنسبة لك هناك. أسرع للحصول على التذاكر بسعر يوليو (من 1 أغسطس ، ستزداد تكلفة التذاكر).