Escribir un balanceador simple en Go


Los equilibradores de carga juegan un papel clave en la arquitectura web. Le permiten distribuir la carga en varios backends, lo que mejora la escalabilidad. Y dado que tenemos varios backends configurados, el servicio se vuelve altamente disponible, porque en caso de falla en un servidor, el equilibrador puede elegir otro servidor que funcione.

Después de jugar con equilibradores profesionales como NGINX, traté de crear un equilibrador simple por diversión. Lo escribí en Go, es un lenguaje moderno que admite un paralelismo completo. La biblioteca estándar en Go tiene muchas características y le permite escribir aplicaciones de alto rendimiento con menos código. Además, para facilitar la distribución, genera un único binario enlazado estáticamente.

Cómo funciona nuestro equilibrador


Se utilizan diferentes algoritmos para distribuir la carga entre los backends. Por ejemplo:

  • Round Robin: la carga se distribuye de manera uniforme, teniendo en cuenta la misma potencia informática de los servidores.
  • Robin ponderado: según la potencia de procesamiento, a los servidores se les pueden asignar diferentes pesos.
  • Menos conexiones: la carga se distribuye entre los servidores con el menor número de conexiones activas.

En nuestro equilibrador, implementamos el algoritmo más simple: Round Robin.



Selección en Round Robin


El algoritmo Round Robin es simple. Ofrece a todos los artistas la misma oportunidad de completar tareas.


Seleccione servidores en Round Robin para manejar las solicitudes entrantes.

Como se muestra en la ilustración, el algoritmo selecciona los servidores en un círculo, cíclicamente. Pero no podemos seleccionarlos directamente , ¿verdad?

¿Y si el servidor está mintiendo? Probablemente no necesitemos enviarle tráfico. Es decir, el servidor no se puede usar directamente hasta que lo llevemos al estado deseado. Es necesario dirigir el tráfico solo a los servidores que están en funcionamiento.

Definir la estructura


Necesitamos rastrear todos los detalles relacionados con el backend. Necesita saber si está vivo y rastrear la URL. Para hacer esto, podemos definir la siguiente estructura:

type Backend struct { URL *url.URL Alive bool mux sync.RWMutex ReverseProxy *httputil.ReverseProxy } 

No se preocupe, explicaré el significado de los campos en el Backend.

Ahora, en el equilibrador, necesita rastrear de alguna manera todos los backends. Para hacer esto, puede usar Slice y un contador variable. Defínalo en ServerPool:

 type ServerPool struct { backends []*Backend current uint64 } 

Usando ReverseProxy


Como ya hemos determinado, la esencia del equilibrador está en distribuir el tráfico a diferentes servidores y devolver los resultados al cliente. Como dice la documentación de Go:

ReverseProxy es un controlador HTTP que toma las solicitudes entrantes y las envía a otro servidor, enviando respuestas al cliente.

Exactamente lo que necesitamos. No es necesario reinventar la rueda. Simplemente puede transmitir nuestras solicitudes a través de ReverseProxy .

 u, _ := url.Parse("http://localhost:8080") rp := httputil.NewSingleHostReverseProxy(u) // initialize your server and add this as handler http.HandlerFunc(rp.ServeHTTP) 

Con httputil.NewSingleHostReverseProxy(url) puede inicializar ReverseProxy , que transmitirá las solicitudes a la url pasada. En el ejemplo anterior, todas las solicitudes se enviaron a localhost: 8080 y los resultados se enviaron al cliente.

Si observa la firma del método ServeHTTP, puede encontrar la firma del controlador HTTP en él. Por lo tanto, puede pasarlo a HandlerFunc en http .

Otros ejemplos están en la documentación .

Para nuestro equilibrador, puede iniciar ReverseProxy con la URL asociada en Backend para que ReverseProxy enrute las solicitudes a la URL .

Proceso de selección del servidor


Durante la siguiente selección de servidor, debemos omitir los servidores subyacentes. Pero necesitas organizar el conteo.

Numerosos clientes se conectarán al equilibrador, y cuando cada uno de ellos solicite al siguiente nodo que transfiera tráfico, puede ocurrir una condición de carrera. Para evitar esto, podemos bloquear ServerPool con mutex . Pero será redundante, además no queremos bloquear ServerPool . Solo necesitamos aumentar el contador en uno.

La mejor solución para cumplir con estos requisitos sería el incremento atómico. Go lo admite con el paquete atomic .

 func (s *ServerPool) NextIndex() int { return int(atomic.AddUint64(&s.current, uint64(1)) % uint64(len(s.backends))) } 

Aumentamos atómicamente el valor actual en uno y devolvemos el índice cambiando la longitud de la matriz. Esto significa que el valor siempre debe estar en el rango de 0 a la longitud de la matriz. Al final, nos interesará un índice específico, no el contador completo.

Elegir un servidor en vivo


Ya sabemos que nuestras solicitudes se rotan cíclicamente en todos los servidores. Y solo necesitamos omitir el inactivo.

GetNext() siempre devuelve un valor que varía de 0 a la longitud de la matriz. En cualquier momento, podemos obtener el siguiente nodo, y si está inactivo, necesitamos buscar más a través de la matriz como parte del ciclo.


Recorremos la matriz.

Como se muestra en la ilustración, queremos pasar del siguiente nodo al final de la lista. Esto se puede hacer usando next + length . Pero para seleccionar un índice, debe limitarlo a la longitud de la matriz. Esto se puede hacer fácilmente usando la operación de modificación.

Después de encontrar un servidor que funcione durante la búsqueda, debe marcarse como actual:

 // GetNextPeer returns next active peer to take a connection func (s *ServerPool) GetNextPeer() *Backend { // loop entire backends to find out an Alive backend next := s.NextIndex() l := len(s.backends) + next // start from next and move a full cycle for i := next; i < l; i++ { idx := i % len(s.backends) // take an index by modding with length // if we have an alive backend, use it and store if its not the original one if s.backends[idx].IsAlive() { if i != next { atomic.StoreUint64(&s.current, uint64(idx)) // mark the current one } return s.backends[idx] } } return nil } 

Evitar la condición de carrera en la estructura Backend


Aquí debes recordar un tema importante. La estructura Backend contiene una variable que varias goroutines pueden modificar o consultar al mismo tiempo.

Sabemos que las gorutinas leerán la variable más que escribirla. Por lo tanto, para serializar el acceso a Alive elegimos RWMutex .

 // SetAlive for this backend func (b *Backend) SetAlive(alive bool) { b.mux.Lock() b.Alive = alive b.mux.Unlock() } // IsAlive returns true when backend is alive func (b *Backend) IsAlive() (alive bool) { b.mux.RLock() alive = b.Alive b.mux.RUnlock() return } 

Solicitudes de equilibrio


Ahora podemos formular un método simple para equilibrar nuestras solicitudes. Solo fallará si todos los servidores caen.

 // lb load balances the incoming request func lb(w http.ResponseWriter, r *http.Request) { peer := serverPool.GetNextPeer() if peer != nil { peer.ReverseProxy.ServeHTTP(w, r) return } http.Error(w, "Service not available", http.StatusServiceUnavailable) } 

Este método se puede pasar al servidor HTTP simplemente como HandlerFunc .

 server := http.Server{ Addr: fmt.Sprintf(":%d", port), Handler: http.HandlerFunc(lb), } 

Enrutamos el tráfico solo a servidores en ejecución


Nuestro equilibrador tiene un problema grave. No sabemos si el servidor se está ejecutando. Para averiguarlo, debe verificar el servidor. Hay dos formas de hacer esto:

  • Activo: al ejecutar la solicitud actual, encontramos que el servidor seleccionado no responde y lo marcamos como inactivo.
  • Pasivo: puede hacer ping a los servidores en algún intervalo y verificar el estado.

Comprobación activa de servidores en ejecución


Si se ReverseProxy algún error ReverseProxy inicia la función de devolución de llamada ErrorHandler . Esto se puede usar para detectar fallas:

 proxy.ErrorHandler = func(writer http.ResponseWriter, request *http.Request, e error) { log.Printf("[%s] %s\n", serverUrl.Host, e.Error()) retries := GetRetryFromContext(request) if retries < 3 { select { case <-time.After(10 * time.Millisecond): ctx := context.WithValue(request.Context(), Retry, retries+1) proxy.ServeHTTP(writer, request.WithContext(ctx)) } return } // after 3 retries, mark this backend as down serverPool.MarkBackendStatus(serverUrl, false) // if the same request routing for few attempts with different backends, increase the count attempts := GetAttemptsFromContext(request) log.Printf("%s(%s) Attempting retry %d\n", request.RemoteAddr, request.URL.Path, attempts) ctx := context.WithValue(request.Context(), Attempts, attempts+1) lb(writer, request.WithContext(ctx)) } 

Al desarrollar este controlador de errores, utilizamos las capacidades de los cierres. Esto nos permite capturar variables externas como las URL del servidor en nuestro método. El controlador verifica el contador de reintentos, y si es menor que 3, nuevamente enviamos la misma solicitud al mismo servidor. Esto se debe a que, debido a errores temporales, el servidor puede descartar nuestras solicitudes, pero pronto estará disponible (es posible que el servidor no tenga sockets libres para nuevos clientes). Por lo tanto, debe configurar el temporizador de retraso para un nuevo intento después de aproximadamente 10 ms. Con cada solicitud aumentamos el contador de intentos.

Después del fracaso de cada intento, marcamos el servidor como inactivo.

Ahora debe asignar un nuevo servidor para la misma solicitud. Haremos esto usando el contador de intentos usando el paquete de context . Después de aumentar el contador de intentos, lo pasamos a lb para seleccionar un nuevo servidor para procesar la solicitud.

No podemos hacer esto indefinidamente, por lo que comprobaremos en lb si se ha alcanzado el número máximo de intentos antes de continuar con el procesamiento de la solicitud.

Simplemente puede obtener el contador de intentos de la solicitud, si alcanza el máximo, entonces interrumpimos la solicitud.

 // lb load balances the incoming request func lb(w http.ResponseWriter, r *http.Request) { attempts := GetAttemptsFromContext(r) if attempts > 3 { log.Printf("%s(%s) Max attempts reached, terminating\n", r.RemoteAddr, r.URL.Path) http.Error(w, "Service not available", http.StatusServiceUnavailable) return } peer := serverPool.GetNextPeer() if peer != nil { peer.ReverseProxy.ServeHTTP(w, r) return } http.Error(w, "Service not available", http.StatusServiceUnavailable) } 

Esta es una implementación recursiva.

Usando el paquete de contexto


El paquete de context permite almacenar datos útiles en solicitudes HTTP. Utilizaremos esto activamente para rastrear los datos relacionados con las solicitudes: contadores de Retry y Retry .

Primero, debe establecer las claves para el contexto. Se recomienda usar no cadenas, sino valores numéricos únicos. Go tiene una palabra clave iota para la implementación incremental de constantes, cada una de las cuales contiene un valor único. Esta es una gran solución para definir teclas numéricas.

 const ( Attempts int = iota Retry ) 

Luego puede extraer el valor, como solemos hacer con HashMap . El valor predeterminado puede depender de la situación actual.

 // GetAttemptsFromContext returns the attempts for request func GetRetryFromContext(r *http.Request) int { if retry, ok := r.Context().Value(Retry).(int); ok { return retry } return 0 } 

Validación pasiva del servidor


Las verificaciones pasivas identifican y recuperan servidores caídos. Los enviamos a un cierto intervalo para determinar su estado.

Para hacer ping, intente establecer una conexión TCP. Si el servidor responde, lo marcamos funcionando. Este método se puede adaptar para llamar a puntos finales específicos como /status . Asegúrese de cerrar la conexión después de crearla para reducir la carga adicional en el servidor. De lo contrario, intentará mantener esta conexión y eventualmente agotará sus recursos.

 // isAlive checks whether a backend is Alive by establishing a TCP connection func isBackendAlive(u *url.URL) bool { timeout := 2 * time.Second conn, err := net.DialTimeout("tcp", u.Host, timeout) if err != nil { log.Println("Site unreachable, error: ", err) return false } _ = conn.Close() // close it, we dont need to maintain this connection return true } 

Ahora puede iterar los servidores y marcar sus estados:

 // HealthCheck pings the backends and update the status func (s *ServerPool) HealthCheck() { for _, b := range s.backends { status := "up" alive := isBackendAlive(b.URL) b.SetAlive(alive) if !alive { status = "down" } log.Printf("%s [%s]\n", b.URL, status) } } 

Para ejecutar este código periódicamente, puede ejecutar el temporizador en Ir. Le permitirá escuchar eventos en el canal.

 // healthCheck runs a routine for check status of the backends every 2 mins func healthCheck() { t := time.NewTicker(time.Second * 20) for { select { case <-tC: log.Println("Starting health check...") serverPool.HealthCheck() log.Println("Health check completed") } } } 

En este código, el canal <-tC devolverá un valor cada 20 segundos. select permite definir este evento. En ausencia de una situación default , espera hasta que se pueda ejecutar al menos un caso.

Ahora ejecute el código en una rutina diferente:

 go healthCheck() 

Conclusión


En este artículo, examinamos muchas preguntas:

  • Round Robin Algorithm
  • ReverseProxy de la biblioteca estándar
  • Mutexes
  • Operaciones atómicas
  • Cortocircuitos
  • Devoluciones de llamada
  • Operación de selección

Hay muchas más formas de mejorar nuestro equilibrador. Por ejemplo:

  • Use el montón para ordenar servidores en vivo para reducir el alcance de búsqueda.
  • Recoge estadísticas.
  • Implemente el algoritmo ponderado round-robin con el menor número de conexiones.
  • Agregar soporte para archivos de configuración.

Y así sucesivamente.

El código fuente está aquí .

Source: https://habr.com/ru/post/476276/


All Articles