Proses ganda dan rekonsiliasi data dari berbagai sumber

Halo, Habr!

Mengingat beragamnya sistem terdistribusi, ketersediaan informasi terverifikasi dalam penyimpanan target adalah kriteria penting untuk konsistensi data.

Ada banyak pendekatan dan metode untuk efek ini, dan kami akan fokus pada rekonsiliasi, aspek teoretis yang dibahas di sini dalam artikel ini. Saya mengusulkan untuk mempertimbangkan implementasi praktis dari sistem ini, dapat diskalakan dan disesuaikan dengan sejumlah besar data.

Bagaimana menerapkan kasus ini pada Python lama yang baik - baca di bawah cut! Ayo pergi!


(Sumber gambar)

Pendahuluan


Mari kita bayangkan bahwa lembaga keuangan memiliki beberapa sistem terdistribusi dan kita dihadapkan dengan tugas memverifikasi transaksi dalam sistem ini dan mengunggah data yang direkonsiliasi ke penyimpanan target.

Sebagai sumber data, ambil file teks besar dan tabel di database PostgreSQL. Misalkan data dalam sumber-sumber ini memiliki transaksi yang sama, tetapi mereka dapat memiliki perbedaan, dan oleh karena itu mereka perlu diverifikasi dan ditulis ke data yang diverifikasi dalam penyimpanan akhir untuk analisis.

Selain itu, perlu untuk menyediakan peluncuran paralel dari beberapa rekonsiliasi pada database yang sama dan mengadaptasi sistem ke volume besar menggunakan multiprocessing.

Modul multiprosesing sangat bagus untuk memparalelkan operasi dengan Python dan, dalam arti tertentu, menghindari kekurangan GIL tertentu. Kami akan menggunakan kemampuan perpustakaan ini di bawah ini.

Arsitektur sistem dalam pengembangan



Komponen yang Digunakan:

  • Pembuat data acak - skrip Python yang menghasilkan file CSV dan atas dasar itu mengisi tabel dalam database;
  • Sumber data - file dan tabel CSV dalam database PostgreSQL;
  • Adapters - dalam hal ini, kami menggunakan dua adapter yang akan mengekstraksi data dari sumbernya (CSV atau DB) dan memasukkan informasi ke dalam database perantara;
  • Basis data - dalam jumlah tiga bagian: data mentah, database perantara yang menyimpan informasi yang ditangkap oleh adaptor, dan basis data "bersih" yang berisi transaksi yang direkonsiliasi dari kedua sumber.

Pelatihan awal


Sebagai alat penyimpanan data, kami akan menggunakan basis data PostgreSQL di wadah Docker dan berinteraksi dengan basis data kami melalui pgAdmin yang berjalan di wadah :

docker run --name pg -d -e "POSTGRES_USER=my_user" -e "POSTGRES_PASSWORD=my_password" postgres 

Menjalankan pgAdmin:

 docker run -p 80:80 -e "PGADMIN_DEFAULT_EMAIL=user@domain.com" -e "PGADMIN_DEFAULT_PASSWORD=12345" -d dpage/pgadmin4 

Setelah semuanya dimulai, jangan lupa untuk menentukan di file konfigurasi (conf / db.ini) string koneksi ke database (untuk contoh pelatihan, Anda bisa!):

 [POSTGRESQL] db_url=postgresql://my_user:my_password@172.17.0.2:5432/my_user 

Pada prinsipnya, penggunaan wadah adalah opsional dan Anda dapat menggunakan server basis data Anda.

Pembuatan Input


Script Python generate_test_data bertanggung jawab atas pembuatan data uji, yang mengambil jumlah entri yang diinginkan untuk dihasilkan. Urutan operasi dapat dengan mudah dilacak oleh fungsi utama dari kelas GenerateTestData :

  @m.timing def run(self, num_rows): """ Run the process """ m.info('START!') self.create_db_schema() self.create_folder('data') self.create_csv_file(num_rows) self.bulk_copy_to_db() self.random_delete_rows() self.random_update_rows() m.info('END!') 

Jadi, fungsi melakukan langkah-langkah berikut:

  • Membuat skema dalam database (kami membuat semua skema dan tabel utama);
  • Membuat folder untuk menyimpan file uji;
  • Membuat file uji dengan sejumlah baris tertentu;
  • Sisipkan massal data ke tabel target transaction_db_raw.transaction_log;
  • Penghapusan tak disengaja dari beberapa baris dalam tabel ini;
  • Pembaruan acak dari beberapa baris dalam tabel ini.

Penghapusan dan modifikasi diperlukan agar objek yang dibandingkan memiliki setidaknya beberapa perbedaan. Penting untuk dapat mencari perbedaan ini!

 @m.timing @m.wrapper(m.entering, m.exiting) def random_delete_rows(self): """ Random deleting some rows from the table """ sql_command = sql.SQL(""" delete from {0}.{1} where ctid = any(array( select ctid from {0}.{1} tablesample bernoulli (1) ))""").format(sql.Identifier(self.schema_raw), sql.Identifier(self.raw_table_name)) try: rows = self.database.execute(sql_command) m.info('Has been deleted [%s rows] from table %s' % (rows, self.raw_table_name)) except psycopg2.Error as err: m.error('Oops! Delete random rows has been FAILED. Reason: %s' % err.pgerror) @m.timing @m.wrapper(m.entering, m.exiting) def random_update_rows(self): """ Random update some rows from the table """ sql_command = sql.SQL(""" update {0}.{1} set transaction_amount = round(random()::numeric, 2) where ctid = any(array( select ctid from {0}.{1} tablesample bernoulli (1) ))""").format(sql.Identifier(self.schema_raw), sql.Identifier(self.raw_table_name)) try: rows = self.database.execute(sql_command) m.info('Has been updated [%s rows] from table %s' % (rows, self.raw_table_name)) except psycopg2.Error as err: m.error('Oops! Delete random rows has been FAILED. Reason: %s' % err.pgerror) 

Pembuatan kumpulan data uji dan perekaman selanjutnya ke file teks dalam format CSV adalah sebagai berikut:

  • UID transaksi acak dibuat;
  • Nomor akun UID acak dibuat (secara default, kami mengambil sepuluh akun unik, tetapi nilai ini dapat diubah menggunakan file konfigurasi dengan mengubah parameter "random_accounts");
  • Tanggal transaksi - tanggal acak dari tanggal yang ditentukan dalam file konfigurasi (initial_date);
  • Jenis transaksi (transaksi / komisi);
  • Jumlah transaksi;
  • Pekerjaan utama dalam pembuatan data dilakukan oleh metode generate_test_data_by_chunk dari kelas TestDataCreator :

 @m.timing def generate_test_data_by_chunk(self, chunk_start, chunk_end): """ Generating and saving to the file """ num_rows_mp = chunk_end - chunk_start new_rows = [] for _ in range(num_rows_mp): transaction_uid = uuid.uuid4() account_uid = choice(self.list_acc) transaction_date = (self.get_random_date(self.date_in, 0) .__next__() .strftime('%Y-%m-%d %H:%M:%S')) type_deal = choice(self.list_type_deal) transaction_amount = randint(-1000, 1000) new_rows.append([transaction_uid, account_uid, transaction_date, type_deal, transaction_amount]) self.write_in_file(new_rows, chunk_start, chunk_end) 

Fitur dari fungsi ini adalah peluncuran dalam beberapa proses asinkron yang diparalelkan, yang masing-masing menghasilkan bagiannya sendiri dari catatan 50K. "Chip" ini akan memungkinkan Anda untuk membuat file pada beberapa juta baris dengan cukup cepat

 def run_csv_writing(self): """ Writing the test data into csv file """ pool = mp.Pool(mp.cpu_count()) jobs = [] for chunk_start, chunk_end in self.divide_into_chunks(0, self.num_rows): jobs.append(pool.apply_async(self.generate_test_data_by_chunk, (chunk_start, chunk_end))) # wait for all jobs to finish for job in jobs: job.get() # clean up pool.close() pool.join() 

Setelah file teks selesai, perintah bulk_insert diproses dan semua data dari file ini masuk ke tabel transaction_db_raw.transaction_log.

Selanjutnya, kedua sumber akan berisi data yang persis sama dan rekonsiliasi tidak akan menemukan sesuatu yang menarik, jadi kami menghapus dan mengubah beberapa baris acak dalam database.

Jalankan skrip dan hasilkan file uji CSV dengan transaksi pada 10 ribu baris:

 ./generate_test_data.py 10000 


Tangkapan layar menunjukkan bahwa file 10K baris diterima, 10K dimuat ke dalam database, tetapi kemudian 112 baris dihapus dari database dan 108 lainnya diubah. Hasil: file dan tabel dalam database berbeda dengan 220 entri.

"Nah, di mana multiprocessing?", Anda bertanya.
Dan pekerjaannya dapat dilihat saat Anda menghasilkan file yang lebih besar, bukan dengan catatan 10 ribu, tetapi, misalnya, sebesar 1 juta. Akankah kita mencoba?

 ./generate_test_data.py 1000000 


Setelah memuat data, menghapus dan mengubah catatan acak, kita melihat perbedaan file teks dari tabel: 19.939 baris (yang 10.022 dihapus secara acak, dan 9.917 diubah).

Gambar menunjukkan bahwa pembuatan rekaman tidak sinkron, tidak konsisten. Ini berarti bahwa proses selanjutnya dapat dimulai tanpa memperhitungkan urutan mulai segera setelah yang sebelumnya selesai. Tidak ada jaminan bahwa hasilnya akan berada dalam urutan yang sama dengan input.

Apakah ini pasti lebih cepat?
Satu juta baris yang bukan pada mesin virtual tercepat "diciptakan" dalam 15,5 detik - dan ini adalah opsi yang layak. Setelah memulai generasi yang sama secara berurutan, tanpa menggunakan multiprosesor, saya mendapatkan hasilnya: pembuatan file lebih dari tiga kali lebih lambat (lebih dari 52 detik, bukan 15,5):



Adaptor untuk CSV


Adaptor ini hash baris, hanya menyisakan kolom pertama, pengidentifikasi transaksi, tidak berubah dan menyimpan data yang diterima ke file data / transaction_hashed.csv . Langkah terakhir dari karyanya adalah memuat file ini menggunakan perintah COPY ke dalam tabel sementara skema reconciliation_db.

Pembacaan file yang optimal dilakukan oleh beberapa proses paralel. Kami membaca baris demi baris, masing-masing dalam ukuran 5 megabyte. Angka "5 megabyte" diperoleh dengan metode empiris. Dengan ukuran selembar teks inilah kami bisa mendapatkan waktu terkecil untuk membaca file besar di mesin virtual kami. Anda dapat melakukan percobaan pada lingkungan Anda dengan parameter ini dan melihat bagaimana waktu operasi akan berubah:

 @m.timing def process_wrapper(self, chunk_start, chunk_size): """ Read a particular chunk """ with open(self.file_name_raw, newline='\n') as file: file.seek(chunk_start) lines = file.read(chunk_size).splitlines() for line in lines: self.process(line) def chunkify(self, size=1024*1024*5): """ Return a new chunk """ with open(self.file_name_raw, 'rb') as file: chunk_end = file.tell() while True: chunk_start = chunk_end file.seek(size, 1) file.readline() chunk_end = file.tell() if chunk_end > self.file_end: chunk_end = self.file_end yield chunk_start, chunk_end - chunk_start break else: yield chunk_start, chunk_end - chunk_start @m.timing def run_reading(self): """ The main method for the reading """ # init objects pool = mp.Pool(mp.cpu_count()) jobs = [] m.info('Run csv reading...') # create jobs for chunk_start, chunk_size in self.chunkify(): jobs.append(pool.apply_async(self.process_wrapper, (chunk_start, chunk_size))) # wait for all jobs to finish for job in jobs: job.get() # clean up pool.close() pool.join() m.info('CSV file reading has been completed') 

Contoh membaca file yang dibuat sebelumnya pada catatan 1M:


Tangkapan layar memperlihatkan pembuatan tabel sementara dengan nama unik untuk menjalankan rekonsiliasi saat ini. Selanjutnya adalah pembacaan asinkron file di bagian dan mengambil hash dari setiap baris. Memasukkan data dari adaptor ke tabel target menyelesaikan pekerjaan dengan adaptor ini.
Menggunakan tabel sementara dengan nama unik untuk setiap proses rekonsiliasi memungkinkan Anda untuk memparalelkan proses rekonsiliasi dalam satu basis data.

Adaptor untuk PostgreSQL


Adaptor untuk memproses data yang disimpan dalam tabel bekerja kira-kira dengan logika yang sama dengan adaptor untuk file:

  • membaca bagian tabel (jika besar, lebih dari 100K entri) dan mengambil hash untuk semua kolom kecuali pengenal transaksi;
  • kemudian ada penyisipan data yang diproses ke dalam tabel reconciliation_db. storage _ $ (int (time.time ()) .

Fitur yang menarik dari adaptor ini adalah ia menggunakan kumpulan koneksi ke database, yang akan mencari berdasarkan indeks untuk data yang diperlukan dalam tabel dan memprosesnya.

Berdasarkan ukuran tabel, jumlah proses yang diperlukan untuk pemrosesan dihitung dan dalam setiap proses ada pembagian menjadi 10 tugas.

 def read_data(self): """ Read the data from the postgres and shared those records with each processor to perform their operation using threads """ threads_array = self.get_threads(0, self.max_id_num_row, self.pid_max) for pid in range(1, len(threads_array) + 1): m.info('Process %s' % pid) # Getting connection from the connection pool select_conn = self._select_conn_pool.getconn() select_conn.autocommit = 1 # Creating 10 process to perform the operation process = Process(target=self.process_data, args=(self.data_queque, pid, threads_array[pid-1][0], threads_array[pid-1][1], select_conn)) process.daemon = True process.start() process.join() select_conn.close() 


Cari perbedaan


Kami melanjutkan ke verifikasi data yang diterima dari dua adapter.

Rekonsiliasi (atau menerima laporan perbedaan) terjadi di sisi server dari database, menggunakan semua kekuatan bahasa SQL.

Query SQL cukup rumit - hanya sebuah tabel yang digabung dengan data dari adapter ke dirinya sendiri dengan ID transaksi:

 sql_command = sql.SQL(""" select s1.adapter_name, count(s1.transaction_uid) as tran_count from {0}.{1} s1 full join {0}.{1} s2 on s2.transaction_uid = s1.transaction_uid and s2.adapter_name != s1.adapter_name and s2.hash = s1.hash where s2.transaction_uid is null group by s1.adapter_name;""").format(sql.Identifier(self.schema_target), sql.Identifier(self.storage_table)) 

Outputnya adalah laporan:


Periksa apakah semuanya benar pada gambar di atas. Kami ingat bahwa 9917 dihapus dari tabel dalam database dan 10.022 baris diubah. Total 19939 baris, yang terbukti dalam laporan.

Tabel ringkasan


Tetap hanya memasukkan transaksi "bersih" ke dalam tabel penyimpanan, yang bertepatan dalam semua hal (dengan hash) di adaptor yang berbeda. Proses ini dilakukan oleh kueri SQL berikut:

 sql_command = sql.SQL(""" with reconcil_data as ( select s1.transaction_uid from {0}.{1} s1 join {0}.{1} s2 on s2.transaction_uid = s1.transaction_uid and s2.adapter_name != s1.adapter_name where s2.hash = s1.hash and s1.adapter_name = 'postresql_adapter' ) insert into {2}.transaction_log select t.transaction_uid, t.account_uid, t.transaction_date, t.type_deal, t.transaction_amount from {3}.transaction_log t join reconcil_data r on t.transaction_uid = r.transaction_uid where not exists ( select 1 from {2}.transaction_log tl where tl.transaction_uid = t.transaction_uid ) """).format(sql.Identifier(self.schema_target), sql.Identifier(self.storage_table), sql.Identifier(self.schema_db_clean), sql.Identifier(self.schema_raw)) 

Tabel sementara yang kami gunakan sebagai penyimpanan data antara dari adaptor dapat dihapus.


Kesimpulan


Dalam perjalanan pekerjaan yang dilakukan, sistem untuk merekonsiliasi data dari berbagai sumber dikembangkan: file teks dan tabel dalam database. Minimal menggunakan alat tambahan.

Mungkin pembaca yang canggih mungkin memperhatikan bahwa menggunakan kerangka kerja seperti Apache Spark, ditambah dengan mengubah data sumber ke format parket, dapat secara signifikan mempercepat proses ini, terutama untuk volume besar. Tetapi tujuan utama dari pekerjaan ini adalah untuk menulis sebuah sistem dengan Python kosong dan untuk mempelajari pemrosesan data multi-pemrosesan. Dengan apa yang kami, menurut saya, telah atasi.

Kode sumber dari seluruh proyek terletak di repositori saya di GitHub , saya sarankan Anda membiasakan diri dengannya.

Saya akan dengan senang hati menjawab semua pertanyaan dan berkenalan dengan komentar Anda.

Semoga sukses!

Source: https://habr.com/ru/post/id480076/


All Articles