Creating flows

The kotlinx.coroutines library provides several ways to create a flow. The easiest is the flow builder:

fun numbersFlow(): Flow<Int> = flow {
    // code inside flow can suspend!
    for (i in 1..3) { 
        delay(100)
        emit(i)
    }
} // the function numbersFlow() itself does not suspend

There are other ways to create a Flow, including the flowOf() function, and the extension asFlow(), which can be used for collections, sequences, ranges, values or functional types:

flowOf(1,2,3)
listOf("A", "B", "C").asFlow()
(1..3).asFlow()
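
To make the builders above easier to compare, here is a minimal sketch that collects each of them into a list. The helper name builderResults is my own and purely illustrative, and the snippet assumes kotlinx-coroutines-core is on the classpath:

```kotlin
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: collects the output of each builder into a list.
fun builderResults(): List<List<Int>> = runBlocking {
    listOf(
        flow { for (i in 1..3) emit(i) }.toList(), // flow builder
        flowOf(1, 2, 3).toList(),                  // fixed set of values
        listOf(1, 2, 3).asFlow().toList(),         // from a collection
        sequenceOf(1, 2, 3).asFlow().toList(),     // from a sequence
        (1..3).asFlow().toList(),                  // from a range
    )
}

fun main() {
    // Every builder produces the same three values.
    builderResults().forEach(::println)
}
```

Each line printed by main should be [1, 2, 3], since all five builders describe the same stream of values.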
Collecting flows

Flows are cold, which means that the code inside a flow builder does not execute until a terminal operator is applied to the flow. The most common terminal operator is collect:

someCoroutineScope.launch {
    numbersFlow().collect { value -> print(value) }
}

Other terminal operators are available as well:

  • first() and last() return the respective element or throw NoSuchElementException if the flow is empty. There are also variants that take a predicate as an argument or return null instead of throwing an exception;
  • single() waits for exactly one value to be emitted. Like first(), it throws NoSuchElementException if the flow is empty, and it throws IllegalArgumentException if the flow contains more than one element. A singleOrNull() variant also exists;
  • reduce() and fold() can be used to reduce a flow to a single value.
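
As a rough illustration of the terminal operators listed above, the following sketch runs several of them against small flows and records the results. The function name terminalOperatorsDemo and the sample values are assumptions made for this example only:

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical demo function: applies several terminal operators and records the results.
fun terminalOperatorsDemo(): List<String> = runBlocking {
    val numbers = flowOf(1, 2, 3)
    val results = mutableListOf<String>()
    results += "first=${numbers.first()}"
    results += "firstEven=${numbers.first { it % 2 == 0 }}"   // predicate variant
    results += "firstOrNull=${emptyFlow<Int>().firstOrNull()}" // null instead of an exception
    results += "last=${numbers.last()}"
    results += "single=${flowOf(42).single()}"
    results += "reduce=${numbers.reduce { acc, value -> acc + value }}"
    results += "fold=${numbers.fold(10) { acc, value -> acc + value }}"
    // first() on an empty flow throws instead of returning a value.
    results += try {
        emptyFlow<Int>().first()
        "no exception"
    } catch (e: NoSuchElementException) {
        "empty first() throws"
    }
    results
}

fun main() {
    terminalOperatorsDemo().forEach(::println)
}
```

Note that every call restarts the flow from scratch, because flowOf(1, 2, 3) is cold.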
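
To launch the collection in a separate coroutine, you can combine onEach with launchIn, which returns the Job of the new coroutine:

someCoroutineScope.launch {
    val job = numbersFlow()
        .onEach { value -> println(value) }
        .launchIn(scope = this)
}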
Collecting flows in Android

In Android, you’ll usually end up collecting a Flow in the presentation layer of your app, i.e. a Fragment or an Activity. When the UI is not on the screen, we don’t need to display the information we’re receiving, so we should stop collecting the flow. There are several lifecycle-aware alternatives you can use to handle this automatically:

  • Lifecycle.repeatOnLifecycle(state)
  • Flow<T>.flowWithLifecycle(lifecycle, state)
class MyActivity: AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        // ...
        lifecycleScope.launch {
            repeatOnLifecycle(Lifecycle.State.STARTED) {
                viewModel.someFlow().collect {
                    // flow is collected during the time
                    // we’re in the STARTED state.
                }
            } 
            // resumes only when lifecycle is DESTROYED!!!
        }
    }
}

Note that repeatOnLifecycle suspends the calling coroutine until the lifecycle reaches the DESTROYED state. To collect several flows, launch a separate coroutine for each of them:

repeatOnLifecycle(Lifecycle.State.STARTED) {
    launch { viewModel.someFlow().collect { /* ... */ } }
    launch { viewModel.someOtherFlow().collect { /* ... */ } }
}

If you only need to collect a single flow, flowWithLifecycle can be used:

lifecycleScope.launch {
    viewModel.someFlow()
        .flowWithLifecycle(lifecycle, Lifecycle.State.STARTED)
        .collect { /* ... */ }
}
Intermediate operators

The behaviour of a flow can be modified by applying different operators to it, just like with collections and sequences. Operators are not suspending functions: they are immediately applied to an upstream flow (before/above the operator) and return a new, transformed flow, but are executed lazily as each item is emitted into the flow.
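
The lazy behaviour described above can be sketched as follows. The function name lazyOperatorsDemo and the log messages are made up for this illustration. The log shows that applying filter and map executes nothing, and that each value then travels through the whole chain before the next one is processed:

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical demo function: records when each operator block actually runs.
fun lazyOperatorsDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    // Applying the operators returns a new flow immediately; no block runs yet.
    val transformed = (1..4).asFlow()
        .filter { log += "filter $it"; it % 2 == 0 }
        .map { log += "map $it"; it * 10 }
    log += "operators applied"
    // The operator blocks only run once a terminal operator starts the collection.
    transformed.collect { log += "collect $it" }
    log
}

fun main() {
    lazyOperatorsDemo().forEach(::println)
}
```

The entry "operators applied" comes first in the log, followed by the filter, map and collect entries interleaved value by value.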

Context

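Unless special intermediate operators that use multiple flows are used, all emitted values are processed by the operators sequentially from upstream to downstream and then delivered to the terminal operator. By default, all of this happens in the coroutine context of the collector. The flowOn operator changes the context of the upstream flow: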

fun numbersFlow() = flow { // runs in Dispatchers.Default
        for (i in 1..3) {
            Thread.sleep(100) // some long-running operation
            emit(i)
        }
    }
    .onEach { /* still in Dispatchers.Default */ }
    .flowOn(Dispatchers.Default) // changing dispatcher
lifecycleScope.launch(Dispatchers.Main) {
    numbersFlow()
        .onEach { /* in Dispatchers.Main */ }
        .collect { value ->  /* Dispatchers.Main */ }
}

Collection and emission now happen concurrently in different coroutines. Remember that the flowOn operator only changes the coroutine context of the builder and of the operators applied upstream of it.

Buffering

As stated above, flows execute the code in the builder block, the operators and the terminal operator sequentially:

flowOf("A", "B", "C")
    .onEach  { print("1$it ") }
    .collect { print("2$it ") }
// prints "1A 2A 1B 2B 1C 2C"

As a consequence, the total execution time is the sum of the execution times of all the operators. To run the upstream and the collector concurrently without changing the coroutine context with flowOn(), you can apply the buffer() operator:

flowOf("A", "B", "C")
    .onEach  { print("1$it ") }
    .buffer()
    .collect {
        delay(100)
        print("2$it ")
    }
// prints "1A 1B 1C 2A 2B 2C "
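
If only the most recent value matters, you can use collectLatest. When a new value is emitted, the block that is still processing the previous value is cancelled:

someFlow()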
    .collectLatest { value -> 
        println("Collecting $value")
        delay(300) // pretend we are processing it for 300 ms
        println("Done $value")
    }
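
Here is a minimal, self-contained sketch of how collectLatest behaves when the collector is slower than the emitter. The function name collectLatestDemo and the timings (one value every 100 ms, 300 ms of processing) are assumptions chosen for this example:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical demo function: a slow collector combined with a faster emitter.
fun collectLatestDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    flow {
        for (i in 1..3) {
            delay(100) // a new value every 100 ms
            emit(i)
        }
    }.collectLatest { value ->
        log += "Collecting $value"
        delay(300) // processing takes longer than the emission interval
        log += "Done $value"
    }
    log
}

fun main() {
    collectLatestDemo().forEach(::println)
}
```

Only the last value should reach the "Done" line, because the processing of 1 and 2 is cancelled as soon as the next value arrives.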
Exceptions

Flow collection can complete with an exception if one is thrown inside the builder block or any of the operators. Exceptions can be handled both imperatively and declaratively:

fun numbers() = flow {
    repeat(10) { emit(it) }
}
try {
    numbers().collect {
            check(it <= 1) { "Crashed on $it" }
            print("$it;")
        }
} catch (e:Exception) { 
    print("Exception caught(${e.message})!")
}
// prints "0;1;Exception caught(Crashed on 2)!"
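
Do not wrap emit in a try/catch inside the builder. It would also catch exceptions thrown downstream by the collector, which violates exception transparency:

fun doNotDoThis() = flow {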
    repeat(10) {
        try { 
            emit(it)
        } catch(e: Exception) { 
            print("this violates exception transparency!") 
        }
    }
}

Instead, use the catch operator, which only catches exceptions thrown upstream of it:

fun strings() = listOf("A","B","C").asFlow()
    .catch { cause: Throwable ->
        // Inside the catch block you can:
        // - ignore, log or process the exception
        // - emit a new value using emit
        // - re-throw the same exception or a new one using throw
    }
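
To handle exceptions from the collecting code declaratively, move that code into onEach before catch and call collect() without arguments:

numbers()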
    .onEach {
        check(it <= 1) { "Crashed on $it" }
        print("$it;")    
    }
    .catch { print("Exception caught(${it.message})!") }
    .collect()
// like the first example, this also prints:
// "0;1;Exception caught(Crashed on 2)!"
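
The catch block can also emit a replacement value, as mentioned in the comments above. A minimal sketch of that idea, where the function name catchWithFallbackDemo and the error message are invented for the example:

```kotlin
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical demo function: the upstream fails after two values
// and catch emits a fallback value in place of the failure.
fun catchWithFallbackDemo(): List<String> = runBlocking {
    flow<String> {
        emit("A")
        emit("B")
        throw IllegalStateException("upstream failed")
    }
        .catch { cause -> emit("fallback(${cause.message})") }
        .toList()
}

fun main() {
    println(catchWithFallbackDemo())
}
```

The collector never sees the exception here: it receives the two regular values, then the fallback, and the flow completes normally.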
Completion

Sometimes, when a flow completes (either normally or by throwing an exception), you might need to perform some action. This can be done in two ways:

fun numbers() = (1..3).asFlow().map { "$it;" }
try {
    numbers().collect { value -> print(value) }
} finally {
    print("Done")
} // prints "1;2;3;Done"

For a declarative approach, you can apply the intermediate operator onCompletion before collecting the flow:

numbers()
    .onCompletion { cause: Throwable? ->
        print("Done")
    }
    .collect { value -> print(value) }
// also prints "1;2;3;Done"
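
The cause parameter of onCompletion tells you whether the flow completed normally (null) or with an exception. The sketch below tries both cases. The function name completionDemo and the log messages are assumptions for this example:

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical demo function: onCompletion for a successful and for a failing flow.
fun completionDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()

    // Normal completion: cause is null.
    flowOf(1, 2)
        .onCompletion { cause -> log += "completed, cause=$cause" }
        .collect { log += "value $it" }

    // Failure: onCompletion observes the exception but does not handle it,
    // so it still travels downstream to catch.
    flow<Int> {
        emit(1)
        throw IllegalStateException("boom")
    }
        .onCompletion { cause -> log += "completed, cause=${cause?.message}" }
        .catch { log += "caught ${it.message}" }
        .collect { log += "value $it" }

    log
}

fun main() {
    completionDemo().forEach(::println)
}
```

In other words, onCompletion plays the role of finally, while catch plays the role of the catch block.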
Cancellation

Flows adhere to the cooperative cancellation of coroutines (you can read more about coroutines and cooperative cancellation in my previous article). A flow collection can be canceled when the flow is suspended in a cancellable function (like delay).

If the flow does not suspend in a cancellable function, you can check for cancellation explicitly:

.onEach { currentCoroutineContext().ensureActive() }
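
The cancellable() operator performs the same check on every emission. Below is a minimal sketch under that assumption, using a range-based flow that does not suspend on its own. The function name cancellableDemo is hypothetical:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical demo function: cancels the collecting coroutine from inside collect.
fun cancellableDemo(): List<Int> = runBlocking {
    val seen = mutableListOf<Int>()
    val job = launch {
        (1..5).asFlow()
            .cancellable() // checks for cancellation on each emission
            .collect { value ->
                seen += value
                if (value == 3) cancel() // cancels the scope of the launched coroutine
            }
    }
    job.join() // wait until the cancelled coroutine has finished
    seen
}

fun main() {
    println(cancellableDemo())
}
```

With cancellable() in place, the collection should stop right after the value that triggered the cancellation.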
Integration with other libraries

Kotlin flows are supported in multiple libraries, including Retrofit, WorkManager, DataStore and Room.

@Dao
abstract class ExampleDao {
    @Query("SELECT * FROM Example")
    abstract fun getExamples(): Flow<List<Example>>
}
StateFlow and SharedFlow

As mentioned before, Flows are cold by default. This means that if we subscribe to a Flow, the code in its builder will get executed each time we subscribe to it. This is something that you might not want to do when an Activity or Fragment goes through a configuration change. To solve this, StateFlow can be used.
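
The difference can be sketched in plain Kotlin, without any Android classes. The function names coldFlowDemo and stateFlowDemo and the sample values are assumptions made for this illustration:

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical demo function: counts how often the builder of a cold flow runs.
fun coldFlowDemo(): Int = runBlocking {
    var builderRuns = 0
    val cold = flow {
        builderRuns++ // executed again for every collector
        emit("data")
    }
    cold.collect() // first collector
    cold.collect() // second collector, e.g. after a configuration change
    builderRuns
}

// Hypothetical demo function: a StateFlow hands its current value to any new collector.
fun stateFlowDemo(): List<String> = runBlocking {
    val state = MutableStateFlow("initial")
    state.value = "loaded"
    // Both reads get the current value; nothing is recomputed.
    listOf(state.first(), state.first())
}

fun main() {
    println(coldFlowDemo())
    println(stateFlowDemo())
}
```

The cold flow runs its builder once per collector, while the StateFlow simply holds the latest value for whoever subscribes next.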

data class SomeState(
    val isLoading: Boolean = false,
    val data: String? = null,
)
val myState = MutableStateFlow<SomeState>(SomeState())
fun onLoadClicked() {
    myState.update { currentState -> 
        currentState.copy(isLoading = true)
    }
    viewModelScope.launch {
        val newData = repository.loadData()
        myState.update { currentState ->
            currentState.copy(isLoading = false, data = newData)
        }
    }
}

An existing cold flow can be converted to a StateFlow with the stateIn operator:

val stateFlow: StateFlow<SomeState> = someFlow
    .stateIn(
        scope = viewModelScope,
        started = SharingStarted.WhileSubscribed(5000),
        initialValue = someInitialValue,
    )
  • The method observe() from LiveData automatically unregisters the consumer when the view enters the STOPPED state. Flow collection is not stopped automatically, but this behaviour can be easily achieved with the repeatOnLifecycle extension.
MutableSharedFlow(
    replay = 0,
    extraBufferCapacity = 0,
    onBufferOverflow = BufferOverflow.SUSPEND
)
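
To get a feeling for the replay parameter, here is a small sketch. The function name sharedFlowReplayDemo and the values are invented for the example. It emits three values before anyone subscribes and then checks what a late subscriber receives:

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical demo function: a late subscriber only receives the replayed values.
fun sharedFlowReplayDemo(): List<Int> = runBlocking {
    val shared = MutableSharedFlow<Int>(replay = 2)
    // Without subscribers, emit does not suspend; the values only go to the replay cache.
    shared.emit(1)
    shared.emit(2)
    shared.emit(3)
    // A shared flow never completes, so take(2) is needed to end the collection.
    shared.take(2).toList()
}

fun main() {
    println(sharedFlowReplayDemo())
}
```

With replay = 0, the default shown above, the same late subscriber would receive nothing and take(2) would suspend until new values are emitted.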
private val _events = Channel<Event>()
val events = _events.receiveAsFlow() // expose as flow
suspend fun postEvent(event: Event) {
    _events.send(event) // suspends on buffer overflow
}
Afterword

Kotlin flows are simple in design, yet offer many possibilities. I’ve tried to describe their most common and important aspects, but feel free to leave a comment if you think I missed something that should be included in this article.

Thanks to Mario Sanoguera de Lorenzo.

 
