Multiprocessing und Abgleich von Daten aus verschiedenen Quellen

Hallo habr

Angesichts der Vielfalt der verteilten Systeme ist die Verfügbarkeit verifizierter Informationen im Zielspeicher ein wichtiges Kriterium für die Datenkonsistenz.

Zu diesem Zweck gibt es viele Ansätze und Methoden, und wir werden uns auf die Versöhnung konzentrieren, deren theoretische Aspekte hier in diesem Artikel erörtert wurden. Ich schlage vor, die praktische Implementierung dieses Systems in Betracht zu ziehen, das skalierbar und an eine große Datenmenge angepasst ist.

Wie Sie diesen Fall auf dem guten alten Python implementieren - lesen Sie es unter dem Schnitt! Lass uns gehen!


(Bildquelle)

Einleitung


Stellen wir uns vor, ein Finanzinstitut verfügt über mehrere verteilte Systeme und wir stehen vor der Aufgabe, Transaktionen in diesen Systemen zu überprüfen und die abgeglichenen Daten in den Zielspeicher hochzuladen.

Nehmen Sie als Datenquelle eine große Textdatei und eine Tabelle in eine PostgreSQL-Datenbank. Angenommen, die Daten in diesen Quellen haben dieselben Transaktionen, können jedoch Unterschiede aufweisen. Daher müssen sie überprüft und zur Analyse in die überprüften Daten im endgültigen Speicher geschrieben werden.

Darüber hinaus ist es erforderlich, mehrere Abstimmungen in derselben Datenbank parallel zu starten und das System mithilfe von Multiprocessing an ein großes Volumen anzupassen.

Das Multiprocessing- Modul eignet sich hervorragend zum Parallelisieren von Operationen in Python und umgeht gewissermaßen bestimmte GIL-Fehler. Wir werden die Funktionen dieser Bibliothek weiter unten nutzen.

Architektur des in Entwicklung befindlichen Systems



Verwendete Komponenten:

  • Zufallsdatengenerator - Ein Python-Skript, das eine CSV-Datei generiert und auf dieser Basis eine Tabelle in einer Datenbank füllt.
  • Datenquellen - CSV-Datei und Tabelle in der PostgreSQL-Datenbank;
  • Adapter - In diesem Fall verwenden wir zwei Adapter, die Daten aus ihren Quellen (CSV oder Datenbank) extrahieren und Informationen in die Zwischendatenbank eingeben.
  • Datenbanken - in der Größe von drei Teilen: Rohdaten, eine Zwischendatenbank, in der die von den Adaptern erfassten Informationen gespeichert werden, und eine "saubere" Datenbank, die abgeglichene Transaktionen aus beiden Quellen enthält.

Erstausbildung


Als Datenspeichertool verwenden wir die PostgreSQL-Datenbank im Docker-Container und interagieren mit unserer Datenbank über pgAdmin, das im Container ausgeführt wird :

docker run --name pg -d -e "POSTGRES_USER=my_user" -e "POSTGRES_PASSWORD=my_password" postgres 

PgAdmin ausführen:

 docker run -p 80:80 -e "PGADMIN_DEFAULT_EMAIL=user@domain.com" -e "PGADMIN_DEFAULT_PASSWORD=12345" -d dpage/pgadmin4 

Vergessen Sie nach dem Start nicht, in der Konfigurationsdatei (conf / db.ini) die Verbindungszeichenfolge zur Datenbank anzugeben (für ein Trainingsbeispiel können Sie das!):

 [POSTGRESQL] db_url=postgresql://my_user:my_password@172.17.0.2:5432/my_user 

Grundsätzlich ist die Verwendung eines Containers optional und Sie können Ihren Datenbankserver verwenden.

Eingabeerzeugung


Das Python-Skript generate_test_data ist für die Generierung von Testdaten verantwortlich, die die gewünschte Anzahl von zu generierenden Einträgen benötigen. Die Operationssequenz kann leicht durch die Hauptfunktion der GenerateTestData- Klasse verfolgt werden:

  @m.timing def run(self, num_rows): """ Run the process """ m.info('START!') self.create_db_schema() self.create_folder('data') self.create_csv_file(num_rows) self.bulk_copy_to_db() self.random_delete_rows() self.random_update_rows() m.info('END!') 

Die Funktion führt also die folgenden Schritte aus:

  • Erstellen von Schemata in der Datenbank (wir erstellen alle grundlegenden Schemata und Tabellen);
  • Erstellen eines Ordners zum Speichern einer Testdatei;
  • Generieren einer Testdatei mit einer bestimmten Anzahl von Zeilen;
  • Bulk-Insert-Daten in die Zieltabelle transaction_db_raw.transaction_log;
  • Versehentliches Löschen mehrerer Zeilen in dieser Tabelle;
  • Zufällige Aktualisierung mehrerer Zeilen in dieser Tabelle.

Das Löschen und Ändern ist erforderlich, damit die verglichenen Objekte zumindest eine gewisse Diskrepanz aufweisen. Es ist wichtig, diese Diskrepanzen suchen zu können!

 @m.timing @m.wrapper(m.entering, m.exiting) def random_delete_rows(self): """ Random deleting some rows from the table """ sql_command = sql.SQL(""" delete from {0}.{1} where ctid = any(array( select ctid from {0}.{1} tablesample bernoulli (1) ))""").format(sql.Identifier(self.schema_raw), sql.Identifier(self.raw_table_name)) try: rows = self.database.execute(sql_command) m.info('Has been deleted [%s rows] from table %s' % (rows, self.raw_table_name)) except psycopg2.Error as err: m.error('Oops! Delete random rows has been FAILED. Reason: %s' % err.pgerror) @m.timing @m.wrapper(m.entering, m.exiting) def random_update_rows(self): """ Random update some rows from the table """ sql_command = sql.SQL(""" update {0}.{1} set transaction_amount = round(random()::numeric, 2) where ctid = any(array( select ctid from {0}.{1} tablesample bernoulli (1) ))""").format(sql.Identifier(self.schema_raw), sql.Identifier(self.raw_table_name)) try: rows = self.database.execute(sql_command) m.info('Has been updated [%s rows] from table %s' % (rows, self.raw_table_name)) except psycopg2.Error as err: m.error('Oops! Delete random rows has been FAILED. Reason: %s' % err.pgerror) 

Die Generierung eines Testdatensatzes und die anschließende Aufzeichnung in eine Textdatei im CSV-Format sieht wie folgt aus:

  • Eine zufällige Transaktions-UID wird erstellt.
  • Es wird eine zufällige UID-Kontonummer erstellt (standardmäßig werden zehn eindeutige Konten verwendet, dieser Wert kann jedoch mithilfe der Konfigurationsdatei durch Ändern des Parameters "random_accounts" geändert werden).
  • Transaktionsdatum - ein zufälliges Datum aus dem in der Konfigurationsdatei angegebenen Datum (initial_date);
  • Art der Transaktion (Transaktion / Provision);
  • Transaktionsbetrag;
  • Die Hauptarbeit bei der Datengenerierung wird von der generate_test_data_by_chunk- Methode der TestDataCreator- Klasse ausgeführt:

 @m.timing def generate_test_data_by_chunk(self, chunk_start, chunk_end): """ Generating and saving to the file """ num_rows_mp = chunk_end - chunk_start new_rows = [] for _ in range(num_rows_mp): transaction_uid = uuid.uuid4() account_uid = choice(self.list_acc) transaction_date = (self.get_random_date(self.date_in, 0) .__next__() .strftime('%Y-%m-%d %H:%M:%S')) type_deal = choice(self.list_type_deal) transaction_amount = randint(-1000, 1000) new_rows.append([transaction_uid, account_uid, transaction_date, type_deal, transaction_amount]) self.write_in_file(new_rows, chunk_start, chunk_end) 

Ein Merkmal dieser Funktion ist der Start in mehreren parallelisierten asynchronen Prozessen, von denen jeder seinen eigenen Teil von 50.000 Datensätzen generiert. Mit diesem "Chip" können Sie schnell genug eine Datei in mehreren Millionen Zeilen erstellen

 def run_csv_writing(self): """ Writing the test data into csv file """ pool = mp.Pool(mp.cpu_count()) jobs = [] for chunk_start, chunk_end in self.divide_into_chunks(0, self.num_rows): jobs.append(pool.apply_async(self.generate_test_data_by_chunk, (chunk_start, chunk_end))) # wait for all jobs to finish for job in jobs: job.get() # clean up pool.close() pool.join() 

Nach Abschluss der Textdatei wird der Befehl bulk_insert verarbeitet und alle Daten aus dieser Datei werden in die Tabelle transaction_db_raw.transaction_log geschrieben.

Außerdem enthalten die beiden Quellen genau dieselben Daten, und der Abgleich findet nichts Interessantes. Daher löschen und ändern wir mehrere zufällige Zeilen in der Datenbank.

Führen Sie das Skript aus und generieren Sie eine Test-CSV-Datei mit Transaktionen in 10-KB-Zeilen:

 ./generate_test_data.py 10000 


Der Screenshot zeigt, dass eine Datei mit 10 KByte Zeilen empfangen wurde, 10 KByte in die Datenbank geladen wurden, dann jedoch 112 Zeilen aus der Datenbank gelöscht und weitere 108 geändert wurden Ergebnis: Die Datei und die Tabelle in der Datenbank unterscheiden sich um 220 Einträge.

„Nun, wo ist Multiprocessing?“, Fragen Sie.
Und seine Arbeit kann gesehen werden, wenn Sie eine größere Datei erzeugen, nicht durch 10K-Datensätze, sondern zum Beispiel durch 1M. Versuchen wir es?

 ./generate_test_data.py 1000000 


Nach dem Laden der Daten, Löschen und Ändern von zufälligen Datensätzen sehen wir die Unterschiede der Textdatei aus der Tabelle: 19.939 Zeilen (von denen 10.022 zufällig gelöscht und 9.917 geändert wurden).

Das Bild zeigt, dass die Generierung von Datensätzen asynchron und inkonsistent war. Dies bedeutet, dass der nächste Prozess beginnen kann, ohne die Startreihenfolge zu berücksichtigen, sobald der vorherige abgeschlossen ist. Es kann nicht garantiert werden, dass das Ergebnis in der gleichen Reihenfolge wie die Eingabe vorliegt.

Ist es definitiv schneller?
Eine Million Leitungen, die sich nicht auf der schnellsten virtuellen Maschine befinden, wurden in 15,5 Sekunden „erfunden“ - und das ist eine Option, die es wert ist. Nachdem ich die gleiche Generation nacheinander gestartet hatte, ohne Multiprocessing zu verwenden, erhielt ich das Ergebnis: Die Dateierzeugung war mehr als dreimal langsamer (über 52 Sekunden statt 15,5):



Adapter für CSV


Dieser Adapter durchsucht die Zeile, wobei nur die erste Spalte, die Transaktionskennung, unverändert bleibt, und speichert die empfangenen Daten in der Datei data / transaction_hashed.csv . Der letzte Schritt seiner Arbeit besteht darin, diese Datei mit dem Befehl COPY in die temporäre Tabelle des Schemas reconciliation_db zu laden .

Das optimale Lesen der Dateien erfolgt durch mehrere parallele Prozesse. Wir lesen Zeile für Zeile in Stücken von jeweils 5 Megabyte. Die Zahl "5 Megabyte" wurde durch die empirische Methode erhalten. Mit dieser Größe von einem Textstück konnten wir die kürzeste Zeit zum Lesen großer Dateien auf unserer virtuellen Maschine erhalten. Sie können mit diesem Parameter in Ihrer Umgebung experimentieren und sehen, wie sich die Betriebszeit ändert:

 @m.timing def process_wrapper(self, chunk_start, chunk_size): """ Read a particular chunk """ with open(self.file_name_raw, newline='\n') as file: file.seek(chunk_start) lines = file.read(chunk_size).splitlines() for line in lines: self.process(line) def chunkify(self, size=1024*1024*5): """ Return a new chunk """ with open(self.file_name_raw, 'rb') as file: chunk_end = file.tell() while True: chunk_start = chunk_end file.seek(size, 1) file.readline() chunk_end = file.tell() if chunk_end > self.file_end: chunk_end = self.file_end yield chunk_start, chunk_end - chunk_start break else: yield chunk_start, chunk_end - chunk_start @m.timing def run_reading(self): """ The main method for the reading """ # init objects pool = mp.Pool(mp.cpu_count()) jobs = [] m.info('Run csv reading...') # create jobs for chunk_start, chunk_size in self.chunkify(): jobs.append(pool.apply_async(self.process_wrapper, (chunk_start, chunk_size))) # wait for all jobs to finish for job in jobs: job.get() # clean up pool.close() pool.join() m.info('CSV file reading has been completed') 

Beispiel für das Lesen einer zuvor erstellten Datei in 1M-Datensätzen:


Der Screenshot zeigt die Erstellung einer temporären Tabelle mit einem eindeutigen Namen für den aktuellen Abstimmungslauf. Als nächstes erfolgt das asynchrone Lesen der Datei in Teilen und das Aufnehmen des Hash jeder Zeile. Das Einfügen von Daten vom Adapter in die Zieltabelle vervollständigt die Arbeit mit diesem Adapter.
Durch die Verwendung einer temporären Tabelle mit einem eindeutigen Namen für jeden Abstimmungsprozess können Sie den Abstimmungsprozess zusätzlich in einer Datenbank parallelisieren.

Adapter für PostgreSQL


Der Adapter zum Verarbeiten der in der Tabelle gespeicherten Daten funktioniert ungefähr so ​​wie der Adapter für die Datei:

  • Einlesen von Teilen der Tabelle (wenn sie groß ist, über 100 KB Einträge) und Aufnehmen eines Hashs für alle Spalten mit Ausnahme der Transaktionskennung;
  • Dann werden die verarbeiteten Daten in die Tabelle reconciliation_db eingefügt . Speicher _ $ (int (time.time ()) .

Ein interessantes Merkmal dieses Adapters besteht darin, dass er einen Pool von Verbindungen zur Datenbank verwendet, der anhand des Index nach den erforderlichen Daten in der Tabelle sucht und diese verarbeitet.

Basierend auf der Größe der Tabelle wird die Anzahl der für die Verarbeitung erforderlichen Prozesse berechnet und innerhalb jedes Prozesses in 10 Aufgaben unterteilt.

 def read_data(self): """ Read the data from the postgres and shared those records with each processor to perform their operation using threads """ threads_array = self.get_threads(0, self.max_id_num_row, self.pid_max) for pid in range(1, len(threads_array) + 1): m.info('Process %s' % pid) # Getting connection from the connection pool select_conn = self._select_conn_pool.getconn() select_conn.autocommit = 1 # Creating 10 process to perform the operation process = Process(target=self.process_data, args=(self.data_queque, pid, threads_array[pid-1][0], threads_array[pid-1][1], select_conn)) process.daemon = True process.start() process.join() select_conn.close() 


Suchen Sie nach Unstimmigkeiten


Wir fahren mit der Überprüfung der von zwei Adaptern empfangenen Daten fort.

Der Abgleich (oder das Empfangen eines Diskrepanzberichts) erfolgt auf der Serverseite der Datenbank unter Verwendung der gesamten Leistungsfähigkeit der SQL-Sprache.

Die SQL-Abfrage ist recht unkompliziert - es handelt sich lediglich um eine Tabellenverknüpfung mit Daten von den Adaptern zu sich selbst nach Transaktions-ID:

 sql_command = sql.SQL(""" select s1.adapter_name, count(s1.transaction_uid) as tran_count from {0}.{1} s1 full join {0}.{1} s2 on s2.transaction_uid = s1.transaction_uid and s2.adapter_name != s1.adapter_name and s2.hash = s1.hash where s2.transaction_uid is null group by s1.adapter_name;""").format(sql.Identifier(self.schema_target), sql.Identifier(self.storage_table)) 

Die Ausgabe ist ein Bericht:


Überprüfen Sie, ob im obigen Bild alles korrekt ist. Wir erinnern uns, dass 9917 aus der Tabelle in der Datenbank gelöscht und 10.022 Zeilen geändert wurden. Insgesamt 19939 Zeilen, wie aus dem Bericht hervorgeht.

Übersichtstabelle


Es bleibt nur, "saubere" Transaktionen in die Speichertabelle einzufügen, die in jeder Hinsicht (nach Hash) in verschiedenen Adaptern übereinstimmen. Dieser Vorgang wird von der folgenden SQL-Abfrage ausgeführt:

 sql_command = sql.SQL(""" with reconcil_data as ( select s1.transaction_uid from {0}.{1} s1 join {0}.{1} s2 on s2.transaction_uid = s1.transaction_uid and s2.adapter_name != s1.adapter_name where s2.hash = s1.hash and s1.adapter_name = 'postresql_adapter' ) insert into {2}.transaction_log select t.transaction_uid, t.account_uid, t.transaction_date, t.type_deal, t.transaction_amount from {3}.transaction_log t join reconcil_data r on t.transaction_uid = r.transaction_uid where not exists ( select 1 from {2}.transaction_log tl where tl.transaction_uid = t.transaction_uid ) """).format(sql.Identifier(self.schema_target), sql.Identifier(self.storage_table), sql.Identifier(self.schema_db_clean), sql.Identifier(self.schema_raw)) 

Die temporäre Tabelle, die wir als Zwischenspeicher für Daten von den Adaptern verwendet haben, kann gelöscht werden.


Fazit


Im Laufe der Arbeit wurde ein System zum Abgleich von Daten aus verschiedenen Quellen entwickelt: eine Textdatei und eine Tabelle in der Datenbank. Benutzte ein Minimum an zusätzlichen Werkzeugen.

Vielleicht kann ein erfahrener Leser feststellen, dass die Verwendung von Frameworks wie Apache Spark in Verbindung mit der Konvertierung von Rohdaten in ein Parkettformat diesen Prozess erheblich beschleunigen kann, insbesondere bei großen Volumina. Das Hauptziel dieser Arbeit ist es jedoch, ein System in Bare-Python zu schreiben und die Multiprozessor-Datenverarbeitung zu untersuchen. Womit wir uns meiner Meinung nach befasst haben.

Der Quellcode des gesamten Projekts befindet sich in meinem Repository auf GitHub . Ich empfehle Ihnen, sich damit vertraut zu machen.

Gerne beantworte ich alle Fragen und informiere mich über Ihre Kommentare.

Ich wünsche Ihnen viel Erfolg!

Source: https://habr.com/ru/post/de480076/


All Articles