منذ شهرين ، بدأت دراسة Spark ، وفي وقت ما واجهت مشكلة حفظ حسابات التدفق المنظم في قاعدة بيانات كاساندرا.
في هذا المنشور ، أعطي مثالاً بسيطًا على إنشاء واستخدام كاسندرا سينك للتدفق المنظم Spark. آمل أن يكون المنشور مفيدًا لأولئك الذين بدأوا مؤخرًا العمل مع Spark Structured Streaming ويتساءلون عن كيفية تحميل نتائج الحساب إلى قاعدة البيانات.
فكرة التطبيق بسيطة للغاية - لتلقي الرسائل من كافكا وتحليلها ، وإجراء تحويلات بسيطة في زوج وحفظ النتائج في كاساندرا.
إيجابيات التدفق المنظم
يمكنك قراءة المزيد حول التدفق المنظم في
الوثائق . باختصار ، إن البث المنظم عبارة عن محرك معالجة معلومات تدفق قابل للتطوير بشكل جيد يعتمد على محرك Spark SQL. يسمح لك باستخدام Dataset / DataFrame لتجميع البيانات ، وحساب وظائف النافذة ، والاتصالات ، وما إلى ذلك ، أي أن التدفق المنظم يسمح لك باستخدام SQL القديم الجيد للعمل مع تدفقات البيانات.
ما هي المشكلة؟
تم إصدار الإصدار الثابت من Spark Structured Streaming في عام 2017. بمعنى ، هذه واجهة برمجة تطبيقات جديدة إلى حد ما تقوم بتنفيذ الوظائف الأساسية ، ولكن بعض الأشياء يجب أن نقوم بها بأنفسنا. على سبيل المثال ، يحتوي التدفق المنظم على وظائف قياسية لكتابة الإخراج إلى ملف أو مربع أو وحدة تحكم أو ذاكرة ، ولكن من أجل حفظ البيانات في قاعدة البيانات ، يجب عليك استخدام مستقبل
foreach المتوفر في التدفق المنظم وتنفيذ واجهة
ForeachWriter .
بدءًا من Spark 2.3.1 ، لا يمكن تنفيذ هذه الوظيفة إلا في Scala و Java .
أفترض أن القارئ يعرف بالفعل كيف يعمل التدفق المنظم بعبارات عامة ، ويعرف كيفية تنفيذ التحولات الضرورية وهو الآن جاهز لتحميل النتائج إلى قاعدة البيانات. إذا كانت بعض الخطوات المذكورة أعلاه غير واضحة ، فقد تكون الوثائق الرسمية بمثابة نقطة بداية جيدة في تعلم البث المنظم. في هذه المقالة ، أود التركيز على الخطوة الأخيرة عندما تحتاج إلى حفظ النتائج في قاعدة بيانات.
أدناه ، سأصف مثالاً على تنفيذ مغسلة كاساندرا للتدفق المنظم وشرح كيفية تشغيلها في مجموعة. الكود الكامل متاح
هنا .
عندما واجهت المشكلة المذكورة أعلاه لأول مرة ، تبين أن
هذا المشروع مفيد للغاية. ومع ذلك ، قد يبدو الأمر معقدًا بعض الشيء إذا بدأ القارئ للتو العمل مع التدفق المنظم ويبحث عن مثال بسيط لكيفية تحميل البيانات إلى كاساندرا. بالإضافة إلى ذلك ، تمت كتابة المشروع للعمل في الوضع المحلي ويتطلب بعض التغييرات للتشغيل في الكتلة.
أريد أيضًا أن أعطي أمثلة حول كيفية حفظ البيانات في
MongoDB وأي قاعدة بيانات أخرى باستخدام
JDBC .
حل بسيط
لتحميل البيانات إلى نظام خارجي ، يجب عليك استخدام جهاز استقبال
foreach . اقرأ المزيد عن هذا
هنا . باختصار ، يجب تنفيذ واجهة
ForeachWriter . أي أنه من الضروري تحديد كيفية فتح الاتصال ، وكيفية معالجة كل جزء من البيانات وكيفية إغلاق الاتصال في نهاية المعالجة. كود المصدر كما يلي:
class CassandraSinkForeach() extends ForeachWriter[org.apache.spark.sql.Row] {
تعريف
CassandraDriver وهيكل جدول الإخراج الذي سأصفه لاحقًا ، ولكن الآن ، لنلقي نظرة فاحصة على كيفية عمل الرمز أعلاه. للاتصال بـ Kasandra من Spark ، أقوم بإنشاء كائن
CassandraDriver الذي يوفر الوصول إلى
CassandraConnector ، وهو موصل تم تطويره بواسطة
DataStax . يعتبر CassandraConnector مسؤولاً عن فتح الاتصال بقاعدة البيانات وإغلاقه ، لذلك أقوم فقط بعرض رسائل التصحيح بالطرق
المفتوحة والإغلاق لفئة
CassandraSinkForeach .
يتم استدعاء الرمز أعلاه من التطبيق الرئيسي على النحو التالي:
val sink = parsed .writeStream .queryName("KafkaToCassandraForeach") .outputMode("update") .foreach(new CassandraSinkForeach()) .start()
يتم إنشاء
CassandraSinkForeach لكل صف من البيانات ، لذا تقوم كل عقدة عمل بإدراج جزء من الصفوف في قاعدة البيانات. أي أن كل عقدة عاملة تنفذ
val cassandraDriver = new CassandraDriver ()؛ هذا ما يبدو عليه CassandraDriver:
class CassandraDriver extends SparkSessionBuilder {
دعونا نلقي نظرة فاحصة على كائن
الشرارة . رمز
SparkSessionBuilder هو كما يلي:
class SparkSessionBuilder extends Serializable {
في كل عقدة عمل ، يوفر
SparkSessionBuilder الوصول إلى
SparkSession الذي تم إنشاؤه على برنامج التشغيل. لجعل مثل هذا الوصول ممكنًا ، من الضروري إجراء تسلسل
SparkSessionBuilder واستخدام
فال كسول عابر ، والذي يسمح لنظام التسلسل بتجاهل
conf وإشعال الأشياء عند تهيئة البرنامج وحتى الوصول إلى الكائنات. وهكذا ، عندما يبدأ البرنامج ،
يتم إجراء تسلسل لـ
buildSparkSession وإرساله إلى كل عقدة عمل ، ولكن يُسمح باستخدام كائنات
conf و
spark فقط عندما تصل عقدة العمل إليها.
الآن دعونا نلقي نظرة على رمز التطبيق الرئيسي:
object KafkaToCassandra extends SparkSessionBuilder {
عندما يتم إرسال التطبيق للتنفيذ ،
يتم إجراء تسلسل
buildSparkSession وإرساله إلى عقد العمل ، ومع ذلك ، تظل كائنات
conf و
spark دون حل. ثم يقوم السائق بإنشاء عنصر شرارة داخل
KafkaToCassandra ويوزع العمل بين عقد العمل. تقوم كل عقدة عاملة بقراءة البيانات من Kafka ،
وتقوم بتحويلات بسيطة على الجزء المستلم من السجلات ، وعندما تكون العقدة العاملة جاهزة لكتابة النتائج إلى قاعدة البيانات ، فإنها تسمح بأشياء
الاحتراق والشرر ، وبالتالي الوصول إلى
SparkSession التي تم إنشاؤها على برنامج التشغيل.
كيفية بناء وتشغيل التطبيق؟
عندما انتقلت من PySpark إلى Scala ، استغرق الأمر مني بعض الوقت لمعرفة كيفية إنشاء التطبيق. لذلك ، قمت بتضمين Maven
pom.xml في مشروعي. يمكن للقارئ إنشاء التطبيق باستخدام Maven عن طريق تشغيل
الأمر mvn package . بعد يمكن إرسال التطبيق للتنفيذ باستخدام
./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1,datastax:spark-cassandra-connector:2.3.0-s_2.11 --class com.insight.app.CassandraSink.KafkaToCassandra --master spark://ec2-18-232-26-53.compute-1.amazonaws.com:7077 target/cassandra-sink-0.0.1-SNAPSHOT.jar
من أجل إنشاء التطبيق وتشغيله ، من الضروري استبدال أسماء أجهزة AWS الخاصة بك بأسمائك (أي استبدال كل ما يبدو مثل ec2-xx-xxx-xx-xx.compute-1.amazonaws.com).
يعتبر Spark and Streaming Streaming على وجه الخصوص موضوعًا جديدًا بالنسبة لي ، لذلك سأكون ممتنًا جدًا للقراء على التعليقات والمناقشة والتصحيحات.