معالجة وتوفيق البيانات من مصادر مختلفة

مرحبا يا هبر!

نظرًا لتنوع الأنظمة الموزعة ، يعد توفر المعلومات التي تم التحقق منها في التخزين المستهدف معيارًا مهمًا لتناسق البيانات.

هناك العديد من الطرق والأساليب في هذا الصدد ، وسوف نركز على المصالحة ، والتي تمت مناقشة الجوانب النظرية لها في هذا المقال. أقترح النظر في التنفيذ العملي لهذا النظام ، قابلة للتطوير وتكييفها مع كمية كبيرة من البيانات.

كيفية تنفيذ هذه الحالة على بيثون القديم الجيد - قراءتها تحت خفض! دعنا نذهب!


(مصدر الصورة)

مقدمة


دعنا نتخيل أن المؤسسة المالية لديها العديد من الأنظمة الموزعة وأننا نواجه مهمة التحقق من المعاملات في هذه الأنظمة وتحميل البيانات التي تمت تسويتها إلى التخزين المستهدف.

كمصدر للبيانات ، خذ ملفًا نصيًا كبيرًا وجدولًا في قاعدة بيانات PostgreSQL. لنفترض أن البيانات الموجودة في هذه المصادر لها نفس المعاملات ، لكن يمكن أن يكون لها اختلافات ، وبالتالي يجب التحقق منها وكتابتها إلى البيانات التي تم التحقق منها في التخزين النهائي للتحليل.

بالإضافة إلى ذلك ، من الضروري توفير التشغيل المتوازي للعديد من التسويات على نفس قاعدة البيانات وتكييف النظام مع وحدة تخزين كبيرة باستخدام المعالجة المتعددة.

تعد وحدة المعالجة المتعددة كبيرة بالنسبة لموازنة العمليات في بيثون ، ومن ناحية أخرى ، تتجنب بعض عيوب GIL. سوف نستخدم قدرات هذه المكتبة أدناه.

بنية النظام قيد التطوير



المكونات المستخدمة:

  • منشئ بيانات عشوائي - برنامج نصي Python يقوم بإنشاء ملف CSV ويقوم على أساسه بتعبئة جدول في قاعدة بيانات ؛
  • مصادر البيانات - ملف CSV والجدول في قاعدة بيانات PostgreSQL ؛
  • المحولات - في هذه الحالة ، نستخدم محوّلين يقومان باستخراج البيانات من مصادرهما (CSV أو DB) وإدخال المعلومات في قاعدة البيانات الوسيطة ؛
  • قواعد البيانات - بكمية ثلاثة أجزاء: البيانات الأولية ، وقاعدة بيانات وسيطة تقوم بتخزين المعلومات التي تم التقاطها بواسطة المحولات ، وقاعدة بيانات "نظيفة" تحتوي على معاملات تم التوفيق بينها من كلا المصدرين.

التدريب الأولي


كأداة لتخزين البيانات ، سوف نستخدم قاعدة بيانات PostgreSQL في حاوية Docker ونتفاعل مع قاعدة البيانات الخاصة بنا من خلال pgAdmin التي تعمل في الحاوية :

docker run --name pg -d -e "POSTGRES_USER=my_user" -e "POSTGRES_PASSWORD=my_password" postgres 

تشغيل pgAdmin:

 docker run -p 80:80 -e "PGADMIN_DEFAULT_EMAIL=user@domain.com" -e "PGADMIN_DEFAULT_PASSWORD=12345" -d dpage/pgadmin4 

بعد أن بدأ كل شيء ، لن ننسى أن نضع في ملف التكوين (conf / db.ini) سلسلة الاتصال بقاعدة البيانات (على سبيل المثال ، يمكنك التدريب!):

 [POSTGRESQL] db_url=postgresql://my_user:my_password@172.17.0.2:5432/my_user 

من حيث المبدأ ، يكون استخدام الحاوية اختياريًا ويمكنك استخدام خادم قاعدة البيانات الخاص بك.

توليد المدخلات


يعد البرنامج النصي Python create_test_data مسؤولاً عن إنشاء بيانات الاختبار ، والتي تأخذ العدد المطلوب من الإدخالات التي تريد إنشاؤها. يمكن تتبع تسلسل العمليات بسهولة بواسطة الوظيفة الرئيسية لفئة GenerateTestData :

  @m.timing def run(self, num_rows): """ Run the process """ m.info('START!') self.create_db_schema() self.create_folder('data') self.create_csv_file(num_rows) self.bulk_copy_to_db() self.random_delete_rows() self.random_update_rows() m.info('END!') 

لذلك ، تقوم الدالة بالخطوات التالية:

  • إنشاء مخططات في قاعدة البيانات (نقوم بإنشاء جميع المخططات والجداول الأساسية) ؛
  • إنشاء مجلد لتخزين ملف اختبار ؛
  • توليد ملف اختبار مع عدد معين من الخطوط ؛
  • إدراج بيانات مجمعة في الجدول المعاملة transaction_db_raw.transaction_log في الجدول الهدف ؛
  • الحذف العرضي لصفوف متعددة في هذا الجدول ؛
  • تحديث عشوائي لعدة صفوف في هذا الجدول.

يعد الحذف والتعديل ضروريًا بحيث تحتوي الكائنات المقارنة على بعض التناقض على الأقل. من المهم أن تكون قادرًا على البحث عن هذه التناقضات!

 @m.timing @m.wrapper(m.entering, m.exiting) def random_delete_rows(self): """ Random deleting some rows from the table """ sql_command = sql.SQL(""" delete from {0}.{1} where ctid = any(array( select ctid from {0}.{1} tablesample bernoulli (1) ))""").format(sql.Identifier(self.schema_raw), sql.Identifier(self.raw_table_name)) try: rows = self.database.execute(sql_command) m.info('Has been deleted [%s rows] from table %s' % (rows, self.raw_table_name)) except psycopg2.Error as err: m.error('Oops! Delete random rows has been FAILED. Reason: %s' % err.pgerror) @m.timing @m.wrapper(m.entering, m.exiting) def random_update_rows(self): """ Random update some rows from the table """ sql_command = sql.SQL(""" update {0}.{1} set transaction_amount = round(random()::numeric, 2) where ctid = any(array( select ctid from {0}.{1} tablesample bernoulli (1) ))""").format(sql.Identifier(self.schema_raw), sql.Identifier(self.raw_table_name)) try: rows = self.database.execute(sql_command) m.info('Has been updated [%s rows] from table %s' % (rows, self.raw_table_name)) except psycopg2.Error as err: m.error('Oops! Delete random rows has been FAILED. Reason: %s' % err.pgerror) 

فيما يلي إنشاء مجموعة بيانات الاختبار والتسجيل اللاحق لملف نصي بتنسيق CSV:

  • يتم إنشاء معاملة عشوائية UID ؛
  • يتم إنشاء رقم حساب UID عشوائي (بشكل افتراضي ، نأخذ عشرة حسابات فريدة ، ولكن يمكن تغيير هذه القيمة باستخدام ملف التكوين عن طريق تغيير المعلمة "random_accounts") ؛
  • تاريخ المعاملة - تاريخ عشوائي من التاريخ المحدد في ملف التكوين (initial_date) ؛
  • نوع المعاملة (المعاملة / العمولة) ؛
  • مبلغ الصفقة
  • يتم تنفيذ العمل الرئيسي في توليد البيانات عن طريق الأسلوب gener_test_data_by_chunk للفئة TestDataCreator :

 @m.timing def generate_test_data_by_chunk(self, chunk_start, chunk_end): """ Generating and saving to the file """ num_rows_mp = chunk_end - chunk_start new_rows = [] for _ in range(num_rows_mp): transaction_uid = uuid.uuid4() account_uid = choice(self.list_acc) transaction_date = (self.get_random_date(self.date_in, 0) .__next__() .strftime('%Y-%m-%d %H:%M:%S')) type_deal = choice(self.list_type_deal) transaction_amount = randint(-1000, 1000) new_rows.append([transaction_uid, account_uid, transaction_date, type_deal, transaction_amount]) self.write_in_file(new_rows, chunk_start, chunk_end) 

تتمثل ميزة هذه الوظيفة في الإطلاق في العديد من العمليات غير المتزامنة المتوازية ، حيث تقوم كل منها بإنشاء الجزء الخاص بها من سجلات 50K. تتيح لك هذه "الشريحة" إنشاء ملف على عدة ملايين من الخطوط بسرعة كافية

 def run_csv_writing(self): """ Writing the test data into csv file """ pool = mp.Pool(mp.cpu_count()) jobs = [] for chunk_start, chunk_end in self.divide_into_chunks(0, self.num_rows): jobs.append(pool.apply_async(self.generate_test_data_by_chunk, (chunk_start, chunk_end))) # wait for all jobs to finish for job in jobs: job.get() # clean up pool.close() pool.join() 

بعد اكتمال الملف النصي ، تتم معالجة الأمر bulk_insert وتندرج جميع البيانات من هذا الملف في جدول transaction_db_raw.transaction_log.

علاوة على ذلك ، سيحتوي المصدران على نفس البيانات تمامًا ولن تجد التسوية أي شيء مثير للاهتمام ، لذلك نقوم بحذف وتغيير عدة صفوف عشوائية في قاعدة البيانات.

قم بتشغيل البرنامج النصي وإنشاء ملف CSV اختبار مع المعاملات على خطوط 10K:

 ./generate_test_data.py 10000 


توضح لقطة الشاشة أنه تم استلام ملف من 10K خطوط ، وتم تحميل 10K في قاعدة البيانات ، ولكن بعد ذلك تم حذف 112 سطرًا من قاعدة البيانات وتغيير 108. النتيجة: يختلف الملف والجدول في قاعدة البيانات بمقدار 220 إدخالًا.

"حسنًا ، أين المعالجة المتعددة؟" ، أنت تسأل.
ويمكن رؤية عملها عند إنشاء ملف أكبر ، وليس بسجلات 10K ، ولكن على سبيل المثال ، بواسطة 1M. هل سنحاول؟

 ./generate_test_data.py 1000000 


بعد تحميل البيانات وحذفها وتغيير السجلات العشوائية ، نرى الاختلافات بين الملف النصي والجدول: 193939 صفًا (تم حذف 1022 منها عشوائيًا ، وتم تغيير 91717).

توضح الصورة أن توليد السجلات كان غير متزامن وغير متناسق. هذا يعني أن العملية التالية يمكن أن تبدأ دون مراعاة أمر البدء بمجرد اكتمال العملية السابقة. ليس هناك ما يضمن أن النتيجة ستكون بنفس ترتيب الإدخال.

هل هو بالتأكيد أسرع؟
تم اختراع مليون خط غير موجود على أسرع جهاز افتراضي في 15.5 ثانية - وهذا خيار يستحق. بعد أن بدأت في إنشاء نفس التتابع ، دون استخدام المعالجة المتعددة ، حصلت على النتيجة: كان إنشاء الملف أبطأ من ثلاث مرات (أكثر من 52 ثانية بدلاً من 15.5):



محول ل CSV


يقوم هذا المحول بتجزئة الصف ، تاركًا فقط العمود الأول ، معرف المعاملة ، بدون تغيير ويحفظ البيانات المستلمة في ملف data / transaction_hashed.csv . الخطوة الأخيرة من عمله هي تحميل هذا الملف باستخدام أمر COPY في الجدول المؤقت لمخطط الصلاحيات .

يتم تنفيذ القراءة الأمثل للملف من خلال عدة عمليات متوازية. نقرأ سطرا سطرا ، في قطعة 5 ميغابايت لكل منهما. تم الحصول على الرقم "5 ميغابايت" بواسطة الطريقة التجريبية. بهذا الحجم من نص واحد تمكنا من الحصول على أصغر وقت لقراءة الملفات الكبيرة على الجهاز الظاهري لدينا. يمكنك تجربة بيئتك باستخدام هذه المعلمة ومعرفة كيفية تغيير وقت التشغيل:

 @m.timing def process_wrapper(self, chunk_start, chunk_size): """ Read a particular chunk """ with open(self.file_name_raw, newline='\n') as file: file.seek(chunk_start) lines = file.read(chunk_size).splitlines() for line in lines: self.process(line) def chunkify(self, size=1024*1024*5): """ Return a new chunk """ with open(self.file_name_raw, 'rb') as file: chunk_end = file.tell() while True: chunk_start = chunk_end file.seek(size, 1) file.readline() chunk_end = file.tell() if chunk_end > self.file_end: chunk_end = self.file_end yield chunk_start, chunk_end - chunk_start break else: yield chunk_start, chunk_end - chunk_start @m.timing def run_reading(self): """ The main method for the reading """ # init objects pool = mp.Pool(mp.cpu_count()) jobs = [] m.info('Run csv reading...') # create jobs for chunk_start, chunk_size in self.chunkify(): jobs.append(pool.apply_async(self.process_wrapper, (chunk_start, chunk_size))) # wait for all jobs to finish for job in jobs: job.get() # clean up pool.close() pool.join() m.info('CSV file reading has been completed') 

مثال على قراءة ملف تم إنشاؤه مسبقًا في سجلات 1M:


توضح لقطة الشاشة إنشاء جدول مؤقت باسم فريد لتشغيل التسوية الحالية. التالي هو القراءة غير المتزامنة للملف في أجزاء وأخذ تجزئة كل سطر. إدراج البيانات من المحول في الجدول الهدف يكمل العمل مع هذا المحول.
يتيح لك استخدام جدول مؤقت باسم فريد لكل عملية تسوية إمكانية موازنة عملية التسوية بشكل إضافي في قاعدة بيانات واحدة.

محول ل PostgreSQL


يعمل المهايئ الخاص بمعالجة البيانات المخزنة في الجدول على نفس منطق المحول للملف تقريبًا:

  • القراءة في أجزاء الجدول (إذا كانت كبيرة ، تتجاوز مائة ألف إدخال) وأخذ علامة تجزئة لجميع الأعمدة باستثناء معرف المعاملة ؛
  • ثم هناك إدراج البيانات التي تمت معالجتها في الجدولصالحة_ db . التخزين _ $ (int (time.time ()) .

تتمثل إحدى الميزات المهمة لهذا المحول في أنه يستخدم مجموعة من الاتصالات بقاعدة البيانات ، والتي ستقوم بالبحث حسب الفهرس عن البيانات الضرورية في الجدول ومعالجتها.

بناءً على حجم الجدول ، يتم حساب عدد العمليات اللازمة للمعالجة ويوجد في كل عملية قسم إلى 10 مهام.

 def read_data(self): """ Read the data from the postgres and shared those records with each processor to perform their operation using threads """ threads_array = self.get_threads(0, self.max_id_num_row, self.pid_max) for pid in range(1, len(threads_array) + 1): m.info('Process %s' % pid) # Getting connection from the connection pool select_conn = self._select_conn_pool.getconn() select_conn.autocommit = 1 # Creating 10 process to perform the operation process = Process(target=self.process_data, args=(self.data_queque, pid, threads_array[pid-1][0], threads_array[pid-1][1], select_conn)) process.daemon = True process.start() process.join() select_conn.close() 


البحث عن التناقضات


ننتقل إلى التحقق من البيانات الواردة من محولين.

تحدث المصالحة (أو تلقي تقرير تعارض) على جانب الخادم من قاعدة البيانات ، وذلك باستخدام كل قوة لغة SQL.

استعلام SQL غير معقد تمامًا - إنه مجرد ربط جدول مع البيانات من المحولات إلى نفسه بواسطة معرف المعاملة:

 sql_command = sql.SQL(""" select s1.adapter_name, count(s1.transaction_uid) as tran_count from {0}.{1} s1 full join {0}.{1} s2 on s2.transaction_uid = s1.transaction_uid and s2.adapter_name != s1.adapter_name and s2.hash = s1.hash where s2.transaction_uid is null group by s1.adapter_name;""").format(sql.Identifier(self.schema_target), sql.Identifier(self.storage_table)) 

الإخراج هو تقرير:


تحقق ما إذا كان كل شيء صحيح في الصورة أعلاه. نتذكر أنه تم حذف 9917 من الجدول في قاعدة البيانات وتغيير 1022 الصفوف. مجموع 19939 خطوط ، وهو ما يتضح في التقرير.

جدول ملخص


يبقى فقط إدراج المعاملات "النظيفة" في جدول التخزين الذي يتطابق من جميع النواحي (بواسطة التجزئة) في محولات مختلفة. يتم تنفيذ هذه العملية بواسطة استعلام SQL التالي:

 sql_command = sql.SQL(""" with reconcil_data as ( select s1.transaction_uid from {0}.{1} s1 join {0}.{1} s2 on s2.transaction_uid = s1.transaction_uid and s2.adapter_name != s1.adapter_name where s2.hash = s1.hash and s1.adapter_name = 'postresql_adapter' ) insert into {2}.transaction_log select t.transaction_uid, t.account_uid, t.transaction_date, t.type_deal, t.transaction_amount from {3}.transaction_log t join reconcil_data r on t.transaction_uid = r.transaction_uid where not exists ( select 1 from {2}.transaction_log tl where tl.transaction_uid = t.transaction_uid ) """).format(sql.Identifier(self.schema_target), sql.Identifier(self.storage_table), sql.Identifier(self.schema_db_clean), sql.Identifier(self.schema_raw)) 

يمكن حذف الجدول المؤقت الذي استخدمناه كتخزين وسيط للبيانات من المحولات.


استنتاج


في سياق العمل المنجز ، تم تطوير نظام للتوفيق بين البيانات من مصادر مختلفة: ملف نصي وجدول في قاعدة البيانات. استخدام الحد الأدنى من الأدوات الإضافية.

ربما يلاحظ القارئ المتطور أن استخدام الأطر مثل Apache Spark ، إلى جانب تحويل البيانات الخام إلى تنسيق النيابة العامة ، يمكن أن يسرع بشكل كبير هذه العملية ، وخاصة بالنسبة للكميات الكبيرة. لكن الهدف الرئيسي من هذا العمل هو كتابة نظام في بيثون العارية ودراسة معالجة البيانات المتعددة المعالجة. مع ما ، في رأيي ، تعاملنا معه.

تكمن شفرة المصدر للمشروع بأكمله في مستودع بياناتي على GitHub ، أقترح عليك أن تتعرف عليه.

سأكون سعيدًا بالإجابة على جميع الأسئلة والتعرف على تعليقاتك.

اتمنى لك التوفيق

Source: https://habr.com/ru/post/ar480076/


All Articles