自底向上看Kotlin协程

翻译自 A Bottom-Up View of Kotlin Coroutines
原文链接:https://www.infoq.com/articles/kotlin-coroutines-bottom-up/

关键要点

  • JVM 并未提供 native 的协程支持
  • Kotlin 在编译器中通过(将函数)转换为状态机实现协程
  • Kotlin 实现协程只使用了一个保留字,剩余部分都通过 Library 实现
  • Kotlin 使用 Continuation Passing Style(CPS) 来实现
  • 协程会使用调度器 (Dispatchers),所以在 JavaFX,Android 和 Swing 中使用可能会有些许差异。

协程尽管已经不是个新鲜事物,但仍然是个引人入胜的话题。从各种文档中可以看到,协程在多年来已经被反复多次挖掘,典型的情况是需要一种轻量级的线程,或是为了解决“回调地狱”的问题。

近期在 JVM 平台上,协程已成为响应式编程 (Reactive Programming) 之外的另一种选择。诸如 RxJava 和 Project Reactor 等框架提供给用户一种增量处理信息的方式,同时对节流和并行提供了广泛的支持。但是你必须使用响应式流 (reactive streams) 和函数式操作 (functional operation) 重构代码,在许多场景下这是弊大于利的。

这就是 Android 社区内一直在寻求一种更简单的替代方案的原因。Kotlin 语言引入协程并将其作为实验功能来满足需求,同时在经过一些修订之后,Kotlin 的 1.3 版本已经将其作为正式 feature。协程已经逐渐跳出了 GUI 开发的圈子,被服务端框架(比如Spring 5)和 Arrow 等函数式编程框架采用。


学习协程的挑战

不幸的是,理解协程并不容易。尽管已经有非常多来自 Kotlin 砖家的访谈,但是其中很多只是对“协程是什么”或者“如何使用协程”提供了单一的看法,很难将这些看法聚合在一块。你可能会说协程是并行编程中的 monads。

一部分问题在于其背后的实现。Kotlin 编译器只实现了一个 suspend 关键词,剩余部分都由协程库来处理。协程从结果上看来一个非常强大和灵活的工具,但是也有些杂乱无章。这就给新手学习造成了障碍,他们需要通过可靠的引导和严格的规范才能学好。这篇文章从自底向上的视角来看协程,希望能够以此提供一些基础。

示例程序(服务端)

我们的应用将基于一个经典的问题,这个问题需要安全有效地多次调用 RESTful 服务。我们来玩一个《Where’s Waldo》的文字版游戏——用户需要遵循一系列名字的规则,直到最终到达“ Waldo”。

下面是一个使用 Http4k 编写的 RESTful 服务。Marius Eriksen 在一篇众所周知的 paper 中提出了一种函数式服务端架构,Http4k 就是这一架构的 Kotlin 实现。这一框架在许多其他语言中都有实现,包括 Scala(Http4s)和Java8(Http4j)。

假设有一个路由通过 Map 实现了一串名称。当传入一个名字时,我们要么返回对应的值和 200 状态码,要么返回 404 和一条错误信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
fun main() {
val names = mapOf(
"Jane" to "Dave",
"Dave" to "Mary",
"Mary" to "Pete",
"Pete" to "Lucy",
"Lucy" to "Waldo"
)

val lookupName = { request: Request ->
val name = request.path("name")
val headers = listOf("Content-Type" to "text/plain")
val result = names[name]
if (result != null) {
Response(OK)
.headers(headers)
.body(result)
} else {
Response(NOT_FOUND)
.headers(headers)
.body("No match for $name")
}
}

routes(
"/wheresWaldo" bind routes(
"/{name:.*}" bind Method.GET to lookupName
)
).asServer(Netty(8080))
.start()
}

大体上,我们想让客户端发起类似以下一连串请求:
1

示例程序(客户端)

客户端应用是一个基于 JavaFX 的桌面图形界面。为了简化工作和避免一些不必要的细节,我们使用 TornadoFX,它将一些 Kotlin DSL 应用于 JavaFX 的上层。

以下是客户端视图的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class HelloWorldView: View("Coroutines Client UI") {
private val finder: HttpWaldoFinder by inject()
private val inputText = SimpleStringProperty("Jane")
private val resultText = SimpleStringProperty("")

override val root = form {
fieldset("Lets Find Waldo") {
field("First Name:") {
textfield().bind(inputText)
button("Search") {
action {
println("Running event handler".addThreadId())
searchForWaldo()
}
}
}
field("Result:") {
label(resultText)
}
}
}
private fun searchForWaldo() {
GlobalScope.launch(Dispatchers.Main) {
println("Doing Coroutines".addThreadId())
val input = inputText.value
val output = finder.wheresWaldo(input)
resultText.value = output
}
}
}

我们同时也会用以下辅助函数作为 String 类的扩展函数

1
fun String.addThreadId() = "$this on thread ${Thread.currentThread().id}"

下面是 UI 的大致样式:
2

当用户点击按钮时,会启动一个新的协程,通过一个 service 对象 HttpWaldoFinder 访问 RESTful 路由。

Kotlin 协程只在一个“协程作用域”中生存,协程作用域和一个代表其底层并发模型的调度器相关联。这个并发模型通常是一个线程池,但是也有很多其它选择。

有哪些调度器可用取决于运行 Kotlin 代码的环境。Main Dispatcher 通常表示 UI 库的事件处理线程,因此(在JVM上)仅在 Android,JavaFX 和 Swing 中可用。最初,Kotlin Native 根本不支持 Coroutines 多线程,但是这种情况正在改变。在服务端,你可以自己引入协程,但是越来越多的地方已经默认支持了,例如 Spring 5。

在开始调用 suspend 方法之前,我们必须有一个协程,一个“协程作用域 (CoroutineScope) ”和一个“调度器 (Dispatcher)”。如果是第一次调用(如上面的代码所示),我们可以通过“协程构建器”(如launchasync)来启动一个协程。

调用协程构建函数或“withContext”之类的作用域函数,都始终会创建一个新的协程作用域。在作用域内,协程所执行的任务由 Job 实例组成的层级结构表示。

有一些有趣的特性,即:

  • 每一个 Job 会先等待自己作用域中的所有协程完成,然后再完成自身。
  • 取消一个 Job 导致其所有子 Job 被取消。
  • 子 Job 的失败或取消会传播到父 Job

这样设计旨在避免并发编程中的常见问题,例如在不终止子任务的情况下把父任务杀死了。

访问 REST 路由的服务

这是我们的HttpWaldoFinder服务的完整代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class HttpWaldoFinder : Controller(), WaldoFinder {
override suspend fun wheresWaldo(starterName: String): String {
val firstName = fetchNewName(starterName)
println("Found $firstName name".addThreadId())

val secondName = fetchNewName(firstName)
println("Found $secondName name".addThreadId())

val thirdName = fetchNewName(secondName)
println("Found $thirdName name".addThreadId())

val fourthName = fetchNewName(thirdName)
println("Found $fourthName name".addThreadId())

return fetchNewName(fourthName)
}
private suspend fun fetchNewName(inputName: String): String {
val url = URI("http://localhost:8080/wheresWaldo/$inputName")
val client = HttpClient.newBuilder().build()
val handler = HttpResponse.BodyHandlers.ofString()
val request = HttpRequest.newBuilder().uri(url).build()

return withContext<String>(Dispatchers.IO) {
println("Sending HTTP Request for $inputName".addThreadId())
client
.send(request, handler)
.body()
}
}
}

fetchNewName函数接受一个已知的人名,并且通过请求路由拿到和它关联的名字。这是通过 HttpClient 完成的,HttpClient 自从 Java 11 之后成为了标准。实际的 HTTP GET 请求运行在一个使用 IO Dispatcher 的新协程之中。其中 IO Dispatcher 通常意味着一个线程池,其为长期运行的任务(如网络调用)做了优化。

wheresWaldo函数调用请求五次来寻找”Waldo”。因为我们随后会反编译生成的字节码,所以我们实现尽可能的简单。值得注意的是,每一次调用fetchNewName,当子协程运行过程中,会导致当前协程被挂起。在这个特定例子里,父协程运行于 Main Dispatcher,子协程运行于 IO Dispatcher。所以,当子协程在执行 HTTP 请求时,UI 事件处理线程会被释放开来,以处理其它用户与视图的交互。如下图所示:

3

当我们调用一个 suspend 函数时,Intellij 会作出提示,同时转换协程之间的控制权。注意到,如果我们在调用时不切换调度器,则并不一定会导致创建新的协程。当一个 suspend 函数调用另一个 suspend 函数时,可以在同一协程中继续执行,实际上,如果我们想要将两次调用停留在同一线程上,这恰恰是我们期望的行为。

4

当我们执行客户端时,这是控制台的输出:

5

可以看到,在这一特定情况下,Main Dispatcher 在线程17上运行,而 IO Dispatcher 在包含线程 24 和 26 的线程池上运行。

开始探索

使用 IntelliJ 自带的字节码反汇编工具,我们可以一探究竟。当然我们还可以使用 JDK 随附的标准“javap”工具。

6

我们可以看到 HttpWaldoFinder 的方法签名变了。它额外多了一个参数接受 continuation 对象,同时返回值变成了一个 Object。

1
2
3
4
5
public final class HttpWaldoFinder extends Controller implements WaldoFinder {

public Object wheresWaldo(String a, Continuation b)
final synthetic Object fetchNewName(String a, Continuation b)
}

接下来我们深入研究添加到这些方法中的代码,并解释“continuation”是什么,以及被修改过的函数返回值到底是什么。

Continuation Passing Style (CPS)

按照 Kotlin 标准化进程中的协程提案记载,协程的实现是基于 Continuation Passing Style。而 continuation 对象用于存储函数挂起时所必须的状态。

实际上,suspend 函数中的每个局部变量都会成为 continuation 对象中的一个属性。同时如果函数是某个类的方法,还会为该函数的参数和当前对象的引用创建 continuation 中的属性。假设一个挂起方法有 4 个参数和 5 个局部变量,那么生成的 continuation 对象中有至少 10 个字段。

HttpWaldoFinder 类的 wheresWaldo 方法中,有一个参数,4 个局部变量。所以 continuation 对象当中应当有 6 个字段。若我们将 kotlin 编译器生成的字节码反编译为 Java 代码,可以发现的确如此:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
$continuation = new ContinuationImpl($completion) {
Object result;
int label;
Object L$0;
Object L$1;
Object L$2;
Object L$3;
Object L$4;
Object L$5;

@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
this.result = $result;
this.label |= Integer.MIN_VALUE;
return HttpWaldoFinder.this.wheresWaldo((String)null, this);
}
};

由于所有成员变量都是 Object 类型,因此它们的作用并不是很明显。但是随着我们进一步探索,我们将看到:

  • L$0保存对HttpWaldoFinder实例的引用。始终如此。
  • L$1保存starterName参数的值。始终如此。
  • L$2L$5保留局部变量的值。这些将在代码执行时逐步填充。L$2将保留firstName的值,依此类推。

我们还有其他字段用于最终结果,和一个有趣的整数,称为“label”。

挂起还是不挂起—这是一个值得考虑的问题

我们查看编译器生成的代码时,有一点要注意:代码必须处理两种情况。每当一个 suspend 函数调用另一个时,它有可能挂起当前的协程(随后另一个函数可以运行到相同的协程),也有可能继续执行当前协程。

假设有一个 suspend 函数从数据存储中读取一个值,大概率它会在 I/O 时被挂起。但是它也有可能有缓存,接下来直接同步返回缓存结果。Kotlin 编译器生成的代码必须允许每一种情况。

Kotlin 编译器调整了每个 suspend 函数的返回值类型,因此其既可以返回真正的结果,也可以返回一个特殊值 COROUTINE_SUSPENDED。如果是后者则代表当前的协程挂起了。这就是 suspend 函数的返回值从原有的结果类型变成了 Object 的原因。

在我们的例子 wheresWaldo 中,它反复的调用了 fetchNewName。理论上每一次调用有可能会挂起,也有可能不挂起当前的协程。作为编写 fetchNewName 函数的程序员,我们一定很清楚这里会发生挂起。但是要想使生成的代码能够跑通,我们必须清楚它需要处理所有的情况。

巨大的 switch 语句和标签

我们进一步看反编译的代码,可以发现一个 switch 表达式和其中很多嵌套的标签。这其实是一个状态机的实现,用于控制 wheresWaldo 方法中的不同挂起点。这是其整体结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// listing one: the generated switch statement and labels
String firstName;
String secondName;
String thirdName;
String fourthName;
Object var11;
Object var10000;
label48: {
label47: {
label46: {
Object $result = $continuation.result;
var11 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch($continuation.label) {
case 0:
// code omitted
case 1:
// code omitted
case 2:
// code omitted
case 3:
// code omitted
case 4:
// code omitted
case 5:
// code omitted
default:
throw new IllegalStateException(
"call to 'resume' before 'invoke' with coroutine");
} // end of switch
// code omitted
} // end of label 46
// code omitted
} // end of label 47
// code omitted
} // end of label 48
// code omitted

现在可以看出 contiuation 中 label 字段的目的了。当完成了 wheresWaldo 的不同阶段时,我们会修改 label 的值。嵌套的语句块包含了原始 kotlin 代码挂起点之间的代码。label 值允许代码是可重入的,并且可以跳到上一次挂起的地方(适宜的case 语句),从 continuation 对象中取回信息,然后 break 掉对应的代码块。

然而,如果所有的挂起点都最终没有挂起的话,整个代码块将同步执行。在生成的代码中我们经常看到以下片段:

1
2
3
4
// listing two - deciding if the current coroutine should suspend
if (var10000 == var11) {
return var11;
}

上述代码的 var11 已经被赋值为 CONTINUATION_SUSPENDED,同时 var10000 持有另一个 suspend 函数的返回值。所以,当发生挂起操作时,上述代码返回(后续会重入)。如果没有发生挂起,代码会通过 break 一个合适的 label 继续执行下一个部分。

再次提示,生成的编译代码并不能假设所有的调用都会被挂起,或者所有的调用都会继续执行。它必须处理每一种可能。

跟踪执行过程

当我们开始执行时,continuation 中的 label 初始值是 0。以下是对应的 switch 分支:

1
2
3
4
5
6
7
8
9
10
11
// listing three - the first branch of the switch
case 0:
ResultKt.throwOnFailure($result);
$continuation.L$0 = this;
$continuation.L$1 = starterName;
$continuation.label = 1;
var10000 = this.fetchNewName(starterName, $continuation);
if (var10000 == var11) {
return var11;
}
break;

我们将当前类的实例(this 指针)和参数存入 continuation 对象,然后将 continuation 作为参数传入 fetchNewName。我们之前已经讨论过,fetchNewName 会返回真正的结果或者一个 CONTINUATION_SUSPENDED 值。

如果协程被挂起,那么我们会从函数中返回,并且下一次恢复的时候会跳到 case 1 的分支。如果我们继续执行协程,那么我们会从 switch 语句中跳出,执行以下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
// listing four - calling ‘fetchNewName’ for the second time
firstName = (String)var10000;
secondName = UtilsKt.addThreadId("Found " + firstName + " name");
boolean var13 = false;
System.out.println(secondName);
$continuation.L$0 = this;
$continuation.L$1 = starterName;
$continuation.L$2 = firstName;
$continuation.label = 2;
var10000 = this.fetchNewName(firstName, $continuation);
if (var10000 == var11) {
return var11;
}

因为已知 var10000 包含了我们想要的返回值,我们可以转换到正确到类型并且保存到局部变量 firstName 中。生成的代码接下来用 secondName 存放增加了线程 ID 的值,然后接下来被输出。

continuation 中的值被更新,添加了我们从服务端取回的值。注意到 label 的值现在是 2,我们接下来第三次执行fetchNewName

第三次执行—不挂起

我们需要根据 fetchNewName 的返回值再次做选择。如果返回值是 CONTINUATION_SUSPENDED 我们直接返回,下一次调用时直接进入 case 2 的分支。

如果我们在当前协程继续执行,那么接下来的代码块会被执行。如你所见,这和上面出现的代码是相同的,除了我们向 continuation 中存放了更多的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// listing four - calling ‘fetchNewName’ for the third time
secondName = (String)var10000;
thirdName = UtilsKt.addThreadId("Found " + secondName + " name");
boolean var14 = false;
System.out.println(thirdName);
$continuation.L$0 = this;
$continuation.L$1 = starterName;
$continuation.L$2 = firstName;
$continuation.L$3 = secondName;
$continuation.label = 3;
var10000 = this.fetchNewName(secondName, (Continuation)$continuation);
if (var10000 == var11) {
return var11;
}

如果后续的代码不挂起,那么剩下的部分会一直按这个模式执行下去直到函数结束。

第三次执行—挂起

另一种情况是,如果协程被挂起,那么我们会执行以下的 case 代码块:

1
2
3
4
5
6
7
8
// listing five - the third branch of the switch
case 2:
firstName = (String)$continuation.L$2;
starterName = (String)$continuation.L$1;
this = (HttpWaldoFinder)$continuation.L$0;
ResultKt.throwOnFailure($result);
var10000 = $result;
break label46;

我们从 continuation 中提取值,保存到函数的局部变量中。一个带有 label 的 break 会将代码跳转到上一章节代码注释中的 listing four。因此,无论是否挂起,最终我们将在同一处结束。

汇总执行过程

重温我们梳理的代码结构,我们可以从一个很高的角度来描述每个阶段到底做了什么:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// listing six - the generated switch statement and labels in depth
String firstName;
String secondName;
String thirdName;
String fourthName;
Object var11;
Object var10000;
label48: {
label47: {
label46: {
Object $result = $continuation.result;
var11 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch($continuation.label) {
case 0:
// set label to 1 and make the first call to ‘fetchNewName’
// if suspending return, otherwise break from the switch
case 1:
// extract the parameter from the continuation
// break from the switch
case 2:
// extract the parameter and first result from the continuation
// break to outside ‘label46’
case 3:
// extract the parameter, first and second results from the
// continuation
// break to outside ‘label47’
case 4:
// extract the parameter, first, second and third results from
// the continuation
// break to outside ‘label48’
case 5:
// extract the parameter, first, second, third and fourth
// results from the continuation
// return the final result
default:
throw new IllegalStateException(
"call to 'resume' before 'invoke' with coroutine");
} // end of switch
// store the parameter and first result in the continuation
// set the label to 2 and make the second call to ‘fetchNewName’
// if suspending return, otherwise proceed
} // end of label 46
// store the parameter, first and second results in the
// continuation
// set the label to 3 and make the third call to ‘fetchNewName’
// if suspending return, otherwise proceed
} // end of label 47
// store the parameter, first, second and third results in the
// continuation
// set the label to 4 and make the fourth call to ‘fetchNewName’
// if suspending return, otherwise proceed
} // end of label 48
// store the parameter, first, second, third and fourth results in the continuation
// set the label to 5 and make the fifth call to ‘fetchNewName’
// return either the final result or COROUTINE_SUSPENDED

总结

这些代码并不容易理解。我们正在研究从 Kotlin 编译器生成的字节码反汇编的 Java 代码。此代码生成器的输出旨在提高效率和简约性,而非清晰。

但是,我们可以得出一些有用的结论:

  • 并没有黑魔法。当开发人员首次了解协程时,很容易假设有一些特殊的“魔法”将所有东西捆绑在一起。如我们所见,生成的代码仅使用了面向过程编程的基本代码块,例如条件和标记的中断。
  • 协程的实现是基于 continuation 的。如最初 KEEP 提案中所述,通过在对象内缓存函数的状态来挂起和恢复函数。因此,对于每个挂起函数,编译器将创建一个具有 N 个字段的 continuation 类型,其中 N 是参数数量加上成员变量的字段数量加 3。最后的 3 分别保存当前对象(this),最终结果(返回值)和索引值(label)。
  • 执行过程始终遵循一套标准模式。如果要从挂起中恢复,则可以使用 continuation 对象的 label 字段跳转到 switch 语句的适当分支。在这个分支中,我们取得 continuation 对象执行到当前的数据,然后使用 labelled break 跳转到下一段代码,如果没有发生挂起,这段代码将直接执行。