Flightradar24 - comment ça marche? Partie 2, protocole ADS-B

Je vais deviner et dire que tous ceux dont les amis ou la famille ont déjà pris l'avion ont utilisé Flightradar24 - un service gratuit et pratique pour suivre les vols en temps réel.

image

Dans la première partie, les idées de fonctionnement de base ont été décrites. Allons maintenant plus loin et découvrons quelles données transmettent et reçoivent exactement entre l'avion et une station au sol. Nous allons également décoder ces données en utilisant Python.

L'histoire


Il devrait être évident que les données des avions ne sont pas destinées uniquement aux utilisateurs, qui souhaitent les voir sur leurs smartphones. Ce système est appelé ADS-B (surveillance dépendante automatique - diffusion) et a été conçu pour la transmission automatique des données d'informations de l'avion au centre de contrôle - différents paramètres, tels que les coordonnées, la vitesse, le cap, l'altitude et d'autres, sont envoyés. Auparavant, le répartiteur ne pouvait voir qu'un point sur l'écran radar. Et cela est devenu définitivement insuffisant lorsque le nombre d'avions a considérablement augmenté.

Techniquement, l'ADS-B se compose d'un émetteur à l'intérieur de l'avion, qui envoie périodiquement des trames de données d'information à une fréquence relativement élevée de 1090 MHz (il existe d'autres modes, mais ils ne nous intéressent pas tellement, car les coordonnées ne sont transmises qu'ici) ) Bien sûr, il y a aussi un récepteur quelque part à l'aéroport, mais pour nous, comme pour les utilisateurs, notre propre récepteur est plus intéressant.

À titre de comparaison, le premier système de ce type conçu pour les utilisateurs ordinaires, l'Airnav Radarbox, est apparu en 2007 et coûte environ 900 $, et environ 250 $ par an coûte un abonnement à leurs services réseau (l'idée principale de ce système est de collecter et partager les données de nombreux récepteurs , un récepteur autonome est relativement inutile).



Maintenant, lorsque les récepteurs RTL-SDR sont devenus beaucoup plus disponibles, un appareil similaire peut être fabriqué pour 30 $. Il se trouve dans la première partie de l'article , et nous irons plus loin et décrirons le protocole lui-même - voyons comment cela fonctionne.

Recevoir un signal


Tout d'abord, nous devons enregistrer un échantillon de signal. L'ensemble du signal n'a qu'une longueur de 120 microsecondes, et pour voir ses détails dans une bonne "résolution", il est préférable d'avoir une radio SDR avec une fréquence d'échantillonnage d'au moins 5 MHz.

image

Après l'enregistrement, nous obtenons un fichier WAV avec un taux d'échantillonnage de 5 000 000 d'échantillons / s, 30 secondes d'un tel enregistrement ont une taille d'environ 500 Mo. Bien sûr, il est inutile de l'écouter avec votre lecteur multimédia préféré - le fichier ne contient pas de son, mais un signal radio directement numérisé lui-même - c'est exactement ainsi que fonctionne la radio définie par logiciel.

Nous pouvons ouvrir et traiter ce fichier avec Python. Ceux qui souhaitent faire l'expérience par eux-mêmes peuvent télécharger un échantillon à partir de ce lien .

Permet d'ouvrir un fichier et de voir ce qu'il contient.

from scipy.io import wavfile import matplotlib.pyplot as plt import numpy as np fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav") data = data.astype(float) I, Q = data[:, 0], data[:, 1] A = np.sqrt(I*I + Q*Q) plt.plot(A) plt.show() 

Résultat: on voit des «impulsions» sur le bruit.



Chaque "impulsion" est un signal, dont la structure est clairement visible si l'on augmente la résolution sur le graphique.



Comme nous pouvons le voir, l'image est entièrement cohérente avec sa description ci-dessus. Nous pouvons maintenant traiter ces données.

Décodage


Tout d'abord, nous devons avoir un flux de bits. Le signal lui-même est codé avec un codage manchester:



Grâce aux différences de demi-bouchée, nous pouvons facilement obtenir de vrais «0» et «1».

  bits_str = "" for p in range(8): pos = start_data + bit_len*p p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len] avg1, avg2 = np.average(p1), np.average(p2) if avg1 < avg2: bits_str += "0" elif avg1 > avg2: bits_str += "1" 

La structure du signal lui-même ressemble à ceci:



Permet de voir les champs plus en détail.

DF (Downlink Format, 5 bits) - définit le type de message. Il en existe plusieurs types:


( source de la page )

Nous ne sommes intéressés que par le type DF17, car seul celui-ci contient les coordonnées de l'avion.

OACI (24 bits) - est un code d'avion international unique. Nous pouvons vérifier l'avion par son code sur ce site (malheureusement, l'auteur a cessé de mettre à jour la base de données, mais elle est toujours pertinente). Par exemple, pour le code 3c5ee2, nous pouvons avoir les informations suivantes:



DONNÉES (56 ou 112 bits) - sont les données elles-mêmes, que nous allons décoder. Les 5 premiers bits de données sont le champ Code de type , qui contient le sous-type des données stockées (à ne pas mélanger avec le champ DF ). Il existe de nombreux types de ce type:


( source du tableau )

Regardons quelques exemples.

Identification de l'aéronef

Un exemple sous forme binaire:

00100 011 000101 010111 000111 110111 110001 111000

Champs de données:

 +------+------+------+------+------+------+------+------+------+------+ | TC,5 | EC,3 | C1,6 | C2,6 | C3,6 | C4,6 | C5,6 | C6,6 | C7,6 | C8,6 | +------+------+------+------+------+------+------+------+------+------+ 

TC = 00100b = 4, et chaque symbole C1-C8 contient des codes, qui doivent correspondre aux index de cette chaîne:
#ABCDEFGHIJKLMNOPQRSTUVWXYZ ##### _ ################# 0123456789 ######

Après avoir décodé, il est facile d'obtenir le nom de l'avion: EWG7184

 symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######" code_str = "" for p in range(8): c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2) code_str += symbols[c] print("Aircraft Identification:", code_str.replace('#', '')) 

Position aéroportée

Le décodage du nom était simple, mais les coordonnées sont plus compliquées. Ils sont transmis sous forme de 2 trames, paires et impaires. Le code de champ TC = 01011b = 11.



Exemple de trames de données paires et impaires:

 01011 000 000101110110 00 10111000111001000 10000110101111001 01011 000 000110010000 01 10010011110000110 10000011110001000 

Le calcul des coordonnées lui-même utilise une formule un peu délicate:


( source )

Je ne suis pas un expert en SIG, donc je ne sais pas d'où il vient. Qui le sait mieux, veuillez écrire dans les commentaires.

Le calcul de l'altitude est plus simple - en fonction d'un bit spécifique, il peut être représenté comme un multiple de 25 ou 100 pieds.

Vitesse aéroportée

Dataframe avec TC = 19. Ce qui est intéressant ici, c'est que la vitesse peut être relative au sol (plus précise, appelée Ground Speed) et à la vitesse, mesurée par le capteur d'air de l'avion (peut être moins précise à cause du vent). De nombreux autres domaines différents sont également transmis:


( source )

Conclusion


Comme nous pouvons le voir, la technologie ADS-B est devenue une symbiose intéressante, lorsqu'un standard est largement utilisable non seulement pour les professionnels, mais aussi pour les utilisateurs ordinaires. Mais définitivement, le rôle clé dans cela a été fait par la réduction du coût de la technologie des récepteurs SDR numériques, qui permet de recevoir des signaux avec une fréquence supérieure à gigahertz sur un appareil très bon marché.

La norme elle-même, bien sûr, contient beaucoup plus de données. Les personnes intéressées peuvent consulter le PDF sur la page OACI ou visiter le site Web mode-s.org déjà mentionné ci-dessus. Il est peu probable que cet article soit réellement utilisé par la plupart des lecteurs, mais j'espère qu'au moins l'idée générale de son fonctionnement est désormais plus claire.

Par ailleurs, le décodeur ADS-B Python existe déjà, il peut être étudié sur le github . Les propriétaires de récepteurs SDR peuvent également créer et exécuter un décodeur ADS-B prêt à l'emploi à partir de cette page , et (je le répète) quelques détails que nous avons également dans la première partie de cet article.

Le code source de l'analyseur, décrit ci-dessus, est donné sous le spoiler. Ce n'est qu'un exemple de test qui ne prétend pas à la qualité de la production, mais en général cela fonctionne, et il peut analyser le fichier WAV , enregistré ci-dessus.

Code source (Python)
 from __future__ import print_function from scipy.io import wavfile from scipy import signal import matplotlib.pyplot as plt import numpy as np import math import sys def parse_message(data, start, bit_len): max_len = bit_len*128 A = data[start:start + max_len] A = signal.resample(A, 10*max_len) bits = np.zeros(10*max_len) bit_len *= 10 start_data = bit_len*8 # Parse first 8 bits bits_str = "" for p in range(8): pos = start_data + bit_len*p p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len] avg1, avg2 = np.average(p1), np.average(p2) if avg1 < avg2: bits_str += "0" elif avg1 > avg2: bits_str += "1" df = int(bits_str[0:5], 2) # Aircraft address (db - https://junzis.com/adb/?q=3b1c5c ) bits_str = "" for p in range(8, 32): pos = start_data + bit_len * p p1, p2 = A[pos: pos + bit_len / 2], A[pos + bit_len / 2: pos + bit_len] avg1, avg2 = np.average(p1), np.average(p2) if avg1 < avg2: bits_str += "0" elif avg1 > avg2: bits_str += "1" # print "Aircraft address:", bits_str, hex(int(bits_str, 2)) address = hex(int(bits_str, 2)) # Filter specific aircraft (optional) # if address != "0x3c5ee2": # return if df == 16 or df == 17 or df == 18 or df == 19 or df == 20 or df == 21: # print "Pos:", start, "DF:", msg_type # Data (56bit) bits_str = "" for p in range(32, 88): pos = start_data + bit_len*p p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len] avg1, avg2 = np.average(p1), np.average(p2) if avg1 < avg2: bits_str += "0" # bits[pos + bit_len / 2] = 50 elif avg1 > avg2: bits_str += "1" # http://www.lll.lu/~edward/edward/adsb/DecodingADSBposition.html # print "Data:" # print bits_str[:8], bits_str[8:20], bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17] # Type Code: tc, ec = int(bits_str[:5], 2), int(bits_str[5:8], 2) # print("DF:", df, "TC:", tc) # 1 - 4 Aircraft identification # 5 - 8 Surface position # 9 - 18 Airborne position (w/ Baro Altitude) # 19 Airborne velocities if tc >= 1 and tc <= 4: # and (df == 17 or df == 18): print("Aircraft address:", address) print("Data:") print(bits_str[:8], bits_str[8:14], bits_str[14:20], bits_str[20:26], bits_str[26:32], bits_str[32:38], bits_str[38:44]) symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######" code_str = "" for p in range(8): c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2) code_str += symbols[c] print("Aircraft Identification:", code_str.replace('#', '')) print() if tc == 11: print("Aircraft address:", address) print("Data: (11)") print(bits_str[:8], bits_str[8:20], bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17]) # Bit 22 contains the F flag which indicates which CPR format is used (odd or even) # First frame has F flag = 0 so is even and the second frame has F flag = 1 so odd # f = bits_str[21:22] # print("F:", int(f, 2)) # Altitude alt1b = bits_str[8:20] if alt1b[-5] == '1': bits = alt1b[:-5] + alt1b[-4:] n = int(bits, 2) alt_ft = n*25 - 1000 print("Alt (ft)", alt_ft) # lat_dec = int(bits_str[22:22+17], 2) # lon_dec = int(bits_str[39:39+17], 2) # print("Lat/Lon:", lat_dec, lon_dec) # http://airmetar.main.jp/radio/ADS-B%20Decoding%20Guide.pdf print() if tc == 19: print("Aircraft address:", address) print("Data:") # print(bits_str) print(bits_str[:5], bits_str[5:8], bits_str[8:10], bits_str[10:13], bits_str[13] ,bits_str[14:24], bits_str[24], bits_str[25:35], bits_str[35:36], bits_str[36:65]) subtype = int(bits_str[5:8], 2) # https://mode-s.org/decode/adsb/airborne-velocity.html spd, hdg, rocd = -1, -1, -1 if subtype == 1 or subtype == 2: print("Velocity Subtype 1: Ground speed") v_ew_sign = int(bits_str[13], 2) v_ew = int(bits_str[14:24], 2) - 1 # east-west velocity v_ns_sign = int(bits_str[24], 2) v_ns = int(bits_str[25:35], 2) - 1 # north-south velocity v_we = -1*v_ew if v_ew_sign else v_ew v_sn = -1*v_ns if v_ns_sign else v_ns spd = math.sqrt(v_sn*v_sn + v_we*v_we) # unit in kts hdg = math.atan2(v_we, v_sn) hdg = math.degrees(hdg) # convert to degrees hdg = hdg if hdg >= 0 else hdg + 360 # no negative val if subtype == 3: print("Subtype Subtype 3: Airspeed") hdg = int(bits_str[14:24], 2)/1024.0*360.0 spd = int(bits_str[25:35], 2) vr_sign = int(bits_str[36], 2) vr = int(bits_str[36:45], 2) rocd = -1*vr if vr_sign else vr # rate of climb/descend print("Speed (kts):", spd, "Rate:", rocd, "Heading:", hdg) print() # print() def calc_coordinates(): def _cprN(lat, is_odd): nl = _cprNL(lat) - is_odd return nl if nl > 1 else 1 def _cprNL(lat): try: nz = 15 a = 1 - math.cos(math.pi / (2 * nz)) b = math.cos(math.pi / 180.0 * abs(lat)) ** 2 nl = 2 * math.pi / (math.acos(1 - a/b)) return int(math.floor(nl)) except: # happens when latitude is +/-90 degree return 1 def floor_(x): return int(math.floor(x)) lat1b, lon1b, alt1b = "10111000111010011", "10000110111111000", "000101111001" lat2b, lon2b, alt2b = "10010011101011100", "10000011000011011", "000101110111" lat1, lon1, alt1 = int(lat1b, 2), int(lon1b, 2), int(alt1b, 2) lat2, lon2, alt2 = int(lat2b, 2), int(lon2b, 2), int(alt2b, 2) # 131072 is 2^17, since CPR lat and lon are 17 bits each cprlat_even, cprlon_even = lat1/131072.0, lon1/131072.0 cprlat_odd, cprlon_odd = lat2/131072.0, lon2/131072.0 print(cprlat_even, cprlon_even) j = floor_(59*cprlat_even - 60*cprlat_odd) print(j) air_d_lat_even = 360.0 / 60 air_d_lat_odd = 360.0 / 59 # Lat lat_even = float(air_d_lat_even * (j % 60 + cprlat_even)) lat_odd = float(air_d_lat_odd * (j % 59 + cprlat_odd)) if lat_even >= 270: lat_even = lat_even - 360 if lat_odd >= 270: lat_odd = lat_odd - 360 # Lon ni = _cprN(lat_even, 0) m = floor_(cprlon_even * (_cprNL(lat_even)-1) - cprlon_odd * _cprNL(lat_even) + 0.5) lon = (360.0 / ni) * (m % ni + cprlon_even) print("Lat", lat_even, "Lon", lon) # Altitude # Q-bit (bit 48) indicates whether the altitude is encoded in multiples of 25 or 100 ft (0: 100 ft, 1: 25 ft) # The value can represent altitudes from -1000 to +50175 ft. if alt1b[-5] == '1': bits = alt1b[:-5] + alt1b[-4:] n = int(bits, 2) alt_ft = n*25 - 1000 print("Alt (ft)", alt_ft) fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav") T = 1/fs print("Sample rate %f MS/s" % (fs / 1e6)) print("Cnt samples %d" % len(data)) print("Duration: %fs" % (T * len(data))) data = data.astype(float) cnt = data.shape[0] # Processing only part on file (faster): # cnt = 10000000 # data = data[:cnt] print("Processing I/Q...") I, Q = data[:, 0], data[:, 1] A = np.sqrt(I*I + Q*Q) bits = np.zeros(cnt) # To see scope without any processing, uncomment # plt.plot(A) # plt.show() # sys.exit(0) print("Extracting signals...") pos = 0 avg = 200 msg_start = 0 # Find beginning of each signal while pos < cnt - 16*1024: # P1 - message start while pos < cnt - 16*1024: if A[pos] < avg and A[pos+1] > avg and pos - msg_start > 1000: msg_start = pos bits[pos] = 100 pos += 4 break pos += 1 start1, start2, start3, start4 = msg_start, 0, 0, 0 # P2 while pos < cnt - 16*1024: if A[pos] < avg and A[pos+1] > avg: start2 = pos bits[pos] = 90 pos += 1 break pos += 1 # P3 while pos < cnt - 16*1024: if A[pos] < avg and A[pos+1] > avg: start3 = pos bits[pos] = 80 pos += 1 break pos += 1 # P4 while pos < cnt - 16*1024: if A[pos] < avg and A[pos+1] > avg: start4 = pos bits[pos] = 70 pos += 1 break pos += 1 sig_diff = start4 - start1 if 20 < sig_diff < 25: bits[msg_start] = 500 bit_len = int((start4 - start1) / 4.5) # print(pos, start1, start4, ' - ', bit_len) # start = start1 + 8*bit_len parse_message(A, msg_start, bit_len) pos += 450 # For debugging: check signal start # plt.plot(A) # plt.plot(bits) # plt.show() 


J'espère que cela a été utile, merci d'avoir lu.

Source: https://habr.com/ru/post/fr447078/


All Articles