Olá Habr!
Não é segredo que os bancos usem dados de várias fontes (agências de crédito, operadoras de celular, etc.) para avaliar a solvência dos clientes. O número de parceiros externos pode chegar a várias dezenas, e os analistas de nossa equipe recrutam apenas algumas pessoas. Surge o problema de otimizar o trabalho de uma equipe pequena e transferir tarefas de rotina para os sistemas de computação.
Analisaremos como esses dados vão para o banco e como uma equipe de analistas monitora esse processo.

Vamos começar em ordem.
Nosso sistema distribuído baseado no Hadoop, e todos os processos associados a ele, chamamos brevemente de SmartData. O SmartData recebe dados da API de agentes externos. (Além disso, os agentes são parceiros externos e sistemas internos do banco). Obviamente, seria útil coletar um determinado "perfil atual" para cada cliente, o que fazemos. Dados atualizados de fontes se enquadram no Operprofil. O Operprofile implementa a ideia do Customer 360 e é armazenado como tabelas Hbase. É conveniente para trabalhos futuros com o cliente.
Customer 360Cliente 360 - uma abordagem para implementar o armazenamento operacional com todos os tipos de atributos de dados do cliente usados em todos os processos da organização que trabalham com o cliente e seus dados, acessíveis pela chave do cliente.
O trabalho com agentes está em andamento e precisa ser controlado. Para verificar rapidamente a qualidade da interação e a taxa de acertos, bem como a facilidade de transferir essas informações para outras equipes, usamos a visualização, por exemplo, relatórios no Tableau.
Os dados de origem são enviados para
Kafka , pré-processados e colocados em um DataLake construído com base no
HDFS . Levei para encontrar uma solução para organizar a análise de arquivos de log do HDFS, seu processamento e upload diário para sistemas analíticos e de visualização. E também combine isso com o amor dos analistas pelos laptops Python.
Termine com a cozinha interna e prossiga para a prática.
Nossa solução foi usar a API Livy. O Livy permite enviar código para um cluster diretamente do Jupyter. Uma solicitação HTTP contendo código escrito em Python (ou Scala) e metadados é enviada para Livy. Livy inicia o lançamento da sessão Spark no cluster, que é gerenciado pelo gerenciador de recursos Yarn. O módulo de solicitações é adequado para o envio de solicitações HTTP. Quem gosta de analisar sites provavelmente já o conhece (e, se não, aqui está a chance de aprender um pouco sobre ele).
Importamos os módulos necessários e criamos uma sessão. (Também descobrimos imediatamente o endereço da nossa sessão, no futuro será útil). Nos parâmetros, passamos os dados para autorização do usuário e o nome da linguagem de script que o cluster executará.
import json, requests, schedule, time host = 'http://***:8998' data = {'kind': 'spark', 'proxyUser': 'user'} headers = {'Content-Type': 'application/json'} r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers) session_id = r.json().get('id') print("session_id: " + str(session_id)) session_url = host + r.headers['location'] r = requests.get(session_url, headers=headers)
Estamos aguardando o status da sessão ficar ocioso. Se o tempo limite exceder o tempo limite definido - envie uma mensagem de erro.
timeout = time.time() + wait_time sess_state = ['starting', 'success', 'idle'] while(True): time.sleep(7) req_st = requests.get(session_url, headers=headers).json().get('state') if req_st != 'idle' and time.time() > timeout: requests.delete(session_url, headers=headers) send_message("Scheduler_error", req_st) break if req_st == 'idle': break if req_st not in sess_state: send_message("Scheduler_error", req_st) break print("Session_state: ", req_st)
Agora você pode enviar o código para Livy.
statements_url = session_url + '/statements' data = {'code': '1 + 1'} r = requests.post(statements_url, data=json.dumps(data), headers=headers) statement_url = host + r.headers['location'] r = requests.get(statement_url, headers=headers) while (requests.get(statement_url, headers=headers).json()['progress'] != 1): time.sleep(15) r = requests.get(statement_url, headers=headers).json()['output'] session_url = 'http://***:8998/sessions/' + str(session_id)
No loop, aguardamos o final da execução do código, obtemos o resultado do processamento:
r.get('data').get('text/plain')
O método delete excluirá a sessão.
requests.delete(session_url, headers=headers)
Para o descarregamento diário, você pode usar várias opções, elas já escreveram sobre o cron no hub, mas sobre o módulo de agendamento amigável - não. Basta adicioná-lo ao código, não exigirá explicação. E, por conveniência, vou coletar todos os cálculos em um só lugar.
Código import json, requests, schedule, time schedule.every().day.at("16:05").do(job, 300) while True: schedule.run_pending() def job(wait_time): host = 'http://***:8998' data = {'kind': 'spark', 'proxyUser': 'user'} headers = {'Content-Type': 'application/json'} r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers) session_id = r.json().get('id') print("session_id: " + str(session_id)) session_url = host + r.headers['location'] r = requests.get(session_url, headers=headers) timeout = time.time() + wait_time sess_state = ['starting', 'success', 'idle'] while(True): time.sleep(7) req_st = requests.get(session_url, headers=headers).json().get('state') if req_st != 'idle' and time.time() > timeout: requests.delete(session_url, headers=headers) break if req_st == 'idle': break if req_st not in sess_state: send_message("Scheduler_error", req_st) break print("Session_state: ", req_st) statements_url = session_url + '/statements' data = {'code': '1 + 1'} r = requests.post(statements_url, data=json.dumps(data),headers=headers) statement_url = host + r.headers['location'] r = requests.get(statement_url, headers=headers) while (requests.get(statement_url, headers=headers).json()['progress'] != 1): time.sleep(15) r = requests.get(statement_url, headers=headers).json()['output'] session_url = 'http://***:8998/sessions/' + str(session_id) print(r.get('data').get('text/plain'))
Conclusão:
Talvez essa solução não afirme ser a melhor, mas seja transparente para a equipe de analistas. Prós que vejo nele:
- a capacidade de usar o Jupyter familiar para automação
- interação visual
- um membro da equipe tem o direito de escolher como ele trabalhará com arquivos (spark-zoo), como resultado, não há necessidade de reescrever scripts existentes
Obviamente, ao iniciar um grande número de tarefas, você precisará monitorar os recursos liberados, configurar a comunicação entre descarregamentos. Esses problemas são resolvidos individualmente e acordados com os colegas.
Será ótimo se pelo menos uma equipe tomar nota dessa decisão.
Referências
Documentação Livy