Flightradar24: ¿cómo funciona? Parte 2, protocolo ADS-B

Voy a adivinar y decir que todas las personas cuyos amigos o familiares han volado en un avión, han usado Flightradar24, un servicio gratuito y conveniente para rastrear vuelos en tiempo real.

imagen

En la primera parte se describieron las ideas básicas de operación. Ahora vamos más allá y descubramos qué datos se transmiten y reciben exactamente entre el avión y una estación terrestre. También decodificaremos estos datos usando Python.

Historia


Debería ser obvio que los datos de los aviones no están destinados solo a los usuarios, que desean verlos en sus teléfonos inteligentes. Este sistema se llama ADS - B (vigilancia dependiente automática - transmisión), y fue diseñado para la transmisión automática de los datos de información de la aeronave al centro de control: se envían diferentes parámetros, como coordenadas, velocidad, rumbo, altitud y otros. Anteriormente, el despachador solo podía ver un punto en la pantalla del radar. Y definitivamente no fue suficiente cuando el número de aviones aumentó drásticamente.

Técnicamente, ADS-B consiste en un transmisor dentro de la aeronave, que envía periódicamente cuadros de datos de información a una frecuencia relativamente alta de 1090 MHz (hay algunos otros modos, pero no son tan interesantes para nosotros, porque las coordenadas se transmiten solo aquí ) Por supuesto, también hay un receptor en algún lugar del aeropuerto, pero para nosotros, como para los usuarios, nuestro propio receptor es más interesante.

Solo para comparar, el primer sistema diseñado para usuarios comunes, el Airnav Radarbox, apareció en 2007 y costó alrededor de $ 900, y alrededor de $ 250 por año costaba una suscripción a sus servicios de red (la idea principal de este sistema es recopilar y compartir datos de muchos receptores , un receptor independiente es relativamente inútil).



Ahora, cuando los receptores RTL-SDR están mucho más disponibles, se puede hacer un dispositivo similar por $ 30. Se puede encontrar en la primera parte del artículo , e iremos más allá y describiremos el protocolo en sí, veamos cómo funciona.

Recibiendo una señal


Primero, necesitamos registrar una muestra de señal. Toda la señal tiene solo 120 microsegundos de longitud, y para ver sus detalles en una buena "resolución", es mejor tener radio SDR con una frecuencia de muestreo de al menos 5 MHz.

imagen

Después de la grabación, estamos obteniendo un archivo WAV con una tasa de muestreo de 5,000,000 muestras / seg, 30 segundos de tal registro tiene un tamaño de aproximadamente 500MB. Por supuesto, es inútil escucharlo con su reproductor multimedia favorito: el archivo no contiene sonido, sino una señal de radio digitalizada directamente, así es exactamente cómo funciona la Radio definida por software.

Podemos abrir y procesar este archivo con Python. Aquellos que deseen realizar el experimento por su cuenta, pueden descargar una muestra desde este enlace .

Vamos a abrir un archivo y ver qué hay dentro.

from scipy.io import wavfile import matplotlib.pyplot as plt import numpy as np fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav") data = data.astype(float) I, Q = data[:, 0], data[:, 1] A = np.sqrt(I*I + Q*Q) plt.plot(A) plt.show() 

Resultado: vemos algunos 'impulsos' sobre el ruido.



Cada "impulso" es una señal, cuya estructura es claramente visible si aumentamos la resolución en el gráfico.



Como podemos ver, la imagen es totalmente coherente con su descripción anterior. Ahora podemos procesar estos datos.

Decodificación


Primero, necesitamos tener un poco de flujo. La señal en sí está codificada con una codificación de Manchester:



De las diferencias de mordida podemos obtener fácilmente "0" y "1" reales.

  bits_str = "" for p in range(8): pos = start_data + bit_len*p p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len] avg1, avg2 = np.average(p1), np.average(p2) if avg1 < avg2: bits_str += "0" elif avg1 > avg2: bits_str += "1" 

La estructura de la señal en sí se ve así:



Veamos los campos con más detalle.

DF (Formato de enlace descendente, 5 bits): define el tipo de mensaje. Hay varios tipos de ellos:


( fuente de la página )

Solo estamos interesados ​​en el tipo DF17, porque solo este contiene las coordenadas del avión.

OACI (24 bits): es un código internacional único de aeronave. Podemos verificar el avión por su código en este sitio web (desafortunadamente, el autor ha dejado de actualizar la base de datos, pero aún es relevante). Por ejemplo, para el código 3c5ee2 podemos tener la siguiente información:



DATOS (56 o 112 bits): son los datos en sí, que decodificaremos. Los primeros 5 bits de datos son el campo Código de tipo , que contiene el subtipo de los datos que se almacenan (no se mezclarán con el campo DF ). Hay muchos tipos de este tipo:


( fuente de la tabla )

Veamos algunos ejemplos.

Identificación de la aeronave

Un ejemplo en forma binaria:

00100 011 000101 010111 000111 110111 110001 111000

Campos de datos:

 +------+------+------+------+------+------+------+------+------+------+ | TC,5 | EC,3 | C1,6 | C2,6 | C3,6 | C4,6 | C5,6 | C6,6 | C7,6 | C8,6 | +------+------+------+------+------+------+------+------+------+------+ 

TC = 00100b = 4, y cada símbolo C1-C8 contiene códigos que deben coincidir con los índices en esta cadena:
#ABCDEFGHIJKLMNOPQRSTUVWXYZ ##### _ ################# 0123456789 ######

Después de decodificar, es fácil obtener el nombre del avión: EWG7184

 symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######" code_str = "" for p in range(8): c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2) code_str += symbols[c] print("Aircraft Identification:", code_str.replace('#', '')) 

Posición en el aire

La decodificación del nombre era simple, pero las coordenadas son más complicadas. Se transmiten en forma de 2 cuadros, pares e impares. El código de campo TC = 01011b = 11.



Ejemplo de marcos de datos pares e impares:

 01011 000 000101110110 00 10111000111001000 10000110101111001 01011 000 000110010000 01 10010011110000110 10000011110001000 

El cálculo de las coordenadas en sí usa una fórmula un poco complicada:


( fuente )

No soy un experto en SIG, así que no sé de dónde viene. Quién lo sabe mejor, por favor escriba en los comentarios.

El cálculo de altitud es más simple: dependiendo de un bit específico, se puede representar como un múltiplo de 25 o 100 pies.

Velocidad en el aire

Marco de datos con TC = 19. Lo interesante aquí es que la velocidad puede ser relativa al suelo (más precisa, llamada Velocidad de avance) y Velocidad aérea, medida por el sensor de aire de la aeronave (puede ser menos precisa debido al viento). También se transmiten muchos otros campos diferentes:


( fuente )

Conclusión


Como podemos ver, la tecnología ADS-B se ha convertido en una simbiosis interesante, cuando un estándar es ampliamente utilizable no solo para profesionales, sino también para usuarios comunes. Pero definitivamente, el papel clave en esto fue hecho por el abaratamiento de la tecnología de los receptores digitales SDR, que permite recibir señales con una frecuencia superior a gigahercios en un dispositivo muy barato.

El estándar en sí, por supuesto, tiene muchos más datos. Los interesados ​​pueden ver el PDF en la página de la OACI o visitar el sitio web mode-s.org ya mencionado anteriormente. Es poco probable que este artículo sea realmente utilizado por la mayoría de los lectores, pero espero que, al menos la idea general de cómo funciona, ahora sea más clara.

Por cierto, el decodificador Python ADS-B ya existe, puede investigarse en el github . Los propietarios de receptores SDR también pueden construir y ejecutar un decodificador ADS-B listo para usar desde esta página , y (repetiré de nuevo) algunos detalles que también en la primera parte de este artículo.

El código fuente del analizador, descrito anteriormente, se proporciona bajo el spoiler. Este es solo un ejemplo de prueba que no pretende la calidad de producción, pero en general funciona, y puede analizar el archivo WAV , registrado anteriormente.

Código fuente (Python)
 from __future__ import print_function from scipy.io import wavfile from scipy import signal import matplotlib.pyplot as plt import numpy as np import math import sys def parse_message(data, start, bit_len): max_len = bit_len*128 A = data[start:start + max_len] A = signal.resample(A, 10*max_len) bits = np.zeros(10*max_len) bit_len *= 10 start_data = bit_len*8 # Parse first 8 bits bits_str = "" for p in range(8): pos = start_data + bit_len*p p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len] avg1, avg2 = np.average(p1), np.average(p2) if avg1 < avg2: bits_str += "0" elif avg1 > avg2: bits_str += "1" df = int(bits_str[0:5], 2) # Aircraft address (db - https://junzis.com/adb/?q=3b1c5c ) bits_str = "" for p in range(8, 32): pos = start_data + bit_len * p p1, p2 = A[pos: pos + bit_len / 2], A[pos + bit_len / 2: pos + bit_len] avg1, avg2 = np.average(p1), np.average(p2) if avg1 < avg2: bits_str += "0" elif avg1 > avg2: bits_str += "1" # print "Aircraft address:", bits_str, hex(int(bits_str, 2)) address = hex(int(bits_str, 2)) # Filter specific aircraft (optional) # if address != "0x3c5ee2": # return if df == 16 or df == 17 or df == 18 or df == 19 or df == 20 or df == 21: # print "Pos:", start, "DF:", msg_type # Data (56bit) bits_str = "" for p in range(32, 88): pos = start_data + bit_len*p p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len] avg1, avg2 = np.average(p1), np.average(p2) if avg1 < avg2: bits_str += "0" # bits[pos + bit_len / 2] = 50 elif avg1 > avg2: bits_str += "1" # http://www.lll.lu/~edward/edward/adsb/DecodingADSBposition.html # print "Data:" # print bits_str[:8], bits_str[8:20], bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17] # Type Code: tc, ec = int(bits_str[:5], 2), int(bits_str[5:8], 2) # print("DF:", df, "TC:", tc) # 1 - 4 Aircraft identification # 5 - 8 Surface position # 9 - 18 Airborne position (w/ Baro Altitude) # 19 Airborne velocities if tc >= 1 and tc <= 4: # and (df == 17 or df == 18): print("Aircraft address:", address) print("Data:") print(bits_str[:8], bits_str[8:14], bits_str[14:20], bits_str[20:26], bits_str[26:32], bits_str[32:38], bits_str[38:44]) symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######" code_str = "" for p in range(8): c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2) code_str += symbols[c] print("Aircraft Identification:", code_str.replace('#', '')) print() if tc == 11: print("Aircraft address:", address) print("Data: (11)") print(bits_str[:8], bits_str[8:20], bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17]) # Bit 22 contains the F flag which indicates which CPR format is used (odd or even) # First frame has F flag = 0 so is even and the second frame has F flag = 1 so odd # f = bits_str[21:22] # print("F:", int(f, 2)) # Altitude alt1b = bits_str[8:20] if alt1b[-5] == '1': bits = alt1b[:-5] + alt1b[-4:] n = int(bits, 2) alt_ft = n*25 - 1000 print("Alt (ft)", alt_ft) # lat_dec = int(bits_str[22:22+17], 2) # lon_dec = int(bits_str[39:39+17], 2) # print("Lat/Lon:", lat_dec, lon_dec) # http://airmetar.main.jp/radio/ADS-B%20Decoding%20Guide.pdf print() if tc == 19: print("Aircraft address:", address) print("Data:") # print(bits_str) print(bits_str[:5], bits_str[5:8], bits_str[8:10], bits_str[10:13], bits_str[13] ,bits_str[14:24], bits_str[24], bits_str[25:35], bits_str[35:36], bits_str[36:65]) subtype = int(bits_str[5:8], 2) # https://mode-s.org/decode/adsb/airborne-velocity.html spd, hdg, rocd = -1, -1, -1 if subtype == 1 or subtype == 2: print("Velocity Subtype 1: Ground speed") v_ew_sign = int(bits_str[13], 2) v_ew = int(bits_str[14:24], 2) - 1 # east-west velocity v_ns_sign = int(bits_str[24], 2) v_ns = int(bits_str[25:35], 2) - 1 # north-south velocity v_we = -1*v_ew if v_ew_sign else v_ew v_sn = -1*v_ns if v_ns_sign else v_ns spd = math.sqrt(v_sn*v_sn + v_we*v_we) # unit in kts hdg = math.atan2(v_we, v_sn) hdg = math.degrees(hdg) # convert to degrees hdg = hdg if hdg >= 0 else hdg + 360 # no negative val if subtype == 3: print("Subtype Subtype 3: Airspeed") hdg = int(bits_str[14:24], 2)/1024.0*360.0 spd = int(bits_str[25:35], 2) vr_sign = int(bits_str[36], 2) vr = int(bits_str[36:45], 2) rocd = -1*vr if vr_sign else vr # rate of climb/descend print("Speed (kts):", spd, "Rate:", rocd, "Heading:", hdg) print() # print() def calc_coordinates(): def _cprN(lat, is_odd): nl = _cprNL(lat) - is_odd return nl if nl > 1 else 1 def _cprNL(lat): try: nz = 15 a = 1 - math.cos(math.pi / (2 * nz)) b = math.cos(math.pi / 180.0 * abs(lat)) ** 2 nl = 2 * math.pi / (math.acos(1 - a/b)) return int(math.floor(nl)) except: # happens when latitude is +/-90 degree return 1 def floor_(x): return int(math.floor(x)) lat1b, lon1b, alt1b = "10111000111010011", "10000110111111000", "000101111001" lat2b, lon2b, alt2b = "10010011101011100", "10000011000011011", "000101110111" lat1, lon1, alt1 = int(lat1b, 2), int(lon1b, 2), int(alt1b, 2) lat2, lon2, alt2 = int(lat2b, 2), int(lon2b, 2), int(alt2b, 2) # 131072 is 2^17, since CPR lat and lon are 17 bits each cprlat_even, cprlon_even = lat1/131072.0, lon1/131072.0 cprlat_odd, cprlon_odd = lat2/131072.0, lon2/131072.0 print(cprlat_even, cprlon_even) j = floor_(59*cprlat_even - 60*cprlat_odd) print(j) air_d_lat_even = 360.0 / 60 air_d_lat_odd = 360.0 / 59 # Lat lat_even = float(air_d_lat_even * (j % 60 + cprlat_even)) lat_odd = float(air_d_lat_odd * (j % 59 + cprlat_odd)) if lat_even >= 270: lat_even = lat_even - 360 if lat_odd >= 270: lat_odd = lat_odd - 360 # Lon ni = _cprN(lat_even, 0) m = floor_(cprlon_even * (_cprNL(lat_even)-1) - cprlon_odd * _cprNL(lat_even) + 0.5) lon = (360.0 / ni) * (m % ni + cprlon_even) print("Lat", lat_even, "Lon", lon) # Altitude # Q-bit (bit 48) indicates whether the altitude is encoded in multiples of 25 or 100 ft (0: 100 ft, 1: 25 ft) # The value can represent altitudes from -1000 to +50175 ft. if alt1b[-5] == '1': bits = alt1b[:-5] + alt1b[-4:] n = int(bits, 2) alt_ft = n*25 - 1000 print("Alt (ft)", alt_ft) fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav") T = 1/fs print("Sample rate %f MS/s" % (fs / 1e6)) print("Cnt samples %d" % len(data)) print("Duration: %fs" % (T * len(data))) data = data.astype(float) cnt = data.shape[0] # Processing only part on file (faster): # cnt = 10000000 # data = data[:cnt] print("Processing I/Q...") I, Q = data[:, 0], data[:, 1] A = np.sqrt(I*I + Q*Q) bits = np.zeros(cnt) # To see scope without any processing, uncomment # plt.plot(A) # plt.show() # sys.exit(0) print("Extracting signals...") pos = 0 avg = 200 msg_start = 0 # Find beginning of each signal while pos < cnt - 16*1024: # P1 - message start while pos < cnt - 16*1024: if A[pos] < avg and A[pos+1] > avg and pos - msg_start > 1000: msg_start = pos bits[pos] = 100 pos += 4 break pos += 1 start1, start2, start3, start4 = msg_start, 0, 0, 0 # P2 while pos < cnt - 16*1024: if A[pos] < avg and A[pos+1] > avg: start2 = pos bits[pos] = 90 pos += 1 break pos += 1 # P3 while pos < cnt - 16*1024: if A[pos] < avg and A[pos+1] > avg: start3 = pos bits[pos] = 80 pos += 1 break pos += 1 # P4 while pos < cnt - 16*1024: if A[pos] < avg and A[pos+1] > avg: start4 = pos bits[pos] = 70 pos += 1 break pos += 1 sig_diff = start4 - start1 if 20 < sig_diff < 25: bits[msg_start] = 500 bit_len = int((start4 - start1) / 4.5) # print(pos, start1, start4, ' - ', bit_len) # start = start1 + 8*bit_len parse_message(A, msg_start, bit_len) pos += 450 # For debugging: check signal start # plt.plot(A) # plt.plot(bits) # plt.show() 


Espero que haya sido útil, gracias por leer.

Source: https://habr.com/ru/post/447078/


All Articles