Thuy's Blog

How to test receiving events from a hot flow in Kotlin coroutines?

September 18, 2019

Problem

I’ve recently started looking into Kotlin coroutines as they have become increasingly mainstream. With RxJava, BehaviorSubject can be used to create hot streams. Suppose we emit a few events through a BehaviorSubject: checking that its observer received them is straightforward with test(), which subscribes to the BehaviorSubject and returns a TestObserver. We can then use this TestObserver to verify the received values:

@Test
fun `should receive values from BehaviorSubject`() {
  val publisher = BehaviorSubject.createDefault(0)
  val observer: TestObserver<Int> = publisher.test()

  publisher.onNext(1)
  publisher.onNext(2)

  observer.assertValues(0, 1, 2)
}

How do we do that in Kotlin coroutines?

Solution

The Flow API (aka Kotlin flows) from Kotlin coroutines shares a lot of similarities with RxJava, so I’ve been trying to map everything I know about RxJava onto Kotlin flows. While the DataFlow API is still in progress, there are, as far as I know, at least two ways to create hot streams (which I will call hot flows from now on):

  • ConflatedBroadcastChannel, which is analogous to BehaviorSubject.
  • callbackFlow(), which can be used to bridge callback-based APIs into flows. It plays a role similar to Observable.create().

ConflatedBroadcastChannel

As for ConflatedBroadcastChannel, reproducing the earlier test had me scratching my head in a few places and digging through some existing tests in the Kotlin coroutines repo. Basically, we need to go through three steps:

  • Add a new dependency: org.jetbrains.kotlinx:kotlinx-coroutines-test.
  • Wrap the whole test body inside runBlockingTest().
  • Use launch() to collect events. launch() returns a Job, which we need to cancel at the end of the test. Otherwise, because collecting a hot flow never completes on its own, UncompletedCoroutinesError may be thrown.

@Test
fun `should work with ConflatedBroadcastChannel`() = runBlockingTest {
  val publisher = ConflatedBroadcastChannel(0)
  val values = mutableListOf<Int>()

  val job = launch {
    publisher.asFlow().collect { values.add(it) }
  }
  expectThat(publisher.value).isEqualTo(0)

  publisher.offer(1)
  expectThat(values).contains(0, 1)
  expectThat(publisher.value).isEqualTo(1)

  publisher.offer(2)
  expectThat(values).contains(0, 1, 2)
  expectThat(publisher.value).isEqualTo(2)

  job.cancel()
}
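
The first of the three steps above is a build change rather than test code. In a Gradle Kotlin DSL build script it might look like the fragment below. The version number is only a placeholder assumption; it should match the kotlinx-coroutines-core version the project already uses.

```kotlin
// build.gradle.kts (module level)
// Assumption: "1.3.1" is a placeholder; use the coroutines version your project is on.
val coroutinesVersion = "1.3.1"

dependencies {
  implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$coroutinesVersion")
  // Provides runBlockingTest() for unit tests.
  testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:$coroutinesVersion")
}
```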

How about callbackFlow?

We need a callback-based API first. Here I simulate Android’s LocalBroadcastManager:

typealias BroadcastReceiver = (Int) -> Unit
typealias Intent = Int

class LocalBroadcastManager {
  private val receivers = mutableSetOf<BroadcastReceiver>()

  fun sendBroadcast(intent: Intent) {
    receivers.forEach { it(intent) }
  }

  fun registerReceiver(receiver: BroadcastReceiver) {
    receivers.add(receiver)
  }

  fun unregisterReceiver(receiver: BroadcastReceiver) {
    receivers.remove(receiver)
  }
}

Converting it into a Flow then follows the pattern shown in the documentation of callbackFlow:

fun LocalBroadcastManager.asFlow(): Flow<Intent> = callbackFlow {
  val receiver: BroadcastReceiver = { offer(it) }
  registerReceiver(receiver)
  awaitClose { unregisterReceiver(receiver) }
}
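
The awaitClose block is what keeps the receiver from leaking once the collector goes away. Here is a minimal sketch of how that could be checked. CountingBroadcastManager and receiverCountsAroundCollection are hypothetical names, not part of the sample code. I'm assuming a recent kotlinx.coroutines release here, hence trySend where the snippet above uses offer, and plain runBlocking with Dispatchers.Unconfined so that the collector starts eagerly without the test artifact.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the simulated LocalBroadcastManager that also exposes its receiver count.
class CountingBroadcastManager {
  private val receivers = mutableSetOf<(Int) -> Unit>()
  val receiverCount: Int get() = receivers.size

  fun sendBroadcast(intent: Int) = receivers.toList().forEach { it(intent) }
  fun registerReceiver(receiver: (Int) -> Unit) { receivers.add(receiver) }
  fun unregisterReceiver(receiver: (Int) -> Unit) { receivers.remove(receiver) }
}

fun CountingBroadcastManager.asFlow(): Flow<Int> = callbackFlow {
  val receiver: (Int) -> Unit = { trySend(it) }
  registerReceiver(receiver)
  // Runs when the collector is cancelled or the channel is closed.
  awaitClose { unregisterReceiver(receiver) }
}

// Returns the receiver count before collecting, while collecting, and after cancelling.
fun receiverCountsAroundCollection(): List<Int> = runBlocking {
  val manager = CountingBroadcastManager()
  val before = manager.receiverCount

  // Unconfined runs the collector right away, up to the point where it waits for values.
  val job = launch(Dispatchers.Unconfined) { manager.asFlow().collect { } }
  val during = manager.receiverCount

  // Cancelling the collector triggers awaitClose, which unregisters the receiver.
  job.cancelAndJoin()
  listOf(before, during, manager.receiverCount)
}
```

If the awaitClose line were dropped, the last count would stay at 1, and recent versions of callbackFlow refuse to run without it anyway.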

Testing that asFlow() is much like testing ConflatedBroadcastChannel.asFlow():

@Test
fun `should work with callbackFlow`() = runBlockingTest {
  val broadcastManager = LocalBroadcastManager()
  val values = mutableListOf<Int>()
  val job = launch {
    broadcastManager.asFlow().collect { values.add(it) }
  }

  broadcastManager.sendBroadcast(0)
  expectThat(values).contains(0)

  broadcastManager.sendBroadcast(1)
  expectThat(values).contains(0, 1)

  broadcastManager.sendBroadcast(2)
  expectThat(values).contains(0, 1, 2)

  job.cancel()
}
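
One detail the test above depends on is that the collector is launched before the first broadcast. Unlike ConflatedBroadcastChannel, the simulated manager keeps no latest value, so anything sent while nobody is registered is simply lost. Here is a small sketch of that behaviour. Broadcaster and valuesSeenByLateCollector are hypothetical names, and as an assumption it uses runBlocking with Dispatchers.Unconfined and trySend from a recent kotlinx.coroutines release instead of runBlockingTest and offer.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical, trimmed-down callback source: it remembers nothing it has sent.
class Broadcaster {
  private val receivers = mutableSetOf<(Int) -> Unit>()

  fun send(value: Int) = receivers.toList().forEach { it(value) }
  fun register(receiver: (Int) -> Unit) { receivers.add(receiver) }
  fun unregister(receiver: (Int) -> Unit) { receivers.remove(receiver) }
}

fun Broadcaster.asFlow(): Flow<Int> = callbackFlow {
  val receiver: (Int) -> Unit = { trySend(it) }
  register(receiver)
  awaitClose { unregister(receiver) }
}

fun valuesSeenByLateCollector(): List<Int> = runBlocking {
  val broadcaster = Broadcaster()
  val received = mutableListOf<Int>()

  // Sent before anyone collects: no receiver is registered, so the value is dropped.
  broadcaster.send(0)

  val job = launch(Dispatchers.Unconfined) {
    broadcaster.asFlow().collect { received.add(it) }
  }

  // Sent after the collector has registered: this one is delivered.
  broadcaster.send(1)

  job.cancelAndJoin()
  received.toList()
}
```

Only the second value reaches the collector, which is why the tests in this post always start collecting first and send afterwards.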

What to improve?

Testing Kotlin flows isn’t as intuitive as RxJava’s test(), but it can be improved. One way is to introduce a TestCollector:

class TestCollector<T> {
  private val values = mutableListOf<T>()

  fun test(scope: CoroutineScope, flow: Flow<T>): Job {
    return scope.launch { flow.collect { values.add(it) } }
  }

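  // Mirrors RxJava's assertValues(): checks the exact values in order, not just their presence.
  fun assertValues(vararg expected: T) {
    expectThat(values).containsExactly(expected.toList())
  }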
}

A test using TestCollector could look like this:

@Test
fun `should work with TestCollector`() = runBlockingTest {
  val broadcastManager = LocalBroadcastManager()
  val collector = TestCollector<Intent>()
  val job = collector.test(this, broadcastManager.asFlow())

  broadcastManager.sendBroadcast(0)
  collector.assertValues(0)

  broadcastManager.sendBroadcast(1)
  collector.assertValues(0, 1)

  broadcastManager.sendBroadcast(2)
  collector.assertValues(0, 1, 2)

  job.cancel()
}
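
To get even closer to the feel of publisher.test() in RxJava, the collector could be created by an extension function on Flow. The following is only a sketch under a few assumptions. FlowTestObserver and the test() extension are hypothetical names. It uses the standard library's check() instead of an assertion library to stay self-contained, and it launches on Dispatchers.Unconfined so that collection starts eagerly even outside runBlockingTest. The demo feeds it from a plain Channel via receiveAsFlow().

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper, loosely modelled on RxJava's TestObserver.
class FlowTestObserver<T>(private val job: Job, private val received: List<T>) {
  // Snapshot of everything collected so far.
  fun values(): List<T> = received.toList()

  fun assertValues(vararg expected: T) {
    check(received == expected.toList()) { "Expected ${expected.toList()} but was $received" }
  }

  // Stops collecting; call this at the end of the test.
  fun cancel() = job.cancel()
}

// Starts collecting immediately so values sent right after test() are not missed.
fun <T> Flow<T>.test(scope: CoroutineScope): FlowTestObserver<T> {
  val received = mutableListOf<T>()
  val job = scope.launch(Dispatchers.Unconfined) { collect { received.add(it) } }
  return FlowTestObserver(job, received)
}

fun collectThreeValues(): List<Int> = runBlocking {
  val channel = Channel<Int>(Channel.UNLIMITED)
  val observer = channel.receiveAsFlow().test(this)

  channel.trySend(0)
  channel.trySend(1)
  channel.trySend(2)
  observer.assertValues(0, 1, 2)

  observer.cancel()
  observer.values()
}
```

The observer owns both the Job and the collected list, so a test only needs to keep one reference around, much like a TestObserver.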

Also, having a way to auto-cancel jobs after each test would make things a bit neater. I’ve seen such an approach on Medium, using JUnit 4 rules and JUnit 5 extensions.

Sample code is available on my GitHub.


Written by Thuy Trinh, who lives and works in Frankfurt, Germany, building robust Android apps.