Spring中的客户端请求同步

今天,我建议您分析一个有关为我的移动应用程序MT_FREE开发后端时在MaximTelecom中遇到的客户请求竞争的实际任务。

启动时,客户端应用程序将请求的“数据包”异步发送到API。 该应用程序具有clientId标识符,基于该标识符可以区分来自一个客户端的请求与另一个客户端的请求。 对于服务器上的每个请求,将执行以下形式的代码:

//      Client client = clientRepository.findByClientId(clientId); //      if(client == null){ client = clientRepository.save(new Client(clientId)); } //    

其中,客户实体具有一个clientId字段,该字段必须是唯一的,并且为此在数据库中具有唯一的约束。 由于在Spring中,每个请求都将在单独的线程中执行此代码,即使这些请求是来自同一客户端应用程序的请求,也会出现以下形式的错误:
完整性约束违规:唯一约束或索引违规; UK_BFJDOY2DPUSSYLQ7G1S3S1TN8表:客户端

发生错误的原因很明显:2个或更多具有相同clientId的线程接收到client == null实体并开始创建它,然后在提交时收到错误。

挑战:


必须同步来自一个clientId的请求,以便只有第一个请求才能完成Client实体的创建,其余请求将在创建时被阻止并接收已经创建的对象。

解决方案1


  //      if(client == null){ //   synchronized (this){ //    client = clientRepository.findByClientId(clientId); if(client == null){ client = clientRepository.save(new Client(clientId)); } } } 

由于需要创建的所有请求(线程)都会被阻止,即使它们使用不同的clientId创建Client并且彼此不竞争,该解决方案仍然有效,但代价非常高。

请注意,同步与@Transactional批注的组合

 @Transactional public synchronized Client getOrCreateUser(String clientId){ //      Client client = clientRepository.findByClientId(clientId); //      if(client == null){ client = clientRepository.save(new Client(clientId)); } return client; } 

同样的错误将再次发生。 原因是监视器(已同步)首先被释放,下一个线程进入同步区域,并且只有在此之后,事务才由代理对象中的第一个线程提交。 解决此问题的方法很简单-您需要在提交后释放监视器,因此必须在上面调用synced:

  synchronized (this){ client = clientService.getOrCreateUser(clientId); } 

决定2


我真的很想使用表单的设计:

 synchronized (clientId) 

但是问题是,即使它们的值相等,也会为每个请求创建一个新的clientId对象,因此,无法以这种方式执行同步。 为了解决使用不同clientId对象的问题,您需要使用池:

 Client client = clientRepository.findByClientId(clientId); //      if(client == null){ //   synchronized (clientId.intern()){ //    client = clientRepository.findByClientId(clientId); if(client == null){ client = clientRepository.save(new Client(clientId)); } } } 

此解决方案使用java字符串池,分别具有等效clientId的请求,通过调用clientId.intern(),将接收相同的对象。 不幸的是,在实践中,此解决方案不适用,因为不可能管理“腐烂”的clientId,迟早会导致OutOfMemory。

决定3


为了使用ReentrantLock,您需要以下形式的池:

 private final ConcurrentMap<String, ReentrantLock> locks; 

然后:

 Client client = clientRepository.findByClientId(clientId); //      if(client == null){ //   ReentrantLock lock = locks.computeIfAbsent(clientId, (k) -> new ReentrantLock()); lock.lock(); try{ //    client = clientRepository.findByClientId(clientId); if(client == null){ client = clientRepository.save(new Client(clientId)); } } finally { //   lock.unlock(); } } 

唯一的问题是“过时的” clientId的管理,可以使用ConcurrentMap的非标准实现来解决,该实现已经支持expire,例如,使用guava Cache:

 locks = CacheBuilder.newBuilder() .concurrencyLevel(4) .expireAfterWrite(Duration.ofMinutes(1)) .<String, ReentrantLock>build().asMap(); 

决定4


上述解决方案在单个实例内同步请求。 如果您的服务在N个节点上旋转,并且请求可以同时转到不同的位置,该怎么办? 对于这种情况,使用Redisson库是一种完美的解决方案:

  Client client = clientRepository.findByClientId(clientId); //      if(client == null){ //   RLock lock = redissonClient.getFairLock(clientId); lock.lock(); try{ //    client = clientRepository.findByClientId(clientId); if(client == null){ client = clientRepository.save(new Client(clientId)); } } finally { //   lock.unlock(); } } 

该库使用redis作为存储库解决了分布式锁问题。

结论


应采用哪种决定当然取决于问题的规模:解决方案1-3非常适合小型单实例服务,解决方案4针对分布式服务。 值得一提的是,使用Redisson或类似物(例如经典的Zookeeper)解决此问题当然是一种特殊情况,因为它们是为分布式系统的更多任务而设计的。

在我们的案例中,我们选择了解决方案4,因为我们的服务是分布式的,与类似物相比,Redisson集成最容易。

朋友们,请在评论中建议您解决该问题的选择,我将非常高兴!
示例的源代码在GitHub可用

顺便说一下,我们正在不断扩充开发人员,有关职位空缺可以在我们的职业页面上找到。

UPD 1.读者的解决方案1


此解决方案建议不要同步请求,但是如果出现以下形式的错误:
完整性约束违规:唯一约束或索引违规; UK_BFJDOY2DPUSSYLQ7G1S3S1TN8表:客户端

必须处理并召回
 client = clientRepository.findByClientId(clientId); 

或通过spring-retry来完成:
 @Retryable(value = { SQLException.class }, maxAttempts = 3, backoff = @Backoff(delay = 1000)) @Transactional public Client getOrCreateUser(String clientId) 

(以Throwable 为例
在这种情况下,将对数据库进行“额外”查询,但是实际上,客户端实体的创建通常不会发生,并且如果仅需要同步来解决插入数据库的问题,则可以省去此解决方案。

UPD 2.读者的解决方案2


该解决方案建议通过会话进行同步:
 HttpSession session = request.getSession(false); if (session != null) { Object mutex = WebUtils.getSessionMutex(session); synchronized (mutex) { ... } } 

该解决方案适用于单实例服务,但是必须解决该问题,以便从一个客户端到API的所有请求都在同一会话中执行。

Source: https://habr.com/ru/post/zh-CN434714/


All Articles