Como fazer o seu autoescalonador para um cluster

Oi Treinamos pessoas para trabalhar com big data. É impossível imaginar um programa educacional sobre big data sem seu próprio cluster, no qual todos os participantes trabalhem juntos. Por esse motivo, sempre o apresentamos em nosso programa :) Estamos envolvidos no ajuste, ajuste e administração, e os caras iniciam diretamente o MapReduce-jobs lá e usam o Spark.


Nesta postagem, descreveremos como resolvemos o problema de carregamento desigual de cluster escrevendo nosso autoescalonador usando a nuvem Mail.ru Cloud Solutions .


O problema


O cluster que usamos não é muito típico. O descarte é altamente desigual. Por exemplo, existem exercícios práticos quando todas as 30 pessoas e o professor entram no cluster e começam a usá-lo. Ou, novamente, há dias antes do prazo, quando a carga aumenta dramaticamente. O restante do tempo, o cluster opera no modo de subcarga.


A solução 1 é manter um cluster que suporte cargas de pico, mas permanecerá ocioso o resto do tempo.


A solução nº 2 é manter um pequeno cluster no qual adicionar nós manualmente antes das classes e durante os picos de carga.


A solução nº 3 é manter um pequeno cluster e gravar um autoescalonador que monitore a carga atual do cluster e adicione e remova nós do cluster usando várias APIs.


Neste post, falaremos sobre a decisão nº 3. Esse autoscaler é altamente dependente de fatores externos, e não internos, e os fornecedores geralmente não o fornecem. Usamos a infraestrutura de nuvem Mail.ru Cloud Solutions e criamos um autoscaler usando a API do MCS. E como estamos treinando no trabalho com dados, decidimos mostrar como você pode escrever um autoescaler semelhante para seus propósitos e usá-lo com sua nuvem


Pré-requisitos


Primeiro, você deve ter um cluster Hadoop. Por exemplo, usamos a distribuição HDP.


Para que seus nós possam ser adicionados e removidos rapidamente, você deve ter uma certa distribuição de funções entre os nós.


  1. Nó mestre. Bem, nada é especialmente necessário para explicar aqui: o nó principal do cluster, no qual, por exemplo, o driver Spark é iniciado, se você usar o modo interativo.
  2. Nó de data. Este é o nó no qual você armazena dados no HDFS e os cálculos são realizados nele.
  3. Nó de computação. Este é um nó no qual você não armazena nada no HDFS, mas os cálculos são executados nele.

Um ponto importante. O dimensionamento automático ocorrerá devido ao terceiro tipo de nós. Se você começar a selecionar e adicionar nós do segundo tipo, a velocidade de resposta será muito baixa - descompactada e o procedimento recomendado levará horas no cluster. Obviamente, isso não é o que você espera do dimensionamento automático. Ou seja, não tocamos nos nós do primeiro e do segundo tipo. Eles serão um cluster minimamente viável que existirá ao longo do programa.


Portanto, nosso autoscoiler é escrito em Python 3, usa a API Ambari para gerenciar serviços de cluster, usa a API Mail.ru Cloud Solutions (MCS) para iniciar e parar máquinas.


Arquitetura da solução


  1. Módulo autoscaler.py . Três classes são registradas nele: 1) funções para trabalhar com o Ambari, 2) funções para trabalhar com o MCS, 3) funções relacionadas diretamente à lógica do autoscaler.
  2. Script observer.py . De fato, ele consiste em regras diferentes: quando e em que momentos chamar as funções de autoscaler.
  3. O arquivo com os parâmetros de configuração config.py . Ele contém, por exemplo, uma lista de nós permitidos para dimensionamento automático e outros parâmetros que afetam, por exemplo, quanto tempo esperar desde o momento em que um novo nó foi adicionado. Também existem registros de data e hora do início das classes, para que antes da sessão seja iniciada a configuração máxima permitida do cluster.

Vejamos os trechos de código dentro dos dois primeiros arquivos.


1. O módulo autoscaler.py


Classe ambari


Este é o trecho de código que contém a classe Ambari :


 class Ambari: def __init__(self, ambari_url, cluster_name, headers, auth): self.ambari_url = ambari_url self.cluster_name = cluster_name self.headers = headers self.auth = auth def stop_all_services(self, hostname): url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/' url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname req0 = requests.get(url2, headers=self.headers, auth=self.auth) services = req0.json()['host_components'] services_list = list(map(lambda x: x['HostRoles']['component_name'], services)) data = { "RequestInfo": { "context":"Stop All Host Components", "operation_level": { "level":"HOST", "cluster_name": self.cluster_name, "host_names": hostname }, "query":"HostRoles/component_name.in({0})".format(",".join(services_list)) }, "Body": { "HostRoles": { "state":"INSTALLED" } } } req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth) if req.status_code in [200, 201, 202]: message = 'Request accepted' else: message = req.status_code return message 

Para um exemplo acima, você pode examinar a implementação da função stop_all_services , que interrompe todos os serviços no nó de cluster desejado.


Para a classe Ambari você passa:


  • ambari_url , por exemplo, no formato 'http://localhost:8080/api/v1/clusters/' ,
  • cluster_name é o nome do seu cluster no Ambari,
  • headers = {'X-Requested-By': 'ambari'}
  • e dentro de auth encontra seu nome de usuário e senha do Ambari: auth = ('login', 'password') .

A função em si nada mais é do que algumas chamadas através da API REST para Ambari. Do ponto de vista da lógica, primeiro obtemos uma lista de serviços em execução no nó e, em seguida, solicitamos neste cluster, nesse nó, para transferir os serviços da lista para o estado INSTALLED . As funções para iniciar todos os serviços, colocar os nós no estado Maintenance , etc. são semelhantes - são apenas algumas solicitações por meio da API.


Classe mcs


Este é o trecho de código que contém a classe Mcs :


 class Mcs: def __init__(self, id1, id2, password): self.id1 = id1 self.id2 = id2 self.password = password self.mcs_host = 'https://infra.mail.ru:8774/v2.1' def vm_turn_on(self, hostname): self.token = self.get_mcs_token() host = self.hostname_to_vmname(hostname) vm_id = self.get_vm_id(host) mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action' headers = { 'X-Auth-Token': '{0}'.format(self.token), 'Content-Type': 'application/json' } data = {'os-start' : 'null'} mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers) return mcs.status_code 

Para a classe Mcs , passamos o ID do projeto dentro da nuvem e o ID do usuário, bem como a senha dele. Na função vm_turn_on , queremos ativar uma das máquinas. A lógica aqui é um pouco mais complicada. No início do código, três outras funções são chamadas: 1) precisamos obter o token, 2) precisamos converter o nome do host no nome da máquina no MCS, 3) obter o ID dessa máquina. Em seguida, fazemos uma pós-solicitação simples e executamos esta máquina.


É assim que a função de recebimento de token se parece:


 def get_mcs_token(self): url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog' headers = {'Content-Type': 'application/json'} data = { 'auth': { 'identity': { 'methods': ['password'], 'password': { 'user': { 'id': self.id1, 'password': self.password } } }, 'scope': { 'project': { 'id': self.id2 } } } } params = (('nocatalog', ''),) req = requests.post(url, data=json.dumps(data), headers=headers, params=params) self.token = req.headers['X-Subject-Token'] return self.token 

Autoscaler de classe


Esta classe contém funções relacionadas à lógica do próprio trabalho.


É assim que um código dessa classe se parece:


 class Autoscaler: def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node): self.scaling_hosts = scaling_hosts self.ambari = ambari self.mcs = mcs self.q_ram = deque() self.q_cpu = deque() self.num = 0 self.yarn_ram_per_node = yarn_ram_per_node self.yarn_cpu_per_node = yarn_cpu_per_node def scale_down(self, hostname): flag1 = flag2 = flag3 = flag4 = flag5 = False if hostname in self.scaling_hosts: while True: time.sleep(5) status1 = self.ambari.decommission_nodemanager(hostname) if status1 == 'Request accepted' or status1 == 500: flag1 = True logging.info('Decomission request accepted: {0}'.format(flag1)) break while True: time.sleep(5) status3 = self.ambari.check_service(hostname, 'NODEMANAGER') if status3 == 'INSTALLED': flag3 = True logging.info('Nodemaneger decommissioned: {0}'.format(flag3)) break while True: time.sleep(5) status2 = self.ambari.maintenance_on(hostname) if status2 == 'Request accepted' or status2 == 500: flag2 = True logging.info('Maintenance request accepted: {0}'.format(flag2)) break while True: time.sleep(5) status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER') if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST': flag4 = True self.ambari.stop_all_services(hostname) logging.info('Maintenance is on: {0}'.format(flag4)) logging.info('Stopping services') break time.sleep(90) status5 = self.mcs.vm_turn_off(hostname) while True: time.sleep(5) status5 = self.mcs.get_vm_info(hostname)['server']['status'] if status5 == 'SHUTOFF': flag5 = True logging.info('VM is turned off: {0}'.format(flag5)) break if flag1 and flag2 and flag3 and flag4 and flag5: message = 'Success' logging.info('Scale-down finished') logging.info('Cooldown period has started. Wait for several minutes') return message 

Para a entrada, usamos as classes Ambari e Ambari , a lista de nós permitidos para dimensionamento, bem como os parâmetros de configuração dos nós: memória e CPU alocados ao nó no YARN. Existem também 2 parâmetros internos q_ram, q_cpu, que são filas. Utilizando-os, armazenamos os valores da carga atual do cluster. Se percebermos que nos últimos 5 minutos houve um aumento estável de carga, decidimos que precisamos adicionar um nó +1 ao cluster. O mesmo vale para o estado de subcarga do cluster.


O código acima mostra um exemplo de função que remove uma máquina de um cluster e a interrompe na nuvem. Primeiro, o YARN Nodemanager , o modo de Maintenance é ativado, então paramos todos os serviços na máquina e desligamos a máquina virtual na nuvem.


2. Script observer.py


Código de exemplo de lá:


 if scaler.assert_up(config.scale_up_thresholds) == True: hostname = cloud.get_vm_to_up(config.scaling_hosts) if hostname != None: status1 = scaler.scale_up(hostname) if status1 == 'Success': text = {"text": "{0} has been successfully scaled-up".format(hostname)} post = {"text": "{0}".format(text)} json_data = json.dumps(post) req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'}) time.sleep(config.cooldown_period*60) 

Nele, verificamos se existem condições para aumentar as capacidades do cluster e se há máquinas na reserva, obtemos o nome do host de um deles, adicionamos ao cluster e publicamos uma mensagem sobre isso no Slack de nossa equipe. Depois disso, o cooldown_period iniciado, quando não adicionamos ou removemos nada do cluster, mas simplesmente monitoramos a carga. Se ele se estabilizou e está dentro do corredor de valores ideais de carga, continuamos monitorando. Se um nó não for suficiente, adicione outro.


Nos casos em que temos uma lição à frente, já sabemos com certeza que um nó não é suficiente; portanto, iniciamos imediatamente todos os nós livres e os mantemos ativos até o final da lição. Isso acontece com uma lista de classes de carimbo de data / hora.


Conclusão


O autoscaler é uma solução boa e conveniente para os casos em que há um carregamento desigual do cluster. Você obtém simultaneamente a configuração de cluster desejada para cargas de pico e, ao mesmo tempo, não mantém esse cluster durante a subcarga, economizando dinheiro. Além disso, tudo acontece automaticamente sem a sua participação. O próprio autoscaler nada mais é do que um conjunto de solicitações para a API do gerenciador de cluster e a API do provedor de nuvem, registradas de acordo com uma determinada lógica. O que exatamente precisa ser lembrado é a separação de nós em 3 tipos, como escrevemos anteriormente. E você será feliz.

Source: https://habr.com/ru/post/pt482198/


All Articles