Multitraitement et rapprochement des données de diverses sources

Bonjour, Habr!

Étant donné la variété des systèmes distribués, la disponibilité des informations vérifiées dans le stockage cible est un critère important pour la cohérence des données.

Il existe de nombreuses approches et méthodes à cet effet, et nous nous concentrerons sur la réconciliation, dont les aspects théoriques ont été abordés ici dans cet article. Je propose d'envisager la mise en œuvre pratique de ce système, évolutif et adapté à une grande quantité de données.

Comment implémenter ce cas sur le bon vieux Python - lisez-le sous la coupe! C'est parti!


(Source de l'image)

Présentation


Imaginons qu'une institution financière dispose de plusieurs systèmes distribués et nous sommes confrontés à la tâche de vérifier les transactions dans ces systèmes et de télécharger les données rapprochées vers le stockage cible.

En tant que source de données, prenez un grand fichier texte et une table dans une base de données PostgreSQL. Supposons que les données de ces sources aient les mêmes transactions, mais qu'elles peuvent avoir des différences, et qu'elles doivent donc être vérifiées et écrites dans les données vérifiées dans le stockage final pour analyse.

De plus, il est nécessaire de prévoir le lancement parallèle de plusieurs rapprochements sur la même base de données et d'adapter le système à un volume important en utilisant le multiprocessing.

Le module de multitraitement est idéal pour paralléliser les opérations en Python et, dans un sens, contourne certains défauts GIL. Nous utiliserons les capacités de cette bibliothèque ci-dessous.

Architecture du système en cours de développement



Composants utilisés:

  • Générateur de données aléatoires - un script Python qui génère un fichier CSV et remplit sur sa base une table dans une base de données;
  • Sources de données - fichier et table CSV dans la base de données PostgreSQL;
  • Adaptateurs - dans ce cas, nous utilisons deux adaptateurs qui extrairont les données de leurs sources (CSV ou base de données) et entreront des informations dans la base de données intermédiaire;
  • Bases de données - au nombre de trois: des données brutes, une base de données intermédiaire qui stocke les informations capturées par les adaptateurs et une base de données "propre" contenant des transactions rapprochées des deux sources.

Formation initiale


En tant qu'outil de stockage de données, nous utiliserons la base de données PostgreSQL dans le conteneur Docker et interagirons avec notre base de données via pgAdmin exécuté dans le conteneur :

docker run --name pg -d -e "POSTGRES_USER=my_user" -e "POSTGRES_PASSWORD=my_password" postgres 

Exécution de pgAdmin:

 docker run -p 80:80 -e "PGADMIN_DEFAULT_EMAIL=user@domain.com" -e "PGADMIN_DEFAULT_PASSWORD=12345" -d dpage/pgadmin4 

Une fois que tout a commencé, n'oubliez pas de spécifier dans le fichier de configuration (conf / db.ini) la chaîne de connexion à la base de données (pour un exemple de formation, vous pouvez!):

 [POSTGRESQL] db_url=postgresql://my_user:my_password@172.17.0.2:5432/my_user 

En principe, l'utilisation d'un conteneur est facultative et vous pouvez utiliser votre serveur de base de données.

Génération d'entrée


Le script Python generate_test_data est responsable de la génération des données de test, qui prend le nombre d'entrées à générer. La séquence des opérations peut être facilement tracée par la fonction principale de la classe GenerateTestData :

  @m.timing def run(self, num_rows): """ Run the process """ m.info('START!') self.create_db_schema() self.create_folder('data') self.create_csv_file(num_rows) self.bulk_copy_to_db() self.random_delete_rows() self.random_update_rows() m.info('END!') 

Ainsi, la fonction effectue les étapes suivantes:

  • Création de schémas dans la base de données (nous créons tous les schémas et tables principaux);
  • Création d'un dossier pour stocker un fichier de test;
  • Générer un fichier de test avec un nombre donné de lignes;
  • Insérer des données en bloc dans la table cible transaction_db_raw.transaction_log;
  • Suppression accidentelle de plusieurs lignes dans ce tableau;
  • Mise à jour aléatoire de plusieurs lignes de ce tableau.

La suppression et la modification sont nécessaires pour que les objets comparés présentent au moins une certaine différence. Il est important de pouvoir rechercher ces écarts!

 @m.timing @m.wrapper(m.entering, m.exiting) def random_delete_rows(self): """ Random deleting some rows from the table """ sql_command = sql.SQL(""" delete from {0}.{1} where ctid = any(array( select ctid from {0}.{1} tablesample bernoulli (1) ))""").format(sql.Identifier(self.schema_raw), sql.Identifier(self.raw_table_name)) try: rows = self.database.execute(sql_command) m.info('Has been deleted [%s rows] from table %s' % (rows, self.raw_table_name)) except psycopg2.Error as err: m.error('Oops! Delete random rows has been FAILED. Reason: %s' % err.pgerror) @m.timing @m.wrapper(m.entering, m.exiting) def random_update_rows(self): """ Random update some rows from the table """ sql_command = sql.SQL(""" update {0}.{1} set transaction_amount = round(random()::numeric, 2) where ctid = any(array( select ctid from {0}.{1} tablesample bernoulli (1) ))""").format(sql.Identifier(self.schema_raw), sql.Identifier(self.raw_table_name)) try: rows = self.database.execute(sql_command) m.info('Has been updated [%s rows] from table %s' % (rows, self.raw_table_name)) except psycopg2.Error as err: m.error('Oops! Delete random rows has been FAILED. Reason: %s' % err.pgerror) 

La génération d'un ensemble de données de test et l'enregistrement ultérieur dans un fichier texte au format CSV est le suivant:

  • Un UID de transaction aléatoire est créé;
  • Un numéro de compte UID aléatoire est créé (par défaut, nous prenons dix comptes uniques, mais cette valeur peut être modifiée à l'aide du fichier de configuration en modifiant le paramètre "random_accounts");
  • Date de transaction - une date aléatoire à partir de la date spécifiée dans le fichier de configuration (initial_date);
  • Type de transaction (transaction / commission);
  • Montant de la transaction;
  • Le travail principal dans la génération de données est effectué par la méthode generate_test_data_by_chunk de la classe TestDataCreator :

 @m.timing def generate_test_data_by_chunk(self, chunk_start, chunk_end): """ Generating and saving to the file """ num_rows_mp = chunk_end - chunk_start new_rows = [] for _ in range(num_rows_mp): transaction_uid = uuid.uuid4() account_uid = choice(self.list_acc) transaction_date = (self.get_random_date(self.date_in, 0) .__next__() .strftime('%Y-%m-%d %H:%M:%S')) type_deal = choice(self.list_type_deal) transaction_amount = randint(-1000, 1000) new_rows.append([transaction_uid, account_uid, transaction_date, type_deal, transaction_amount]) self.write_in_file(new_rows, chunk_start, chunk_end) 

Une caractéristique de cette fonction est le lancement de plusieurs processus asynchrones parallélisés, chacun générant sa propre portion de 50 000 enregistrements. Cette "puce" vous permettra de créer un fichier sur plusieurs millions de lignes assez rapidement

 def run_csv_writing(self): """ Writing the test data into csv file """ pool = mp.Pool(mp.cpu_count()) jobs = [] for chunk_start, chunk_end in self.divide_into_chunks(0, self.num_rows): jobs.append(pool.apply_async(self.generate_test_data_by_chunk, (chunk_start, chunk_end))) # wait for all jobs to finish for job in jobs: job.get() # clean up pool.close() pool.join() 

Une fois le fichier texte terminé, la commande bulk_insert est traitée et toutes les données de ce fichier tombent dans la table transaction_db_raw.transaction_log.

De plus, les deux sources contiendront exactement les mêmes données et la réconciliation ne trouvera rien d'intéressant, nous supprimons et modifions donc plusieurs lignes aléatoires dans la base de données.

Exécutez le script et générez un fichier CSV de test avec des transactions sur 10 000 lignes:

 ./generate_test_data.py 10000 


La capture d'écran montre qu'un fichier de 10 000 lignes a été reçu, 10 000 ont été chargés dans la base de données, mais 112 lignes ont été supprimées de la base de données et 108 autres ont été modifiées. Résultat: le fichier et la table de la base de données diffèrent de 220 entrées.

"Eh bien, où est le multitraitement?", Demandez-vous.
Et son travail peut être vu lorsque vous générez un fichier plus volumineux, non pas par 10 000 enregistrements, mais, par exemple, par 1 Mo. Allons-nous essayer?

 ./generate_test_data.py 1000000 


Après avoir chargé les données, supprimé et modifié les enregistrements aléatoires, nous voyons les différences du fichier texte du tableau: 19 939 lignes (dont 10 022 ont été supprimées au hasard et 9 917 ont été modifiées).

L'image montre que la génération des enregistrements était asynchrone, incohérente. Cela signifie que le processus suivant peut commencer sans prendre en compte l'ordre de démarrage dès que le précédent est terminé. Il n'y a aucune garantie que le résultat sera dans le même ordre que l'entrée.

Est-ce vraiment plus rapide?
Un million de lignes ne se trouvant pas sur la machine virtuelle la plus rapide a été «inventée» en 15,5 secondes - et c'est une option valable. Après avoir démarré la même génération séquentiellement, sans utiliser le multi-traitement, j'ai obtenu le résultat: la génération de fichiers était plus de trois fois plus lente (plus de 52 secondes au lieu de 15,5):



Adaptateur pour CSV


Cet adaptateur hache la ligne, ne laissant que la première colonne, l'ID de transaction, inchangé et enregistre les données reçues dans le fichier data / transaction_hashed.csv . La dernière étape de son travail consiste à charger ce fichier à l'aide de la commande COPY dans la table temporaire du schéma reconciliation_db.

La lecture optimale des fichiers est effectuée par plusieurs processus parallèles. Nous lisons ligne par ligne, en morceaux de 5 mégaoctets chacun. Le chiffre "5 mégaoctets" a été obtenu par la méthode empirique. C'est avec cette taille d'un morceau de texte que nous avons pu obtenir le plus petit temps pour lire de gros fichiers sur notre machine virtuelle. Vous pouvez expérimenter sur votre environnement avec ce paramètre et voir comment la durée de fonctionnement va changer:

 @m.timing def process_wrapper(self, chunk_start, chunk_size): """ Read a particular chunk """ with open(self.file_name_raw, newline='\n') as file: file.seek(chunk_start) lines = file.read(chunk_size).splitlines() for line in lines: self.process(line) def chunkify(self, size=1024*1024*5): """ Return a new chunk """ with open(self.file_name_raw, 'rb') as file: chunk_end = file.tell() while True: chunk_start = chunk_end file.seek(size, 1) file.readline() chunk_end = file.tell() if chunk_end > self.file_end: chunk_end = self.file_end yield chunk_start, chunk_end - chunk_start break else: yield chunk_start, chunk_end - chunk_start @m.timing def run_reading(self): """ The main method for the reading """ # init objects pool = mp.Pool(mp.cpu_count()) jobs = [] m.info('Run csv reading...') # create jobs for chunk_start, chunk_size in self.chunkify(): jobs.append(pool.apply_async(self.process_wrapper, (chunk_start, chunk_size))) # wait for all jobs to finish for job in jobs: job.get() # clean up pool.close() pool.join() m.info('CSV file reading has been completed') 

Exemple de lecture d'un fichier créé précédemment sur des enregistrements 1M:


La capture d'écran montre la création d'une table temporaire avec un nom unique pour le cycle de rapprochement en cours. Vient ensuite la lecture asynchrone du fichier en plusieurs parties et le hachage de chaque ligne. L'insertion de données de l'adaptateur dans la table cible termine le travail avec cet adaptateur.
L'utilisation d'une table temporaire avec un nom unique pour chaque processus de réconciliation vous permet en outre de paralléliser le processus de réconciliation dans une base de données.

Adaptateur pour PostgreSQL


L'adaptateur pour le traitement des données stockées dans la table fonctionne approximativement la même logique que l'adaptateur pour le fichier:

  • lire certaines parties du tableau (s'il est volumineux, plus de 100 000 entrées) et prendre un hachage pour toutes les colonnes à l'exception de l'identifiant de transaction;
  • puis il y a une insertion des données traitées dans la table reconciliation_db. stockage _ $ (int (time.time ()) .

Une caractéristique intéressante de cet adaptateur est qu'il utilise un pool de connexions à la base de données, qui recherchera par index les données nécessaires dans la table et les traitera.

En fonction de la taille du tableau, le nombre de processus nécessaires au traitement est calculé et au sein de chaque processus, il y a une division en 10 tâches.

 def read_data(self): """ Read the data from the postgres and shared those records with each processor to perform their operation using threads """ threads_array = self.get_threads(0, self.max_id_num_row, self.pid_max) for pid in range(1, len(threads_array) + 1): m.info('Process %s' % pid) # Getting connection from the connection pool select_conn = self._select_conn_pool.getconn() select_conn.autocommit = 1 # Creating 10 process to perform the operation process = Process(target=self.process_data, args=(self.data_queque, pid, threads_array[pid-1][0], threads_array[pid-1][1], select_conn)) process.daemon = True process.start() process.join() select_conn.close() 


Rechercher les écarts


Nous procédons à la vérification des données reçues de deux adaptateurs.

La réconciliation (ou la réception d'un rapport d'anomalie) se produit du côté serveur de la base de données, en utilisant toute la puissance du langage SQL.

La requête SQL est assez simple - c'est juste une jointure de table avec les données des adaptateurs à elle-même par ID de transaction:

 sql_command = sql.SQL(""" select s1.adapter_name, count(s1.transaction_uid) as tran_count from {0}.{1} s1 full join {0}.{1} s2 on s2.transaction_uid = s1.transaction_uid and s2.adapter_name != s1.adapter_name and s2.hash = s1.hash where s2.transaction_uid is null group by s1.adapter_name;""").format(sql.Identifier(self.schema_target), sql.Identifier(self.storage_table)) 

La sortie est un rapport:


Vérifiez si tout est correct dans l'image ci-dessus. Nous nous souvenons que 9917 ont été supprimés de la table dans la base de données et 10 022 lignes ont été modifiées. Total 19939 lignes, ce qui est évident dans le rapport.

Tableau récapitulatif


Il ne reste plus qu'à insérer dans la table de stockage des transactions «propres» qui correspondent à tous égards (par hachage) à différents adaptateurs. Ce processus est effectué par la requête SQL suivante:

 sql_command = sql.SQL(""" with reconcil_data as ( select s1.transaction_uid from {0}.{1} s1 join {0}.{1} s2 on s2.transaction_uid = s1.transaction_uid and s2.adapter_name != s1.adapter_name where s2.hash = s1.hash and s1.adapter_name = 'postresql_adapter' ) insert into {2}.transaction_log select t.transaction_uid, t.account_uid, t.transaction_date, t.type_deal, t.transaction_amount from {3}.transaction_log t join reconcil_data r on t.transaction_uid = r.transaction_uid where not exists ( select 1 from {2}.transaction_log tl where tl.transaction_uid = t.transaction_uid ) """).format(sql.Identifier(self.schema_target), sql.Identifier(self.storage_table), sql.Identifier(self.schema_db_clean), sql.Identifier(self.schema_raw)) 

La table temporaire que nous avons utilisée comme stockage intermédiaire des données des adaptateurs peut être supprimée.


Conclusion


Au cours des travaux effectués, un système de réconciliation des données provenant de différentes sources a été développé: un fichier texte et un tableau dans la base de données. Utilisé un minimum d'outils supplémentaires.

Un lecteur averti peut peut-être remarquer que l'utilisation de frameworks tels qu'Apache Spark, associée à la conversion de données brutes au format parquet, peut accélérer considérablement ce processus, en particulier pour les gros volumes. Mais l'objectif principal de ce travail est d'écrire un système en Python nu et d'étudier le traitement de données multiprocessing. Avec ce que nous avons, à mon avis, traité.

Le code source de l'ensemble du projet se trouve dans mon référentiel sur GitHub , je vous suggère de vous familiariser avec lui.

Je serai heureux de répondre à toutes les questions et de prendre connaissance de vos commentaires.

Je vous souhaite du succès!

Source: https://habr.com/ru/post/fr480076/


All Articles