Flightradar24 - comment ça marche? Partie 2, Protocole ADS-B

Salut Habr. Tous ceux qui ont déjà rencontré ou accompagné des parents ou des amis dans un avion ont probablement utilisé le service gratuit Flightradar24. C'est un moyen très pratique de suivre la position de l'avion en temps réel.

image

La première partie décrit le principe de fonctionnement d'un tel service en ligne. Nous allons maintenant aller plus loin et découvrir quelles données sont transmises et reçues de l'avion à la station de réception, et les décoder indépendamment en utilisant Python.

L'histoire


De toute évidence, les données sur l'avion ne sont pas transmises afin que les utilisateurs puissent les voir sur leurs smartphones. Le système est appelé ADS-B (surveillance dépendante automatique - diffusion) et est utilisé pour transmettre automatiquement les informations de l'avion à un centre de contrôle - son identifiant, ses coordonnées, sa direction, sa vitesse, son altitude et d'autres données sont transmis. Auparavant, avant l'avènement de tels systèmes, le répartiteur ne pouvait voir qu'un point sur le radar. Ce n'était pas suffisant quand il y avait trop d'avions.

Techniquement, l'ADS-B consiste en un émetteur sur un avion qui envoie périodiquement des paquets d'informations à une fréquence assez élevée de 1090 MHz (il existe d'autres modes, mais ils ne nous intéressent pas tellement, car les coordonnées ne sont transmises qu'ici). Bien sûr, en plus de l'émetteur, il y a aussi un récepteur quelque part à l'aéroport, mais pour nous, comme pour les utilisateurs, notre propre récepteur est intéressant.

Soit dit en passant, à titre de comparaison, le premier système de ce type, Airnav Radarbox, conçu pour les utilisateurs ordinaires, est apparu en 2007 et coûte environ 900 $, environ 250 $ par an valant un abonnement aux services réseau.



Vous pouvez lire les commentaires de ces premiers propriétaires russes sur le forum radioscanner . Maintenant que les récepteurs RTL-SDR sont devenus largement disponibles, un appareil similaire peut être assemblé pour 30 $, plus à ce sujet dans la première partie . Nous allons procéder au protocole lui-même - voyons comment cela fonctionne.

Réception de signaux


Pour commencer, le signal doit être enregistré. Le signal entier a une durée de seulement 120 microsecondes, par conséquent, afin de démonter confortablement ses composants, un récepteur SDR avec une fréquence d'échantillonnage d'au moins 5 MHz est souhaitable.

image

Après l'enregistrement, nous obtenons un fichier WAV avec une fréquence d'échantillonnage de 5 000 000 d'échantillons / sec, 30 secondes d'un tel enregistrement "pesant" environ 500 Mo. L'écouter avec un lecteur multimédia est bien sûr inutile - le fichier ne contient pas de son, mais un signal radio directement numérisé - c'est ainsi que fonctionne la radio définie par logiciel.

Nous allons ouvrir et traiter le fichier en utilisant Python. Ceux qui souhaitent expérimenter par eux-mêmes peuvent télécharger l'échantillon d'enregistrement à partir du lien .

Téléchargez le fichier et voyez ce qu'il contient.

from scipy.io import wavfile import matplotlib.pyplot as plt import numpy as np fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav") data = data.astype(float) I, Q = data[:, 0], data[:, 1] A = np.sqrt(I*I + Q*Q) plt.plot(A) plt.show() 

Résultat: on voit des «impulsions» évidentes sur fond de bruit.



Chaque «impulsion» est un signal dont la structure est clairement visible si vous augmentez la résolution sur le graphique.



Comme vous pouvez le voir, l'image est conforme à ce qui est décrit dans la description ci-dessus. Vous pouvez commencer à traiter les données.

Décodage


Vous devez d'abord obtenir un bitstream. Le signal lui-même est codé en utilisant le codage manchester:



De la différence de niveaux dans les grignotages, il est facile d'obtenir de vrais «0» et «1».

  bits_str = "" for p in range(8): pos = start_data + bit_len*p p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len] avg1, avg2 = np.average(p1), np.average(p2) if avg1 < avg2: bits_str += "0" elif avg1 > avg2: bits_str += "1" 

La structure du signal lui-même est la suivante:



Examinons les champs plus en détail.

DF (Downlink Format, 5 bits) - définit le type de message. Il en existe plusieurs types:


( source du tableau )

Nous sommes uniquement intéressés par le type DF17, car il contient les coordonnées de l'avion.

L'OACI (24 bits) est un code d'avion international unique. Vous pouvez vérifier l'avion par son code sur le site Web (malheureusement, l'auteur a cessé de mettre à jour la base de données, mais elle est toujours pertinente). Par exemple, pour le code 3c5ee2, nous avons les informations suivantes:



Edit: dans le commentaire de l'article, la description du code OACI est donnée plus en détail, je vous recommande de la lire aux personnes intéressées.

DONNÉES (56 ou 112 bits) - en fait les données que nous allons décoder. Les 5 premiers bits de données sont le champ Code de type contenant le sous-type des données stockées (à ne pas confondre avec DF). Il existe plusieurs de ces types:


( source du tableau )

Regardons quelques exemples de packages.

Identification de l'aéronef

Exemple binaire:

00100 011 000101 010111 000111 110111 110001 111000

Champs de données:

 +------+------+------+------+------+------+------+------+------+------+ | TC,5 | EC,3 | C1,6 | C2,6 | C3,6 | C4,6 | C5,6 | C6,6 | C7,6 | C8,6 | +------+------+------+------+------+------+------+------+------+------+ 

TC = 00100b = 4, chaque caractère C1-C8 contient des codes correspondant aux indices de la chaîne:
#ABCDEFGHIJKLMNOPQRSTUVWXYZ ##### _ ################## 0123456789 ######

Après avoir décodé la ligne, il est facile d'obtenir le code d'avion: EWG7184

 symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######" code_str = "" for p in range(8): c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2) code_str += symbols[c] print("Aircraft Identification:", code_str.replace('#', '')) 

Position aéroportée

Si le nom est simple, les coordonnées sont plus compliquées. Ils sont transmis sous forme de trames 2x, paires et impaires. Code de champ TC = 01011b = 11.



Exemple de paquets pairs et impairs:

 01011 000 000101110110 00 10111000111001000 10000110101111001 01011 000 000110010000 01 10010011110000110 10000011110001000 

Le calcul des coordonnées s'effectue selon une formule assez astucieuse:


( source )

Je ne suis pas un spécialiste des SIG, donc je ne sais pas d'où il vient. Qui est au courant, écrivez dans les commentaires.

L'altitude est considérée comme plus facile - en fonction d'un bit particulier, elle peut apparaître sous la forme d'un multiple de 25 ou 100 pieds.

Vitesse aéroportée

Paquet avec TC = 19. Ce qui est intéressant ici, c'est que la vitesse peut être soit précise, par rapport au sol (Ground Speed), soit à l'air, telle que mesurée par un capteur d'avion (Airspeed). De nombreux champs différents sont également transmis:


( source )

Conclusion


Comme vous pouvez le voir, la technologie ADS-B est devenue une symbiose intéressante quand une norme est utile non seulement pour les professionnels, mais aussi pour les utilisateurs ordinaires. Mais bien sûr, le rôle clé à cet égard a été joué par la réduction du coût de la technologie des récepteurs SDR numériques, qui permet à l'appareil de recevoir des signaux avec une fréquence supérieure à gigahertz sur l'appareil littéralement «pour un sou».

Dans la norme elle-même, bien sûr, beaucoup plus que tout. Les personnes intéressées peuvent voir le PDF sur la page de l' OACI ou visiter le site déjà mentionné ci-dessus.

Il est peu probable que bon nombre des éléments ci-dessus soient utiles, mais au moins l'idée générale de la façon dont cela fonctionne, j'espère, demeure.

Soit dit en passant, un décodeur prêt à l'emploi en Python existe déjà, il peut être étudié ici . Et les propriétaires de récepteurs SDR peuvent assembler et exécuter le décodeur ADS-B fini à partir de la page , plus à ce sujet dans la première partie .

Le code source de l'analyseur décrit dans l'article est donné sous la coupe. Il s'agit d'un exemple de test qui ne prétend pas être de la production, mais quelque chose fonctionne, et vous pouvez analyser le fichier enregistré ci-dessus.
Code source (Python)
 from __future__ import print_function from scipy.io import wavfile from scipy import signal import matplotlib.pyplot as plt import numpy as np import math import sys def parse_message(data, start, bit_len): max_len = bit_len*128 A = data[start:start + max_len] A = signal.resample(A, 10*max_len) bits = np.zeros(10*max_len) bit_len *= 10 start_data = bit_len*8 # Parse first 8 bits bits_str = "" for p in range(8): pos = start_data + bit_len*p p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len] avg1, avg2 = np.average(p1), np.average(p2) if avg1 < avg2: bits_str += "0" elif avg1 > avg2: bits_str += "1" df = int(bits_str[0:5], 2) # Aircraft address (db - https://junzis.com/adb/?q=3b1c5c ) bits_str = "" for p in range(8, 32): pos = start_data + bit_len * p p1, p2 = A[pos: pos + bit_len / 2], A[pos + bit_len / 2: pos + bit_len] avg1, avg2 = np.average(p1), np.average(p2) if avg1 < avg2: bits_str += "0" elif avg1 > avg2: bits_str += "1" # print "Aircraft address:", bits_str, hex(int(bits_str, 2)) address = hex(int(bits_str, 2)) # Filter specific aircraft (optional) # if address != "0x3c5ee2": # return if df == 16 or df == 17 or df == 18 or df == 19 or df == 20 or df == 21: # print "Pos:", start, "DF:", msg_type # Data (56bit) bits_str = "" for p in range(32, 88): pos = start_data + bit_len*p p1, p2 = A[pos: pos + bit_len/2], A[pos + bit_len/2: pos + bit_len] avg1, avg2 = np.average(p1), np.average(p2) if avg1 < avg2: bits_str += "0" # bits[pos + bit_len / 2] = 50 elif avg1 > avg2: bits_str += "1" # http://www.lll.lu/~edward/edward/adsb/DecodingADSBposition.html # print "Data:" # print bits_str[:8], bits_str[8:20], bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17] # Type Code: tc, ec = int(bits_str[:5], 2), int(bits_str[5:8], 2) # print("DF:", df, "TC:", tc) # 1 - 4 Aircraft identification # 5 - 8 Surface position # 9 - 18 Airborne position (w/ Baro Altitude) # 19 Airborne velocities if tc >= 1 and tc <= 4: # and (df == 17 or df == 18): print("Aircraft address:", address) print("Data:") print(bits_str[:8], bits_str[8:14], bits_str[14:20], bits_str[20:26], bits_str[26:32], bits_str[32:38], bits_str[38:44]) symbols = "#ABCDEFGHIJKLMNOPQRSTUVWXYZ#####_###############0123456789######" code_str = "" for p in range(8): c = int(bits_str[8 + 6*p:8 + 6*(p + 1)], 2) code_str += symbols[c] print("Aircraft Identification:", code_str.replace('#', '')) print() if tc == 11: print("Aircraft address:", address) print("Data: (11)") print(bits_str[:8], bits_str[8:20], bits_str[20:22], bits_str[22:22+17], bits_str[39:39+17]) # Bit 22 contains the F flag which indicates which CPR format is used (odd or even) # First frame has F flag = 0 so is even and the second frame has F flag = 1 so odd # f = bits_str[21:22] # print("F:", int(f, 2)) # Altitude alt1b = bits_str[8:20] if alt1b[-5] == '1': bits = alt1b[:-5] + alt1b[-4:] n = int(bits, 2) alt_ft = n*25 - 1000 print("Alt (ft)", alt_ft) # lat_dec = int(bits_str[22:22+17], 2) # lon_dec = int(bits_str[39:39+17], 2) # print("Lat/Lon:", lat_dec, lon_dec) # http://airmetar.main.jp/radio/ADS-B%20Decoding%20Guide.pdf print() if tc == 19: print("Aircraft address:", address) print("Data:") # print(bits_str) print(bits_str[:5], bits_str[5:8], bits_str[8:10], bits_str[10:13], bits_str[13] ,bits_str[14:24], bits_str[24], bits_str[25:35], bits_str[35:36], bits_str[36:65]) subtype = int(bits_str[5:8], 2) # https://mode-s.org/decode/adsb/airborne-velocity.html spd, hdg, rocd = -1, -1, -1 if subtype == 1 or subtype == 2: print("Velocity Subtype 1: Ground speed") v_ew_sign = int(bits_str[13], 2) v_ew = int(bits_str[14:24], 2) - 1 # east-west velocity v_ns_sign = int(bits_str[24], 2) v_ns = int(bits_str[25:35], 2) - 1 # north-south velocity v_we = -1*v_ew if v_ew_sign else v_ew v_sn = -1*v_ns if v_ns_sign else v_ns spd = math.sqrt(v_sn*v_sn + v_we*v_we) # unit in kts hdg = math.atan2(v_we, v_sn) hdg = math.degrees(hdg) # convert to degrees hdg = hdg if hdg >= 0 else hdg + 360 # no negative val if subtype == 3: print("Subtype Subtype 3: Airspeed") hdg = int(bits_str[14:24], 2)/1024.0*360.0 spd = int(bits_str[25:35], 2) vr_sign = int(bits_str[36], 2) vr = int(bits_str[36:45], 2) rocd = -1*vr if vr_sign else vr # rate of climb/descend print("Speed (kts):", spd, "Rate:", rocd, "Heading:", hdg) print() # print() def calc_coordinates(): def _cprN(lat, is_odd): nl = _cprNL(lat) - is_odd return nl if nl > 1 else 1 def _cprNL(lat): try: nz = 15 a = 1 - math.cos(math.pi / (2 * nz)) b = math.cos(math.pi / 180.0 * abs(lat)) ** 2 nl = 2 * math.pi / (math.acos(1 - a/b)) return int(math.floor(nl)) except: # happens when latitude is +/-90 degree return 1 def floor_(x): return int(math.floor(x)) lat1b, lon1b, alt1b = "10111000111010011", "10000110111111000", "000101111001" lat2b, lon2b, alt2b = "10010011101011100", "10000011000011011", "000101110111" lat1, lon1, alt1 = int(lat1b, 2), int(lon1b, 2), int(alt1b, 2) lat2, lon2, alt2 = int(lat2b, 2), int(lon2b, 2), int(alt2b, 2) # 131072 is 2^17, since CPR lat and lon are 17 bits each cprlat_even, cprlon_even = lat1/131072.0, lon1/131072.0 cprlat_odd, cprlon_odd = lat2/131072.0, lon2/131072.0 print(cprlat_even, cprlon_even) j = floor_(59*cprlat_even - 60*cprlat_odd) print(j) air_d_lat_even = 360.0 / 60 air_d_lat_odd = 360.0 / 59 # Lat lat_even = float(air_d_lat_even * (j % 60 + cprlat_even)) lat_odd = float(air_d_lat_odd * (j % 59 + cprlat_odd)) if lat_even >= 270: lat_even = lat_even - 360 if lat_odd >= 270: lat_odd = lat_odd - 360 # Lon ni = _cprN(lat_even, 0) m = floor_(cprlon_even * (_cprNL(lat_even)-1) - cprlon_odd * _cprNL(lat_even) + 0.5) lon = (360.0 / ni) * (m % ni + cprlon_even) print("Lat", lat_even, "Lon", lon) # Altitude # Q-bit (bit 48) indicates whether the altitude is encoded in multiples of 25 or 100 ft (0: 100 ft, 1: 25 ft) # The value can represent altitudes from -1000 to +50175 ft. if alt1b[-5] == '1': bits = alt1b[:-5] + alt1b[-4:] n = int(bits, 2) alt_ft = n*25 - 1000 print("Alt (ft)", alt_ft) fs, data = wavfile.read("adsb_20190311_191728Z_1090000kHz_RF.wav") T = 1/fs print("Sample rate %f MS/s" % (fs / 1e6)) print("Cnt samples %d" % len(data)) print("Duration: %fs" % (T * len(data))) data = data.astype(float) cnt = data.shape[0] # Processing only part on file (faster): # cnt = 10000000 # data = data[:cnt] print("Processing I/Q...") I, Q = data[:, 0], data[:, 1] A = np.sqrt(I*I + Q*Q) bits = np.zeros(cnt) # To see scope without any processing, uncomment # plt.plot(A) # plt.show() # sys.exit(0) print("Extracting signals...") pos = 0 avg = 200 msg_start = 0 # Find beginning of each signal while pos < cnt - 16*1024: # P1 - message start while pos < cnt - 16*1024: if A[pos] < avg and A[pos+1] > avg and pos - msg_start > 1000: msg_start = pos bits[pos] = 100 pos += 4 break pos += 1 start1, start2, start3, start4 = msg_start, 0, 0, 0 # P2 while pos < cnt - 16*1024: if A[pos] < avg and A[pos+1] > avg: start2 = pos bits[pos] = 90 pos += 1 break pos += 1 # P3 while pos < cnt - 16*1024: if A[pos] < avg and A[pos+1] > avg: start3 = pos bits[pos] = 80 pos += 1 break pos += 1 # P4 while pos < cnt - 16*1024: if A[pos] < avg and A[pos+1] > avg: start4 = pos bits[pos] = 70 pos += 1 break pos += 1 sig_diff = start4 - start1 if 20 < sig_diff < 25: bits[msg_start] = 500 bit_len = int((start4 - start1) / 4.5) # print(pos, start1, start4, ' - ', bit_len) # start = start1 + 8*bit_len parse_message(A, msg_start, bit_len) pos += 450 # For debugging: check signal start # plt.plot(A) # plt.plot(bits) # plt.show() 


J'espère que quelqu'un était intéressé, merci pour votre attention.

Source: https://habr.com/ru/post/fr443498/


All Articles