أتمتة عملية مراقبة جودة بيانات التخزين للشركات

في Rostelecom ، كما هو الحال في أي شركة كبيرة ، يوجد مستودع بيانات خاص بالشركة (WCD). WCD لدينا ينمو ويتوسع باستمرار ، ونحن نبني واجهات عرض مفيدة والتقارير ومكعبات البيانات على ذلك. في مرحلة ما ، واجهنا حقيقة أن البيانات ذات الجودة الرديئة تتداخل معنا عند عرض نافذة ، لا تتلاقى الوحدات الناتجة مع وحدات أنظمة المصدر وتتسبب في عدم فهم العمل. على سبيل المثال ، البيانات ذات القيم الخالية في المفاتيح الخارجية (المفاتيح الخارجية) غير متصلة ببيانات من جداول أخرى.
رسم تخطيطي موجز WCD:



لقد فهمنا أنه لضمان الثقة في جودة البيانات ، نحتاج إلى عملية تسوية منتظمة. بطبيعة الحال ، أتمتة والسماح لكل من المستويات التكنولوجية للتأكد من جودة البيانات وتقاربها ، عموديا وأفقيا. نتيجة لذلك ، قمنا في وقت واحد بمراجعة ثلاثة منصات جاهزة لإدارة عمليات التسوية من مختلف البائعين وكتبنا أنظمةنا الخاصة. ونحن نشارك تجربتنا في هذا المنصب.

يعرف الجميع السلبيات الخاصة بالأنظمة الأساسية: السعر ، والمرونة المحدودة ، والقدرة على إضافة وإصلاح الوظائف. الايجابيات - أجزاء MDM (البيانات الذهبية ، وغيرها) ، والتدريب والدعم مغلقة أيضا. بعد تقدير هذا الأمر ، نسينا عملية الشراء وركزنا على تطوير حلولنا.

تتم كتابة جوهر نظامنا في Python ، ويتم كتابة قاعدة بيانات البيانات الأولية لتخزين وتسجيل وتخزين النتائج في Oracle. هناك العديد من المكتبات لبيثون ، نستخدم الحد الأدنى الضروري لاتصالات Hive (pyhive) و GreenPlum (pgdb) و Oracle (cx_Oracle). لا ينبغي أن يكون توصيل نوع آخر من المصدر مشكلة.

مجموعة البيانات الناتجة (مجموعة النتائج) التي نضعها في جدول Oracle الناتج ، أثناء تقييم حالة التوفيق (SUCCESS / ERROR). تم تكوين APEX على الجداول الناتجة التي تم فيها تكوين تصور النتائج ، وهو مناسب لكل من الصيانة والإدارة.

لتشغيل عمليات التحقق في المستودع ، يتم استخدام أوركسترا إنفورماتيكا التي تقوم بتنزيل البيانات. عند استلام حالة نجاح التنزيل ، ستبدأ التحقق من هذه البيانات تلقائيًا. يسمح استخدام معلمات الاستعلام وبيانات WCD الأولية باستخدام قوالب تسوية الاستعلام لمجموعات الجداول.

الآن عن التسويات المنفذة على هذه المنصة.

لقد بدأنا بالتسويات الفنية ، والتي تقارن كمية البيانات في مدخلات وطبقات WCD مع تطبيق بعض المرشحات. نأخذ ملف ctl الذي جاء إلى مدخلات WCD ، وقراءة عدد السجلات منه ومقارنته مع الجدول على Stage ODL و / أو Stage ODS (1 ، 2 ، 3 في الرسم التخطيطي). يتم تعريف معيار التحقق في المساواة بين عدد السجلات (العد). إذا تقاربت الكمية ، فإن النتيجة هي النجاح ، لا - الخطأ والتحليل اليدوي للخطأ.

تمتد سلسلة التسويات التقنية هذه ، مقارنة بعدد السجلات ، إلى طبقة ADS (8 في الرسم التخطيطي). يتم تغيير المرشحات بين الطبقات ، والتي تعتمد على نوع التحميل - DIM (كتاب مرجعي) ، HDIM (كتاب مرجعي تاريخي) ، FACT (جداول التراكم الفعلية) ، إلخ - وكذلك على إصدار SCD والطبقة. كلما اقتربنا من طبقة العرض ، خوارزميات التصفية الأكثر تطورا التي نستخدمها.

تم إجراء فحص تقني أيضًا على المدخلات في Python ، التي تكتشف التكرارات في الحقول الرئيسية. في GreenPlum لدينا ، الحقول الرئيسية (PK) غير محمية ضد التكرارات بواسطة أدوات نظام قاعدة البيانات. لذلك قمنا بكتابة برنامج Python الذي يقرأ حقول الجدول الذي تم تحميله من البيانات الأولية لـ PK ويقوم بإنشاء برنامج نصي SQL يتحقق من عدم وجود يأخذ عليها. تسمح لنا مرونة النهج باستخدام PK المكون من حقل واحد أو عدة حقول ، وهو أمر مريح للغاية. تمتد هذه التسوية إلى طبقة STG ADS.

unique_check  import sys import os from datetime import datetime log_tmstmp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") def do_check(args, context): tab = args[0] data = [] fld_str = "" try: sql = """SELECT 't_'||lower(table_id) as tn, lower(column_name) as cn FROM src_column@meta_data WHERE  table_id = '%s' and is_primary_key = 'Y'""" % (tab,) for fld in context['ora_get_data'](context['ora_con'], sql): fld_str = fld_str + (fld_str and ",") + fld[1] if fld_str: config = context['script_config'] con_gp = context['pg_open_con'](config['user'], config['pwd'], config['host'], config['port'], config['dbname']) sql = """select %s as pkg_id, 't_%s' as table_name, 'PK fields' as column_name, coalesce(sum(cnt), 0) as NOT_UNIQUE_PK, to_char(current_timestamp, 'YYYY-MM-DD HH24:MI:SS') as sys_creation from (select 1 as cnt from edw_%s.t_%s where %s group by %s  having count(*) > 1 ) sq; """ % (context["package"] or '0',tab.lower(), args[1], tab.lower(), (context["package"] and ("package_id = " + context["package"]) or "1=1"), fld_str ) data.extend(context['pg_get_data'](con_gp, sql)) con_gp.close() except Exception as e: raise return data or [[(context["package"] or 0),'t_'+tab.lower(), None, 0, log_tmstmp]] if __name__ == '__main__': sys.exit(do_check([sys.argv[1], sys.argv[2]], {})) 

التفرد المصالحة بيثون رمز المثال. يتم تنفيذ المكالمة ونقل معلمات الاتصال ووضع النتائج في الجدول الناتج بواسطة وحدة التحكم في بيثون.

يتم بناء المصالحة لعدم وجود قيم فارغة (NULL) بشكل مشابه لتلك السابقة وكذلك في بيثون. نقرأ من حقول بيانات تعريف التحميل التي لا يمكن أن تحتوي على قيم فارغة (فارغة) والتحقق من ملئها. يتم استخدام المصالحة قبل طبقة DDS (6 في الرسم التخطيطي الأول).

عند مدخل التخزين ، يتم أيضًا تحليل اتجاه حزم البيانات التي تصل إلى المدخلات. يتم إدخال مقدار البيانات المستلمة عند وصول حزمة جديدة في جدول المحفوظات. مع حدوث تغيير كبير في كمية البيانات ، يتلقى الشخص المسؤول عن الجدول و SI (نظام المصدر) إشعارًا عبر البريد الإلكتروني (في الخطط) ، ويرى خطأ في APEX قبل أن تدخل حزمة البيانات إلى المستودع ، وتكتشف سبب ذلك مع SI.

بين STG (STAGE) _ODS و ODS (طبقة البيانات التشغيلية) (3 و 4 في الرسم التخطيطي) ، تظهر حقول الحذف الفني (مؤشر الحذف = delete_ind) ، وصحتها التي نتحقق منها أيضًا عن طريق استعلامات SQL. يجب وضع علامة على المدخلات المفقودة المحذوفة في نظام الوثائق الرسمية.

من المتوقع أن تظهر نتيجة البرنامج النصي للتسوية أخطاء صفرية. في مثال التسوية المقدمة ، يتم تمرير المعلمات ~ # PKG_ID # ~ عبر كتلة التحكم Python ، ويتم تعبئة المعلمات من النوع ~ P_JOIN_CONDITION ~ و ~ PERIOD_COL ~ من بيانات تعريف الجدول ، اسم الجدول نفسه ~ TABLE ~ من معلمات الإطلاق.

ما يلي هو تسوية المعلمات. مثال كود توفيق SQL بين STG_ODS و ODS للنوع HDIM:

 select package_id as pkg_id, 'T_~TABLE~' as table_name, to_char(current_timestamp, 'YYYY-MM-DD HH24:MI:SS'), coalesce(empty_in_ods, 0) as empty_in_ods, coalesce(not_equal_md5, 0) as not_equal_md5, coalesce(deleted_in_ods, 0) as deleted_in_ods, coalesce(not_deleted_in_ods, 0) as not_deleted_in_ods, max_load_dttm from (select    max (src.package_id) as package_id,    sum (case when tgt.md5 is null then 1 else 0 end) as empty_in_ods,    sum (case when src.md5<>tgt.md5 and tgt.~PK~ is not null and tgt.deleted_ind = 0 then 1 else 0 end) as not_equal_md5,    sum (case when tgt.deleted_ind = 1 and src.md5=tgt.md5 then 1 else 0 end) as deleted_in_ods from EDW_STG_ODS.T_~TABLE~  src left join EDW_ODS.T_~TABLE~  tgt       on ~P_JOIN_CONDITION~ and tgt.active_ind ='Y' where ~#PKG_ID#~ = 0   or src.package_id = ~#PKG_ID#~ ) aa, (select sum (case when src.~PK~ is null then 1 else 0 end) as not_deleted_in_ods, max (tgt.load_dttm) as max_load_dttm from EDW_STG_ODS.T_~TABLE~  src right join EDW_ODS.T_~TABLE~  tgt        on ~P_JOIN_CONDITION~ where tgt.deleted_ind = 0 and tgt.active_ind ='Y'  and tgt.~PERIOD_COL~ between (select min(~PERIOD_COL~) from EDW_STG_ODS.T_~TABLE~ where ~#PKG_ID#~ = 0 or package_id = ~#PKG_ID#~)                           and (select max(~PERIOD_COL~) from EDW_STG_ODS.T_~TABLE~ where ~#PKG_ID#~ = 0 or package_id = ~#PKG_ID#~) ) bb where 1=1 

مثال على كود تسوية SQL بين STG_ODS و ODS لنوع HDIM مع استبدال المعلمات:


 --------------HDIM_CHECKS--------------- select package_id as pkg_id, 'TABLE_NAME' as table_name, to_char(current_timestamp, 'YYYY-MM-DD HH24:MI:SS'), coalesce(empty_in_ods, 0) as empty_in_ods, coalesce(not_equal_md5, 0) as not_equal_md5, coalesce(deleted_in_ods, 0) as deleted_in_ods, coalesce(not_deleted_in_ods, 0) as not_deleted_in_ods, max_load_dttm from (select    max (src.package_id) as package_id,    sum (case when tgt.md5 is null then 1 else 0 end) as empty_in_ods,    sum (case when src.md5<>tgt.md5 and tgt.ACTION_ID is not null and tgt.deleted_ind = 0 then 1 else 0 end) as not_equal_md5,    sum (case when tgt.deleted_ind = 1 and src.md5=tgt.md5 then 1 else 0 end) as deleted_in_ods from EDW_STG_ODS.TABLE_NAME  src left join EDW_ODS.TABLE_NAME  tgt       on SRC.PK_ID=TGT.PK_ID and tgt.active_ind ='Y' where 709083887 = 0   or src.package_id = 709083887 ) aa, (select sum (case when src.PK_ID is null then 1 else 0 end) as not_deleted_in_ods, max (tgt.load_dttm) as max_load_dttm from EDW_STG_ODS.TABLE_NAME  src right join EDW_ODS.TABLE_NAME  tgt        on SRC.PK_ID =TGT.PK_ID where tgt.del_ind = 0 and tgt.active_ind ='Y'  and tgt.DATE_SYS between (select min(DATE_SYS) from EDW_STG_ODS.TABLE_NAME where 70908 = 0 or package_id = 70908)                           and (select max(DATE_SYS) from EDW_STG_ODS.TABLE_NAME where 70908 = 0 or package_id = 70908) ) bb where 1=1 

بدءًا من نظام الوثائق الرسمية (ODS) ، يتم الحفاظ على السجل في الأدلة ، وبالتالي ، يجب التحقق من عدم وجود تقاطعات وثغرات. يتم ذلك عن طريق حساب عدد القيم غير الصحيحة في السجل وكتابة عدد الأخطاء الناتج إلى الجدول الناتج. إذا كانت هناك أخطاء محفوظات في الجدول ، فيجب البحث عنها يدويًا. تعتمد المصالحة على نوع التنزيل - HDIM (دليل مرجع السجل) في المقام الأول. نجري تسويات صحة التاريخ للأدلة حتى طبقة ADS.

في طبقة DDS (6 في الرسم التخطيطي الأول) ، يتم دمج SIs مختلفة (أنظمة المصدر) في جدول واحد ؛ تظهر جداول HUB لإنشاء مفاتيح بديلة لربط البيانات من أنظمة مصدر مختلفة. نحن نتفحصها بحثًا عن التفرد من خلال فحص بيثون مماثل لطبقة المسرح.

في طبقة DDS ، تحتاج إلى التحقق من أنه بعد الدمج مع جدول HUB ، لم تظهر قيم الأنواع 0 و -1 و -2 في حقول المفاتيح ، مما يعني ربط جدول غير صحيح ونقص البيانات. يمكن أن تظهر في حالة عدم وجود البيانات اللازمة في جدول HUB. وهذا خطأ للتحليل اليدوي.

التسويات الأكثر تعقيدًا لبيانات طبقة نافذة ADS (8 في المخطط الأول). من أجل الثقة التامة في تقارب النتيجة التي تم الحصول عليها ، يتم نشر التحقق باستخدام نظام مصدر لتجميع مبلغ الرسوم هنا. من ناحية ، هناك فئة من المؤشرات التي تشمل المستحقات الإجمالية. نقوم بجمعها لمدة شهر من نوافذ WCD. من ناحية أخرى ، فإننا نأخذ مجاميع نفس الرسوم من نظام المصدر. هناك تباين لا يزيد عن 1٪ أو قيمة مطلقة معينة ومتفق عليها. يتم وضع مجموعات النتائج التي تم الحصول عليها عن طريق التسوية في مجموعات بيانات تم إنشاؤها خصيصًا وتتلقى جداول Oracle. تتم مقارنة البيانات في طريقة عرض Oracle. تصور النتائج في APEX. إن وجود مجموعة كاملة من البيانات (مجموعة النتائج) يتيح لنا ، إذا كانت هناك أخطاء ، أن نعمق أعمق ونحلل البيانات التفصيلية للنتيجة ، ونجد مقالًا محددًا حدث فيه التناقض ، وابحث عن أسبابه.


عرض نتائج التسوية للمستخدمين في APEX

في الوقت الحالي ، حصلنا على تطبيق عملي ومستخدم بنشاط للتوفيق بين البيانات. بالطبع ، لدينا خطط لمواصلة تطوير كل من كمية ونوعية التسويات ، وتطوير النظام الأساسي نفسه. التنمية الخاصة بنا تسمح لنا بتغيير وتعديل الوظيفة بسرعة كافية.

أعد هذا المقال فريق Rostelecom لإدارة البيانات

Source: https://habr.com/ru/post/ar434542/


All Articles