请求的批处理问题及其解决方案(第2部分)


这是文章“请求及其处理的批处理问题”的续篇。 建议您首先熟悉第一部分,因为它详细描述了问题的实质以及解决方案的一些方法。 在这里,我们看看其他方法。


简短重复任务


通过聊天可以将文档与一组预定义的参与者进行协调。 消息包含文本和文件。 而且,就像在常规聊天中一样,消息可以被答复和转发。

聊天消息模型
data class ChatMessage (
// nullable persist
val id : Long ? = null ,
/** */
val author : UserReference ,
/** */
val message : String ,
/** */
// - JPA+ null,
val files : List < FileReference > ? = null ,
/** , */
val replyTo : ChatMessage ? = null ,
/** , */
val forwardFrom : ChatMessage ? = null
)

通过依赖注入,我们可以实现以下外部服务:
interface ChatMessageRepository {
fun findLast ( n : Int ) : List < ChatMessage >
}

data class FileHeadRemote (
val id : FileReference ,
val name : String
)

interface FileRemoteApi {
fun getHeadById ( id : FileReference ) : FileHeadRemote
   fun getHeadsByIds ( id : Set < FileReference > ) : Set < FileHeadRemote >
fun getHeadsByIds ( id : List < FileReference > ) : List < FileHeadRemote >
}

data class UserRemote (
val id : UserReference ,
val name : String
)

interface UserRemoteApi {
fun getUserById ( id : UserReference ) : UserRemote
   fun getUsersByIds ( id : Set < UserReference > ) : Set < UserRemote >
fun getUsersByIds ( id : List < UserReference > ) : List < UserRemote >
}

我们需要实现一个REST控制器:

interface ChatRestApi {
fun getLast ( n : Int ) : List < ChatMessageUI >
}


其中:
/** */
data class ReferenceUI (
/** url */
val ref : String ,
/** */
val name : String
)

data class ChatMessageUI (
val id : Long ,
/** */
val author : ReferenceUI ,
/** */
val message : String ,
/** */
val files : List < ReferenceUI >,
/** , */
val replyTo : ChatMessageUI ? = null ,
/** , */
val forwardFrom : ChatMessageUI ? = null
)

在上一部分中,我们研究了使用批处理的服务的朴素实现以及几种加速它的方法。 这些方法非常简单,但是它们的应用程序不能提供足够好的性能。

增加套餐


天真的解决方案的主要问题是包装尺寸小。

为了将呼叫分组为更大的数据包,您需要以某种方式累积请求。 该行并不意味着请求的累积:

author = userRepository . getUserById ( author ) . toFrontReference () ,

现在,在我们的运行时中,没有特殊的位置可以存储用户列表-它正在逐步形成。 这将不得不改变。

首先,您需要在ChatMessage.toFrontModel方法中将数据获取的逻辑与映射分开:

private fun ChatMessage . toFrontModel (
getUser : ( UserReference ) -> UserRemote ,
getFile : ( FileReference ) -> FileHeadRemote ,
serializeMessage : ( ChatMessage ) -> ChatMessageUI
) : ChatMessageUI =
ChatMessageUI (
id = id ?: throw IllegalStateException ( " $ this must be persisted" ) ,
author = getUser ( author ) . toFrontReference () ,
message = message ,
files = files ?. let {
       it . map ( getFile ) . map { it . toFrontReference () }
} ?: listOf () ,
forwardFrom = forwardFrom ?. let ( serializeMessage ) ,
replyTo = replyTo ?. let ( serializeMessage )
)


事实证明,此函数仅依赖于三个外部函数(而不像最初那样依赖于整个类)。

经过这样的重做后,功能的主体并没有变得更加清晰,合同也变得更加艰巨(这既有利又有弊)。

实际上,您不能缩小协定范围,并保留对接口的依赖性。 最主要的是,它们绝对没有多余,因为我们将需要执行其他实现。

由于serializeMessage函数类似于递归函数,因此可以在重构的第一步中作为显式递归来完成:

class ChatRestController (
private val messageRepository : ChatMessageRepository ,
private val userRepository : UserRemoteApi ,
private val fileRepository : FileRemoteApi
) : ChatRestApi {
override fun getLast ( n : Int ) =
messageRepository . findLast ( n )
. map { it . toFrontModel () }


我为toFrontModel方法创建了一个存根,到目前为止,该存根的工作方式与我们第一个简单的实现完全相同(所有三个外部功能的实现都相同)。

private fun ChatMessage . toFrontModel () : ChatMessageUI =
toFrontModel (
getUser = userRepository ::getUserById ,
getFile = fileRepository ::getHeadById ,
serializeMessage = { it . toFrontModel () }
   )


但是我们需要使getUser,getFile和serializeMessage函数有效地工作,也就是说,以适当大小(理论上每个服务的大小可以不同)的数据包将请求发送到适当的服务,或者如果允许无限制的请求,通常每个服务一个请求。

实现此分组的最简单方法是,在开始处理之前手头有所有必要的查询。 为此,在调用toFrontModel之前,请收集所有必要的链接,进行批处理,然后使用结果。

您还可以尝试通过累积请求和逐步执行该方案。 但是,这样的方案将需要异步执行,但是现在我们将集中在同步方案上。

因此,为了开始使用批处理,我们必须预先找出我们将要提出的尽可能多的请求(最好是全部)。 如果我们谈论的是REST控制器,则最好在整个会话期间合并对每个服务的请求。

分组所有通话
在某些情况下,可以立即获取会话中所有必要的数据,并且不会导致来自请求发起者或执行者的资源问题。 在这种情况下,我们不能限制用于调用服务的数据包大小,并且立即立即接收所有数据。

使生活更轻松的另一个假设是假设发起者有足够的资源来处理所有数据。 如果需要,对外部服务的请求也可以以有限的包形式发送。

在这种情况下,逻辑的简化涉及如何将需要数据的位置与调用结果进行比较。 如果我们认为发起方的资源非常有限,并且同时尝试最小化外部调用的数量,那么对于图的最佳裁剪,我们将面临一项艰巨的任务。 最有可能的是,您只需要牺牲性能来减少资源消耗。

我们将考虑到,在我们的演示项目中,发起者的资源没有特别限制,他可以接收所有必要的数据并将其存储到会话结束。 如果资源有问题,我们将进行较小的分页。

由于在我的实践中,最需要这种方法,因此更多示例将涉及此选项。

我们可以区分获得大量查询的此类方法:

  • 逆向工程;
  • 业务启发法;
  • 以DDD样式聚合;
  • 代理和双重通话。

让我们浏览一下项目示例中的所有选项。

逆向工程


收集所有请求

由于我们具有用于实现收集信息并将其转换为前端的所有功能的代码,因此我们可以进行逆向工程,并且从此代码中我们可以了解将要发出的请求:

class ChatRestController (
private val messageRepository : ChatMessageRepository ,
private val userRepository : UserRemoteApi ,
private val fileRepository : FileRemoteApi
) : ChatRestApi {
override fun getLast ( n : Int ) =
messageRepository . findLast ( n )
. let { messages ->
         // , forward reply
val allMessages = messages . asSequence () . flatMap {
           sequenceOf ( it , it . forwardFrom , it . replyTo ) . filterNotNull ()
} . toSet ()
val allUserReq = allMessages . map { it . author }
         val allFileReq = allMessages . flatMap { it . files ?: listOf () } . toSet ()


所有请求均已收集,现在您需要进行实际的批处理。

对于allUserReq和allFileReq,我们进行外部查询并按ID进行分组。 如果对包装的大小没有限制,那么它将看起来像这样:

userRepository . getUsersByIds ( allMessages . map { it . author } . toSet ())
. associateBy { it . id } ::get
fileRepository . getHeadsByIds ( allMessages . flatMap { it . files ?: listOf () } . toSet ())
. associateBy { it . id } ::get


如果有限制,那么代码将采用以下形式:

val userApiChunkLimit = 100
allMessages . map { it . author } . asSequence () . distinct ()
. chunked ( userApiChunkLimit , userRepository ::getUsersByIds )
. flatten ()
. associateBy { it . id } ::get


不幸的是,与Stream不同,Sequence无法轻松切换到并行数据包请求。

如果您认为并行查询是有效且必要的,则可以执行以下操作:

allMessages . map { it . author } . parallelStream () . distinct ()
. chunked ( userApiChunkLimit , userRepository ::getUsersByIds )
. flatten ()
. associateBy { it . id } ::get


可以看出,没有什么太大变化。 使用一定数量的Kotlin魔术可以帮助我们:

fun < T , R > Stream < out T >. chunked ( size : Int , transform : ( List < T > ) -> R ) : Stream < out R > =
batches ( this , size ) . map ( transform )

fun < T > Stream < out Collection < T >>. flatten () : Stream < T > =
flatMap { it . stream () }

fun < T , K > Stream < T >. associateBy ( keySelector : ( T ) -> K ) : Map < K , T > =
collect ( Collectors . toMap ( keySelector , { it } ))


现在仍然可以将所有内容放在一起:
override fun getLast ( n : Int ) =
messageRepository . findLast ( n )
. let { messages ->
       // , forward reply
val allMessages = messages . asSequence () . flatMap { message ->
         sequenceOf ( message , message . forwardFrom , message . replyTo )
. filterNotNull ()
} . toSet ()

messages . map ( ValueHolder < ( ChatMessage ) -> ChatMessageUI > () . apply {
         value = memoize { message : ChatMessage ->
           message . toFrontModel (
// ,
getUser = allMessages . map { it . author } . parallelStream () . distinct ()
. chunked ( userApiChunkLimit , userRepository ::getUsersByIds )
. flatten ()
. associateBy { it . id } ::get . orThrow { IllegalArgumentException ( "User $ it" ) } ,
//
getFile = fileRepository . getHeadsByIds ( allMessages . flatMap { it . files ?: listOf () } . toSet ())
. associateBy { it . id } ::get . orThrow { IllegalArgumentException ( "File $ it" ) } ,
//
serializeMessage = value
)
}
} . value )
}

解释和简化
肯定会引起您注意的第一件事是备忘录功能。 事实是,几乎肯定会为相同的消息多次调用serializeMessage函数(由于回复和转发)。 不清楚为什么我们应该为每个这样的消息分别进行toFrontModel(在某些情况下,这是必要的,但不是我们的要求)。 因此,您可以对serializeMessage函数进行记忆。 例如,可以如下实现:

fun < A , R > memoize ( func : ( A ) -> R ) = func as? Memoize2 ?: Memoize2 ( func )
class Memoize2 < A , R > ( val func : ( A ) -> R ) : ( A ) -> R , java . util . function . Function < A , R > {
private val cache = hashMapOf < A , R > ()
override fun invoke ( p1 : A ) = cache . getOrPut ( p1 , { func ( p1 ) } )
override fun apply ( t : A ) : R = invoke ( t )
}


接下来,我们需要构造一个已记忆的函数serializeMessage,但同时将在其中使用它。 在内部使用功能的完全相同的实例非常重要,否则所有备注都会变得无用。 要解决此冲突,我们使用ValueHolder类,该类仅存储对该值的引用(您可以改用标准的东西,例如AtomicReference)。 要缩短递归记录,可以执行以下操作:

inline fun < A , R > recursiveMemoize ( crossinline func : ( A , ( A ) -> R ) -> R ) : ( A ) -> R =
ValueHolder < ( A ) -> R > () . apply {
     value = memoize { a -> func ( a , value ) }
} . value


如果您第一次可以理解这种箭头三段论,那么恭喜您,您是一名函数式程序员:-)

现在,代码将如下所示:

override fun getLast ( n : Int ) =
messageRepository . findLast ( n )
. let { messages ->
       // , forward reply
val allMessages = messages . asSequence () . flatMap { message ->
         sequenceOf ( message , message . forwardFrom , message . replyTo )
. filterNotNull ()
} . toSet ()

// ,
val getUser = allMessages . map { it . author } . parallelStream () . distinct ()
. chunked ( userApiChunkLimit , userRepository ::getUsersByIds )
. flatten ()
. associateBy { it . id } ::get . orThrow { IllegalArgumentException ( "User $ it " ) }

       //
val getFile = fileRepository . getHeadsByIds ( allMessages . flatMap { it . files ?: listOf () } . toSet ())
. associateBy { it . id } ::get . orThrow { IllegalArgumentException ( "File $ it" ) }

       messages . map ( recursiveMemoize { message , memoized : ( ChatMessage ) -> ChatMessageUI ->
         message . toFrontModel (
getUser = getUser ,
getFile = getFile ,
//
serializeMessage = memoized
         )
} )


您还会注意到orThrow,其定义如下:

/** [exception] , null */
fun < P , R > (( P ) -> R ? ) . orThrow ( exception : ( P ) -> Exception ) : ( P ) -> R =
{ p -> invoke ( p ) . let { it ?: throw exception ( p ) } }


如果在外部服务中没有关于我们ID的数据,并且这被视为合法情况,则您需要以不同的方式处理它。

修复之后,getLast运行时预计约为300毫秒。 而且,即使请求不再适合数据包大小的限制(由于并行请求数据包),该时间也会略有增加。 让我提醒您,我们的最低目标是500毫秒,可以将250毫秒视为正常工作。

平行性

但是您需要继续前进。 对userRepository和fileRepository的调用是完全独立的,并且可以轻松并行化,理论上接近200毫秒。

例如,通过我们的加入功能:
override fun getLast ( n : Int ) =
messageRepository . findLast ( n )
. let { messages ->
       // , forward reply
val allMessages = messages . asSequence () . flatMap { message ->
         sequenceOf ( message , message . forwardFrom , message . replyTo )
. filterNotNull ()
} . toSet ()

join ( {
         // ,
allMessages . map { it . author } . parallelStream () . distinct ()
. chunked ( userApiChunkLimit , userRepository ::getUsersByIds )
. flatten ()
. associateBy { it . id }
} , {
         //
fileRepository . getHeadsByIds ( allMessages . flatMap { it . files ?: listOf () } . toSet ())
. associateBy { it . id }
} ) . let { ( users , files ) ->
         messages . map ( recursiveMemoize { message , memoized : ( ChatMessage ) -> ChatMessageUI ->
           message . toFrontModel (
getUser = users ::get . orThrow { IllegalArgumentException ( "User $ it" ) } ,
getFile = files ::get . orThrow { IllegalArgumentException ( "File $ it " ) } ,
//
serializeMessage = memoized
           )
} )
}
}

如实践所示,执行大约需要200毫秒,并且随着消息数量的增加,时间不会增加太多非常重要。

问题所在

总的来说,代码的可读性当然不如我们的朴素的第一版,但是序列化本身(toFrontModel的实现)并没有太大变化并且保持完全可读性是一件好事。 外部服务的狡猾工作的整个逻辑存在于一个地方。

这种方法的缺点是我们的抽象正在进行中。

如果需要对FrontModel进行更改,则几乎可以肯定必须对getLast函数进行更改,这违反了替换原则Barbara Liskov (Liskov替换原理)。

例如,我们同意仅在主要消息中解密附件,而不在答复和转发(答复/转发)中解密,或者仅在第一级的答复和转发中解密。 在这种情况下,对toFrontModel代码进行更改后,您将必须在文件的请求收集代码中进行相应的更正。 而且,校正将是不平凡的:

fileRepository . getHeadsByIds (
allMessages . flatMap { it . files ?: listOf () } . toSet ()
)


在这里,我们正在顺利解决与上一个问题密切相关的另一个问题:整个代码的正确操作取决于逆向工程的素养。 在某些复杂的情况下,由于错误的查询收集,代码可能无法正确正确地工作。 无法保证您将能够快速提出涵盖所有此类棘手情况的单元测试。

结论

优点:

  1. 一种预先接收请求的明显方法,该方法很容易与主代码分离。
  2. 几乎完全没有内存和时间开销,这与仅使用本来应该接收到的数据相关。
  3. 良好的伸缩性和构建服务的能力,从理论上讲,这将负责可预测的时间,而不考虑外部请求的大小。

缺点:

  1. 批处理本身的代码非常复杂。
  2. 在现有实施中分析要求的大型负责任的工作。
  3. 流动的抽象以及因此导致的整个方案的脆弱性(与实现的变化有关)。
  4. 支持困难:查询预测块中的错误很难与主代码中的错误区分开。 理想情况下,您需要使用两倍的单元测试,因此在生产中出现错误的过程将是困难的两倍。
  5. 编写代码时,请遵守SOLID原则:必须准备代码以疏远批处理的逻辑。 仅这些原则的介绍将提供一些优点,因此这个负数是最无关紧要的。

重要的是要注意,无需进行反向工程就可以使用此方法。 我们需要获得一个getLast合同,用于初步计算请求的合同取决于该合同(以下称为预取)。 在这种情况下,我们通过查看getLast(逆向工程)的实现来做到这一点。 但是,使用这种方法会出现困难:编辑这两段代码应该始终是同步的,并且无法确保做到这一点(记住hashCode和equals,是完全一样的东西)。 我想展示的下一种方法旨在解决此问题(或至少缓解)。

业务启发法


解决合同问题

如果我们没有严格的合同,因此没有一组确切的请求,却有一个近似的请求,该怎么办? 此外,我们将建立一个近似集,以便它严格包含精确集,并基于主题区域的特征。

因此,我们建立了两者都依赖于用户将要决定的某个公共合同,而不是预取合同对getLast的依赖。 主要困难将是以某种方式以代码形式体现该一般合同。

搜索有用的限制

让我们尝试用我们的例子来做到这一点。
就我们而言,具有以下业务功能:

  • 聊天参与者列表是预定义的;
  • 聊天彼此完全隔离;
  • 答复/转发链的嵌套很小(约2–3条消息)。

从第一个限制开始,您无需绕过消息,查看那里有哪些用户,选择唯一的用户并向他们提出请求。 您可以简单地查询预定义列表。 如果您同意这一说法,那么我抓住了您。

实际上,一切都不是那么简单。 列表可以预定义,但可以有数千个用户。 这些事情需要澄清。 在我们的情况下,通常会有两个或三个聊天参与者,很少有。 因此,完全可以接收所有数据。

此外,如果聊天用户的列表是预先确定的,但是该信息不在用户服务中(这很有可能),那么从这些信息中将没有任何意义。 我们将额外请求聊天用户列表,然后您仍然必须向用户服务提出请求。

假设有关用户连接和聊天的信息存储在用户服务中。 在我们的情况下,是这样,因为连接是由用户的权限决定的。 然后,对于用户来说,它将变成这样的预取代码:

在这里似乎没有传递任何聊天标识符似乎令人惊讶。 我故意这样做,以免弄乱示例代码。

乍一看,第二个限制没有任何意义。 无论如何,我仍然无法从他身上得到任何有用的东西。

我们之前已经使用了第三个限制。 它会对我们存储和接收对话的方式产生重大影响。 我们不会开始开发该主题,因为它与REST控制器和批处理无关。

如何处理文件? 我想在一个简单的请求中获得所有聊天文件的列表。 根据API的条款,我们只需要文件头,没有主体,因此对于调用者来说,这看起来像是一项资源密集且危险的任务。

另一方面,我们必须记住,我们不会收到所有聊天消息,而只会收到最后N条消息,因此很容易发现它们根本不包含任何文件。

没有统一的答案:这完全取决于业务细节和用例。 创建产品解决方案时,如果对一个用例进行启发式设计,就会遇到麻烦,然后用户将以不同的方式使用该功能。 对于演示和预售,这是一个不错的选择,但是现在我们正在尝试编写诚实的生产服务。

因此,很遗憾,仅基于操作和统计信息收集的结果(或经过专家评估),才可以对文件进行业务启发。

由于我们仍然想以某种方式应用我们的方法,因此假设统计数据显示如下:

  1. 典型的对话以包含一个或多个文件的消息开始,然后是不包含文件的回复消息。
  2. 几乎所有消息都来自典型的对话。
  3. 每次聊天中唯一文件的预期数量约为20。

因此,要显示几乎所有消息,您将需要获取某些文件的标头(因为ChatMessageUI如此安排),并且文件总数很小。 在这种情况下,在一个请求中接收所有聊天文件似乎是合理的。 为此,我们必须将以下内容添加到文件的API中:

fun getHeadsByChat () : List < FileHeadRemote >

getHeadsByChat方法看起来并不牵强,纯粹是因为我们希望优化性能(尽管这也是一个很好的理由)。 通常,在带有文件的聊天室中,用户希望查看所有使用的文件以及添加顺序(因此,我们使用列表)。

这种显式连接的实现将需要在文件服务或我们的应用程序中存储其他信息。 我们认为,这完全取决于应在谁的责任范围内存储有关文件与聊天的连接的冗余信息。 这是多余的,因为该消息已经与文件关联,而消息又与聊天关联。 您不能使用非规范化,而是从消息中即时提取此信息,即在SQL内部,立即接收整个聊天过程中的所有文件(在我们的应用程序中),并立即从文件服务中请求所有这些文件。 如果有很多聊天消息,则此选项的效果会更差,但我们不需要非规范化。 我会将两个选项都隐藏在getHeadsByChat后面。

代码如下:

override fun getLast ( n : Int ) =
messageRepository . findLast ( n )
. let { messages ->
       join (
{ userRepository . getUsersByChat () . associateBy { it . id } } ,
{ fileRepository . getHeadsByChat () . associateBy { it . id } }
       )
. let { //
}
}


可以看出,与以前的版本相比,变化很小,只有预取部分受到了影响,这是很大的。

预取代码变得更短,更清晰。

执行时间没有改变,这是合乎逻辑的,因为请求数保持不变。 从理论上讲,伸缩可能会比诚实的逆向工程更好(仅由于消除了复杂计算的环节)。 但是,相反的情况也有可能发生:试探法排得太多。 如实践所示,如果您设法提出足够的试探法,则执行时间不应有任何特殊变化。

但是,这还不是全部。 我们没有考虑到现在接收有关用户和文件的详细数据与接收消息无关,并且可以并行启动请求:

此选项为每个请求提供稳定的100毫秒。

启发式错误

如果在使用启发式方法时查询集不大但略小于应有的情况该怎么办? 对于大多数选项,此类试探法将起作用,但有些例外则需要您单独提出要求。 在我的实践中,这样的决定是不成功的,因为每个异常都会对性能产生很大的影响,最后,一些用户发出了一个完全由异常组成的请求。 我要说的是,在这种情况下,即使查询收集算法令人毛骨悚然且不可读,也最好使用逆向工程,但是,当然,这全都取决于服务的关键性。

结论

优点:

  1. 业务启发法的逻辑很容易阅读,通常很简单。 为了了解适用性的限制,验证和修改预取合同,这是很好的。
  2. 可伸缩性与逆向工程一样好。
  3. 代码与数据的一致性降低了,这可以导致代码更好的并行化。
  4. 像REST控制器的主要逻辑一样,预取逻辑也是基于需求的。 如果需求经常变化,这是一个弱项。

缺点:

  1. 从需求中得出启发式查询预测并不是那么容易。 在与敏捷兼容性不佳的程度上,可能有必要澄清需求。
  2. 您可以获得更多数据。
  3. 为了确保预取合同有效地工作,可能需要对数据存储进行非规范化。 这是一个弱小的减法,因为这些优化遵循业务逻辑,因此很可能将由不同的流程主张。

从我们的示例中,我们可以得出结论,应用这种方法非常困难,因此不值得一试。实际上,在实际的业务项目中,限制的数量是巨大的,并且经常可以从此堆中获得有用的信息,从而可以对数据进行分区或预测统计信息。这种方法的主要优点是业务可以解释所使用的限制,因此很容易理解和验证它们。

通常,尝试使用此方法时最大的问题是活动分离。开发人员应该完全沉浸在业务逻辑中,并向分析师提出问题以澄清问题,这需要一定程度的主动性。

DDD样式的单位


在大型项目中,您经常可以看到DDD做法的使用,因为它们使您可以有效地构建代码。不必在项目中使用所有DDD模板-有时即使引入一个DDD模板也可以获得良好的回报。将DDD的概念视为一个汇总。集合是逻辑上连接的实体的并集,只能通过集合的根来执行工作(通常,这是一个实体,位于实体连接图的顶部)。

从获取数据的角度来看,聚合中的主要内容是处理实体列表的整个逻辑都在一个地方-聚合。在构建期间应采用两种方法将其传输到单元:

  1. 我们将功能转移到本机以获取外部数据。确定必要数据的逻辑位于单元内部。
  2. 我们传输所有必要的数据。确定必要数据的逻辑位于聚合之外。

方法的选择很大程度上取决于预取可以轻松地移到聚合之外的方式。如果预取逻辑是基于业务启发式的,则通常很容易将其与聚合分离。基于逻辑的使用分析(逆向工程)将逻辑超出聚合的范围可能会很危险,因为我们可以将逻辑相关的代码分配到不同的类中。

在集合中扩大查询的逻辑

让我们尝试勾勒出一个与“聊天”概念相对应的集合。我们的ChatMessage,UserReference和FileReference类与存储模型相对应,因此可以使用一些适当的前缀来重命名它们,但是我们有一个小项目,因此我们将其保留。我们将程序集称为Chat,它的组件是ChatPage和ChatPageMes​​sage:到目前为止,已经获得了很多毫无意义的重复。这是由于以下事实:我们的主题模型类似于存储模型,并且两者都类似于前端模型。我直接使用FileHeadRemote和UserRemote类,以免编写过多代码,尽管通常在域中应避免直接使用此类。

interface Chat {
fun getLastPage ( n : Int ) : ChatPage
}

interface ChatPage {
val messages : List < ChatPageMessage >
}

data class ChatPageMessage (
val id : Long ,
val author : UserRemote ,
val message : String ,
val files : List < FileHeadRemote >,
val replyTo : ChatPageMessage ? ,
val forwardFrom : ChatPageMessage ?
)






如果使用这样的聚合,我们的REST控制器可以重写如下:该选项在很大程度上使我们联想到第一个幼稚的实现,但是它具有一个重要的优势:该控制器不再从事直接接收数据的工作,并且不依赖于与数据存储关联的类,而是依赖于仅来自通过接口设置的单元。因此,预取逻辑不再在控制器中。控制器仅处理将单元转换为前端模型的过程,这使我们符合单一责任原则(SRP)。不幸的是,对于汇总中描述的所有方法,您都必须编写一个实现。

class ChatRestController (
private val chat : Chat
) : ChatRestApi {
override fun getLast ( n : Int ) =
chat . getLastPage ( n ) . toFrontModel ()

private fun ChatPage . toFrontModel () =
messages . map { it . toFrontModel () }

   private fun ChatPageMessage . toFrontModel () : ChatMessageUI =
ChatMessageUI (
id = id ,
author = author . toFrontReference () ,
message = message ,
files = files . toFrontReference () ,
forwardFrom = forwardFrom ?. toFrontModel () ,
replyTo = replyTo ?. toFrontModel ()
)
}






让我们尝试仅保存使用业务试探法时实现的控制器逻辑。
class ChatImpl (
private val messageRepository : ChatMessageRepository ,
private val userRepository : UserRemoteApi ,
private val fileRepository : FileRemoteApi
) : Chat {
override fun getLastPage ( n : Int ) = object : ChatPage {
override val messages : List < ChatPageMessage >
get () =
runBlocking ( IO ) {
           val prefetch = async (
{ userRepository . getUsersByChat () . associateBy { it . id } } ,
{ fileRepository . getHeadsByChat () . associateBy { it . id } }
           )

withContext ( IO ) { messageRepository . findLast ( n ) }
             . map (
prefetch . await () . let { ( users , files ) ->
                 recursiveMemoize { message , memoized : ( ChatMessage ) -> ChatPageMessage ->
                   message . toDomainModel (
getUser = users ::get . orThrow { IllegalArgumentException ( "User $ it " ) } ,
getFile = files ::get . orThrow { IllegalArgumentException ( "File $ it " ) } ,
//
serializeMessage = memoized
                   )
}
}
             )
}
   }
}

private fun ChatMessage . toDomainModel (
getUser : ( UserReference ) -> UserRemote ,
getFile : ( FileReference ) -> FileHeadRemote ,
serializeMessage : ( ChatMessage ) -> ChatPageMessage
) = ChatPageMessage (
id = id ?: throw IllegalStateException ( " $ this must be persisted" ) ,
author = getUser ( author ) ,
message = message ,
files = files ?. map ( getFile ) ?: listOf () ,
forwardFrom = forwardFrom ?. let ( serializeMessage ) ,
replyTo = replyTo ?. let ( serializeMessage )
)

事实证明,getLastPage函数本身具有数据获取策略,包括预取,并且toDomainModel函数纯粹是技术性的,负责将存储的模型转换为域模型。

我以对Kotlin更熟悉的形式重写了对userRepository,fileRepository和messageRepository的并行调用。我希望代码的可理解性不会因此而受到影响。

通常,这种方法已经完全可以操作,应用时的性能将与简单使用逆向工程或业务启发法相同。

在聚合之外扩大查询的逻辑

在创建聚合的过程中,我们将立即遇到一个问题:构造ChatPage时,需要在创建Chat时将页面大小设置为常量,而不是像往常一样将其传递给getLast()。将不得不改变自己聚合接口:既然我们有dochitka其他消息,我们强烈希望获得所有单元的数据外,我们将不得不选择与他人聊天根ChatPage的总体水平的出来:接下来,你需要创建一个预取码,从单元分离:现在,为了要创建聚合,您需要将其与预取对接。在DDD中,这种编排是由Application Services完成的。

interface Chat {
fun getPage () : ChatPage
}




class ChatPageImpl (
private val messageData : List < ChatMessage >,
private val userData : List < UserRemote >,
private val fileData : List < FileHeadRemote >
) : ChatPage {
override val messages : List < ChatPageMessage >
get () =
messageData . map (
( userData . associateBy { it . id } to fileData . associateBy { it . id } )
. let { ( users , files ) ->
             recursiveMemoize { message , self : ( ChatMessage ) -> ChatPageMessage ->
               message . toDomainModel (
getUser = users ::get . orThrow () ,
getFile = files ::get . orThrow () ,
//
serializeMessage = self
               )
}
}
       )
}




fun chatPagePrefetch (
pageSize : Int ,
messageRepository : ChatMessageRepository ,
userRepository : UserRemoteApi ,
fileRepository : FileRemoteApi
) =
runBlocking ( IO ) {
     async (
{ userRepository . getUsersByChat () } ,
{ fileRepository . getHeadsByChat () } ,
{ messageRepository . findLast ( pageSize ) }
     )
}




class ChatService (
private val messageRepository : ChatMessageRepository ,
private val userRepository : UserRemoteApi ,
private val fileRepository : FileRemoteApi
) {
private fun chatPagePrefetch ( pageSize : Int ) =
runBlocking ( IO ) {
       async (
{ messageRepository . findLast ( pageSize ) } ,
{ userRepository . getUsersByChat () } ,
{ fileRepository . getHeadsByChat () }
       ) . await ()
}

   fun getLastPage ( n : Int ) : ChatPage =
chatPagePrefetch ( n )
. let { ( messageData , userData , fileData ) ->
         ChatPageImpl ( messageData , userData , fileData )
}
}


好吧,控制器不会有太大变化,您只需要使用ChatService :: getLastPage而不是Chat :: getLastPage。也就是说,代码将像这样更改:

class ChatRestController (
private val chat : ChatService
) : ChatRestApi


结论

  1. 预取逻辑可以放置在设备内部或单独的位置。
  2. 如果预取逻辑与聚合的内部逻辑紧密相关,则最好不要将其取出,因为这会破坏封装。我个人认为将预取移出聚合没有太大意义,因为这极大地限制了可能性,并增加了代码的隐式一致性。
  3. 汇总组织本身对批处理的性能产生积极影响,因为对繁重请求的控制变得更多,预取逻辑的位置也变得十分明确。

在下一章中,我们将考虑无法与主函数隔离地实现的预取实现。

代理和双重通话


解决合同问题

正如我们在前面的部分中已经看到的那样,预取合同的主要问题在于,它与必须为其准备数据的功能的合同紧密相关。更确切地说,这取决于主要功能可能需要哪些数据。如果我们不尝试预测而是尝试使用代码本身进行逆向工程该怎么办?在简单的情况下,测试中常用的代理方法可以为我们提供帮助。Mockito之类的库使用接口实现生成类,这些实现也可以积累有关调用的信息。我们的图书馆使用了类似的方法

如果使用代理存储库调用main函数并收集有关必要数据的信息,则可以以包的形式获取此数据并重新调用main函数以获得最终结果。

主要条件如下:所请求的数据不应影响后续请求。代理不会返回真实数据,而只会返回一些存根,因此所有分支和接收关联数据都将消失。

在我们的情况下,这意味着代理messageRepository是没有用的,因为将根据消息请求的结果进行进一步的请求。这是没有问题的,因为我们只有一个请求messageRepository,所以这里不需要批处理。

因为我们将代理简单的函数UserReference-> UserRemote和FileReference-> FileHeadRemote,所以您只需要累加两个参数列表。

结果,我们得到以下信息:
class ChatRestController (
private val messageRepository : ChatMessageRepository ,
private val userRepository : UserRemoteApi ,
private val fileRepository : FileRemoteApi
) : ChatRestApi {
override fun getLast ( n : Int ) : List < ChatMessageUI > {
val messages = messageRepository . findLast ( n )

//
fun transform (
getUser : ( UserReference ) -> UserRemote ,
getFile : ( FileReference ) -> FileHeadRemote
     ) : List < ChatMessageUI > =
messages . map (
recursiveMemoize { message , self ->
           message . toFrontModel ( getUser , getFile , self )
}
       )

//
val userIds = mutableSetOf < UserReference > ()
val fileIds = mutableSetOf < FileReference > ()
transform (
{ userIds += it ; UserRemote ( 0L , "" ) } ,
{ fileIds += it ; FileHeadRemote ( 0L , "" ) }
     )

return runBlocking ( IO ) {
       //
async (
{ userRepository . getUsersByIds ( userIds ) . associateBy { it . id } ::get . orThrow () } ,
{ fileRepository . getHeadsByIds ( fileIds ) . associateBy { it . id } ::get . orThrow () }
       ) . await () . let { ( getUser , getFile ) ->
         transform ( getUser , getFile )
}
}
   }
}

如果衡量性能,事实证明,尽管我们两次调用了该函数,但使用这种方法并不比使用逆向工程方法更糟。这是由于这样的事实,与外部查询的执行时间相比,转换函数的执行时间可以忽略(在我们的示例中)。

与使用业务启发式方法时的性能相比,在我们的案例中,查询的累积将不太有效。但是请记住,并非总是能够找到如此出色的启发式方法。例如,如果聊天中的用户数量很大,文件数量也很大,并且文件很少附加到消息中,那么我们的业务启发式算法将立即失去诚实接收请求列表的能力。

结论

优点:

  1. prefetch - .
  2. prefetch.
  3. , -.

缺点:

  1. .
  2. , .

尽管有明显的异国情调,但通过代理和重新调用进行的请求累积在主功能逻辑与接收的数据不相关的情况下非常适用。这里的主要困难与逆向工程相同:尽管功能程度要小得多(仅基于以下查询不依赖于先前查询的结果这一事实),我们仍在依赖该函数的当前实现。

性能会略有下降,但是在预取代码中,您无需考虑主要功能实现的所有细微差别。

当您无法建立良好的业务启发式方法来进行查询预测,并且希望减少预取和功能连接时,可以使用此方法。

结论


乍看之下,使用批处理并非如此简单。我认为所有设计模式都具有此属性(请记住缓存)。

为了有效地处理请求,重要的是,调用者必须收集尽可能多的请求,而这通常受应用程序结构的阻碍。有两种解决方法:要么设计应用程序以有效地处理数据(它很可能导致应用程序的反应式设备),要么(经常发生)尝试在不进行重大重组的情况下在现有应用程序中实现批处理。

堆查询的最显而易见的方法是对大量查询进行反向工程设计。这种方法的主要缺点将是隐式代码连接性的增加。一种替代方法是使用有关业务功能的信息,以便将数据划分为多个块,这些块通常被共享。有时,为了实现这种有效的分离,有必要对存储进行非规范化,但是如果成功,则批处理的逻辑将由主题区域确定,这很好。

获得所有请求的一种不太明显的方法是实现两次通过。在第一阶段,我们收集所有必要的请求,在第二阶段,我们将处理已经收到的数据。这种方法的适用性受到要求彼此独立的要求的限制。

Source: https://habr.com/ru/post/zh-CN467919/


All Articles