Cartographie du bruit avec KSQL, Raspberry Pi et radio



À première vue, cette histoire a tout pour mériter le statut de jeûne romantique la veille du 8 mars: avions, amour, un peu d'espionnage et, enfin, un chat (plus précisément, un chat). Il est difficile d'imaginer que tout cela soit directement lié à Kafka, KSQL et à l'expérience «comment trouver l'avion le plus bruyant en utilisant les technologies de l'information à domicile». C'est difficile, mais c'est nécessaire: c'est une telle expérience que Simon Obury a menée, et nous avons traduit un article de sa paternité avec une description de tous les détails du processus.

Notre nouveau chat nommé Snowflake se réveille tôt. Le bruit des avions survolant notre maison la réveille. Mais que faire si, en utilisant Apache Kafka, KSQL et Raspberry Pi, je pouvais déterminer quel avion gardait mon chat éveillé? Ce serait bien de créer un panneau de suivi divertissant, sur lequel le chat pourrait attirer son attention - et me donner un peu plus de sommeil.

En termes généraux



Nous transférons des avions du ciel vers des graphiques en utilisant Kafka et KSQL

Les avions déterminent leur emplacement à l'aide de récepteurs GPS. L'émetteur de bord signale périodiquement l'emplacement, le numéro d'identification, l'altitude et la vitesse du navire en utilisant de courtes transmissions radio. Ces émissions de surveillance dépendante automatique de diffusion ( AZN-V ) sont essentiellement des paquets de données qui sont ouverts à l'accès depuis les stations au sol.

Un micro-ordinateur, comme le Raspberry Pi, et plusieurs composants auxiliaires suffisent pour recevoir des messages des émetteurs aéroportés de l'avion se précipitant au-dessus de ma maison.

Les signaux aéroportés de l'avion ressemblent à une boule de messages emmêlés et nécessitent une systématisation. Reconnaître ces flux de données chaotiques, c'est comme écouter une conversation lors d'une fête bruyante. Par conséquent, pour trouver un avion qui inquiète mon chat, j'ai décidé d'utiliser une combinaison de Kafka et KSQL.


Chat éveillé et Raspberry Pi

Collection de lectures AZN-B avec le Raspberry Pi


Pour collecter les transmissions à bord, j'ai utilisé le Raspberry Pi et le RTL2832U, un modem USB qui était à l'origine vendu comme appareil pour regarder la télévision numérique sur un ordinateur. Sur Raspberry Pi, j'ai installé dump1090 - un programme qui reçoit les données d'AZN-V via RTL2832U à l'aide d'une petite antenne.


Ma radio logicielle de Raspberry Pi et RTL2832U

Convertissez les signaux AZN-B en thèmes Kafka


Maintenant que j'ai reçu un flux de signaux AZN-B bruts, nous devons prêter attention au trafic. Le Raspberry Pi n'a pas assez de puissance pour un calcul sérieux, j'ai donc dû transférer le traitement des données vers mon cluster local sur Kafka.



Les messages reçus sont divisés en messages de localisation ou messages sur l'identification de la carte . L'emplacement ressemble à un message de la forme: "la planche 7c6db8 vole à une altitude de 6250 pieds à la coordonnée -33.8.151.0 . " Les informations sur l'identification de la carte ressembleront à ceci : "la carte 7c451c vole le long de la route QJE1726" .

Un petit script Python pour mon Raspberry Pi partage tous les messages AZN-B entrants. J'ai utilisé le proxy Confluent Rest Proxy pour distribuer les données du Raspberry Pi aux thèmes location-topic et ident-topic sur Kafka. Le serveur proxy fournit une interface RESTful pour le cluster Kafka, ce qui facilite la création de messages à l'aide d'un simple appel REST sur Pi.



Je voulais comprendre quels avions survolaient mon toit et quels itinéraires. La base de données OpenFlights vous permet de comparer le code d'aéroport, par exemple 7C6DB8, attribué par l'Organisation de l'aviation civile internationale (OACI), avec le type d'avion - dans notre cas, le Boeing 737. J'ai téléchargé mes données cartographiques sur le thème icao-to-aircraft .

KSQL fournit un «moteur SQL» qui vous permet de traiter des données en temps réel sur des sujets Apache Kafka. Par exemple, pour trouver le code embarqué 7C6DB8, nous pouvons écrire la requête suivante:

CREATE TABLE icao_to_aircraft WITH (KAFKA_TOPIC='ICAO_TO_AIRCRAFT_REKEY', VALUE_FORMAT='AVRO', KEY='ICAO'); ksql> SELECT manufacturer, aircraft, registration \ FROM icao_to_aircraft \ WHERE icao = '7C6DB8'; Boeing | B738 | VH-VYI 

De même, dans la rubrique détails des indicatifs , j'ai téléchargé les indicatifs (c'est-à-dire QFA563, c'est le vol de Qantas de Brisbane à Sydney).

 CREATE TABLE callsign_details WITH (KAFKA_TOPIC='CALLSIGN_DETAILS_REKEY', VALUE_FORMAT='AVRO', KEY='CALLSIGN'); ksql> SELECT operatorname, fromairport, toairport \ FROM callsign_details \ WHERE callsign = 'QFA563'; Qantas | Brisbane | Sydney 

Jetons maintenant un œil au flux de données d' emplacement-sujet . Ici, nous pouvons observer un flux constant de messages entrants sur l'emplacement d'un avion volant.

 kafka-avro-console-consumer --bootstrap-server localhost:9092 --property --topic location-topic {"ico":"7C6DB8","height":"6250","location":"-33.807724,151.091495"} 

La requête KSQL ressemblera à ceci:
 ksql> SELECT TIMESTAMPTOSTRING(rowtime, 'yyyy-MM-dd HH:mm:ss'), \ ico, height, location \ FROM location_stream \ WHERE ico = '7C6DB8'; 2018-09-19 07:13:33 | 7C6DB8 | 6250.0 | -33.807724,151.091495 

KSQL: harmonisation des flux ...


La vraie valeur de KSQL réside dans la possibilité de combiner les flux entrants de données de localisation avec les données source des rubriques (voir 03_ksql.sql ), c'est-à-dire en ajoutant des informations utiles au flux de données brutes. Ceci est très similaire à une «jointure gauche» dans une base de données traditionnelle. Le résultat est un autre thème Kafka créé sans une seule ligne de code Java!

source> CREATE STREAM location_and_details_stream AS \
SELECT l.ico, l.height, l.location, t.aircraft \
FROM flux_emplacement l \
JOINT GAUCHE icao_to_aircraft t ON l.ico = t.icao;
De plus, vous recevez une requête KSQL. Le flux de données ressemblera à ceci:

 ksql> SELECT TIMESTAMPTOSTRING(rowtime, 'yy-MM-dd HH:mm:ss') \ , manufacturer \ , aircraft \ , registration \ , height \ , location \ FROM location_and_details_stream; 18-09-27 09:53:28 | Boeing | B738 | VH-YIA | 7225 | -33.821,151.052 18-09-27 09:53:31 | Boeing | B738 | VH-YIA | 7375 | -33.819,151.049 18-09-27 09:53:32 | Boeing | B738 | VH-YIA | 7425 | -33.818,151.048 

De plus, nous pouvons combiner le flux d'entrée de l' indicatif d' appel avec le thème fixe callsign_details :

 CREATE STREAM ident_callsign_stream AS \ SELECT i.ico \ , c.operatorname \ , c.callsign \ , c.fromairport \ , c.toairport \ FROM ident_stream i \ LEFT JOIN callsign_details c ON i.indentification = c.callsign; ksql> SELECT TIMESTAMPTOSTRING(rowtime, 'yy-MM-dd HH:mm:ss') \ , operatorname \ , callsign \ , fromairport \ , toairport \ FROM ident_callsign_stream ; 18-09-27 13:33:19 | Qantas | QFA926 | Sydney | Cairns 18-09-27 13:44:11 | China Eastern | CES777 | Kunming | Sydney 18-09-27 14:00:54 | Air New Zealand | ANZ110 | Sydney | Auckland 

Nous avons maintenant deux sujets informatifs:

  1. location_and_details_stream , qui fournit un flux d'informations mises à jour sur l'emplacement et la vitesse de l'aéronef;
  2. ident_callsign_stream , qui décrit les détails du vol, y compris la compagnie aérienne et la destination.

Avec ces thèmes constamment mis à jour, nous pouvons créer de superbes panneaux de présentation. J'ai utilisé Kafka Connect pour télécharger des thèmes Kafka peuplés par KSQL vers Elasticsearch (scripts complets ici ).

Tableau de bord Kibana


Voici un exemple d'un panneau d'aperçu montrant l'emplacement d'un avion sur une carte. De plus, vous pouvez voir un graphique par compagnie aérienne, un graphique de l'altitude de vol et des nuages ​​de mots par destination principale. La carte thermique montre les zones où les avions sont concentrés, c'est-à-dire les zones avec le niveau de bruit le plus élevé.



Retour au chat


Aujourd'hui, le chat m'a réveillé vers 6 heures du matin. KSQL peut-il m'aider à trouver l'avion qui a survolé à ce moment-là au-dessus de ma maison à une altitude inférieure à 3 500 pieds?

 select timestamptostring(rowtime, 'yyyy-MM-dd HH:mm:ss') , manufacturer , aircraft , registration , height from location_and_details_stream where height < 3500 and rowtime > stringtotimestamp('18-09-27 06:10', 'yy-MM-dd HH:mm') and rowtime < stringtotimestamp('18-09-27 06:20', 'yy-MM-dd HH:mm'); 2018-09-27 06:15:39 | Airbus | A388 | A6-EOD | 2100.0 2018-09-27 06:15:58 | Airbus | A388 | A6-EOD | 3050.0 

Génial! Je peux repérer un avion sur mon toit à 6 h 15. Il s'avère que Snowflake a été réveillé par l'Airbus A380 (un énorme paquebot, soit dit en passant), qui s'est envolé pour Dubaï.

Juste quelques jours de repos et vous avez un système de traitement en streaming avec KSQL. De plus, ce qui vous permet de trouver rapidement des événements de données intéressants. Bien que Snowflake puisse être sceptique à leur sujet.

Source: https://habr.com/ru/post/fr442876/


All Articles