This text is based on 1.3.7 version of kotlinx.coroutines library
We already know what cold & hot streams are (and if you don’t, check the previous article, link below). Now, let’s discover our first Kotlin Stream implementation: Flows.
Again, you can see some examples by checking the GitHub Repository. There are very small and simple cases that will help you to understand how to use Flows and some other stuff.
Remember that this article belongs to a series:
- Part 1: Cold & Hot Streams.
- Part 2: Flows.
- Part 3: Channels.
- Part 4: BroadcastChannels.
- Part 5: StateFlows and conclusions.
Let’s go with part two: Flows!
Flows
What is a Flow?
A Flow is used to represent a sequential emission of values that, at some point, ends (because it naturally ends or something bad happens).
All the operations in a Flow are executed sequentially inside the same block of code (and, therefore, the same Coroutine Scope).
An important characteristic of Flow is that it only starts emitting values when someone asks for them. Before someone subscribes (or performs a collection), the Flow doesn’t run its code.
So, a Cold Stream, right?
Well, if we look at the documentation:
The Flow interface does not carry information whether a flow truly is a cold stream that can be collected repeatedly and triggers execution of the same code every time it is collected, or if it is a hot stream that emits different values from the same running source on each collection. However, conventionally flows represent cold streams.
They’re not very definitive. We can assume that yes, they’re cold streams, but we’ll revisit this statement later.
Behind the scenes, a Flow is just an interface that exposes a method to collect the emitted values:
Upstream and Downstream Flows
The upstream and downstream concepts help us defining “which flow I will operate” and “which flow I will produce”.
It depends on which operator we consider. Both of them are a powerful concept, because all the operators and some flow properties are strongly related to these flows.
Flow Operators
There are two types of operators: intermediates and terminals.
Intermediate operators
They are functions that are applied to the upstream flow and return a downstream flow, where we can apply other operators.
These operators don’t execute code against the flow, instead, they use the flow that they receive to apply some operations and return some data (in other words, return their version of the received flow).
Some operators don’t return any data, instead they just add some feature to the flow. We’re not going to talk about them, but just know that they exist. An example can be the cancellable one.
The intermediate operators are not suspendable functions, but they can work with suspendable functions inside. We can say that they are just used to create a chain of operations.
Some of them can be: map, filter, take, takeWhile, catch, and a lot more. I will not explain operators here because that’s not the point of the article, but you can find a lot of documentation out there.
Terminal operators
These operators are suspendable functions and their general purpose is to collect the values received from the upstream flow.
Also, they work as the trigger of a flow execution: if there is not a terminal operator applied to the flow, the flow doesn’t start its emission.
As the name mentions, they are terminal because you cannot apply another operator (of any type) if any of them are applied.
Some of them can be: collect, single, toList, and more. Again, I will not explain operators, you can check the docs!
Flow Properties
All flow implementations must adhere to two properties, described below.
1. Context preservation
A flow encapsulates its execution context and never propagates it to the collectors (the ones that are using the values that it emitted).
In summary:
- The context where the flow is emitting values is never leaked to the downstream flow receiver.
- The values are produced in only one coroutine scope.
Some flows allow more than one coroutine (so, potentially more than one Coroutine Scope). However, if we use the Kotlin Flow Builders, they guarantee the context preservation.
There is an exception, one way to change the context, by using the flowOnoperator, but I’m not going to talk about this because we can spend 3 or 4 more hours.
If you try to launch another coroutine in the same flow builder, just like this:
You will get an exception:
You can see, in the final line, that there is an approach to use more than one coroutine, and we’ll see it later.
2. Exception transparency
The implementations of flow must never catch or handle exceptions that occur in a downstream flow. For exception handling, we must use the catch operator, that’s designed to catch exceptions coming from the upstream flow.
An important thing that we can see here is that we never have to try-catch the emit() or emitAll() calls.
There are other operators to handle exceptions, like retry and retryWhen, and I suggest you to take a look at them.
What happens if you try, for example, to make something like this?
Well, we’ll get another error:
Flow’s creation
There are many builders to create a Flow that are provided by Kotlin. I’ll focus on what I consider the most important ones: flow and channelFlow. There are others, like flowOf() or asFlow(), so you can check them if you want.
flow { }
This builder creates a flow that performs sequential emissions.
All the emit() or emitAll() calls must be inside the brackets (in other words, inside the FlowCollector block).
Of course, for each terminal operator applied to it, a new execution of blockwill be triggered.
callbackFlow { } and channelFlow { }
Those builders create a flow that can have concurrent emissions (it can have sequential emissions too).
The difference between them is conceptual:
- We have to use callbackFlows when we need to wrap a callback on a Flow.
- We have to use channelFlows when we need concurrent flow emissions.
There is another difference related to them, that we’ll see later. Until version 1.3.4 of coroutines-core library, the were the same thing.
With those flows, we cannot perform emissions with the emit() or emitAll() methods, instead we have to use send() or offer(). What’s the difference between them? Well, we’ll have to wait until we take a look at Channels 😀.
Again, for each terminal operator applied to it, it triggers a new execution of block.
Both flows have a peculiarity: they end when the last line of block’s code is executed, and that means that we must ensure that no coroutines are still running when we reach that end. But, there are cases when we don’t want to finish the flow there, for example, we perform an API Call and we want to wait until we get the response. So, what we do here?
Prevent the end of channelFlow and callbackFlow
Well, we can solve this by calling the awaitClose() function inside block. This call must be the last statement inside block. By doing this, we’re saying “Wait until I explicitly finish the flow to end the emissions”.
We can cancel this flow wherever we want using close() method (of course, inside the block).
Remember I said that there was another difference between channelFlowand callbackFlow? Well, here we have it: if you don’t add the awaitClose() method call at the end of a callbackFlow, it will fail.
So, what’s happening behinds the scenes? How a Flow can potentially have more than 1 context? Why is the emit() method not allowed? We’ll see later.
Consuming values
We can consume the emissions of a Flow, mainly, in two ways.
Using terminal operators
There are various terminal operators that can be applied to the Flow. The most commons are collect(), single() and toList(). Each of them collect the values differently, but as I said, I’ll not talk about operators here, so go and check the docs by yourself :).
Using asLiveData()
You can consume the Flow as a LiveData.
The LiveData returned has the same type as the Flow. So, if you had a Flow of, for example, Int values, you will receive a LiveData of Int values.
This is very useful if you don’t pretend to send a Flow to your View, and you want to keep it updated using LiveData.
A disclaimer here: you have to include the androidx lifecycle ktx library, because this extension function belongs to it.
Flow’s cancellation
It works as a coroutine on this aspect: if the parent Job gets cancelled, the flow gets cancelled too. But there is something more because, as we may know, coroutines adheres to cooperative cancellation, so Flows adhere too. If you want to get more information about cooperative cancellation, you can check this series.
As we may also know, a cooperative cancellation involves several things, and one of those is that we must check if the parent Job is active or not, and cancel the flow emissions according to it. It’s not too convenient to check after every emission if the Job is active or not, and fortunately, we have a solution for this.
We can use the cancellable intermediate operator. This operator will check if the Job is still active after every emission. By default, all the flows created by the flow {} builder implement the cancellable operator.
What if we want to cancel the Flow explicitly? Well, there is no method to cancel a normal flow { }, but in case we’re using channelFlow { } builder (or callbackFlow { }), we can call close().
Practical use cases
I will enum some practical use cases, but there are a lot more for you to discover:
- Return data using multiple data sources, accessing each of them sequentially: for example: first I check if there is something on the cache and return it, then I call an API to get some new results, then I store those results on the database, and then I return the new results fetching from the database.
- Wrap API Callbacks and expose them as streams: using a callbackFlow we can wrap any callbacks and send the data received from them as a Stream.
- You need to scope the emissions to a lifecycle: these streams are handy when you’re trying to scope the emissions to certain component’s lifecycle. If, for example, you’re using MVVM and viewModelScope, all the Flows started with that scope will be cancelled when the ViewModel gets cleared.
- More more more.
What’s behind channelFlow?
ChannelFlows can have multiple coroutines running inside and could produce data from outside the stream. So, what’s behind?
Well, if we look at the internal implementation of ChannelFlow, we can see that there is something called ReceiveChannel.
As you may imagine, we have something like a Channel implementation behind the scenes. So, we’re using a Channel (hot stream?) inside a Flow (cold stream?) to give Flow (cold stream?) some Channel features (hot stream features?).
And that’s for now!
Hope that now you can understand what is a Flow and how it works. There’re a lot of articles out there, so you can keep exploring them to gain more knowledge.
In the next article, we’ll talk about Channels.
See you later! And, if you like it, you can share!
Special thanks to the MediaMonks Android team that gives feedback.