在JVM世界中,由于Kotlin语言和Project Loom ,协程是最著名的。 我还没有很好地描述Kotlinovsky协程的原理,而Kotlin-协程库的代码对于一个没有准备的人来说是完全无法理解的。 以我的经验,大多数人只知道协程,它们是“轻量级流”,并且在kotlin中它们通过智能字节码生成工作。 所以直到最近。 而且我的想法是,由于协程可以用字节码实现,所以为什么不用Java实现它们。 根据这个想法,随后出现了一个很小且相当简单的库,我希望几乎所有开发人员都可以理解该库的设备。 细节剪下。
源代码
我称这个项目为Microutines,它来自Micro和Coroutines一词。
所有代码都可以在github上找到 。 最初,我想根据自己对主题的理解来发展叙述,谈论我错误的决定和想法,关于api的发展,但是随后本文中的代码将与github的代码有很大的不同(无论如何,这是不可避免的,我写了一篇文章时间,以及不时在github喜剧上)。 因此,我将主要描述该库的最终版本,如果乍一看api的某些部分不太清楚,那么很可能需要使用它们来解决我们将在以下文章中考虑的问题。
免责声明
这是一种培训项目。 如果其中一位读者使他喜欢玩甚至提出拉扯请求,我会很高兴。 但是在生产中使用它是不值得的。 深刻理解技术的最好方法是自己实施,这是该项目的唯一目标。
我不保证所用术语的准确性。 也许我在某个地方听到了一些错误的记忆,其中一些甚至是我自己遗忘的。
什么是协程
正如我已经注意到的,关于协程的常说是“便利流”。 这不是一个真正的定义。 我也不会给出真正的定义,但我会尝试描述它们是什么,协程。 调用协程流程并不完全正确。 协程是比流小的规划单元,而流又比调度单元小。 进程和线程计划由操作系统处理。 Corutin参与了计划……我们自己将参与他们的计划。 协程在常规线程之上工作,它们的主要特征是,它们在等待其他任务完成时不会阻塞线程,而是将其释放给另一个协程。 这种方法称为协作多任务。 Corutin可以先在一个线程中工作,然后再在另一个线程中工作。 协程线程充当资源,一百万个协程可以在单个线程上工作。 您可以看到以下图片:

contrib 1和contrib 2的任务执行某种请求,并且在等待答案时不会阻塞流程,而是暂停工作并在收到答案后继续进行工作。 您可以说,我们可以使用回调编写此类代码。 没错,但是协程的本质是我们编写的代码没有回调,而是编写了异步运行的普通顺序代码。
发电机
我们将开始从简单到复杂的开发。 我们要做的第一件事是惰性集合的生成,使用yield关键字以某些语言实现。 生成器在这里并不是偶然的,我们将在后面看到,生成器和协程可以使用相同的机制来实现。
让我们考虑一下python中的示例,只是因为生成器是开箱即用的。
def generator(): k = 10 yield k k += 10 yield k k += 10 yield k for i in generator(): print(i)
循环展开成这样的样子(也许不是那样,但是原理对我们很重要):
gen = generator() while True: try: i = next(gen) print(i) except StopIteration: break
调用generator()
将创建一个特殊的迭代器,称为generator。 对next(gen)
的第一次调用将执行从generator
函数开始到第一个yield
,并且将来自genertator()
的局部变量k
的值写入i
变量。 每次对next的下一次调用将继续使用上一个yield
之后的指令立即执行该函数yield
依此类推。 在这种情况下, next
调用之间,将保存generator
内所有局部变量的值。
大致相同,但是使用Kotlin语言。
val seq = sequence { var i = 10 yield(i) i += 10 yield(i) i += 10 yield(i) } for (i in seq) { println(i) }
在Java中,我们可以像这样进行延迟生成:
Iterable<Integer> seq = DummySequence.first(() -> { final int i = 10; return DummySequence.next(i, () -> { final int i1 = i + 10; return DummySequence.next(i1, () -> DummySequence.end(i1 + 10)); }); }); for(int i: seq) { System.out.println(i); }
DummySequence实现 import org.junit.Assert; import org.junit.Test; import java.util.Iterator; import java.util.List; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.StreamSupport; public class DummySequenceTest { @Test public void dummySequenceTest() { DummySequence<Integer> sequence = DummySequence.first(() -> { final int i = 10; return DummySequence.next(10, () -> { final int i1 = i + 10; return DummySequence.next(i1, () -> DummySequence.end(i1 + 10)); }); }); List<Integer> list = StreamSupport.stream(sequence.spliterator(), false) .collect(Collectors.toList()); Assert.assertEquals(10, ((int) list.get(0))); Assert.assertEquals(20, ((int) list.get(1))); Assert.assertEquals(30, ((int) list.get(2))); } private static class DummySequence<T> implements Iterable<T>, Iterator<T> { private Step<T> step; public DummySequence(Step<T> step) { this.step = step; } @Override public Iterator<T> iterator() { return this; } @Override public boolean hasNext() { if (step instanceof EndStep) return false; step = step.nextStep(); return true; } @Override public T next() { return step.getValue(); } public static <T> DummySequence<T> first(Supplier<Step<T>> next) { return new DummySequence<>(new FirstStep<T>(next)); } public static <T> Step<T> next(T value, Supplier<Step<T>> next) { return new IntermediateStep<>(value, next); } public static <T> Step<T> end(T value) { return new EndStep<>(value); } } private interface Step<T> { T getValue(); Step<T> nextStep(); } public static class FirstStep<T> implements Step<T> { Supplier<Step<T>> nextStep; public FirstStep(Supplier<Step<T>> next) { this.nextStep = next; } @Override public T getValue() { throw new IllegalStateException(); } @Override public Step<T> nextStep() { return nextStep.get(); } } public static class IntermediateStep<T> implements Step<T> { T value; Supplier<Step<T>> nextStep; public IntermediateStep(T value, Supplier<Step<T>> nextStep) { this.value = value; this.nextStep = nextStep; } @Override public T getValue() { return value; } @Override public Step<T> nextStep() { return nextStep.get(); } } public static class EndStep<T> implements Step<T> { T value; public EndStep(T value) { this.value = value; } @Override public T getValue() { return value; } @Override public Step<T> nextStep() { throw new IllegalStateException(); } } }
每个下一个嵌套的lambda捕获所有先前lambda的所有变量,并且仅在请求下一个元素时才执行。 每个lambda的结果将是生成的元素和下一个代码块。 看起来很奇怪,我怀疑有人会这样写。 我们表示我们将努力追求的理想(并且我们将实现这一理想,只是我们将使用匿名类而不是lambda)。
Sequence<Integer> sequence = new Sequence<Integer>(() -> { int i = 10; yield(i); i += 10; yield(i); i += 10; yield(i); });
传递给Sequence构造函数的函数仅在必要时才从yield
变为yield
,局部变量的值应存储在对sequence.next()
调用之间。 堆栈的保存和最后执行指令的数量被称为抢占 (yield被翻译成俄语)或暂停 。
延续性
可以挤出的片段称为连续。 延续被翻译成俄语,称为“延续”,但我将其称为延续。 维基百科写了关于延续的内容:
继续(Eng。Continuation)表示某个时刻程序的状态,可以保存该状态并用于转换到该状态。 连续包含从特定点继续执行程序的所有信息。
假设我们已经以某种神奇的方式实现了延续机制,该机制由以下接口表示。 run
方法可以停止其执行。 随后的每个调用将从最后一个yield
恢复执行。 我们可以将延续性视为可以部分执行的Runnable
。
interface Continuation<T> { void run(SequenceScope<T> scope); }
我们将使用这样的延续:
Sequence<Integer> sequence = new Sequence<>(new Continuation<>() { void run(SequenceScope<Integer> scope) { int i = 1; System.out.println("Continuation start"); scope.yield(i++); System.out.println("Continuation resume"); scope.yield(i++); System.out.println("Continuation resume"); scope.yield(i++); System.out.println("Continuation end"); } }); for(Integer i: sequence) { System.out.println("Next element :" + i); }
我们希望得到以下结论:
输出量 Continuation start Next element: 1 Continuation resume Next element: 2 Continuation resume Next element: 3 Continuation end
根据下一个元素的请求, Sequence
将调用Continuation.run(scope)
,它将执行代码块直到下一个屈服并被挤出。 对Continuation.run(scope)
的下一个调用将从最后一个挤出的地方开始工作,并执行代码,直到下一个yield
为止。 Sequence
代码可能像这样:
class Sequence implements Iterator<T>, SequenceScope<T>, Iterable<T> { private static final Object STOP = new Object(); private Object next = STOP; private Continuation<T> nextStep; public Sequence(Continuation<T> nextStep) { this.nextStep = nextStep; } @Override public boolean hasNext() { if (next == STOP) { nextStep.run(this); } return next != STOP; } @Override public T next() { if (next == STOP) { if (!hasNext()) { throw new NoSuchElementException(); } } T result = (T) next; next = STOP; return result; } @Override void yield(T t) { next = t; } public Iterator<T> iterator() {
一切都很好,只不过java不能在任意位置停止该方法的执行,因此它可以从最后一个停止的位置继续执行。 因此,我们将必须手动执行此操作。 我们引入了label字段,其中将存储最后一个称为yield的编号。
class IntegerSequenceContinuation implements Continuation<Integer> { private int label = 0; private int i = 0; void run(SequenceScope<Integer> scope) { int i = this.i; switch (label) { case 0: System.out.println("Continuation start"); scope.yield(i++); label = 1; this.i = i; return; case 1: System.out.println("Continuation resume"); scope.yield(i++); label = 2; this.i = i; return; case 2: System.out.println("Continuation resume"); scope.yield(i++); label = 3; this.i = i; return; case 3: System.out.println("Continuation end"); label = 4; default: throw new RuntimeException(); } } }
我们有一个状态机(有限状态机),总的来说,这正是科特林在他的协程中所做的(您可以反编译,看看是否当然理解)。 我们有4个状态,每个run
调用都会执行一段代码,并转换到下一个状态。 我们必须将局部变量i保存在class字段中。 除了不合理的复杂性之外,此代码还有另一个问题:我们可以为每个运行调用传递不同的值作为作用域参数。 因此,最好在第一次调用时将scope参数保存在class字段中,然后继续使用它。
在我们中实现了对Java的延续,但是以一种非常奇怪的方式并且仅在一种情况下实现。 每次没有人会写类似的东西,很难编辑这样的代码,很难阅读这样的代码。 因此,我们将在编译后构建状态机。
悬念与延续
我们如何理解延续是已完成工作还是已暂停? 在暂停的情况下,让run
方法返回一个特殊的SUSPEND
对象。
public interface Continuation<T> { Object SUSPEND = new Object() { @Override public String toString() { return "[SUSPEND]"; } }; T run(); }
请注意,我从延续中删除了输入参数。 我们必须确保各个调用之间的参数都不会改变,最好的方法是删除它们。 相反,用户需要scope
参数(它将用于很多事情,但是现在SequenceScope
被传递到其位置,从中调用我们的yield
)。 另外,用户不想知道任何SUSPEND
,也不想返回任何东西。 介绍Suspendable
接口。
public abstract class Suspendable<C extends Scope> { abstract public void run(C scope); } interface Scope {}
为什么要使用抽象类而不是接口?使用类而不是接口不允许编写lambda,并强制编写匿名类。 对于我们来说,使用字节序和类进行连续化将非常方便,因为可以将本地字段存储在其字段中。 但是字节码中的lambda看起来并不像一个类。 有关详细信息,请转到此处 。
Suspendable
是设计时间中的Continuation
,而Continuation
是Suspendable
。 用户以Suspendable
级别编写代码,而库的低级代码可与Continuation
。 修改字节码后变为1。
在讨论调用yield
之后的抢占之前,但将来我们将需要采用其他方法抢占。 我们将使用@Suspend
批注标记此类方法。 这适用于yield
本身:
public class SequenceScope<T> implements Scope { @Suspend public void yield(T t) {...} }
请记住,我们的延续将建立在有限的自动机上。 让我们在这里详细介绍。 它被称为有限状态机,因为它具有有限数量的状态。 为了存储当前状态,我们将使用特殊的字段标签。 最初,标签
为0-零(初始)状态。 每次对Continuation.run
调用都将执行某种代码并进入某种状态(除了初始状态外)。 每次转换后,继续操作应保存所有局部变量,当前状态号并执行return SUSPEND
。 到最终状态的转换将由return null
表示(在以下文章中,我们将不仅返回null
)。 从最终状态到Continuation.run
的调用应以ContinuationEndException
异常结束。
因此,用户使用Suspendable
编写代码,编译后变成Continuation
,库可以使用Continuation
,尤其是生成器。 为用户创建一个新的生成器如下所示:
Sequence<Integer> seq = new Sequence(new Suspendable() {...});
但是生成器本身需要继续,因为他需要初始化Continuation<T> nextStep;
。 为了从Suspendable
中获得Continuation
,我编写了一个特殊的Magic
类。

package microutine.core; import microutine.coroutine.CoroutineScopeImpl; import java.lang.reflect.Field; public class Magic { public static final String SCOPE = "scope$S"; private static <C extends Scope, R> Continuation<R> createContinuation(Suspendable<C> suspendable, C scope) { try { Field contextField = suspendable.getClass().getDeclaredField(SCOPE); contextField.setAccessible(true); if (contextField.get(suspendable) != null) throw new IllegalArgumentException("Continuation already created"); contextField.set(suspendable, scope); } catch (Exception e) { throw new RuntimeException(e); } return getContinuation(suspendable); } public static <R, C extends Scope> Continuation<R> getContinuation(Suspendable suspendable) { if (getScope(suspendable) == null) throw new RuntimeException("No continuation created for provided suspendable");
这个魔术如何运作? 使用scope
参数,通过反射(我们将在字节码中创建的综合字段)初始化scope$S
字段。 连续仅在createContinuation
初始化一次,并且第二次尝试初始化将导致执行。 接下来是将常规类型强制转换为Continuation
。 总的来说,我欺骗了你,所有的魔力都不在这里。 由于可以进行这种类型转换,因此传入的特定Suspendable
已经实现了Continuation
。 这是在编译期间发生的。
项目结构
该项目将包括三个部分:
- 库代码(低级和高级API)
- 测试(实际上,现在只有在其中可以使用此库)
- Converter Suspendable-> Continuation(在gradle buildSrc中作为gradle任务实现)
由于转换器当前位于buildSrc中,因此无法在库本身以外的地方使用它。 但是目前,我们不需要它。 将来,我们将有两个选择:将其放在单独的插件中,或者制作自己的java代理(如Quasar所做的那样),然后在运行时执行转换。
build.gradle plugins { id "java" } group 'microutines' version '1.0-SNAPSHOT' sourceCompatibility = 1.8 task processYield(type: microutine.ProcessSuspendableTask) { classPath = compileJava.outputs.files + compileJava.classpath inputs.files(compileJava.outputs.files) } task processTestYield(type: microutine.ProcessSuspendableTask) { classPath = compileJava.outputs.files + compileTestJava.classpath inputs.files(compileTestJava.outputs.files) } compileJava.finalizedBy(processYield) // compileTestJava.finalizedBy(processTestYield) repositories { mavenCentral() } dependencies { testCompile group: 'junit', name: 'junit', version: '4.12' compile group: 'junit', name: 'junit', version: '4.12' }
Suspendable
到Continuation
将由TaskSuspendableTask任务处理。 冰雹任务类中没有什么有趣的东西,它只是选择必要的类并将其发送到SuspendableConverter
类进行转换。 现在正是他使我们感兴趣。
字节码生成
要使用字节码,我们将使用OW2 ASM库。 该库根据SAX解析器的原理工作。 我们创建一个新的ClassReader,将已编译的类作为字节数组提供给它,然后调用accept(ClassVisitor visitor)
方法。 ClassReader将解析字节码,并在传递的访问者( visitMethod
, visitClass
, visitInsn
)上调用适当的方法。 访客可以在适配器模式下工作,并将呼叫委派给下一位访客。 通常,最后一个访问者是ClassWriter
,在其中生成最终的字节码。 如果任务是非线性的(我们只有一个),则该类可能要经过几遍。 asm提供的另一种方法是将类写入一个特殊的ClassNode
,并进行已经在其上的转换。 第一种方法更快,但可能不适用于解决非线性问题,因此我同时使用了两种方法。
将Suspendable
转换为Continuation
Suspendable
3个类:
SuspendInfoCollector
分析Suspendable.run
方法,收集有关对@Suspend
方法的所有调用以及所使用的局部变量的信息。SuspendableConverter
创建必填字段,更改Suspendable.run
方法的签名和句柄以获取Continuation.run
。SuspendableMethodConverter
转换Suspendable.run
方法的代码。 添加用于保存和还原局部变量,将当前状态保存在label
字段中并移至所需指令的代码。
让我们更详细地描述一些要点。
搜索run
方法如下所示:
MethodNode method = classNode.methods.stream() .filter(methodNode -> methodNode.name.equals("run") && (methodNode.access & Opcodes.ACC_BRIDGE) == 0) .findFirst() .orElseThrow(() -> new RuntimeException("Unable to find method to convert"));
预期在convertible类中将有两种run
方法,其中一种具有bridge修饰符( 在此处阅读)。 我们对没有修饰符的方法感兴趣。
在JVM字节码中,可以在任何地方执行有条件的(和无条件的)转换。 ASM有一个特殊的Label
抽象(label),它是字节码中的一个位置。 在整个代码中,在每次调用@Suspend
方法之后,我们都将在run
方法的开始处放置将对其进行条件跳转的标签。
@Override public void visitCode() {
我们在@Suspend
方法的调用之后放置标签。
@Override public void visitMethodInsn(int opcode, String owner, String name, String descriptor, boolean isInterface) { boolean suspendPoint = Utils.isSuspendPoint(classLoader, owner, name); super.visitMethodInsn(opcode, owner, name, descriptor, isInterface); if (suspendPoint) { super.visitVarInsn(Opcodes.ALOAD, THIS_VAR_INDEX);
测验
我们编写了一个生成器,该生成器连续给出三个数字。
testIntSequence public class YieldTest { @Test public void testIntSequence() { Sequence<Integer> sequence = new Sequence<Integer>(new SequenceSuspendable<Integer>() { @Override public void run(SequenceScope<Integer> scope) { scope.yield(10); scope.yield(20); scope.yield(30); } }); List<Integer> list = new ArrayList<>(); for (Integer integer : sequence) { list.add(integer); } assertEquals(10, (int) list.get(0)); assertEquals(20, (int) list.get(1)); assertEquals(30, (int) list.get(2)); } }
测试本身并不代表任何有趣的事情,但足以反编译类文件。
testIntSequence反编译 public class YieldTest { public YieldTest() { } @Test public void testIntSequence() { class NamelessClass_1 extends SequenceSuspendable<Integer> implements Continuation { private SequenceScope scope$S; NamelessClass_1() { } public Object run(Object var1) { int label = this.label$S$S; SequenceScope var2; if (label != 0) { if (label != 1) { if (label != 2) { if (label != 3) { throw new ContinuationEndException(); } else { var2 = this.scope$S; this.label$S$S = 4; return null; } } else { var2 = this.scope$S; this.yield(30); this.label$S$S = 3; this.scope$S = var2; return Continuation.SUSPEND; } } else { var2 = this.scope$S; this.yield(20); this.label$S$S = 2; this.scope$S = var2; return Continuation.SUSPEND; } } else { var2 = this.scope$S; this.yield(10); this.label$S$S = 1; this.scope$S = var2; return Continuation.SUSPEND; } } } Sequence<Integer> sequence = new Sequence(new NamelessClass_1()); List<Integer> list = new ArrayList(); Iterator var3 = sequence.iterator(); while(var3.hasNext()) { Integer integer = (Integer)var3.next(); list.add(integer); } Assert.assertEquals(10L, (long)(Integer)list.get(0)); Assert.assertEquals(20L, (long)(Integer)list.get(1)); Assert.assertEquals(30L, (long)(Integer)list.get(2)); } }
该代码非常肿,大多数指令都保存和恢复堆栈帧(局部变量)。 但是,它确实有效。 给定的示例将完美地工作而不会产生延迟。 让我们考虑一个更困难的例子。
斐波那契 public class YieldTest { @Test public void fibonacci() { Sequence<Integer> sequence = new Sequence<>(new Suspendable<Integer>() { @Override public void run(SequenceScope<Integer> scope) { scope.yield(1); scope.yield(1); int a = 1; int b = 1; while (true) { b += a; scope.yield(b); a += b; scope.yield(a); } } });
上面的代码生成无限的斐波那契数列。 我们编译和反编译:
斐波那契反编译 public class YieldTest { public YieldTest() { } @Test public void fibonacci() { class NamelessClass_1 extends SequenceSuspendable<Integer> implements Continuation { private SequenceScope scope$S; private int aa$S; private int ba$S; NamelessClass_1() { } public Object run(Object var1) { int label = this.label$S$S; SequenceScope var2; if (label != 0) { if (label != 1) { int var3; int var4; if (label != 2) { if (label == 3) { var2 = this.scope$S; var3 = this.aa$S; var4 = this.ba$S; var3 += var4; var2.yield(var3); this.label$S$S = 4; this.scope$S = var2; this.aa$S = var3; this.ba$S = var4; return Continuation.SUSPEND; } if (label != 4) { throw new ContinuationEndException(); } var2 = this.scope$S; var3 = this.aa$S; var4 = this.ba$S; } else { var2 = this.scope$S; var3 = 1; var4 = 1; } var4 += var3; var2.yield(var4); this.label$S$S = 3; this.scope$S = var2; this.aa$S = var3; this.ba$S = var4; return Continuation.SUSPEND; } else { var2 = this.scope$S; var2.yield(1); this.label$S$S = 2; this.scope$S = var2; return Continuation.SUSPEND; } } else { var2 = this.scope$S; var2.yield(1); this.label$S$S = 1; this.scope$S = var2; return Continuation.SUSPEND; } } } Sequence<Integer> sequence = new Sequence(new NamelessClass_1()); Integer tenthFibonacci = (Integer)StreamSupport.stream(sequence.spliterator(), false).skip(9L).findFirst().get(); Assert.assertEquals(55L, (long)tenthFibonacci); } }
了解使反编译类变得困难的原因。 像上次一样,大多数指令都在那儿驱动局部变量。 一些分配是无用的,变量立即被其他值磨损。 , .
while, . . . , '' return SUSPEND
.
总结
, , , . yield. , , — , . , ( ) . , JIT . yield
yieldAll
— , , , , . , , .
— — , . , . , : , .