Noise Mapping mit KSQL, Raspberry Pi und Radio



Auf den ersten Blick hat diese Geschichte alles, um sich am Vorabend des 8. März den Status eines romantischen Postens zu verdienen: Flugzeuge, Liebe, ein wenig Spionage und schließlich eine Katze (genauer gesagt eine Katze). Es ist kaum vorstellbar, dass all dies in direktem Zusammenhang mit Kafka, KSQL und dem Experiment „Wie man die lautesten Flugzeuge mithilfe der Heiminformationstechnologie findet“ steht. Es ist schwierig, aber notwendig: Es war ein solches Experiment, das Simon Obury durchgeführt hat, und wir haben einen Artikel seiner Urheberschaft mit einer Beschreibung aller Details des Prozesses übersetzt.

Unsere neue Katze namens Snowflake wacht früh auf. Die Geräusche von Flugzeugen, die über unser Haus fliegen, wecken sie. Aber was ist, wenn ich mit Apache Kafka, KSQL und Raspberry Pi feststellen kann, welches Flugzeug meine Katze wach hält? Es wäre schön, ein unterhaltsames Tracking-Panel zu erstellen, auf das die Katze ihre Aufmerksamkeit lenken könnte - und mir etwas mehr Schlaf geben könnte.

Im Allgemeinen



Mit Kafka und KSQL übertragen wir Flugzeuge vom Himmel auf Grafiken

Flugzeuge bestimmen ihren Standort mithilfe von GPS-Empfängern. Der Bordsender meldet regelmäßig den Standort, die Identifikationsnummer, die Höhe und die Geschwindigkeit des Schiffes mithilfe kurzer Funkübertragungen. Diese Sendungen der automatischen abhängigen Rundfunküberwachung ( AZN-V ) sind im Wesentlichen Datenpakete, die für den Zugriff von Bodenstationen offen sind.

Ein Mikrocomputer wie der Raspberry Pi und mehrere Zusatzkomponenten sind alles, was benötigt wird, um Nachrichten von den Bordsendern des Flugzeugs zu empfangen, die über mein Haus huschen.

Die Signale des Flugzeugs in der Luft sehen aus wie ein Wirrwarr von Nachrichten und müssen systematisiert werden. Das Erkennen dieser chaotischen Datenströme ist wie das Abhören eines Gesprächs auf einer lauten Party. Um ein Flugzeug zu finden, das meine Katze beunruhigt, habe ich mich für eine Kombination aus Kafka und KSQL entschieden.


Erwachte Katze und Himbeer-Pi

Sammlung von AZN-B-Messwerten mit dem Raspberry Pi


Zum Sammeln von On-Board-Übertragungen habe ich den Raspberry Pi und RTL2832U verwendet, ein USB-Modem, das ursprünglich als Gerät zum Ansehen von digitalem Fernsehen auf einem Computer verkauft wurde. Auf Raspberry Pi habe ich dump1090 installiert - ein Programm, das mit einer kleinen Antenne Daten von AZN-V über RTL2832U empfängt.


Mein Software-Radio von Raspberry Pi und RTL2832U

Konvertieren Sie AZN-B-Signale in Kafka-Themen


Nachdem ich einen Strom von rohen AZN-B-Signalen erhalten habe, sollten wir auf den Verkehr achten. Raspberry Pi hat nicht genug Leistung für ernsthafte Computer, daher musste ich die Datenverarbeitung auf meinen lokalen Cluster auf Kafka übertragen.



Empfangene Nachrichten werden entweder in Standortnachrichten oder Nachrichten über die Identifizierung der Karte unterteilt . Der Standort sieht aus wie eine Nachricht der Form: "Das 7c6db8-Board fliegt in einer Höhe von 6.250 Fuß an der Koordinate -33,8,151,0 . " Informationen zur Identifizierung der Karte sehen folgendermaßen aus: "Die Karte 7c451c fliegt entlang der Route QJE1726."

Ein kleines Python-Skript für meinen Raspberry Pi teilt alle eingehenden AZN-B-Nachrichten. Ich habe den Confluent Rest Proxy-Proxy verwendet, um Daten vom Raspberry Pi an die Themen " Standortthema" und " Ident-Thema" in Kafka zu verteilen. Der Proxyserver bietet eine RESTful-Schnittstelle für den Kafka-Cluster, mit der Nachrichten mithilfe eines einfachen REST-Aufrufs auf Pi einfach erstellt werden können.



Ich wollte verstehen, welche Flugzeuge über mein Dach fliegen und welche Routen. Mit der OpenFlights- Datenbank können Sie den von der Internationalen Zivilluftfahrt-Organisation (ICAO) zugewiesenen Flughafencode, beispielsweise 7C6DB8, mit dem Flugzeugtyp vergleichen - in unserem Fall der Boeing 737. Ich habe meine Kartendaten in das Thema icao-to-Flugzeug hochgeladen .

KSQL bietet eine „SQL-Engine“, mit der Sie Daten zu Apache Kafka-Themen in Echtzeit verarbeiten können. Um beispielsweise den integrierten Code 7C6DB8 zu finden, können wir die folgende Abfrage schreiben:

CREATE TABLE icao_to_aircraft WITH (KAFKA_TOPIC='ICAO_TO_AIRCRAFT_REKEY', VALUE_FORMAT='AVRO', KEY='ICAO'); ksql> SELECT manufacturer, aircraft, registration \ FROM icao_to_aircraft \ WHERE icao = '7C6DB8'; Boeing | B738 | VH-VYI 

In ähnlicher Weise habe ich im Thema Rufzeichendetails die Rufzeichen hochgeladen (d. H. QFA563, dies ist Qantas 'Flug von Brisbane nach Sydney).

 CREATE TABLE callsign_details WITH (KAFKA_TOPIC='CALLSIGN_DETAILS_REKEY', VALUE_FORMAT='AVRO', KEY='CALLSIGN'); ksql> SELECT operatorname, fromairport, toairport \ FROM callsign_details \ WHERE callsign = 'QFA563'; Qantas | Brisbane | Sydney 

Schauen wir uns nun den Datenstrom zum Standortthema an . Hier können wir einen konstanten Strom eingehender Nachrichten über den Standort eines fliegenden Flugzeugs beobachten.

 kafka-avro-console-consumer --bootstrap-server localhost:9092 --property --topic location-topic {"ico":"7C6DB8","height":"6250","location":"-33.807724,151.091495"} 

Die KSQL-Abfrage sieht folgendermaßen aus:
 ksql> SELECT TIMESTAMPTOSTRING(rowtime, 'yyyy-MM-dd HH:mm:ss'), \ ico, height, location \ FROM location_stream \ WHERE ico = '7C6DB8'; 2018-09-19 07:13:33 | 7C6DB8 | 6250.0 | -33.807724,151.091495 

KSQL: Stream Harmonisierung ...


Der wahre Wert von KSQL liegt in der Fähigkeit, eingehende Ströme von Standortdaten mit den Quelldaten von Themen zu kombinieren (siehe 03_ksql.sql ), dh nützliche Informationen zum Rohdatenstrom hinzuzufügen. Dies ist einem "Left Join" in einer herkömmlichen Datenbank sehr ähnlich. Das Ergebnis ist ein weiteres Kafka-Thema, das ohne eine einzige Zeile Java-Code erstellt wurde!

source> CREATE STREAM location_and_details_stream AS \
SELECT l.ico, l.height, l.location, t.aircraft \
FROM location_stream l \
LEFT JOIN icao_to_aircraft t ON l.ico = t.icao;
Zusätzlich erhalten Sie eine KSQL-Abfrage. Der Datenstrom sieht folgendermaßen aus:

 ksql> SELECT TIMESTAMPTOSTRING(rowtime, 'yy-MM-dd HH:mm:ss') \ , manufacturer \ , aircraft \ , registration \ , height \ , location \ FROM location_and_details_stream; 18-09-27 09:53:28 | Boeing | B738 | VH-YIA | 7225 | -33.821,151.052 18-09-27 09:53:31 | Boeing | B738 | VH-YIA | 7375 | -33.819,151.049 18-09-27 09:53:32 | Boeing | B738 | VH-YIA | 7425 | -33.818,151.048 

Darüber hinaus können wir den Rufzeichen- Eingabestream mit dem festen Thema Rufzeichen_details kombinieren:

 CREATE STREAM ident_callsign_stream AS \ SELECT i.ico \ , c.operatorname \ , c.callsign \ , c.fromairport \ , c.toairport \ FROM ident_stream i \ LEFT JOIN callsign_details c ON i.indentification = c.callsign; ksql> SELECT TIMESTAMPTOSTRING(rowtime, 'yy-MM-dd HH:mm:ss') \ , operatorname \ , callsign \ , fromairport \ , toairport \ FROM ident_callsign_stream ; 18-09-27 13:33:19 | Qantas | QFA926 | Sydney | Cairns 18-09-27 13:44:11 | China Eastern | CES777 | Kunming | Sydney 18-09-27 14:00:54 | Air New Zealand | ANZ110 | Sydney | Auckland 

Jetzt haben wir zwei informative Themen:

  1. location_and_details_stream , der einen Strom aktualisierter Informationen über den Standort und die Geschwindigkeit des Flugzeugs bereitstellt;
  2. ident_callsign_stream , das die Details des Fluges beschreibt, einschließlich der Fluggesellschaft und des Ziels.

Mit diesen ständig aktualisierten Themen können wir einige großartige Übersichtsfenster erstellen. Ich habe Kafka Connect verwendet , um von KSQL aufgefüllte Kafka-Themen in Elasticsearch hochzuladen (vollständige Skripte hier ).

Kibana Dashboard


Hier ist ein Beispiel eines Übersichtsfensters, das den Standort eines Flugzeugs auf einer Karte zeigt. Darüber hinaus können Sie eine Karte nach Fluggesellschaft, eine Grafik der Flughöhe und Wortwolken nach Hauptziel anzeigen. Die Wärmekarte zeigt die Bereiche, in denen Flugzeuge konzentriert sind, dh die Bereiche mit dem höchsten Geräuschpegel.



Zurück zur Katze


Heute hat mich die Katze gegen 6 Uhr morgens geweckt. Kann KSQL mir helfen, das Flugzeug zu finden, das zu diesem Zeitpunkt über meinem Haus in einer Höhe von weniger als 3.500 Fuß geflogen ist?

 select timestamptostring(rowtime, 'yyyy-MM-dd HH:mm:ss') , manufacturer , aircraft , registration , height from location_and_details_stream where height < 3500 and rowtime > stringtotimestamp('18-09-27 06:10', 'yy-MM-dd HH:mm') and rowtime < stringtotimestamp('18-09-27 06:20', 'yy-MM-dd HH:mm'); 2018-09-27 06:15:39 | Airbus | A388 | A6-EOD | 2100.0 2018-09-27 06:15:58 | Airbus | A388 | A6-EOD | 3050.0 

Super! Ich kann um 6:15 Uhr ein Flugzeug über meinem Dach erkennen. Es stellt sich heraus, dass Snowflake vom Airbus A380 (übrigens ein riesiger Liner) geweckt wurde, der nach Dubai flog.

Nur ein paar Tage frei, und Sie haben ein Streaming-Verarbeitungssystem mit KSQL. So können Sie außerdem schnell interessante Datenereignisse finden. Obwohl Snowflake skeptisch gegenüber ihnen sein kann.

Source: https://habr.com/ru/post/de442876/


All Articles