Introduction

In the first part of this series (🔗 read here), we took a deep dive into Kotlin Coroutine Flows and explored the concept of Cold Flows. While Cold Flows are useful in many scenarios, there are situations where they fall short, especially in cases where data is generated continuously. In such cases, Hot Flows are a powerful alternative that can help you manage and react to data streams in real-time.

In this second part, we’ll explore Kotlin Coroutine Hot Flows in detail. We’ll start by introducing the concept of Hot Flows and explaining how they differ from Cold Flows. We’ll then provide practical examples of how to create, subscribe to, and manage Hot Flows in real-world scenarios. We’ll also discuss the potential pitfalls of using Hot Flows and how to properly manage their lifecycle.

By the end of this article, you’ll have a solid understanding of 🔥 Hot Flows and how to use them to build 🔍 reactive applications that can handle continuous data streams with ease. So let’s dive in and explore the world of Kotlin Coroutine Hot Flows!

🔥Hot Flows

So, what exactly are Hot Flows, and how do they differ from Cold Flows🧊? In simple terms, Cold Flows are streams of data that are created anew for each subscriber, while Hot Flows are streams of data that are shared between subscribers and can emit new values at any time.

To put it another way, Cold Flows🧊 are “pull-based”: the producer only runs when a subscriber asks for data. Hot Flows🔥 are “push-based”: data is pushed out as it becomes available, whether or not anyone is subscribed.
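💻 To make the difference concrete, here is a minimal sketch. It assumes kotlinx-coroutines-core is on the classpath, and the names coldStarts, coldNumbers and hotCounter are made up for this illustration.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Counts how many times the cold flow's builder block has been started.
var coldStarts = 0

// A cold flow: the block below runs from scratch for every collector.
val coldNumbers = flow {
    coldStarts++
    emit(1)
    emit(2)
}

// A hot flow: it holds its value whether or not anyone is collecting.
val hotCounter = MutableStateFlow(0)

fun main() = runBlocking {
    println(coldNumbers.toList()) // [1, 2]
    println(coldNumbers.toList()) // [1, 2] again, because the block restarted
    println("cold block started $coldStarts times") // 2 times

    hotCounter.value = 42 // updated while nobody is collecting
    println("late collector sees ${hotCounter.first()}") // 42
}
```

Each toList() call is a separate collector, so the cold block runs twice. The StateFlow keeps its value without any collector at all.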

🚀 StateFlow

One of the most useful Hot Flows in Kotlin Coroutines is StateFlow🚀. StateFlow is a special kind of Hot Flow that holds a single value that can be observed by multiple subscribers. It’s often used to represent the state of an application or a specific feature within an application.

💡 One of the key benefits of using StateFlow is that it provides a simple and efficient way to share state between different parts of your application. Because StateFlow emits only the current value (and not the entire history of values), it’s a lightweight and efficient way to represent state.

To create a StateFlow, you first need to define the initial value of the state. You can then pass it to the MutableStateFlow function to create an instance of the StateFlow and update the value as needed. Here’s an example:

val count = MutableStateFlow(0)

In this example, we’re creating a StateFlow called count with an initial value of 0. We can update the value of the StateFlow using the value property or the update function, like this:

count.value = 1

count.update { value ->
    value + 1
}

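Now, any subscriber to the count StateFlow will receive updates whenever the value changes. A new subscriber first receives the current value, and setting a value that is equal to the current one does not trigger an update.

To subscribe to a StateFlow, you can use the collect operator, just like with a regular Flow. Because collect is a suspending function, it has to be called from a coroutine. Here’s an example: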

count.collect { value ->
    // do something with the new value
}

🚨 One important thing to note about StateFlow: it’s thread-safe and can be accessed from multiple coroutines, but a read followed by a write (such as count.value = count.value + 1) is not atomic. When the new state depends on the previous one and several coroutines update it concurrently, use the update function to avoid race conditions and lost updates.
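💻 Here is a small sketch of concurrent updates, assuming kotlinx-coroutines-core 1.5.1 or newer (where update is available). The function name countWithUpdate and its parameters are invented for this example.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Increments a fresh counter from many coroutines running in parallel.
suspend fun countWithUpdate(workers: Int, incrementsPerWorker: Int): Int {
    val count = MutableStateFlow(0)
    withContext(Dispatchers.Default) {
        repeat(workers) {
            launch {
                repeat(incrementsPerWorker) {
                    // update re-runs its lambda until the compare-and-set succeeds,
                    // so no increment is lost.
                    count.update { current -> current + 1 }
                }
            }
        }
    } // withContext returns only after all launched children have finished
    return count.value
}

fun main() = runBlocking {
    println(countWithUpdate(workers = 100, incrementsPerWorker = 1_000)) // 100000
}
```

Replacing the update call with count.value = count.value + 1 would typically produce a smaller total, because two coroutines can read the same old value before either writes.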

🌟 SharedFlow

SharedFlow🌟 is one type of hot flow that is commonly used for event streaming. It emits events to all active collectors when they are produced, allowing multiple consumers to receive the same events at the same time.

SharedFlow can be created using the MutableSharedFlow() function, which takes an optional replay parameter to specify the number of past events to replay to new collectors. By default, replay is set to 0, which means that new collectors will only receive events produced after they start collecting.

💡 One important thing to note about SharedFlow is that it is designed for event streaming and does not maintain any state. This means that it is suitable for emitting repetitive events such as button clicks or sensor readings, but not for managing application state.

💡 Another important parameter of SharedFlow is extraBufferCapacity, which sets how many values can be buffered in addition to the replay cache. The buffer lets emitters run ahead of slow collectors. By default, if the buffer is full and a new value is emitted, emit suspends until a collector frees up space.

💡 If you want to handle this scenario differently, you can set the onBufferOverflow parameter to BufferOverflow.DROP_OLDEST (drop the oldest buffered value) or BufferOverflow.DROP_LATEST (drop the value being emitted) instead of the default BufferOverflow.SUSPEND. The dropping strategies require a buffer, so replay or extraBufferCapacity must be greater than 0.
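💻 The sketch below compares the three overflow strategies. It assumes it is called from runBlocking, so that everything runs on a single thread and the collector cannot drain the buffer between two tryEmit calls. The helper name emitThree is made up for this illustration.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Emits 1, 2, 3 into a buffer of two while the collector has no chance to run.
// Returns what tryEmit reported and what the collector finally received.
suspend fun emitThree(overflow: BufferOverflow): Pair<List<Boolean>, List<Int>> = coroutineScope {
    val events = MutableSharedFlow<Int>(extraBufferCapacity = 2, onBufferOverflow = overflow)
    val received = mutableListOf<Int>()
    // UNDISPATCHED: the collector subscribes right away, before the first tryEmit.
    val collector = launch(start = CoroutineStart.UNDISPATCHED) {
        events.collect { received.add(it) }
    }
    // tryEmit never suspends, so the collector stays parked during these calls.
    val accepted = (1..3).map { events.tryEmit(it) }
    yield() // let the collector drain the buffer
    collector.cancelAndJoin()
    accepted to received
}

fun main() = runBlocking {
    println(emitThree(BufferOverflow.SUSPEND))     // ([true, true, false], [1, 2])
    println(emitThree(BufferOverflow.DROP_OLDEST)) // ([true, true, true], [2, 3])
    println(emitThree(BufferOverflow.DROP_LATEST)) // ([true, true, true], [1, 2])
}
```

With SUSPEND, the third tryEmit reports false because it would have to wait. With the two dropping strategies it reports true, but one value silently disappears.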

💻 Here’s an example of how to create a simple SharedFlow that emits events at regular intervals:

val mySharedFlow = MutableSharedFlow<String>()

// Start emitting events every 500ms
coroutineScope.launch {
    while (true) {
        mySharedFlow.emit("Event")
        delay(500)
    }
}

// Collect events from the shared flow
coroutineScope.launch {
    mySharedFlow.collect { event ->
        println("Received event: $event")
    }
}

👉🏼 In this example, we use a MutableSharedFlow to create a SharedFlow that emits events every 500ms. We then use two coroutines to start emitting events and collect events from the SharedFlow.

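💡 Another useful feature of SharedFlow is tryEmit(), which tries to emit a value without suspending and returns false if the emission would have to suspend, for example when the buffer is full. Note that it does not check for active collectors: with no collectors, tryEmit() returns true and the value is simply dropped (unless it is kept in the replay cache). To emit only when someone is listening, check the subscriptionCount property first.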
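💻 A short sketch of how tryEmit() behaves on a SharedFlow created with default parameters. It assumes a single-threaded runBlocking context, and the names TryEmitReport and observeTryEmit are invented for this example.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

data class TryEmitReport(
    val acceptedWithoutCollectors: Boolean,
    val acceptedWithCollector: Boolean,
    val received: List<String>,
)

suspend fun observeTryEmit(): TryEmitReport = coroutineScope {
    val events = MutableSharedFlow<String>() // defaults: replay = 0, extraBufferCapacity = 0
    val received = mutableListOf<String>()

    // No collectors yet: tryEmit succeeds, but the value is lost.
    val first = events.tryEmit("nobody listening")

    val collector = launch(start = CoroutineStart.UNDISPATCHED) {
        events.collect { received.add(it) }
    }
    check(events.subscriptionCount.value == 1) // the collector is now subscribed

    // A collector exists but there is no buffer, so tryEmit would have to suspend.
    val second = events.tryEmit("rejected")

    events.emit("delivered") // emit suspends until the collector has taken the value
    collector.cancelAndJoin()
    TryEmitReport(first, second, received)
}

fun main() = runBlocking {
    println(observeTryEmit())
    // TryEmitReport(acceptedWithoutCollectors=true, acceptedWithCollector=false, received=[delivered])
}
```

If you rely on tryEmit() with active collectors, give the flow some extraBufferCapacity. Otherwise every call is rejected.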

🌡️Sharing Streams Between Emitters and Collectors

💡 Hot Flows, specifically SharedFlow, can share their streams among all the emitters and collectors thanks to an internal buffer. The buffer holds emitted values until the collectors that are still busy with earlier values get to collect them.

The buffer has a size limit (replay plus extraBufferCapacity). When it is full, suspended emitters are queued behind the buffered values until space frees up. The replay parameter sets how much of the buffer serves as the replay cache. When a new collector starts collecting, it begins at the start of the replay cache, so it first receives up to replay of the most recent values.

In summary, SharedFlow uses an internal buffer to store emitted values that are then available to all collectors, even if they start collecting after the value has been emitted (as long as the value is still in the replay cache). This allows for seamless sharing of the stream among all emitters and collectors.
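💻 To illustrate the replay cache, here is a minimal sketch in which a collector arrives after five values have already been emitted. The function name lateCollectorSees is hypothetical.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Emits 1..5 before anyone collects, then returns what a late collector receives.
suspend fun lateCollectorSees(replay: Int): List<Int> {
    require(replay in 1..5) { "this sketch emits five values, so replay must be 1..5" }
    val events = MutableSharedFlow<Int>(replay = replay)
    for (i in 1..5) events.emit(i) // no collectors yet, so emit never suspends
    // A shared flow never completes on its own; take() ends the collection for us.
    return events.take(replay).toList()
}

fun main() = runBlocking {
    println(lateCollectorSees(replay = 2)) // [4, 5]
    println(lateCollectorSees(replay = 1)) // [5]
}
```

Only the last replay values survive. Everything emitted earlier is gone by the time the collector subscribes.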

👉 How a StateFlow Collector Works

A StateFlow🚀 collector is created whenever StateFlow.collect is called. It is responsible for receiving and processing the updates emitted by the StateFlow. The following steps outline the basic behavior of a StateFlow collector. They describe internal details of the library, so treat them as a mental model rather than a contract:

🔹 Step 1: Allocate a slot for the collector

Each time StateFlow.collect is called, a slot is allocated for the new collector in the StateFlow’s internal slot table. The slot represents a location where the collector’s state can be stored.

🔹 Step 2: Attach the collector to the slot

Once a slot has been allocated, the collector is attached to it. From then on, the StateFlow uses the slot to notify the collector whenever its value changes.

🔹 Step 3: Process new updates

The StateFlow itself stores only the latest value. If the value changes while the collector is busy processing (i.e. it is not suspended), the slot is marked as “pending”, and the collector picks up the latest value as soon as it has finished. Intermediate values that were overwritten in the meantime are skipped.

🔹 Step 4: Suspend the collector if necessary

If the collector has processed the latest value and its slot is not marked as “pending”, it suspends and waits in the slot for the next change.

🔹 Step 5: Resume the collector

When the value changes while the collector is suspended, the collector is resumed. The current value is passed to the collector’s lambda, and the process repeats from step 3.

🔹 Step 6: Free the slot

If the collector is cancelled or no longer active, the slot is freed up. The collector is detached from the slot, and the slot becomes available for use by another collector.

That’s a basic overview of how a hot flow collector works.
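💻 The slot bookkeeping is internal to kotlinx.coroutines, but its visible effect can be observed from the outside: a collector that was not running while the value changed several times only sees the latest value. The sketch below assumes a single-threaded runBlocking context, and the name observeConflation is made up.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

suspend fun observeConflation(): List<Int> = coroutineScope {
    val state = MutableStateFlow(0)
    val seen = mutableListOf<Int>()
    // UNDISPATCHED: the collector runs at once, receives the initial 0 and then suspends.
    val collector = launch(start = CoroutineStart.UNDISPATCHED) {
        state.collect { seen.add(it) }
    }
    // Three changes in a row with no suspension point: the collector cannot run in between.
    state.value = 1
    state.value = 2
    state.value = 3
    yield() // now the collector resumes and reads only the current value
    collector.cancelAndJoin()
    seen
}

fun main() = runBlocking {
    println(observeConflation()) // [0, 3]
}
```

The values 1 and 2 were overwritten before the collector ran again, so they never reach the lambda.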

📌CallbackFlow and ChannelFlow

In addition to cold flows, Kotlin coroutines also support hot flows using SharedFlow and StateFlow. However, there are two flow builders that offer more control and customization: callbackFlow and channelFlow, referred to here as CallbackFlow and ChannelFlow.

CallbackFlow 📌 allows you to create a flow that emits values based on callbacks. For example, you can create a flow that listens for updates from the device’s GPS and emits the current location data:

fun getGpsUpdates(): Flow<Location> = callbackFlow {
    // GpsCallback, registerCallback and unregisterCallback are placeholders for your location API
    val callback = object : GpsCallback {
        override fun onLocationChanged(location: Location) {
            trySend(location) // emit the updated value; send() suspends, so it cannot be called here
        }
    }
    registerCallback(callback) // register the callback
    awaitClose { unregisterCallback(callback) } // clean up when the flow is cancelled
}

Here, we create a callbackFlow and provide an implementation of onLocationChanged that emits the updated data using trySend. We use trySend rather than send because the callback is a regular, non-suspending function. We also register the callback when the flow is collected and unregister it when the collection is cancelled.

ChannelFlow📌 allows you to create a flow that emits values based on a channel. A channel is a coroutine-based implementation of the producer-consumer pattern. In other words, it’s a way for one coroutine to send data to another coroutine, which can then consume that data asynchronously.

To create a flow using channels, we can use the channelFlow builder function, which returns a Flow. Similar to callbackFlow, this function takes a suspending block of code that defines the behavior of the channel.

Here’s an example of a simple ChannelFlow that emits a sequence of integers:

fun sequenceFlow(): Flow<Int> = channelFlow {
    for (i in 1..5) {
        send(i)
    }
    close()
}

In this example, we create a ChannelFlow using the channelFlow builder function and emit a sequence of integers from 1 to 5 using the send function. Finally, we close the channel using the close function. The explicit close is optional here, because the channel is closed automatically when the block completes.

💡 Note that a flow created with channelFlow is a cold flow, which means that it won’t start emitting data until it’s collected by a downstream consumer. However, you can make it “hotter”, so that data keeps coming from the outside, for example by sharing it with the shareIn operator.
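💻 What channelFlow adds over the plain flow builder is that several coroutines can send into the same flow concurrently. Here is a minimal sketch of that idea. The function name mergedNumbers is made up for this example.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Two producers send into the same flow from separate coroutines.
fun mergedNumbers(): Flow<Int> = channelFlow {
    launch { for (i in 1..3) send(i) }
    launch { for (i in 101..103) send(i) }
    // No close() needed: the flow completes once both child coroutines have finished.
}

fun main() = runBlocking {
    // The interleaving of the two producers is not guaranteed, so we sort before printing.
    println(mergedNumbers().toList().sorted()) // [1, 2, 3, 101, 102, 103]
}
```

The flow is still cold: each call to toList() starts both producers again.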

🚨Hot Flows Collector Pitfalls


🔹Never-Ending Collector🌀

When you use collect to subscribe to a hot flow, the call suspends until the coroutine is cancelled, because a hot flow never completes on its own. This means that if you subscribe to multiple hot flows one after another within the same coroutine, only the first one will be collected and the subsequent ones will never be reached. To subscribe to multiple hot flows within the same coroutine, you can use combine, zip or merge to turn them into a single Flow, or you can use separate coroutines for each hot flow.

coroutineScope.launch {
    hotFlow1.collect { value ->
        // process collected values
    }
    hotFlow2.collect { value ->
        // never reached because hotFlow1 never returns 😕
    }
}

🚨In this example, the second collect block will never be reached, because the first collect block is running in an infinite loop and suspending the coroutine.
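💻 One possible fix is to give each hot flow its own coroutine. The sketch below assumes a single-threaded runBlocking context, which keeps the shared list safe without extra synchronization. The name collectBoth is invented for this example.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

suspend fun collectBoth(): List<String> = coroutineScope {
    val hotFlow1 = MutableSharedFlow<Int>()
    val hotFlow2 = MutableSharedFlow<String>()
    val received = mutableListOf<String>()

    // One coroutine per hot flow, so neither collect call blocks the other.
    val job1 = launch(start = CoroutineStart.UNDISPATCHED) {
        hotFlow1.collect { received.add("flow1: $it") }
    }
    val job2 = launch(start = CoroutineStart.UNDISPATCHED) {
        hotFlow2.collect { received.add("flow2: $it") }
    }

    hotFlow1.emit(1)   // suspends until the first collector has taken the value
    hotFlow2.emit("a") // suspends until the second collector has taken the value

    // Hot flow collectors never finish on their own, so cancel them explicitly.
    job1.cancelAndJoin()
    job2.cancelAndJoin()
    received
}

fun main() = runBlocking {
    println(collectBoth()) // [flow1: 1, flow2: a]
}
```

In an Android app you would usually launch these coroutines in a lifecycle-aware scope instead of cancelling the jobs by hand.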

🔹Nested Collecting🐾

One pitfall to watch out for is nested collecting of hot flows. In this scenario, the inner collect never returns, so the outer collector stays stuck on the first value and never gets to process later emissions of the first flow. For example:

coroutineScope.launch {
    hotFlow1.collect {
        hotFlow2.collect { // never returns, so later values of hotFlow1 are never processed😔
            // process collected values
        }
    }
}

To avoid this issue, consider using flatMapLatest instead of nested collecting. This way, the second flow is collected anew each time the first flow emits a value, and the previous collection is cancelled. Here’s an example:

coroutineScope.launch {
    hotFlow1
        .flatMapLatest { value ->
            hotFlow2(value) // here hotFlow2 is a function that returns the second flow for a value
        }
        .collect {
            // process collected items
        }
}

🔹Missed Emissions💔

Lastly, it’s important to be aware of a potential pitfall when collecting a shared flow after an emission has already occurred. In this scenario, it’s possible to miss the previously emitted value because the collector wasn’t active at that time. For example:

val sharedFlow = MutableSharedFlow<Int>()

// do something to trigger emission
sharedFlow.tryEmit(1)

coroutineScope.launch {
    // collect the shared flow after the emission
    sharedFlow.collect { value ->
        // missed the previously emitted value😢
    }
}

One way to address this issue is to increase the replay count using the replay parameter. For example, MutableSharedFlow<Int>(1, 0, BufferOverflow.DROP_OLDEST) replays the latest emission to every new collector, so the most recent value is not missed. Values emitted before that one are still lost, unless you choose a larger replay.

This article was previously published on proandroiddev.com
