Comment faire votre autoscaler pour un cluster

Salut Nous formons les gens à travailler avec le Big Data. Il est impossible d'imaginer un programme éducatif sur les mégadonnées sans son propre cluster, sur lequel tous les participants travaillent ensemble. Pour cette raison, nous l'avons toujours sur notre programme :) Nous sommes engagés dans son réglage, son réglage et son administration, et les gars y lancent directement des jobs MapReduce et utilisent Spark.


Dans cet article, nous décrirons comment nous avons résolu le problème du chargement inégal des clusters en écrivant notre autoscaler à l'aide du cloud Mail.ru Cloud Solutions .


Le problème


Le cluster que nous utilisons n'est pas tout à fait typique. L'élimination est très inégale. Par exemple, il y a des exercices pratiques lorsque les 30 personnes et l'enseignant entrent dans le cluster et commencent à l'utiliser. Ou, encore une fois, il y a des jours avant la date limite, lorsque la charge augmente considérablement. Le reste du temps, le cluster fonctionne en mode de sous-charge.


La solution n ° 1 consiste à conserver un cluster qui résistera aux pics de charge, mais restera inactif le reste du temps.


La solution n ° 2 consiste à conserver un petit cluster dans lequel ajouter manuellement des nœuds avant les classes et pendant les pics de charge.


La solution n ° 3 consiste à conserver un petit cluster et à écrire un autoscaler qui surveillera la charge actuelle du cluster et, à l'aide de diverses API, ajoutera et supprimera des nœuds du cluster.


Dans cet article, nous parlerons de la décision n ° 3. Un tel autoscaler dépend fortement de facteurs externes et non internes, et les fournisseurs ne le fournissent souvent pas. Nous utilisons l'infrastructure cloud de Mail.ru Cloud Solutions et avons écrit un autoscaler à l'aide de l'API MCS. Et puisque nous nous entraînons à travailler avec les données, nous avons décidé de montrer comment vous pouvez écrire un autoscaler similaire pour vos besoins et l'utiliser avec votre cloud


Prérequis


Tout d'abord, vous devez disposer d'un cluster Hadoop. Par exemple, nous utilisons la distribution HDP.


Pour que vos nœuds puissent être rapidement ajoutés et supprimés, vous devez avoir une certaine répartition des rôles entre les nœuds.


  1. Noeud maître. Eh bien, rien n'est particulièrement nécessaire à expliquer ici: le nœud principal du cluster, sur lequel, par exemple, le pilote Spark est lancé, si vous utilisez le mode interactif.
  2. Noeud de date. Il s'agit du nœud sur lequel vous stockez les données sur HDFS et les calculs y sont effectués.
  3. Nœud informatique. Il s'agit d'un nœud sur lequel vous ne stockez rien sur HDFS, mais des calculs y sont effectués.

Un point important. La mise à l'échelle automatique se produira en raison du troisième type de nœuds. Si vous commencez à prendre et à ajouter des nœuds du deuxième type, la vitesse de réponse sera très faible - décompressée et recommandée prendra des heures sur votre cluster. Bien sûr, ce n'est pas ce que vous attendez de la mise à l'échelle automatique. Autrement dit, nous ne touchons pas les nœuds du premier et du deuxième type. Il s'agira d'un cluster minimalement viable qui existera tout au long du programme.


Ainsi, notre enrouleur automatique est écrit en Python 3, utilise l'API Ambari pour gérer les services de cluster, utilise l'API Mail.ru Cloud Solutions (MCS) pour démarrer et arrêter les machines.


Architecture de la solution


  1. Module autoscaler.py . Trois classes y sont enregistrées: 1) fonctions pour travailler avec Ambari, 2) fonctions pour travailler avec MCS, 3) fonctions liées directement à la logique de mise à l'échelle automatique.
  2. Script observer.py . En fait, il se compose de différentes règles: quand et à quels moments appeler les fonctions de mise à l'échelle automatique.
  3. Le fichier avec les paramètres de configuration config.py . Il contient, par exemple, une liste de nœuds autorisés pour la mise à l'échelle automatique et d'autres paramètres qui affectent, par exemple, le temps d'attente à partir du moment où un nouveau nœud a été ajouté. Il existe également des horodatages du début des classes, de sorte qu'avant la session, la configuration de cluster maximale autorisée est lancée.

Regardons les morceaux de code à l'intérieur des deux premiers fichiers.


1. Le module autoscaler.py


Ambari classe


Voici le morceau de code qui contient la classe Ambari :


 class Ambari: def __init__(self, ambari_url, cluster_name, headers, auth): self.ambari_url = ambari_url self.cluster_name = cluster_name self.headers = headers self.auth = auth def stop_all_services(self, hostname): url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/' url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname req0 = requests.get(url2, headers=self.headers, auth=self.auth) services = req0.json()['host_components'] services_list = list(map(lambda x: x['HostRoles']['component_name'], services)) data = { "RequestInfo": { "context":"Stop All Host Components", "operation_level": { "level":"HOST", "cluster_name": self.cluster_name, "host_names": hostname }, "query":"HostRoles/component_name.in({0})".format(",".join(services_list)) }, "Body": { "HostRoles": { "state":"INSTALLED" } } } req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth) if req.status_code in [200, 201, 202]: message = 'Request accepted' else: message = req.status_code return message 

Pour un exemple ci-dessus, vous pouvez regarder l'implémentation de la fonction stop_all_services , qui arrête tous les services sur le nœud de cluster souhaité.


Pour la classe Ambari vous passez:


  • ambari_url , par exemple, de la forme 'http://localhost:8080/api/v1/clusters/' ,
  • cluster_name est le nom de votre cluster dans Ambari,
  • headers = {'X-Requested-By': 'ambari'}
  • et à l'intérieur d' auth se trouve votre nom d'utilisateur et votre mot de passe d'Ambari: auth = ('login', 'password') .

La fonction elle-même n'est rien d'autre que quelques appels via l'API REST vers Ambari. Du point de vue de la logique, nous obtenons d'abord une liste des services en cours d'exécution sur le nœud, puis nous demandons sur ce cluster, sur ce nœud, de transférer les services de la liste à l'état INSTALLED . Les fonctions de lancement de tous les services, de mise des nœuds à l'état Maintenance , etc. se ressemblent - ce ne sont que quelques requêtes via l'API.


Classe mcs


C'est le morceau de code qui contient la classe Mcs :


 class Mcs: def __init__(self, id1, id2, password): self.id1 = id1 self.id2 = id2 self.password = password self.mcs_host = 'https://infra.mail.ru:8774/v2.1' def vm_turn_on(self, hostname): self.token = self.get_mcs_token() host = self.hostname_to_vmname(hostname) vm_id = self.get_vm_id(host) mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action' headers = { 'X-Auth-Token': '{0}'.format(self.token), 'Content-Type': 'application/json' } data = {'os-start' : 'null'} mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers) return mcs.status_code 

À la classe Mcs , nous transmettons l'ID du projet à l'intérieur du cloud et l'ID utilisateur, ainsi que son mot de passe. Dans la fonction vm_turn_on , nous voulons activer l'une des machines. La logique ici est un peu plus compliquée. Au début du code, trois autres fonctions sont appelées: 1) nous devons obtenir le jeton, 2) nous devons convertir le nom d'hôte en nom de la machine dans MCS, 3) obtenir l'ID de cette machine. Ensuite, nous faisons une simple post-demande et exécutons cette machine.


Voici à quoi ressemble la fonction de réception de jetons:


 def get_mcs_token(self): url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog' headers = {'Content-Type': 'application/json'} data = { 'auth': { 'identity': { 'methods': ['password'], 'password': { 'user': { 'id': self.id1, 'password': self.password } } }, 'scope': { 'project': { 'id': self.id2 } } } } params = (('nocatalog', ''),) req = requests.post(url, data=json.dumps(data), headers=headers, params=params) self.token = req.headers['X-Subject-Token'] return self.token 

Autoscaler de classe


Cette classe contient des fonctions liées à la logique du travail lui-même.


Voici à quoi ressemble un morceau de code de cette classe:


 class Autoscaler: def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node): self.scaling_hosts = scaling_hosts self.ambari = ambari self.mcs = mcs self.q_ram = deque() self.q_cpu = deque() self.num = 0 self.yarn_ram_per_node = yarn_ram_per_node self.yarn_cpu_per_node = yarn_cpu_per_node def scale_down(self, hostname): flag1 = flag2 = flag3 = flag4 = flag5 = False if hostname in self.scaling_hosts: while True: time.sleep(5) status1 = self.ambari.decommission_nodemanager(hostname) if status1 == 'Request accepted' or status1 == 500: flag1 = True logging.info('Decomission request accepted: {0}'.format(flag1)) break while True: time.sleep(5) status3 = self.ambari.check_service(hostname, 'NODEMANAGER') if status3 == 'INSTALLED': flag3 = True logging.info('Nodemaneger decommissioned: {0}'.format(flag3)) break while True: time.sleep(5) status2 = self.ambari.maintenance_on(hostname) if status2 == 'Request accepted' or status2 == 500: flag2 = True logging.info('Maintenance request accepted: {0}'.format(flag2)) break while True: time.sleep(5) status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER') if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST': flag4 = True self.ambari.stop_all_services(hostname) logging.info('Maintenance is on: {0}'.format(flag4)) logging.info('Stopping services') break time.sleep(90) status5 = self.mcs.vm_turn_off(hostname) while True: time.sleep(5) status5 = self.mcs.get_vm_info(hostname)['server']['status'] if status5 == 'SHUTOFF': flag5 = True logging.info('VM is turned off: {0}'.format(flag5)) break if flag1 and flag2 and flag3 and flag4 and flag5: message = 'Success' logging.info('Scale-down finished') logging.info('Cooldown period has started. Wait for several minutes') return message 

Pour l'entrée, nous prenons les classes Ambari et Ambari , la liste des nœuds qui sont autorisés pour la mise à l'échelle, ainsi que les paramètres de configuration des nœuds: mémoire et CPU alloués au nœud dans YARN. Il y a également 2 paramètres internes q_ram, q_cpu, qui sont des files d'attente. En les utilisant, nous stockons les valeurs de la charge de cluster actuelle. Si nous constatons qu'au cours des 5 dernières minutes, la charge a augmenté de manière stable, nous décidons que nous devons ajouter un nœud +1 au cluster. Il en va de même pour l'état de sous-charge du cluster.


Le code ci-dessus montre un exemple de fonction qui supprime une machine d'un cluster et l'arrête dans le cloud. Tout d'abord, le YARN Nodemanager , puis le mode Maintenance est activé, puis nous arrêtons tous les services sur la machine et désactivons la machine virtuelle dans le cloud.


2. Script observer.py


Exemple de code à partir de là:


 if scaler.assert_up(config.scale_up_thresholds) == True: hostname = cloud.get_vm_to_up(config.scaling_hosts) if hostname != None: status1 = scaler.scale_up(hostname) if status1 == 'Success': text = {"text": "{0} has been successfully scaled-up".format(hostname)} post = {"text": "{0}".format(text)} json_data = json.dumps(post) req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'}) time.sleep(config.cooldown_period*60) 

Dans ce document, nous vérifions si les conditions existent pour augmenter les capacités du cluster et s'il y a des machines dans la réserve, nous obtenons le nom d'hôte de l'une d'entre elles, l'ajoutons au cluster et publions un message à ce sujet dans Slack de notre équipe. Après cela, cooldown_period lancé, lorsque nous n'ajoutons ni ne supprimons rien du cluster, mais surveillons simplement la charge. S'il s'est stabilisé et se trouve dans le couloir des valeurs de charge optimales, alors nous continuons simplement la surveillance. Si un nœud n'était pas suffisant, ajoutez-en un autre.


Pour les cas où nous avons une leçon à venir, nous savons déjà avec certitude qu'un nœud n'est pas suffisant, donc nous démarrons immédiatement tous les nœuds libres et les gardons actifs jusqu'à la fin de la leçon. Cela se produit avec une liste de classes d'horodatage.


Conclusion


L'autoscaler est une bonne solution pratique pour les cas où le chargement des clusters est inégal. Vous obtenez simultanément la configuration de cluster souhaitée pour les charges de pointe et, en même temps, ne maintenez pas ce cluster pendant la sous-charge, ce qui permet d'économiser de l'argent. Eh bien, en plus, tout se passe automatiquement sans votre participation. L'autoscaler lui-même n'est rien d'autre qu'un ensemble de demandes adressées à l'API du gestionnaire de cluster et à l'API du fournisseur de cloud, qui sont enregistrées selon une certaine logique. Ce qu'il faut exactement retenir, c'est la séparation des nœuds en 3 types, comme nous l'avons écrit plus tôt. Et vous serez heureux.

Source: https://habr.com/ru/post/fr482198/


All Articles