Probleme der Stapelverarbeitung von Anfragen und deren Lösungen (Teil 1)

Fast alle modernen Softwareprodukte bestehen aus mehreren Diensten. Oft werden lange Antwortzeiten zwischen Kanälen zwischen Diensten zu einer Ursache für Leistungsprobleme. Die Standardlösung für diese Art von Problem besteht darin, mehrere dienstübergreifende Anforderungen in ein Paket zu packen, was als Batching bezeichnet wird.

Wenn Sie die Stapelverarbeitung verwenden, sind Sie möglicherweise nicht zufrieden mit dem Ergebnis in Bezug auf Leistung oder Codeverständlichkeit. Diese Methode ist für den Anrufer nicht so einfach, wie Sie vielleicht denken. Für verschiedene Zwecke und in verschiedenen Situationen können Entscheidungen sehr unterschiedlich sein. An konkreten Beispielen werde ich die Vor- und Nachteile verschiedener Ansätze aufzeigen.

Demo-Projekt


Betrachten Sie zur Verdeutlichung ein Beispiel für einen der Dienste in der Anwendung, an der ich gerade arbeite.

Eine Erklärung der Wahl der Plattform für Beispiele
Das Problem der schlechten Leistung ist recht allgemein und gilt nicht für bestimmte Sprachen und Plattformen. In diesem Artikel werden Spring + Kotlin-Codebeispiele verwendet, um Aufgaben und Lösungen zu demonstrieren. Kotlin ist für Java- und C # -Entwickler gleichermaßen verständlich (oder unverständlich). Außerdem ist der Code kompakter und verständlicher als in Java. Um reinen Java-Entwicklern das Verständnis zu erleichtern, werde ich die schwarze Magie von Kotlin vermeiden und nur Weiß verwenden (im Geiste von Lombok). Es wird einige Erweiterungsmethoden geben, die jedoch allen Java-Programmierern als statische Methoden bekannt sind. Dies ist also ein wenig Zucker, der den Geschmack des Gerichts nicht beeinträchtigt.

Es gibt einen Dokumentgenehmigungsdienst. Jemand erstellt ein Dokument und legt es zur Diskussion vor, währenddessen Änderungen vorgenommen werden. Letztendlich ist das Dokument konsistent. Der Abstimmungsdienst selbst weiß nichts über Dokumente: Es handelt sich lediglich um einen Chat von Koordinatoren mit kleinen zusätzlichen Funktionen, die wir hier nicht berücksichtigen werden.

Es gibt also Chatrooms (entsprechend den Dokumenten) mit jeweils einer vordefinierten Gruppe von Teilnehmern. Wie in normalen Chats enthalten Nachrichten Text und Dateien und können beantwortet und weitergeleitet werden:

data class ChatMessage (
   // nullable persist
   val id : Long ? = null ,
   /** */
   val author : UserReference ,
   /** */
   val message : String ,
   /** */
   // - JPA+ null,
   val files : List < FileReference > ? = null ,
   /** , */
   val replyTo : ChatMessage ? = null ,
   /** , */
   val forwardFrom : ChatMessage ? = null
)


Links zu der Datei und dem Benutzer sind Links zu anderen Domänen. Es lebt bei uns so:

typealias FileReference = Long
typealias UserReference = Long

Benutzerdaten werden in Keycloak gespeichert und über REST abgerufen. Gleiches gilt für Dateien: Dateien und Metainformationen darüber befinden sich in einem separaten Dateispeicherdienst.

Alle Anrufe bei diesen Diensten sind schwere Anfragen . Dies bedeutet, dass der Aufwand für den Transport dieser Anforderungen viel größer ist als die Zeit, die sie für die Verarbeitung durch einen Drittanbieter benötigen. Auf unseren Prüfständen beträgt die typische Anrufzeit für solche Dienste 100 ms, daher werden wir diese Nummern in Zukunft verwenden.

Wir müssen einen einfachen REST-Controller erstellen, um die letzten N Nachrichten mit allen erforderlichen Informationen zu empfangen. Das heißt, wir glauben, dass das Nachrichtenmodell im Frontend fast dasselbe ist und wir alle Daten senden müssen. Der Unterschied zwischen dem Frontend-Modell besteht darin, dass die Datei und der Benutzer in einer leicht entschlüsselten Form dargestellt werden müssen, damit sie verknüpft werden:

/** */
data class ReferenceUI (
   /** url */
   val ref : String ,
   /** */
   val name : String
)
data class ChatMessageUI (
   val id : Long ,
   /** */
   val author : ReferenceUI ,
   /** */
   val message : String ,
   /** */
   val files : List < ReferenceUI >,
   /** , */
   val replyTo : ChatMessageUI ? = null ,
   /** , */
   val forwardFrom : ChatMessageUI ? = null
)


Wir müssen Folgendes implementieren:

interface ChatRestApi {
   fun getLast ( n : Int ) : List < ChatMessageUI >
}


Postfix UI bedeutet DTO-Modelle für das Frontend, das heißt, was wir durch REST geben müssen.

Es mag hier überraschend erscheinen, dass wir keine Chat-ID übergeben und dies auch im ChatMessage / ChatMessageUI-Modell nicht ist. Ich habe dies absichtlich getan, um den Code für die Beispiele nicht zu überladen (die Chats sind isoliert, sodass wir davon ausgehen können, dass wir überhaupt einen haben).

Philosophischer Rückzug
Sowohl die ChatMessageUI-Klasse als auch die ChatRestApi.getLast-Methode verwenden den List-Datentyp, während dies tatsächlich eine geordnete Menge ist. Im JDK ist dies alles schlecht, sodass das Deklarieren der Reihenfolge der Elemente auf Schnittstellenebene (Beibehalten der Reihenfolge beim Hinzufügen und Extrahieren) fehlschlägt. Es ist daher üblich, List in Fällen zu verwenden, in denen Sie ein bestelltes Set benötigen (es gibt immer noch ein LinkedHashSet, dies ist jedoch keine Schnittstelle).

Eine wichtige Einschränkung: Wir gehen davon aus, dass es keine langen Ketten von Antworten oder Vorwärtsbewegungen gibt. Das heißt, sie sind es, aber ihre Länge überschreitet drei Nachrichten nicht. Die Front-End-Nachrichtenkette muss vollständig übertragen werden.

Um Daten von externen Diensten zu empfangen, gibt es folgende APIs:

interface ChatMessageRepository {
   fun findLast ( n : Int ) : List < ChatMessage >
}
data class FileHeadRemote (
   val id : FileReference ,
   val name : String
)
interface FileRemoteApi {
   fun getHeadById ( id : FileReference ) : FileHeadRemote
   fun getHeadsByIds ( id : Set < FileReference > ) : Set < FileHeadRemote >
   fun getHeadsByIds ( id : List < FileReference > ) : List < FileHeadRemote >
   fun getHeadsByChat () : List < FileHeadRemote >
}
data class UserRemote (
   val id : UserReference ,
   val name : String
)
interface UserRemoteApi {
   fun getUserById ( id : UserReference ) : UserRemote
   fun getUsersByIds ( id : Set < UserReference > ) : Set < UserRemote >
   fun getUsersByIds ( id : List < UserReference > ) : List < UserRemote >
}


Es ist ersichtlich, dass die Stapelverarbeitung ursprünglich in externen Diensten vorgesehen war, und zwar in beiden Fällen: über Set (ohne die Reihenfolge der Elemente mit eindeutigen Schlüsseln beizubehalten) und über List (es kann Duplikate geben - die Reihenfolge bleibt erhalten).

Einfache Implementierungen


Naive Umsetzung


Die erste naive Implementierung unseres REST-Controllers sieht in den meisten Fällen ungefähr so ​​aus:

class ChatRestController (
   private val messageRepository : ChatMessageRepository ,
   private val userRepository : UserRemoteApi ,
   private val fileRepository : FileRemoteApi
) : ChatRestApi {
   override fun getLast ( n : Int ) =
     messageRepository . findLast ( n )
       . map { it . toFrontModel () }

   private fun ChatMessage . toFrontModel () : ChatMessageUI =
     ChatMessageUI (
       id = id ?: throw IllegalStateException ( " $ this must be persisted" ) ,
       author = userRepository . getUserById ( author ) . toFrontReference () ,
       message = message ,
       files = files ?. let { files ->
         fileRepository . getHeadsByIds ( files )
           . map { it . toFrontReference () }
} ?: listOf () ,
       forwardFrom = forwardFrom ?. toFrontModel () ,
       replyTo = replyTo ?. toFrontModel ()
)
}

Alles ist sehr klar und das ist ein großes Plus.

Wir verwenden die Stapelverarbeitung und empfangen Daten von einem externen Dienst in Stapeln. Aber was passiert mit der Leistung?

Für jede Nachricht wird ein Aufruf von UserRemoteApi ausgeführt, um Daten im Autorenfeld abzurufen, und ein Aufruf von FileRemoteApi, um alle angehängten Dateien zu erhalten. Es scheint alles zu sein. Angenommen, die Felder forwardFrom und replyTo für ChatMessage werden abgerufen, sodass keine zusätzlichen Aufrufe erforderlich sind. Die Umwandlung in ChatMessageUI führt jedoch zu einer Rekursion, dh die Leistung der Anrufanzahl kann erheblich gesteigert werden. Nehmen wir an, wir haben nicht viel Verschachtelung und die Kette ist auf drei Nachrichten beschränkt.

Infolgedessen erhalten wir zwei bis sechs Anrufe an externe Dienste pro Nachricht und einen JPA-Anruf an das gesamte Nachrichtenpaket. Die Gesamtzahl der Anrufe variiert zwischen 2 * N + 1 und 6 * N + 1. Wie viel kostet das in realen Einheiten? Angenommen, Sie benötigen 20 Beiträge, um eine Seite zu rendern. Um sie zu bekommen, benötigen Sie 4 s bis 10 s. Schrecklich Ich möchte die 500 ms treffen. Und da das Front-End davon geträumt hat, einen nahtlosen Bildlauf durchzuführen, können die Leistungsanforderungen dieses Endpunkts verdoppelt werden.

Vorteile:

  1. Der Code ist prägnant und selbstdokumentierend (Support-Traum).
  2. Der Code ist einfach, so dass es fast keine Möglichkeiten gibt, in das Bein zu schießen.
  3. Die Stapelverarbeitung sieht nicht fremd aus und passt organisch in die Logik.
  4. Logikänderungen werden einfach vorgenommen und sind lokal.

Minus:

Schreckliche Leistung aufgrund der Tatsache, dass die Pakete sehr klein sind.

Dieser Ansatz kann häufig in einfachen Diensten oder in Prototypen gesehen werden. Wenn die Geschwindigkeit der Änderung wichtig ist, lohnt es sich kaum, das System zu komplizieren. Gleichzeitig ist die Leistung für unseren sehr einfachen Service schrecklich, so dass der Anwendungsbereich dieses Ansatzes sehr eng ist.

Naive Parallelverarbeitung


Sie können mit der parallelen Verarbeitung aller Nachrichten beginnen. Dadurch wird ein linearer Zeitanstieg abhängig von der Anzahl der Nachrichten vermieden. Dies ist kein besonders guter Weg, da dies zu einer hohen Spitzenlast des externen Dienstes führt.

Die Implementierung der Parallelverarbeitung ist sehr einfach:

override fun getLast ( n : Int ) =
   messageRepository . findLast ( n ) . parallelStream ()
     . map { it . toFrontModel () }
     . collect ( toList ())


Bei paralleler Nachrichtenverarbeitung erhalten wir idealerweise 300–700 ms, was viel besser ist als bei einer naiven Implementierung, aber immer noch nicht schnell genug.

Bei diesem Ansatz werden Anforderungen an userRepository und fileRepository synchron ausgeführt, was nicht sehr effizient ist. Um dies zu beheben, müssen Sie die Logik von Anrufen ziemlich stark ändern. Zum Beispiel über CompletionStage (auch bekannt als CompletableFuture):

private fun ChatMessage . toFrontModel () : ChatMessageUI =
   CompletableFuture . supplyAsync {
     userRepository . getUserById ( author ) . toFrontReference ()
   } . thenCombine (
     files ?. let {
       CompletableFuture . supplyAsync {
         fileRepository . getHeadsByIds ( files ) . map { it . toFrontReference () }
}
} ?: CompletableFuture . completedFuture ( listOf ())
) { author , files ->
     ChatMessageUI (
       id = id ?: throw IllegalStateException ( " $ this must be persisted" ) ,
       author = author ,
       message = message ,
       files = files ,
       forwardFrom = forwardFrom ?. toFrontModel () ,
       replyTo = replyTo ?. toFrontModel ()
)
   } . get () !!


Es ist ersichtlich, dass der anfänglich einfache Zuordnungscode weniger klar geworden ist. Dies liegt daran, dass wir externe Serviceanrufe von dem Ort trennen mussten, an dem die Ergebnisse verwendet wurden. Das an sich ist nicht schlecht. Die Kombination von Anrufen sieht jedoch nicht sehr elegant aus und ähnelt einer typischen reaktiven "Nudel".

Wenn Sie Coroutinen verwenden, sieht alles anständiger aus:

private fun ChatMessage . toFrontModel () : ChatMessageUI =
   join (
     { userRepository . getUserById ( author ) . toFrontReference () } ,
     { files ?. let { fileRepository . getHeadsByIds ( files )
       . map { it . toFrontReference () } } ?: listOf () }
   ) . let { ( author , files ) ->
     ChatMessageUI (
       id = id ?: throw IllegalStateException ( " $ this must be persisted" ) ,
       author = author ,
       message = message ,
       files = files ,
       forwardFrom = forwardFrom ?. toFrontModel () ,
       replyTo = replyTo ?. toFrontModel ()
)
   }


Wo:

fun < A , B > join ( a : () -> A , b : () -> B ) =
   runBlocking ( IO ) {
     awaitAll ( async { a () } , async { b () } )
   } . let {
     it [ 0 ] as A to it [ 1 ] as B
   }


Theoretisch erhalten wir bei einer solchen Parallelverarbeitung 200–400 ms, was bereits unseren Erwartungen entspricht.

Leider findet eine so gute Parallelisierung nicht statt, und die Amortisation ist ziemlich grausam: Wenn nur wenige Benutzer gleichzeitig arbeiten, wird eine Vielzahl von Anfragen auf die Dienste fallen, die immer noch nicht parallel verarbeitet werden, sodass wir zu unseren traurigen 4 s zurückkehren.

Mein Ergebnis bei Verwendung eines solchen Dienstes ist 1300-1700 ms für die Verarbeitung von 20 Nachrichten. Dies ist schneller als bei der ersten Implementierung, löst das Problem jedoch immer noch nicht.

Alternative Verwendung paralleler Abfragen
Was ist, wenn die Stapelverarbeitung nicht in Diensten von Drittanbietern bereitgestellt wird? Beispielsweise können Sie die fehlende Implementierung der Stapelverarbeitung in Schnittstellenmethoden verbergen:

interface UserRemoteApi {
   fun getUserById ( id : UserReference ) : UserRemote
   fun getUsersByIds ( id : Set < UserReference > ) : Set < UserRemote > =
     id . parallelStream ()
       . map { getUserById ( it ) } . collect ( toSet ())
   fun getUsersByIds ( id : List < UserReference > ) : List < UserRemote > =
     id . parallelStream ()
       . map { getUserById ( it ) } . collect ( toList ())
}


Dies ist sinnvoll, wenn die Hoffnung besteht, dass die Stapelverarbeitung in zukünftigen Versionen angezeigt wird.

Vorteile:

  1. Einfache Implementierung der gleichzeitigen Nachrichtenverarbeitung.
  2. Gute Skalierbarkeit.

Nachteile:

  1. Die Notwendigkeit, den Empfang von Daten von ihrer Verarbeitung in parallelen Verarbeitungsanforderungen für verschiedene Dienste zu trennen.
  2. Erhöhte Belastung von Diensten von Drittanbietern.

Es ist ersichtlich, dass der Anwendungsbereich in etwa dem des naiven Ansatzes entspricht. Die Verwendung der parallelen Abfragemethode ist sinnvoll, wenn Sie die Leistung Ihres Dienstes aufgrund der gnadenlosen Ausnutzung anderer mehrmals steigern möchten. In unserem Beispiel hat sich die Produktivität um das 2,5-fache erhöht, was jedoch eindeutig nicht ausreicht.

Caching


Sie können JPA-artiges Caching für externe Dienste durchführen, dh empfangene Objekte in der Sitzung speichern, um sie nicht erneut zu empfangen (auch während der Stapelverarbeitung). Sie können diese Caches selbst erstellen, Sie können Spring mit seinem @Cacheable verwenden und Sie können jederzeit einen vorgefertigten Cache wie EhCache manuell verwenden.

Das allgemeine Problem hängt mit der Tatsache zusammen, dass Caches nur dann einen guten Sinn ergeben, wenn Treffer vorhanden sind. In unserem Fall sind Treffer im Autorenfeld (z. B. 50%) sehr wahrscheinlich, und es werden überhaupt keine Treffer in Dateien angezeigt. Dieser Ansatz wird einige Verbesserungen bringen, aber die Leistung wird sich nicht radikal ändern (und wir brauchen einen Durchbruch).

Intersession (lange) Caches erfordern eine komplexe Invalidierungslogik. Im Allgemeinen ist es umso besser, je später Sie zu dem Punkt kommen, an dem Sie Leistungsprobleme mit intersessionellen Caches lösen.

Vorteile:

  1. Implementieren Sie das Caching, ohne den Code zu ändern.
  2. Leistungssteigerung mehrmals (in einigen Fällen).

Nachteile:

  1. Möglichkeit zur Leistungsminderung bei unsachgemäßer Verwendung
  2. Großer Speicheraufwand, insbesondere bei langen Caches.
  3. Komplexe Ungültigmachung, Fehler, die zu schwierigen Problemen in der Laufzeit führen.

Sehr oft werden Caches nur verwendet, um Designprobleme schnell zu beheben. Dies bedeutet nicht, dass sie nicht verwendet werden müssen. Es lohnt sich jedoch immer, sie mit Vorsicht zu behandeln und zuerst den daraus resultierenden Leistungsgewinn zu bewerten und erst dann eine Entscheidung zu treffen.

In unserem Beispiel haben die Caches eine Leistungssteigerung von rund 25%. Gleichzeitig haben die Caches viele Nachteile, so dass ich sie hier nicht verwenden würde.

Zusammenfassung


Wir haben uns also die naive Implementierung eines Dienstes angesehen, der die Stapelverarbeitung verwendet, und einige einfache Möglichkeiten, dies zu beschleunigen.

Der Hauptvorteil all dieser Methoden ist die Einfachheit, aus der sich viele angenehme Konsequenzen ergeben.

Ein häufiges Problem bei diesen Methoden ist die schlechte Leistung, hauptsächlich aufgrund der Paketgröße. Wenn diese Lösungen nicht zu Ihnen passen, sollten Sie daher radikalere Methoden in Betracht ziehen.

Es gibt zwei Hauptbereiche, in denen Sie nach Lösungen suchen können:

  • Asynchrone Arbeit mit Daten (erfordert einen Paradigmenwechsel, daher wird dieser Artikel nicht berücksichtigt)
  • Vergrößerung der Packungen unter Beibehaltung der synchronen Verarbeitung.

Durch die Vergrößerung der Bundles wird die Anzahl der externen Anrufe erheblich reduziert und gleichzeitig der Code synchron gehalten. Der nächste Teil des Artikels ist diesem Thema gewidmet.

Source: https://habr.com/ru/post/de460291/


All Articles