Problemas de procesamiento por lotes de solicitudes y sus soluciones (parte 1)

Casi todos los productos de software modernos consisten en varios servicios. A menudo, los largos tiempos de respuesta del canal entre servicios se convierten en una fuente de problemas de rendimiento. La solución estándar para este tipo de problema es empacar varias solicitudes interservicios en un solo paquete, que se llama procesamiento por lotes.

Si utiliza el procesamiento por lotes, es posible que no esté satisfecho con su resultado en términos de rendimiento o comprensión del código. Este método no es tan fácil para la persona que llama como podría pensar. Para diferentes propósitos y en diferentes situaciones, las decisiones pueden variar mucho. En ejemplos específicos, mostraré los pros y los contras de varios enfoques.

Proyecto de demostración


Para mayor claridad, considere un ejemplo de uno de los servicios en la aplicación en la que estoy trabajando actualmente.

Una explicación de la elección de plataforma para ejemplos.
El problema del bajo rendimiento es bastante general y no se aplica a ningún idioma y plataforma específicos. Este artículo utilizará ejemplos de código Spring + Kotlin para demostrar tareas y soluciones. Kotlin es igualmente comprensible (o incomprensible) para los desarrolladores de Java y C #, además, el código es más compacto y comprensible que en Java. Para facilitar la comprensión de los desarrolladores de Java puro, evitaré la magia negra de Kotlin y usaré solo el blanco (en el espíritu de Lombok). Habrá algunos métodos de extensión, pero en realidad son familiares para todos los programadores de Java como métodos estáticos, por lo que será un poco de azúcar que no estropeará el sabor del plato.

Hay un servicio de aprobación de documentos. Alguien crea un documento y lo envía para su discusión, durante el cual se realizan ediciones y, en última instancia, el documento es coherente. El servicio de reconciliación en sí mismo no sabe nada sobre documentos: es solo un chat de coordinadores con pequeñas funciones adicionales, que no consideraremos aquí.

Por lo tanto, hay salas de chat (correspondientes a documentos) con un conjunto predefinido de participantes en cada uno de ellos. Al igual que en los chats regulares, los mensajes contienen texto y archivos y pueden ser respuestas y reenvíos:

data class ChatMessage (
   // nullable persist
   val id : Long ? = null ,
   /** */
   val author : UserReference ,
   /** */
   val message : String ,
   /** */
   // - JPA+ null,
   val files : List < FileReference > ? = null ,
   /** , */
   val replyTo : ChatMessage ? = null ,
   /** , */
   val forwardFrom : ChatMessage ? = null
)


Los enlaces al archivo y al usuario son enlaces a otros dominios. Vive con nosotros así:

typealias FileReference = Long
typealias UserReference = Long

Los datos del usuario se almacenan en Keycloak y se recuperan a través de REST. Lo mismo ocurre con los archivos: los archivos y la metainformación sobre ellos viven en un servicio de almacenamiento de archivos separado.

Todas las llamadas a estos servicios son solicitudes pesadas . Esto significa que la sobrecarga para transportar estas solicitudes es mucho mayor que el tiempo que tardan en ser procesadas por un servicio de terceros. En nuestros bancos de pruebas, el tiempo de llamada típico para dichos servicios es de 100 ms, por lo que en el futuro utilizaremos estos números.

Necesitamos hacer un controlador REST simple para recibir los últimos N mensajes con toda la información necesaria. Es decir, creemos que en el frontend, el modelo de mensaje es casi el mismo y necesitamos enviar todos los datos. La diferencia entre el modelo frontend es que el archivo y el usuario deben presentarse en una forma ligeramente descifrada para que sean enlaces:

/** */
data class ReferenceUI (
   /** url */
   val ref : String ,
   /** */
   val name : String
)
data class ChatMessageUI (
   val id : Long ,
   /** */
   val author : ReferenceUI ,
   /** */
   val message : String ,
   /** */
   val files : List < ReferenceUI >,
   /** , */
   val replyTo : ChatMessageUI ? = null ,
   /** , */
   val forwardFrom : ChatMessageUI ? = null
)


Necesitamos implementar lo siguiente:

interface ChatRestApi {
   fun getLast ( n : Int ) : List < ChatMessageUI >
}


Postfix UI significa modelos DTO para la interfaz, es decir, lo que debemos dar a través de REST.

Puede parecer sorprendente aquí que no pasemos ningún identificador de chat e incluso en el modelo ChatMessage / ChatMessageUI no lo es. Hice esto a propósito, para no saturar el código de los ejemplos (los chats están aislados, por lo que podemos suponer que tenemos uno).

Retiro filosófico
Tanto la clase ChatMessageUI como el método ChatRestApi.getLast usan el tipo de datos Lista, mientras que en realidad es un Conjunto ordenado. En el JDK, todo esto es malo, por lo que la declaración del orden de los elementos en el nivel de la interfaz (mantener el orden al agregar y extraer) fallará. Por lo tanto, es una práctica común usar List en los casos en que necesita un Set ordenado (todavía hay un LinkedHashSet, pero esto no es una interfaz).

Una limitación importante: suponemos que no hay largas cadenas de respuestas o reenvíos. Es decir, lo son, pero su longitud no supera los tres mensajes. La cadena de mensajes front-end se debe transmitir en su totalidad.

Para recibir datos de servicios externos, existen tales API:

interface ChatMessageRepository {
   fun findLast ( n : Int ) : List < ChatMessage >
}
data class FileHeadRemote (
   val id : FileReference ,
   val name : String
)
interface FileRemoteApi {
   fun getHeadById ( id : FileReference ) : FileHeadRemote
   fun getHeadsByIds ( id : Set < FileReference > ) : Set < FileHeadRemote >
   fun getHeadsByIds ( id : List < FileReference > ) : List < FileHeadRemote >
   fun getHeadsByChat () : List < FileHeadRemote >
}
data class UserRemote (
   val id : UserReference ,
   val name : String
)
interface UserRemoteApi {
   fun getUserById ( id : UserReference ) : UserRemote
   fun getUsersByIds ( id : Set < UserReference > ) : Set < UserRemote >
   fun getUsersByIds ( id : List < UserReference > ) : List < UserRemote >
}


Se puede ver que el procesamiento por lotes se proporcionó inicialmente en servicios externos, y en ambos casos: a través de Set (sin preservar el orden de los elementos, con claves únicas) y a través de List (puede haber duplicados: el orden se conserva).

Implementaciones simples


Implementación ingenua


La primera implementación ingenua de nuestro controlador REST se verá así en la mayoría de los casos:

class ChatRestController (
   private val messageRepository : ChatMessageRepository ,
   private val userRepository : UserRemoteApi ,
   private val fileRepository : FileRemoteApi
) : ChatRestApi {
   override fun getLast ( n : Int ) =
     messageRepository . findLast ( n )
       . map { it . toFrontModel () }

   private fun ChatMessage . toFrontModel () : ChatMessageUI =
     ChatMessageUI (
       id = id ?: throw IllegalStateException ( " $ this must be persisted" ) ,
       author = userRepository . getUserById ( author ) . toFrontReference () ,
       message = message ,
       files = files ?. let { files ->
         fileRepository . getHeadsByIds ( files )
           . map { it . toFrontReference () }
} ?: listOf () ,
       forwardFrom = forwardFrom ?. toFrontModel () ,
       replyTo = replyTo ?. toFrontModel ()
)
}

Todo está muy claro, y esta es una gran ventaja.

Utilizamos el procesamiento por lotes y recibimos datos de un servicio externo en lotes. Pero, ¿qué está pasando con el rendimiento?

Para cada mensaje, se realizará una llamada a UserRemoteApi para obtener datos en el campo de autor y una llamada a FileRemoteApi para recibir todos los archivos adjuntos. Parece ser todo. Suponga que los campos forwardFrom y replyTo para ChatMessage se obtienen para que esto no requiera llamadas adicionales. Pero convertirlos en ChatMessageUI conducirá a la recursividad, es decir, el rendimiento del recuento de llamadas puede aumentar considerablemente. Como notamos anteriormente, digamos que no tenemos mucho anidamiento y la cadena está limitada a tres mensajes.

Como resultado, recibimos de dos a seis llamadas a servicios externos por mensaje y una llamada JPA a todo el paquete de mensajes. El número total de llamadas variará de 2 * N + 1 a 6 * N + 1. ¿Cuánto cuesta esto en unidades reales? Suponga que necesita 20 publicaciones para representar una página. Para obtenerlos, necesita de 4 sa 10 s. Horrible Me gustaría conocer los 500 ms. Y dado que el front-end soñaba con hacer un desplazamiento continuo, los requisitos de rendimiento de este punto final pueden duplicarse.

Pros:

  1. El código es conciso y autodocumentado (el sueño del soporte).
  2. El código es simple, por lo que casi no hay oportunidades para disparar en la pierna.
  3. El procesamiento por lotes no parece extraño y se ajusta orgánicamente a la lógica.
  4. Los cambios lógicos se realizarán fácilmente y serán locales.

Menos:

Terrible rendimiento debido al hecho de que los paquetes son muy pequeños.

Este enfoque a menudo se puede ver en servicios simples o en prototipos. Si la velocidad de cambio es importante, apenas vale la pena complicar el sistema. Al mismo tiempo, para nuestro servicio muy simple, el rendimiento es terrible, por lo que el alcance de la aplicabilidad de este enfoque es muy limitado.

Ingenuo procesamiento paralelo


Puede comenzar a procesar todos los mensajes en paralelo; esto eliminará un aumento lineal en el tiempo dependiendo de la cantidad de mensajes. Esta no es una forma particularmente buena, ya que dará lugar a una gran carga máxima en el servicio externo.

La implementación del procesamiento paralelo es muy simple:

override fun getLast ( n : Int ) =
   messageRepository . findLast ( n ) . parallelStream ()
     . map { it . toFrontModel () }
     . collect ( toList ())


Al usar el procesamiento de mensajes paralelos, lo ideal es obtener 300–700 ms, lo cual es mucho mejor que con una implementación ingenua, pero aún no lo suficientemente rápido.

Con este enfoque, las solicitudes a userRepository y fileRepository se ejecutarán sincrónicamente, lo que no es muy eficiente. Para solucionar esto, tendrá que cambiar bastante la lógica de las llamadas. Por ejemplo, a través de CompletionStage (también conocido como CompletableFuture):

private fun ChatMessage . toFrontModel () : ChatMessageUI =
   CompletableFuture . supplyAsync {
     userRepository . getUserById ( author ) . toFrontReference ()
   } . thenCombine (
     files ?. let {
       CompletableFuture . supplyAsync {
         fileRepository . getHeadsByIds ( files ) . map { it . toFrontReference () }
}
} ?: CompletableFuture . completedFuture ( listOf ())
) { author , files ->
     ChatMessageUI (
       id = id ?: throw IllegalStateException ( " $ this must be persisted" ) ,
       author = author ,
       message = message ,
       files = files ,
       forwardFrom = forwardFrom ?. toFrontModel () ,
       replyTo = replyTo ?. toFrontModel ()
)
   } . get () !!


Se puede ver que el código de mapeo inicialmente simple se ha vuelto menos claro. Esto se debe a que tuvimos que separar las llamadas de servicio externas de donde se utilizaron los resultados. Esto en sí mismo no es malo. Pero la combinación de llamadas no se ve muy elegante y se asemeja a un típico "fideo" reactivo.

Si usa corutinas, todo se verá más decente:

private fun ChatMessage . toFrontModel () : ChatMessageUI =
   join (
     { userRepository . getUserById ( author ) . toFrontReference () } ,
     { files ?. let { fileRepository . getHeadsByIds ( files )
       . map { it . toFrontReference () } } ?: listOf () }
   ) . let { ( author , files ) ->
     ChatMessageUI (
       id = id ?: throw IllegalStateException ( " $ this must be persisted" ) ,
       author = author ,
       message = message ,
       files = files ,
       forwardFrom = forwardFrom ?. toFrontModel () ,
       replyTo = replyTo ?. toFrontModel ()
)
   }


Donde:

fun < A , B > join ( a : () -> A , b : () -> B ) =
   runBlocking ( IO ) {
     awaitAll ( async { a () } , async { b () } )
   } . let {
     it [ 0 ] as A to it [ 1 ] as B
   }


Teóricamente, utilizando dicho procesamiento paralelo, obtenemos 200–400 ms, que ya está cerca de nuestras expectativas.

Desafortunadamente, no ocurre una paralelización tan buena, y la recuperación es bastante cruel: con solo unos pocos usuarios trabajando al mismo tiempo, caerá una gran cantidad de solicitudes en los servicios, que aún no se procesarán en paralelo, por lo que volveremos a nuestros tristes 4 s.

Mi resultado cuando uso dicho servicio es 1300-1700 ms para procesar 20 mensajes. Esto es más rápido que en la primera implementación, pero aún así no resuelve el problema.

Uso alternativo de consultas paralelas.
¿Qué sucede si el procesamiento por lotes no se proporciona en servicios de terceros? Por ejemplo, puede ocultar la falta de implementación del procesamiento por lotes dentro de los métodos de interfaz:

interface UserRemoteApi {
   fun getUserById ( id : UserReference ) : UserRemote
   fun getUsersByIds ( id : Set < UserReference > ) : Set < UserRemote > =
     id . parallelStream ()
       . map { getUserById ( it ) } . collect ( toSet ())
   fun getUsersByIds ( id : List < UserReference > ) : List < UserRemote > =
     id . parallelStream ()
       . map { getUserById ( it ) } . collect ( toList ())
}


Esto tiene sentido si hay esperanza de que el procesamiento por lotes aparezca en futuras versiones.

Pros:

  1. Fácil implementación del procesamiento de mensajes concurrentes.
  2. Buena escalabilidad.

Contras:

  1. La necesidad de separar la recepción de datos de su procesamiento en solicitudes de procesamiento en paralelo para diferentes servicios.
  2. Mayor carga en servicios de terceros.

Se puede ver que el alcance de la aplicabilidad es aproximadamente el mismo que el del enfoque ingenuo. El uso del método de consulta paralela tiene sentido si desea aumentar el rendimiento de su servicio varias veces debido a la explotación despiadada de los demás. En nuestro ejemplo, la productividad aumentó 2.5 veces, pero esto claramente no es suficiente.

Almacenamiento en caché


Puede realizar el almacenamiento en caché de estilo JPA para servicios externos, es decir, almacenar objetos recibidos dentro de la sesión para no recibirlos nuevamente (incluso durante el procesamiento por lotes). Puede hacer estos cachés usted mismo, puede usar Spring con su @Cacheable, además de que siempre puede usar un caché listo como EhCache manualmente.

El problema general estará relacionado con el hecho de que las memorias caché solo tienen sentido si hay visitas. En nuestro caso, es muy probable que haya aciertos en el campo del autor (por ejemplo, 50%) y no habrá aciertos en los archivos. Este enfoque traerá algunas mejoras, pero el rendimiento no cambiará radicalmente (y necesitamos un gran avance).

Las memorias caché entre sesiones (largas) requieren una lógica de invalidación compleja. En general, cuanto más tarde llegue al punto de resolver problemas de rendimiento con cachés entre sesiones, mejor.

Pros:

  1. Implemente el almacenamiento en caché sin cambiar el código.
  2. El rendimiento aumenta varias veces (en algunos casos).

Contras:

  1. Posibilidad de degradación del rendimiento si se usa incorrectamente.
  2. Gran sobrecarga de memoria, especialmente con cachés largos.
  3. Invalidación compleja, errores en los que dará lugar a problemas difíciles en tiempo de ejecución.

Muy a menudo, los cachés solo se usan para reparar rápidamente problemas de diseño. Esto no significa que no necesitan ser utilizados. Sin embargo, siempre vale la pena tratarlos con precaución y primero evaluar la ganancia de rendimiento resultante, y solo luego tomar una decisión.

En nuestro ejemplo, los cachés tendrán un aumento de rendimiento de alrededor del 25%. Al mismo tiempo, los cachés tienen muchas desventajas, por lo que no los usaría aquí.

Resumen


Por lo tanto, analizamos la implementación ingenua de un servicio que utiliza el procesamiento por lotes y algunas formas simples de acelerarlo.

La principal ventaja de todos estos métodos es la simplicidad, de la que hay muchas consecuencias agradables.

Un problema común con estos métodos es el bajo rendimiento, principalmente debido al tamaño del paquete. Por lo tanto, si estas soluciones no le convienen, entonces vale la pena considerar métodos más radicales.

Hay dos áreas principales en las que puede buscar soluciones:

  • Trabajo asincrónico con datos (requiere un cambio de paradigma, por lo tanto, este artículo no se considera)
  • ampliación de paquetes mientras se mantiene el procesamiento sincrónico.

La ampliación de los paquetes reducirá en gran medida el número de llamadas externas y al mismo tiempo mantendrá el código sincrónico. La siguiente parte del artículo estará dedicada a este tema.

Source: https://habr.com/ru/post/460291/


All Articles