Android Hintergrund Tutorial. Teil 5: Coroutinen in Kotlin


Kotlin Island

Frühere Texte in dieser Reihe: über AsyncTask , über Loader , über Executors und EventBus , über RxJava .

Diese Stunde ist also gekommen. Dies ist der Artikel, für den die gesamte Serie geschrieben wurde: eine Erklärung, wie der neue Ansatz „unter der Haube“ funktioniert. Wenn Sie noch nicht wissen, wie Sie es verwenden sollen, finden Sie hier einige nützliche Links, um Ihnen den Einstieg zu erleichtern:


Und nachdem Sie sich mit Coroutinen vertraut gemacht haben, fragen Sie sich vielleicht, was Kotlin diese Gelegenheit gegeben hat und wie es funktioniert. Bitte beachten Sie, dass wir uns hier nur auf die Kompilierungsphase konzentrieren: Sie können einen separaten Artikel über die Ausführung schreiben.

Das erste, was wir verstehen müssen, ist, dass im Korpus tatsächlich keine Koroutinen existieren. Der Compiler wandelt die Funktion mit dem Suspend-Modifikator in eine Funktion mit dem Continuation- Parameter um. Diese Schnittstelle hat zwei Methoden:

abstract fun resume(value: T) abstract fun resumeWithException(exception: Throwable) 

Typ T ist der Rückgabetyp Ihrer ursprünglichen Suspend-Funktion. Und hier ist, was tatsächlich passiert: Diese Funktion wird in einem bestimmten Thread ausgeführt (Geduld, wir kommen auch dazu), und das Ergebnis wird an die Wiederaufnahmefunktion dieser Fortsetzung übergeben, in deren Kontext die Suspend-Funktion aufgerufen wurde. Wenn die Funktion das Ergebnis nicht empfängt und eine Ausnahme auslöst, wird resumeWithException ausgelöst, wodurch ein Fehler im aufrufenden Code ausgelöst wird.

Ok, aber woher kam die Fortsetzung? Natürlich vom Corutin-Erbauer! Schauen wir uns den Code an, der eine Coroutine erstellt, zum Beispiel den Start:

 public actual fun launch( context: CoroutineContext = DefaultDispatcher, start: CoroutineStart = CoroutineStart.DEFAULT, parent: Job? = null, block: suspend CoroutineScope.() -> Unit ): Job { val newContext = newCoroutineContext(context, parent) val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true) coroutine.start(start, coroutine, block) return coroutine } 

Hier erstellt der Builder eine Coroutine - eine Instanz der AbstractCoroutine-Klasse, die wiederum die Continuation-Schnittstelle implementiert. Die Startmethode gehört zur Jobschnittstelle. Es ist jedoch sehr schwierig, die Definition der Startmethode zu finden. Aber wir können von der anderen Seite hierher kommen. Ein aufmerksamer Leser hat bereits bemerkt, dass das erste Argument für die Startfunktion der CoroutineContext ist und standardmäßig auf DefaultDispatcher gesetzt ist. Dispatcher sind Klassen, die die Ausführung von Coroutinen steuern. Sie sind daher auf jeden Fall wichtig, um zu verstehen, was passiert. Schauen wir uns die DefaultDispatcher-Deklaration an:

 public actual val DefaultDispatcher: CoroutineDispatcher = CommonPool 

Tatsächlich ist dies also CommonPool, obwohl Java-Docks uns sagen, dass sich dies ändern kann. Was ist CommonPool?

Dies ist ein Coroutine-Manager, der ForkJoinPool als Implementierung von ExecutorService verwendet. Ja, das ist es: Am Ende sind alle Ihre Lambda-Coroutinen nur lauffähig und in Executor mit einer Reihe kniffliger Transformationen gefangen. Aber der Teufel steckt wie immer im Detail.


Gabel? Oder mitmachen?

Nach den Ergebnissen der Umfrage auf meinem Twitter zu urteilen, muss ich hier kurz erklären, was FJP ist :)


Zunächst einmal ist ForkJoinPool ein moderner Executor, der für die Verwendung mit parallelen Java 8-Streams erstellt wurde. Die ursprüngliche Aufgabe war die effiziente Parallelität bei der Arbeit mit der Stream-API. Dies bedeutet im Wesentlichen, dass die Streams aufgeteilt werden, um einen Teil der Daten zu verarbeiten, und diese dann kombiniert werden, wenn alle Daten verarbeitet wurden. Stellen Sie sich zur Vereinfachung vor, Sie hätten den folgenden Code:

 IntStream .range(1, 1_000_000) .parallel() .sum() 

Die Menge eines solchen Streams wird nicht in einem Stream berechnet. Stattdessen teilt ForkJoinPool den Bereich rekursiv in Teile auf (zuerst in zwei Teile von 500.000, dann jeweils in 250.000 usw.), berechnet die Summe jedes Teils und kombiniert die Ergebnisse zu einem einzigen Menge. Hier ist eine Visualisierung eines solchen Prozesses:


Threads werden für verschiedene Aufgaben aufgeteilt und nach Abschluss wieder zusammengeführt

Die Effektivität von FJP basiert auf dem Algorithmus „Job Theft“: Wenn einem bestimmten Thread die Aufgaben ausgehen, werden die Warteschlangen anderer Pool-Threads aufgerufen und ihre Aufgaben gestohlen. Zum besseren Verständnis können Sie den Bericht von Alexei Shipilev sehen oder eine Präsentation ansehen.

Nun, wir haben erkannt, was unsere Coroutinen tun! Aber wie landen sie dort?

Dies geschieht innerhalb der CommonPool # -Versandmethode:

 _pool.execute(timeSource.trackTask(block)) 

Die Versandmethode wird von der Methode resume (Wert: T) in DispatchedContinuation aufgerufen. Klingt vertraut! Wir erinnern uns, dass Continuation eine in AbstractCoroutine implementierte Schnittstelle ist. Aber wie hängen sie zusammen?

Der Trick liegt in der CoroutineDispatcher-Klasse. Die ContinuationInterceptor-Schnittstelle wird wie folgt implementiert:

 public actual override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> = DispatchedContinuation(this, continuation) 

Sehen Sie? Sie stellen dem Builder-Corutin einen einfachen Block zur Verfügung. Sie müssen keine Schnittstellen implementieren, über die Sie nichts wissen möchten. Die Coroutine-Bibliothek kümmert sich um all das. Sie ist
fängt die Ausführung ab, ersetzt die Fortsetzung durch DispatchedContinuation und sendet sie an den Executor, wodurch die effizienteste Ausführung Ihres Codes garantiert wird.

Jetzt müssen wir uns nur noch damit befassen, wie der Versand von Anfang an aufgerufen wird. Lassen Sie uns diese Lücke füllen. Die Resume-Methode wird von startCoroutine in der Erweiterungsfunktion des Blocks aufgerufen:

 public fun <R, T> (suspend R.() -> T).startCoroutine( receiver: R, completion: Continuation<T> ) { createCoroutineUnchecked(receiver, completion).resume(Unit) } 

Und startCoroutine wird wiederum vom Operator "()" in der CoroutineStart-Enumeration aufgerufen. Ihr Builder akzeptiert es als zweiten Parameter und der Standardwert ist CoroutineStart.DEFAULT. Das ist alles!

Das ist der Grund, warum ich den Corutin-Ansatz bewundere: Es ist nicht nur eine spektakuläre Syntax, sondern auch eine brillante Implementierung.

Und für diejenigen, die bis zum Ende gelesen haben, erhalten sie exklusiv: Ein Video meines Berichts „Ein Geiger wird nicht benötigt: Wir lehnen RxJava zugunsten von Coroutine in Kotlin ab“ von der Mobius- Konferenz. Viel Spaß :)

Source: https://habr.com/ru/post/de415335/


All Articles