Problemas de procesamiento por lotes de solicitudes y sus soluciones (parte 2)


Esta es una continuación del artículo "Problemas de procesamiento por lotes de solicitudes y sus soluciones" . Se recomienda que primero se familiarice con la primera parte, ya que describe en detalle la esencia del problema y algunos enfoques para su solución. Aquí nos fijamos en otros métodos.


Breve repetición de la tarea.


Hay un chat para coordinar el documento con un conjunto predefinido de participantes. Los mensajes contienen texto y archivos. Y, como en los chats regulares, los mensajes pueden ser respuestas y reenvíos.

Modelo de mensaje de chat
data class ChatMessage (
// nullable persist
val id : Long ? = null ,
/** */
val author : UserReference ,
/** */
val message : String ,
/** */
// - JPA+ null,
val files : List < FileReference > ? = null ,
/** , */
val replyTo : ChatMessage ? = null ,
/** , */
val forwardFrom : ChatMessage ? = null
)

A través de la inyección de dependencias, podemos implementar los siguientes servicios externos:
interface ChatMessageRepository {
fun findLast ( n : Int ) : List < ChatMessage >
}

data class FileHeadRemote (
val id : FileReference ,
val name : String
)

interface FileRemoteApi {
fun getHeadById ( id : FileReference ) : FileHeadRemote
   fun getHeadsByIds ( id : Set < FileReference > ) : Set < FileHeadRemote >
fun getHeadsByIds ( id : List < FileReference > ) : List < FileHeadRemote >
}

data class UserRemote (
val id : UserReference ,
val name : String
)

interface UserRemoteApi {
fun getUserById ( id : UserReference ) : UserRemote
   fun getUsersByIds ( id : Set < UserReference > ) : Set < UserRemote >
fun getUsersByIds ( id : List < UserReference > ) : List < UserRemote >
}

Necesitamos implementar un controlador REST:

interface ChatRestApi {
fun getLast ( n : Int ) : List < ChatMessageUI >
}


Donde:
/** */
data class ReferenceUI (
/** url */
val ref : String ,
/** */
val name : String
)

data class ChatMessageUI (
val id : Long ,
/** */
val author : ReferenceUI ,
/** */
val message : String ,
/** */
val files : List < ReferenceUI >,
/** , */
val replyTo : ChatMessageUI ? = null ,
/** , */
val forwardFrom : ChatMessageUI ? = null
)

En la parte anterior, analizamos la implementación ingenua de un servicio utilizando el procesamiento por lotes y varias formas de acelerarlo. Estos métodos son muy simples, pero su aplicación no proporciona un rendimiento suficientemente bueno.

Aumentar paquetes


El principal problema de las soluciones ingenuas era el pequeño tamaño de los paquetes.

Para agrupar llamadas en paquetes más grandes, debe acumular solicitudes de alguna manera. Esta línea no implica la acumulación de solicitudes:

author = userRepository . getUserById ( author ) . toFrontReference () ,

Ahora, en nuestro tiempo de ejecución, no hay un lugar especial para almacenar la lista de usuarios: se está formando gradualmente. Esto tendrá que cambiar.

Primero debe separar la lógica de adquisición de datos de la asignación en el método ChatMessage.toFrontModel:

private fun ChatMessage . toFrontModel (
getUser : ( UserReference ) -> UserRemote ,
getFile : ( FileReference ) -> FileHeadRemote ,
serializeMessage : ( ChatMessage ) -> ChatMessageUI
) : ChatMessageUI =
ChatMessageUI (
id = id ?: throw IllegalStateException ( " $ this must be persisted" ) ,
author = getUser ( author ) . toFrontReference () ,
message = message ,
files = files ?. let {
       it . map ( getFile ) . map { it . toFrontReference () }
} ?: listOf () ,
forwardFrom = forwardFrom ?. let ( serializeMessage ) ,
replyTo = replyTo ?. let ( serializeMessage )
)


Resulta que esta función depende solo de tres funciones externas (y no de clases completas, como era al principio).

Después de tal revisión, el cuerpo de la función no quedó menos claro, y el contrato se hizo más difícil (esto tiene ventajas y desventajas).

De hecho, no puede hacer un estrechamiento del contrato y dejar dependencias en las interfaces. Lo principal es que definitivamente no hay nada superfluo en ellos, ya que tendremos que hacer implementaciones alternativas.

Dado que la función serializeMessage es similar a una función recursiva, esto se puede hacer como una recursión explícita en el primer paso de la refactorización:

class ChatRestController (
private val messageRepository : ChatMessageRepository ,
private val userRepository : UserRemoteApi ,
private val fileRepository : FileRemoteApi
) : ChatRestApi {
override fun getLast ( n : Int ) =
messageRepository . findLast ( n )
. map { it . toFrontModel () }


Hice un trozo para el método toFrontModel, que hasta ahora funciona exactamente igual que en nuestra primera implementación ingenua (la implementación de las tres funciones externas sigue siendo la misma).

private fun ChatMessage . toFrontModel () : ChatMessageUI =
toFrontModel (
getUser = userRepository ::getUserById ,
getFile = fileRepository ::getHeadById ,
serializeMessage = { it . toFrontModel () }
   )


Pero debemos hacer que las funciones getUser, getFile y serializeMessage funcionen de manera eficiente, es decir, enviar solicitudes a los servicios apropiados en paquetes del tamaño correcto (en teoría, este tamaño puede ser diferente para cada servicio) o generalmente una solicitud por servicio si se permiten solicitudes ilimitadas.

La forma más fácil de lograr esta agrupación es si tenemos todas las consultas necesarias a la mano antes de comenzar el procesamiento. Para hacer esto, antes de llamar a FrontModel, recopile todos los enlaces necesarios, realice el procesamiento por lotes y luego use el resultado.

También puede probar el esquema con la acumulación de solicitudes y su ejecución gradual. Sin embargo, tales esquemas requerirán ejecución asincrónica, pero por ahora nos centraremos en los síncronos.

Entonces, para comenzar a utilizar el procesamiento por lotes, de una forma u otra tendremos que encontrar de antemano tantas solicitudes como sea posible (preferiblemente todas) que tendremos que hacer. Si estamos hablando de un controlador REST, sería bueno combinar las solicitudes de cada servicio a lo largo de la sesión.

Agrupar todas las llamadas
En algunas situaciones, todos los datos necesarios dentro de la sesión pueden obtenerse de inmediato y no causarán problemas con los recursos ni del iniciador de la solicitud ni del ejecutor. En este caso, no podemos limitar el tamaño del paquete para llamar al servicio e inmediatamente recibir todos los datos a la vez.

Otra suposición que hace la vida mucho más fácil es asumir que el iniciador tiene suficientes recursos para procesar todos los datos. Las solicitudes a servicios externos también se pueden enviar en paquetes limitados, si así lo requieren.

La simplificación de la lógica en este caso se refiere a cómo se compararán los lugares donde se necesitan los datos con los resultados de las llamadas. Si consideramos que los recursos del iniciador son muy limitados y, al mismo tiempo, intentamos minimizar el número de llamadas externas, tenemos una tarea bastante difícil para el corte óptimo del gráfico. Lo más probable es que solo tenga que sacrificar el rendimiento para reducir el consumo de recursos.

Consideraremos que, específicamente en nuestro proyecto de demostración, el iniciador no está particularmente limitado en recursos, puede recibir todos los datos necesarios y almacenarlos hasta el final de la sesión. Si hay problemas con los recursos, solo haremos una paginación más pequeña.

Dado que, en mi práctica, tal enfoque es el más demandado, otros ejemplos se referirán a esta opción.

Podemos distinguir tales métodos para obtener grandes conjuntos de consultas:

  • ingeniería inversa;
  • heurística empresarial;
  • agregados en el estilo de DDD;
  • proxy y doble llamada.

Veamos todas las opciones en el ejemplo de nuestro proyecto.

Ingeniería inversa


Recoge todas las solicitudes

Dado que tenemos un código para implementar todas las funciones involucradas en la recopilación de información y convertirla para la interfaz, podemos hacer ingeniería inversa y desde este código podemos entender qué solicitudes serán:

class ChatRestController (
private val messageRepository : ChatMessageRepository ,
private val userRepository : UserRemoteApi ,
private val fileRepository : FileRemoteApi
) : ChatRestApi {
override fun getLast ( n : Int ) =
messageRepository . findLast ( n )
. let { messages ->
         // , forward reply
val allMessages = messages . asSequence () . flatMap {
           sequenceOf ( it , it . forwardFrom , it . replyTo ) . filterNotNull ()
} . toSet ()
val allUserReq = allMessages . map { it . author }
         val allFileReq = allMessages . flatMap { it . files ?: listOf () } . toSet ()


Se recopilan todas las solicitudes, ahora debe realizar el procesamiento por lotes real.

Para allUserReq y allFileReq, realizamos consultas externas y las agrupamos por id. Si no hay restricciones en el tamaño del paquete, se verá así:

userRepository . getUsersByIds ( allMessages . map { it . author } . toSet ())
. associateBy { it . id } ::get
fileRepository . getHeadsByIds ( allMessages . flatMap { it . files ?: listOf () } . toSet ())
. associateBy { it . id } ::get


Si hay una restricción, entonces el código tomará la siguiente forma:

val userApiChunkLimit = 100
allMessages . map { it . author } . asSequence () . distinct ()
. chunked ( userApiChunkLimit , userRepository ::getUsersByIds )
. flatten ()
. associateBy { it . id } ::get


Desafortunadamente, a diferencia de Stream, Sequence no puede cambiar fácilmente a una solicitud de paquete paralelo.

Si considera que una consulta paralela es válida y necesaria, puede hacer, por ejemplo, esto:

allMessages . map { it . author } . parallelStream () . distinct ()
. chunked ( userApiChunkLimit , userRepository ::getUsersByIds )
. flatten ()
. associateBy { it . id } ::get


Se puede ver que nada ha cambiado mucho. Usar una cierta cantidad de magia de Kotlin nos ayudó con esto:

fun < T , R > Stream < out T >. chunked ( size : Int , transform : ( List < T > ) -> R ) : Stream < out R > =
batches ( this , size ) . map ( transform )

fun < T > Stream < out Collection < T >>. flatten () : Stream < T > =
flatMap { it . stream () }

fun < T , K > Stream < T >. associateBy ( keySelector : ( T ) -> K ) : Map < K , T > =
collect ( Collectors . toMap ( keySelector , { it } ))


Ahora queda por poner todo junto:
override fun getLast ( n : Int ) =
messageRepository . findLast ( n )
. let { messages ->
       // , forward reply
val allMessages = messages . asSequence () . flatMap { message ->
         sequenceOf ( message , message . forwardFrom , message . replyTo )
. filterNotNull ()
} . toSet ()

messages . map ( ValueHolder < ( ChatMessage ) -> ChatMessageUI > () . apply {
         value = memoize { message : ChatMessage ->
           message . toFrontModel (
// ,
getUser = allMessages . map { it . author } . parallelStream () . distinct ()
. chunked ( userApiChunkLimit , userRepository ::getUsersByIds )
. flatten ()
. associateBy { it . id } ::get . orThrow { IllegalArgumentException ( "User $ it" ) } ,
//
getFile = fileRepository . getHeadsByIds ( allMessages . flatMap { it . files ?: listOf () } . toSet ())
. associateBy { it . id } ::get . orThrow { IllegalArgumentException ( "File $ it" ) } ,
//
serializeMessage = value
)
}
} . value )
}

Explicaciones y simplificaciones
Lo primero que seguramente llamará tu atención es la función de memorizar. El hecho es que la función serializeMessage seguramente se llamará varias veces para los mismos mensajes (debido a la respuesta y reenvío). No está claro por qué deberíamos hacer toFrontModel por separado para cada mensaje (en algunos casos esto puede ser necesario, pero no el nuestro). Por lo tanto, puede hacer una memorización para la función serializeMessage. Esto se implementa, por ejemplo, de la siguiente manera:

fun < A , R > memoize ( func : ( A ) -> R ) = func as? Memoize2 ?: Memoize2 ( func )
class Memoize2 < A , R > ( val func : ( A ) -> R ) : ( A ) -> R , java . util . function . Function < A , R > {
private val cache = hashMapOf < A , R > ()
override fun invoke ( p1 : A ) = cache . getOrPut ( p1 , { func ( p1 ) } )
override fun apply ( t : A ) : R = invoke ( t )
}


Luego, necesitamos construir una función memorizada serializeMessage, pero al mismo tiempo será utilizada dentro de ella. Es importante usar exactamente la misma instancia de la función en el interior, de lo contrario toda la memorización se irá por el desagüe. Para resolver esta colisión, utilizamos la clase ValueHolder, que simplemente almacena una referencia al valor (puede tomar algo estándar en su lugar, por ejemplo, AtomicReference). Para acortar el registro de recursividad, puede hacer esto:

inline fun < A , R > recursiveMemoize ( crossinline func : ( A , ( A ) -> R ) -> R ) : ( A ) -> R =
ValueHolder < ( A ) -> R > () . apply {
     value = memoize { a -> func ( a , value ) }
} . value


Si pudiste entender este silogismo de flecha la primera vez, felicidades, eres un programador funcional :-)

Ahora el código se verá así:

override fun getLast ( n : Int ) =
messageRepository . findLast ( n )
. let { messages ->
       // , forward reply
val allMessages = messages . asSequence () . flatMap { message ->
         sequenceOf ( message , message . forwardFrom , message . replyTo )
. filterNotNull ()
} . toSet ()

// ,
val getUser = allMessages . map { it . author } . parallelStream () . distinct ()
. chunked ( userApiChunkLimit , userRepository ::getUsersByIds )
. flatten ()
. associateBy { it . id } ::get . orThrow { IllegalArgumentException ( "User $ it " ) }

       //
val getFile = fileRepository . getHeadsByIds ( allMessages . flatMap { it . files ?: listOf () } . toSet ())
. associateBy { it . id } ::get . orThrow { IllegalArgumentException ( "File $ it" ) }

       messages . map ( recursiveMemoize { message , memoized : ( ChatMessage ) -> ChatMessageUI ->
         message . toFrontModel (
getUser = getUser ,
getFile = getFile ,
//
serializeMessage = memoized
         )
} )


También puede notar o Throw, que se define así:

/** [exception] , null */
fun < P , R > (( P ) -> R ? ) . orThrow ( exception : ( P ) -> Exception ) : ( P ) -> R =
{ p -> invoke ( p ) . let { it ?: throw exception ( p ) } }


Si no hay datos sobre nuestra identificación en servicios externos y esto se considera una situación legal, debe manejarlo de alguna manera diferente.

Después de esta solución, se espera que el tiempo de ejecución getLast sea de alrededor de 300 ms. Además, este tiempo crecerá ligeramente, incluso si las solicitudes ya no se ajustan a las restricciones sobre el tamaño del paquete (ya que los paquetes se solicitan en paralelo). Permítame recordarle que nuestro objetivo mínimo es de 500 ms, y 250 ms pueden considerarse trabajo normal.

Paralelismo

Pero necesitas seguir adelante. Las llamadas a userRepository y fileRepository son completamente independientes y se pueden paralelizar fácilmente, en teoría, se acercan a los 200 ms.

Por ejemplo, a través de nuestra función de unión:
override fun getLast ( n : Int ) =
messageRepository . findLast ( n )
. let { messages ->
       // , forward reply
val allMessages = messages . asSequence () . flatMap { message ->
         sequenceOf ( message , message . forwardFrom , message . replyTo )
. filterNotNull ()
} . toSet ()

join ( {
         // ,
allMessages . map { it . author } . parallelStream () . distinct ()
. chunked ( userApiChunkLimit , userRepository ::getUsersByIds )
. flatten ()
. associateBy { it . id }
} , {
         //
fileRepository . getHeadsByIds ( allMessages . flatMap { it . files ?: listOf () } . toSet ())
. associateBy { it . id }
} ) . let { ( users , files ) ->
         messages . map ( recursiveMemoize { message , memoized : ( ChatMessage ) -> ChatMessageUI ->
           message . toFrontModel (
getUser = users ::get . orThrow { IllegalArgumentException ( "User $ it" ) } ,
getFile = files ::get . orThrow { IllegalArgumentException ( "File $ it " ) } ,
//
serializeMessage = memoized
           )
} )
}
}

Como muestra la práctica, la ejecución tarda unos 200 ms, y es muy importante que el tiempo no crezca mucho con un aumento en el número de mensajes.

Los problemas

En general, el código, por supuesto, se ha vuelto menos legible que nuestra ingenua primera versión, pero es bueno que la serialización en sí (la implementación de toFrontModel) no haya cambiado mucho y siga siendo completamente legible. Toda la lógica del trabajo astuto con servicios externos vive en un solo lugar.

La desventaja de este enfoque es que nuestra abstracción continúa.

Si necesitamos hacer cambios en toFrontModel, casi seguramente tendremos que hacer cambios en la función getLast, lo que viola el Principio de sustitución de Liskov.

Por ejemplo, acordamos descifrar los archivos adjuntos solo en los mensajes principales, pero no en las respuestas y transferencias (responder / reenviar), o solo en las respuestas y transferencias del primer nivel. En este caso, después de realizar cambios en el código toFrontModel, deberá realizar las correcciones correspondientes en el código de recopilación de solicitudes para los archivos. Además, la corrección no será trivial:

fileRepository . getHeadsByIds (
allMessages . flatMap { it . files ?: listOf () } . toSet ()
)


Y aquí nos estamos acercando sin problemas a otro problema que está estrechamente relacionado con el anterior: el funcionamiento correcto del código en su conjunto depende de la alfabetización de la ingeniería inversa. En algunos casos complejos, el código puede no funcionar correctamente precisamente debido a una recopilación de consultas incorrecta. No hay garantía de que pueda llegar rápidamente a pruebas unitarias que cubran todas esas situaciones difíciles.

Conclusiones

Pros:

  1. Una forma obvia de pre-recibir solicitudes, que se separa fácilmente del código principal.
  2. La ausencia casi completa de sobrecarga de memoria y tiempo asociada con el uso de solo los datos que se habrían recibido de todos modos.
  3. Buena escala y la capacidad de construir un servicio, que en teoría será responsable de un tiempo predecible, independientemente del tamaño de la solicitud desde el exterior.

Contras:

  1. Código bastante complejo para el procesamiento por lotes en sí.
  2. Trabajo grande y responsable en el análisis de solicitudes en la implementación existente.
  3. La abstracción fluida y, como consecuencia, la fragilidad de todo el esquema en relación con los cambios en la implementación.
  4. Dificultades en el soporte: los errores en el bloque de predicción de consulta son difíciles de distinguir de los errores en el código principal. Idealmente, debe usar el doble de pruebas unitarias, por lo que los procedimientos con errores en la producción serán el doble de difíciles.
  5. Cumplir con los principios SÓLIDOS al escribir código: el código debe estar preparado para alienar la lógica del procesamiento por lotes. La introducción de estos principios por sí sola proporcionará algunas ventajas, por lo que este menos es el más insignificante.

Es importante tener en cuenta que puede utilizar este método sin realizar ingeniería inversa como tal. Necesitamos obtener un contrato getLast, del cual depende el contrato para el cálculo preliminar de las solicitudes (en adelante, captación previa). En este caso, lo hicimos al observar la implementación de getLast (ingeniería inversa). Sin embargo, con este enfoque, surgen dificultades: la edición de estas dos piezas de código siempre debe ser sincrónica, y es imposible garantizar esto (recuerde hashCode y equals, es exactamente lo mismo). El siguiente enfoque, que me gustaría mostrar, está diseñado para resolver este problema (o al menos mitigarlo).

Heurística empresarial


Resolviendo un problema de contrato

¿Qué sucede si no operamos con un contrato exacto y, por lo tanto, con un conjunto exacto de solicitudes, sino con un aproximado? Además, crearemos un conjunto aproximado para que incluya estrictamente el conjunto exacto y se base en las características del área temática.

Por lo tanto, en lugar de la dependencia del contrato de captación previa en getLast, establecemos la dependencia de ambos en algún contrato común que será dictado por el usuario. La principal dificultad será encarnar de alguna manera este contrato general en forma de código.

Buscar limitaciones útiles

Intentemos hacer esto con nuestro ejemplo.
En nuestro caso, existen las siguientes características comerciales:

  • la lista de participantes en el chat está predefinida;
  • los chats están completamente aislados unos de otros;
  • El anidamiento de las cadenas de respuesta / reenvío es pequeño (~ 2–3 mensajes).

De la primera restricción se deduce que no es necesario correr alrededor de los mensajes, ver qué usuarios están allí, elegir los únicos y hacer una solicitud para ellos. Simplemente puede consultar una lista predefinida. Si estuviste de acuerdo con esta declaración, entonces te atrapé.

De hecho, no todo es tan simple. Una lista puede estar predefinida, pero puede haber miles de usuarios. Tales cosas necesitan ser aclaradas. En nuestro caso, generalmente habrá dos o tres participantes en el chat, rara vez más. Por lo tanto, es perfectamente aceptable recibir datos sobre todos ellos.

Además, si la lista de usuarios de chat está predeterminada, pero esta información no está en el servicio del usuario (lo cual es muy probable), entonces no tendrá sentido a partir de dicha información. Haremos una solicitud adicional para la lista de usuarios de chat, y luego aún tendrá que hacer la (s) solicitud (es) al servicio de usuario.

Suponga que la información sobre la conexión de los usuarios y el chat se almacena en el servicio del usuario. En nuestro caso, esto es así, ya que la conexión está determinada por los derechos del usuario. Luego, para los usuarios, se obtendrá un código de captación previa de este tipo:

Puede parecer sorprendente aquí que no estamos pasando ningún identificador de chat. Hice esto intencionalmente para no saturar el código de muestra.

A primera vista, nada se sigue de la segunda restricción. En cualquier caso, todavía no podía obtener nada útil de él.

Ya hemos usado la tercera restricción anteriormente. Puede tener un impacto significativo en cómo almacenamos y recibimos conversaciones. No comenzaremos a desarrollar este tema, ya que no tiene nada que ver con el controlador REST y el procesamiento por lotes.

¿Qué hacer con los archivos? Me gustaría obtener una lista de todos los archivos de chat en una simple solicitud. Según los términos de la API, solo necesitamos encabezados de archivo, sin cuerpos, por lo que no parece una tarea peligrosa y que requiera muchos recursos para la persona que llama.

Por otro lado, debemos recordar que no recibimos todos los mensajes de chat, sino solo la última N, y puede resultar fácilmente que no contengan ningún archivo.

No puede haber una respuesta universal: todo depende de los detalles del negocio y los casos de uso. Al crear una solución de producto, puede meterse en problemas si establece una heurística para un caso de uso, y luego los usuarios trabajarán con la funcionalidad de una manera diferente. Para demostraciones y preventas, esta es una buena opción, pero en este momento estamos tratando de escribir un servicio de producción honesto.

Entonces, por desgracia, será posible realizar heurísticas de negocios para los archivos aquí solo en función de los resultados de la operación y la recopilación de estadísticas (o después de una evaluación experta).

Como todavía queremos aplicar de alguna manera nuestro método, supongamos que las estadísticas muestran lo siguiente:

  1. Una conversación típica comienza con un mensaje que incluye uno o más archivos, seguido de mensajes de respuesta sin archivos.
  2. Casi todos los mensajes vienen en conversaciones típicas.
  3. El número esperado de archivos únicos dentro de un solo chat es de ~ 20.

De esto se deduce que para mostrar casi todos los mensajes necesitará obtener los encabezados de algunos archivos (porque ChatMessageUI está tan organizado) y que el número total de archivos es pequeño. En este caso, parece razonable recibir todos los archivos de chat en una sola solicitud. Para hacer esto, tendremos que agregar lo siguiente a nuestra API para archivos:

fun getHeadsByChat () : List < FileHeadRemote >

El método getHeadsByChat no parece exagerado y está hecho exclusivamente por nuestro deseo de optimizar el rendimiento (aunque esta también es una buena razón). Muy a menudo, en las salas de chat con archivos, los usuarios desean ver todos los archivos utilizados y en el orden en que se agregaron (por lo tanto, usamos List).

La implementación de una conexión tan explícita requerirá el almacenamiento de información adicional en un servicio de archivos o en nuestra aplicación. Todo depende de en qué área de responsabilidad, en nuestra opinión, esta información redundante sobre la conexión del archivo con el chat debe almacenarse. Es redundante porque el mensaje ya está asociado con el archivo y, a su vez, está asociado con el chat. No puede utilizar la desnormalización, pero extraiga esta información sobre la marcha de los mensajes, es decir, dentro de SQL, reciba de inmediato todos los archivos durante el chat (esto está en nuestra aplicación) y solicítelos todos de una vez desde el servicio de archivos. Esta opción funcionará peor si hay muchos mensajes de chat, pero no necesitamos desnormalización. Ocultaría ambas opciones detrás de getHeadsByChat.

El código es el siguiente:

override fun getLast ( n : Int ) =
messageRepository . findLast ( n )
. let { messages ->
       join (
{ userRepository . getUsersByChat () . associateBy { it . id } } ,
{ fileRepository . getHeadsByChat () . associateBy { it . id } }
       )
. let { //
}
}


Se puede ver que, en comparación con la versión anterior, muy poco ha cambiado y solo la parte de captación previa se ha visto afectada, lo cual es genial.

El código de captación previa se ha vuelto mucho más corto y claro.

El tiempo de ejecución no ha cambiado, lo cual es lógico, ya que el número de solicitudes sigue siendo el mismo. Existen casos teóricamente posibles en los que el escalado será mejor que la ingeniería inversa honesta (solo debido a la eliminación del enlace del cálculo complejo). Sin embargo, las situaciones opuestas son igualmente probables: las heurísticas reman demasiado. Como muestra la práctica, si logra llegar a una heurística adecuada, entonces no debería haber ningún cambio especial en el tiempo de ejecución.

Sin embargo, esto no es todo. No tomamos en cuenta que ahora recibir datos detallados sobre usuarios y archivos no está relacionado con la recepción de mensajes y las solicitudes se pueden lanzar en paralelo:

Esta opción proporciona unos 100 ms estables por solicitud.

Errores heurísticos

¿Qué sucede si, al usar la heurística, el conjunto de consultas no es más grande, sino un poco más pequeño de lo que debería ser? Para la mayoría de las opciones, tales heurísticas funcionarán, pero habrá excepciones para las cuales deberá realizar una solicitud por separado. En mi práctica, tales decisiones no tuvieron éxito, ya que cada excepción tuvo un gran impacto en el rendimiento y, al final, algunos usuarios hicieron una solicitud que consistió completamente en excepciones. Diría que en tales situaciones es mejor usar ingeniería inversa, incluso si el algoritmo de recopilación de consultas es espeluznante e ilegible, pero, por supuesto, todo depende de la importancia del servicio.

Conclusiones

Pros:

  1. La lógica de la heurística empresarial es fácil de leer y generalmente trivial. Esto es bueno para comprender los límites de aplicabilidad, verificar y modificar el contrato de captación previa.
  2. La escalabilidad es tan buena como la ingeniería inversa.
  3. La coherencia del código según los datos se reduce, lo que puede conducir a una mejor paralelización del código.
  4. La lógica de captación previa, como la lógica principal del controlador REST, se basa en los requisitos. Esta es una ventaja débil si los requisitos cambian con frecuencia.

Contras:

  1. A partir de los requisitos, no es tan fácil derivar la heurística para las predicciones de consultas. Puede ser necesario aclarar los requisitos, en la medida en que sea poco compatible con ágil.
  2. Puedes obtener datos adicionales.
  3. Para garantizar que el contrato de captación previa funcione de manera efectiva, probablemente se requerirá la desnormalización del almacenamiento de datos. Este es un punto débil, ya que estas optimizaciones se derivan de la lógica empresarial y, por lo tanto, lo más probable es que sean reclamadas por diferentes procesos.

A partir de nuestro ejemplo, podemos concluir que aplicar este enfoque es muy difícil y que el juego no vale la pena. De hecho, en proyectos comerciales reales, el número de restricciones es enorme y, a partir de este montón, a menudo es posible obtener algo útil, que le permite particionar datos o predecir estadísticas. La principal ventaja de este enfoque es que las restricciones utilizadas son interpretadas por la empresa, por lo tanto, se entienden y validan fácilmente.

Por lo general, el mayor problema cuando se intenta utilizar este enfoque es la separación de actividades. El desarrollador debe estar bien inmerso en la lógica empresarial y hacer preguntas a los analistas que aclaren las preguntas, lo que requiere un cierto nivel de iniciativa.

Unidades de estilo DDD


En proyectos grandes, a menudo puede ver el uso de prácticas DDD, ya que le permiten estructurar eficientemente su código. No es necesario utilizar todas las plantillas DDD en el proyecto; a veces puede obtener buenos resultados incluso con la introducción de una. Considere el concepto de DDD como un agregado. Un agregado es una unión de entidades conectadas lógicamente, cuyo trabajo se lleva a cabo solo a través de la raíz del agregado (por lo general, esta es una entidad que es la parte superior del gráfico de conectividad de la entidad).

Desde el punto de vista de la obtención de datos, lo principal en el agregado es que toda la lógica de trabajar con listas de entidades está en un solo lugar: el agregado. Hay dos enfoques sobre lo que debe transferirse a la unidad durante su construcción:

  1. Transferimos funciones a la unidad para obtener datos externos. La lógica para determinar los datos necesarios vive dentro de la unidad.
  2. Transferimos todos los datos necesarios. La lógica para determinar los datos necesarios vive fuera del agregado.

La elección del enfoque depende en gran medida de la facilidad con que la captación previa se puede mover fuera del agregado. Si la lógica de captación previa se basa en la heurística empresarial, generalmente es fácil separarla del agregado. Llevar la lógica más allá del alcance de un agregado basado en un análisis de su uso (ingeniería inversa) puede ser peligroso, ya que podemos distribuir código lógicamente relacionado en diferentes clases.

La lógica de ampliar las consultas dentro de un agregado

Intentemos esbozar un agregado que correspondería al concepto de "chat". Nuestras clases ChatMessage, UserReference, FileReference corresponden al modelo de almacenamiento, por lo que podrían cambiarse de nombre con algún prefijo apropiado, pero tenemos un pequeño proyecto, así que vamos a dejarlo como está. Llamamos al ensamblado Chat, y sus componentes son ChatPage y ChatPageMessage: Hasta ahora, se obtiene una gran cantidad de duplicaciones sin sentido. Esto se debe al hecho de que nuestro modelo de tema es similar al modelo de almacenamiento y ambos son similares al modelo frontend. Utilizo las clases FileHeadRemote y UserRemote directamente, para no escribir código innecesario, aunque generalmente en el dominio debes evitar usar tales clases directamente.

interface Chat {
fun getLastPage ( n : Int ) : ChatPage
}

interface ChatPage {
val messages : List < ChatPageMessage >
}

data class ChatPageMessage (
val id : Long ,
val author : UserRemote ,
val message : String ,
val files : List < FileHeadRemote >,
val replyTo : ChatPageMessage ? ,
val forwardFrom : ChatPageMessage ?
)






Si usa dicho agregado, nuestro controlador REST se puede reescribir de la siguiente manera: esta opción recuerda en gran medida nuestra primera implementación ingenua, pero tiene una ventaja importante: el controlador ya no se dedica a recibir datos directamente y no depende de las clases asociadas con el almacenamiento de datos, sino que depende solo desde la unidad, que se configura a través de las interfaces. Por lo tanto, la lógica de captación previa ya no está en el controlador. El controlador solo se ocupa de la conversión de la unidad en un modelo front-end, lo que nos da cumplimiento con el Principio de responsabilidad única (SRP). Desafortunadamente, para todos los métodos descritos en el agregado, tendrá que escribir una implementación.

class ChatRestController (
private val chat : Chat
) : ChatRestApi {
override fun getLast ( n : Int ) =
chat . getLastPage ( n ) . toFrontModel ()

private fun ChatPage . toFrontModel () =
messages . map { it . toFrontModel () }

   private fun ChatPageMessage . toFrontModel () : ChatMessageUI =
ChatMessageUI (
id = id ,
author = author . toFrontReference () ,
message = message ,
files = files . toFrontReference () ,
forwardFrom = forwardFrom ?. toFrontModel () ,
replyTo = replyTo ?. toFrontModel ()
)
}






Intentemos simplemente guardar la lógica del controlador implementada cuando se utiliza la heurística empresarial.
class ChatImpl (
private val messageRepository : ChatMessageRepository ,
private val userRepository : UserRemoteApi ,
private val fileRepository : FileRemoteApi
) : Chat {
override fun getLastPage ( n : Int ) = object : ChatPage {
override val messages : List < ChatPageMessage >
get () =
runBlocking ( IO ) {
           val prefetch = async (
{ userRepository . getUsersByChat () . associateBy { it . id } } ,
{ fileRepository . getHeadsByChat () . associateBy { it . id } }
           )

withContext ( IO ) { messageRepository . findLast ( n ) }
             . map (
prefetch . await () . let { ( users , files ) ->
                 recursiveMemoize { message , memoized : ( ChatMessage ) -> ChatPageMessage ->
                   message . toDomainModel (
getUser = users ::get . orThrow { IllegalArgumentException ( "User $ it " ) } ,
getFile = files ::get . orThrow { IllegalArgumentException ( "File $ it " ) } ,
//
serializeMessage = memoized
                   )
}
}
             )
}
   }
}

private fun ChatMessage . toDomainModel (
getUser : ( UserReference ) -> UserRemote ,
getFile : ( FileReference ) -> FileHeadRemote ,
serializeMessage : ( ChatMessage ) -> ChatPageMessage
) = ChatPageMessage (
id = id ?: throw IllegalStateException ( " $ this must be persisted" ) ,
author = getUser ( author ) ,
message = message ,
files = files ?. map ( getFile ) ?: listOf () ,
forwardFrom = forwardFrom ?. let ( serializeMessage ) ,
replyTo = replyTo ?. let ( serializeMessage )
)

Resultó que la función getLastPage en sí misma tiene una estrategia de adquisición de datos, incluida la captación previa, y la función toDomainModel es puramente técnica y es responsable de convertir los modelos almacenados en un modelo de dominio.

Reescribí las llamadas paralelas a userRepository, fileRepository y messageRepository en una forma más familiar para Kotlin. Espero que la comprensión del código no haya sufrido por esto.

En general, dicho método ya está completamente operativo, el rendimiento al aplicarlo será el mismo que con el uso simple de ingeniería inversa o heurística empresarial.

La lógica de ampliar las consultas fuera del agregado

En el proceso de creación del agregado, inmediatamente encontraremos un problema: para construir ChatPage, el tamaño de página deberá establecerse como una constante al crear Chat, y no pasarlo a getLast (), como de costumbre. Tendremos que cambiar la interfaz agregada en sí: dado que tenemos una hija de los mensajes restantes y queremos firmemente que todos los datos queden fuera del agregado, tendremos que abandonar por completo el agregado de nivel de Chat y hacer de ChatPage la raíz: a continuación, debemos crear un código de captación previa separado del agregado: ahora para eso Para crear un agregado, debe acoplarlo con captación previa. En DDD, este tipo de orquestación es realizada por Application Services.

interface Chat {
fun getPage () : ChatPage
}




class ChatPageImpl (
private val messageData : List < ChatMessage >,
private val userData : List < UserRemote >,
private val fileData : List < FileHeadRemote >
) : ChatPage {
override val messages : List < ChatPageMessage >
get () =
messageData . map (
( userData . associateBy { it . id } to fileData . associateBy { it . id } )
. let { ( users , files ) ->
             recursiveMemoize { message , self : ( ChatMessage ) -> ChatPageMessage ->
               message . toDomainModel (
getUser = users ::get . orThrow () ,
getFile = files ::get . orThrow () ,
//
serializeMessage = self
               )
}
}
       )
}




fun chatPagePrefetch (
pageSize : Int ,
messageRepository : ChatMessageRepository ,
userRepository : UserRemoteApi ,
fileRepository : FileRemoteApi
) =
runBlocking ( IO ) {
     async (
{ userRepository . getUsersByChat () } ,
{ fileRepository . getHeadsByChat () } ,
{ messageRepository . findLast ( pageSize ) }
     )
}




class ChatService (
private val messageRepository : ChatMessageRepository ,
private val userRepository : UserRemoteApi ,
private val fileRepository : FileRemoteApi
) {
private fun chatPagePrefetch ( pageSize : Int ) =
runBlocking ( IO ) {
       async (
{ messageRepository . findLast ( pageSize ) } ,
{ userRepository . getUsersByChat () } ,
{ fileRepository . getHeadsByChat () }
       ) . await ()
}

   fun getLastPage ( n : Int ) : ChatPage =
chatPagePrefetch ( n )
. let { ( messageData , userData , fileData ) ->
         ChatPageImpl ( messageData , userData , fileData )
}
}


Bueno, el controlador no cambiará mucho, solo necesita usar ChatService :: getLastPage en lugar de Chat :: getLastPage. Es decir, el código cambiará así:

class ChatRestController (
private val chat : ChatService
) : ChatRestApi


Conclusiones

  1. La lógica de captación previa se puede colocar dentro de la unidad o en un lugar separado.
  2. Si la lógica de captación previa está fuertemente relacionada con la lógica interna del agregado, es mejor no eliminarla, ya que esto puede interrumpir la encapsulación. Personalmente, no veo mucho sentido sacar la captación previa del agregado, ya que esto limita en gran medida las posibilidades y aumenta la coherencia implícita del código.
  3. La organización agregada en sí misma tiene un efecto positivo en el rendimiento del procesamiento por lotes, ya que el control sobre las solicitudes pesadas se vuelve más y el lugar para la lógica de captación previa se vuelve bastante definido.

En el próximo capítulo, consideraremos una implementación de captación previa que no puede implementarse aisladamente de la función principal.

Proxy y doble llamada


Resolviendo el problema del contrato

Como ya discutimos en las partes anteriores, el principal problema del contrato de captación previa es que está fuertemente conectado con el contrato de la función para la cual debe preparar los datos. Para ser más precisos, depende de qué datos pueda necesitar la función principal. ¿Qué pasa si no tratamos de predecir, pero intentamos hacer ingeniería inversa usando el código mismo? En situaciones simples, el enfoque proxy comúnmente utilizado en las pruebas puede ayudarnos. Las bibliotecas como Mockito generan clases con implementaciones de interfaz, que también pueden acumular información sobre las llamadas. Un enfoque similar se utiliza en nuestra biblioteca .

Si llama a la función principal con repositorios proxy y recopila información sobre los datos necesarios, puede obtener estos datos en forma de paquete y volver a llamar a la función principal para obtener el resultado final.

La condición principal es la siguiente: los datos solicitados no deberían afectar las solicitudes posteriores. El proxy no devolverá datos reales, sino solo algunos apéndices, por lo que desaparecerán todas las ramificaciones y la recepción de los datos asociados.

En nuestro caso, esto significa que es inútil usar proxy messageRepository, ya que se realizan más solicitudes en función de los resultados de la solicitud del mensaje. Esto no es un problema, ya que solo tenemos una solicitud para messageRepository, por lo que no se requiere procesamiento por lotes aquí.

Dado que representaremos las funciones simples UserReference-> UserRemote y FileReference-> FileHeadRemote, solo necesita acumular dos listas de argumentos.

Como resultado, obtenemos lo siguiente:
class ChatRestController (
private val messageRepository : ChatMessageRepository ,
private val userRepository : UserRemoteApi ,
private val fileRepository : FileRemoteApi
) : ChatRestApi {
override fun getLast ( n : Int ) : List < ChatMessageUI > {
val messages = messageRepository . findLast ( n )

//
fun transform (
getUser : ( UserReference ) -> UserRemote ,
getFile : ( FileReference ) -> FileHeadRemote
     ) : List < ChatMessageUI > =
messages . map (
recursiveMemoize { message , self ->
           message . toFrontModel ( getUser , getFile , self )
}
       )

//
val userIds = mutableSetOf < UserReference > ()
val fileIds = mutableSetOf < FileReference > ()
transform (
{ userIds += it ; UserRemote ( 0L , "" ) } ,
{ fileIds += it ; FileHeadRemote ( 0L , "" ) }
     )

return runBlocking ( IO ) {
       //
async (
{ userRepository . getUsersByIds ( userIds ) . associateBy { it . id } ::get . orThrow () } ,
{ fileRepository . getHeadsByIds ( fileIds ) . associateBy { it . id } ::get . orThrow () }
       ) . await () . let { ( getUser , getFile ) ->
         transform ( getUser , getFile )
}
}
   }
}

Si mide el rendimiento, resulta que con este enfoque no es peor que usar métodos de ingeniería inversa, aunque llamamos a la función dos veces. Esto se debe al hecho de que, en comparación con el tiempo de ejecución de consultas externas, el tiempo de ejecución de la función de conversión puede descuidarse (en nuestro caso).

En comparación con el rendimiento cuando se utiliza la heurística empresarial, en nuestro caso, la acumulación de consultas será menos efectiva. Pero tenga en cuenta que no siempre es posible encontrar tan buenas heurísticas. Por ejemplo, si el número de usuarios en el chat es grande, al igual que el número de archivos, y los archivos rara vez se adjuntan a los mensajes, nuestro algoritmo sobre heurística empresarial comenzará a perder de inmediato al recibir honestamente una lista de solicitudes.

Conclusiones

Pros:

  1. prefetch - .
  2. prefetch.
  3. , -.

Contras:

  1. .
  2. , .

A pesar del aparente exotismo, la acumulación de solicitudes a través de proxy y recuperación es bastante aplicable en situaciones en las que la lógica de la función principal no está vinculada a los datos recibidos. La principal dificultad aquí es la misma que con la ingeniería inversa: estamos aplicando la implementación actual de la función, aunque en un grado mucho menor (solo en el hecho de que las siguientes consultas no dependen de los resultados de consultas anteriores).

El rendimiento disminuirá ligeramente, pero en el código de captación previa no tendrá que tener en cuenta todos los matices de la implementación de la función principal.

Puede usar este enfoque cuando no pueda construir una buena heurística comercial para la predicción de consultas y desee reducir la conectividad de captación previa y función.

Conclusión


Usar el procesamiento por lotes no es tan simple como parece a primera vista. Creo que todos los patrones de diseño tienen esta propiedad (recuerde el almacenamiento en caché).

Para un procesamiento por lotes eficiente de las solicitudes, es importante que la persona que llama recopile tantas solicitudes como sea posible, lo que a menudo se ve obstaculizado por la estructura de la aplicación. Hay dos formas de salir: diseñar la aplicación con el fin de trabajar eficientemente con datos (puede muy bien conducir a un dispositivo reactivo de la aplicación) o, como sucede a menudo, tratar de implementar el procesamiento por lotes en una aplicación existente sin una reestructuración significativa.

La forma más obvia de acumular consultas es realizar ingeniería inversa del código existente en busca de consultas pesadas. El principal inconveniente de este enfoque será un aumento en la conectividad de código implícito. Una alternativa es utilizar la información sobre las características comerciales para dividir los datos en fragmentos, que a menudo se comparten y en su conjunto. A veces, para una separación tan efectiva, será necesario desnormalizar el almacenamiento, pero si esto tiene éxito, la lógica del procesamiento por lotes estará determinada por el área temática, lo cual es bueno.

Una forma menos obvia de obtener todas las solicitudes es implementar dos pases. En la primera etapa, recopilamos todas las solicitudes necesarias, en la segunda trabajamos con los datos ya recibidos. La aplicabilidad de este enfoque está limitada por el requisito de independencia de las solicitudes entre sí.

Source: https://habr.com/ru/post/467919/


All Articles