Mapeamento de ruído com KSQL, Raspberry Pi e rádio



À primeira vista, esta história tem tudo para ganhar o status de um jejum romântico na véspera de 8 de março: aviões, amor, um pouco de espionagem e, finalmente, um gato (mais precisamente, um gato). É difícil imaginar que tudo isso esteja diretamente relacionado ao Kafka, KSQL e ao experimento "como encontrar as aeronaves mais barulhentas usando a tecnologia da informação doméstica". É difícil, mas é necessário: foi um experimento que Simon Obury conduziu e traduzimos um artigo de sua autoria descrevendo todos os detalhes do processo.

Nosso novo gato chamado Snowflake acorda cedo. Os sons de aviões voando sobre a nossa casa a despertam. Mas e se, usando Apache Kafka, KSQL e Raspberry Pi, eu pudesse determinar qual avião mantém meu gato acordado? Seria bom criar um divertido painel de rastreamento, no qual o gato pudesse desviar sua atenção - e me dar um pouco mais de sono.

Em termos gerais



Transferimos aviões do céu para gráficos usando Kafka e KSQL

A aeronave determina sua localização usando receptores GPS. O transmissor de bordo relata periodicamente a localização, número de identificação, altitude e velocidade do navio usando transmissões de rádio curtas. Essas transmissões de vigilância dependente automática de transmissão ( AZN-V ) são essencialmente pacotes de dados abertos para acesso a partir de estações terrestres.

Um microcomputador, como o Raspberry Pi, e vários componentes auxiliares são tudo o que é necessário para receber mensagens dos transmissores aéreos da aeronave correndo pela minha casa.

Os sinais aéreos da aeronave parecem uma bola de mensagens emaranhada e requerem sistematização. Reconhecer esses fluxos de dados caóticos é como ouvir uma conversa em uma festa barulhenta. Portanto, para encontrar um avião que preocupa meu gato, decidi usar uma combinação de Kafka e KSQL.


Gato despertado e Raspberry Pi

Coleção de leituras de AZN-B com o Raspberry Pi


Para coletar transmissões a bordo, usei o Raspberry Pi e o RTL2832U, um modem USB que foi originalmente vendido como um dispositivo para assistir TV digital em um computador. No Raspberry Pi, instalei o dump1090 - um programa que recebe dados do AZN-V via RTL2832U usando uma pequena antena.


Meu rádio de software do Raspberry Pi e RTL2832U

Converta sinais AZN-B em temas Kafka


Agora que recebi um fluxo de sinais AZN-B brutos, devemos prestar atenção ao tráfego. O Raspberry Pi não tem energia suficiente para computação séria; portanto, tive que transferir o processamento de dados para meu cluster local no Kafka.



As mensagens recebidas são divididas em mensagens de localização ou sobre a identificação do quadro . O local parece uma mensagem do formulário: "o painel 7c6db8 voa a uma altitude de 6.250 pés na coordenada -33,8,151.0" . As informações sobre a identificação do painel serão parecidas com: "o painel 7c451c voa ao longo da rota QJE1726" .

Um pequeno script Python para o meu Raspberry Pi compartilha todas as mensagens AZN-B recebidas. Usei o proxy Confluent Rest Proxy para distribuir dados do Raspberry Pi para os tópicos de localização e de identificação de tópicos no Kafka. O servidor proxy fornece uma interface RESTful para o cluster Kafka, o que facilita a criação de mensagens usando uma simples chamada REST no Pi.



Eu queria entender quais aviões voam sobre meu teto e quais rotas. O banco de dados do OpenFlights permite comparar o código do aeroporto, por exemplo, 7C6DB8 atribuído pela Organização Internacional de Aviação Civil (ICAO), com o tipo de aeronave - no nosso caso, o Boeing 737. Enviei meus dados de mapeamento para o tema icao-para-aeronave .

O KSQL fornece um "mecanismo SQL" que permite processar dados em tempo real nos tópicos do Apache Kafka. Por exemplo, para encontrar o código de bordo 7C6DB8, podemos escrever a seguinte consulta:

CREATE TABLE icao_to_aircraft WITH (KAFKA_TOPIC='ICAO_TO_AIRCRAFT_REKEY', VALUE_FORMAT='AVRO', KEY='ICAO'); ksql> SELECT manufacturer, aircraft, registration \ FROM icao_to_aircraft \ WHERE icao = '7C6DB8'; Boeing | B738 | VH-VYI 

Da mesma forma, no tópico detalhes do indicativo, enviei os indicativos (por exemplo, QFA563, este é o voo da Qantas de Brisbane para Sydney).

 CREATE TABLE callsign_details WITH (KAFKA_TOPIC='CALLSIGN_DETAILS_REKEY', VALUE_FORMAT='AVRO', KEY='CALLSIGN'); ksql> SELECT operatorname, fromairport, toairport \ FROM callsign_details \ WHERE callsign = 'QFA563'; Qantas | Brisbane | Sydney 

Agora, vamos dar uma olhada no fluxo de dados do tópico do local . Aqui podemos observar um fluxo constante de mensagens recebidas sobre a localização de um avião voando.

 kafka-avro-console-consumer --bootstrap-server localhost:9092 --property --topic location-topic {"ico":"7C6DB8","height":"6250","location":"-33.807724,151.091495"} 

A consulta KSQL ficará assim:
 ksql> SELECT TIMESTAMPTOSTRING(rowtime, 'yyyy-MM-dd HH:mm:ss'), \ ico, height, location \ FROM location_stream \ WHERE ico = '7C6DB8'; 2018-09-19 07:13:33 | 7C6DB8 | 6250.0 | -33.807724,151.091495 

KSQL: harmonização de fluxo ...


O valor real do KSQL reside na capacidade de combinar fluxos de entrada de dados de localização com os dados de origem dos tópicos (consulte 03_ksql.sql ) - isto é, ao adicionar informações úteis ao fluxo de dados brutos. Isso é muito semelhante a uma "junção esquerda" em um banco de dados tradicional. O resultado é outro tema Kafka criado sem uma única linha de código Java!

origem> CREATE STREAM location_and_details_stream AS \
SELECIONAR l.ico, l.height, l.location, t.aircraft \
FROM location_stream l \
JUNTA ESQUERDA icao_para_aircraft t ON l.ico = t.icao;
Além disso, você recebe uma consulta KSQL. O fluxo de dados terá a seguinte aparência:

 ksql> SELECT TIMESTAMPTOSTRING(rowtime, 'yy-MM-dd HH:mm:ss') \ , manufacturer \ , aircraft \ , registration \ , height \ , location \ FROM location_and_details_stream; 18-09-27 09:53:28 | Boeing | B738 | VH-YIA | 7225 | -33.821,151.052 18-09-27 09:53:31 | Boeing | B738 | VH-YIA | 7375 | -33.819,151.049 18-09-27 09:53:32 | Boeing | B738 | VH-YIA | 7425 | -33.818,151.048 

Além disso, podemos combinar o fluxo de entrada do indicativo com o tema fixo callsign_details :

 CREATE STREAM ident_callsign_stream AS \ SELECT i.ico \ , c.operatorname \ , c.callsign \ , c.fromairport \ , c.toairport \ FROM ident_stream i \ LEFT JOIN callsign_details c ON i.indentification = c.callsign; ksql> SELECT TIMESTAMPTOSTRING(rowtime, 'yy-MM-dd HH:mm:ss') \ , operatorname \ , callsign \ , fromairport \ , toairport \ FROM ident_callsign_stream ; 18-09-27 13:33:19 | Qantas | QFA926 | Sydney | Cairns 18-09-27 13:44:11 | China Eastern | CES777 | Kunming | Sydney 18-09-27 14:00:54 | Air New Zealand | ANZ110 | Sydney | Auckland 

Agora, temos dois tópicos informativos:

  1. location_and_details_stream , que fornece um fluxo de informações atualizadas sobre a localização e a velocidade da aeronave;
  2. ident_callsign_stream , que descreve os detalhes do voo, incluindo a companhia aérea e o destino.

Com esses temas constantemente atualizados, podemos criar ótimos painéis de visão geral. Usei o Kafka Connect para fazer upload de temas Kafka preenchidos pelo KSQL no Elasticsearch (scripts completos aqui ).

Kibana Dashboard


Aqui está um exemplo de um painel de visão geral mostrando a localização de um avião em um mapa. Além disso, você pode ver um gráfico por companhia aérea, um gráfico da altitude do voo e nuvens de palavras por destino principal. Um mapa de calor mostra áreas onde as aeronaves estão concentradas, ou seja, áreas com os mais altos níveis de ruído.



De volta ao gato


Hoje o gato me acordou por volta das 6 da manhã. O KSQL pode me ajudar a encontrar o avião que sobrevoou minha casa a uma altitude inferior a 3.500 pés?

 select timestamptostring(rowtime, 'yyyy-MM-dd HH:mm:ss') , manufacturer , aircraft , registration , height from location_and_details_stream where height < 3500 and rowtime > stringtotimestamp('18-09-27 06:10', 'yy-MM-dd HH:mm') and rowtime < stringtotimestamp('18-09-27 06:20', 'yy-MM-dd HH:mm'); 2018-09-27 06:15:39 | Airbus | A388 | A6-EOD | 2100.0 2018-09-27 06:15:58 | Airbus | A388 | A6-EOD | 3050.0 

Awesome! Posso avistar um avião por cima do meu telhado às 6:15 da manhã. Acontece que Snowflake foi acordado pelo Airbus A380 (um grande navio, a propósito), que voou para Dubai.

Apenas alguns dias de folga e você tem um sistema de processamento de streaming com o KSQL. Além disso, permite encontrar rapidamente eventos de dados interessantes. Embora Snowflake possa ser cético em relação a eles.

Source: https://habr.com/ru/post/pt442876/


All Articles