स्पार्क स्ट्रक्चर्ड स्ट्रीमिंग के लिए कैसांद्रा सिंक

कुछ महीने पहले मैंने स्पार्क का अध्ययन शुरू किया था, और किसी समय मुझे कैसंड्रा डेटाबेस में संरचित स्ट्रीमिंग गणना को बचाने की समस्या का सामना करना पड़ा था।

इस पोस्ट में, मैं स्पार्क स्ट्रक्चर्ड स्ट्रीमिंग के लिए कैसंड्रा सिंक बनाने और उपयोग करने का एक सरल उदाहरण देता हूं। मुझे उम्मीद है कि यह पोस्ट उन लोगों के लिए उपयोगी होगी जिन्होंने हाल ही में स्पार्क स्ट्रक्चर्ड स्ट्रीमिंग के साथ काम करना शुरू कर दिया है और सोच रहे हैं कि कैसे गणना परिणामों को डेटाबेस में अपलोड किया जाए।

आवेदन का विचार बहुत सरल है - काफ्का से संदेश प्राप्त करने और पार्स करने के लिए, एक जोड़ी में सरल परिवर्तन करें और कैसेंड्रा में परिणामों को बचाएं।

संरचित स्ट्रीमिंग के पेशेवरों


आप दस्तावेज़ में संरचित स्ट्रीमिंग के बारे में अधिक पढ़ सकते हैं। संक्षेप में, संरचित स्ट्रीमिंग स्पार्क SQL इंजन पर आधारित एक अच्छी तरह से स्केलेबल स्ट्रीमिंग सूचना प्रसंस्करण इंजन है। यह आपको डेटा एकत्र करने के लिए डेटासेट / डेटाफ़्रेम का उपयोग करने, विंडो फ़ंक्शंस, कनेक्शन आदि की गणना करने की अनुमति देता है, अर्थात, संरचित स्ट्रीमिंग आपको डेटा स्ट्रीम के साथ काम करने के लिए अच्छे पुराने एसक्यूएल का उपयोग करने की अनुमति देता है।

क्या समस्या है?


स्पार्क स्ट्रक्चर्ड स्ट्रीमिंग की स्थिर रिलीज 2017 में जारी की गई थी। यही है, यह एक काफी नया एपीआई है जो बुनियादी कार्यक्षमता को लागू करता है, लेकिन कुछ चीजें खुद ही करनी होंगी। उदाहरण के लिए, संरचित स्ट्रीमिंग में फ़ाइल, टाइल, कंसोल या मेमोरी पर आउटपुट लिखने के लिए मानक कार्य होते हैं, लेकिन डेटाबेस में डेटा को बचाने के लिए, आपको संरचित स्ट्रीमिंग में उपलब्ध फ़ॉर्वर्ड रिसीवर का उपयोग करना होगा और फ़ॉरचाइवर इंटरफ़ेस लागू करना होगास्पार्क 2.3.1 के साथ शुरू, यह कार्यक्षमता केवल स्काला और जावा में लागू की जा सकती है

मुझे लगता है कि पाठक पहले से ही जानते हैं कि संरचित स्ट्रीमिंग सामान्य शब्दों में कैसे काम करती है, आवश्यक परिवर्तनों को लागू करना जानता है और अब डेटाबेस पर परिणाम अपलोड करने के लिए तैयार है। यदि उपर्युक्त चरणों में से कुछ स्पष्ट नहीं हैं, तो आधिकारिक दस्तावेज संरचित स्ट्रीमिंग सीखने में एक अच्छा प्रारंभिक बिंदु के रूप में काम कर सकता है। इस लेख में, मैं अंतिम चरण पर ध्यान केंद्रित करना चाहता हूं जब आपको डेटाबेस में परिणाम सहेजने की आवश्यकता होती है।

नीचे, मैं संरचित स्ट्रीमिंग के लिए कैसंड्रा सिंक के एक उदाहरण के कार्यान्वयन का वर्णन करूंगा और समझाऊंगा कि इसे एक क्लस्टर में कैसे चलाया जाए। पूर्ण कोड यहाँ उपलब्ध है

जब मैंने पहली बार उपरोक्त समस्या का सामना किया, तो यह परियोजना बहुत उपयोगी साबित हुई। हालाँकि, यह थोड़ा जटिल लग सकता है अगर पाठक ने अभी स्ट्रक्चर्ड स्ट्रीमिंग के साथ काम करना शुरू कर दिया है और कैसेसेंड्रा में डेटा अपलोड करना है इसका एक सरल उदाहरण ढूंढ रहा है। इसके अलावा, परियोजना को स्थानीय मोड में काम करने के लिए लिखा गया है और क्लस्टर में चलाने के लिए कुछ परिवर्तनों की आवश्यकता है।

मैं यह भी उदाहरण देना चाहता हूं कि कैसे जिंगबीसी का उपयोग करके मोंगोडीबी और किसी अन्य डेटाबेस में डेटा को बचाया जाए।

सरल उपाय


किसी बाहरी सिस्टम पर डेटा अपलोड करने के लिए, आपको फ़ॉर्च रिसीवर का उपयोग करना होगा। इसके बारे में यहाँ और पढ़ें। संक्षेप में, ForeachWriter इंटरफ़ेस को लागू किया जाना चाहिए। यही है, यह निर्धारित करना आवश्यक है कि कनेक्शन कैसे खोला जाए, डेटा के प्रत्येक टुकड़े को कैसे संसाधित किया जाए और प्रसंस्करण के अंत में कनेक्शन को कैसे बंद किया जाए। स्रोत कोड इस प्रकार है:

class CassandraSinkForeach() extends ForeachWriter[org.apache.spark.sql.Row] { // This class implements the interface ForeachWriter, which has methods that get called // whenever there is a sequence of rows generated as output val cassandraDriver = new CassandraDriver(); def open(partitionId: Long, version: Long): Boolean = { // open connection println(s"Open connection") true } def process(record: org.apache.spark.sql.Row) = { println(s"Process new $record") cassandraDriver.connector.withSessionDo(session => session.execute(s""" insert into ${cassandraDriver.namespace}.${cassandraDriver.foreachTableSink} (fx_marker, timestamp_ms, timestamp_dt) values('${record(0)}', '${record(1)}', '${record(2)}')""") ) } def close(errorOrNull: Throwable): Unit = { // close the connection println(s"Close connection") } } 

कैसंड्राड्राइवर की परिभाषा और आउटपुट तालिका की संरचना मैं बाद में वर्णन करूंगा, लेकिन अभी के लिए, आइए देखें कि उपरोक्त कोड कैसे काम करता है। स्पार्क से कैसेंड्रा से कनेक्ट करने के लिए, मैं एक कैसेंड्राड्राइवर ऑब्जेक्ट बनाता हूं जो डेटास्टैक्स द्वारा विकसित कनेक्टर, कैसेंड्राकोनेक्टर तक पहुंच प्रदान करता है। CassandraConnector डेटाबेस से कनेक्शन को खोलने और बंद करने के लिए जिम्मेदार है, इसलिए मैं सिर्फ CassandraSinkForeach वर्ग के खुले और बंद तरीकों में डिबगिंग संदेश प्रदर्शित करता हूं।

उपरोक्त कोड को मुख्य एप्लिकेशन से निम्नानुसार कहा जाता है:

 val sink = parsed .writeStream .queryName("KafkaToCassandraForeach") .outputMode("update") .foreach(new CassandraSinkForeach()) .start() 

CassandraSinkForeach डेटा की प्रत्येक पंक्ति के लिए बनाया गया है, इसलिए प्रत्येक कार्य नोड डेटाबेस में पंक्तियों के अपने हिस्से को सम्मिलित करता है। यही है, प्रत्येक कार्य नोड वैल कैसेंड्राड्राइवर = नया कैसंड्राड्राइवर () निष्पादित करता है ; कैसेंड्राड्राइवर जैसा दिखता है यह है:

 class CassandraDriver extends SparkSessionBuilder { // This object will be used in CassandraSinkForeach to connect to Cassandra DB from an executor. // It extends SparkSessionBuilder so to use the same SparkSession on each node. val spark = buildSparkSession import spark.implicits._ val connector = CassandraConnector(spark.sparkContext.getConf) // Define Cassandra's table which will be used as a sink /* For this app I used the following table: CREATE TABLE fx.spark_struct_stream_sink ( fx_marker text, timestamp_ms timestamp, timestamp_dt date, primary key (fx_marker)); */ val namespace = "fx" val foreachTableSink = "spark_struct_stream_sink" } 

चिंगारी वस्तु पर करीब से नज़र डालते हैं। SparkSessionBuilder के लिए कोड इस प्रकार है:

 class SparkSessionBuilder extends Serializable { // Build a spark session. Class is made serializable so to get access to SparkSession in a driver and executors. // Note here the usage of @transient lazy val def buildSparkSession: SparkSession = { @transient lazy val conf: SparkConf = new SparkConf() .setAppName("Structured Streaming from Kafka to Cassandra") .set("spark.cassandra.connection.host", "ec2-52-23-103-178.compute-1.amazonaws.com") .set("spark.sql.streaming.checkpointLocation", "checkpoint") @transient lazy val spark = SparkSession .builder() .config(conf) .getOrCreate() spark } } 

प्रत्येक कार्य नोड पर, SparkSessionBuilder ड्राइवर पर बनाई गई SparkSession तक पहुँच प्रदान करती है। इस तरह की पहुंच को संभव बनाने के लिए, SparkSessionBuilder को क्रमबद्ध करना और क्षणिक आलसी घाटी का उपयोग करना आवश्यक है, जो क्रमबद्धता प्रणाली को प्रोग्राम को इनिशियलाइज़ किए जाने तक और ऑब्जेक्ट्स तक पहुंचने तक स्पार्क ऑब्जेक्ट्स को अनदेखा करने की अनुमति देता है। इस प्रकार, जब buildSparkSession प्रोग्राम शुरू होता है, तो इसे क्रमबद्ध किया जाता है और प्रत्येक काम करने वाले नोड को भेजा जाता है, लेकिन वर्किंग नोड द्वारा उन्हें एक्सेस करने पर कन्फ़ेक्शन और स्पार्क ऑब्जेक्ट्स की अनुमति होती है।

अब मुख्य एप्लिकेशन कोड को देखते हैं:

 object KafkaToCassandra extends SparkSessionBuilder { // Main body of the app. It also extends SparkSessionBuilder. def main(args: Array[String]) { val spark = buildSparkSession import spark.implicits._ // Define location of Kafka brokers: val broker = "ec2-18-209-75-68.compute-1.amazonaws.com:9092,ec2-18-205-142-57.compute-1.amazonaws.com:9092,ec2-50-17-32-144.compute-1.amazonaws.com:9092" /*Here is an example massage which I get from a Kafka stream. It contains multiple jsons separated by \n {"timestamp_ms": "1530305100936", "fx_marker": "EUR/GBP"} {"timestamp_ms": "1530305100815", "fx_marker": "USD/CHF"} {"timestamp_ms": "1530305100969", "fx_marker": "EUR/CHF"} {"timestamp_ms": "1530305100011", "fx_marker": "USD/CAD"} */ // Read incoming stream val dfraw = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", broker) .option("subscribe", "currency_exchange") .load() val schema = StructType( Seq( StructField("fx_marker", StringType, false), StructField("timestamp_ms", StringType, false) ) ) val df = dfraw .selectExpr("CAST(value AS STRING)").as[String] .flatMap(_.split("\n")) val jsons = df.select(from_json($"value", schema) as "data").select("data.*") // Process data. Create a new date column val parsed = jsons .withColumn("timestamp_dt", to_date(from_unixtime($"timestamp_ms"/1000.0, "yyyy-MM-dd HH:mm:ss.SSS"))) .filter("fx_marker != ''") // Output results into a database val sink = parsed .writeStream .queryName("KafkaToCassandraForeach") .outputMode("update") .foreach(new CassandraSinkForeach()) .start() sink.awaitTermination() } } 

जब आवेदन निष्पादन के लिए भेजा जाता है, buildSparkSession को क्रमबद्ध किया जाता है और काम करने वाले नोड्स को भेजा जाता है, हालांकि, भ्रम और स्पार्क ऑब्जेक्ट्स अनसुलझे रहते हैं। फिर ड्राइवर KafkaToCassandra के अंदर एक स्पार्क ऑब्जेक्ट बनाता है और काम करने वाले नोड्स के बीच काम को वितरित करता है। प्रत्येक कार्यशील नोड काफ्का के डेटा को पढ़ता है, रिकॉर्ड के प्राप्त हिस्से पर सरल परिवर्तन करता है, और जब कार्य नोड डेटाबेस के लिए परिणाम लिखने के लिए तैयार होता है, तो यह ऑब्जेक्ट और स्पार्क ऑब्जेक्ट्स को अनुमति देता है, जिससे ड्राइवर पर बनाए गए स्पार्कसेशन तक पहुंच प्राप्त होती है।

एप्लिकेशन कैसे बनाएं और चलाएं?


जब मैं PySpark से Scala में चला गया, तो मुझे यह पता लगाने में थोड़ा समय लगा कि मुझे आवेदन कैसे बनाना है। इसलिए, मैंने अपने प्रोजेक्ट में मावेन pom.xml को शामिल किया। पाठक मावेन पैकेज कमांड को चलाकर मावेन का उपयोग करके एप्लिकेशन का निर्माण कर सकता है। आवेदन का उपयोग करने के बाद निष्पादन के लिए भेजा जा सकता है

 ./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1,datastax:spark-cassandra-connector:2.3.0-s_2.11 --class com.insight.app.CassandraSink.KafkaToCassandra --master spark://ec2-18-232-26-53.compute-1.amazonaws.com:7077 target/cassandra-sink-0.0.1-SNAPSHOT.jar 

एप्लिकेशन को बनाने और चलाने के लिए, मेरी एडब्ल्यूएस मशीनों के नामों को अपने स्वयं के साथ बदलना आवश्यक है (अर्थात, सब कुछ बदलने के लिए जो ec2-xxx-xxx-xx.compute-1.amazonaws.com की तरह दिखता है)।

स्पार्क और स्ट्रक्चर्ड स्ट्रीमिंग विशेष रूप से मेरे लिए एक नया विषय है, इसलिए मैं टिप्पणी, चर्चा और सुधार के लिए पाठकों का बहुत आभारी रहूंगा।

Source: https://habr.com/ru/post/hi425503/


All Articles