我们将精力分配给了几位分析师:API Livy,用于实现典型银行业务的自动化

哈Ha!

银行使用来自各种来源(信贷局,移动运营商等)的数据来评估客户的偿付能力已经不是什么秘密了。 外部合作伙伴的数量可以达到几十个,而我们团队中的分析师只会招募少数人。 问题是优化一个小团队的工作并将例行任务转移到计算系统。

我们将分析这些数据如何进入银行,以及一组分析师如何监视此过程。



让我们按顺序开始。

我们基于Hadoop的分布式系统以及与之相关的所有流程,我们简称为SmartData。 SmartData从外部代理接收API数据。 (此外,它的代理人既是银行的外部合作伙伴,又是银行的内部系统)。 当然,为我们的每个客户收集一定的“当前配置文件”将很有用。 来自源的更新数据属于Operprofil。 Operprofile实现了Customer 360的想法,并存储为Hbase表。 与客户进行进一步的工作很方便。

客户360
客户360-一种实现操作存储的方法,该方法具有在组织中与客户及其数据配合使用的所有流程中使用的客户数据的各种属性,可通过客户的密钥进行访问。

与代理的合作正在进行中,需要加以控制。 为了快速检查交互的质量和命中率,以及将这些信息转移到其他团队的难易程度,我们使用可视化工具,例如Tableau中的报告。

源数据被发送到Kafka ,进行预处理并放入基于HDFS构建的DataLake中。 有必要提出一种解决方案,如何组织来自HDFS的日志文件的解析,日志文件的处理以及每天上传到分析和可视化系统。 并将这与分析师对Python笔记本电脑的热爱相结合。

完成内部厨房并继续练习。

我们的解决方案是使用Livy API。 Livy允许您直接从Jupyter向集群提交代码。 将包含用Python(或Scala)编写的代码和元数据的HTTP请求发送到Livy。 Livy在群集上启动Spark会话,该会话由Yarn资源管理器管理。 请求模块适用于发送HTTP请求。 那些喜欢解析网站的人可能已经认识他(如果不是,这是一个学习他的机会)。

我们导入必要的模块并创建一个会话。 (我们还将立即找到本次会议的地址,以后将派上用场)。 在参数中,我们传递用于用户授权的数据以及集群将执行的脚本语言的名称。

import json, requests, schedule, time host = 'http://***:8998' data = {'kind': 'spark', 'proxyUser': 'user'} headers = {'Content-Type': 'application/json'} r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers) session_id = r.json().get('id') print("session_id: " + str(session_id)) session_url = host + r.headers['location'] r = requests.get(session_url, headers=headers) 

我们正在等待会话状态变为空闲状态。 如果超时超过设置的超时-发送错误消息。

 timeout = time.time() + wait_time sess_state = ['starting', 'success', 'idle'] while(True): time.sleep(7) req_st = requests.get(session_url, headers=headers).json().get('state') if req_st != 'idle' and time.time() > timeout: requests.delete(session_url, headers=headers) send_message("Scheduler_error", req_st) break if req_st == 'idle': break if req_st not in sess_state: send_message("Scheduler_error", req_st) break print("Session_state: ", req_st) 

现在,您可以将代码发送给Livy。

 statements_url = session_url + '/statements' data = {'code': '1 + 1'} r = requests.post(statements_url, data=json.dumps(data), headers=headers) statement_url = host + r.headers['location'] r = requests.get(statement_url, headers=headers) while (requests.get(statement_url, headers=headers).json()['progress'] != 1): time.sleep(15) r = requests.get(statement_url, headers=headers).json()['output'] session_url = 'http://***:8998/sessions/' + str(session_id) 

在循环中,我们等待代码执行结束,然后获得处理结果:

 r.get('data').get('text/plain') 

delete方法将删除会话。

 requests.delete(session_url, headers=headers) 

对于日常卸载,您可以使用几个选项,它们已经在集线器上写了有关cron的内容,但是关于用户友好的调度模块的内容。 只需将其添加到代码中,就无需解释。 而且,为方便起见,我将所有计算结果集中在一个地方。

代号
 import json, requests, schedule, time schedule.every().day.at("16:05").do(job, 300) while True: schedule.run_pending() def job(wait_time): host = 'http://***:8998' data = {'kind': 'spark', 'proxyUser': 'user'} headers = {'Content-Type': 'application/json'} r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers) session_id = r.json().get('id') print("session_id: " + str(session_id)) session_url = host + r.headers['location'] r = requests.get(session_url, headers=headers) timeout = time.time() + wait_time sess_state = ['starting', 'success', 'idle'] while(True): time.sleep(7) req_st = requests.get(session_url, headers=headers).json().get('state') if req_st != 'idle' and time.time() > timeout: requests.delete(session_url, headers=headers) break if req_st == 'idle': break if req_st not in sess_state: send_message("Scheduler_error", req_st) break print("Session_state: ", req_st) statements_url = session_url + '/statements' data = {'code': '1 + 1'} r = requests.post(statements_url, data=json.dumps(data),headers=headers) statement_url = host + r.headers['location'] r = requests.get(statement_url, headers=headers) while (requests.get(statement_url, headers=headers).json()['progress'] != 1): time.sleep(15) r = requests.get(statement_url, headers=headers).json()['output'] session_url = 'http://***:8998/sessions/' + str(session_id) print(r.get('data').get('text/plain')) #requests.delete(session_url, headers=headers) def send_message(subject, text): import smtplib from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText me = "my_email_adress" you = "email_adress" msg = MIMEMultipart('alternative') msg['Subject'] = subject msg['From'] = me msg['To'] = you text = text part1 = MIMEText(text, 'plain') msg.attach(part1) s = smtplib.SMTP('domain.org') s.ehlo() s.starttls() s.login("user", "password") s.sendmail(me, you, msg.as_string()) s.quit() 


结论:


也许这种解决方案并不声称是最好的,但是对分析人员团队是透明的。 我在其中看到的优点:

  • 使用熟悉的Jupyter进行自动化的能力
  • 视觉互动
  • 团队成员有权选择如何处理文件(spark-zoo),因此,无需重写现有脚本

当然,当启动大量任务时,您将必须监视释放的资源,配置卸载之间的通信。 这些问题将单独解决并与同事达成共识。

如果至少有一个团队注意到这一决定,那将是很好的。

参考文献


Livy文档

Source: https://habr.com/ru/post/zh-CN457096/


All Articles