أباتشي كافكا والجري مع شرارة الجري

مرحبا يا هبر! اليوم سنقوم ببناء نظام يستخدم Apark Kafka لمعالجة تدفقات الرسائل باستخدام Spark Streaming وكتابة نتيجة المعالجة إلى قاعدة بيانات AWS RDS السحابية.

تخيل أن مؤسسة ائتمانية معينة قد حددت لنا مهمة معالجة المعاملات الواردة في جميع فروعها. يمكن القيام بذلك من أجل حساب مركز العملة المفتوحة بسرعة للخزانة أو الحدود أو النتائج المالية للمعاملات ، إلخ.

كيفية تنفيذ هذه الحالة دون استخدام تعاويذ سحرية وسحر - نقرأ تحت خفض! دعنا نذهب!


(مصدر الصورة)

مقدمة


بالطبع ، توفر معالجة مجموعة كبيرة من البيانات في الوقت الحقيقي فرصًا كبيرة للاستخدام في الأنظمة الحديثة. واحدة من أكثر المجموعات شيوعًا لهذا الأمر هي Apache Kafka و Spark Streaming جنبًا إلى جنب ، حيث تنشئ Kafka مجموعة من حزم الرسائل الواردة ، ويقوم Spark Streaming بمعالجة هذه الحزم في فترة زمنية محددة.

لزيادة التسامح مع الخطأ للتطبيق ، سوف نستخدم نقاط التفتيش - نقاط التفتيش. باستخدام هذه الآلية ، عندما تحتاج وحدة Spark Streaming لاستعادة البيانات المفقودة ، فإنها تحتاج فقط للعودة إلى نقطة التحكم الأخيرة واستئناف العمليات الحسابية منها.

بنية النظام قيد التطوير




المكونات المستخدمة:

  • Apache Kafka هو نظام رسائل موزع مع نشر واشتراك. مناسبة لكلا استهلاك الرسائل عبر الإنترنت وغير متصل. لمنع فقدان البيانات ، يتم تخزين رسائل كافكا على القرص وتكرارها داخل المجموعة. نظام Kafka مبني على أعلى خدمة مزامنة ZooKeeper ؛
  • Apache Spark Streaming - مكون Spark لمعالجة بيانات التدفق. تم تصميم وحدة Spark Streaming باستخدام بنية الدُفعات الصغيرة ، عندما يتم تفسير دفق البيانات على أنه تسلسل مستمر لحزم البيانات الصغيرة. يتلقى Spark Streaming البيانات من مصادر مختلفة ويجمعها في حزم صغيرة. يتم إنشاء حزم جديدة على فترات منتظمة. في بداية كل فاصل زمني ، يتم إنشاء حزمة جديدة ، ويتم تضمين أي بيانات يتم تلقيها خلال هذه الفترة في الحزمة. في نهاية الفاصل الزمني ، يتوقف نمو الحزمة. يتم تحديد حجم الفاصل الزمني بواسطة معلمة تسمى الفاصل الدفعي؛
  • ؛ Apache Spark SQL - يجمع بين المعالجة العلائقية والبرمجة الوظيفية لـ Spark. تشير البيانات المهيكلة إلى البيانات التي تحتوي على مخطط ، أي مجموعة واحدة من الحقول لجميع السجلات. يدعم Spark SQL المدخلات من مجموعة متنوعة من مصادر البيانات المهيكلة ، وبفضل توافر المعلومات حول المخطط ، يمكنه استرداد حقول السجلات المطلوبة بكفاءة ، كما يوفر واجهات برمجة التطبيقات DataFrame ؛
  • AWS RDS هي قاعدة بيانات علائقية مستندة إلى مجموعة النظراء غير مكلفة نسبيًا ، وهي خدمة ويب تعمل على تبسيط التكوين والتشغيل والتحجيم ، وتديرها Amazon مباشرةً.

تثبيت وبدء خادم كافكا


قبل استخدام Kafka مباشرة ، تحتاج إلى التأكد من توفر Java ، مثل يستخدم JVM للعمل:

sudo apt-get update sudo apt-get install default-jre java -version 

قم بإنشاء مستخدم جديد للعمل مع Kafka:

 sudo useradd kafka -m sudo passwd kafka sudo adduser kafka sudo 

بعد ذلك ، قم بتنزيل التوزيع من موقع Apache Kafka الرسمي:

 wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz" 

فك الأرشيف الذي تم تنزيله:
 tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka 

الخطوة التالية اختيارية. الحقيقة هي أن الإعدادات الافتراضية لا تسمح بالاستخدام الكامل لجميع ميزات Apache Kafka. على سبيل المثال ، احذف موضوعًا أو فئة أو مجموعة يمكن نشر الرسائل عليها. لتغيير هذا ، قم بتحرير ملف التكوين:

 vim ~/kafka/config/server.properties 

أضف ما يلي إلى نهاية الملف:

 delete.topic.enable = true 

قبل بدء تشغيل خادم Kafka ، تحتاج إلى بدء تشغيل خادم ZooKeeper ، وسوف نستخدم البرنامج النصي المساعد الذي يأتي مع توزيع Kafka:

 Cd ~/kafka bin/zookeeper-server-start.sh config/zookeeper.properties 

بعد بدء ZooKeeper بنجاح ، في محطة منفصلة نطلق خادم Kafka:

 bin/kafka-server-start.sh config/server.properties 

إنشاء موضوع جديد يسمى المعاملات:

 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction 

تأكد من إنشاء الموضوع بالعدد الصحيح من الأقسام والنسخ المتماثل:

 bin/kafka-topics.sh --describe --zookeeper localhost:2181 



سنفتقد لحظات اختبار المنتج والمستهلك للموضوع الذي تم إنشاؤه حديثًا. لمزيد من التفاصيل حول كيفية اختبار إرسال واستقبال الرسائل ، راجع الوثائق الرسمية - إرسال بعض الرسائل . حسنًا ، ننتقل إلى كتابة منتج في Python باستخدام API KafkaProducer.

كتابة المنتج


سيقوم المنتج بإنشاء بيانات عشوائية - 100 رسالة في كل ثانية. نعني بالبيانات العشوائية قاموس يتكون من ثلاثة حقول:

  • الفرع - اسم نقطة بيع المؤسسة الائتمانية ؛
  • العملة - عملة المعاملة ؛
  • المبلغ - مبلغ الصفقة. سيكون المبلغ رقمًا إيجابيًا إذا كان شراء عملة من قبل البنك ، وسالبًا إذا كان عملية بيع.

رمز المنتج كما يلي:

 from numpy.random import choice, randint def get_random_value(): new_dict = {} branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut'] currency_list = ['RUB', 'USD', 'EUR', 'GBP'] new_dict['branch'] = choice(branch_list) new_dict['currency'] = choice(currency_list) new_dict['amount'] = randint(-100, 100) return new_dict 

بعد ذلك ، باستخدام طريقة الإرسال ، نرسل رسالة إلى الخادم ، في الموضوع الذي نحتاجه ، بتنسيق JSON:

 from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda x:dumps(x).encode('utf-8'), compression_type='gzip') my_topic = 'transaction' data = get_random_value() try: future = producer.send(topic = my_topic, value = data) record_metadata = future.get(timeout=10) print('--> The message has been sent to a topic: \ {}, partition: {}, offset: {}' \ .format(record_metadata.topic, record_metadata.partition, record_metadata.offset )) except Exception as e: print('--> It seems an Error occurred: {}'.format(e)) finally: producer.flush() 

عند تشغيل البرنامج النصي ، نتلقى الرسائل التالية في الجهاز:


هذا يعني أن كل شيء يعمل كما أردنا - يقوم المنتج بإنشاء وإرسال رسائل إلى الموضوع الذي نحتاج إليه.

والخطوة التالية هي تثبيت Spark ومعالجة تدفق الرسالة هذا.

تثبيت اباتشي سبارك


Apache Spark عبارة عن منصة حوسبة عنقودية متعددة الاستخدامات وعالية الأداء.

فيما يتعلق بالأداء ، يتفوق Spark على التطبيقات الشائعة لنموذج MapReduce ، ويوفر في الوقت نفسه الدعم لمجموعة واسعة من أنواع الحسابات ، بما في ذلك الاستعلامات التفاعلية ومعالجة الدفق. تلعب السرعة دورًا مهمًا في معالجة كميات كبيرة من البيانات ، فهي السرعة التي تتيح لك العمل بشكل تفاعلي دون قضاء دقائق أو ساعات في الانتظار. واحدة من أعظم نقاط قوة سبارك في مثل هذه السرعة العالية هي قدرتها على أداء العمليات الحسابية في الذاكرة.

هذا الإطار مكتوب في Scala ، لذلك يجب عليك تثبيته أولاً:

 sudo apt-get install scala 

قم بتنزيل Spark التوزيع من الموقع الرسمي:

 wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz" 

فك الأرشيف:

 sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark 

أضف المسار إلى Spark في ملف bash:

 vim ~/.bashrc 

أضف الأسطر التالية من خلال المحرر:

 SPARK_HOME=/usr/local/spark export PATH=$SPARK_HOME/bin:$PATH 

قم بتشغيل الأمر أدناه بعد إجراء تغييرات على bashrc:

 source ~/.bashrc 

نشر AWS بوستجرس


يبقى نشر قاعدة البيانات ، حيث سنقوم بتحميل المعلومات التي تمت معالجتها من التدفقات. لهذا سوف نستخدم خدمة AWS RDS.

انتقل إلى وحدة التحكم AWS -> AWS RDS -> قواعد البيانات -> إنشاء قاعدة بيانات:


حدد PostgreSQL وانقر فوق الزر "التالي":


لأن يتم فهم هذا المثال فقط للأغراض التعليمية ، وسوف نستخدم خادمًا مجانيًا "على الأقل" (المستوى المجاني):


بعد ذلك ، ضع علامة في كتلة المستوى المجاني ، وبعد ذلك سيتم تقديم مثيل فئة t2.micro تلقائيًا - رغم أنه ضعيف ، إلا أنه مجاني ومناسب تمامًا لمهمتنا:

تتبع الأمور المهمة جدًا: اسم مثيل قاعدة البيانات ، واسم المستخدم الرئيسي وكلمة المرور الخاصة به. لنقم بتسمية المثال: myHabrTest ، المستخدم الرئيسي: habr ، كلمة المرور: habr12345 وانقر على الزر "التالي":



تحتوي الصفحة التالية على المعلمات المسؤولة عن توفر خادم قاعدة البيانات لدينا من الخارج (إمكانية الوصول العامة) وتوافر المنفذ:


لنقم بإنشاء تكوين جديد لمجموعة الأمان VPC ، والتي سوف تتيح لنا الوصول إلى خادم قاعدة البيانات لدينا من الخارج من خلال المنفذ 5432 (PostgreSQL).

في نافذة متصفح منفصلة ، انتقل إلى وحدة تحكم AWS في لوحة معلومات VPC -> مجموعات الأمان -> إنشاء قسم مجموعة الأمان:

عيّن اسم مجموعة الأمان - يشير PostgreSQL ، وهو وصف ، إلى VPC الذي يجب أن ترتبط به هذه المجموعة وانقر فوق الزر "إنشاء":


املأ مجموعة قواعد Inbound التي تم إنشاؤها حديثًا للمنفذ 5432 ، كما هو موضح في الصورة أدناه. ليس لديك لتحديد منفذ يدوي ، ولكن حدد PostgreSQL من القائمة المنسدلة النوع.

بالمعنى الدقيق للكلمة ، تعني القيمة :: / 0 توفر حركة المرور الواردة لخادم من جميع أنحاء العالم ، وهو أمر غير صحيح تمامًا ، ولكن من أجل تحليل المثال ، دعونا نستخدم هذا الأسلوب:


نعود إلى صفحة المتصفح ، حيث لدينا "تكوين الإعدادات المتقدمة" مفتوحة وحدد في قسم مجموعات أمان VPC -> اختر مجموعات أمان VPC الحالية -> PostgreSQL:


بعد ذلك ، في القسم خيارات قاعدة البيانات -> اسم قاعدة البيانات -> تعيين الاسم - habrDB .

يمكننا ترك بقية المعلمات ، باستثناء تعطيل النسخ الاحتياطي (فترة الاحتفاظ بالنسخ الاحتياطي - 0 يومًا) ، والرصد ، ورؤى الأداء ، افتراضيًا. انقر على زر إنشاء قاعدة بيانات :


دفق معالج


ستكون الخطوة الأخيرة هي تطوير Spark-jobs ، والتي ستقوم بمعالجة كل ثانيتين بيانات جديدة قادمة من Kafka وإدخال النتيجة في قاعدة البيانات.

كما هو مذكور أعلاه ، فإن نقاط التفتيش هي الآلية الأساسية في SparkStreaming التي يجب تكوينها لتوفير التسامح مع الخطأ. سنستخدم نقاط التحكم ، وفي حالة حدوث انخفاض في الإجراءات ، ستحتاج وحدة Spark Streaming فقط للعودة إلى نقطة التحكم الأخيرة واستئناف الحسابات منها لاستعادة البيانات المفقودة.

يمكنك تمكين نقطة التفتيش عن طريق تعيين الدليل في نظام ملفات موثوق ومقاوم للأخطاء (على سبيل المثال ، HDFS ، S3 ، إلخ) ، حيث سيتم حفظ معلومات نقطة التفتيش. يتم ذلك باستخدام ، على سبيل المثال:

 streamingContext.checkpoint(checkpointDirectory) 

في المثال الخاص بنا ، سوف نستخدم الطريقة التالية ، أي في حالة وجود checkpointDirectory ، سيتم إعادة إنشاء السياق من بيانات نقطة التحكم. إذا كان الدليل غير موجود (أي أنه يتم تنفيذه لأول مرة) ، فسيتم استدعاء الدالة functionToCreateContext لإنشاء سياق جديد وتكوين DStreams:

 from pyspark.streaming import StreamingContext context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext) 

قم بإنشاء كائن DirectStream للاتصال بموضوع "المعاملة" باستخدام طريقة createDirectStream لمكتبة KafkaUtils:

 from pyspark.streaming.kafka import KafkaUtils sc = SparkContext(conf=conf) ssc = StreamingContext(sc, 2) broker_list = 'localhost:9092' topic = 'transaction' directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": broker_list}) 

تحليل البيانات الواردة بتنسيق JSON:

 rowRdd = rdd.map(lambda w: Row(branch=w['branch'], currency=w['currency'], amount=w['amount'])) testDataFrame = spark.createDataFrame(rowRdd) testDataFrame.createOrReplaceTempView("treasury_stream") 

باستخدام Spark SQL ، نقوم بعمل تجميع بسيط وطباعة النتيجة على وحدة التحكم:

 select from_unixtime(unix_timestamp()) as curr_time, t.branch as branch_name, t.currency as currency_code, sum(amount) as batch_value from treasury_stream t group by t.branch, t.currency 

الحصول على نص الاستعلام وتشغيله من خلال Spark SQL:

 sql_query = get_sql_query() testResultDataFrame = spark.sql(sql_query) testResultDataFrame.show(n=5) 

ثم نحفظ البيانات المجمعة المستلمة في جدول في AWS RDS. لحفظ نتائج التجميع في جدول قاعدة البيانات ، سنستخدم طريقة الكتابة للكائن DataFrame:

 testResultDataFrame.write \ .format("jdbc") \ .mode("append") \ .option("driver", 'org.postgresql.Driver') \ .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") \ .option("dbtable", "transaction_flow") \ .option("user", "habr") \ .option("password", "habr12345") \ .save() 

بضع كلمات حول إعداد اتصال AWS RDS. لقد أنشأنا المستخدم وكلمة المرور الخاصة به في خطوة "نشر AWS PostgreSQL". بالنسبة لعنوان URL لخادم قاعدة البيانات ، استخدم Endpoint ، والذي يتم عرضه في قسم الاتصال والأمان:


من أجل توصيل Spark و Kafka بشكل صحيح ، يجب تشغيل المهمة من خلال smark-submit باستخدام قطعة أثرية من spark-streaming-kafka-0-8_2.11 . بالإضافة إلى ذلك ، نطبق أيضًا الأداة للتفاعل مع قاعدة بيانات PostgreSQL ، وسننقلها من خلال - حزم.

من أجل مرونة البرنامج النصي ، نأخذ أيضًا اسم خادم الرسائل والموضوع الذي نريد استلام البيانات منه كمعلمات إدخال.

لذلك ، حان الوقت لبدء واختبار النظام:

 spark-submit \ --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,\ org.postgresql:postgresql:9.4.1207 \ spark_job.py localhost:9092 transaction 

كل شيء يعمل بها! كما ترى في الصورة أدناه ، أثناء عمل التطبيق ، يتم عرض نتائج التجميع الجديدة كل ثانيتين ، لأننا قمنا بتعيين الفاصل الزمني للدفع إلى ثانيتين عند إنشاء كائن StreamingContext:


بعد ذلك ، نقوم بإجراء استعلام بسيط إلى قاعدة البيانات للتحقق من السجلات في جدول المعاملات_التدفق :


استنتاج


درست هذه المقالة مثالًا على معالجة معلومات التدفق باستخدام Spark Streaming بالتزامن مع Apache Kafka و PostgreSQL. مع نمو البيانات من مصادر مختلفة ، من الصعب المبالغة في تقدير القيمة العملية لـ Spark Streaming لإنشاء تطبيقات وتطبيقات متدفقة تعمل في الوقت الفعلي.

يمكنك العثور على شفرة المصدر الكاملة في مستودع التخزين الخاص بي على جيثب .

أنا مستعد لمناقشة هذا المقال بكل سرور ، وأتطلع إلى تعليقاتكم ، وآمل أيضًا أن يتم توجيه النقد البناء لجميع القراء المعنيين.

اتمنى لك التوفيق

ملاحظة: كان من المخطط أصلاً استخدام قاعدة بيانات PostgreSQL محلية ، لكن بالنظر إلى حبي لـ AWS ، قررت أن أضع قاعدة البيانات في السحابة. في المقالة التالية حول هذا الموضوع ، سأعرض كيفية تنفيذ النظام بالكامل الموضح أعلاه في AWS باستخدام AWS Kinesis و AWS EMR. اتبع الأخبار!

Source: https://habr.com/ru/post/ar451160/


All Articles