مغسلة كاساندرا للتدفق المنظم Spark

منذ شهرين ، بدأت دراسة Spark ، وفي وقت ما واجهت مشكلة حفظ حسابات التدفق المنظم في قاعدة بيانات كاساندرا.

في هذا المنشور ، أعطي مثالاً بسيطًا على إنشاء واستخدام كاسندرا سينك للتدفق المنظم Spark. آمل أن يكون المنشور مفيدًا لأولئك الذين بدأوا مؤخرًا العمل مع Spark Structured Streaming ويتساءلون عن كيفية تحميل نتائج الحساب إلى قاعدة البيانات.

فكرة التطبيق بسيطة للغاية - لتلقي الرسائل من كافكا وتحليلها ، وإجراء تحويلات بسيطة في زوج وحفظ النتائج في كاساندرا.

إيجابيات التدفق المنظم


يمكنك قراءة المزيد حول التدفق المنظم في الوثائق . باختصار ، إن البث المنظم عبارة عن محرك معالجة معلومات تدفق قابل للتطوير بشكل جيد يعتمد على محرك Spark SQL. يسمح لك باستخدام Dataset / DataFrame لتجميع البيانات ، وحساب وظائف النافذة ، والاتصالات ، وما إلى ذلك ، أي أن التدفق المنظم يسمح لك باستخدام SQL القديم الجيد للعمل مع تدفقات البيانات.

ما هي المشكلة؟


تم إصدار الإصدار الثابت من Spark Structured Streaming في عام 2017. بمعنى ، هذه واجهة برمجة تطبيقات جديدة إلى حد ما تقوم بتنفيذ الوظائف الأساسية ، ولكن بعض الأشياء يجب أن نقوم بها بأنفسنا. على سبيل المثال ، يحتوي التدفق المنظم على وظائف قياسية لكتابة الإخراج إلى ملف أو مربع أو وحدة تحكم أو ذاكرة ، ولكن من أجل حفظ البيانات في قاعدة البيانات ، يجب عليك استخدام مستقبل foreach المتوفر في التدفق المنظم وتنفيذ واجهة ForeachWriter . بدءًا من Spark 2.3.1 ، لا يمكن تنفيذ هذه الوظيفة إلا في Scala و Java .

أفترض أن القارئ يعرف بالفعل كيف يعمل التدفق المنظم بعبارات عامة ، ويعرف كيفية تنفيذ التحولات الضرورية وهو الآن جاهز لتحميل النتائج إلى قاعدة البيانات. إذا كانت بعض الخطوات المذكورة أعلاه غير واضحة ، فقد تكون الوثائق الرسمية بمثابة نقطة بداية جيدة في تعلم البث المنظم. في هذه المقالة ، أود التركيز على الخطوة الأخيرة عندما تحتاج إلى حفظ النتائج في قاعدة بيانات.

أدناه ، سأصف مثالاً على تنفيذ مغسلة كاساندرا للتدفق المنظم وشرح كيفية تشغيلها في مجموعة. الكود الكامل متاح هنا .

عندما واجهت المشكلة المذكورة أعلاه لأول مرة ، تبين أن هذا المشروع مفيد للغاية. ومع ذلك ، قد يبدو الأمر معقدًا بعض الشيء إذا بدأ القارئ للتو العمل مع التدفق المنظم ويبحث عن مثال بسيط لكيفية تحميل البيانات إلى كاساندرا. بالإضافة إلى ذلك ، تمت كتابة المشروع للعمل في الوضع المحلي ويتطلب بعض التغييرات للتشغيل في الكتلة.

أريد أيضًا أن أعطي أمثلة حول كيفية حفظ البيانات في MongoDB وأي قاعدة بيانات أخرى باستخدام JDBC .

حل بسيط


لتحميل البيانات إلى نظام خارجي ، يجب عليك استخدام جهاز استقبال foreach . اقرأ المزيد عن هذا هنا . باختصار ، يجب تنفيذ واجهة ForeachWriter . أي أنه من الضروري تحديد كيفية فتح الاتصال ، وكيفية معالجة كل جزء من البيانات وكيفية إغلاق الاتصال في نهاية المعالجة. كود المصدر كما يلي:

class CassandraSinkForeach() extends ForeachWriter[org.apache.spark.sql.Row] { // This class implements the interface ForeachWriter, which has methods that get called // whenever there is a sequence of rows generated as output val cassandraDriver = new CassandraDriver(); def open(partitionId: Long, version: Long): Boolean = { // open connection println(s"Open connection") true } def process(record: org.apache.spark.sql.Row) = { println(s"Process new $record") cassandraDriver.connector.withSessionDo(session => session.execute(s""" insert into ${cassandraDriver.namespace}.${cassandraDriver.foreachTableSink} (fx_marker, timestamp_ms, timestamp_dt) values('${record(0)}', '${record(1)}', '${record(2)}')""") ) } def close(errorOrNull: Throwable): Unit = { // close the connection println(s"Close connection") } } 

تعريف CassandraDriver وهيكل جدول الإخراج الذي سأصفه لاحقًا ، ولكن الآن ، لنلقي نظرة فاحصة على كيفية عمل الرمز أعلاه. للاتصال بـ Kasandra من Spark ، أقوم بإنشاء كائن CassandraDriver الذي يوفر الوصول إلى CassandraConnector ، وهو موصل تم تطويره بواسطة DataStax . يعتبر CassandraConnector مسؤولاً عن فتح الاتصال بقاعدة البيانات وإغلاقه ، لذلك أقوم فقط بعرض رسائل التصحيح بالطرق المفتوحة والإغلاق لفئة CassandraSinkForeach .

يتم استدعاء الرمز أعلاه من التطبيق الرئيسي على النحو التالي:

 val sink = parsed .writeStream .queryName("KafkaToCassandraForeach") .outputMode("update") .foreach(new CassandraSinkForeach()) .start() 

يتم إنشاء CassandraSinkForeach لكل صف من البيانات ، لذا تقوم كل عقدة عمل بإدراج جزء من الصفوف في قاعدة البيانات. أي أن كل عقدة عاملة تنفذ val cassandraDriver = new CassandraDriver ()؛ هذا ما يبدو عليه CassandraDriver:

 class CassandraDriver extends SparkSessionBuilder { // This object will be used in CassandraSinkForeach to connect to Cassandra DB from an executor. // It extends SparkSessionBuilder so to use the same SparkSession on each node. val spark = buildSparkSession import spark.implicits._ val connector = CassandraConnector(spark.sparkContext.getConf) // Define Cassandra's table which will be used as a sink /* For this app I used the following table: CREATE TABLE fx.spark_struct_stream_sink ( fx_marker text, timestamp_ms timestamp, timestamp_dt date, primary key (fx_marker)); */ val namespace = "fx" val foreachTableSink = "spark_struct_stream_sink" } 

دعونا نلقي نظرة فاحصة على كائن الشرارة . رمز SparkSessionBuilder هو كما يلي:

 class SparkSessionBuilder extends Serializable { // Build a spark session. Class is made serializable so to get access to SparkSession in a driver and executors. // Note here the usage of @transient lazy val def buildSparkSession: SparkSession = { @transient lazy val conf: SparkConf = new SparkConf() .setAppName("Structured Streaming from Kafka to Cassandra") .set("spark.cassandra.connection.host", "ec2-52-23-103-178.compute-1.amazonaws.com") .set("spark.sql.streaming.checkpointLocation", "checkpoint") @transient lazy val spark = SparkSession .builder() .config(conf) .getOrCreate() spark } } 

في كل عقدة عمل ، يوفر SparkSessionBuilder الوصول إلى SparkSession الذي تم إنشاؤه على برنامج التشغيل. لجعل مثل هذا الوصول ممكنًا ، من الضروري إجراء تسلسل SparkSessionBuilder واستخدام فال كسول عابر ، والذي يسمح لنظام التسلسل بتجاهل conf وإشعال الأشياء عند تهيئة البرنامج وحتى الوصول إلى الكائنات. وهكذا ، عندما يبدأ البرنامج ، يتم إجراء تسلسل لـ buildSparkSession وإرساله إلى كل عقدة عمل ، ولكن يُسمح باستخدام كائنات conf و spark فقط عندما تصل عقدة العمل إليها.

الآن دعونا نلقي نظرة على رمز التطبيق الرئيسي:

 object KafkaToCassandra extends SparkSessionBuilder { // Main body of the app. It also extends SparkSessionBuilder. def main(args: Array[String]) { val spark = buildSparkSession import spark.implicits._ // Define location of Kafka brokers: val broker = "ec2-18-209-75-68.compute-1.amazonaws.com:9092,ec2-18-205-142-57.compute-1.amazonaws.com:9092,ec2-50-17-32-144.compute-1.amazonaws.com:9092" /*Here is an example massage which I get from a Kafka stream. It contains multiple jsons separated by \n {"timestamp_ms": "1530305100936", "fx_marker": "EUR/GBP"} {"timestamp_ms": "1530305100815", "fx_marker": "USD/CHF"} {"timestamp_ms": "1530305100969", "fx_marker": "EUR/CHF"} {"timestamp_ms": "1530305100011", "fx_marker": "USD/CAD"} */ // Read incoming stream val dfraw = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", broker) .option("subscribe", "currency_exchange") .load() val schema = StructType( Seq( StructField("fx_marker", StringType, false), StructField("timestamp_ms", StringType, false) ) ) val df = dfraw .selectExpr("CAST(value AS STRING)").as[String] .flatMap(_.split("\n")) val jsons = df.select(from_json($"value", schema) as "data").select("data.*") // Process data. Create a new date column val parsed = jsons .withColumn("timestamp_dt", to_date(from_unixtime($"timestamp_ms"/1000.0, "yyyy-MM-dd HH:mm:ss.SSS"))) .filter("fx_marker != ''") // Output results into a database val sink = parsed .writeStream .queryName("KafkaToCassandraForeach") .outputMode("update") .foreach(new CassandraSinkForeach()) .start() sink.awaitTermination() } } 

عندما يتم إرسال التطبيق للتنفيذ ، يتم إجراء تسلسل buildSparkSession وإرساله إلى عقد العمل ، ومع ذلك ، تظل كائنات conf و spark دون حل. ثم يقوم السائق بإنشاء عنصر شرارة داخل KafkaToCassandra ويوزع العمل بين عقد العمل. تقوم كل عقدة عاملة بقراءة البيانات من Kafka ، وتقوم بتحويلات بسيطة على الجزء المستلم من السجلات ، وعندما تكون العقدة العاملة جاهزة لكتابة النتائج إلى قاعدة البيانات ، فإنها تسمح بأشياء الاحتراق والشرر ، وبالتالي الوصول إلى SparkSession التي تم إنشاؤها على برنامج التشغيل.

كيفية بناء وتشغيل التطبيق؟


عندما انتقلت من PySpark إلى Scala ، استغرق الأمر مني بعض الوقت لمعرفة كيفية إنشاء التطبيق. لذلك ، قمت بتضمين Maven pom.xml في مشروعي. يمكن للقارئ إنشاء التطبيق باستخدام Maven عن طريق تشغيل الأمر mvn package . بعد يمكن إرسال التطبيق للتنفيذ باستخدام

 ./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1,datastax:spark-cassandra-connector:2.3.0-s_2.11 --class com.insight.app.CassandraSink.KafkaToCassandra --master spark://ec2-18-232-26-53.compute-1.amazonaws.com:7077 target/cassandra-sink-0.0.1-SNAPSHOT.jar 

من أجل إنشاء التطبيق وتشغيله ، من الضروري استبدال أسماء أجهزة AWS الخاصة بك بأسمائك (أي استبدال كل ما يبدو مثل ec2-xx-xxx-xx-xx.compute-1.amazonaws.com).

يعتبر Spark and Streaming Streaming على وجه الخصوص موضوعًا جديدًا بالنسبة لي ، لذلك سأكون ممتنًا جدًا للقراء على التعليقات والمناقشة والتصحيحات.

Source: https://habr.com/ru/post/ar425503/


All Articles