Wir geben mehreren Analysten die Hand: API Livy zur Automatisierung typischer Bankaufgaben

Hallo Habr!

Es ist kein Geheimnis, dass Banken Daten aus verschiedenen Quellen (Kreditauskunfteien, Mobilfunkbetreiber usw.) verwenden, um die Zahlungsfähigkeit von Kunden zu bewerten. Die Anzahl der externen Partner kann mehrere Dutzend erreichen, und die Analysten in unserem Team werden nur wenige Mitarbeiter einstellen. Das Problem besteht darin, die Arbeit eines kleinen Teams zu optimieren und Routineaufgaben auf Computersysteme zu übertragen.

Wir werden analysieren, wie diese Daten an die Bank gehen und wie ein Analystenteam diesen Prozess überwacht.



Beginnen wir in der richtigen Reihenfolge.

Unser auf Hadoop basierendes verteiltes System und alle damit verbundenen Prozesse nennen wir kurz SmartData. SmartData empfängt API-Daten von externen Agenten. (Darüber hinaus sind die Vertreter sowohl externe Partner als auch interne Systeme der Bank). Natürlich wäre es nützlich, für jeden Kunden ein bestimmtes „aktuelles Profil“ zu sammeln, was wir auch tun. Aktualisierte Daten aus Quellen fallen in Operprofil. Das Operprofile implementiert die Idee von Customer 360 und wird als Hbase-Tabellen gespeichert. Es ist praktisch für die weitere Arbeit mit dem Kunden.

Kunde 360
Customer 360 - ein Ansatz zur Implementierung von Betriebsspeicher mit allen Arten von Attributen von Kundendaten, die in allen Prozessen in der Organisation verwendet werden, die mit dem Kunden und seinen Daten arbeiten, auf die über den Schlüssel des Kunden zugegriffen werden kann.

Die Arbeit mit Agenten ist noch nicht abgeschlossen und muss kontrolliert werden. Um die Qualität der Interaktion und die Trefferquote sowie die einfache Übertragung dieser Informationen an andere Teams schnell zu überprüfen, verwenden wir die Visualisierung, z. B. Berichte in Tableau.

Die Quelldaten werden an Kafka gesendet, vorverarbeitet und in einem auf HDFS basierenden DataLake abgelegt. Es war notwendig, eine Lösung zu finden, wie das Parsen von Protokolldateien aus HDFS, deren Verarbeitung und das tägliche Hochladen auf Analyse- und Visualisierungssysteme organisiert werden können. Und kombinieren Sie dies auch mit der Liebe von Analysten zu Python-Laptops.

Beenden Sie mit der internen Küche und üben Sie weiter.

Unsere Lösung bestand darin, die Livy-API zu verwenden. Mit Livy können Sie Code direkt von Jupyter an einen Cluster senden. Eine HTTP-Anfrage mit in Python (oder Scala) geschriebenem Code und Metadaten wird an Livy gesendet. Livy initiiert den Start der Spark-Sitzung im Cluster, die vom Yarn-Ressourcenmanager verwaltet wird. Das Anforderungsmodul eignet sich zum Senden von HTTP-Anforderungen. Diejenigen, die Websites analysieren möchten, kennen ihn wahrscheinlich bereits (und wenn nicht, haben Sie hier die Möglichkeit, etwas über ihn zu erfahren).

Wir importieren die notwendigen Module und erstellen eine Sitzung. (Wir finden auch sofort die Adresse unserer Sitzung heraus, in Zukunft wird es nützlich sein). In den Parametern übergeben wir die Daten für die Benutzerautorisierung und den Namen der Skriptsprache, die der Cluster ausführen wird.

import json, requests, schedule, time host = 'http://***:8998' data = {'kind': 'spark', 'proxyUser': 'user'} headers = {'Content-Type': 'application/json'} r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers) session_id = r.json().get('id') print("session_id: " + str(session_id)) session_url = host + r.headers['location'] r = requests.get(session_url, headers=headers) 

Wir warten darauf, dass der Sitzungsstatus in den Leerlauf wechselt. Wenn das Zeitlimit das eingestellte Zeitlimit überschreitet, senden Sie eine Fehlermeldung.

 timeout = time.time() + wait_time sess_state = ['starting', 'success', 'idle'] while(True): time.sleep(7) req_st = requests.get(session_url, headers=headers).json().get('state') if req_st != 'idle' and time.time() > timeout: requests.delete(session_url, headers=headers) send_message("Scheduler_error", req_st) break if req_st == 'idle': break if req_st not in sess_state: send_message("Scheduler_error", req_st) break print("Session_state: ", req_st) 

Jetzt können Sie den Code an Livy senden.

 statements_url = session_url + '/statements' data = {'code': '1 + 1'} r = requests.post(statements_url, data=json.dumps(data), headers=headers) statement_url = host + r.headers['location'] r = requests.get(statement_url, headers=headers) while (requests.get(statement_url, headers=headers).json()['progress'] != 1): time.sleep(15) r = requests.get(statement_url, headers=headers).json()['output'] session_url = 'http://***:8998/sessions/' + str(session_id) 

In der Schleife warten wir auf das Ende der Codeausführung und erhalten das Verarbeitungsergebnis:

 r.get('data').get('text/plain') 

Die Löschmethode löscht die Sitzung.

 requests.delete(session_url, headers=headers) 

Für das tägliche Entladen können Sie verschiedene Optionen verwenden, die bereits über cron auf dem Hub geschrieben wurden, aber über das benutzerfreundliche Zeitplanmodul - nein. Fügen Sie es einfach dem Code hinzu, es bedarf keiner Erklärung. Und der Einfachheit halber werde ich alle Berechnungen an einem Ort sammeln.

Code
 import json, requests, schedule, time schedule.every().day.at("16:05").do(job, 300) while True: schedule.run_pending() def job(wait_time): host = 'http://***:8998' data = {'kind': 'spark', 'proxyUser': 'user'} headers = {'Content-Type': 'application/json'} r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers) session_id = r.json().get('id') print("session_id: " + str(session_id)) session_url = host + r.headers['location'] r = requests.get(session_url, headers=headers) timeout = time.time() + wait_time sess_state = ['starting', 'success', 'idle'] while(True): time.sleep(7) req_st = requests.get(session_url, headers=headers).json().get('state') if req_st != 'idle' and time.time() > timeout: requests.delete(session_url, headers=headers) break if req_st == 'idle': break if req_st not in sess_state: send_message("Scheduler_error", req_st) break print("Session_state: ", req_st) statements_url = session_url + '/statements' data = {'code': '1 + 1'} r = requests.post(statements_url, data=json.dumps(data),headers=headers) statement_url = host + r.headers['location'] r = requests.get(statement_url, headers=headers) while (requests.get(statement_url, headers=headers).json()['progress'] != 1): time.sleep(15) r = requests.get(statement_url, headers=headers).json()['output'] session_url = 'http://***:8998/sessions/' + str(session_id) print(r.get('data').get('text/plain')) #requests.delete(session_url, headers=headers) def send_message(subject, text): import smtplib from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText me = "my_email_adress" you = "email_adress" msg = MIMEMultipart('alternative') msg['Subject'] = subject msg['From'] = me msg['To'] = you text = text part1 = MIMEText(text, 'plain') msg.attach(part1) s = smtplib.SMTP('domain.org') s.ehlo() s.starttls() s.login("user", "password") s.sendmail(me, you, msg.as_string()) s.quit() 


Fazit:


Vielleicht behauptet diese Lösung nicht, die beste zu sein, aber sie ist für das Analystenteam transparent. Vorteile, die ich darin sehe:

  • die Fähigkeit, vertrauten Jupyter für die Automatisierung zu verwenden
  • visuelle Interaktion
  • Das Teammitglied hat das Recht zu wählen, wie es mit Dateien (Spark-Zoo) arbeiten soll. Daher müssen vorhandene Skripte nicht neu geschrieben werden

Wenn Sie eine große Anzahl von Aufgaben starten, müssen Sie natürlich die freigegebenen Ressourcen überwachen und die Kommunikation zwischen den Entladungen konfigurieren. Diese Probleme werden individuell gelöst und mit Kollegen vereinbart.

Es ist großartig, wenn mindestens ein Team diese Entscheidung notiert.

Referenzen


Livy Dokumentation

Source: https://habr.com/ru/post/de457096/


All Articles