你好 我们训练人们使用大数据。 如果没有自己的集群,所有参与者共同努力,就无法想象关于大数据的教育计划。 因此,我们始终在程序中使用它:)我们从事其调优,调优和管理工作,他们直接在此处启动MapReduce-jobs并使用Spark。
在这篇文章中,我们将描述如何使用Mail.ru Cloud Solutions云编写自动伸缩程序来解决群集负载不均的问题。
问题
我们使用的集群不是很典型。 处置非常不均匀。 例如,当30个人和老师全部进入集群并开始使用它时,会有一些练习。 或者,还是在截止日期之前的几天里,负载急剧增加。 其余时间,集群以欠载模式运行。
解决方案1是保持一个群集,该群集可以承受峰值负载,但在其余时间保持空闲状态。
第二个解决方案是保留一个小的群集,在群集中在类之前和峰值负载期间手动添加节点。
解决方案3是保留一个小的群集,并编写一个自动缩放器,以监视当前的群集负载,并使用各种API从群集中添加和删除节点。
在这篇文章中,我们将讨论第3号决定。 这样的自动缩放器高度依赖于外部因素,而不是内部因素,提供商通常不提供它。 我们使用Mail.ru Cloud Solutions云基础架构,并使用MCS API编写了自动缩放器。 并且由于我们正在接受数据处理方面的培训,因此我们决定向您展示如何为您的目的编写类似的自动缩放器并与云一起使用
先决条件
首先,您必须具有Hadoop集群。 例如,我们使用HDP分发。
为了可以快速添加和删除节点,必须在节点之间具有一定的角色分布。
- 主节点。 好吧,这里不需要特别说明:集群的主节点,例如,如果您使用交互方式,则将在该主节点上启动Spark驱动程序。
- 日期节点。 这是您在HDFS上存储数据并对其执行计算的节点。
- 计算节点。 在此节点上,您不会在HDFS上存储任何内容,但会对其进行计算。
重要的一点。 由于第三类节点,将发生自动缩放。 如果您开始拾取并添加第二种类型的节点,则响应速度将非常低-解压缩并建议在群集上花费数小时。 当然,这不是自动缩放所期望的。 也就是说,我们不触摸第一类和第二类的节点。 它们将是一个最小可行的集群,将存在于整个计划中。
因此,我们的autoscoiler用Python 3编写,使用Ambari API管理集群服务,使用Mail.ru Cloud Solutions (MCS) API启动和停止计算机。
解决方案架构
- 模块
autoscaler.py
。 其中注册了三类:1)用于Ambari的功能,2)用于MCS的功能,3)与自动定标器逻辑直接相关的功能。 - 脚本
observer.py
。 实际上,它由不同的规则组成:何时以及何时调用自动定标器功能。 - 具有配置参数
config.py
的文件。 例如,它包含允许自动缩放的节点列表以及其他影响参数的参数,例如,从添加新节点起需要等待多少时间。 还有类开始的时间戳,以便在会话之前启动允许的最大群集配置。
让我们看一下前两个文件中的代码段。
1. autoscaler.py模块
类安巴里
这是包含Ambari
类的代码:
class Ambari: def __init__(self, ambari_url, cluster_name, headers, auth): self.ambari_url = ambari_url self.cluster_name = cluster_name self.headers = headers self.auth = auth def stop_all_services(self, hostname): url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/' url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname req0 = requests.get(url2, headers=self.headers, auth=self.auth) services = req0.json()['host_components'] services_list = list(map(lambda x: x['HostRoles']['component_name'], services)) data = { "RequestInfo": { "context":"Stop All Host Components", "operation_level": { "level":"HOST", "cluster_name": self.cluster_name, "host_names": hostname }, "query":"HostRoles/component_name.in({0})".format(",".join(services_list)) }, "Body": { "HostRoles": { "state":"INSTALLED" } } } req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth) if req.status_code in [200, 201, 202]: message = 'Request accepted' else: message = req.status_code return message
对于上面的示例,您可以查看stop_all_services
函数的实现,该函数将停止所需集群节点上的所有服务。
您需要通过Ambari
班级:
- 举例来说,
ambari_url
的格式为'http://localhost:8080/api/v1/clusters/'
, cluster_name
是您在Ambari中的集群的名称,headers = {'X-Requested-By': 'ambari'}
auth
内包含来自Ambari的用户名和密码: auth = ('login', 'password')
。
该函数本身不过是通过REST API对Ambari的几次调用。 从逻辑角度来看,我们首先获得该节点上正在运行的服务的列表,然后要求在该节点上的此集群上,将服务从列表转移到INSTALLED
状态。 用于启动所有服务,将节点置于Maintenance
状态等的功能看起来很相似-它们只是通过API的一些请求。
MCS级
这是包含Mcs
类的代码:
class Mcs: def __init__(self, id1, id2, password): self.id1 = id1 self.id2 = id2 self.password = password self.mcs_host = 'https://infra.mail.ru:8774/v2.1' def vm_turn_on(self, hostname): self.token = self.get_mcs_token() host = self.hostname_to_vmname(hostname) vm_id = self.get_vm_id(host) mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action' headers = { 'X-Auth-Token': '{0}'.format(self.token), 'Content-Type': 'application/json' } data = {'os-start' : 'null'} mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers) return mcs.status_code
对于Mcs
类,我们将云中的项目ID,用户ID以及他的密码传递给云。 在vm_turn_on
函数中,我们要启用其中一台计算机。 这里的逻辑有点复杂。 在代码的开头,将调用其他三个函数:1)我们需要获取令牌,2)我们需要将主机名转换为MCS中计算机的名称,3)获取该计算机的ID。 接下来,我们进行一个简单的后请求并运行这台机器。
这是令牌接收功能的样子:
def get_mcs_token(self): url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog' headers = {'Content-Type': 'application/json'} data = { 'auth': { 'identity': { 'methods': ['password'], 'password': { 'user': { 'id': self.id1, 'password': self.password } } }, 'scope': { 'project': { 'id': self.id2 } } } } params = (('nocatalog', ''),) req = requests.post(url, data=json.dumps(data), headers=headers, params=params) self.token = req.headers['X-Subject-Token'] return self.token
类自动缩放器
此类包含与工作逻辑本身有关的功能。
此类的代码如下所示:
class Autoscaler: def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node): self.scaling_hosts = scaling_hosts self.ambari = ambari self.mcs = mcs self.q_ram = deque() self.q_cpu = deque() self.num = 0 self.yarn_ram_per_node = yarn_ram_per_node self.yarn_cpu_per_node = yarn_cpu_per_node def scale_down(self, hostname): flag1 = flag2 = flag3 = flag4 = flag5 = False if hostname in self.scaling_hosts: while True: time.sleep(5) status1 = self.ambari.decommission_nodemanager(hostname) if status1 == 'Request accepted' or status1 == 500: flag1 = True logging.info('Decomission request accepted: {0}'.format(flag1)) break while True: time.sleep(5) status3 = self.ambari.check_service(hostname, 'NODEMANAGER') if status3 == 'INSTALLED': flag3 = True logging.info('Nodemaneger decommissioned: {0}'.format(flag3)) break while True: time.sleep(5) status2 = self.ambari.maintenance_on(hostname) if status2 == 'Request accepted' or status2 == 500: flag2 = True logging.info('Maintenance request accepted: {0}'.format(flag2)) break while True: time.sleep(5) status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER') if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST': flag4 = True self.ambari.stop_all_services(hostname) logging.info('Maintenance is on: {0}'.format(flag4)) logging.info('Stopping services') break time.sleep(90) status5 = self.mcs.vm_turn_off(hostname) while True: time.sleep(5) status5 = self.mcs.get_vm_info(hostname)['server']['status'] if status5 == 'SHUTOFF': flag5 = True logging.info('VM is turned off: {0}'.format(flag5)) break if flag1 and flag2 and flag3 and flag4 and flag5: message = 'Success' logging.info('Scale-down finished') logging.info('Cooldown period has started. Wait for several minutes') return message
我们接受类Ambari
和Mcs
,允许扩展的节点列表以及节点的配置参数:分配给YARN中节点的内存和cpu。 还有2个内部参数q_ram,q_cpu,它们是队列。 使用它们,我们可以存储当前集群负载的值。 如果我们发现过去5分钟内负载稳定增加,那么我们决定需要向集群添加+1节点。 集群欠载状态也是如此。
上面的代码显示了一个函数示例,该函数从群集中删除计算机并将其停止在云中。 首先, YARN Nodemanager
,然后打开Maintenance
模式,然后停止计算机上的所有服务并关闭云中的虚拟机。
2.脚本observer.py
那里的示例代码:
if scaler.assert_up(config.scale_up_thresholds) == True: hostname = cloud.get_vm_to_up(config.scaling_hosts) if hostname != None: status1 = scaler.scale_up(hostname) if status1 == 'Success': text = {"text": "{0} has been successfully scaled-up".format(hostname)} post = {"text": "{0}".format(text)} json_data = json.dumps(post) req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'}) time.sleep(config.cooldown_period*60)
在其中,我们检查是否存在增加集群容量的条件,以及是否根本有备用机器,我们获得其中一台的主机名,将其添加到集群中,并在团队的Slack中发布有关此消息。 之后,当我们不从集群中添加或删除任何东西,而只是监视负载时, cooldown_period
启动cooldown_period
。 如果它已经稳定并处于最佳负载值的范围之内,那么我们将继续进行监视。 如果一个节点不足,则添加另一个。
对于上一节课的情况,我们已经确定一个节点不够用,因此我们立即启动所有空闲节点并将其保持活动状态,直到课程结束。 时间戳类列表会发生这种情况。
结论
对于群集负载不均匀的情况,自动缩放器是一个很好的便捷解决方案。 您可以同时实现峰值负载所需的群集配置,同时在欠载期间不要保留该群集,从而节省了资金。 好吧,此外,所有这些都会自动发生,而无需您的参与。 自动缩放器本身不过是对集群管理器API和云提供者API的一组请求,它们是根据某种逻辑进行注册的。 正如我们先前所写,确切需要记住的是将节点分为三种类型。 这样您会很高兴的。