Bonjour, Habr!
Ce n'est un secret pour personne que les banques utilisent des données provenant de diverses sources (bureaux de crédit, opérateurs mobiles, etc.) pour évaluer la solvabilité des clients. Le nombre de partenaires externes peut atteindre plusieurs dizaines, et les analystes de notre équipe ne recruteront que quelques personnes. Le problème se pose d'optimiser le travail d'une petite équipe et de transférer des tâches de routine vers des systèmes informatiques.
Nous analyserons comment ces données sont transmises à la banque et comment une équipe d'analystes surveille ce processus.

Commençons dans l'ordre.
Notre système distribué basé sur Hadoop, et tous les processus qui lui sont associés, nous appelons brièvement SmartData. SmartData reçoit les données API des agents externes. (De plus, ses agents sont à la fois des partenaires externes et des systèmes internes de la banque). Bien sûr, il serait utile de collecter un certain «profil actuel» pour chaque client, ce que nous faisons. Les données mises à jour provenant des sources tombent dans Operprofil. L'Operprofile met en œuvre l'idée du client 360 et est stocké sous forme de tables Hbase. Il est pratique pour un travail ultérieur avec le client.
Client 360Client 360 - une approche de mise en œuvre du stockage opérationnel avec toutes sortes d'attributs de données client utilisés dans tous les processus de l'organisation qui travaillent avec le client et ses données, accessibles par la clé du client.
Le travail avec les agents est en cours et doit être contrôlé. Pour vérifier rapidement la qualité de l'interaction et du taux de succès, ainsi que la facilité de transfert de ces informations à d'autres équipes, nous utilisons la visualisation, par exemple, les rapports dans Tableau.
Les données source sont envoyées à
Kafka , prétraitées et placées dans un DataLake construit sur la base de
HDFS . Il était nécessaire de trouver une solution pour organiser l'analyse des fichiers journaux de HDFS, leur traitement et leur téléchargement quotidien vers les systèmes d'analyse et de visualisation. Et combinez cela avec l'amour des analystes pour les ordinateurs portables Python.
Terminez avec la cuisine intérieure et passez à la pratique.
Notre solution a été d'utiliser l'API Livy. Livy vous permet de soumettre du code à un cluster directement depuis Jupyter. Une requête HTTP contenant du code écrit en Python (ou Scala) et des métadonnées est envoyée à Livy. Livy lance le lancement de la session Spark sur le cluster, qui est géré par le gestionnaire de ressources Yarn. Le module de requêtes est adapté à l'envoi de requêtes HTTP. Ceux qui aiment analyser des sites le connaîtront probablement déjà (et sinon, voici une chance d'en apprendre un peu plus sur lui).
Nous importons les modules nécessaires et créons une session. (Nous découvrons également immédiatement l'adresse de notre session, elle nous sera utile à l'avenir). Dans les paramètres, nous transmettons les données pour l'autorisation de l'utilisateur et le nom du langage de script que le cluster exécutera.
import json, requests, schedule, time host = 'http://***:8998' data = {'kind': 'spark', 'proxyUser': 'user'} headers = {'Content-Type': 'application/json'} r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers) session_id = r.json().get('id') print("session_id: " + str(session_id)) session_url = host + r.headers['location'] r = requests.get(session_url, headers=headers)
Nous attendons que l'état de la session passe au repos. Si le délai dépasse le délai défini - envoyez un message d'erreur.
timeout = time.time() + wait_time sess_state = ['starting', 'success', 'idle'] while(True): time.sleep(7) req_st = requests.get(session_url, headers=headers).json().get('state') if req_st != 'idle' and time.time() > timeout: requests.delete(session_url, headers=headers) send_message("Scheduler_error", req_st) break if req_st == 'idle': break if req_st not in sess_state: send_message("Scheduler_error", req_st) break print("Session_state: ", req_st)
Vous pouvez maintenant envoyer le code à Livy.
statements_url = session_url + '/statements' data = {'code': '1 + 1'} r = requests.post(statements_url, data=json.dumps(data), headers=headers) statement_url = host + r.headers['location'] r = requests.get(statement_url, headers=headers) while (requests.get(statement_url, headers=headers).json()['progress'] != 1): time.sleep(15) r = requests.get(statement_url, headers=headers).json()['output'] session_url = 'http://***:8998/sessions/' + str(session_id)
Dans la boucle, on attend la fin de l'exécution du code, on obtient le résultat du traitement:
r.get('data').get('text/plain')
La méthode de suppression supprimera la session.
requests.delete(session_url, headers=headers)
Pour le déchargement quotidien, vous pouvez utiliser plusieurs options, ils ont déjà écrit sur cron sur le hub, mais sur le module de planification convivial - non. Il suffit de l'ajouter au code, il n'aura pas besoin d'explication. Et, pour plus de commodité, je vais rassembler tous les calculs en un seul endroit.
Code import json, requests, schedule, time schedule.every().day.at("16:05").do(job, 300) while True: schedule.run_pending() def job(wait_time): host = 'http://***:8998' data = {'kind': 'spark', 'proxyUser': 'user'} headers = {'Content-Type': 'application/json'} r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers) session_id = r.json().get('id') print("session_id: " + str(session_id)) session_url = host + r.headers['location'] r = requests.get(session_url, headers=headers) timeout = time.time() + wait_time sess_state = ['starting', 'success', 'idle'] while(True): time.sleep(7) req_st = requests.get(session_url, headers=headers).json().get('state') if req_st != 'idle' and time.time() > timeout: requests.delete(session_url, headers=headers) break if req_st == 'idle': break if req_st not in sess_state: send_message("Scheduler_error", req_st) break print("Session_state: ", req_st) statements_url = session_url + '/statements' data = {'code': '1 + 1'} r = requests.post(statements_url, data=json.dumps(data),headers=headers) statement_url = host + r.headers['location'] r = requests.get(statement_url, headers=headers) while (requests.get(statement_url, headers=headers).json()['progress'] != 1): time.sleep(15) r = requests.get(statement_url, headers=headers).json()['output'] session_url = 'http://***:8998/sessions/' + str(session_id) print(r.get('data').get('text/plain'))
Conclusion:
Cette solution ne prétend peut-être pas être la meilleure, mais elle est transparente pour l'équipe d'analystes. Avantages que j'y vois:
- la possibilité d'utiliser Jupyter familier pour l'automatisation
- interaction visuelle
- un membre de l'équipe a le droit de choisir comment il va travailler avec les fichiers (spark-zoo), par conséquent, il n'est pas nécessaire de réécrire les scripts existants
Bien sûr, lors du démarrage d'un grand nombre de tâches, vous devrez surveiller les ressources libérées, configurer la communication entre les déchargements. Ces problèmes sont résolus sur une base individuelle et convenus avec des collègues.
Ce sera formidable si au moins une équipe prend cette décision.
Les références
Documentation de Livy