Flightradar24-如何运作? 第2部分,ADS-B协议

我要说的是,每个朋友或家人曾经乘坐飞机的人都使用Flightradar24-一种免费,便捷的实时跟踪航班的服务。

图片

第一部分中,描述了操作的基本思想。 现在,让我们进一步了解一下,飞机和地面站之间正在正确发送和接收哪些数据。 我们还将使用Python解码此数据。

历史沿革


很明显,飞机数据不仅适用于想要在智能手机上查看数据的用户。 该系统称为ADS – B(自动相关监视-广播),旨在将飞机信息数据自动传输到控制中心-发送不同的参数,例如坐标,速度,航向,高度等。 以前,调度员只能在雷达屏幕上看到一点。 当飞机数量急剧增加时,这绝对是不够的。

从技术上讲,ADS-B由飞机内部的发射器组成,它以1090 MHz的相对较高的频率定期发送信息数据帧(还有其他一些模式,但是它们对我们而言并不那么有趣,因为坐标仅在此处传输) 当然,在机场某处也有一个接收器,但是对于我们和用户来说,我们自己的接收器更有趣。

为了进行比较,第一个为普通用户设计的系统Airnav Radarbox于2007年问世,价格约为900美元,每年约250美元的用户订购网络服务费用(该系统的主要思想是收集和共享来自许多接收器的数据,独立接收器则相对没有用)。



现在,当RTL-SDR接收器变得越来越可用时,只需30美元就可以制造出类似的设备。 可以在本文的第一部分中找到它,我们将进一步介绍该协议本身-让我们看看它是如何工作的。

接收信号


首先,我们需要记录一个信号样本。 整个信号只有120微秒的长度,要以良好的“分辨率”查看其细节,最好具有至少5 MHz采样率的SDR无线电。

图片

记录后,我们将获得一个具有5,000,000个样本/秒采样率的WAV文件,这种记录的30秒大小约为500MB。 当然,用您喜欢的媒体播放器收听它毫无用处-文件不包含声音,而是直接数字化的无线电信号本身-这正是软件无线电的工作原理。

我们可以使用Python打开和处理此文件。 那些希望自己进行实验的人可以从此链接下载示例。

让我们打开一个文件,看看里面是什么。

from scipy.io import wavfile import matplotlib.pyplot as plt import numpy as np fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav") data = data.astype(float) I, Q = data[:, 0], data[:, 1] A = np.sqrt(I*I + Q*Q) plt.plot(A) plt.show() 

结果:我们在噪声上看到了一些“冲动”。



每个“脉冲”都是一个信号,如果我们提高图形的分辨率,则其结构清晰可见。



我们可以看到,图片与上面的描述完全一致。 现在,我们可以处理这些数据。

解码方式


首先,我们需要一点点流。 信号本身使用曼彻斯特编码进行编码:



从半咬合的差异中,我们可以轻松获得真实的“ 0”和“ 1”。

  bits_str = "" for p in range(8): pos = start_data + bit_len*p p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len] avg1, avg2 = np.average(p1), np.average(p2) if avg1 < avg2: bits_str += "0" elif avg1 > avg2: bits_str += "1" 

信号本身的结构如下所示:



让我们更详细地查看字段。

DF (下行格式,5位)-定义消息的类型。 它们有几种类型:


页面来源

我们只对DF17类型感兴趣,因为只有其中一个包含飞机坐标。

ICAO (24位)-是唯一的国际飞机代码。 我们可以通过该网站上的代码检查飞机(不幸的是,作者已停止更新数据库,但仍然有用)。 例如,对于3c5ee2代码,我们可以具有以下信息:



DATA (56或112位)-是数据本身,我们将对其进行解码。 数据的前5位是“ 类型代码”字段,其中包含要存储的数据的子类型(不要与DF字段混合)。 有很多这样的类型:


表来源

让我们看一些例子。

飞机识别

二进制形式的示例:

00100 011 000101 010111 000111 110111 110001 111000

资料栏位:

 +------+------+------+------+------+------+------+------+------+------+ | TC,5 | EC,3 | C1,6 | C2,6 | C3,6 | C4,6 | C5,6 | C6,6 | C7,6 | C8,6 | +------+------+------+------+------+------+------+------+------+------+ 

TC = 00100b = 4,并且每个符号C1-C8包含代码,应与该字符串中的索引匹配:
#ABCDEFGHIJKLMNOPQRSTUVWXYZ ##### _ ################ 0123456789 ######

解码后容易得到飞机的名称:EWG7184

 symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######" code_str = "" for p in range(8): c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2) code_str += symbols[c] print("Aircraft Identification:", code_str.replace('#', '')) 

空降位置

名称解码很简单,但是坐标更复杂。 它们以2帧的形式发送,偶数和奇数。 域代码TC = 01011b = 11。



偶数和奇数数据帧的示例:

 01011 000 000101110110 00 10111000111001000 10000110101111001 01011 000 000110010000 01 10010011110000110 10000011110001000 

坐标本身的计算使用一些棘手的公式:


来源

我不是GIS专家,所以我不知道它来自哪里。 谁知道的更好,请在评论中写。

高度计算更简单-根据特定的位,它可以表示为25或100英尺的倍数。

空降速度

TC = 19的数据帧。有趣的是,速度可以相对于地面(更精确,称为地面速度),而相对速度则由飞机的空气传感器测量(由于风而可能不那么精确)。 还传输许多其他不同的字段:


来源

结论


正如我们所看到的,当一个标准不仅对专业人士而且对于普通用户广泛使用时,ADS-B技术已经成为一种有趣的共生方式。 但是肯定的是,其中的关键作用是通过降低数字SDR接收器技术的价格来实现的,该技术允许在非常便宜的设备上接收频率高于千兆赫的信号。

当然,该标准本身具有更多数据。 有兴趣的人可以在ICAO页面上查看PDF或访问上面已经提到的mode-s.org网站 。 这篇文章不太可能真正被大多数读者使用,但是我希望,至少对于它的工作原理有一个大致的了解。

顺便说一下,ADS-B Python解码器已经存在,可以在github上进行研究 。 SDR接收器所有者还可以从此页面构建并运行即用型ADS-B解码器,并且(我将再次重复)在本文的第一部分中 ,我们还将详细介绍一些细节。

上面描述的解析器源代码在扰流器下提供。 这只是一个测试示例,它并不假冒生产质量,但总的来说,它可以工作,并且可以解析上面记录的WAV文件

源代码(Python)
 from __future__ import print_function from scipy.io import wavfile from scipy import signal import matplotlib.pyplot as plt import numpy as np import math import sys def parse_message(data, start, bit_len): max_len = bit_len*128 A = data[start:start + max_len] A = signal.resample(A, 10*max_len) bits = np.zeros(10*max_len) bit_len *= 10 start_data = bit_len*8 # Parse first 8 bits bits_str = "" for p in range(8): pos = start_data + bit_len*p p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len] avg1, avg2 = np.average(p1), np.average(p2) if avg1 < avg2: bits_str += "0" elif avg1 > avg2: bits_str += "1" df = int(bits_str[0:5], 2) # Aircraft address (db - https://junzis.com/adb/?q=3b1c5c ) bits_str = "" for p in range(8, 32): pos = start_data + bit_len * p p1, p2 = A[pos: pos + bit_len / 2], A[pos + bit_len / 2: pos + bit_len] avg1, avg2 = np.average(p1), np.average(p2) if avg1 < avg2: bits_str += "0" elif avg1 > avg2: bits_str += "1" # print "Aircraft address:", bits_str, hex(int(bits_str, 2)) address = hex(int(bits_str, 2)) # Filter specific aircraft (optional) # if address != "0x3c5ee2": # return if df == 16 or df == 17 or df == 18 or df == 19 or df == 20 or df == 21: # print "Pos:", start, "DF:", msg_type # Data (56bit) bits_str = "" for p in range(32, 88): pos = start_data + bit_len*p p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len] avg1, avg2 = np.average(p1), np.average(p2) if avg1 < avg2: bits_str += "0" # bits[pos + bit_len / 2] = 50 elif avg1 > avg2: bits_str += "1" # http://www.lll.lu/~edward/edward/adsb/DecodingADSBposition.html # print "Data:" # print bits_str[:8], bits_str[8:20], bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17] # Type Code: tc, ec = int(bits_str[:5], 2), int(bits_str[5:8], 2) # print("DF:", df, "TC:", tc) # 1 - 4 Aircraft identification # 5 - 8 Surface position # 9 - 18 Airborne position (w/ Baro Altitude) # 19 Airborne velocities if tc >= 1 and tc <= 4: # and (df == 17 or df == 18): print("Aircraft address:", address) print("Data:") print(bits_str[:8], bits_str[8:14], bits_str[14:20], bits_str[20:26], bits_str[26:32], bits_str[32:38], bits_str[38:44]) symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######" code_str = "" for p in range(8): c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2) code_str += symbols[c] print("Aircraft Identification:", code_str.replace('#', '')) print() if tc == 11: print("Aircraft address:", address) print("Data: (11)") print(bits_str[:8], bits_str[8:20], bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17]) # Bit 22 contains the F flag which indicates which CPR format is used (odd or even) # First frame has F flag = 0 so is even and the second frame has F flag = 1 so odd # f = bits_str[21:22] # print("F:", int(f, 2)) # Altitude alt1b = bits_str[8:20] if alt1b[-5] == '1': bits = alt1b[:-5] + alt1b[-4:] n = int(bits, 2) alt_ft = n*25 - 1000 print("Alt (ft)", alt_ft) # lat_dec = int(bits_str[22:22+17], 2) # lon_dec = int(bits_str[39:39+17], 2) # print("Lat/Lon:", lat_dec, lon_dec) # http://airmetar.main.jp/radio/ADS-B%20Decoding%20Guide.pdf print() if tc == 19: print("Aircraft address:", address) print("Data:") # print(bits_str) print(bits_str[:5], bits_str[5:8], bits_str[8:10], bits_str[10:13], bits_str[13] ,bits_str[14:24], bits_str[24], bits_str[25:35], bits_str[35:36], bits_str[36:65]) subtype = int(bits_str[5:8], 2) # https://mode-s.org/decode/adsb/airborne-velocity.html spd, hdg, rocd = -1, -1, -1 if subtype == 1 or subtype == 2: print("Velocity Subtype 1: Ground speed") v_ew_sign = int(bits_str[13], 2) v_ew = int(bits_str[14:24], 2) - 1 # east-west velocity v_ns_sign = int(bits_str[24], 2) v_ns = int(bits_str[25:35], 2) - 1 # north-south velocity v_we = -1*v_ew if v_ew_sign else v_ew v_sn = -1*v_ns if v_ns_sign else v_ns spd = math.sqrt(v_sn*v_sn + v_we*v_we) # unit in kts hdg = math.atan2(v_we, v_sn) hdg = math.degrees(hdg) # convert to degrees hdg = hdg if hdg >= 0 else hdg + 360 # no negative val if subtype == 3: print("Subtype Subtype 3: Airspeed") hdg = int(bits_str[14:24], 2)/1024.0*360.0 spd = int(bits_str[25:35], 2) vr_sign = int(bits_str[36], 2) vr = int(bits_str[36:45], 2) rocd = -1*vr if vr_sign else vr # rate of climb/descend print("Speed (kts):", spd, "Rate:", rocd, "Heading:", hdg) print() # print() def calc_coordinates(): def _cprN(lat, is_odd): nl = _cprNL(lat) - is_odd return nl if nl > 1 else 1 def _cprNL(lat): try: nz = 15 a = 1 - math.cos(math.pi / (2 * nz)) b = math.cos(math.pi / 180.0 * abs(lat)) ** 2 nl = 2 * math.pi / (math.acos(1 - a/b)) return int(math.floor(nl)) except: # happens when latitude is +/-90 degree return 1 def floor_(x): return int(math.floor(x)) lat1b, lon1b, alt1b = "10111000111010011", "10000110111111000", "000101111001" lat2b, lon2b, alt2b = "10010011101011100", "10000011000011011", "000101110111" lat1, lon1, alt1 = int(lat1b, 2), int(lon1b, 2), int(alt1b, 2) lat2, lon2, alt2 = int(lat2b, 2), int(lon2b, 2), int(alt2b, 2) # 131072 is 2^17, since CPR lat and lon are 17 bits each cprlat_even, cprlon_even = lat1/131072.0, lon1/131072.0 cprlat_odd, cprlon_odd = lat2/131072.0, lon2/131072.0 print(cprlat_even, cprlon_even) j = floor_(59*cprlat_even - 60*cprlat_odd) print(j) air_d_lat_even = 360.0 / 60 air_d_lat_odd = 360.0 / 59 # Lat lat_even = float(air_d_lat_even * (j % 60 + cprlat_even)) lat_odd = float(air_d_lat_odd * (j % 59 + cprlat_odd)) if lat_even >= 270: lat_even = lat_even - 360 if lat_odd >= 270: lat_odd = lat_odd - 360 # Lon ni = _cprN(lat_even, 0) m = floor_(cprlon_even * (_cprNL(lat_even)-1) - cprlon_odd * _cprNL(lat_even) + 0.5) lon = (360.0 / ni) * (m % ni + cprlon_even) print("Lat", lat_even, "Lon", lon) # Altitude # Q-bit (bit 48) indicates whether the altitude is encoded in multiples of 25 or 100 ft (0: 100 ft, 1: 25 ft) # The value can represent altitudes from -1000 to +50175 ft. if alt1b[-5] == '1': bits = alt1b[:-5] + alt1b[-4:] n = int(bits, 2) alt_ft = n*25 - 1000 print("Alt (ft)", alt_ft) fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav") T = 1/fs print("Sample rate %f MS/s" % (fs / 1e6)) print("Cnt samples %d" % len(data)) print("Duration: %fs" % (T * len(data))) data = data.astype(float) cnt = data.shape[0] # Processing only part on file (faster): # cnt = 10000000 # data = data[:cnt] print("Processing I/Q...") I, Q = data[:, 0], data[:, 1] A = np.sqrt(I*I + Q*Q) bits = np.zeros(cnt) # To see scope without any processing, uncomment # plt.plot(A) # plt.show() # sys.exit(0) print("Extracting signals...") pos = 0 avg = 200 msg_start = 0 # Find beginning of each signal while pos < cnt - 16*1024: # P1 - message start while pos < cnt - 16*1024: if A[pos] < avg and A[pos+1] > avg and pos - msg_start > 1000: msg_start = pos bits[pos] = 100 pos += 4 break pos += 1 start1, start2, start3, start4 = msg_start, 0, 0, 0 # P2 while pos < cnt - 16*1024: if A[pos] < avg and A[pos+1] > avg: start2 = pos bits[pos] = 90 pos += 1 break pos += 1 # P3 while pos < cnt - 16*1024: if A[pos] < avg and A[pos+1] > avg: start3 = pos bits[pos] = 80 pos += 1 break pos += 1 # P4 while pos < cnt - 16*1024: if A[pos] < avg and A[pos+1] > avg: start4 = pos bits[pos] = 70 pos += 1 break pos += 1 sig_diff = start4 - start1 if 20 < sig_diff < 25: bits[msg_start] = 500 bit_len = int((start4 - start1) / 4.5) # print(pos, start1, start4, ' - ', bit_len) # start = start1 + 8*bit_len parse_message(A, msg_start, bit_len) pos += 450 # For debugging: check signal start # plt.plot(A) # plt.plot(bits) # plt.show() 


我希望它有用,感谢您的阅读。

Source: https://habr.com/ru/post/zh-CN447078/


All Articles