协程是什么

协程让异步逻辑同步化,杜绝回调地狱。协程最核心的点就是,函数或者一段程序能够被挂起,稍后再在挂起的位置恢复。kotlin中的协程基于线程,它是轻量级线程。

挂起与恢复

协程新增了suspend和resume:

  • suspend:也称为挂起或暂停,用于暂停执行当前协程,并保存所有局部变量;
  • resume:用于让已暂停的协程从其暂停处继续执行。

使用suspend关键字修饰的函数叫作挂起函数。挂起函数只能在协程体内或其他挂起函数内调用。

协程构建器

launch和async

launch和async构建器都用来启动新协程,它们的区别如下:

  • launch:返回一个Job且不附带任何结果值
  • async:返回一个Deferred,Deferred也是一个Job,可以使用.await()在一个延期的值上得到他的最终结果。

等待协程作业:join、await

示例如下:

class CoroutineTest {
  //runBlocking的作用就是将主线程变成主协程
  @Test
  fun `test coroutine`() = runBlocking {
    val job1 = launch {
      delay(200)
      println("job1 finished")
    }
    //join表示等待job1完成任务,job1完成任务之后才会执行job2
    job1.join()
    //
    val job2 = async {
      delay(200)
      println("job2 finished")
      "job2 result"
    }
    //await不仅会返回结果,也会等待job2完成任务,job2完成任务之后才会执行job3
    println(job2.await())
    val job3 = launch {
      delay(200)
      println("job3 finished")
    }
  }
}

协程的启动模式

  • DEFAULT:协程创建后,立即开始调度,在调度前如果协程被取消,其将直接进入取消响应的状态。
  • ATOMIC:协程创建后,立即开始调度,协程执行到第一个挂起点之前不响应取消。
  • LAZY:只有协程被需要时,包括主动调用协程的start、join或者await等函数时才会开始调度,如果调度前就被取消,那么该协程将直接进入异常结束状态。
  • UNDISPATCHED:协程创建后立即在当前函数调用栈中执行,直到遇到第一个真正挂起的点。

DEFAULT与ATOMIC的区别

class CoroutineTest {
  @Test
  fun `test coroutine`() = runBlocking {
    val job = async(start = CoroutineStart.ATOMIC) {
      //do something...
      delay(2000)
      println("job finished")
    }
    //区别:如果是ATOMIC,就算执行了cancel也不会立即取消,会等到//do something...这里的任务完成之后才会取消;
    //     如果是DEFAULT,执行了cancel之后就会立即取消
    job.cancel()
  }
}

UNDISPATCHED

注意:UNDISPATCHED启动模式的优先级是最高的,因为只有该启动模式是立即执行。

class CoroutineTest {
  @Test
  fun `test coroutine`() = runBlocking {
    val job = async(context = Dispatchers.IO, start = CoroutineStart.UNDISPATCHED) {
      //以UNDISPATCHED形式启动时,代表不转发,所以这里输出的线程就是`test coroutine`函数所在线程,而不是IO线程。
      println("threadName is ${Thread.currentThread().name}")
    }
  }
}

协程作用域

定义协程必须指定其CoroutineScope,它会跟踪所有协程,同样它还可以取消由它所启动的所有协程。
常用的相关API有:

  • GlobalScope,生命周期是process级别的,即使Activity或Fragment已经被销毁,协程仍然在执行。
  • MainScope, 在Activity中使用,可以在onDestroy()中取消协程。
  • viewModelScope, 只能在ViewModel中使用,绑定ViewModel的生命周期。
  • lifecyclescope,只能在Activity、Fragment中使用,会绑定Activity和Fragment的生命周期。

MainScope

以下两种写法实现的功能是一样的,只不过第二种通过委托的方式实现了

class TestActivity : Activity()  {
  private var mainScope = MainScope()
  override fun onCreate(savedInstanceState: Bundle?) {
    super.onCreate(savedInstanceState)
    mainScope.launch {
      delay(3000)
    }
  }
  override fun onDestroy() {
    super.onDestroy()
    mainScope.cancel()
  }
}
class TestActivity : Activity(), CoroutineScope by MainScope()  {
  override fun onCreate(savedInstanceState: Bundle?) {
    super.onCreate(savedInstanceState)
    launch {
      delay(3000)
    }
  }
  override fun onDestroy() {
    super.onDestroy()
    cancel()
  }
}

coroutineScope与runBlocking

  • runBlocking是常规函数,而coroutineScope是挂起函数
  • 它们都会等待其协程体以及所有子协程结束,主要区别在于runBlocking方法会阻塞当前线程来等待,而coroutineScope只是挂起,会释放底层线程用于其他用途。

coroutineScope与supervisorScope

  • coroutineScope:一个协程失败了,所有其他兄弟协程也会被取消。
  • supervisorScope:一个协程失败了,不会影响其他兄弟协程。

Job

对于每一个创建的协程(通过launch或者async),会返回一个Job实例,该实例是协程的唯一标识,并且负责管理协程的生命周期。
一个任务可以包含一系列状态:新创建(New)、活跃(Active)、完成中(Completing)、已完成(Completed)、取消中(Cancelling)和已取消(Cancelled)。虽然我们无法直接访问这些状态,但是我们可以访问Job属性:isActive、isCancelled和isCompleted。

生命周期

如果协程处于活跃状态,协程运行出错或者调用job.cancel()都会将当前任务置为取消中(Cancelling)状态(isActive = false, isCancelled = true)。当所有的子协程都完成后,协程会进入已取消(Cancelled)状态,此时isCompleted = true。

job生命周期

协程的取消

这里记住概念即可,【协程取消的副作用】一节中会有示例。

  1. 取消作用域会取消它的子协程。
  2. 被取消的子协程并不会影响其余兄弟协程。
  3. 协程通过抛出一个特殊的异常 CancellationException 来处理取消操作。
  4. 所有kotlinx.coroutines中的挂起函数 (withContext、delay等)都是可取消的。

CPU密集型任务的取消方式

  1. isActive:是一个可以被使用在CoroutineScope中的扩展函数,检查Job是否处于活跃状态。
  2. ensureActive():如果Job处于非活跃状态,这个方法会立即抛出异常。
  3. yield():该函数会检查所在协程的状态,如果已经取消,则抛出CancellationException予以响应。此外,它还会尝试出让协程的执行权,给其他协程提供执行机会。
class CoroutineTest {
  private fun CoroutineScope.jobWork(startTime: Long) {
    var nextPrintTime = startTime
    var i = 0
    //方式1
    //while(i < 5 && isActive) { //该函数不会抛出异常
    //方式2
    //while(i < 5) {
    //  ensureActive() //该函数会抛出CancellationException,但是会被静默处理
    //方式3
    while(i < 5) {
      yield() //该函数既会抛出CancellationException,也会尝试让出CPU执行权
      if (System.currentTimeMillis() >= nextPrintTime) {
        println("job , I'm sleeping ${i++} ...")
        nextPrintTime += 500
      }
    }
  }
  @Test
  fun `test cancel cpu task`() = runBlocking<Unit> {
    val startTime = System.currentTimeMillis()
    val job = launch(Dispatchers.Default) {
        jobWork(startTime)
    }
    delay(1300)
    println("main , I'm tried of waiting!")
    //job.cancel()
    //job.cancel(CancellationException("自定义的取消异常")) //CancellationException可以自定义异常消息
    //job.join() join用于当前协程等待job协程完成
    job.cancelAndJoin()
    println("main , I'm can quit.")
  }
}

协程取消的副作用

1、在finally中释放资源。

2、use():该函数只能被实现了Closeable的对象使用,程序结束的时候会自动调用close方法,适合文件对象。

//紧接【CPU密集型任务的取消方式】中的示例
class CoroutineTest {
  @Test
  fun `test cancel cpu task`() = runBlocking<Unit> {
    val startTime = System.currentTimeMillis()
    val job = launch(Dispatchers.Default) {
      try {
        jobWork(startTime)
      } catch (e : CancellationException) {
        //因为CancellationException会被内部try-catch静默处理,所以只有通过这种方式才能打印出异常
        println("job is cancel")
      } finally {
        println("Cleanup done!")
      }
    }
    //job之后的代码与【CPU密集型任务的取消方式】一节中一致,故省略
  }
}

不能取消的任务

处于取消中状态的协程不能够挂起(运行不能取消的代码),当协程被取消后需要调用挂起函数,我们需要将清理任务的代码放置于NonCancellable CoroutineContext中。这样会挂起运行中的代码,并保持协程的取消中状态直到任务处理完成。

//紧接【协程取消的副作用】中的示例
class CoroutineTest {
  @Test
  fun `test cancel cpu task`() = runBlocking<Unit> {
    val startTime = System.currentTimeMillis()
    val job = launch(Dispatchers.Default) {
      try {
        jobWork(startTime)
      } catch (e : CancellationException) {
        println("job is cancel")
      } finally {
        withContext(NonCancellable){
          delay(1000L) //或一些其他的挂起函数
          println("Cleanup done!")
        }
      }
    }
    //job之后的代码与【CPU密集型任务的取消方式】一节中一致,故省略
  }
}

超时任务

很多情况下取消一个协程的理由是它有可能超时,使用withTimeout可以当协程超时的时候抛出TimeoutCancellationException。而使用withTimeoutOrNull通过返回null来进行超时操作,从而代替抛出一个异常。

class CoroutineTest {
  @Test
  fun `test deal with timeout return null`() = runBlocking<Unit> {
    //该函数超时会抛出TimeoutCancellationException
    //val result = withTimeout(1300) {
    //该函数超时会返回null
    val result = withTimeoutOrNull(1300) {
      repeat(1000) { i ->
        println("job: I'm sleeping $i")
        delay(500L)
      }
      "Done"
    } ?: "Default"
    //协程超时会返回Result is Default,协程正常完成会返回Result is Done
    println("Result is $result")
  }
}

协程的上下文

CoroutineContext是一组用于定义协程行为的元素。它由如下几项构成:

  1. Job:控制协程的生命周期。
  2. CoroutineDispatcher::向合适的线程分发任务。
  3. CoroutineName:协程的名称,调试的时候很有用。
  4. CoroutineExceptionHandler:处理未被捕捉的异常。

组合上下文中的元素

有时我们需要在协程上下文中定义多个元素。我们可以使用+操作符来实现。比如说,我们可以显示指定一个调度器来启动协程并且同时显示指定一个命名:

class CoroutineTest {
  @Test
  fun `test CoroutineContext`() = runBlocking<Unit> {
    launch(Dispatchers.Default + CoroutineName("test")) {
      println("I'm working in thread${Thread.currentThread().name}")
    }
  }
}

协程上下文的继承

对于新创建的协程,它的CoroutineContext会包含一个全新的Job实例,它会帮助我们控制协程的生命周期。 而剩下的元素会从CoroutineContext的父类继承,该父类可能是另外一个协程或者创建该协程的CoroutineScope。

class CoroutineTest {
  @Test
  fun `test CoroutineContext extend`() = runBlocking<Unit> {
    val scope = CoroutineScope(Job() + Dispatchers.IO + CoroutineName("test"))
    val job = scope.launch {
      //新的协程会将CoroutineScope作为父级
      println("${coroutineContext[Job]} ${Thread.currentThread().name}")
      val result = async {
        //通过async创建的新协程会将当前协程作为父级
        println("${coroutineContext[Job]} ${Thread.currentThread().name}")
        "OK"
      }.await()
    }
    job.join()
  }
}

协程的上下文 = 默认值 + 继承的CoroutineContext +参数

  1. 一些元素包含默认值:Dispatchers.Default是默认的CoroutineDispatcher,以及“coroutine”作为默认的CoroutineName;
  2. 继承的CoroutineContext是CoroutineScope或者其父协程的CoroutineContext;
  3. 传入协程构建器的参数的优先级高于继承的上下文参数,因此会覆盖对应的参数值。
class CoroutineTest {
  @Test
  fun `test CoroutineContext extend2`() = runBlocking<Unit> {
    val coroutineExceptionHandler = CoroutineExceptionHandler { _, exception ->
      println("Caught $exception")
    }
    val scope = CoroutineScope(Job() + Dispatchers.Main + coroutineExceptionHandler)
    //新的CoroutioneContext = 父级CoroutineContext + Job()
    val job = scope.launch(Dispatchers.IO) {
      //新协程:job的CoroutineContext包含Dispatchers.IO而不是scope对象里的Dispatcher.Main,因为它被协程的构建器里的参数覆盖了,但是job对象会重新创建,所以和scope中的job对象不一样。
    }
  }
}

协程的异常处理

协程构建器有两种形式:自动传播异常(launch与actor),向用户暴露异常(async与produce)当这些构建器用于创建一个根协程时(该协程不是另一个协程的子协程), 前者这类构建器,异常会在它发生的第一时间被抛出,而后者则依赖用户来最终消费异常,例如通过await或receive。

class CoroutineTest {
  //根协程异常处理
  @Test
  fun `test exception propagation`() = runBlocking<Unit> {
    val job = GlobalScope.launch {
      throw IndexOutOfBoundsException() //此处抛出异常,需要在此处try-catch
    }
    job.join()
    val deferred = GlobalScope.async {
      throw ArithmeticException()
    }
    deferred.await() //此处抛出异常,需要在此处try-catch
  }
}

非根协程的异常

其他协程所创建的协程中,产生的异常总是会被传播。

class CoroutineTest {
  @Test
  fun `test exception propagation2`() = runBlocking<Unit> {
    val scope = CoroutineScope(Job())
    val job = scope.launch {
      //如果async抛出异常,launch就会立即抛出异常
      async {
        throw IllegalArgumentException()
      }
    }
    job.join()
  }
}

异常的传播特性

当一个协程由一个异常而运行失败时,它会传播这个异常并传递给它的父级。接下来,父级会进行下面几步操作:

  1. 取消它自己的子级;
  2. 取消它自己;
  3. 将异常传播并传递给它的父级。

SupervisorJob

使用SupervisorJob时,一个子协程的运行失败不会影响到其他子协程。SupervisorJob不会传播异常给它的父级,它会让子协程自己处理异常。

class CoroutineTest {
    @Test
    fun `test SupervisorJob`() = runBlocking<Unit> {
        val supervisor = CoroutineScope(SupervisorJob())
        val job1 = supervisor.launch {
            delay(100)
            println("child 1")
            throw IllegalArgumentException()
        }
        val job2 = supervisor.launch {
            delay(400)
            println("child 2")
        }
        //delay(200)
        //supervisor.cancel() //使用cancel方法可以取消所有job
        joinAll(job1, job2)
    }
}

supervisorScope

当作业自身执行失败的时候,所有子作业将会被全部取消。但是子协程的运行失败不会影响到其他子协程。

class CoroutineTest {
    @Test
    fun `test SupervisorScope`() = runBlocking<Unit> {
        supervisorScope {
            launch {
                try {
                    println("The child is sleeping")
                    delay(Companion.MAX_VALUE)
                } finally {
                    println("The child is cancelled")
                }
            }
            yield()//使用yield来给我们的子作业一个机会来执行打印
            println("Throwing an exception from the scope")
            throw AssertionError() //当作业自身执行失败的时候,所有子作业将会被全部取消
        }
    }
}

异常的捕获

使用CoroutineExceptionHandler可以对协程的异常进行捕获,但是需要满足以下条件:

  1. 时机:异常是被自动抛出异常的协程所抛出的(使用launch,而不是async时);
  2. 位置:在CoroutineScope的CoroutineContext中 或 在一个根协程(CoroutineScope或者supervisorScope的直接子协程)中。
class CoroutineTest {
    @Test
    fun `test CoroutineExceptionHandler`() = runBlocking<Unit> {
        val handler = CoroutineExceptionHandler { _, exception ->
            println("Caught $exception")
        }
        //案例1:launch可以被捕获,async不能被捕获
        val job = GlobalScope.launch(handler) {
            throw AssertionError()
        }
        val deferred = GlobalScope.async(handler) {
            throw ArithmeticException()
        }
        job.join()
        deferred.await() //因为此处才是抛出异常的真正位置
        //案例2:能捕获到子协程抛出的异常
        val scope = CoroutineScope(Job())
        val job2 = scope.launch(handler) {
            launch {
                throw AssertionError()
            }
        }
        job2.join()
        //案例3:不能捕获到子协程抛出的异常
        val scope = CoroutineScope(Job())
        val job3 = scope.launch {
            launch(handler) {
                throw AssertionError()
            }
        }
        job3.join()
    }
}

Android中全局异常处理

全局异常处理器可以获取到所有协程未处理的未捕获异常,不过它并不能对异常进行捕获,虽然不能阻止程序崩溃, 全局异常处理器在程序调试和异常上报等场景中仍然有非常大的用处。

使用:我们需要创建src/main/resources/META-INF/services目录,并在其中创建一个名为kotlinx.coroutines.CoroutineExceptionHandler的文件,文件内容就是我们的全局异常处理器的全类名。

package com.example.myapplication
/*
 * 这个类的全类名就是:com.example.myapplication.GlobalCoroutineExceptionHandler
 */
class GlobalCoroutineExceptionHandler : CoroutineExceptionHandler {
    override val key = CoroutineExceptionHandler
    override fun handleException(context: CoroutineContext, exception: Throwable) {
        Log.d("TAG", "Unhandled Coroutine Exception:$exception")
    }
}

取消与异常

  1. 取消与异常紧密相关,协程内部使用CancellationException来进行取消,这个异常会被忽略。所以当子协程被取消时,不会取消它的父协程。
  2. 如果一个协程遇到了CancellationException以外的异常,它将使用该异常取消它的父协程。 但是直到父协程的所有子协程都结束后,异常才会被父协程处理。
class CoroutineTest {
    @Test
    fun `test cancel and exception`() = runBlocking<Unit> {
        val handler = CoroutineExceptionHandler { _, exception ->
            println("Caught $exception")
        }
        val job = GlobalScope.launch(handler) {
            launch {
                try {
                    delay(Long.MAX_VALUE)
                } finally {
                    withContext(NonCancellable) {
                        println("Children are cancelled, but exception is not handled until all children terminate")
                        delay(100)
                        println("The first child finished its non cancellable block")
                    }
                }
            }
            launch {
                delay(10)
                println("Second child throws an exception")
                throw ArithmeticException()
            }
        }
        job.join()
        /** 输出顺序:
        Second child throws an exception
        Children are cancelled, but exception is not handled until all children terminate
        The first child finished its non cancellable block
        Caught java.lang.ArithmeticException
         */
    }
}

异常聚合

当协程的多个子协程因为异常而失败时,一般情况下取第一个异常进行处理。在第一个异常之后发生的所有其他异常,都将被绑定到第一个异常之上。

class CoroutineTest {
    @Test
    fun `test exception aggregation`() = runBlocking<Unit> {
        val handler = CoroutineExceptionHandler { _, exception ->
            //输出值:Caught java.io.IOException [java.lang.ArithmeticException]
            println("Caught $exception ${exception.suppressed.contentToString()}")
        }
        val job = GlobalScope.launch(handler) {
            launch {
                try {
                    delay(Long.MAX_VALUE)
                } finally {
                    throw ArithmeticException()
                }
            }
            launch {
                delay(100)
                throw IOException()
            }
        }
        job.join()
    }
}

Flow-异步流

Flow简单示例

class FlowTest {
  @Test
  fun test() = runBlocking { 
    simpleFlow().collect { value -> println(value) }
  }
  //异步一次返回一个值
  fun simpleFlow() = flow<Int> {
    for (i in 1..3) {
      delay(1000)
      emit(i)
    }
  }
}

冷流

flow是一种类似于序列的冷流,flow构建器中的代码直到流被收集的才运行。

Flow的连续性

  • 流的每次单独收集都是按顺序执行的,除非使用特殊操作符。
  • 从上游到下游每个过渡操作符都会处理每个发射出的值,然后再交给末端操作符。

Flow的构建器

  • flowOf构建器定义了一个发射固定值集的流。
  • 使用.asFlow()扩展函数,可以将各种集合与序列转换为流。
class FlowTest {
  @Test
  fun `test flow builder`() = runBlocking<Unit> {
    flowOf("one", "two", "three")
      .onEach { delay(1000) }
      .collect { value -> println(value) }
    (1..3).asFlow().collect { value -> println(value) }
  }
}

Flow的上下文

  • 流的收集总是在调用协程的上下文中发生,流的该属性称为上下文保存
  • flow{…}构建器中的代码必须遵循上下文保存属性,并且不允许从其他上下文中发射(emit)。
  • flowOn操作符,该函数用于更改流发射的上下文。

启动流

  • 使用launchIn替换collect我们可以在单独的协程中启动流的收集。

Flow的取消

  • 流采集用与协程同样的协作取消,像往常一样,流的收集可以是当流在一个可取消的挂起函数(例如delay)中挂起的时候取消。
//该方法不会输出3这条数据,因为协程已经被取消了
class FlowTest {
  fun simpleFlow6() = flow<Int> {
    for (i in 1..3) {
      delay(1000)
      println("Emitting $i")
      emit(i)
    }
  }
  @Test
  fun `test cancel flow`() = runBlocking<Unit> {
    //流在超时的情况下取消并停止执行
    withTimeoutOrNull(2500) {
      simpleFlow6().collect { value -> println(value) }
    }
    println("Done")
  }
}

Flow的取消检测

  • 为方便起见,流构建器对每个发射执行附加的ensureActive检测以进行取消,这意味着从flow{…}发出的繁忙循环是可以取消的。
  • 出于性能原因,大多数其他流操作不会自行执行其他取消检测,在协程出于繁忙循环的情况下,必须明确检测是否取消。可以通过cancellable操作符来执行此操作。
class FlowTest {
  fun simpleFlow7() = flow<Int> {
    for (i in 1..5) {
      emit(i)
      println("Emitting $i")

    }
  }
  @Test
  fun `test cancel flow check`() = runBlocking<Unit> {
//        simpleFlow7().collect { value ->
//            println(value)
//            if (value == 3) cancel()
//        }
    //不增加cancellable会导致取消失败,虽然会抛出异常但是不能阻止4和5输出
    (1..5).asFlow().cancellable().collect() { value ->
      println(value)
      if (value == 3) cancel()
    }
  }
}

解决背压问题的四种方式

  1. 当必须更改CoroutineDispatcher时,flowOn操作符使用了相同的缓冲机制,但是buffer函数显式地请求缓冲而不改变执行上下文
  2. buffer():并发运行流中发射元素的代码。
  3. conflate():合并发射项,不对每个值进行处理。
  4. collectLatest():取消并重新发射最后一个值。
class FlowTest {
  fun simpleFlow8() = flow<Int> {
    for (i in 1..3) {
      delay(100)
      emit(i)
      println("Emitting $i ${Thread.currentThread().name}")
    }
  }
  @Test
  fun `test flow back pressure`() = runBlocking<Unit> {
    val time = measureTimeMillis { //计算总耗时
      simpleFlow8()
        /*
        方式一和方式二是为了提高生产效率,解决背压问题
         */
        //.flowOn(Dispatchers.Default) //方式一:切换线程
        //.collect { value ->
          
        //.buffer(50) //方式二:增加缓存,但线程不变
        //.collect { value ->
        /*
        方式三和方式四是为了提高消费效率,解决背压问题
         */
        //.conflate() //方式三:跳过中间的值,2不会被收集
        //.collect { value ->
          
        .collectLatest { value -> //方式四:只收集最后一个值
          delay(300)
          println("Collected $value ${Thread.currentThread().name}")
        }
    }
    println("Collected in $time ms")
  }
}

flow的操作符

过渡流操作符
  • 可以使用操作符转换流,就像使用集合与序列一样。
  • 过渡操作符应用于上游流,并返回下游流。
  • 这些操作符也是冷操作符,就像流一样。这类操作符本身不是挂起函数。
  • 它运行的速度很快,返回新的转换流的定义。
class FlowTest {
  fun numbers() = flow<Int> {
    try {
      emit(1)
      emit(2)
      println("This line will not execute")
      emit(3)
    } finally {
      println("Finally in numbers")
    }
  }
  @Test
  fun `test transform flow operator`() = runBlocking<Unit> {
    //转换操作符:map:1换1
    (1..3).asFlow()
      .map { request -> "respone $request" }
      .collect { value -> println(value) }
    //转换操作符:transform:1换n
    (1..3).asFlow()
      .transform { request ->
        emit("Making request $request")
        emit("respone $request")
      }
      .collect { value -> println(value) }
    //限长操作符:take:取指定的前几个元素
    numbers().take(2)
      .collect { value -> println(value) }
  }
}
末端流操作符

末端操作符是在流上用于启动流收集的挂起函数。collect是最基础的末端操作符,但是还有另外一些更方便使用的末端操作符:

  1. 转化为各种集合,例如toList与toSet。
  2. 获取第一个(first)值与确保流发射单个(single)值的操作符。
  3. 使用reduce与fold将流规约到单个值。
class FlowTest {
  @Test
  fun `test terminal operator`() = runBlocking<Unit> {
    val sum = (1..5).asFlow()
      .map { it * it }
      .reduce { a, b -> a + b }
    println(sum)
  }
}
组合多个流
  • 就像Kotlin标准库中的Sequence.zip扩展函数一样,流拥有一个zip操作符用于组合两个流中的相关值。
class FlowTest {
  @Test
  fun `test zip`() = runBlocking<Unit> {
    val numbers = (1..3).asFlow().onEach { delay(300) }
    val strs = flowOf("One", "Two", "Three").onEach { delay(400) }
    val startTime = System.currentTimeMillis()
    numbers.zip(strs) { a, b -> "$a ->$b" }.collect {
      println("$it at ${System.currentTimeMillis() - startTime} ms from start")
    }
  }
}
展平流

流表示异步接收的值序列,所以很容易遇到这样的情况:每个值都会触发对另一个值序列的请求,然而,由于流具有异步的性质,因此需要不用的展平模式,因此,存在一系列的流展平操作符:

  1. flatMapConcat连接模式;
  2. flatMapMerge合并模式;
  3. flatMapLatest最新展平模式。
class FlowTest {
  fun requestFlow(i: Int) = flow<String> {
    emit("$i:First")
    delay(500)
    emit("$i:Second")
  }
  /**输出:
  1:First at 125 ms from start
  1:Second at 632 ms from start
  2:First at 737 ms from start
  2:Second at 1242 ms from start
  3:First at 1345 ms from start
  3:Second at 1849 ms from start
   */
  @Test
  fun `test flatMapConcat`() = runBlocking<Unit> {
    val startTime = System.currentTimeMillis()
    (1..3).asFlow().onEach { delay(100) }
      .flatMapConcat { requestFlow(it) }
      .collect { println("$it at ${System.currentTimeMillis() - startTime} ms from start") }
  }
  /*输出:
  1:First at 170 ms from start
  2:First at 268 ms from start
  3:First at 372 ms from start
  1:Second at 675 ms from start
  2:Second at 772 ms from start
  3:Second at 879 ms from start
   */
  @Test
  fun `test flatMapMerge`() = runBlocking<Unit> {
    val startTime = System.currentTimeMillis()
    (1..3).asFlow().onEach { delay(100) }
      .flatMapMerge { requestFlow(it) }
      .collect { println("$it at ${System.currentTimeMillis() - startTime} ms from start") }
  }
  /*输出:
  1:First at 163 ms from start
  2:First at 297 ms from start
  3:First at 403 ms from start
  3:Second at 907 ms from start
   */
  @Test
  fun `test flatMapLatest`() = runBlocking<Unit> {
    val startTime = System.currentTimeMillis()
    (1..3).asFlow().onEach { delay(100) }
      .flatMapLatest { requestFlow(it) }
      .collect { println("$it at ${System.currentTimeMillis() - startTime} ms from start") }
  }
}

流的异常处理

当运算符中的发射器或代码抛出异常时,有几种处理异常的方法:

  1. try/catch块
  2. catch函数
class FlowTest {
  fun simpleFlow10() = flow<Int> {
    for (i in 1..3) {
      println("Emitting $i")
      emit(i)
    }
  }
  @Test
  fun `test flow exception`() = runBlocking<Unit> {
    flow {
      throw ArithmeticException("Div 0")
      emit(1)
    }.catch { e: Throwable -> //捕获上游异常
      println("Caught $e")
      emit(0) //异常时发射默认值
    }
      .flowOn(Dispatchers.IO)
      .collect { println(it) }
  }
}

流的完成

当流收集完成时(普通情况或异常情况),它可能需要执行一个动作。

  1. 命令式finally块
  2. onCompletion声明式处理
class FlowTest {
  fun simpleFlow12() = flow<Int> {
    emit(1)
    throw RuntimeException()
  }
  @Test
  fun `test flow complete in onCompletion`() = runBlocking<Unit> {
    simpleFlow12()
      .onCompletion { exception -> //不管是否抛出异常都会收到
        if (exception != null) println("Flow completed exceptionall")
      }
      .collect { println(it) }
  }
}

callbackFlow

//工具方法
fun LifecycleOwner.watcherText(
    textView: TextView,
    callback: (str: String?) -> Unit
) {
    lifecycleScope.launch {
        textView.textWatcherFlow().collect {
            callback.invoke(it)
        }
    }
}
fun TextView.textWatcherFlow() : Flow<String?> = callbackFlow<String?> {
    val textWatcher = object : TextWatcher {
        override fun beforeTextChanged(s: CharSequence?, start: Int, count: Int, after: Int) {}
        override fun onTextChanged(s: CharSequence?, start: Int, before: Int, count: Int) {}
        override fun afterTextChanged(s: Editable?) {
            trySend(s?.toString())
        }
    }
    addTextChangedListener(textWatcher)
    awaitClose { removeTextChangedListener(textWatcher) }
}
//使用示例
class MainActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        val etHello = findViewById<EditText>(R.id.et_hello)
        watcherText(etHello) {
            println("current result is $it")
        }
    }
}

使用LiveData解决Flow嵌套

Flow与Retrofit应用

以上图为例:用一个Flow实时获取EditText中输入的字符,用另一个Flow获取从服务端得到的消息,但是需要将第一个Flow获取到的字符作为第二个Flow的参数,这里就存在了Flow嵌套的情况,想要解决这种情况,可以如图所示,将第一个Flow的数据塞到LiveData中,然后通过观察LiveData去collect第二个Flow,从而解决Flow嵌套的情况

StateFlow

StateFlow是一个状态容器式可观察数据流,可以向其收集器发出当前状态更新和新状态更新。还可通过其value属性读取当前状态值。

class NumberViewModel : ViewModel() {
    val mStateFlow = MutableStateFlow(0)
    fun increment() {
        mStateFlow.value++
    }
    fun decrement() {
        mStateFlow.value--
    }
}

class MainActivity : AppCompatActivity() {
    private val numberViewModel = NumberViewModel()
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        findViewById<AppCompatButton>(R.id.btn_increment).setOnClickListener {
            numberViewModel.increment()
        }
        findViewById<AppCompatButton>(R.id.btn_decrement).setOnClickListener {
            numberViewModel.decrement()
        }
        lifecycleScope.launchWhenCreated {
            numberViewModel.mStateFlow.collect {
                println("current result is $it")
            }
        }
    }
}

SharedFlow

SharedFlow会向从其中收集值的所有使用方发出数据。

//
object LocalEventBus {
    val events = MutableSharedFlow<Event>()
    suspend fun postEvent(event: Event) {
        events.emit(event)
    }
}
//
data class Event(val timestamp: Long)
//
class SharedFlowViewModel : ViewModel() {
    private lateinit var job: Job
    fun startRefresh() {
        job = viewModelScope.launch(Dispatchers.IO) {
            while (true) {
                LocalEventBus.postEvent(Event(System.currentTimeMillis()))
            }
        }
    }
    fun stopRefresh() {
        job.cancel()
    }
}
//在多个地方收集events,每个地方都会收到消息
lifecycleScope.launchWhenCreated {
    LocalEventBus.events.collect {
        println("current result is ${it.timestamp}")
    }
}
//在一个地方发起刷新
SharedFlowViewModel().startRefresh()

Channel

Channel定义

摘自https://juejin.cn/post/7048919023580348430

//Channel本质上是一个接口:
public interface Channel<E> : SendChannel<E>, ReceiveChannel<E>
//Channel不可以直接实例化。但是可以通过官方提供的函数进行创建:
public fun <E> Channel(
    capacity: Int = RENDEZVOUS,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E>
  • capacity:capacity是channel的容量,kotlin为我们定义了几种常量:

    • RENDEZVOUS:只有消费端调用时,才会发送数据,否则挂起发送操作。这也是channel的默认类型。
    • CONFLATED:缓冲区满时,永远用最新元素替代,之前的元素将被废弃。可以理解为是onBufferOverflow等于DROP_OLDEST的快捷创建版。capacity是这个参数时,onBufferOverflow参数只能为BufferOverflow.SUSPEND。
    • UNLIMITED:无限制容量,缓冲队列满后,会直接扩容,直到OOM。
    • BUFFERED:默认创建64位容量的缓冲队列,当缓存队列满后,会挂起发送数据,直到队列有空余。我们也可以直接传递一个数值,来创建指定缓冲大小的channel。
  • onBufferOverflow:指定当缓冲区满的时候的背压策略。有3种选择:

    • SUSPEND:挂起。
    • DROP_OLDEST:丢弃最旧的元素。
    • DROP_LATEST:丢弃最新的元素。
  • onUndeliveredElement:指定数据发送但是接收者没有收到的时候的回调。

Channel基本使用

Channel实际上是一个并发安全的队列,它可以用来连接协程,实现不同协程的通信。注意:队列中一定存在缓冲区,那么一旦这个缓冲区满了,并且也一直没有人调用receive并取走函数,send就需要挂起。

//示例:故意让接收端的节奏放慢,发现send总是会挂起,直到receive之后才会继续往下执行。
class CoroutineTest {
    @Test
    fun `test know channel`() = runBlocking<Unit> {
        val channel = Channel<Int>()
        //生产者
        val producer = GlobalScope.launch {
            var i = 0
            while (true) {
                delay(1000)
                channel.send(++i)
                println("send $i")
            }
        }
        //消费者
        val consumer = GlobalScope.launch {
            while (true) {
                delay(2000)
                val element = channel.receive()
                println("receive $element")
            }
        }
        joinAll(producer, consumer)
    }
}

迭代Channel

class CoroutineTest {
    @Test
    fun `test iterate channel`() = runBlocking<Unit> {
        val channel = Channel<Int>(Channel.UNLIMITED)
        //生产者
        val producer = GlobalScope.launch {
            for (x in 1..5) {
                channel.send(x * x)
                println("send ${x * x}")
            }
        }
        //消费者
        val consumer = GlobalScope.launch {
            /*val iterator = channel.iterator()
            while (iterator.hasNext()){
                val element = iterator.next()
                println("receive $element")
                delay(2000)
            }*/
            for (element in channel) {
                println("receive $element")
                delay(2000)
            }
        }
        joinAll(producer, consumer)
    }
}

produce与actor

构造生产者与消费者的便捷方法。我们可以通过produce方法启动一个生产者协程,并返回一个ReceiveChannel, 其他协程就可以用这个Channel来接收数据了。反过来,我们可以用actor启动一个消费者协程。

class CoroutineTest {
    @Test
    fun `test fast producer channel`() = runBlocking<Unit> {
        val receiveChannel: ReceiveChannel<Int> = GlobalScope.produce<Int> {
            repeat(100) {
                delay(1000)
                send(it)
            }
        }
        val consumer = GlobalScope.launch {
            for (i in receiveChannel) {
                println("received: $i")
            }
        }
        consumer.join()
    }
    @Test
    fun `test fast consumer channel`() = runBlocking<Unit> {
        val sendChannel: SendChannel<Int> = GlobalScope.actor<Int> {
            while (true) {
                val element = receive()
                println(element)
            }
        }
        val producer = GlobalScope.launch {
            for (i in 0..3) {
                sendChannel.send(i)
            }
        }
        producer.join()
    }
}

Channel的关闭

  • produce和actor返回的Channel都会随着对应的协程执行完毕而关闭,也正是这样,Channel才被称之为热数据流。
  • 对于一个Channel,如果我们调用了它的close方法,它会立即停止接收新元素,也就是说这时它的isClosedForSend会立即返回true。而由于Channel缓冲区的存在,这时候可能还有些元素没有被处理完,因此要等所有的元素都被读取之后 isclosedForReceive才会返回true。
  • Channel的生命周期最好由主导方来维护,即建议由事件发起的一方实现关闭。
class CoroutineTest {
    @Test
    fun `test close channel`() = runBlocking<Unit> {
        val channel = Channel<Int>(3)
        //生产者
        val producer = GlobalScope.launch {
            List(3) {
                channel.send(it)
                println("send $it")
            }
            channel.close()
            println(
                """close channel. 
                | -CloseForSend:${channel.isClosedForSend} 
                | -CloseForReceive: ${channel.isClosedForReceive}""".trimMargin()
            )
        }
        //消费者
        val consumer = GlobalScope.launch {
            for (element in channel) {
                println("receive $element")
                delay(1000)
            }
            println(
                """After Consuming. 
                | -CloseForSend:${channel.isClosedForSend} 
                | -CloseForReceive: ${channel.isClosedForReceive}""".trimMargin()
            )
        }
        joinAll(producer, consumer)
    }
}

BroadcastChannel

定义
//BroadcastChannel和Channel的区别在于,BroadcastChannel没有实现ReceiveChannel
public interface BroadcastChannel<E> : SendChannel<E>

前面提到,发送端和接收端在Channel中存在一对多的情形,从数据处理本身来讲,虽然有多个接收端,但是同一个元素只会被一个接收端读到,而广播则不然,多个接收端不存在互斥行为。示例如下:

class CoroutineTest {
    @Test
    fun `test broadcast`() = runBlocking<Unit> {
        //BroadcastChannel的容量不能是UNLIMITED和RENDEZVOUS
        val broadcastChannel = BroadcastChannel<Int>(Channel.BUFFERED)
        //也可以用以下两句替换:
        //val channel = Channel<Int>()
        //val broadcastChannel = channel.broadcast(2)
        val producer = GlobalScope.launch {
            List(2) {
                delay(100)
                broadcastChannel.send(it)
            }
            broadcastChannel.close()
        }
        List(2) { index ->
            GlobalScope.launch {
                //每个协程都去订阅Channel,所以每个协程都能收到Channel中的信息
                val receiveChannel = broadcastChannel.openSubscription()
                for (i in receiveChannel) {
                    /** 输出值:
                    [#0] received: 0
                    [#1] received: 0
                    [#0] received: 1
                    [#1] received: 1
                     */
                    println("[#$index] received: $i")
                }
            }
        }.joinAll()
    }
}

多路复用

数据通信系统或计算机网络系统中,传输媒体的带宽或容量往往会大于传输单一信号的需求,为了有效地利用通信线路, 希望一个信道同时传输多路信号,这就是所谓的多路复用技术(Multiplexing)。

复用多个await

两个API分别从网络和本地缓存获取数据,期望哪个先返回就先用哪个做展示。

//数据实体
data class User(val name: String, val address: String)
//Api
interface UserServiceApi {
    @GET("/webapi/test")
    fun getUser(@Query("name") name: String): Call<User?>
}
val userServiceApi: UserServiceApi = Retrofit.Builder()
    .baseUrl("http://127.0.0.1")
    .addConverterFactory(GsonConverterFactory.create())
    .build()
    .create(UserServiceApi::class.java)
//gson解析
private val gson = Gson()
//文件内容:{"name":"ZhangSan","address":"China"}
private const val cachePath = "/test.json"
//响应实体
data class Response<T>(val value: T, val isLocal: Boolean)
//从本地获取数据
fun CoroutineScope.getUserFromLocal(name: String) = async(Dispatchers.IO) {
    delay(1000)//故意延迟
    File(cachePath).readText().let { gson.fromJson(it, User::class.java) }
}
//从远端获取数据
fun CoroutineScope.getUserFromRemote(name: String) = async(Dispatchers.IO) {
    userServiceApi.getUser(name)
}
//测试哪端先返回数据就用该端的数据
class CoroutineTest {
    @Test
    fun `test select await`() = runBlocking<Unit> {
        GlobalScope.launch {
            val localRequest = getUserFromLocal("xxx")
            val remoteRequest = getUserFromRemote("yyy")
            val userResponse = select<Response<User?>> {
                localRequest.onAwait { Response(it, true) }
                remoteRequest.onAwait {
                    try {
                        Response(it.await(), false)
                    } catch (e: Exception) {
                        Response(User(e.message ?: "", ""), false)
                    }
                }
            }
            userResponse.value?.let { println(it) }
        }.join()
    }
}

复用多个Channel

使用select函数会接收到最快的那个channel消息。

class CoroutineTest {
    @Test
    fun `test select channel`() = runBlocking<Unit> {
        val channels = listOf(Channel<Int>(), Channel<Int>())
        GlobalScope.launch {
            delay(100)
            channels[0].send(200)
        }
        GlobalScope.launch {
            delay(50)
            channels[1].send(100)
        }
        val result = select<Int?> {
            channels.forEach { channel ->
                channel.onReceive { it } //onReceive函数的类型是SelectClause1
            }
        }
        println(result) //输出:100
    }
}

SelectClause

我们怎么知道哪些事件可以被select呢?其实所有能够被select的事件都是SelectClauseN类型,包括:

  1. SelectClause0:对应事件没有返回值,例如join没有返回值,那么onJoin就是SelectClauseN类型。使用时,onJoin的参数是一个无参函数。
  2. SelectClause1:对应事件有返回值,前面的onAwait和onReceive都是此类情况。
  3. SelectClause2:对应事件有返回值,此外还需要一个额外的参数,例如:Channel.onSend有两个参数,第一个是Channel数据类型值,表示即将发送的值;第二个就是发送成功时的回调参数。

如果我们想要确认挂起函数是否支持select,只需要查看其是否存在对应的SelectClauseN类型可回调即可。

class CoroutineTest {
    @Test
    fun `test selectClause0`() = runBlocking<Unit> {
        val job1 = GlobalScope.launch {
            delay(100)
            println("job 1")
        }
        val job2 = GlobalScope.launch {
            delay(10)
            println("job 2")
        }
        select<Unit> {
            job1.onJoin { println("job 1 onJoin") }
            job2.onJoin { println("job 1 onJoin") }
        }
        delay(1000)
    }
    @Test
    fun `test selectClause2`() = runBlocking<Unit> {
        val channels = listOf(Channel<Int>(), Channel<Int>())
        println(channels)
        launch(Dispatchers.IO) {
            select<Unit> {
                launch {
                    delay(10)
                    channels[1].onSend(200) { sentChannel ->
                        println("sent on $sentChannel")
                    }
                }
                launch {
                    delay(100)
                    channels[0].onSend(100) { sendChannel ->
                        println("send on $sendChannel")
                    }
                }
            }
        }
        //至少要有一个接收者,否则就会一直等待
        GlobalScope.launch {
            println(channels[0].receive())
        }
        GlobalScope.launch {
            println(channels[1].receive())
        }
        delay(1000)
    }
}

使用Flow实现多路复用

多数情况下,我们可以通过构造合适的Flow来实现多路复用的效果。

class CoroutineTest {
    @Test
    fun `test select flow`() = runBlocking<Unit> {
        //函数 ->协程 -> Flow -> Flow合并
        val name = "guest"
        coroutineScope {
            listOf(::getUserFromLocal, ::getUserFromRemote)
                .map { function ->
                    //这里使用了kotlin的反射功能,需要添加依赖:"org.jetbrains.kotlin:kotlin-reflect:$kotlin_version"
                    function.call(name)
                }.map { deferred ->
                    flow { emit(deferred.await()) }
                }.merge().collect { user -> println(user) }
        }
    }
}

并发安全

我们使用线程在解决并发问题的时候总是会遇到线程安全的问题,而Java平台上的 Kotlin协程实现免不了存在并发调度的情况,因此线程安全同样值得留意。

class CoroutineTest {
    @Test
    fun `test not safe concurrent`() = runBlocking<Unit> {
        var count = 0
        List(1000) {
            GlobalScope.launch { count++ }
        }.joinAll()
        println(count) //存在并发问题,无法得到1000
    }
    @Test
    fun `test safe concurrent`() = runBlocking<Unit> {
        var count = AtomicInteger(0)
        List(1000) {
            GlobalScope.launch { count.incrementAndGet() }
        }.joinAll()
        println(count)
    }
}

并发工具

除了我们在线程中常用的解决并发问题的手段之外,协程框架也提供了一些并发安全的工具,包括:

  1. Channel:并发安全的消息通道。
  2. Mutex:轻量级锁,它的lock和unlock从语义上与线程锁比较类似,之所以轻量是因为它在获取不到锁时不会阻塞线程,而是挂起等待锁的释放。
  3. Semaphore:轻量级信号量,信号量可以有多个,协程在获取到信号量后即可执行并发操作。当Semaphore的参数为1时,效果等价于Mutex。
class CoroutineTest {
    @Test
    fun `test safe concurrent tools`() = runBlocking<Unit> {
        var count = 0
        val mutex = Mutex()
        List(1000) {
            GlobalScope.launch {
                mutex.withLock {
                    count++
                }
            }
        }.joinAll()
        println(count)
    }
    @Test
    fun `test safe concurrent tools2`() = runBlocking<Unit> {
        var count = 0
        val semaphore = Semaphore(1)
        List(1000) {
            GlobalScope.launch {
                semaphore.withPermit {
                    count++
                }
            }
        }.joinAll()
        println(count)
    }
}

添加新评论