使用KSQL,Raspberry Pi和Radio进行噪声映射



乍一看,这个故事具有在3月8日前夕获得浪漫禁食的所有条件:飞机,爱情,一些间谍活动,最后还有一只猫(更确切地说是一只猫)。 很难想象所有这些都与卡夫卡,KSQL和“如何利用家庭信息技术找到最吵的飞机”的实验直接相关。 这很困难,但有必要:西蒙·奥伯里(Simon Obury)进行了这样的实验,我们翻译了他的著作,描述了该过程的所有细节。

我们的新猫叫雪花,起得很早。 飞机飞过我们家的声音唤醒了她。 但是,如果我可以使用Apache Kafka,KSQL和Raspberry Pi确定哪架飞机可以使猫保持清醒呢? 最好创建一个有趣的跟踪面板,猫可以在该面板上转移注意力-给我更多的睡眠。

一般而言



我们使用Kafka和KSQL将飞机从天空转移到图形

飞机使用GPS接收器确定其位置。 车载发射器使用短距离无线电发射定期报告船舶的位置,识别号,高度和速度。 广播自动相关监视( AZN-V )的这些广播本质上是开放供地面站访问的数据包。

仅需要一台微型计算机(如Raspberry Pi)和几个辅助组件,就可以从飞机的机上发射机接收在我家中掠过的消息。

飞机的空中信号看起来像是一团纠结的信息,需要系统化。 识别这些混乱的数据流就像在嘈杂的聚会上听谈话。 因此,为了找到让我的猫担心的飞机,我决定使用Kafka和KSQL的组合。


唤醒猫和树莓派

Raspberry Pi收集AZN-B读数


为了收集车载传输,我使用了Raspberry Pi和RTL2832U,这是一个USB调制解调器,最初是作为在计算机上观看数字电视的设备而出售的。 在Raspberry Pi上,我安装了dump1090-一个使用小天线通过RTL2832U从AZN-V接收数​​据的程序。


我的Raspberry Pi和RTL2832U软件无线电

将AZN-B信号转换为Kafka主题


现在,我已经收到了原始的AZN-B信号流,我们应该注意流量。 Raspberry Pi没有足够的能力进行严肃的计算,因此我不得不将数据处理转移到我在Kafka上的本地集群。



收到的消息分为位置 消息有关板标识的消息 。 该位置看起来像是以下形式的消息: “ 7c6db8板在坐标-33.8,151.0处以6,250英尺的高度飞行 有关板标识的信息看起来像: “板7c451c沿着QJE1726路线飞行

我的Raspberry Pi的一个小型Python脚本共享所有传入的AZN-B消息。 我使用Confluent Rest Proxy代理将Raspberry Pi中的数据分发到Kafka上的location-topicident-topic主题。 代理服务器为Kafka集群提供了RESTful接口,使用Pi上的简单REST调用可以轻松创建消息。



我想了解哪些飞机飞过我的屋顶以及哪些路线。 OpenFlights数据库使可以将机场代码(例如国际民航组织(ICAO)分配的7C6DB8)与飞机类型(在本例中为波音737)进行比较。 我地图数据上传icao飞机主题。

KSQL提供了一个“ SQL引擎”,使您可以实时处理Apache Kafka主题上的数据。 例如,要找到板载代码7C6DB8,我们可以编写以下查询:

CREATE TABLE icao_to_aircraft WITH (KAFKA_TOPIC='ICAO_TO_AIRCRAFT_REKEY', VALUE_FORMAT='AVRO', KEY='ICAO'); ksql> SELECT manufacturer, aircraft, registration \ FROM icao_to_aircraft \ WHERE icao = '7C6DB8'; Boeing | B738 | VH-VYI 

同样,在呼号详细信息主题中,我上传了呼号(即QFA563,这是澳航从布里斯班飞往悉尼的航班)。

 CREATE TABLE callsign_details WITH (KAFKA_TOPIC='CALLSIGN_DETAILS_REKEY', VALUE_FORMAT='AVRO', KEY='CALLSIGN'); ksql> SELECT operatorname, fromairport, toairport \ FROM callsign_details \ WHERE callsign = 'QFA563'; Qantas | Brisbane | Sydney 

现在,让我们看一下位置主题数据流。 在这里,我们可以观察到源源不断的有关飞行飞机位置的消息流。

 kafka-avro-console-consumer --bootstrap-server localhost:9092 --property --topic location-topic {"ico":"7C6DB8","height":"6250","location":"-33.807724,151.091495"} 

KSQL查询将如下所示:
 ksql> SELECT TIMESTAMPTOSTRING(rowtime, 'yyyy-MM-dd HH:mm:ss'), \ ico, height, location \ FROM location_stream \ WHERE ico = '7C6DB8'; 2018-09-19 07:13:33 | 7C6DB8 | 6250.0 | -33.807724,151.091495 

KSQL:流协调...


KSQL的真正价值在于能够将位置数据的传入流与主题的源数据相结合(请参阅03_ksql.sql ),即在原始数据流中添加有用的信息。 这非常类似于传统数据库中的“左联接”。 结果是创建了另一个Kafka主题,而没有一行Java代码!

源>创建流location_and_details_stream AS \
选择l.ico,l.height,l.location,t.aircraft \
从location_stream l \
左联接icao_to_aircraft t ON l.ico = t.icao;
此外,您还会收到一个KSQL查询。 数据流将如下所示:

 ksql> SELECT TIMESTAMPTOSTRING(rowtime, 'yy-MM-dd HH:mm:ss') \ , manufacturer \ , aircraft \ , registration \ , height \ , location \ FROM location_and_details_stream; 18-09-27 09:53:28 | Boeing | B738 | VH-YIA | 7225 | -33.821,151.052 18-09-27 09:53:31 | Boeing | B738 | VH-YIA | 7375 | -33.819,151.049 18-09-27 09:53:32 | Boeing | B738 | VH-YIA | 7425 | -33.818,151.048 

此外,我们可以将callsign输入流与固定的callsign_details主题结合使用:

 CREATE STREAM ident_callsign_stream AS \ SELECT i.ico \ , c.operatorname \ , c.callsign \ , c.fromairport \ , c.toairport \ FROM ident_stream i \ LEFT JOIN callsign_details c ON i.indentification = c.callsign; ksql> SELECT TIMESTAMPTOSTRING(rowtime, 'yy-MM-dd HH:mm:ss') \ , operatorname \ , callsign \ , fromairport \ , toairport \ FROM ident_callsign_stream ; 18-09-27 13:33:19 | Qantas | QFA926 | Sydney | Cairns 18-09-27 13:44:11 | China Eastern | CES777 | Kunming | Sydney 18-09-27 14:00:54 | Air New Zealand | ANZ110 | Sydney | Auckland 

现在我们有两个有用的主题:

  1. location_and_details_stream ,提供有关飞机位置和速度的更新信息流;
  2. ident_callsign_stream ,它描述航班的详细信息,包括航空公司和目的地。

通过这些不断更新的主题,我们可以创建一些出色的概述面板。 我使用Kafka Connect将 KSQL填充的Kafka主题上传到Elasticsearch此处为完整脚本)。

Kibana仪表板


这是一个概览面板的示例,显示了飞机在地图上的位置。 此外,您还可以查看航空公司的图表,航班高度的图表以及主要目的地的词云。 热图显示了飞机集中的区域,即噪声水平最高的区域。



回到猫


今天猫在早上6点左右把我叫醒。 KSQL可以帮助我找到这次飞行在我的房子上方,不到3500英尺的高度上的飞机吗?

 select timestamptostring(rowtime, 'yyyy-MM-dd HH:mm:ss') , manufacturer , aircraft , registration , height from location_and_details_stream where height < 3500 and rowtime > stringtotimestamp('18-09-27 06:10', 'yy-MM-dd HH:mm') and rowtime < stringtotimestamp('18-09-27 06:20', 'yy-MM-dd HH:mm'); 2018-09-27 06:15:39 | Airbus | A388 | A6-EOD | 2100.0 2018-09-27 06:15:58 | Airbus | A388 | A6-EOD | 3050.0 

太棒了! 我可以在早上6:15在屋顶上发现一架飞机。 事实证明,雪花已被飞往迪拜的空中客车A380(顺便说一句,巨大的班轮)唤醒了。

只需几天的时间,您就可以使用带有KSQL的流处理系统。 而且,它使您可以快速找到有趣的数据事件。 尽管Snowflake可能对此表示怀疑。

Source: https://habr.com/ru/post/zh-CN442876/


All Articles