Hola Habr!
No es ningún secreto que los bancos utilizan datos de varias fuentes (agencias de crédito, operadores móviles, etc.) para evaluar la solvencia de los clientes. El número de socios externos puede llegar a varias docenas, y los analistas de nuestro equipo reclutarán solo unas pocas personas. El problema surge de optimizar el trabajo de un equipo pequeño y transferir tareas rutinarias a los sistemas informáticos.
Analizaremos cómo van estos datos al banco y cómo un equipo de analistas monitorea este proceso.

Comencemos en orden.
Nuestro sistema distribuido basado en Hadoop, y todos los procesos asociados con él, lo llamamos brevemente SmartData. SmartData recibe datos API de agentes externos. (Además, sus agentes son socios externos y sistemas internos del banco). Por supuesto, sería útil recopilar un cierto "perfil actual" para cada cliente, lo cual hacemos. Los datos actualizados de las fuentes caen en Operprofil. El Operprofile implementa la idea de Customer 360 y se almacena como tablas Hbase. Es conveniente para seguir trabajando con el cliente.
Cliente 360Cliente 360: un enfoque para implementar el almacenamiento operativo con todo tipo de atributos de los datos del cliente utilizados en todos los procesos de la organización que trabajan con el cliente y sus datos, accesibles mediante la clave del cliente.
El trabajo con agentes es continuo y necesita ser controlado. Para verificar rápidamente la calidad de la interacción y la tasa de aciertos, así como la facilidad de transferir esta información a otros equipos, utilizamos la visualización, por ejemplo, informes en Tableau.
Los datos de origen se envían a
Kafka , se procesan previamente y se colocan en un DataLake construido sobre la base de
HDFS . Me llevó a encontrar una solución sobre cómo organizar el análisis de archivos de registro desde HDFS, su procesamiento y carga diaria en sistemas analíticos y de visualización. Y también combine esto con el amor de los analistas por las computadoras portátiles Python.
Termina con la cocina interna y pasa a practicar.
Nuestra solución fue utilizar la API Livy. Livy te permite enviar código a un clúster directamente desde Jupyter. Una solicitud HTTP que contiene código escrito en Python (o Scala) y metadatos se envía a Livy. Livy inicia el lanzamiento de la sesión de Spark en el clúster, que es administrado por el administrador de recursos de Yarn. El módulo de solicitudes es adecuado para enviar solicitudes HTTP. Aquellos a quienes les gusta analizar sitios probablemente ya lo conocerán (y si no, aquí hay una oportunidad de aprender un poco sobre él).
Importamos los módulos necesarios y creamos una sesión. (También descubrimos de inmediato la dirección de nuestra sesión, en el futuro será útil). En los parámetros pasamos los datos para la autorización del usuario y el nombre del lenguaje de script que ejecutará el clúster.
import json, requests, schedule, time host = 'http://***:8998' data = {'kind': 'spark', 'proxyUser': 'user'} headers = {'Content-Type': 'application/json'} r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers) session_id = r.json().get('id') print("session_id: " + str(session_id)) session_url = host + r.headers['location'] r = requests.get(session_url, headers=headers)
Estamos esperando que el estado de la sesión vaya a inactivo. Si el tiempo de espera excede el tiempo de espera establecido, envíe un mensaje de error.
timeout = time.time() + wait_time sess_state = ['starting', 'success', 'idle'] while(True): time.sleep(7) req_st = requests.get(session_url, headers=headers).json().get('state') if req_st != 'idle' and time.time() > timeout: requests.delete(session_url, headers=headers) send_message("Scheduler_error", req_st) break if req_st == 'idle': break if req_st not in sess_state: send_message("Scheduler_error", req_st) break print("Session_state: ", req_st)
Ahora puedes enviar el código a Livy.
statements_url = session_url + '/statements' data = {'code': '1 + 1'} r = requests.post(statements_url, data=json.dumps(data), headers=headers) statement_url = host + r.headers['location'] r = requests.get(statement_url, headers=headers) while (requests.get(statement_url, headers=headers).json()['progress'] != 1): time.sleep(15) r = requests.get(statement_url, headers=headers).json()['output'] session_url = 'http://***:8998/sessions/' + str(session_id)
En el bucle, esperamos el final de la ejecución del código, obtenemos el resultado del procesamiento:
r.get('data').get('text/plain')
El método delete eliminará la sesión.
requests.delete(session_url, headers=headers)
Para la descarga diaria, puede usar varias opciones, ya escribieron sobre cron en el concentrador, pero sobre el módulo de programación fácil de usar: no. Simplemente agréguelo al código, no requerirá explicación. Y, por conveniencia, recolectaré todos los cálculos en un solo lugar.
Código import json, requests, schedule, time schedule.every().day.at("16:05").do(job, 300) while True: schedule.run_pending() def job(wait_time): host = 'http://***:8998' data = {'kind': 'spark', 'proxyUser': 'user'} headers = {'Content-Type': 'application/json'} r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers) session_id = r.json().get('id') print("session_id: " + str(session_id)) session_url = host + r.headers['location'] r = requests.get(session_url, headers=headers) timeout = time.time() + wait_time sess_state = ['starting', 'success', 'idle'] while(True): time.sleep(7) req_st = requests.get(session_url, headers=headers).json().get('state') if req_st != 'idle' and time.time() > timeout: requests.delete(session_url, headers=headers) break if req_st == 'idle': break if req_st not in sess_state: send_message("Scheduler_error", req_st) break print("Session_state: ", req_st) statements_url = session_url + '/statements' data = {'code': '1 + 1'} r = requests.post(statements_url, data=json.dumps(data),headers=headers) statement_url = host + r.headers['location'] r = requests.get(statement_url, headers=headers) while (requests.get(statement_url, headers=headers).json()['progress'] != 1): time.sleep(15) r = requests.get(statement_url, headers=headers).json()['output'] session_url = 'http://***:8998/sessions/' + str(session_id) print(r.get('data').get('text/plain'))
Conclusión
Quizás esta solución no pretende ser la mejor, pero es transparente para el equipo de analistas. Pros que veo en él:
- la capacidad de usar Jupyter familiar para la automatización
- interacción visual
- el miembro del equipo tiene derecho a elegir cómo trabajará con los archivos (spark-zoo), como resultado, no es necesario reescribir los scripts existentes
Por supuesto, cuando comience una gran cantidad de tareas, tendrá que monitorear los recursos liberados, configurar la comunicación entre descargas. Estos problemas se resuelven individualmente y se acuerdan con sus colegas.
Será genial si al menos un equipo toma nota de esta decisión.
Referencias
Documentación Livy