نحن نحرر أيدينا للعديد من المحللين: API Livy لأتمتة المهام المصرفية المعتادة

مرحبا يا هبر!

ليس سراً أن البنوك تستخدم بيانات من مصادر مختلفة (مكاتب الائتمان ، مشغلي شبكات الهاتف النقال ، إلخ) لتقييم ملاءة العملاء. يمكن أن يصل عدد الشركاء الخارجيين إلى عشرات ، وسوف يقوم المحللون في فريقنا بتوظيف عدد قليل من الأشخاص فقط. تنشأ مشكلة تحسين عمل فريق صغير ونقل المهام الروتينية إلى أنظمة الحوسبة.

سنقوم بتحليل كيفية انتقال هذه البيانات إلى البنك ، وكيف يراقب فريق من المحللين هذه العملية.



لنبدأ بالترتيب.

نظامنا الموزع على أساس Hadoop ، وجميع العمليات المرتبطة به ، ونحن ندعو SmartData لفترة وجيزة. SmartData يتلقى بيانات API من وكلاء الخارجية. (علاوة على ذلك ، فإن وكلاء ذلك هم شركاء خارجيون وأنظمة داخلية للبنك). بالطبع ، سيكون من المفيد جمع "ملف تعريف حالي" معين لكل عميل ، وهو ما نقوم به. البيانات المحدثة من المصادر تندرج في Operprofil. يطبق Operprofile فكرة Customer 360 ويتم تخزينه كجداول Hbase. أنها مريحة لمزيد من العمل مع العميل.

العملاء 360
Customer 360 - طريقة لتنفيذ التخزين التشغيلي مع جميع أنواع سمات بيانات العميل المستخدمة في جميع العمليات في المؤسسة التي تعمل مع العميل وبياناته ، يمكن الوصول إليها بواسطة مفتاح العميل.

العمل مع الوكلاء مستمر ويحتاج إلى السيطرة عليه. للتحقق بسرعة من جودة التفاعل ومعدل الدخول ، فضلاً عن سهولة نقل هذه المعلومات إلى فرق أخرى ، نستخدم التصور ، على سبيل المثال ، التقارير في Tableau.

يتم إرسال البيانات المصدر إلى كافكا ، تتم معالجتها مسبقًا وتوضع في DataLake مبنية على أساس HDFS . كان من الضروري التوصل إلى حل حول كيفية تنظيم تحليل ملفات السجل من HDFS ، ومعالجتها والتحميل اليومي للأنظمة التحليلية والتصور. وتجمع أيضًا مع حب المحللين لأجهزة الكمبيوتر المحمولة Python.

مع الانتهاء من المطبخ الداخلي والانتقال إلى ممارسة.

كان حلنا هو استخدام API Livy. يتيح لك Livy إرسال التعليمات البرمجية إلى كتلة مباشرة من Jupyter. يتم إرسال طلب HTTP يحتوي على رمز مكتوب في Python (أو Scala) وبيانات التعريف إلى Livy. بدأت Livy إطلاق جلسة Spark على الكتلة ، والتي يديرها مدير موارد Yarn. وحدة الطلبات مناسبة لإرسال طلبات HTTP. من المحتمل أن يعرفه الأشخاص الذين يحبون تحليل المواقع (وإذا لم يكن الأمر كذلك ، فهناك فرصة لمعرفة القليل عنه).

نحن نستورد الوحدات اللازمة وننشئ جلسة. (نكتشف أيضًا على الفور عنوان جلستنا ، في المستقبل سيكون مفيدًا). في المعلمات نقوم بتمرير البيانات الخاصة بترخيص المستخدم واسم لغة البرنامج النصي التي سيتم تنفيذها.

import json, requests, schedule, time host = 'http://***:8998' data = {'kind': 'spark', 'proxyUser': 'user'} headers = {'Content-Type': 'application/json'} r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers) session_id = r.json().get('id') print("session_id: " + str(session_id)) session_url = host + r.headers['location'] r = requests.get(session_url, headers=headers) 

نحن في انتظار حالة الجلسة للذهاب إلى الخمول. إذا تجاوزت المهلة المهلة المحددة - أرسل رسالة خطأ.

 timeout = time.time() + wait_time sess_state = ['starting', 'success', 'idle'] while(True): time.sleep(7) req_st = requests.get(session_url, headers=headers).json().get('state') if req_st != 'idle' and time.time() > timeout: requests.delete(session_url, headers=headers) send_message("Scheduler_error", req_st) break if req_st == 'idle': break if req_st not in sess_state: send_message("Scheduler_error", req_st) break print("Session_state: ", req_st) 

الآن يمكنك إرسال الرمز إلى ليفي.

 statements_url = session_url + '/statements' data = {'code': '1 + 1'} r = requests.post(statements_url, data=json.dumps(data), headers=headers) statement_url = host + r.headers['location'] r = requests.get(statement_url, headers=headers) while (requests.get(statement_url, headers=headers).json()['progress'] != 1): time.sleep(15) r = requests.get(statement_url, headers=headers).json()['output'] session_url = 'http://***:8998/sessions/' + str(session_id) 

في الحلقة ، ننتظر نهاية تنفيذ التعليمات البرمجية ، نحصل على نتيجة المعالجة:

 r.get('data').get('text/plain') 

طريقة الحذف سوف تحذف الجلسة.

 requests.delete(session_url, headers=headers) 

للتفريغ اليومي ، يمكنك استخدام العديد من الخيارات ، فقد كتبوا بالفعل عن كرون على المحور ، ولكن عن وحدة الجدول الزمني سهلة الاستخدام - لا. فقط أضفه إلى الكود ، ولن يحتاج إلى شرح. وللراحة ، سأجمع كل الحسابات في مكان واحد.

قانون
 import json, requests, schedule, time schedule.every().day.at("16:05").do(job, 300) while True: schedule.run_pending() def job(wait_time): host = 'http://***:8998' data = {'kind': 'spark', 'proxyUser': 'user'} headers = {'Content-Type': 'application/json'} r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers) session_id = r.json().get('id') print("session_id: " + str(session_id)) session_url = host + r.headers['location'] r = requests.get(session_url, headers=headers) timeout = time.time() + wait_time sess_state = ['starting', 'success', 'idle'] while(True): time.sleep(7) req_st = requests.get(session_url, headers=headers).json().get('state') if req_st != 'idle' and time.time() > timeout: requests.delete(session_url, headers=headers) break if req_st == 'idle': break if req_st not in sess_state: send_message("Scheduler_error", req_st) break print("Session_state: ", req_st) statements_url = session_url + '/statements' data = {'code': '1 + 1'} r = requests.post(statements_url, data=json.dumps(data),headers=headers) statement_url = host + r.headers['location'] r = requests.get(statement_url, headers=headers) while (requests.get(statement_url, headers=headers).json()['progress'] != 1): time.sleep(15) r = requests.get(statement_url, headers=headers).json()['output'] session_url = 'http://***:8998/sessions/' + str(session_id) print(r.get('data').get('text/plain')) #requests.delete(session_url, headers=headers) def send_message(subject, text): import smtplib from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText me = "my_email_adress" you = "email_adress" msg = MIMEMultipart('alternative') msg['Subject'] = subject msg['From'] = me msg['To'] = you text = text part1 = MIMEText(text, 'plain') msg.attach(part1) s = smtplib.SMTP('domain.org') s.ehlo() s.starttls() s.login("user", "password") s.sendmail(me, you, msg.as_string()) s.quit() 


الاستنتاج:


ربما لا يدعي هذا الحل أنه الأفضل ، لكنه شفاف بالنسبة لفريق المحللين. الايجابيات التي أراها في ذلك:

  • القدرة على استخدام Jupyter مألوفة للتشغيل الآلي
  • التفاعل البصري
  • يحق لعضو الفريق اختيار طريقة تعامله مع الملفات (spark-zoo) ، ونتيجة لذلك ، ليست هناك حاجة لإعادة كتابة النصوص الموجودة

بالطبع ، عند بدء عدد كبير من المهام ، سيتعين عليك مراقبة الموارد المحررة ، وتكوين الاتصال بين عمليات التفريغ. يتم حل هذه المشكلات على أساس فردي ويتم الاتفاق عليها مع الزملاء.

سيكون أمرا رائعا إذا لاحظ فريق واحد على الأقل هذا القرار.

مراجع


الوثائق الحية

Source: https://habr.com/ru/post/ar457096/


All Articles