
À première vue, cette histoire a tout pour mériter le statut de jeûne romantique la veille du 8 mars: avions, amour, un peu d'espionnage et, enfin, un chat (plus précisément, un chat). Il est difficile d'imaginer que tout cela soit directement lié à Kafka, KSQL et à l'expérience «comment trouver l'avion le plus bruyant en utilisant les technologies de l'information à domicile». C'est difficile, mais c'est nécessaire: c'est une telle expérience que Simon Obury a menée, et nous avons traduit un article de sa paternité avec une description de tous les détails du processus.
Notre nouveau chat nommé Snowflake se réveille tôt. Le bruit des avions survolant notre maison la réveille. Mais que faire si, en utilisant Apache Kafka, KSQL et Raspberry Pi, je pouvais déterminer quel avion gardait mon chat éveillé? Ce serait bien de créer un panneau de suivi divertissant, sur lequel le chat pourrait attirer son attention - et me donner un peu plus de sommeil.
En termes généraux
Nous transférons des avions du ciel vers des graphiques en utilisant Kafka et KSQLLes avions déterminent leur emplacement à l'aide de récepteurs GPS. L'émetteur de bord signale périodiquement l'emplacement, le numéro d'identification, l'altitude et la vitesse du navire en utilisant de courtes transmissions radio. Ces émissions de surveillance dépendante automatique de diffusion (
AZN-V ) sont essentiellement des paquets de données qui sont ouverts à l'accès depuis les stations au sol.
Un micro-ordinateur, comme le Raspberry Pi, et plusieurs composants auxiliaires suffisent pour recevoir des messages des émetteurs aéroportés de l'avion se précipitant au-dessus de ma maison.
Les signaux aéroportés de l'avion ressemblent à une boule de messages emmêlés et nécessitent une systématisation. Reconnaître ces flux de données chaotiques, c'est comme écouter une conversation lors d'une fête bruyante. Par conséquent, pour trouver un avion qui inquiète mon chat, j'ai décidé d'utiliser une combinaison de Kafka et KSQL.
Chat éveillé et Raspberry PiCollection de lectures AZN-B avec le Raspberry Pi
Pour collecter les transmissions à bord, j'ai utilisé le Raspberry Pi et le RTL2832U, un modem USB qui était à l'origine vendu comme appareil pour regarder la télévision numérique sur un ordinateur. Sur Raspberry Pi, j'ai installé
dump1090 - un programme qui reçoit les données d'AZN-V via RTL2832U à l'aide d'une petite antenne.
Ma radio logicielle de Raspberry Pi et RTL2832UConvertissez les signaux AZN-B en thèmes Kafka
Maintenant que j'ai reçu un flux de signaux AZN-B bruts, nous devons prêter attention au trafic. Le Raspberry Pi n'a pas assez de puissance pour un calcul sérieux, j'ai donc dû transférer le traitement des données vers mon cluster local sur Kafka.

Les messages reçus sont divisés en
messages de localisation ou
messages sur l'identification de la carte . L'emplacement ressemble à un message de la forme:
"la planche 7c6db8 vole à une altitude de 6250 pieds à la coordonnée -33.8.151.0 .
" Les informations sur l'identification de la carte ressembleront à
ceci :
"la carte 7c451c vole le long de la route QJE1726" .
Un petit
script Python pour mon Raspberry Pi partage tous les messages AZN-B entrants. J'ai utilisé le proxy Confluent Rest Proxy pour distribuer les données du Raspberry Pi aux
thèmes location-topic et
ident-topic sur Kafka. Le serveur proxy fournit une interface RESTful pour le cluster Kafka, ce qui facilite la création de messages à l'aide d'un simple appel REST sur Pi.

Je voulais comprendre quels avions survolaient mon toit et quels itinéraires. La base de données
OpenFlights vous permet de comparer le code d'aéroport, par exemple 7C6DB8, attribué par l'Organisation de l'aviation civile internationale (OACI), avec le type d'avion - dans notre cas, le Boeing 737. J'ai
téléchargé mes
données cartographiques sur le
thème icao-to-aircraft .
KSQL fournit un «moteur SQL» qui vous permet de traiter des données en temps réel sur des sujets Apache Kafka. Par exemple, pour trouver le code embarqué 7C6DB8, nous pouvons écrire la requête suivante:
CREATE TABLE icao_to_aircraft WITH (KAFKA_TOPIC='ICAO_TO_AIRCRAFT_REKEY', VALUE_FORMAT='AVRO', KEY='ICAO'); ksql> SELECT manufacturer, aircraft, registration \ FROM icao_to_aircraft \ WHERE icao = '7C6DB8'; Boeing | B738 | VH-VYI
De même, dans la rubrique
détails des
indicatifs , j'ai téléchargé les indicatifs (c'est-à-dire QFA563, c'est le vol de Qantas de Brisbane à Sydney).
CREATE TABLE callsign_details WITH (KAFKA_TOPIC='CALLSIGN_DETAILS_REKEY', VALUE_FORMAT='AVRO', KEY='CALLSIGN'); ksql> SELECT operatorname, fromairport, toairport \ FROM callsign_details \ WHERE callsign = 'QFA563'; Qantas | Brisbane | Sydney
Jetons maintenant un œil au flux de données d'
emplacement-sujet . Ici, nous pouvons observer un flux constant de messages entrants sur l'emplacement d'un avion volant.
kafka-avro-console-consumer --bootstrap-server localhost:9092 --property --topic location-topic {"ico":"7C6DB8","height":"6250","location":"-33.807724,151.091495"}
La requête KSQL ressemblera à ceci:
ksql> SELECT TIMESTAMPTOSTRING(rowtime, 'yyyy-MM-dd HH:mm:ss'), \ ico, height, location \ FROM location_stream \ WHERE ico = '7C6DB8'; 2018-09-19 07:13:33 | 7C6DB8 | 6250.0 | -33.807724,151.091495
KSQL: harmonisation des flux ...
La vraie valeur de KSQL réside dans la possibilité de combiner les flux entrants de données de localisation avec les données source des rubriques (voir
03_ksql.sql ), c'est-à-dire en ajoutant des informations utiles au flux de données brutes. Ceci est très similaire à une «jointure gauche» dans une base de données traditionnelle. Le résultat est un autre thème Kafka créé sans une seule ligne de code Java!
source> CREATE STREAM location_and_details_stream AS \
SELECT l.ico, l.height, l.location, t.aircraft \
FROM flux_emplacement l \
JOINT GAUCHE icao_to_aircraft t ON l.ico = t.icao;
De plus, vous recevez une requête KSQL. Le flux de données ressemblera à ceci:
ksql> SELECT TIMESTAMPTOSTRING(rowtime, 'yy-MM-dd HH:mm:ss') \ , manufacturer \ , aircraft \ , registration \ , height \ , location \ FROM location_and_details_stream; 18-09-27 09:53:28 | Boeing | B738 | VH-YIA | 7225 | -33.821,151.052 18-09-27 09:53:31 | Boeing | B738 | VH-YIA | 7375 | -33.819,151.049 18-09-27 09:53:32 | Boeing | B738 | VH-YIA | 7425 | -33.818,151.048
De plus, nous pouvons combiner le flux d'entrée de l'
indicatif d'
appel avec le thème fixe
callsign_details :
CREATE STREAM ident_callsign_stream AS \ SELECT i.ico \ , c.operatorname \ , c.callsign \ , c.fromairport \ , c.toairport \ FROM ident_stream i \ LEFT JOIN callsign_details c ON i.indentification = c.callsign; ksql> SELECT TIMESTAMPTOSTRING(rowtime, 'yy-MM-dd HH:mm:ss') \ , operatorname \ , callsign \ , fromairport \ , toairport \ FROM ident_callsign_stream ; 18-09-27 13:33:19 | Qantas | QFA926 | Sydney | Cairns 18-09-27 13:44:11 | China Eastern | CES777 | Kunming | Sydney 18-09-27 14:00:54 | Air New Zealand | ANZ110 | Sydney | Auckland
Nous avons maintenant deux sujets informatifs:
- location_and_details_stream , qui fournit un flux d'informations mises à jour sur l'emplacement et la vitesse de l'aéronef;
- ident_callsign_stream , qui décrit les détails du vol, y compris la compagnie aérienne et la destination.
Avec ces thèmes constamment mis à jour, nous pouvons créer de superbes panneaux de présentation. J'ai utilisé
Kafka Connect pour télécharger des thèmes Kafka peuplés par KSQL vers
Elasticsearch (scripts complets
ici ).
Tableau de bord Kibana
Voici un exemple d'un panneau d'aperçu montrant l'emplacement d'un avion sur une carte. De plus, vous pouvez voir un graphique par compagnie aérienne, un graphique de l'altitude de vol et des nuages de mots par destination principale. La carte thermique montre les zones où les avions sont concentrés, c'est-à-dire les zones avec le niveau de bruit le plus élevé.

Retour au chat
Aujourd'hui, le chat m'a réveillé vers 6 heures du matin. KSQL peut-il m'aider à trouver l'avion qui a survolé à ce moment-là au-dessus de ma maison à une altitude inférieure à 3 500 pieds?
select timestamptostring(rowtime, 'yyyy-MM-dd HH:mm:ss') , manufacturer , aircraft , registration , height from location_and_details_stream where height < 3500 and rowtime > stringtotimestamp('18-09-27 06:10', 'yy-MM-dd HH:mm') and rowtime < stringtotimestamp('18-09-27 06:20', 'yy-MM-dd HH:mm'); 2018-09-27 06:15:39 | Airbus | A388 | A6-EOD | 2100.0 2018-09-27 06:15:58 | Airbus | A388 | A6-EOD | 3050.0
Génial! Je peux repérer un avion sur mon toit à 6 h 15. Il s'avère que Snowflake a été réveillé par l'Airbus A380 (un énorme paquebot, soit dit en passant), qui s'est envolé pour Dubaï.
Juste quelques jours de repos et vous avez un système de traitement en streaming avec KSQL. De plus, ce qui vous permet de trouver rapidement des événements de données intéressants. Bien que Snowflake puisse être sceptique à leur sujet.
