कुछ महीने पहले मैंने स्पार्क का अध्ययन शुरू किया था, और किसी समय मुझे कैसंड्रा डेटाबेस में संरचित स्ट्रीमिंग गणना को बचाने की समस्या का सामना करना पड़ा था।
इस पोस्ट में, मैं स्पार्क स्ट्रक्चर्ड स्ट्रीमिंग के लिए कैसंड्रा सिंक बनाने और उपयोग करने का एक सरल उदाहरण देता हूं। मुझे उम्मीद है कि यह पोस्ट उन लोगों के लिए उपयोगी होगी जिन्होंने हाल ही में स्पार्क स्ट्रक्चर्ड स्ट्रीमिंग के साथ काम करना शुरू कर दिया है और सोच रहे हैं कि कैसे गणना परिणामों को डेटाबेस में अपलोड किया जाए।
आवेदन का विचार बहुत सरल है - काफ्का से संदेश प्राप्त करने और पार्स करने के लिए, एक जोड़ी में सरल परिवर्तन करें और कैसेंड्रा में परिणामों को बचाएं।
संरचित स्ट्रीमिंग के पेशेवरों
आप
दस्तावेज़ में संरचित स्ट्रीमिंग के बारे में अधिक पढ़ सकते हैं। संक्षेप में, संरचित स्ट्रीमिंग स्पार्क SQL इंजन पर आधारित एक अच्छी तरह से स्केलेबल स्ट्रीमिंग सूचना प्रसंस्करण इंजन है। यह आपको डेटा एकत्र करने के लिए डेटासेट / डेटाफ़्रेम का उपयोग करने, विंडो फ़ंक्शंस, कनेक्शन आदि की गणना करने की अनुमति देता है, अर्थात, संरचित स्ट्रीमिंग आपको डेटा स्ट्रीम के साथ काम करने के लिए अच्छे पुराने एसक्यूएल का उपयोग करने की अनुमति देता है।
क्या समस्या है?
स्पार्क स्ट्रक्चर्ड स्ट्रीमिंग की स्थिर रिलीज 2017 में जारी की गई थी। यही है, यह एक काफी नया एपीआई है जो बुनियादी कार्यक्षमता को लागू करता है, लेकिन कुछ चीजें खुद ही करनी होंगी। उदाहरण के लिए, संरचित स्ट्रीमिंग में फ़ाइल, टाइल, कंसोल या मेमोरी पर आउटपुट लिखने के लिए मानक कार्य होते हैं, लेकिन डेटाबेस में डेटा को बचाने के लिए, आपको संरचित स्ट्रीमिंग में उपलब्ध
फ़ॉर्वर्ड रिसीवर का उपयोग करना होगा और
फ़ॉरचाइवर इंटरफ़ेस लागू करना
होगा ।
स्पार्क 2.3.1 के साथ शुरू, यह कार्यक्षमता केवल स्काला और जावा में लागू की जा सकती है ।
मुझे लगता है कि पाठक पहले से ही जानते हैं कि संरचित स्ट्रीमिंग सामान्य शब्दों में कैसे काम करती है, आवश्यक परिवर्तनों को लागू करना जानता है और अब डेटाबेस पर परिणाम अपलोड करने के लिए तैयार है। यदि उपर्युक्त चरणों में से कुछ स्पष्ट नहीं हैं, तो आधिकारिक दस्तावेज संरचित स्ट्रीमिंग सीखने में एक अच्छा प्रारंभिक बिंदु के रूप में काम कर सकता है। इस लेख में, मैं अंतिम चरण पर ध्यान केंद्रित करना चाहता हूं जब आपको डेटाबेस में परिणाम सहेजने की आवश्यकता होती है।
नीचे, मैं संरचित स्ट्रीमिंग के लिए कैसंड्रा सिंक के एक उदाहरण के कार्यान्वयन का वर्णन करूंगा और समझाऊंगा कि इसे एक क्लस्टर में कैसे चलाया जाए। पूर्ण कोड
यहाँ उपलब्ध
है ।
जब मैंने पहली बार उपरोक्त समस्या का सामना किया, तो
यह परियोजना बहुत उपयोगी साबित हुई। हालाँकि, यह थोड़ा जटिल लग सकता है अगर पाठक ने अभी स्ट्रक्चर्ड स्ट्रीमिंग के साथ काम करना शुरू कर दिया है और कैसेसेंड्रा में डेटा अपलोड करना है इसका एक सरल उदाहरण ढूंढ रहा है। इसके अलावा, परियोजना को स्थानीय मोड में काम करने के लिए लिखा गया है और क्लस्टर में चलाने के लिए कुछ परिवर्तनों की आवश्यकता है।
मैं यह भी उदाहरण देना चाहता हूं कि कैसे
जिंगबीसी का उपयोग करके
मोंगोडीबी और किसी अन्य डेटाबेस में डेटा को बचाया जाए।
सरल उपाय
किसी बाहरी सिस्टम पर डेटा अपलोड करने के लिए, आपको
फ़ॉर्च रिसीवर का उपयोग करना होगा। इसके बारे में
यहाँ और पढ़ें। संक्षेप में,
ForeachWriter इंटरफ़ेस को लागू किया जाना चाहिए। यही है, यह निर्धारित करना आवश्यक है कि कनेक्शन कैसे खोला जाए, डेटा के प्रत्येक टुकड़े को कैसे संसाधित किया जाए और प्रसंस्करण के अंत में कनेक्शन को कैसे बंद किया जाए। स्रोत कोड इस प्रकार है:
class CassandraSinkForeach() extends ForeachWriter[org.apache.spark.sql.Row] {
कैसंड्राड्राइवर की परिभाषा और आउटपुट तालिका की संरचना मैं बाद में वर्णन करूंगा, लेकिन अभी के लिए, आइए देखें कि उपरोक्त कोड कैसे काम करता है। स्पार्क से
कैसेंड्रा से कनेक्ट करने के लिए, मैं एक
कैसेंड्राड्राइवर ऑब्जेक्ट
बनाता हूं जो
डेटास्टैक्स द्वारा विकसित कनेक्टर,
कैसेंड्राकोनेक्टर तक पहुंच प्रदान करता है। CassandraConnector डेटाबेस से कनेक्शन को खोलने और बंद करने के लिए जिम्मेदार है, इसलिए मैं सिर्फ
CassandraSinkForeach वर्ग के
खुले और
बंद तरीकों में डिबगिंग संदेश प्रदर्शित करता हूं।
उपरोक्त कोड को मुख्य एप्लिकेशन से निम्नानुसार कहा जाता है:
val sink = parsed .writeStream .queryName("KafkaToCassandraForeach") .outputMode("update") .foreach(new CassandraSinkForeach()) .start()
CassandraSinkForeach डेटा की प्रत्येक पंक्ति के लिए बनाया गया है, इसलिए प्रत्येक कार्य नोड डेटाबेस में पंक्तियों के अपने हिस्से को सम्मिलित करता है। यही है, प्रत्येक कार्य नोड
वैल कैसेंड्राड्राइवर = नया कैसंड्राड्राइवर () निष्पादित करता है
; कैसेंड्राड्राइवर जैसा दिखता है यह है:
class CassandraDriver extends SparkSessionBuilder {
चिंगारी वस्तु पर करीब से नज़र डालते हैं।
SparkSessionBuilder के लिए कोड इस प्रकार है:
class SparkSessionBuilder extends Serializable {
प्रत्येक कार्य नोड पर,
SparkSessionBuilder ड्राइवर पर बनाई गई
SparkSession तक पहुँच प्रदान करती है। इस तरह की पहुंच को संभव बनाने के लिए,
SparkSessionBuilder को क्रमबद्ध
करना और
क्षणिक आलसी घाटी का उपयोग करना आवश्यक है, जो क्रमबद्धता प्रणाली को प्रोग्राम को इनिशियलाइज़ किए जाने तक और ऑब्जेक्ट्स तक पहुंचने तक
स्पार्क ऑब्जेक्ट्स को अनदेखा करने की अनुमति देता है। इस प्रकार, जब
buildSparkSession प्रोग्राम
शुरू होता है, तो इसे क्रमबद्ध
किया जाता है और प्रत्येक काम करने वाले नोड को भेजा जाता है, लेकिन वर्किंग नोड द्वारा उन्हें एक्सेस करने पर
कन्फ़ेक्शन और
स्पार्क ऑब्जेक्ट्स की अनुमति होती है।
अब मुख्य एप्लिकेशन कोड को देखते हैं:
object KafkaToCassandra extends SparkSessionBuilder {
जब आवेदन निष्पादन के लिए भेजा जाता है,
buildSparkSession को क्रमबद्ध किया जाता है और काम करने वाले नोड्स को भेजा जाता है, हालांकि,
भ्रम और
स्पार्क ऑब्जेक्ट्स अनसुलझे रहते हैं। फिर ड्राइवर
KafkaToCassandra के अंदर एक स्पार्क ऑब्जेक्ट बनाता है और काम करने वाले नोड्स के बीच काम को वितरित करता है। प्रत्येक कार्यशील नोड काफ्का के डेटा को पढ़ता है, रिकॉर्ड के प्राप्त हिस्से पर सरल परिवर्तन करता है, और जब कार्य नोड डेटाबेस के लिए परिणाम लिखने के लिए तैयार होता है, तो यह ऑब्जेक्ट और
स्पार्क ऑब्जेक्ट्स को अनुमति देता है, जिससे ड्राइवर पर बनाए गए
स्पार्कसेशन तक पहुंच प्राप्त होती है।
एप्लिकेशन कैसे बनाएं और चलाएं?
जब मैं PySpark से Scala में चला गया, तो मुझे यह पता लगाने में थोड़ा समय लगा कि मुझे आवेदन कैसे बनाना है। इसलिए, मैंने अपने प्रोजेक्ट में मावेन
pom.xml को शामिल किया। पाठक मावेन
पैकेज कमांड को चलाकर मावेन का उपयोग करके एप्लिकेशन का निर्माण कर सकता है। आवेदन का उपयोग करने के बाद निष्पादन के लिए भेजा जा सकता है
./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1,datastax:spark-cassandra-connector:2.3.0-s_2.11 --class com.insight.app.CassandraSink.KafkaToCassandra --master spark://ec2-18-232-26-53.compute-1.amazonaws.com:7077 target/cassandra-sink-0.0.1-SNAPSHOT.jar
एप्लिकेशन को बनाने और चलाने के लिए, मेरी एडब्ल्यूएस मशीनों के नामों को अपने स्वयं के साथ बदलना आवश्यक है (अर्थात, सब कुछ बदलने के लिए जो ec2-xxx-xxx-xx.compute-1.amazonaws.com की तरह दिखता है)।
स्पार्क और स्ट्रक्चर्ड स्ट्रीमिंग विशेष रूप से मेरे लिए एक नया विषय है, इसलिए मैं टिप्पणी, चर्चा और सुधार के लिए पाठकों का बहुत आभारी रहूंगा।