在Go上编写一个简单的平衡器


负载平衡器在Web体系结构中起着关键作用。 它们使您可以在多个后端之间分配负载,从而提高可伸缩性。 并且由于我们配置了多个后端,因此该服务变得高度可用,因为如果一台服务器发生故障,那么平衡器可以选择另一台正常工作的服务器。

在与NGINX等专业平衡器一起玩过之后,我尝试创建一个简单的平衡器以求乐趣。 我在Go上编写了它,它是支持完全并行性的现代语言。 Go中的标准库具有许多功能,可让您用更少的代码编写高性能的应用程序。 另外,为了便于分发,它会生成一个静态链接的二进制文件。

我们的平衡器如何运作


使用不同的算法在后端之间分配负载。 例如:

  • Round Robin-考虑到服务器的相同计算能力,负载将平均分配。
  • 加权循环法-根据处理能力,可以为服务器分配不同的权重。
  • 最少的连接-负载分布在活动连接数最少的服务器之间。

在我们的平衡器中,我们实现了最简单的算法-循环。



循环赛选拔


循环算法很简单。 它为所有表演者提供了完成任务的机会。


在Round Robin中选择服务器以处理传入的请求。

如图所示,该算法周期性地选择一个服务器。 但是我们不能直接选择它们,对吧?

如果服务器在说谎? 我们可能不需要向其发送流量。 也就是说,在我们将服务器置于所需状态之前,不能直接使用该服务器。 必须将流量仅定向到已启动并正在运行的服务器。

定义结构


我们需要跟踪与后端相关的所有详细信息。 您需要知道他是否还活着,并跟踪URL。 为此,我们可以定义以下结构:

type Backend struct { URL *url.URL Alive bool mux sync.RWMutex ReverseProxy *httputil.ReverseProxy } 

不用担心,我将在后端解释这些字段的含义。

现在,在平衡器中,您需要以某种方式跟踪所有后端。 为此,您可以使用Slice和变量计数器。 在ServerPool中定义它:

 type ServerPool struct { backends []*Backend current uint64 } 

使用反向代理


正如我们已经确定的那样,平衡器的本质是将流量分配到不同的服务器并将结果返回给客户端。 正如Go文档所述:

ReverseProxy是一个HTTP处理程序,它接收传入的请求并将其发送到另一台服务器,将响应代理回给客户端。

正是我们需要的。 无需重新发明轮子。 您可以简单地通过ReverseProxy流式传输我们的请求。

 u, _ := url.Parse("http://localhost:8080") rp := httputil.NewSingleHostReverseProxy(u) // initialize your server and add this as handler http.HandlerFunc(rp.ServeHTTP) 

使用httputil.NewSingleHostReverseProxy(url)您可以初始化ReverseProxy ,该httputil.NewSingleHostReverseProxy(url)将广播请求到传递的url 。 在上面的示例中,所有请求都发送到了本地主机:8080,结果被发送到了客户端。

如果查看ServeHTTP方法的签名,则可以在其中找到HTTP处理程序的签名。 因此,您可以将其传递给http HandlerFunc

其他示例在文档中

对于我们的平衡器,您可以使用Backend的关联URL来启动ReverseProxy ,以便ReverseProxy将请求路由到URL

服务器选择过程


在下一个服务器选择期间,我们需要跳过基础服务器。 但是您需要组织计数。

许多客户端将连接到平衡器,并且当每个客户端要求下一个节点传输流量时,可能会出现竞争状况。 为了防止这种情况,我们可以使用mutex阻止ServerPool 。 但这将是多余的,除了我们根本不想阻塞ServerPool之外。 我们只需要将计数器增加一即可。

满足这些要求的最佳解决方案是原子增量。 Go支持atomic包。

 func (s *ServerPool) NextIndex() int { return int(atomic.AddUint64(&s.current, uint64(1)) % uint64(len(s.backends))) } 

我们以原子方式将当前值增加1,然后通过更改数组的长度来返回索引。 这意味着该值应始终在从0到数组长度的范围内。 最后,我们将对特定索引而不是整个计数器感兴趣。

选择实时服务器


我们已经知道我们的请求在所有服务器上循环轮换。 而且我们只需要跳过空闲状态。

GetNext()始终返回一个介于0到数组长度之间的值。 在任何时候,我们都可以获取下一个节点,如果它是不活动的,则需要在数组中进一步搜索作为循环的一部分。


我们遍历数组。

如图所示,我们想从下一个节点到列表的末尾。 这可以使用next + length 。 但是要选择索引,您需要将其限制为数组的长度。 使用修改操作可以轻松完成此操作。

在搜索过程中找到可用的服务器后,应将其标记为当前服务器:

 // GetNextPeer returns next active peer to take a connection func (s *ServerPool) GetNextPeer() *Backend { // loop entire backends to find out an Alive backend next := s.NextIndex() l := len(s.backends) + next // start from next and move a full cycle for i := next; i < l; i++ { idx := i % len(s.backends) // take an index by modding with length // if we have an alive backend, use it and store if its not the original one if s.backends[idx].IsAlive() { if i != next { atomic.StoreUint64(&s.current, uint64(idx)) // mark the current one } return s.backends[idx] } } return nil } 

避免后端结构中的竞争状况


在这里,您需要记住一个重要的问题。 Backend结构包含一个变量,多个goroutine可以同时修改或查询该变量。

我们知道,goroutines对变量的读取要比对变量的写入要多。 因此,要序列化对Alive访问Alive我们选择RWMutex

 // SetAlive for this backend func (b *Backend) SetAlive(alive bool) { b.mux.Lock() b.Alive = alive b.mux.Unlock() } // IsAlive returns true when backend is alive func (b *Backend) IsAlive() (alive bool) { b.mux.RLock() alive = b.Alive b.mux.RUnlock() return } 

平衡要求


现在,我们可以制定一种简单的方法来平衡我们的请求。 仅当所有服务器都崩溃时,它才会失败。

 // lb load balances the incoming request func lb(w http.ResponseWriter, r *http.Request) { peer := serverPool.GetNextPeer() if peer != nil { peer.ReverseProxy.ServeHTTP(w, r) return } http.Error(w, "Service not available", http.StatusServiceUnavailable) } 

该方法可以简单地作为HandlerFunc传递给HTTP服务器。

 server := http.Server{ Addr: fmt.Sprintf(":%d", port), Handler: http.HandlerFunc(lb), } 

我们仅将流量路由到正在运行的服务器


我们的平衡器有一个严重的问题。 我们不知道服务器是否正在运行。 要找出答案,您需要检查服务器。 有两种方法可以做到这一点:

  • 活动:执行当前请求,我们发现所选服务器没有响应,并将其标记为空闲。
  • 被动:您可以每隔一段时间对服务器进行ping操作并检查状态。

积极检查正在运行的服务器


如果ReverseProxy任何错误ReverseProxy启动ErrorHandler回调函数。 这可以用来检测故障:

 proxy.ErrorHandler = func(writer http.ResponseWriter, request *http.Request, e error) { log.Printf("[%s] %s\n", serverUrl.Host, e.Error()) retries := GetRetryFromContext(request) if retries < 3 { select { case <-time.After(10 * time.Millisecond): ctx := context.WithValue(request.Context(), Retry, retries+1) proxy.ServeHTTP(writer, request.WithContext(ctx)) } return } // after 3 retries, mark this backend as down serverPool.MarkBackendStatus(serverUrl, false) // if the same request routing for few attempts with different backends, increase the count attempts := GetAttemptsFromContext(request) log.Printf("%s(%s) Attempting retry %d\n", request.RemoteAddr, request.URL.Path, attempts) ctx := context.WithValue(request.Context(), Attempts, attempts+1) lb(writer, request.WithContext(ctx)) } 

在开发此错误处理程序时,我们使用了闭包的功能。 这使我们可以将外部变量(例如服务器URL)捕获到我们的方法中。 处理程序检查重试计数器,如果小于3,则我们再次将同一请求发送到同一服务器。 这是因为由于临时错误,服务器可能会丢弃我们的请求,但很快就会变得可用(服务器可能没有用于新客户端的空闲套接字)。 因此,您需要在大约10毫秒后为新的尝试设置延迟计时器。 对于每个请求,我们都会增加尝试次数。

每次尝试失败后,我们将服务器标记为空闲。

现在,您需要为同一请求分配一个新服务器。 我们将使用context包中的尝试计数器来完成此操作。 增加尝试次数后,我们将其传递给lb以选择新服务器来处理请求。

我们不能无限期地执行此操作,因此在继续处理请求之前,我们将检查lb是否已达到最大尝试次数。

您可以简单地从请求中获取尝试计数器,如果尝试计数器达到最大值,我们将中断请求。

 // lb load balances the incoming request func lb(w http.ResponseWriter, r *http.Request) { attempts := GetAttemptsFromContext(r) if attempts > 3 { log.Printf("%s(%s) Max attempts reached, terminating\n", r.RemoteAddr, r.URL.Path) http.Error(w, "Service not available", http.StatusServiceUnavailable) return } peer := serverPool.GetNextPeer() if peer != nil { peer.ReverseProxy.ServeHTTP(w, r) return } http.Error(w, "Service not available", http.StatusServiceUnavailable) } 

这是一个递归实现。

使用上下文包


context包允许您在HTTP请求中存储有用的数据。 我们将积极地使用它来跟踪与请求相关的数据- AttemptRetry计数器。

首先,您需要设置上下文的键。 建议不要使用字符串,而应使用唯一的数值。 Go有一个用于增量实现常量的iota关键字,每个常量都包含一个唯一值。 这是定义数字键的绝佳解决方案。

 const ( Attempts int = iota Retry ) 

然后,您可以提取值,就像我们通常使用HashMap 。 默认值可能取决于当前情况。

 // GetAttemptsFromContext returns the attempts for request func GetRetryFromContext(r *http.Request) int { if retry, ok := r.Context().Value(Retry).(int); ok { return retry } return 0 } 

被动服务器验证


被动检查可识别并恢复掉落的服务器。 我们以一定的时间间隔对其进行ping操作以确定其状态。

要ping通,请尝试建立TCP连接。 如果服务器响应,则将其标记为工作。 该方法可以适应于调用特定端点,例如/status 。 确保在创建连接后关闭该连接,以减少服务器上的额外负载。 否则,他将尝试维持这种连接,并最终耗尽其资源。

 // isAlive checks whether a backend is Alive by establishing a TCP connection func isBackendAlive(u *url.URL) bool { timeout := 2 * time.Second conn, err := net.DialTimeout("tcp", u.Host, timeout) if err != nil { log.Println("Site unreachable, error: ", err) return false } _ = conn.Close() // close it, we dont need to maintain this connection return true } 

现在,您可以迭代服务器并标记其状态:

 // HealthCheck pings the backends and update the status func (s *ServerPool) HealthCheck() { for _, b := range s.backends { status := "up" alive := isBackendAlive(b.URL) b.SetAlive(alive) if !alive { status = "down" } log.Printf("%s [%s]\n", b.URL, status) } } 

要定期运行此代码,可以在Go中运行计时器。 它将允许您收听频道中的事件。

 // healthCheck runs a routine for check status of the backends every 2 mins func healthCheck() { t := time.NewTicker(time.Second * 20) for { select { case <-tC: log.Println("Starting health check...") serverPool.HealthCheck() log.Println("Health check completed") } } } 

在此代码中, <-tC通道将每20秒返回一个值。 select允许您定义此事件。 在没有default情况的default下,它将等待直到至少可以执行一种情况。

现在,在单独的goroutine中运行代码:

 go healthCheck() 

结论


在本文中,我们研究了许多问题:

  • 循环算法
  • 来自标准库的ReverseProxy
  • 互斥体
  • 原子操作
  • 短路
  • 回呼
  • 选择操作

还有许多改进平衡器的方法。 例如:

  • 使用堆对活动服务器进行排序以缩小搜索范围。
  • 收集统计信息。
  • 用最少的连接数实现加权轮询算法。
  • 添加对配置文件的支持。

依此类推。

源代码在这里

Source: https://habr.com/ru/post/zh-CN476276/


All Articles