So erstellen Sie Ihren Autoscaler für einen Cluster

Hallo! Wir schulen Menschen in der Arbeit mit Big Data. Ein eigenes Cluster, an dem alle Beteiligten zusammenarbeiten, ist aus einem Bildungsprogramm zu Big Data nicht mehr wegzudenken. Aus diesem Grund haben wir es immer auf unserem Programm :) Wir sind mit Tuning, Tuning und Administration beschäftigt und die Jungs starten MapReduce-Jobs direkt dort und nutzen Spark.


In diesem Beitrag beschreiben wir, wie wir das Problem des ungleichmäßigen Ladens von Clustern gelöst haben, indem wir unseren Autoscaler mithilfe der Cloud von Mail.ru Cloud Solutions geschrieben haben .


Das problem


Der von uns verwendete Cluster ist nicht ganz typisch. Die Entsorgung ist sehr ungleichmäßig. Beispielsweise gibt es praktische Übungen, bei denen alle 30 Personen und der Lehrer in den Cluster eintreten und ihn verwenden. Oder es gibt Tage vor dem Stichtag, an dem die Last dramatisch zunimmt. In der restlichen Zeit arbeitet der Cluster im Unterlastmodus.


Lösung Nr. 1 besteht darin, einen Cluster beizubehalten, der Spitzenlasten standhält, aber den Rest der Zeit im Leerlauf bleibt.


Lösung Nr. 2 besteht darin, einen kleinen Cluster beizubehalten, in dem Knoten vor Klassen und während Spitzenlasten manuell hinzugefügt werden können.


Lösung Nr. 3 besteht darin, einen kleinen Cluster beizubehalten und einen Autoscaler zu schreiben, der die aktuelle Clusterlast überwacht und mithilfe verschiedener APIs Knoten zum Cluster hinzufügt und daraus entfernt.


In diesem Beitrag werden wir über die Entscheidung Nr. 3 sprechen. Ein solcher Autoscaler ist in hohem Maße von externen und nicht von internen Faktoren abhängig und wird von Anbietern häufig nicht bereitgestellt. Wir verwenden die Cloud-Infrastruktur von Mail.ru Cloud Solutions und haben einen Autoscaler unter Verwendung der MCS-API geschrieben. Da wir in der Arbeit mit Daten geschult sind, haben wir uns entschlossen zu zeigen, wie Sie einen ähnlichen Autoscaler für Ihre Zwecke schreiben und mit Ihrer Cloud verwenden können


Voraussetzungen


Zunächst muss ein Hadoop-Cluster vorhanden sein. Zum Beispiel verwenden wir die HDP-Distribution.


Damit Ihre Knoten schnell hinzugefügt und entfernt werden können, müssen Sie eine bestimmte Rollenverteilung zwischen den Knoten haben.


  1. Hauptknoten. Hier ist nichts besonders zu erklären: Der Hauptknoten des Clusters, auf dem beispielsweise der Spark-Treiber gestartet wird, wenn Sie den interaktiven Modus verwenden.
  2. Datumsknoten. Dies ist der Knoten, auf dem Sie Daten in HDFS speichern und auf dem Berechnungen ausgeführt werden.
  3. Rechenknoten. Dies ist ein Knoten, auf dem Sie nichts in HDFS speichern, sondern Berechnungen durchführen.

Ein wichtiger Punkt. Die automatische Skalierung erfolgt aufgrund des dritten Knotentyps. Wenn Sie mit dem Aufnehmen und Hinzufügen von Knoten des zweiten Typs beginnen, ist die Antwortgeschwindigkeit sehr niedrig - dekomprimiert und empfohlen dauert es Stunden, bis Ihr Cluster vollständig entpackt ist. Dies ist natürlich nicht das, was Sie von der automatischen Skalierung erwarten. Das heißt, wir berühren nicht die Knoten des ersten und zweiten Typs. Sie werden ein minimal lebensfähiger Cluster sein, der während des gesamten Programms existieren wird.


Daher ist unser Autoscoiler in Python 3 geschrieben, verwendet die Ambari-API zum Verwalten von Clusterdiensten und die Mail.ru Cloud Solutions (MCS) -API zum Starten und Stoppen von Computern.


Lösungsarchitektur


  1. Modul autoscaler.py . Darin sind drei Klassen registriert: 1) Funktionen für die Arbeit mit Ambari, 2) Funktionen für die Arbeit mit MCS, 3) Funktionen, die sich direkt auf die Autoscaler-Logik beziehen.
  2. Script observer.py . Tatsächlich besteht es aus verschiedenen Regeln: Wann und zu welchen Zeitpunkten sollen die Autoscaler-Funktionen aufgerufen werden?
  3. Die Datei mit den Konfigurationsparametern config.py . Es enthält zum Beispiel eine Liste von Knoten, die für die automatische Skalierung zugelassen sind, und andere Parameter, die sich zum Beispiel darauf auswirken, wie lange nach dem Hinzufügen eines neuen Knotens gewartet werden muss. Es gibt auch Zeitstempel für den Beginn von Klassen, sodass vor der Sitzung die maximal zulässige Cluster-Konfiguration gestartet wird.

Schauen wir uns die Code-Teile in den ersten beiden Dateien an.


1. Das Modul autoscaler.py


Klasse Ambari


Dies ist der Code, der die Ambari Klasse enthält:


 class Ambari: def __init__(self, ambari_url, cluster_name, headers, auth): self.ambari_url = ambari_url self.cluster_name = cluster_name self.headers = headers self.auth = auth def stop_all_services(self, hostname): url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/' url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname req0 = requests.get(url2, headers=self.headers, auth=self.auth) services = req0.json()['host_components'] services_list = list(map(lambda x: x['HostRoles']['component_name'], services)) data = { "RequestInfo": { "context":"Stop All Host Components", "operation_level": { "level":"HOST", "cluster_name": self.cluster_name, "host_names": hostname }, "query":"HostRoles/component_name.in({0})".format(",".join(services_list)) }, "Body": { "HostRoles": { "state":"INSTALLED" } } } req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth) if req.status_code in [200, 201, 202]: message = 'Request accepted' else: message = req.status_code return message 

Ein Beispiel oben zeigt die Implementierung der Funktion stop_all_services , mit der alle Dienste auf dem gewünschten Clusterknoten stop_all_services werden.


An die Ambari Klasse übergeben Sie:


  • ambari_url zum Beispiel mit der Form 'http://localhost:8080/api/v1/clusters/' ,
  • cluster_name ist der Name Ihres Clusters in Ambari.
  • headers = {'X-Requested-By': 'ambari'}
  • und in auth liegt dein Benutzername und Passwort von Ambari: auth = ('login', 'password') .

Die Funktion selbst besteht lediglich aus ein paar Aufrufen über die REST-API an Ambari. Aus logischer Sicht erhalten wir zunächst eine Liste der auf dem Knoten ausgeführten Dienste und fordern dann diesen Cluster auf, die Dienste von der Liste in den INSTALLED . Die Funktionen zum Starten aller Dienste, zum Versetzen der Knoten in den Maintenance usw. sehen ähnlich aus - dies sind nur einige wenige Anforderungen über die API.


Klasse mcs


Dies ist der Code, der die Mcs Klasse enthält:


 class Mcs: def __init__(self, id1, id2, password): self.id1 = id1 self.id2 = id2 self.password = password self.mcs_host = 'https://infra.mail.ru:8774/v2.1' def vm_turn_on(self, hostname): self.token = self.get_mcs_token() host = self.hostname_to_vmname(hostname) vm_id = self.get_vm_id(host) mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action' headers = { 'X-Auth-Token': '{0}'.format(self.token), 'Content-Type': 'application/json' } data = {'os-start' : 'null'} mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers) return mcs.status_code 

An die Mcs Klasse übergeben wir die Projekt-ID in der Cloud und die Benutzer-ID sowie sein Passwort. In der Funktion vm_turn_on möchten wir eine der Maschinen aktivieren. Die Logik hier ist etwas komplizierter. Zu Beginn des Codes werden drei weitere Funktionen aufgerufen: 1) Wir müssen das Token abrufen, 2) Wir müssen den Hostnamen in den Namen der Maschine in MCS konvertieren, 3) Die ID dieser Maschine abrufen. Als Nächstes erstellen wir eine einfache Nachanforderung und führen diese Maschine aus.


So sieht die Funktion zum Empfangen von Token aus:


 def get_mcs_token(self): url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog' headers = {'Content-Type': 'application/json'} data = { 'auth': { 'identity': { 'methods': ['password'], 'password': { 'user': { 'id': self.id1, 'password': self.password } } }, 'scope': { 'project': { 'id': self.id2 } } } } params = (('nocatalog', ''),) req = requests.post(url, data=json.dumps(data), headers=headers, params=params) self.token = req.headers['X-Subject-Token'] return self.token 

Klasse Autoscaler


Diese Klasse enthält Funktionen, die sich auf die Logik der Arbeit selbst beziehen.


So sieht ein Teil des Codes dieser Klasse aus:


 class Autoscaler: def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node): self.scaling_hosts = scaling_hosts self.ambari = ambari self.mcs = mcs self.q_ram = deque() self.q_cpu = deque() self.num = 0 self.yarn_ram_per_node = yarn_ram_per_node self.yarn_cpu_per_node = yarn_cpu_per_node def scale_down(self, hostname): flag1 = flag2 = flag3 = flag4 = flag5 = False if hostname in self.scaling_hosts: while True: time.sleep(5) status1 = self.ambari.decommission_nodemanager(hostname) if status1 == 'Request accepted' or status1 == 500: flag1 = True logging.info('Decomission request accepted: {0}'.format(flag1)) break while True: time.sleep(5) status3 = self.ambari.check_service(hostname, 'NODEMANAGER') if status3 == 'INSTALLED': flag3 = True logging.info('Nodemaneger decommissioned: {0}'.format(flag3)) break while True: time.sleep(5) status2 = self.ambari.maintenance_on(hostname) if status2 == 'Request accepted' or status2 == 500: flag2 = True logging.info('Maintenance request accepted: {0}'.format(flag2)) break while True: time.sleep(5) status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER') if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST': flag4 = True self.ambari.stop_all_services(hostname) logging.info('Maintenance is on: {0}'.format(flag4)) logging.info('Stopping services') break time.sleep(90) status5 = self.mcs.vm_turn_off(hostname) while True: time.sleep(5) status5 = self.mcs.get_vm_info(hostname)['server']['status'] if status5 == 'SHUTOFF': flag5 = True logging.info('VM is turned off: {0}'.format(flag5)) break if flag1 and flag2 and flag3 and flag4 and flag5: message = 'Success' logging.info('Scale-down finished') logging.info('Cooldown period has started. Wait for several minutes') return message 

Wir akzeptieren die Klassen Ambari und Mcs , die Liste der für die Skalierung zulässigen Knoten und die Konfigurationsparameter der Knoten: Speicher und CPU, die dem Knoten in YARN zugewiesen sind. Es gibt auch 2 interne Parameter q_ram, q_cpu, die Warteschlangen sind. Mit ihnen speichern wir die Werte der aktuellen Clusterlast. Wenn wir in den letzten 5 Minuten feststellen, dass die Last stabil gestiegen ist, müssen wir dem Cluster einen +1-Knoten hinzufügen. Gleiches gilt für den Cluster-Unterlastzustand.


Der obige Code zeigt ein Beispiel für eine Funktion, mit der ein Computer aus einem Cluster entfernt und in der Cloud gestoppt wird. Zuerst wird der YARN Nodemanager , dann der Maintenance YARN Nodemanager , dann werden alle Dienste auf dem YARN Nodemanager und die virtuelle Maschine in der Cloud ausgeschaltet.


2. Script observer.py


Beispielcode von dort:


 if scaler.assert_up(config.scale_up_thresholds) == True: hostname = cloud.get_vm_to_up(config.scaling_hosts) if hostname != None: status1 = scaler.scale_up(hostname) if status1 == 'Success': text = {"text": "{0} has been successfully scaled-up".format(hostname)} post = {"text": "{0}".format(text)} json_data = json.dumps(post) req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'}) time.sleep(config.cooldown_period*60) 

Darin prüfen wir, ob die Bedingungen für die Erhöhung der Clusterkapazitäten gegeben sind und ob sich überhaupt Maschinen in der Reserve befinden, erhalten den Hostnamen einer von ihnen, fügen ihn dem Cluster hinzu und veröffentlichen eine entsprechende Meldung in Slack unseres Teams. Danach wird cooldown_period gestartet, wenn wir nichts zum Cluster hinzufügen oder daraus entfernen, sondern lediglich die Auslastung überwachen. Wenn es sich stabilisiert hat und sich innerhalb des Korridors der optimalen Lastwerte befindet, setzen wir die Überwachung einfach fort. Wenn ein Knoten nicht ausreicht, fügen Sie einen anderen hinzu.


In Fällen, in denen wir eine Lektion vor uns haben, wissen wir bereits, dass ein Knoten nicht ausreicht. Deshalb starten wir sofort alle freien Knoten und lassen sie bis zum Ende der Lektion aktiv. Dies geschieht mit einer Liste von Zeitstempelklassen.


Fazit


Der Autoscaler ist eine gute und praktische Lösung für Fälle, in denen die Clusterlast ungleichmäßig ist. Sie erreichen gleichzeitig die gewünschte Cluster-Konfiguration für Spitzenlasten und halten diesen Cluster gleichzeitig nicht während des Unterladens, was Geld spart. Außerdem geschieht alles automatisch ohne Ihre Teilnahme. Der Autoscaler selbst ist nichts anderes als eine Reihe von Anforderungen an die Cluster-Manager-API und die Cloud-Provider-API, die gemäß einer bestimmten Logik registriert werden. Was genau beachtet werden muss, ist die Trennung der Knoten in 3 Typen, wie wir zuvor geschrieben haben. Und du wirst glücklich sein.

Source: https://habr.com/ru/post/de482198/


All Articles