Photo by Marc Szeglat on Unsplash
Introduction
In the first part of this series (🔗 read here), we took a deep dive into Kotlin Coroutine Flows and explored the concept of Cold Flows. While Cold Flows are useful in many scenarios, there are situations where they fall short, especially in cases where data is generated continuously. In such cases, Hot Flows are a powerful alternative that can help you manage and react to data streams in real-time.
In this second part, we’ll explore Kotlin Coroutine Hot Flows in detail. We’ll start by introducing the concept of Hot Flows and explaining how they differ from Cold Flows. We’ll then provide practical examples of how to create, subscribe to, and manage Hot Flows in real-world scenarios. We’ll also discuss the potential pitfalls of using Hot Flows and how to properly manage their lifecycle.
By the end of this article, you’ll have a solid understanding of 🔥 Hot Flows and how to use them to build 🔍 reactive applications that can handle continuous data streams with ease. So let’s dive in and explore the world of Kotlin Coroutine Hot Flows!
🔥Hot Flows
So, what exactly are Hot Flows, and how do they differ from Cold Flows🧊? In simple terms, Cold Flows are streams of data that are created anew for each subscriber, while Hot Flows are streams of data that are shared between subscribers and can emit new values at any time.
Photo by Christopher Burns on Unsplash
To put it another way, Cold Flows🧊 are “pull-based” — subscribers request new data as they need it — while Hot Flows🔥 are “push-based” — data is pushed to subscribers as it becomes available.
🚀 StateFlow
One of the most powerful features of Hot Flows in Kotlin Coroutine is the ability to work with StateFlow🚀. StateFlow is a special kind of Hot Flow that allows you to represent a single value that can be observed by multiple subscribers. It’s often used to represent the state of an application or a specific feature within an application.
💡 One of the key benefits of using StateFlow is that it provides a simple and efficient way to share state between different parts of your application. Because StateFlow emits only the current value (and not the entire history of values), it’s a lightweight and efficient way to represent state.
To create a StateFlow, you first need to define the initial value of the state. You can then use the MutableStateFlow
class to create an instance of the StateFlow and update the value as needed. Here’s an example:
val count = MutableStateFlow(0)
In this example, we’re creating a StateFlow called count
with an initial value of 0
. We can update the value of the StateFlow using the value
property or the update
function, like this:
count.value = 1 count.update { value -> value + 1 }
Now, any subscriber to the count
StateFlow will receive updates whenever the value changes.
To subscribe to a StateFlow, you can use the collect
operator, just like with a regular Flow. Here’s an example:
count.collect { value -> // do something with the new value }
🚨 One important thing to note about StateFlow is that while it’s thread-safe and can be accessed from multiple coroutines, it’s important to use the update
function when updating the state from multiple coroutines to avoid race conditions and ensure consistency.
🌟 SharedFlow
SharedFlow🌟 is one type of hot flow that is commonly used for event streaming. It emits events to all active collectors when they are produced, allowing multiple consumers to receive the same events at the same time.
SharedFlow can be created using the MutableSharedFlow()
function, which takes an optional replay
parameter to specify the number of past events to replay to new collectors. By default, replay
is set to 0, which means that new collectors will only receive events produced after they start collecting.
💡 One important thing to note about SharedFlow is that it is designed for event streaming and does not maintain any state. This means that it is suitable for emitting repetitive events such as button clicks or sensor readings, but not for managing application state.
💡 Another important parameter of SharedFlow is extraBufferCapacity
, which determines the size of the buffer that stores the emitted values. If the buffer is full and a new value is emitted, then one of the existing values will be dropped.
💡 If you want to handle this scenario in a specific way, you can set the onBufferOverflow
parameter to a specific action, such as dropping the oldest value if buffer is full or suspending the emitter until space is available in the buffer.
💻 Here’s an example of how to create a simple SharedFlow that emits events at regular intervals:
val mySharedFlow = MutableSharedFlow<String>() // Start emitting events every 500ms coroutineScope.launch { while (true) { mySharedFlow.emit("Event") delay(500) } } // Collect events from the shared flow coroutineScope.launch { mySharedFlow.collect { event -> println("Received event: $event") } }
👉🏼 In this example, we use a MutableSharedFlow
to create a SharedFlow that emits events every 500ms. We then use two coroutines to start emitting events and collect events from the SharedFlow.
💡 Another useful feature of SharedFlow is that you can use tryEmit()
to emit events only if there are active collectors. This can prevent unnecessary emissions when there are no active collectors and improve performance.
🌡️Sharing Streams Between Emitters and Collectors
💡 Hot Flows, specifically SharedFlow, can share their streams among all the emitters and collectors due to their internal buffer. SharedFlow has a buffer inside that can hold values that will be collected by suspended collectors.
Photo by Andrea Lightfoot on Unsplash
Job Offers
The buffer has a size limit and emitters suspended will be inserted at the end of the buffer. The replay cache size of the buffer can also be set. When a new collector starts collecting from this buffer, it will receive the value from the replay cache size position.
In summary, SharedFlow uses an internal buffer to store emitted values that are then available to all collectors, even if they start collecting after the value has been emitted. This allows for seamless sharing of the stream among all emitters and collectors.
👉 How a StateFlow Collector Works
A StateFlow🚀 collector is created whenever StateFlow.collect
is called. It is responsible for receiving and processing the updates emitted by the StateFlow
. The following steps outline the basic behavior of a hot flow collector:
🔹 Step 1: Allocate a slot for the collector
When StateFlow.collect
is called for the first time, a new collector is created and a slot is allocated for it in the StateFlow
‘s internal slot table. The slot represents a location where the collector’s state can be stored.
🔹 Step 2: Attach the collector to the slot
Once a slot has been allocated, the collector is attached to it. This means that any updates emitted by the StateFlow
will be stored in the slot and made available to the collector.
🔹 Step 3: Process new updates
As updates are emitted by the StateFlow
, they are added to the slot. If the collector is currently processing updates (i.e. it is not suspended), the new updates are immediately processed.
🔹 Step 4: Suspend the collector if necessary
If the collector is suspended (i.e. it is currently waiting for a new update to arrive), the slot is marked as “pending”. This means that the next update emitted by the StateFlow
will be stored in the slot and used to resume the collector.
🔹 Step 5: Resume the collector
When a new update is added to the slot and the collector is suspended, it is immediately resumed. The update is passed to the collector’s lambda, and the process repeats from step 3.
🔹 Step 6: Free the slot
If the collector is cancelled or no longer active, the slot is freed up. The collector is detached from the slot, and the slot becomes available for use by another collector.
That’s a basic overview of how a hot flow collector works.
📌CallbackFlow and ChannelFlow
In addition to cold flows, Kotlin coroutines also support hot flows using SharedFlow
and StateFlow
. However, there are two other types of flows that offer more control and customization: CallbackFlow
and ChannelFlow
.
CallbackFlow
📌 allows you to create a flow that emits values based on callbacks. For example,you can create a hot flow that listens for updates from the device’s GPS and emits the current location data:
fun getGpsUpdates(): Flow<Location> = callbackFlow { val callback = object : DatabaseCallback { override fun onLocationChanged(location: Location) { send(location) // emit the updated value } } registerCallback(callback) // register the callback awaitClose { unregisterCallback(callback) } // clean up when the flow is cancelled }
Here, we create a callbackFlow
and provide an implementation of onLocationChanged
that emits the updated data using send
. We also register and unregister the callback when the flow is started and cancelled, respectively.
ChannelFlow
📌 allows you to create a flow that emits values based on a channel. A channel is a coroutine-based implementation of the producer-consumer pattern. In other words, it’s a way for one coroutine to send data to another coroutine, which can then consume that data asynchronously.
To create a flow using channels, we can use the channelFlow
builder function, which returns a Flow
. Similar to callbackFlow
, this function takes a suspendable block of code that defines the behavior of the channel.
Here’s an example of a simple ChannelFlow
that emits a sequence of integers:
fun sequenceFlow(): Flow<Int> = channelFlow { for (i in 1..5) { send(i) } close() }
In this example, we create a ChannelFlow
using the channelFlow
builder function and emit a sequence of integers from 1 to 5 using the send
function. Finally, we close the channel using the close
function.
💡Note that ChannelFlow
is a cold flow, which means that it won’t start emitting data until it’s collected by a downstream consumer. but you make it “hotter” and the data can keep coming the outside.
🚨Hot Flows Collector Pitfalls
Photo by Juliane Liebermann on Unsplash
🔹Never-Ending Collector🌀
When you use collect to subscribe to a hot flow, it creates an infinite loop that runs until the coroutine is cancelled. This means that if you have multiple hot flows that you’re subscribing to within the same coroutine, only the first one will be collected and the subsequent ones will never be reached. To subscribe to multiple hot flow within the same coroutine, you can use combine or zip to merge the hot flow into a single Flow, or you can use separate coroutines for each hot flow.
coroutineScope.launch { hotFlow1.collect { value -> // process collected values } hotFlow2.collect { value -> // never reached because hotFlow1 never returns 😕 } }
🚨In this example, the second collect
block will never be reached, because the first collect
block is running in an infinite loop and suspending the coroutine.
🔹Nested Collecting🐾
One pitfall to watch out for is nested collecting of hot flows. In this scenario, the first flow is never able to emit new values after the first emission because of backpressure and never returning of the collection. For example:
coroutineScope.launch { hotFlow1.collect { hotFlow2.collect { // never reached because hotFlow1 never returns😔 // process collected values } } }
To avoid this issue, consider using flatMapLatest
instead of nested collecting. This way, the second flow is only collected when the first flow emits a new value, and previous collection is canceled. Here’s an example:
coroutineScope.launch { hotFlow1 .flatMapLatest { it -> hotFlow2(it) } .collect { // process collected items } }
🔹Missed Emissions💔
Lastly, it’s important to be aware of a potential pitfall when collecting a shared flow after an emission has already occurred. In this scenario, it’s possible to miss the previously emitted value because the collector wasn’t active at that time. For example:
val sharedFlow = MutableSharedFlow<Int>() // do something to trigger emission sharedFlow.tryEmit(1) coroutineScope.launch { // collect the shared flow after the emission sharedFlow.collect { value -> // missed the previously emitted value😢 } }
In this scenario, it’s possible to miss the previously emitted value because the collector wasn’t active at that time. One way to address this issue is by configuring your shared flow to increase the replay count using the replay
parameter. For example, MutableSharedFlow<Int>(1, 0, BufferOverflow.DROP_OLDEST)
will emit the latest emission to all new collectors, ensuring that no emitted values are missed.
This article was previously published on proandroiddev.com