Flightradar24: ¿cómo funciona? Parte 2, Protocolo ADS-B

Hola habr Probablemente, todos los que alguna vez conocieron o acompañaron a familiares o amigos en un avión utilizaron el servicio gratuito Flightradar24. Esta es una forma muy conveniente de rastrear la posición de la aeronave en tiempo real.

imagen

La primera parte describe el principio de funcionamiento de dicho servicio en línea. Ahora iremos más allá, y descubriremos qué datos se transmiten y reciben del avión a la estación receptora, y los decodificamos independientemente usando Python.

La historia


Obviamente, los datos sobre el avión no se transmiten para que los usuarios puedan verlos en sus teléfonos inteligentes. El sistema se denomina ADS - B (vigilancia dependiente automática - transmisión), y se utiliza para transmitir automáticamente información de la aeronave a un centro de control: se transmiten su identificador, coordenadas, dirección, velocidad, altitud y otros datos. Anteriormente, antes del advenimiento de tales sistemas, el despachador solo podía ver un punto en el radar. Esto no fue suficiente cuando había demasiados aviones.

Técnicamente, ADS-B consiste en un transmisor en una aeronave que envía periódicamente paquetes con información a una frecuencia bastante alta de 1090 MHz (hay otros modos, pero no son tan interesantes para nosotros, porque las coordenadas se transmiten solo aquí). Por supuesto, además del transmisor, también hay un receptor en algún lugar del aeropuerto, pero para nosotros, como para los usuarios, nuestro propio receptor es interesante.

Por cierto, en comparación, el primer sistema de este tipo, Airnav Radarbox, diseñado para usuarios comunes, apareció en 2007 y costó alrededor de $ 900, alrededor de $ 250 al año valía una suscripción a los servicios de red.



Puedes leer los comentarios de esos primeros propietarios rusos en el foro de radioscanner . Ahora que los receptores RTL-SDR se han vuelto ampliamente disponibles, se puede ensamblar un dispositivo similar por $ 30, más sobre esto en la primera parte . Procederemos al protocolo en sí, veamos cómo funciona.

Recepción de señales


Para comenzar, la señal necesita ser grabada. Toda la señal tiene una duración de solo 120 microsegundos, por lo tanto, para desarmar cómodamente sus componentes, es deseable un receptor SDR con una frecuencia de muestreo de al menos 5 MHz.

imagen

Después de la grabación, obtenemos un archivo WAV con una frecuencia de muestreo de 5,000,000 muestras / seg, 30 segundos de tal grabación "pesan" aproximadamente 500 MB. Escucharlo con un reproductor multimedia es, por supuesto, inútil: el archivo no contiene sonido, sino una señal de radio digitalizada directamente: así es como funciona la Radio definida por software.

Abriremos y procesaremos el archivo usando Python. Aquellos que deseen experimentar por su cuenta pueden descargar la grabación de muestra desde el enlace .

Descargue el archivo y vea lo que hay dentro.

from scipy.io import wavfile import matplotlib.pyplot as plt import numpy as np fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav") data = data.astype(float) I, Q = data[:, 0], data[:, 1] A = np.sqrt(I*I + Q*Q) plt.plot(A) plt.show() 

Resultado: vemos "impulsos" obvios en el contexto del ruido.



Cada "impulso" es una señal cuya estructura es claramente visible si aumenta la resolución en el gráfico.



Como puede ver, la imagen es consistente con lo que se describe en la descripción anterior. Puede comenzar a procesar los datos.

Decodificación


Primero necesitas obtener un flujo de bits. La señal en sí misma se codifica utilizando la codificación manchester:



A partir de la diferencia en los niveles de mordiscos, es fácil obtener "0" y "1" reales.

  bits_str = "" for p in range(8): pos = start_data + bit_len*p p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len] avg1, avg2 = np.average(p1), np.average(p2) if avg1 < avg2: bits_str += "0" elif avg1 > avg2: bits_str += "1" 

La estructura de la señal en sí es la siguiente:



Consideremos los campos con más detalle.

DF (Formato de enlace descendente, 5 bits): define el tipo de mensaje. Hay varios tipos de ellos:


( fuente de la tabla )

Solo nos interesa el tipo DF17, como Contiene las coordenadas de la aeronave.

La OACI (24 bits) es un código internacional único de aeronave. Puede verificar el avión por su código en el sitio web (desafortunadamente, el autor ha dejado de actualizar la base de datos, pero sigue siendo relevante). Por ejemplo, para el código 3c5ee2 tenemos la siguiente información:



Editar: en el comentario sobre el artículo, la descripción del código de la OACI se da con más detalle; le recomiendo que la lea a los interesados.

DATOS (56 o 112 bits): en realidad, los datos que decodificaremos. Los primeros 5 bits de datos son el campo Código de tipo que contiene el subtipo de los datos almacenados (que no debe confundirse con DF). Hay bastantes de estos tipos:


( fuente de la tabla )

Veamos algunos paquetes de muestra.

Identificación de la aeronave

Ejemplo binario:

00100 011 000101 010111 000111 110111 110001 111000

Campos de datos:

 +------+------+------+------+------+------+------+------+------+------+ | TC,5 | EC,3 | C1,6 | C2,6 | C3,6 | C4,6 | C5,6 | C6,6 | C7,6 | C8,6 | +------+------+------+------+------+------+------+------+------+------+ 

TC = 00100b = 4, cada carácter C1-C8 contiene códigos correspondientes a los índices en la cadena:
#ABCDEFGHIJKLMNOPQRSTUVWXYZ ##### _ ################# 0123456789 ######

Una vez decodificada la línea, es fácil obtener el código del avión: EWG7184

 symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######" code_str = "" for p in range(8): c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2) code_str += symbols[c] print("Aircraft Identification:", code_str.replace('#', '')) 

Posición en el aire

Si el nombre es simple, entonces las coordenadas son más complicadas. Se transmiten en forma de tramas 2x, pares e impares. Código de campo TC = 01011b = 11.



Ejemplo de paquetes pares e impares:

 01011 000 000101110110 00 10111000111001000 10000110101111001 01011 000 000110010000 01 10010011110000110 10000011110001000 

El cálculo de las coordenadas se realiza de acuerdo con una fórmula bastante inteligente:


( fuente )

No soy un especialista en SIG, así que no sé de dónde viene. Quién sabe, escribe en los comentarios.

La altitud se considera más fácil: dependiendo de un bit en particular, puede aparecer como un múltiplo de 25 o 100 pies.

Velocidad en el aire

Paquete con TC = 19. Lo interesante aquí es que la velocidad puede ser precisa, en relación con el suelo (Velocidad de avance) o el aire, según lo medido por un sensor de avión (Velocidad aérea). También se transmiten muchos campos diferentes:


( fuente )

Conclusión


Como puede ver, la tecnología ADS-B se ha convertido en una simbiosis interesante cuando un estándar es útil no solo para profesionales, sino también para usuarios comunes. Pero, por supuesto, el papel clave en esto fue desempeñado por el abaratamiento de la tecnología de los receptores SDR digitales, que permite que el dispositivo reciba señales con una frecuencia superior a gigahercios, literalmente "por un centavo".

En el estándar mismo, por supuesto, mucho más que nada. Los interesados ​​pueden ver el PDF en la página de la OACI o visitar el sitio ya mencionado anteriormente.

Es poco probable que muchos de los anteriores sean útiles, pero al menos la idea general de cómo funciona esto, espero.

Por cierto, ya existe un decodificador listo para usar en Python, puede estudiarse aquí . Y los propietarios de los receptores SDR pueden ensamblar y ejecutar el decodificador ADS-B terminado desde la página , más sobre esto en la primera parte .

El código fuente del analizador descrito en el artículo se proporciona debajo del corte. Este es un ejemplo de prueba que no pretende ser producción, pero algo funciona en él, y puede analizar el archivo grabado anteriormente.
Código fuente (Python)
 from __future__ import print_function from scipy.io import wavfile from scipy import signal import matplotlib.pyplot as plt import numpy as np import math import sys def parse_message(data, start, bit_len): max_len = bit_len*128 A = data[start:start + max_len] A = signal.resample(A, 10*max_len) bits = np.zeros(10*max_len) bit_len *= 10 start_data = bit_len*8 # Parse first 8 bits bits_str = "" for p in range(8): pos = start_data + bit_len*p p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len] avg1, avg2 = np.average(p1), np.average(p2) if avg1 < avg2: bits_str += "0" elif avg1 > avg2: bits_str += "1" df = int(bits_str[0:5], 2) # Aircraft address (db - https://junzis.com/adb/?q=3b1c5c ) bits_str = "" for p in range(8, 32): pos = start_data + bit_len * p p1, p2 = A[pos: pos + bit_len / 2], A[pos + bit_len / 2: pos + bit_len] avg1, avg2 = np.average(p1), np.average(p2) if avg1 < avg2: bits_str += "0" elif avg1 > avg2: bits_str += "1" # print "Aircraft address:", bits_str, hex(int(bits_str, 2)) address = hex(int(bits_str, 2)) # Filter specific aircraft (optional) # if address != "0x3c5ee2": # return if df == 16 or df == 17 or df == 18 or df == 19 or df == 20 or df == 21: # print "Pos:", start, "DF:", msg_type # Data (56bit) bits_str = "" for p in range(32, 88): pos = start_data + bit_len*p p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len] avg1, avg2 = np.average(p1), np.average(p2) if avg1 < avg2: bits_str += "0" # bits[pos + bit_len / 2] = 50 elif avg1 > avg2: bits_str += "1" # http://www.lll.lu/~edward/edward/adsb/DecodingADSBposition.html # print "Data:" # print bits_str[:8], bits_str[8:20], bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17] # Type Code: tc, ec = int(bits_str[:5], 2), int(bits_str[5:8], 2) # print("DF:", df, "TC:", tc) # 1 - 4 Aircraft identification # 5 - 8 Surface position # 9 - 18 Airborne position (w/ Baro Altitude) # 19 Airborne velocities if tc >= 1 and tc <= 4: # and (df == 17 or df == 18): print("Aircraft address:", address) print("Data:") print(bits_str[:8], bits_str[8:14], bits_str[14:20], bits_str[20:26], bits_str[26:32], bits_str[32:38], bits_str[38:44]) symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######" code_str = "" for p in range(8): c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2) code_str += symbols[c] print("Aircraft Identification:", code_str.replace('#', '')) print() if tc == 11: print("Aircraft address:", address) print("Data: (11)") print(bits_str[:8], bits_str[8:20], bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17]) # Bit 22 contains the F flag which indicates which CPR format is used (odd or even) # First frame has F flag = 0 so is even and the second frame has F flag = 1 so odd # f = bits_str[21:22] # print("F:", int(f, 2)) # Altitude alt1b = bits_str[8:20] if alt1b[-5] == '1': bits = alt1b[:-5] + alt1b[-4:] n = int(bits, 2) alt_ft = n*25 - 1000 print("Alt (ft)", alt_ft) # lat_dec = int(bits_str[22:22+17], 2) # lon_dec = int(bits_str[39:39+17], 2) # print("Lat/Lon:", lat_dec, lon_dec) # http://airmetar.main.jp/radio/ADS-B%20Decoding%20Guide.pdf print() if tc == 19: print("Aircraft address:", address) print("Data:") # print(bits_str) print(bits_str[:5], bits_str[5:8], bits_str[8:10], bits_str[10:13], bits_str[13] ,bits_str[14:24], bits_str[24], bits_str[25:35], bits_str[35:36], bits_str[36:65]) subtype = int(bits_str[5:8], 2) # https://mode-s.org/decode/adsb/airborne-velocity.html spd, hdg, rocd = -1, -1, -1 if subtype == 1 or subtype == 2: print("Velocity Subtype 1: Ground speed") v_ew_sign = int(bits_str[13], 2) v_ew = int(bits_str[14:24], 2) - 1 # east-west velocity v_ns_sign = int(bits_str[24], 2) v_ns = int(bits_str[25:35], 2) - 1 # north-south velocity v_we = -1*v_ew if v_ew_sign else v_ew v_sn = -1*v_ns if v_ns_sign else v_ns spd = math.sqrt(v_sn*v_sn + v_we*v_we) # unit in kts hdg = math.atan2(v_we, v_sn) hdg = math.degrees(hdg) # convert to degrees hdg = hdg if hdg >= 0 else hdg + 360 # no negative val if subtype == 3: print("Subtype Subtype 3: Airspeed") hdg = int(bits_str[14:24], 2)/1024.0*360.0 spd = int(bits_str[25:35], 2) vr_sign = int(bits_str[36], 2) vr = int(bits_str[36:45], 2) rocd = -1*vr if vr_sign else vr # rate of climb/descend print("Speed (kts):", spd, "Rate:", rocd, "Heading:", hdg) print() # print() def calc_coordinates(): def _cprN(lat, is_odd): nl = _cprNL(lat) - is_odd return nl if nl > 1 else 1 def _cprNL(lat): try: nz = 15 a = 1 - math.cos(math.pi / (2 * nz)) b = math.cos(math.pi / 180.0 * abs(lat)) ** 2 nl = 2 * math.pi / (math.acos(1 - a/b)) return int(math.floor(nl)) except: # happens when latitude is +/-90 degree return 1 def floor_(x): return int(math.floor(x)) lat1b, lon1b, alt1b = "10111000111010011", "10000110111111000", "000101111001" lat2b, lon2b, alt2b = "10010011101011100", "10000011000011011", "000101110111" lat1, lon1, alt1 = int(lat1b, 2), int(lon1b, 2), int(alt1b, 2) lat2, lon2, alt2 = int(lat2b, 2), int(lon2b, 2), int(alt2b, 2) # 131072 is 2^17, since CPR lat and lon are 17 bits each cprlat_even, cprlon_even = lat1/131072.0, lon1/131072.0 cprlat_odd, cprlon_odd = lat2/131072.0, lon2/131072.0 print(cprlat_even, cprlon_even) j = floor_(59*cprlat_even - 60*cprlat_odd) print(j) air_d_lat_even = 360.0 / 60 air_d_lat_odd = 360.0 / 59 # Lat lat_even = float(air_d_lat_even * (j % 60 + cprlat_even)) lat_odd = float(air_d_lat_odd * (j % 59 + cprlat_odd)) if lat_even >= 270: lat_even = lat_even - 360 if lat_odd >= 270: lat_odd = lat_odd - 360 # Lon ni = _cprN(lat_even, 0) m = floor_(cprlon_even * (_cprNL(lat_even)-1) - cprlon_odd * _cprNL(lat_even) + 0.5) lon = (360.0 / ni) * (m % ni + cprlon_even) print("Lat", lat_even, "Lon", lon) # Altitude # Q-bit (bit 48) indicates whether the altitude is encoded in multiples of 25 or 100 ft (0: 100 ft, 1: 25 ft) # The value can represent altitudes from -1000 to +50175 ft. if alt1b[-5] == '1': bits = alt1b[:-5] + alt1b[-4:] n = int(bits, 2) alt_ft = n*25 - 1000 print("Alt (ft)", alt_ft) fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav") T = 1/fs print("Sample rate %f MS/s" % (fs / 1e6)) print("Cnt samples %d" % len(data)) print("Duration: %fs" % (T * len(data))) data = data.astype(float) cnt = data.shape[0] # Processing only part on file (faster): # cnt = 10000000 # data = data[:cnt] print("Processing I/Q...") I, Q = data[:, 0], data[:, 1] A = np.sqrt(I*I + Q*Q) bits = np.zeros(cnt) # To see scope without any processing, uncomment # plt.plot(A) # plt.show() # sys.exit(0) print("Extracting signals...") pos = 0 avg = 200 msg_start = 0 # Find beginning of each signal while pos < cnt - 16*1024: # P1 - message start while pos < cnt - 16*1024: if A[pos] < avg and A[pos+1] > avg and pos - msg_start > 1000: msg_start = pos bits[pos] = 100 pos += 4 break pos += 1 start1, start2, start3, start4 = msg_start, 0, 0, 0 # P2 while pos < cnt - 16*1024: if A[pos] < avg and A[pos+1] > avg: start2 = pos bits[pos] = 90 pos += 1 break pos += 1 # P3 while pos < cnt - 16*1024: if A[pos] < avg and A[pos+1] > avg: start3 = pos bits[pos] = 80 pos += 1 break pos += 1 # P4 while pos < cnt - 16*1024: if A[pos] < avg and A[pos+1] > avg: start4 = pos bits[pos] = 70 pos += 1 break pos += 1 sig_diff = start4 - start1 if 20 < sig_diff < 25: bits[msg_start] = 500 bit_len = int((start4 - start1) / 4.5) # print(pos, start1, start4, ' - ', bit_len) # start = start1 + 8*bit_len parse_message(A, msg_start, bit_len) pos += 450 # For debugging: check signal start # plt.plot(A) # plt.plot(bits) # plt.show() 


Espero que alguien esté interesado, gracias por su atención.

Source: https://habr.com/ru/post/443498/


All Articles