
للوهلة الأولى ، تحتوي هذه القصة على كل شيء لكسب صيام رومانسي عشية يوم 8 مارس: الطائرات ، والحب ، والتجسس قليلاً ، وأخيراً ، القط (بدقة أكثر ، القط). من الصعب أن نتخيل أن كل هذا مرتبط مباشرة بكافكا ، KSQL وتجربة "كيفية العثور على الطائرات الأكثر ضوضاء باستخدام تكنولوجيا المعلومات المنزلية". إنه أمر صعب ، لكنه ضروري: لقد كانت هذه التجربة التي أجراها سيمون أوبيري ، وقمنا بترجمة مقال من تأليفه يصف كل تفاصيل العملية.
لدينا القط الجديد المسمى ندفة الثلج يستيقظ باكرا. أصوات الطائرات تحلق فوق منزلنا أيقظها. ولكن ماذا لو ، باستخدام Apache Kafka و KSQL و Raspberry Pi ، هل يمكنني تحديد الطائرة التي تبقي قطتي مستيقظة؟ سيكون من اللطيف إنشاء لوحة تتبع مسلية ، والتي يمكن للقطة أن تحول انتباهها إليها - وتمنحني مزيدًا من النوم.
بعبارات عامة
نقوم بنقل الطائرات من السماء إلى الرسومات باستخدام Kafka و KSQLتحدد الطائرات موقعها باستخدام مستقبلات GPS. يبلغ المرسل الموجود على متن السفينة بشكل دوري عن موقع السفينة ورقم تعريفها وارتفاعها وسرعتها باستخدام إرسالات الراديو القصيرة. تعد عمليات البث للمراقبة التلقائية التي تعتمد على البث (
AZN-V ) حزم بيانات بشكل أساسي مفتوحة للوصول من المحطات الأرضية.
جهاز كمبيوتر صغير واحد ، مثل Raspberry Pi ، والعديد من المكونات الإضافية ، كل ما هو مطلوب لتلقي الرسائل من أجهزة الإرسال المحمولة جواً التي تطير بسرعة فوق منزلي.
تبدو إشارات الطائرة المحمولة جواً ككرة متشابكة من الرسائل وتتطلب منهجية. يشبه التعرف على تدفقات البيانات الفوضوية مثل الاستماع إلى محادثة في حفلة صاخبة. لذلك ، للعثور على طائرة تقلق قطتي ، قررت استخدام مزيج من Kafka و KSQL.
استيقظ القط وتوت العليق بيمجموعة من قراءات AZN-B مع Raspberry Pi
لجمع عمليات الإرسال على متن الطائرة ، استخدمت Raspberry Pi و RTL2832U ، وهو مودم USB تم بيعه في الأصل كجهاز لمشاهدة التلفزيون الرقمي على جهاز كمبيوتر. على Raspberry Pi I ، قمت بتثبيت
dump1090 - وهو برنامج يستقبل البيانات من AZN-V عبر RTL2832U باستخدام هوائي صغير.
راديو البرنامج الخاص بي من Raspberry Pi و RTL2832Uتحويل إشارات AZN-B إلى سمات كافكا
الآن وقد استلمت دفقًا من إشارات AZN-B الخام ، يجب أن نولي اهتمامًا لحركة المرور. لا يملك Raspberry Pi طاقة كافية للحوسبة الجادة ، لذا اضطررت إلى نقل معالجة البيانات إلى مجموعتي المحلية على كافكا.

يتم تقسيم الرسائل المستلمة إلى
رسائل موقع أو
رسائل حول تحديد اللوحة . يشبه الموقع رسالة النموذج:
"تحلق اللوحة 7c6db8 على ارتفاع 6250 قدمًا عند الإحداثيات -33.8151.0 .
" ستبدو المعلومات المتعلقة بتحديد اللوحة كما يلي:
"الطائرة 7c451c تطير على طول الطريق QJE1726 .
"يقوم
برنامج Python الصغير الخاص بـ Raspberry Pi بمشاركة جميع رسائل AZN-B الواردة. لقد استخدمت وكيل Confluent Rest Proxy لتوزيع البيانات من Raspberry Pi إلى
موضوع موضوع الموقع وموضوع التعريف على Kafka. يوفر الخادم الوكيل واجهة مريحة لمجموعة نظام كافكا ، مما يجعل من السهل إنشاء رسائل باستخدام استدعاء REST بسيط على Pi.

أردت أن أفهم الطائرات التي تطير فوق سقفي والطرق. تسمح
لك قاعدة بيانات
OpenFlights بمقارنة رمز المطار ، على سبيل المثال 7C6DB8 ، المعينة من قبل منظمة الطيران المدني الدولي (ICAO) ، بنوع الطائرة - في حالتنا ، Boeing 737. لقد
حمّلت بيانات التعيين إلى موضوع
icao-to-aircraft .
يوفر
KSQL "محرك SQL" الذي يسمح لك بمعالجة البيانات في الوقت الحقيقي على مواضيع Apache Kafka. على سبيل المثال ، للعثور على الرمز 7C6DB8 على متن الطائرة ، يمكننا كتابة الاستعلام التالي:
CREATE TABLE icao_to_aircraft WITH (KAFKA_TOPIC='ICAO_TO_AIRCRAFT_REKEY', VALUE_FORMAT='AVRO', KEY='ICAO'); ksql> SELECT manufacturer, aircraft, registration \ FROM icao_to_aircraft \ WHERE icao = '7C6DB8'; Boeing | B738 | VH-VYI
وبالمثل ، في موضوع
تفاصيل النداء ، حملت علامات النداء (أي QFA563 ، هذه هي رحلة طيران كانتاس من بريسبان إلى سيدني).
CREATE TABLE callsign_details WITH (KAFKA_TOPIC='CALLSIGN_DETAILS_REKEY', VALUE_FORMAT='AVRO', KEY='CALLSIGN'); ksql> SELECT operatorname, fromairport, toairport \ FROM callsign_details \ WHERE callsign = 'QFA563'; Qantas | Brisbane | Sydney
الآن دعونا نلقي نظرة على دفق بيانات
موضوع الموقع . هنا يمكننا مراقبة تدفق مستمر من الرسائل الواردة حول موقع الطائرة الطائرة.
kafka-avro-console-consumer --bootstrap-server localhost:9092 --property --topic location-topic {"ico":"7C6DB8","height":"6250","location":"-33.807724,151.091495"}
سيبدو استعلام KSQL كما يلي:
ksql> SELECT TIMESTAMPTOSTRING(rowtime, 'yyyy-MM-dd HH:mm:ss'), \ ico, height, location \ FROM location_stream \ WHERE ico = '7C6DB8'; 2018-09-19 07:13:33 | 7C6DB8 | 6250.0 | -33.807724,151.091495
KSQL: تنسيق الدفق ...
تكمن القيمة الحقيقية لـ KSQL في القدرة على الجمع بين التدفقات الواردة لبيانات الموقع والبيانات المصدر للمواضيع (انظر
03_ksql.sql ) - أي في إضافة معلومات مفيدة إلى دفق البيانات الخام. هذا يشبه إلى حد كبير "صلة اليسار" في قاعدة البيانات التقليدية. والنتيجة هي موضوع كافكا آخر تم إنشاؤه دون سطر واحد من كود جافا!
المصدر> إنشاء STREAM location_and_details_stream AS \
اختر l.ico ، l. Height ، l.location ، t.aircraft \
من location_stream l \
LEFT JOIN icao_to_aircraft t ON l.ico = t.icao؛
بالإضافة إلى ذلك ، تتلقى استعلام KSQL. سيظهر دفق البيانات بهذا الشكل:
ksql> SELECT TIMESTAMPTOSTRING(rowtime, 'yy-MM-dd HH:mm:ss') \ , manufacturer \ , aircraft \ , registration \ , height \ , location \ FROM location_and_details_stream; 18-09-27 09:53:28 | Boeing | B738 | VH-YIA | 7225 | -33.821,151.052 18-09-27 09:53:31 | Boeing | B738 | VH-YIA | 7375 | -33.819,151.049 18-09-27 09:53:32 | Boeing | B738 | VH-YIA | 7425 | -33.818,151.048
بالإضافة إلى ذلك ، يمكننا دمج دفق إدخال
إشارة النداء مع
سمة callign_details الثابتة:
CREATE STREAM ident_callsign_stream AS \ SELECT i.ico \ , c.operatorname \ , c.callsign \ , c.fromairport \ , c.toairport \ FROM ident_stream i \ LEFT JOIN callsign_details c ON i.indentification = c.callsign; ksql> SELECT TIMESTAMPTOSTRING(rowtime, 'yy-MM-dd HH:mm:ss') \ , operatorname \ , callsign \ , fromairport \ , toairport \ FROM ident_callsign_stream ; 18-09-27 13:33:19 | Qantas | QFA926 | Sydney | Cairns 18-09-27 13:44:11 | China Eastern | CES777 | Kunming | Sydney 18-09-27 14:00:54 | Air New Zealand | ANZ110 | Sydney | Auckland
الآن لدينا موضوعان إعلاميان:
- location_and_details_stream ، والذي يوفر دفقًا من المعلومات المحدّثة حول موقع الطائرة وسرعتها ؛
- ident_callsign_stream ، الذي يصف تفاصيل الرحلة ، بما في ذلك شركة الطيران والوجهة.
من خلال هذه السمات التي يتم تحديثها باستمرار ، يمكننا إنشاء بعض لوحات النظرة العامة الرائعة. لقد استخدمت
Kafka Connect لتحميل سمات Kafka التي يسكنها KSQL إلى Elasticsearch (البرامج النصية الكاملة
هنا ).
Kibana Dashboard
فيما يلي مثال على لوحة نظرة عامة توضح موقع الطائرة على الخريطة. بالإضافة إلى ذلك ، يمكنك رؤية مخطط حسب شركة الطيران ، رسم بياني لارتفاع الرحلة ، وغيوم الكلمات حسب الوجهة الرئيسية. تُظهر خريطة الحرارة المناطق التي تتركز فيها الطائرات ، أي المناطق ذات أعلى مستوى للضوضاء.

العودة إلى القط
استيقظت القطة اليوم في حوالي الساعة 6 صباحًا هل تستطيع KSQL مساعدتي في العثور على الطائرة التي حلقت في هذا الوقت على منزلي على ارتفاع أقل من 3500 قدم؟
select timestamptostring(rowtime, 'yyyy-MM-dd HH:mm:ss') , manufacturer , aircraft , registration , height from location_and_details_stream where height < 3500 and rowtime > stringtotimestamp('18-09-27 06:10', 'yy-MM-dd HH:mm') and rowtime < stringtotimestamp('18-09-27 06:20', 'yy-MM-dd HH:mm'); 2018-09-27 06:15:39 | Airbus | A388 | A6-EOD | 2100.0 2018-09-27 06:15:58 | Airbus | A388 | A6-EOD | 3050.0
رائع! أستطيع أن أرصد طائرة فوق سقفي في الساعة 6:15 صباحًا. اتضح أن Snowflake قد استيقظت على طائرة إيرباص A380 (سفينة ضخمة ، بالمناسبة) ، التي وصلت إلى دبي.
فقط بضعة أيام عطلة ، ولديك نظام معالجة التدفق مع KSQL. والذي ، علاوة على ذلك ، يسمح لك بالعثور بسرعة على أحداث البيانات المثيرة للاهتمام. على الرغم من أن ندفة الثلج قد تكون متشككة فيها.
