Liberamos nossas mãos para vários analistas: API Livy para automação de tarefas bancárias típicas

Olá Habr!

Não é segredo que os bancos usem dados de várias fontes (agências de crédito, operadoras de celular, etc.) para avaliar a solvência dos clientes. O número de parceiros externos pode chegar a várias dezenas, e os analistas de nossa equipe recrutam apenas algumas pessoas. Surge o problema de otimizar o trabalho de uma equipe pequena e transferir tarefas de rotina para os sistemas de computação.

Analisaremos como esses dados vão para o banco e como uma equipe de analistas monitora esse processo.



Vamos começar em ordem.

Nosso sistema distribuído baseado no Hadoop, e todos os processos associados a ele, chamamos brevemente de SmartData. O SmartData recebe dados da API de agentes externos. (Além disso, os agentes são parceiros externos e sistemas internos do banco). Obviamente, seria útil coletar um determinado "perfil atual" para cada cliente, o que fazemos. Dados atualizados de fontes se enquadram no Operprofil. O Operprofile implementa a ideia do Customer 360 e é armazenado como tabelas Hbase. É conveniente para trabalhos futuros com o cliente.

Customer 360
Cliente 360 ​​- uma abordagem para implementar o armazenamento operacional com todos os tipos de atributos de dados do cliente usados ​​em todos os processos da organização que trabalham com o cliente e seus dados, acessíveis pela chave do cliente.

O trabalho com agentes está em andamento e precisa ser controlado. Para verificar rapidamente a qualidade da interação e a taxa de acertos, bem como a facilidade de transferir essas informações para outras equipes, usamos a visualização, por exemplo, relatórios no Tableau.

Os dados de origem são enviados para Kafka , pré-processados ​​e colocados em um DataLake construído com base no HDFS . Levei para encontrar uma solução para organizar a análise de arquivos de log do HDFS, seu processamento e upload diário para sistemas analíticos e de visualização. E também combine isso com o amor dos analistas pelos laptops Python.

Termine com a cozinha interna e prossiga para a prática.

Nossa solução foi usar a API Livy. O Livy permite enviar código para um cluster diretamente do Jupyter. Uma solicitação HTTP contendo código escrito em Python (ou Scala) e metadados é enviada para Livy. Livy inicia o lançamento da sessão Spark no cluster, que é gerenciado pelo gerenciador de recursos Yarn. O módulo de solicitações é adequado para o envio de solicitações HTTP. Quem gosta de analisar sites provavelmente já o conhece (e, se não, aqui está a chance de aprender um pouco sobre ele).

Importamos os módulos necessários e criamos uma sessão. (Também descobrimos imediatamente o endereço da nossa sessão, no futuro será útil). Nos parâmetros, passamos os dados para autorização do usuário e o nome da linguagem de script que o cluster executará.

import json, requests, schedule, time host = 'http://***:8998' data = {'kind': 'spark', 'proxyUser': 'user'} headers = {'Content-Type': 'application/json'} r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers) session_id = r.json().get('id') print("session_id: " + str(session_id)) session_url = host + r.headers['location'] r = requests.get(session_url, headers=headers) 

Estamos aguardando o status da sessão ficar ocioso. Se o tempo limite exceder o tempo limite definido - envie uma mensagem de erro.

 timeout = time.time() + wait_time sess_state = ['starting', 'success', 'idle'] while(True): time.sleep(7) req_st = requests.get(session_url, headers=headers).json().get('state') if req_st != 'idle' and time.time() > timeout: requests.delete(session_url, headers=headers) send_message("Scheduler_error", req_st) break if req_st == 'idle': break if req_st not in sess_state: send_message("Scheduler_error", req_st) break print("Session_state: ", req_st) 

Agora você pode enviar o código para Livy.

 statements_url = session_url + '/statements' data = {'code': '1 + 1'} r = requests.post(statements_url, data=json.dumps(data), headers=headers) statement_url = host + r.headers['location'] r = requests.get(statement_url, headers=headers) while (requests.get(statement_url, headers=headers).json()['progress'] != 1): time.sleep(15) r = requests.get(statement_url, headers=headers).json()['output'] session_url = 'http://***:8998/sessions/' + str(session_id) 

No loop, aguardamos o final da execução do código, obtemos o resultado do processamento:

 r.get('data').get('text/plain') 

O método delete excluirá a sessão.

 requests.delete(session_url, headers=headers) 

Para o descarregamento diário, você pode usar várias opções, elas já escreveram sobre o cron no hub, mas sobre o módulo de agendamento amigável - não. Basta adicioná-lo ao código, não exigirá explicação. E, por conveniência, vou coletar todos os cálculos em um só lugar.

Código
 import json, requests, schedule, time schedule.every().day.at("16:05").do(job, 300) while True: schedule.run_pending() def job(wait_time): host = 'http://***:8998' data = {'kind': 'spark', 'proxyUser': 'user'} headers = {'Content-Type': 'application/json'} r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers) session_id = r.json().get('id') print("session_id: " + str(session_id)) session_url = host + r.headers['location'] r = requests.get(session_url, headers=headers) timeout = time.time() + wait_time sess_state = ['starting', 'success', 'idle'] while(True): time.sleep(7) req_st = requests.get(session_url, headers=headers).json().get('state') if req_st != 'idle' and time.time() > timeout: requests.delete(session_url, headers=headers) break if req_st == 'idle': break if req_st not in sess_state: send_message("Scheduler_error", req_st) break print("Session_state: ", req_st) statements_url = session_url + '/statements' data = {'code': '1 + 1'} r = requests.post(statements_url, data=json.dumps(data),headers=headers) statement_url = host + r.headers['location'] r = requests.get(statement_url, headers=headers) while (requests.get(statement_url, headers=headers).json()['progress'] != 1): time.sleep(15) r = requests.get(statement_url, headers=headers).json()['output'] session_url = 'http://***:8998/sessions/' + str(session_id) print(r.get('data').get('text/plain')) #requests.delete(session_url, headers=headers) def send_message(subject, text): import smtplib from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText me = "my_email_adress" you = "email_adress" msg = MIMEMultipart('alternative') msg['Subject'] = subject msg['From'] = me msg['To'] = you text = text part1 = MIMEText(text, 'plain') msg.attach(part1) s = smtplib.SMTP('domain.org') s.ehlo() s.starttls() s.login("user", "password") s.sendmail(me, you, msg.as_string()) s.quit() 


Conclusão:


Talvez essa solução não afirme ser a melhor, mas seja transparente para a equipe de analistas. Prós que vejo nele:

  • a capacidade de usar o Jupyter familiar para automação
  • interação visual
  • um membro da equipe tem o direito de escolher como ele trabalhará com arquivos (spark-zoo), como resultado, não há necessidade de reescrever scripts existentes

Obviamente, ao iniciar um grande número de tarefas, você precisará monitorar os recursos liberados, configurar a comunicação entre descarregamentos. Esses problemas são resolvidos individualmente e acordados com os colegas.

Será ótimo se pelo menos uma equipe tomar nota dessa decisão.

Referências


Documentação Livy

Source: https://habr.com/ru/post/pt457096/


All Articles