Hola Capacitamos personas para trabajar con big data. Es imposible imaginar un programa educativo sobre big data sin su propio clúster, en el que todos los participantes trabajen juntos. Por esta razón, siempre lo tenemos en nuestro programa :) Estamos comprometidos con su ajuste, ajuste y administración, y los chicos lanzan directamente trabajos MapReduce allí y usan Spark.
En esta publicación, describiremos cómo resolvimos el problema de la carga desigual del clúster escribiendo nuestro autoescalador utilizando la nube Mail.ru Cloud Solutions .
El problema
El clúster que utilizamos no es del todo típico. La eliminación es muy desigual. Por ejemplo, hay ejercicios prácticos cuando las 30 personas y el maestro ingresan al grupo y comienzan a usarlo. O, nuevamente, hay días antes de la fecha límite, cuando la carga aumenta dramáticamente. El resto del tiempo, el clúster funciona en modo de baja carga.
La solución n. ° 1 es mantener un clúster que resista las cargas máximas, pero que permanezca inactivo el resto del tiempo.
La solución n. ° 2 es mantener un pequeño clúster en el que agregar nodos manualmente antes de las clases y durante las cargas máximas.
La solución n. ° 3 es mantener un clúster pequeño y escribir un autoescalador que monitoreará la carga actual del clúster y agregará y eliminará nodos del clúster utilizando varias API.
En este post hablaremos sobre la decisión No. 3. Tal autoescalador depende en gran medida de factores externos, y no de factores internos, y los proveedores a menudo no lo proporcionan. Utilizamos la infraestructura en la nube de Mail.ru Cloud Solutions y hemos escrito un autoescalador utilizando la API MCS. Y dado que estamos capacitándonos para trabajar con datos, decidimos mostrarle cómo puede escribir un escalador automático similar para sus propósitos y utilizarlo con su nube
Prerrequisitos
Primero, debe tener un clúster Hadoop. Por ejemplo, usamos la distribución HDP.
Para que sus nodos puedan agregarse y eliminarse rápidamente, debe tener una cierta distribución de roles entre los nodos.
- Nodo maestro Bueno, nada es especialmente necesario para explicar aquí: el nodo principal del clúster, en el que, por ejemplo, se inicia el controlador Spark, si utiliza el modo interactivo.
- Nodo de fecha. Este es el nodo en el que almacena datos en HDFS y se realizan cálculos en él.
- Nodo de computación. Este es un nodo en el que no almacena nada en HDFS, pero se realizan cálculos en él.
Un punto importante El autoescalado ocurrirá debido al tercer tipo de nodos. Si comienza a recoger y agregar nodos del segundo tipo, la velocidad de respuesta será muy baja: descomprimida y recomendada, llevará horas en su clúster. Esto, por supuesto, no es lo que esperas del autoescalado. Es decir, no tocamos los nodos del primer y segundo tipo. Serán un grupo mínimamente viable que existirá durante todo el programa.
Entonces, nuestro autocoiler está escrito en Python 3, usa la API Ambari para administrar los servicios del clúster, usa la API Mail.ru Cloud Solutions (MCS) para iniciar y detener máquinas.
Arquitectura de soluciones
- Módulo
autoscaler.py
. Se registran tres clases en él: 1) funciones para trabajar con Ambari, 2) funciones para trabajar con MCS, 3) funciones relacionadas directamente con la lógica del autoescalador. - Script
observer.py
. De hecho, consta de diferentes reglas: cuándo y en qué momentos llamar a las funciones de autoescaler. - El archivo con los parámetros de configuración
config.py
. Contiene, por ejemplo, una lista de nodos permitidos para el autoescalado y otros parámetros que afectan, por ejemplo, cuánto tiempo esperar desde el momento en que se agregó un nuevo nodo. También hay marcas de tiempo del inicio de clases, de modo que antes de la sesión se inicia la configuración máxima permitida del clúster.
Veamos los fragmentos de código dentro de los dos primeros archivos.
1. El módulo autoscaler.py
Clase ambari
Este es el fragmento de código que contiene la clase Ambari
:
class Ambari: def __init__(self, ambari_url, cluster_name, headers, auth): self.ambari_url = ambari_url self.cluster_name = cluster_name self.headers = headers self.auth = auth def stop_all_services(self, hostname): url = self.ambari_url + self.cluster_name + '/hosts/' + hostname + '/host_components/' url2 = self.ambari_url + self.cluster_name + '/hosts/' + hostname req0 = requests.get(url2, headers=self.headers, auth=self.auth) services = req0.json()['host_components'] services_list = list(map(lambda x: x['HostRoles']['component_name'], services)) data = { "RequestInfo": { "context":"Stop All Host Components", "operation_level": { "level":"HOST", "cluster_name": self.cluster_name, "host_names": hostname }, "query":"HostRoles/component_name.in({0})".format(",".join(services_list)) }, "Body": { "HostRoles": { "state":"INSTALLED" } } } req = requests.put(url, data=json.dumps(data), headers=self.headers, auth=self.auth) if req.status_code in [200, 201, 202]: message = 'Request accepted' else: message = req.status_code return message
Para un ejemplo anterior, puede ver la implementación de la función stop_all_services
, que detiene todos los servicios en el nodo del clúster deseado.
A la clase de Ambari
pasas:
ambari_url
, por ejemplo, de la forma 'http://localhost:8080/api/v1/clusters/'
,cluster_name
es el nombre de tu cluster en Ambari,headers = {'X-Requested-By': 'ambari'}
- y dentro de
auth
encuentra su nombre de usuario y contraseña de Ambari: auth = ('login', 'password')
.
La función en sí no es más que un par de llamadas a través de la API REST a Ambari. Desde el punto de vista de la lógica, primero obtenemos una lista de servicios en ejecución en el nodo, y luego le pedimos a este clúster, en este nodo, que transfiera los servicios de la lista al estado INSTALLED
. Las funciones para iniciar todos los servicios, para poner los nodos en el estado Maintenance
, etc., son similares: son solo algunas solicitudes a través de la API.
Clase mcs
Este es el fragmento de código que contiene la clase Mcs
:
class Mcs: def __init__(self, id1, id2, password): self.id1 = id1 self.id2 = id2 self.password = password self.mcs_host = 'https://infra.mail.ru:8774/v2.1' def vm_turn_on(self, hostname): self.token = self.get_mcs_token() host = self.hostname_to_vmname(hostname) vm_id = self.get_vm_id(host) mcs_url1 = self.mcs_host + '/servers/' + self.vm_id + '/action' headers = { 'X-Auth-Token': '{0}'.format(self.token), 'Content-Type': 'application/json' } data = {'os-start' : 'null'} mcs = requests.post(mcs_url1, data=json.dumps(data), headers=headers) return mcs.status_code
A la clase Mcs
, pasamos la identificación del proyecto dentro de la nube y la identificación del usuario, así como su contraseña. En la función vm_turn_on
queremos habilitar una de las máquinas. La lógica aquí es un poco más complicada. Al comienzo del código, se llaman otras tres funciones: 1) necesitamos obtener el token, 2) necesitamos convertir el nombre de host al nombre de la máquina en MCS, 3) obtener la identificación de esta máquina. A continuación, hacemos una simple solicitud posterior y ejecutamos esta máquina.
Así es como se ve la función de recepción de tokens:
def get_mcs_token(self): url = 'https://infra.mail.ru:35357/v3/auth/tokens?nocatalog' headers = {'Content-Type': 'application/json'} data = { 'auth': { 'identity': { 'methods': ['password'], 'password': { 'user': { 'id': self.id1, 'password': self.password } } }, 'scope': { 'project': { 'id': self.id2 } } } } params = (('nocatalog', ''),) req = requests.post(url, data=json.dumps(data), headers=headers, params=params) self.token = req.headers['X-Subject-Token'] return self.token
Autoescalador de clase
Esta clase contiene funciones relacionadas con la lógica del trabajo en sí.
Así es como se ve un fragmento de código de esta clase:
class Autoscaler: def __init__(self, ambari, mcs, scaling_hosts, yarn_ram_per_node, yarn_cpu_per_node): self.scaling_hosts = scaling_hosts self.ambari = ambari self.mcs = mcs self.q_ram = deque() self.q_cpu = deque() self.num = 0 self.yarn_ram_per_node = yarn_ram_per_node self.yarn_cpu_per_node = yarn_cpu_per_node def scale_down(self, hostname): flag1 = flag2 = flag3 = flag4 = flag5 = False if hostname in self.scaling_hosts: while True: time.sleep(5) status1 = self.ambari.decommission_nodemanager(hostname) if status1 == 'Request accepted' or status1 == 500: flag1 = True logging.info('Decomission request accepted: {0}'.format(flag1)) break while True: time.sleep(5) status3 = self.ambari.check_service(hostname, 'NODEMANAGER') if status3 == 'INSTALLED': flag3 = True logging.info('Nodemaneger decommissioned: {0}'.format(flag3)) break while True: time.sleep(5) status2 = self.ambari.maintenance_on(hostname) if status2 == 'Request accepted' or status2 == 500: flag2 = True logging.info('Maintenance request accepted: {0}'.format(flag2)) break while True: time.sleep(5) status4 = self.ambari.check_maintenance(hostname, 'NODEMANAGER') if status4 == 'ON' or status4 == 'IMPLIED_FROM_HOST': flag4 = True self.ambari.stop_all_services(hostname) logging.info('Maintenance is on: {0}'.format(flag4)) logging.info('Stopping services') break time.sleep(90) status5 = self.mcs.vm_turn_off(hostname) while True: time.sleep(5) status5 = self.mcs.get_vm_info(hostname)['server']['status'] if status5 == 'SHUTOFF': flag5 = True logging.info('VM is turned off: {0}'.format(flag5)) break if flag1 and flag2 and flag3 and flag4 and flag5: message = 'Success' logging.info('Scale-down finished') logging.info('Cooldown period has started. Wait for several minutes') return message
Para la entrada tomamos las clases Ambari
y Mcs
, la lista de nodos que están permitidos para escalar, así como los parámetros de configuración de los nodos: memoria y CPU asignados al nodo en YARN. También hay 2 parámetros internos q_ram, q_cpu, que son colas. Al usarlos, almacenamos los valores de la carga del clúster actual. Si vemos que en los últimos 5 minutos ha habido un aumento estable de la carga, entonces decidimos que necesitamos agregar un nodo +1 al clúster. Lo mismo es cierto para el estado de baja carga del clúster.
El código anterior muestra un ejemplo de una función que elimina una máquina de un clúster y la detiene en la nube. Primero, el YARN Nodemanager
, luego se activa el modo Maintenance
, luego detenemos todos los servicios en la máquina y apagamos la máquina virtual en la nube.
2. Script observer.py
Código de muestra desde allí:
if scaler.assert_up(config.scale_up_thresholds) == True: hostname = cloud.get_vm_to_up(config.scaling_hosts) if hostname != None: status1 = scaler.scale_up(hostname) if status1 == 'Success': text = {"text": "{0} has been successfully scaled-up".format(hostname)} post = {"text": "{0}".format(text)} json_data = json.dumps(post) req = requests.post(webhook, data=json_data.encode('ascii'), headers={'Content-Type': 'application/json'}) time.sleep(config.cooldown_period*60)
En él, verificamos si existen las condiciones para aumentar las capacidades del clúster y si hay máquinas en la reserva, obtenemos el nombre de host de uno de ellos, lo agregamos al clúster y publicamos un mensaje sobre esto en Slack de nuestro equipo. Después de eso, cooldown_period
inicia cooldown_period
, cuando no agregamos ni eliminamos nada del clúster, sino que simplemente monitoreamos la carga. Si se ha estabilizado y está dentro del corredor de valores de carga óptimos, entonces solo continuamos monitoreando. Si un nodo no fue suficiente, agregue otro.
Para los casos en que tenemos una lección por delante, ya sabemos con certeza que un nodo no es suficiente, por lo que inmediatamente iniciamos todos los nodos libres y los mantenemos activos hasta el final de la lección. Esto sucede con una lista de clases de marca de tiempo.
Conclusión
El escalador automático es una solución buena y conveniente para aquellos casos en los que tiene una carga de clúster desigual. Al mismo tiempo, logra la configuración de clúster deseada para cargas máximas y, al mismo tiempo, no mantiene este clúster durante la subcarga, ahorrando dinero. Bueno, además, todo sucede automáticamente sin tu participación. El escalador automático en sí no es más que un conjunto de solicitudes a la API del administrador de clúster y a la API del proveedor de la nube, que se registran de acuerdo con una lógica determinada. Lo que debe recordarse exactamente es la separación de nodos en 3 tipos, como escribimos anteriormente. Y serás feliz.