Blog Infos
Author
Published
Topics
, ,
Published

In my previous post, I described how RxJava actually works. For a long time, it has been a non-official standard and the main tool for working with streams and multithreading in Android. But now we have an alternative from Jetbrains, recommended by Google — .

Despite the fact that have been rising a lot of questions among developers, nowadays almost every new project no longer contains a dependency on . And played a significant role in this.

What this post is about

In this post, I will tell you how works. You will learn about the basic principle and its lifecycle, as well as about dispatching.

This post is addressed both to those who take their first steps trying to understand, and those who have years of practice.

The chain
CoroutineScope(context = Dispatchers.Main.immediate).launch() {
doAction()
flowOf("Hey")
.onEach { doAction() }
.map { it.length }
.onStart { doAction() }
.flowOn(Dispatchers.Default)
.flatMapMerge {
doAction()
flowOf(1)
.flowOn(Dispatchers.Main)
.onEach { doAction() }
}
.flowOn(Dispatchers.IO)
.collect {
doAction()
}
}
}
view raw Chain.kt hosted with ❤ by GitHub

Our goal is to determine in which sequence and on which thread each action will be performed.

Basics and lifecycle

is built on just two interfaces:

public interface Flow<out T> {
public suspend fun collect(collector: FlowCollector<T>)
}
public fun interface FlowCollector<in T> {
public suspend fun emit(value: T)
}
view raw Flow.kt hosted with ❤ by GitHub

Together, they form the basis for the Consumer & Producer pattern implementation.

Before going further, I highly recommend you to take a look at an example of writing a stream API, based on the two interfaces.

Each chain consists of a specific set of operators. Each operator creates a new instance but at the same time stores a reference to another instance that is located above. Operators do not start until the collect method is called. (Cold operators).

The lifecycle is presented by five important stages:

  1. Launching ️️⬇️
    A coroutine launches on the Dispatcher we transmitted to CoroutineScope.
    After that, the following steps take place: Flow Creation, Operators Collection & Data Emission. The final result will be processed on the transmitted Dispatcher.
  2. Flow creation ⬇️
    Operators are created from top to bottom on the current execution thread. (Similar to the Builder pattern.)
  3. Operators collection ⬆️
    Performed from the bottom up. Each operator collects the upper one
  4. Data emission ⬇️
    Starts when all operators have successfully called collect at the top stream. Goes from the top down.
  5. Cancellation/Completion. The entire chain dies 😵
    Execution cancelled/Completed

Let’s take a closer look at each of the stages.

Launching
val job = CoroutineScope(Dispatchers.Main.immediate).launch {
doAction()
/*.../*
}
view raw Chain.kt hosted with ❤ by GitHub

Scope launching

 

Everything seems clear at the first glance. We create a scope that runs a coroutine on the Main thread. The method is also launched on this coroutine.

Scope returns which we can use to manage the lifecycle. (For example, we can stop all the work by executing the cancel() method.)

Role of immediate dispatcher

In Android, the only way to switch the thread to Main is to use the chain.

This logic hides behind , which hides behind

//handler here is created with Looper.mainLooper()
override fun dispatch(context: CoroutineContext, block: Runnable) {
if (!handler.post(block)) {
cancelOnRejection(context, block)
}
}

Now suppose that we are already on the Main thread, but we are still switching to the Main thread using
In this case, our code will not be executed immediately, which may affect the user experience, for example, cause the screen to blink.

The code will have to go through the entire chain through that can be busy processing other commands. The main idea of is to skip this queue and execute the code immediately.

has an method that helps us out in such a case. In , this method is implemented as follows:

override fun isDispatchNeeded(context: CoroutineContext): Boolean {
return !invokeImmediately || Looper.myLooper() != handler.looper
}

creates a new instance at , which switches to true. As a result, the of the Main thread will always be compared with the of the current thread, thus preventing an extra call to .

Flow creation

The starting point of our chain is .
Under the hood, we can see that we explicitly create a new instance of and store the value in a lambda, which will be called at the collection stage:

import kotlinx.coroutines.flow.internal.unsafeFlow as flow
public fun <T> flowOf(value: T): Flow<T> = flow {
emit(value)
}
internal inline fun <T> unsafeFlow(crossinline block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
return object : Flow<T> {
override suspend fun collect(collector: FlowCollector<T>) {
collector.block()
}
}
}
view raw Flow.kt hosted with ❤ by GitHub

After that, the operator will be created in the same way.
However, it is an extension and explicitly retains a reference to the previous .
All other operators up to are created in the same way and do not perform any actions.

The chain’s behaviour at the creation stage:
  1. flowOf(“Hey”)
    caches the transmitted value
  2. onEach { doAction() }
    caches the lambda that will be executed at the emission stage
  3. map {…}
    caches a lambda with a mapper
  4. onStart { doAction() }
    caches the lambda that will be executed at the collecting stage
  5. flowOn(Dispatchers.Default)
    сaches an assignment to the dispatcher
  6. flatMapMerge { … }
    caches the lambda that will be executed at the emission step
  7. flowOn(Dispatchers.IO)сaches an assignment to the dispatcher

As a result, each operator, except for the first one, contains a reference to the previous one, thus forming a .
Creation will be executed at the Main thread.

Job Offers

Job Offers

There are currently no vacancies.

OUR VIDEO RECOMMENDATION

Jobs

Collection

The collecting process goes from the bottom up and starts immediately after the terminal operator call.

Terminal operators in flow:
  • collect()
  • first()
  • toList()
  • toSet()
  • reduce()
  • fold()

When we call , the collection does not apply to the entire chain, but only to the , which is higher.
Then calls collect to the operator whose reference it saved while creating — . This is why operators keep references to the upper streams.

The chain’s behaviour at the collection stage:
  1. flowOn(Dispatchers.IO)
    creates a new coroutine on IO, changes the context to the created coroutine
    → calls collect to the upper stream
  2. flatMapMerge { … }
    → calls collect to the upper stream
  3. flowOn(Dispatchers.Default)
    creates a new coroutine on Default, changes the context to the created coroutine
    → calls collect to the upper stream
  4. onStart { doAction() }
    executes an action on the Default dispatcher
    → calls collect to the upper stream
  5. map { … }
    → calls collect to the upper stream
  6. onEach { … }
    → calls collect to the upper stream

 

 

Of all the operators, only one and two were executed.
The thread context changed twice: first to , and then to .
That is, will be executed as many times as it is written and will create a few instances of coroutines.

⚠️ However, does not always create new coroutine under the hood. Let’s take a look at the example below.

CoroutineScope(Dispatchers.Main.immediate).launch {
flowOf("Hey")
.onStart { doAction() }
.flowOn(Dispatchers.IO)
.onStart { doAction() }
.flowOn(Dispatchers.IO)
.onStart { doAction() }
.flowOn(Dispatchers.IO)
.collect()
}
view raw flowOn.kt hosted with ❤ by GitHub

We have intentionally written with the same dispatcher many times.

Result

//onStart1
_____________________________________
Job: ProducerCoroutine{Active}@53f45ab)
Thread: DefaultDispatcher-worker-1,5,main
//onStart2
_____________________________________
Job: ProducerCoroutine{Active}@53f45ab)
Thread: DefaultDispatcher-worker-1,5,main
//onStart3
_____________________________________
Job: ProducerCoroutine{Active}@53f45ab)
Thread: DefaultDispatcher-worker-1,5,main
view raw Result hosted with ❤ by GitHub

As you can see, only one instance of the coroutine has been created, and it is linked to one thread.

Emission

As soon as we reach that does not have a reference to the parent stream, the emission process starts. It goes from the root to the lowest one.

  1. flowOf(“Hey”)
    → emits hey, Default Dispatcher
  2. onEach { doAction() }
    performs an action, Default Dispatcher
  3. map {…}
    does mapping, Default Dispatcher
  4. onStart { doAction() }
    emits 3, Default Dispatcher
  5. flowOn(Dispatchers.Default)
    emits 3, Default Dispatcher
  6. flatMapMerge { … }
    This one is tricky. First, let’s recall its contents:
...
flatMapMerge {
doAction()
flowOf(1)
.flowOn(Dispatchers.Main)
.onEach { doAction() }
}
view raw flatMapMerge.kt hosted with ❤ by GitHub

The chain inside will go through all the stages of , and . After that, the final value will be emitted to the downstream.

 

 

Note that will be executed on . (It will be restored before the block is executed.)

Unlike , implements the concept of , which ensures that the context of the upper streams cannot affect the lower ones.

7. flowOn(Dispatchers.IO)emits 1, IO Dispatcher
8
. collect
invokes collector on Main despite changed context in the upstream

Conclusion
  • The method is suspended, which forces us to decide in advance on which context the result of our chain will be processed.
  • All operators are created from top to bottom and call collect to each other one by one from bottom to top. Thus they form a singly .
    Emission goes from top to bottom starting from the root .
  • Some operators may be executed during the collection stage. For instance, is executed as many times as it’s written.
  • creates a new coroutine transmitting the Dispatcher in arguments and also changes the context. (However, if we have several flowOns with the same dispatcher, only one coroutine will actually be created.)
    It affects only the upper streams, thereby guaranteeing compliance with the principle of .
    Both the creation of a coroutine and the change of context can be performed at two stages: collection and emission.
  • One thread can be used between multiple coroutines. If you have written , a new coroutine will surely be created if the current context is different. However, there is no guarantee that the thread will be different from the thread of the previous coroutine.
  • starts the chain only during parent chain data emission.
    No actions are performed during the root stream collection process.

In my next post, I’ll show how coroutines work under the hood and why they are more efficient than regular threads.

Stay tuned!

Thank you to @kost.maksym, who reviewed the content.

YOU MAY BE INTERESTED IN

YOU MAY BE INTERESTED IN

blog
It’s one of the common UX across apps to provide swipe to dismiss so…
READ MORE
blog
Hi, today I come to you with a quick tip on how to update…
READ MORE
blog
Automation is a key point of Software Testing once it make possible to reproduce…
READ MORE
blog
Drag and Drop reordering in Recyclerview can be achieved with ItemTouchHelper (checkout implementation reference).…
READ MORE

Leave a Reply

Your email address will not be published. Required fields are marked *

Fill out this field
Fill out this field
Please enter a valid email address.

Menu