Mapeo de ruido con KSQL, Raspberry Pi y radio



A primera vista, esta historia tiene todo para ganar el estatus de una publicación romántica en la víspera del 8 de marzo: aviones, amor, un poco de espionaje y, finalmente, un gato (más precisamente, un gato). Es difícil imaginar que todo esto esté directamente relacionado con Kafka, KSQL y el experimento "cómo encontrar el avión más ruidoso utilizando la tecnología de información doméstica". Es difícil, pero es necesario: fue un experimento que realizó Simon Obury, y tradujimos un artículo de su autoría con una descripción de todos los detalles del proceso.

Nuestro nuevo gato llamado Snowflake se despierta temprano. El sonido de los aviones volando sobre nuestra casa la despierta. Pero, ¿y si, usando Apache Kafka, KSQL y Raspberry Pi, pudiera determinar qué avión mantiene despierto a mi gato? Sería bueno crear un panel de seguimiento entretenido, en el que el gato pudiera cambiar su atención, y darme un poco más de sueño.

En términos generales



Transferimos planos del cielo a gráficos usando Kafka y KSQL

Las aeronaves determinan su ubicación utilizando receptores GPS. El transmisor a bordo informa periódicamente la ubicación, el número de identificación, la altitud y la velocidad del barco mediante transmisiones de radio cortas. Estas transmisiones de vigilancia dependiente automática de transmisión ( AZN-V ) son esencialmente paquetes de datos que están abiertos para el acceso desde estaciones terrestres.

Una microcomputadora, como la Raspberry Pi, y varios componentes auxiliares son todo lo que se necesita para recibir mensajes de los transmisores a bordo de la aeronave que corren por mi casa.

Las señales aéreas de la aeronave parecen una bola de mensajes enredada y requieren sistematización. Reconocer estos flujos de datos caóticos es como escuchar una conversación en una fiesta ruidosa. Por lo tanto, para encontrar un avión que preocupa a mi gato, decidí usar una combinación de Kafka y KSQL.


Gato Despierto y Frambuesa Pi

Colección de lecturas AZN-B con Raspberry Pi


Para recopilar transmisiones a bordo, utilicé Raspberry Pi y RTL2832U, un módem USB que se vendió originalmente como un dispositivo para mirar televisión digital en una computadora. En Raspberry Pi instalé dump1090 , un programa que recibe datos de AZN-V a través de RTL2832U usando una antena pequeña.


Mi radio de software de Raspberry Pi y RTL2832U

Convierta señales AZN-B a temas Kafka


Ahora que he recibido un flujo de señales AZN-B sin procesar, debemos prestar atención al tráfico. Raspberry Pi no tiene suficiente potencia para la informática seria, por lo que tuve que transferir el procesamiento de datos a mi clúster local en Kafka.



Los mensajes recibidos se dividen en mensajes de ubicación o mensajes sobre la identificación de la pizarra . La ubicación parece un mensaje de la forma: "el tablero 7c6db8 vuela a una altitud de 6,250 pies en la coordenada -33.8,151.0" . La información sobre la identificación del tablero se verá así: "el tablero 7c451c vuela a lo largo de la ruta QJE1726" .

Un pequeño script de Python para mi Raspberry Pi comparte todos los mensajes entrantes de AZN-B. Utilicé el proxy Confluent Rest Proxy para distribuir datos de Raspberry Pi a los temas de ubicación y temas de identidad en Kafka. El servidor proxy proporciona una interfaz RESTful para el clúster Kafka, lo que facilita la creación de mensajes utilizando una simple llamada REST en Pi.



Quería entender qué aviones sobrevuelan mi techo y qué rutas. La base de datos OpenFlights le permite comparar el código del aeropuerto, por ejemplo 7C6DB8 asignado por la Organización de Aviación Civil Internacional (OACI), con el tipo de avión, en nuestro caso, el Boeing 737. Subí mis datos de mapeo al tema de icao a avión .

KSQL proporciona un "motor SQL" que le permite procesar datos en tiempo real sobre temas de Apache Kafka. Por ejemplo, para encontrar el código integrado 7C6DB8, podemos escribir la siguiente consulta:

CREATE TABLE icao_to_aircraft WITH (KAFKA_TOPIC='ICAO_TO_AIRCRAFT_REKEY', VALUE_FORMAT='AVRO', KEY='ICAO'); ksql> SELECT manufacturer, aircraft, registration \ FROM icao_to_aircraft \ WHERE icao = '7C6DB8'; Boeing | B738 | VH-VYI 

De manera similar, en el tema de detalles del indicativo de llamada , cargué los indicativos de llamada (es decir, QFA563, este es el vuelo de Qantas de Brisbane a Sydney).

 CREATE TABLE callsign_details WITH (KAFKA_TOPIC='CALLSIGN_DETAILS_REKEY', VALUE_FORMAT='AVRO', KEY='CALLSIGN'); ksql> SELECT operatorname, fromairport, toairport \ FROM callsign_details \ WHERE callsign = 'QFA563'; Qantas | Brisbane | Sydney 

Ahora echemos un vistazo a la secuencia de datos de tema de ubicación . Aquí podemos observar un flujo constante de mensajes entrantes sobre la ubicación de un avión volador.

 kafka-avro-console-consumer --bootstrap-server localhost:9092 --property --topic location-topic {"ico":"7C6DB8","height":"6250","location":"-33.807724,151.091495"} 

La consulta KSQL se verá así:
 ksql> SELECT TIMESTAMPTOSTRING(rowtime, 'yyyy-MM-dd HH:mm:ss'), \ ico, height, location \ FROM location_stream \ WHERE ico = '7C6DB8'; 2018-09-19 07:13:33 | 7C6DB8 | 6250.0 | -33.807724,151.091495 

KSQL: armonización de flujo ...


El valor real de KSQL es la capacidad de combinar flujos entrantes de datos de ubicación con los datos de origen de los temas (ver 03_ksql.sql ), es decir, agregar información útil al flujo de datos sin procesar. Esto es muy similar a una "combinación izquierda" en una base de datos tradicional. ¡El resultado es otro tema de Kafka creado sin una sola línea de código Java!

fuente> CREAR STREAM location_and_details_stream AS \
SELECCIONE l.ico, l.height, l.location, t.aircraft \
DESDE location_stream l \
IZQUIERDA UNIRSE a icao_to_aircraft t ON l.ico = t.icao;
Además, recibe una consulta KSQL. El flujo de datos se verá así:

 ksql> SELECT TIMESTAMPTOSTRING(rowtime, 'yy-MM-dd HH:mm:ss') \ , manufacturer \ , aircraft \ , registration \ , height \ , location \ FROM location_and_details_stream; 18-09-27 09:53:28 | Boeing | B738 | VH-YIA | 7225 | -33.821,151.052 18-09-27 09:53:31 | Boeing | B738 | VH-YIA | 7375 | -33.819,151.049 18-09-27 09:53:32 | Boeing | B738 | VH-YIA | 7425 | -33.818,151.048 

Además, podemos combinar la secuencia de entrada del indicativo con el tema fijo indicativo :

 CREATE STREAM ident_callsign_stream AS \ SELECT i.ico \ , c.operatorname \ , c.callsign \ , c.fromairport \ , c.toairport \ FROM ident_stream i \ LEFT JOIN callsign_details c ON i.indentification = c.callsign; ksql> SELECT TIMESTAMPTOSTRING(rowtime, 'yy-MM-dd HH:mm:ss') \ , operatorname \ , callsign \ , fromairport \ , toairport \ FROM ident_callsign_stream ; 18-09-27 13:33:19 | Qantas | QFA926 | Sydney | Cairns 18-09-27 13:44:11 | China Eastern | CES777 | Kunming | Sydney 18-09-27 14:00:54 | Air New Zealand | ANZ110 | Sydney | Auckland 

Ahora tenemos dos temas informativos:

  1. location_and_details_stream , que proporciona un flujo de información actualizada sobre la ubicación y la velocidad de la aeronave;
  2. ident_callsign_stream , que describe los detalles del vuelo, incluida la aerolínea y el destino.

Con estos temas que se actualizan constantemente, podemos crear algunos paneles de resumen geniales. Utilicé Kafka Connect para subir temas de Kafka poblados por KSQL a Elasticsearch (guiones completos aquí ).

Tablero de Kibana


Aquí hay un ejemplo de un panel de resumen que muestra la ubicación de un avión en un mapa. Además, puede ver un gráfico por aerolínea, un gráfico de altitud de vuelo y nubes de palabras por destino principal. El mapa de calor muestra las áreas donde se concentran los aviones, es decir, las áreas con el nivel de ruido más alto.



De vuelta al gato


Hoy el gato me despertó alrededor de las 6 a.m. ¿Puede KSQL ayudarme a encontrar el avión que voló en este momento sobre mi casa a una altitud de menos de 3,500 pies?

 select timestamptostring(rowtime, 'yyyy-MM-dd HH:mm:ss') , manufacturer , aircraft , registration , height from location_and_details_stream where height < 3500 and rowtime > stringtotimestamp('18-09-27 06:10', 'yy-MM-dd HH:mm') and rowtime < stringtotimestamp('18-09-27 06:20', 'yy-MM-dd HH:mm'); 2018-09-27 06:15:39 | Airbus | A388 | A6-EOD | 2100.0 2018-09-27 06:15:58 | Airbus | A388 | A6-EOD | 3050.0 

Impresionante! Puedo ver un avión sobre mi techo a las 6:15 a.m. Resulta que Snowflake fue despertado por el Airbus A380 (un gran transatlántico, por cierto), que voló a Dubai.

Solo un par de días de descanso, y tiene un sistema de procesamiento de transmisión con KSQL. Lo cual, además, le permite encontrar rápidamente eventos de datos interesantes. Aunque Snowflake puede ser escéptico sobre ellos.

Source: https://habr.com/ru/post/442876/


All Articles