集群交互的演变。 我们如何实现ActiveMQ和Hazelcast

在过去的7年中,我与我的团队一起一直在支持和开发Miro产品的核心(ex-RealtimeBoard):客户端-服务器和集群交互以及与数据库的合作。

我们拥有带有不同库的Java。 一切都通过Maven插件在容器外部启动。 它基于合作伙伴的平台,使我们能够处理数据库和流程,管理客户端与服务器的交互等。 DB-Redis和PostgreSQL(我的同事写过关于我们如何从一个数据库迁移到另一个数据库的信息 )。

就业务逻辑而言,该应用程序包含:

  • 处理自定义板及其内容;
  • 用户注册,创建和管理董事会的功能;
  • 自定义资源生成器。 例如,它优化了上传到应用程序的大图像,以免影响我们的客户;
  • 与第三方服务的许多集成。

在2011年刚开始时,整个Miro都在同一台服务器上。 一切都在上面:Nginx打开了站点的php,Java应用程序和数据库。

开发产品,增加用户数量以及他们添加到开发板上的内容都增加了,因此服务器上的负载也增加了。 由于服务器上的应用程序数量众多,当时我们无法理解到底是什么给了负载,因此也无法对其进行优化。要解决此问题,我们将所有内容拆分为不同的服务器,我们得到了一个Web服务器,一个服务器与我们的应用程序和数据库服务器。

不幸的是,一段时间后,随着应用程序负载的持续增长,问题再次出现。 然后,我们考虑了如何扩展基础架构。



接下来,我将讨论在开发集群以及扩展Java应用程序和基础结构方面遇到的困难。

水平扩展基础架构


我们首先收集指标:内存和CPU的使用,执行用户查询所需的时间,系统资源的使用以及使用数据库。 从指标来看,很明显,用户资源的生成是一个不可预测的过程。 我们可以将处理器100%加载并等待数十秒,直到一切完成。 用户对电路板的要求有时也会带来意想不到的负担。 例如,当用户选择一千个小部件并开始自发移动它们时。

我们开始考虑如何扩展系统的这些部分,并得出了明显的解决方案。

扩大董事会和内容的工作量 。 用户以如下方式打开板子:用户打开客户端→指示他要打开哪个板子→连接到服务器→服务器上创建了一个流→该板子的所有用户都连接到一个流子→该流中发生的任何更改或创建窗口小部件。 事实证明,与主板的所有工作都受到流程的严格限制,这意味着我们可以在服务器之间分配这些流程。

扩展用户资源生成 。 我们可以取出服务器来分别生成资源,它会接收生成的消息,然后响应一切都已生成。

一切似乎都很简单。 但是,一旦我们开始更深入地研究该主题,事实证明我们需要额外解决一些间接问题。 例如,如果用户使付费订阅到期,那么无论他们在哪个董事会上,我们都必须通知他们。 或者,如果用户已更新资源的版本,则需要确保在所有服务器上正确刷新了缓存,并且我们提供了正确的版本。

我们已经确定了系统要求。 下一步是了解如何将其付诸实践。 实际上,我们需要一个允许集群中的服务器相互通信并能够在此基础上实现所有思想的系统。

开箱即用的第一个集群


我们没有选择系统的第一个版本,因为它已经在我们使用的合作伙伴平台中部分实现。 在其中,所有服务器都通过TCP相互连接,使用此连接,我们可以一次将RPC消息发送到一个或所有服务器。

例如,我们有三台服务器,它们通过TCP相互连接,在Redis中,我们有这些服务器的列表。 我们在集群中启动一个新服务器→将其自身添加到Redis的列表中→读取列表以了解集群中的所有服务器→连接所有服务器。



基于RPC,已经实现了对刷新缓存和将用户重定向到所需服务器的支持。 我们必须生成一代用户资源,并通知用户发生了某些事情(例如,帐户已过期)。 为了生成资源,我们选择了一个任意服务器,并向他发送了生成请求,并且为了通知订阅已到期,我们向所有服务器发送了一条命令,希望消息能够达到目标。

服务器本身确定向谁发送消息。


听起来像一个功能,不是问题。 但是服务器仅专注于与另一台服务器的连接。 如果存在连接,则有一个候选对象来发送消息。

问题在于1号服务器目前不知道4号服务器处于高负载状态,因此无法足够快地回答它。 结果,服务器1的请求处理的速度比其处理速度要慢。



服务器不知道第二台服务器已冻结


但是,如果服务器不仅负载沉重,而且通常会冻结,该怎么办? 而且,它挂起后就不再栩栩如生了。 例如,我已经用光了所有可用的内存。

在这种情况下,服务器#1不知道问题是什么,因此它将继续等待答案。 集群中的其余服务器也不知道4号服务器的情况,因此它们将向4号服务器发送大量消息并等待响应。 因此,直到4号服务器死亡。



怎么办 我们可以独立地向系统添加服务器状态检查。 或者,我们可以将邮件从“病态”服务器重定向到“健康”服务器。 所有这些将花费太多的开发人员时间。 在2012年,我们在这一领域的经验很少,因此我们开始寻求针对所有问题的现成解决方案。

消息代理。 Activemq


我们决定朝着消息代理的方向正确配置服务器之间的通信。 他们之所以选择ActiveMQ,是因为能够在特定时间配置使用者的接收消息。 没错,我们从来没有抓住这个机会,所以我们可以选择RabbitMQ。

结果,我们将整个群集系统转移到了ActiveMQ。 它给了什么:

  1. 服务器不再自行确定将消息发送给谁,因为所有消息都通过队列。
  2. 配置的容错能力。 要读取队列,您不能运行一台服务器,而可以运行多台服务器。 即使其中之一掉落,系统也将继续运行。
  3. 服务器出现了角色,该角色允许按负载类型划分服务器。 例如,资源生成器只能连接到队列以读取消息以生成资源,而带有板的服务器可以连接到队列以打开板。
  4. 是否进行了RPC通信,即 每个服务器都有自己的专用队列,其他服务器在该专用队列中发送事件。
  5. 您可以通过主题(用于重置订阅)将消息发送到所有服务器。


该方案看起来很简单:所有服务器都连接到代理,并管理它们之间的通信。 一切正常,发送和接收消息,创建资源。 但是有新问题。

当所有必要的服务器都在躺着时该怎么办?


假设3号服务器希望发送一条消息以生成队列中的资源。 他希望可以处理他的消息。 但是他不知道由于某种原因,消息没有一个收件人。 例如,收件人由于错误而崩溃。

对于所有等待时间,服务器都会发送大量带有请求的消息,这就是为什么出现消息队列的原因。 因此,当工作服务器出现时,它们被迫首先处理累​​积的队列,这需要时间。 在用户方面,这导致以下事实:他上传的图像不会立即显示。 他还没准备好等待,所以他离开了董事会。

结果,我们将服务器容量用于资源的生成,没有人需要结果。



我该如何解决这个问题? 我们可以设置监视,它将通知您正在发生的事情。 但是,从监视报告某些事件的那一刻起,直到我们了解我们的服务器故障的那一刻,时间将会过去。 这不适合我们。

另一种选择是运行服务发现,或运行服务注册表,该服务将知道正在运行哪些角色的服务器。 在这种情况下,如果没有空闲服务器,我们将立即收到错误消息。

某些服务无法水平扩展


这是我们早期代码的问题,而不是ActiveMQ。 让我给你看一个例子:

Permission ownerPermission = service.getOwnerPermission(board); Permission permission = service.getPermission(board,user); ownerPermission.setRole(EDITOR); permission.setRole(OWNER); 

我们提供了一个在董事会上使用用户权限的服务:用户可以是董事会的所有者或其编辑者。 董事会只能有一位所有者。 假设有一种情况,我们想将董事会的所有权从一个用户转移到另一个用户。 在第一行中,我们获得董事会的当前所有者,在第二行中,我们获得了担任编辑的用户,现在成为所有者。 此外,当前的所有者是EDITOR,而以前的编辑器是OWNER。

让我们看看这在多线程环境中如何工作。 当第一个线程建立EDITOR角色,而第二个线程尝试采用当前的OWNER时,可能会发生OWNER不存在,但是有两个EDITOR的情况。

原因是缺乏同步。 我们可以通过在板上添加一个同步块来解决该问题。

 synchronized (board) { Permission ownerPermission = service.getOwnerPermission(board); Permission permission = service.getPermission(board,user); ownerPermission.setRole(EDITOR); permission.setRole(OWNER); } 

此解决方案在群集中不起作用。 SQL数据库可以借助事务来帮助我们。 但是我们有Redis。

另一种解决方案是将分布式锁添加到群集,以便同步在整个群集内部,而不仅仅是一个服务器。

进入电路板时出现单点故障


客户端和服务器之间的交互模型是有状态的。 因此,我们必须将板的状态存储在服务器上。 因此,我们为服务器设置了一个单独的角色-BoardServer,它处理与电路板相关的用户请求。

假设我们有三台BoardServer,其中一台是主要的。 用户向他发送“打开ID为123的板给我”的请求→服务器在其数据库中查看板是否处于打开状态以及在哪个服务器上。 在此示例中,板是打开的。



主服务器答复您需要连接到1号服务器→用户正在连接。 显然,如果主服务器死了,那么用户将不再能够访问新板。

那为什么我们需要一台知道板子在哪里打开的服务器? 这样我们就有了一个决策点。 如果服务器发生故障,我们需要了解该板是否确实可用,以便将其从注册表中删除或在其他位置重新打开。 当多个服务器解决类似的问题时,有可能在仲裁的帮助下进行组织,但是那时我们还不具备独立实施仲裁的知识。

切换到Hazelcast


一种或另一种方式,我们解决了出现的问题,但这可能不是最美丽的方式。 现在,我们需要了解如何正确解决它们,因此我们为新的群集解决方案制定了一系列要求:

  1. 我们需要一些东西来监视所有服务器的状态及其角色。 称之为服务发现。
  2. 我们需要集群锁,以帮助确保执行危险查询时的一致性。
  3. 我们需要一个分布式数据结构,以确保这些板位于某些服务器上,并告知是否出了问题。

那是2015年。 我们选择了Hazelcast-内存数据网格,这是一个用于在RAM中存储信息的集群系统。 然后我们以为我们找到了一个奇迹解决方案,这是集群交互世界的圣杯,一个奇迹般的框架,可以完成所有工作,并结合分布式数据结构,锁,RPC消息和队列。



与ActiveMQ一样,我们几乎将所有内容都转移到了Hazelcast:

  • 通过ExecutorService生成用户资源;
  • 更改权限时的分布式锁;
  • 服务器的角色和属性(服务发现);
  • 开放董事会的单一注册表等

Hazelcast拓扑


可以在两种拓扑中配置Hazelcast。 第一个选项是“客户端服务器”,当成员与主应用程序分开放置时,它们自己形成集群,并且所有应用程序都作为数据库连接到它们。



当Hazelcast成员嵌入在应用程序本身中时,第二个拓扑是“嵌入式”。 在这种情况下,我们可以使用更少的实例,访问数据的速度更快,因为数据和业务逻辑本身位于同一位置。



我们选择第二种解决方案是因为我们认为它更有效,更经济。 有效,因为访问Hazelcast数据的速度会降低,因为 也许这些数据在当前服务器上。 很经济,因为我们不需要在其他实例上花钱。

成员挂起时集群挂起


开启Hazelcast几周后,产品上出现了问题。

最初,我们的监控显示其中一台服务器开始逐渐使内存超载。 当我们观察该服务器时,其余服务器也开始加载:CPU增长,然后是RAM,五分钟后,所有服务器都使用了所有可用内存。

此时,在控制台中,我们看到了以下消息:

 2015-07-15 15:35:51,466 [WARN] (cached18) com.hazelcast.spi.impl.operationservice.impl.Invocation: [my.host.address.com]:5701 [dev] [3.5] Asking ifoperation execution has been started: com.hazelcast.spi.impl.operationservice.impl.IsStillRunningService$InvokeIsStillRunningOperationRunnable@6d4274d7 2015-07-15 15:35:51,467 [WARN] (hz._hzInstance_1_dev.async.thread-3) com.hazelcast.spi.impl.operationservice.impl.Invocation:[my.host.address.com]:5701 [dev] [3.5] 'is-executing': true -> Invocation{ serviceName='hz:impl:executorService', op=com.hazelcast.executor.impl.operations.MemberCallableTaskOperation{serviceName='null', partitionId=-1, callId=18062, invocationTime=1436974430783, waitTimeout=-1,callTimeout=60000}, partitionId=-1, replicaIndex=0, tryCount=250, tryPauseMillis=500, invokeCount=1, callTimeout=60000,target=Address[my.host2.address.com]:5701, backupsExpected=0, backupsCompleted=0} 

在这里,Hazelcast会检查发送到第一台“即将死机”的服务器的操作是否正在进行。 Hazelcast试图保持同步,并每秒检查几次操作状态。 结果,他通过此操作向所有其他服务器发送了垃圾邮件,几分钟后它们又飞出了内存,我们从每个服务器收集了几GB的日志。

情况重复了几次。 事实证明,这是Hazelcast 3.5版中的一个错误,该错误中实施了心跳机制,该机制检查请求的状态。 它没有检查我们遇到的一些边界情况。 我不得不优化应用程序,以免出现这种情况,几周后,Hazelcast在家中修复了该错误。

经常在Hazelcast中添加和删除成员


我们发现的下一个问题是从Hazelcast中添加和删除成员。

首先,我将简要介绍Hazelcast如何与分区一起使用。 例如,有四个服务器,每个服务器存储一部分数据(在图中,它们具有不同的颜色)。 单位是主要分区,演绎是次要分区,即 主分区的备份。



关闭服务器后,分区将发送到其他服务器。 万一服务器死了,分区不是从该服务器转移的,而是从仍处于活动状态并持有这些分区的备份的那些服务器转移的。



这是一个可靠的机制。 问题在于,我们经常打开和关闭服务器以平衡负载,并且重新平衡分区也需要时间。 而且,运行的服务器越多,我们在Hazelcast中存储的数据越多,重新平衡分区所花费的时间就越多。

当然,我们可以减少备份数量,即 二级分区。 但这并不安全,因为肯定会出问题。

另一种解决方案是切换到客户端-服务器拓扑,以便打开和关闭服务器不会影响核心的Hazelcast群集。 我们试图这样做,结果证明无法在客户端上执行RPC请求。 让我们看看为什么。

为此,请考虑将一个RPC请求发送到另一台服务器的示例。 我们使用ExecutorService,该服务使您可以发送RPC消息,并使用新任务进行提交。

 hazelcastInstance .getExecutorService(...) .submit(new Task(), ...); 

任务本身看起来像实现Callable的常规Java类。
 public class Task implements Callable<Long> { @Override public Long call() { return 42; } } 

问题在于,Hazelcast客户端不仅可以是Java应用程序,还可以是C ++应用程序,.NET等。 自然,我们无法生成Java类并将其转换为另一个平台。

一种选择是切换到使用http请求,以防万一我们想将内容从一台服务器发送到另一台服务器并获得答案。 但是随后我们将不得不部分放弃Hazelcast。

因此,作为一种解决方案,我们选择使用队列而不是ExecutorService。 为此,我们独立实现了一种机制,用于等待队列中的元素被执行,该机制处理边界情况并将结果返回给请求服务器。

我们学到了什么


为系统提供灵活性。 未来在不断变化,因此没有完美的解决方案。 正确地做到“正确”是行不通的,但是您可以尝试保持灵活性并将其放入系统中。 这使我们可以推迟重要的架构决策,直到不再可能不再接受它们为止。

清洁建筑学的罗伯特·马丁(Robert Martin)谈到了这一原则:
“架构师的目标是为系统创建表单,该表单将使政治成为最重要的元素,而细节与政治无关。 这将延迟并延迟有关细节的决定。”


通用工具和解决方案不存在。 如果您觉得某个框架可以解决您的所有问题,那么很可能并非如此。 因此,在实施任何框架时,不仅要了解它将解决的问题,而且会带来哪些问题,这一点很重要。

不要立即重写所有内容。 如果您遇到体系结构方面的问题,并且似乎唯一正确的解决方案是从头开始编写所有内容,请等待。 如果问题确实很严重,请找到快速修复并观察系统将来如何工作。 很有可能,这将不是体系结构中的唯一问题,随着时间的推移,您会发​​现更多问题。 并且只有当您选择了足够多的问题区域时,您才可以开始重构。 只有在这种情况下,它的优势才会超过其价值。

Source: https://habr.com/ru/post/zh-CN441590/


All Articles