
Hey folks! 👋
Today we’re diving deep into one of the most crucial aspects of Kotlin Flows — cancellation. If you’ve been working with flows, you’ve probably faced scenarios where you needed to stop a long-running flow operation, maybe because the user navigated away from a screen, or a network request is taking too long, or you simply want to prevent unnecessary resource consumption.
Flow cancellation isn’t just about calling cancel() and hoping for the best. There are multiple sophisticated techniques, each with its own use cases and nuances. So let’s pull back the curtain and explore the world of cancellable flows! 🚀
Why Flow Cancellation Matters
Before we jump into the techniques, let’s understand why cancellation is critical:
- Resource Management: Prevents memory leaks and unnecessary CPU usage
- User Experience: Stops outdated operations when users navigate away
- Network Efficiency: Cancels pending requests that are no longer needed
- Battery Life: Reduces background processing on mobile devices
Method 1: Job Cancellation — The Foundation
The most fundamental way to cancel a flow is through Job cancellation. Every coroutine has a Job, and when you cancel that job, all flows running within that coroutine scope get cancelled too.
Let’s see this in action:
import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import kotlin.time.Duration.Companion.seconds
suspend fun main() {
val job = CoroutineScope(Dispatchers.Default).launch {
createNumberFlow()
.collect { value ->
println("Received: $value")
}
}
delay(3.seconds)
println("Cancelling job...")
job.cancel()
delay(1.seconds)
println("Program finished")
}
fun createNumberFlow() = flow {
repeat(10) { i ->
println("Emitting: $i")
emit(i)
delay(1.seconds)
}
}
Output:
Emitting: 0 Received: 0 Emitting: 1 Received: 1 Emitting: 2 Received: 2 Cancelling job... Program finished
What’s happening under the hood?
When you call job.cancel(), it sends a cancellation signal to the coroutine. The flow builder (flow { }) is cancellation-cooperative, meaning it checks for cancellation at suspension points like delay() and emit(). Once cancelled, the flow stops emitting new values and the collector stops receiving them.
But here’s something interesting — what if your flow doesn’t have suspension points?
fun nonCancellableFlow() = flow {
repeat(1000000) { i ->
emit(i) // No suspension point!
// This will keep running even after cancellation
}
}
This flow won’t respect cancellation because there are no suspension points. To fix this, you can use ensureActive():
fun cancellableFlow() = flow {
repeat(1000000) { i ->
ensureActive() // Checks for cancellation
emit(i)
}
}
Advanced Job Cancellation with Structured Concurrency
Let’s explore a more complex scenario using structured concurrency:
class DataRepository {
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
fun fetchDataStream(): Flow<String> = flow {
repeat(Int.MAX_VALUE) { i ->
emit("Data item $i")
delay(500.milliseconds)
}
}.flowOn(Dispatchers.IO)
fun startFetching(): Job {
return scope.launch {
fetchDataStream()
.catch { e -> println("Error: ${e.message}") }
.collect { data ->
println("Processing: $data")
}
}
}
fun cleanup() {
scope.cancel("Repository is being cleaned up")
}
}
// Usage
suspend fun main() {
val repository = DataRepository()
val fetchJob = repository.startFetching()
delay(3.seconds)
println("Cleaning up repository...")
repository.cleanup()
delay(1.seconds)
println("Done")
}
Output:
Processing: Data item 0 Processing: Data item 1 Processing: Data item 2 Processing: Data item 3 Processing: Data item 4 Processing: Data item 5 Cleaning up repository... Done
Method 2: withTimeout — Time-Based Cancellation
Sometimes you want to cancel a flow operation if it takes too long. withTimeout is perfect for this scenario. It creates a time bomb ⏰ — if the operation doesn’t complete within the specified time, it throws a TimeoutCancellationException.
import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import kotlin.time.Duration.Companion.seconds
suspend fun main() {
try {
withTimeout(5.seconds) {
slowDataFlow()
.collect { value ->
println("Received: $value")
}
}
} catch (e: TimeoutCancellationException) {
println("Operation timed out: ${e.message}")
}
}
fun slowDataFlow() = flow {
repeat(10) { i ->
println("Emitting: $i")
emit(i)
delay(1.seconds) // Each emission takes 1 second
}
}
Output:
Emitting: 0 Received: 0 Emitting: 1 Received: 1 Emitting: 2 Received: 2 Emitting: 3 Received: 3 Emitting: 4 Received: 4 Operation timed out: Timed out waiting for 5000 ms
withTimeoutOrNull — Graceful Timeout Handling
If you don’t want exceptions, use withTimeoutOrNull:
suspend fun main() {
val result = withTimeoutOrNull(3.seconds) {
fastDataFlow()
.toList() // Collect all values into a list
}
when (result) {
null -> println("Operation timed out")
else -> println("Completed with results: $result")
}
}
fun fastDataFlow() = flow {
repeat(5) { i ->
emit(i)
delay(500.milliseconds)
}
}
Output:
Completed with results: [0, 1, 2, 3, 4]
Real-World Example: Network Request with Timeout
Here’s a practical example of using timeout with network requests:
class ApiService {
suspend fun fetchUserData(userId: String): Flow<UserData> = flow {
// Simulate network delay
delay(2.seconds)
emit(UserData(userId, "John Doe", "john@example.com"))
delay(1.seconds)
emit(UserData(userId, "John Doe", "john.doe@example.com")) // Updated email
}
}
data class UserData(val id: String, val name: String, val email: String)
suspend fun main() {
val apiService = ApiService()
try {
withTimeout(4.seconds) {
apiService.fetchUserData("123")
.collect { userData ->
println("User data updated: $userData")
}
}
} catch (e: TimeoutCancellationException) {
println("Network request timed out. Please check your connection.")
}
}
Output:
User data updated: UserData(id=123, name=John Doe, email=john@example.com) User data updated: UserData(id=123, name=John Doe, email=john.doe@example.com)
Method 3: takeWhile — Conditional Cancellation
takeWhile is a powerful operator that cancels the flow based on a condition. It continues emitting values as long as the predicate returns true, and stops (cancels) as soon as it returns false
suspend fun main() {
numberSequence()
.takeWhile { it < 5 } // Stop when value >= 5
.collect { value ->
println("Received: $value")
}
println("Flow completed")
}
fun numberSequence() = flow {
repeat(10) { i ->
println("Emitting: $i")
emit(i)
delay(500.milliseconds)
}
}
Output:
Emitting: 0 Received: 0 Emitting: 1 Received: 1 Emitting: 2 Received: 2 Emitting: 3 Received: 3 Emitting: 4 Received: 4 Emitting: 5 Flow completed
Notice how the flow emitted 5 but didn’t receive it, because takeWhile stopped the collection when the condition became false.
takeWhile vs filter — Understanding the Difference
Many developers confuse takeWhile with filter. Let’s see the difference:
suspend fun main() {
println("=== Using filter ===")
numberSequence()
.filter { it < 5 } // Filters values but doesn't stop the flow
.collect { value ->
println("Received: $value")
}
println("\n=== Using takeWhile ===")
numberSequence()
.takeWhile { it < 5 } // Stops the flow completely
.collect { value ->
println("Received: $value")
}
}
Output:
=== Using filter === Emitting: 0 Received: 0 Emitting: 1 Received: 1 Emitting: 2 Received: 2 Emitting: 3 Received: 3 Emitting: 4 Received: 4 Emitting: 5 Emitting: 6 Emitting: 7 Emitting: 8 Emitting: 9
=== Using takeWhile === Emitting: 0 Received: 0 Emitting: 1 Received: 1 Emitting: 2 Received: 2 Emitting: 3 Received: 3 Emitting: 4 Received: 4 Emitting: 5
Key difference: filter continues the flow but skips unwanted values, while takeWhile completely terminates the flow.
Real-World takeWhile Examples
Example 1: Battery Level Monitoring
fun batteryLevelFlow() = flow {
var batteryLevel = 100
while (true) {
emit(batteryLevel)
batteryLevel = (batteryLevel - (1..5).random()).coerceAtLeast(0)
delay(1.seconds)
}
}
suspend fun main() {
batteryLevelFlow()
.takeWhile { it > 20 } // Stop monitoring when battery is low
.collect { level ->
println("Battery level: $level%")
if (level <= 30) {
println("⚠️ Low battery warning!")
}
}
println("🔋 Battery critically low! Stopping monitoring.")
}
Example 2: Stock Price Monitoring
data class StockPrice(val symbol: String, val price: Double)
fun stockPriceFlow(symbol: String) = flow {
var price = 100.0
while (true) {
price += (-5.0..5.0).random()
emit(StockPrice(symbol, price))
delay(1.seconds)
}
}
suspend fun main() {
val stopLossPrice = 90.0
stockPriceFlow("AAPL")
.takeWhile { it.price > stopLossPrice } // Stop loss triggered
.collect { stock ->
println("${stock.symbol}: $${String.format("%.2f", stock.price)}")
}
println("🛑 Stop loss triggered! Stopping price monitoring.")
}
Method 4: take — Count-Based Cancellation
take cancels the flow after emitting a specific number of items:
suspend fun main() {
infiniteFlow()
.take(5) // Only take first 5 items
.collect { value ->
println("Received: $value")
}
println("Collected exactly 5 items")
}
fun infiniteFlow() = flow {
var counter = 0
while (true) {
emit(counter++)
delay(300.milliseconds)
}
}
Method 5: cancellable() — Making Flows Cancellation-Aware
Sometimes you have flows that aren’t naturally cancellation-cooperative. The cancellable() operator makes them responsive to cancellation:
suspend fun main() {
val job = CoroutineScope(Dispatchers.Default).launch {
heavyComputationFlow()
.cancellable() // Makes the flow cancellation-aware
.collect { value ->
println("Processed: $value")
}
}
delay(2.seconds)
println("Cancelling...")
job.cancel()
delay(500.milliseconds)
println("Done")
}
fun heavyComputationFlow() = flow {
repeat(1000) { i ->
// Simulate heavy computation without suspension points
val result = (1..1000).map { it * it }.sum()
emit("Result $i: $result")
}
}
Method 6: first() and Conditional Terminal Operators
You’re absolutely right about first { condition }! This is a powerful cancellation technique where the flow collects until it finds the first element matching the condition, then cancels the flow and returns that element.
suspend fun main() {
val result = numberFlow()
.first { it > 5 } // Cancels as soon as it finds first value > 5
println("First value > 5: $result")
}
fun numberFlow() = flow {
repeat(20) { i ->
println("Emitting: $i")
emit(i)
delay(200.milliseconds)
}
}
Output:
Emitting: 0 Emitting: 1 Emitting: 2 Emitting: 3 Emitting: 4 Emitting: 5 Emitting: 6 First value > 5: 6
Notice how the flow stopped emitting after finding the first value greater than 5. This is different from filter which would continue the entire flow.
first() vs firstOrNull()
suspend fun main() {
// first() - throws exception if not found
try {
val result1 = shortFlow().first { it > 10 }
println("Found: $result1")
} catch (e: NoSuchElementException) {
println("No element found matching condition")
}
// firstOrNull() - returns null if not found
val result2 = shortFlow().firstOrNull { it > 10 }
println("Result: $result2")
}
fun shortFlow() = flowOf(1, 2, 3, 4, 5)
Output:
No element found matching condition Result: null
Real-World Example: Finding Available Server
data class Server(val name: String, val responseTime: Int)
fun checkServers() = flow {
val servers = listOf("server1", "server2", "server3", "server4")
servers.forEach { serverName ->
println("Checking $serverName...")
delay(500.milliseconds) // Simulate network check
val responseTime = (100..800).random()
emit(Server(serverName, responseTime))
}
}
suspend fun main() {
val fastServer = checkServers()
.first { it.responseTime < 300 } // Find first fast server and stop
println("Using fast server: ${fastServer.name} (${fastServer.responseTime}ms)")
}
Method 7: single() — Expecting Exactly One Element
single() is similar to first() but it expects exactly one element. It cancels after finding the first element, but throws an exception if there are more elements.
suspend fun main() {
// This will work - exactly one element matches
val result1 = flowOf(1, 2, 3, 4, 5)
.single { it == 3 }
println("Single result: $result1")
// This will throw - multiple elements match
try {
val result2 = flowOf(1, 2, 3, 4, 5)
.single { it > 2 } // 3, 4, 5 all match!
println("Result: $result2")
} catch (e: IllegalArgumentException) {
println("Error: Multiple elements found - ${e.message}")
}
}
Output:
Single result: 3 Error: Multiple elements found - Flow has more than one element matching the predicate.
Method 8: any(), all(), none() — Boolean Condition Cancellation
These operators provide early cancellation based on boolean conditions:
any() — Cancels on First Match
suspend fun main() {
val hasLargeNumber = numberFlow()
.any { it > 15 } // Cancels as soon as it finds first value > 15
println("Has number > 15: $hasLargeNumber")
}
fun numberFlow() = flow {
repeat(30) { i ->
println("Checking: $i")
emit(i)
delay(100.milliseconds)
}
}
Output:
Checking: 0 Checking: 1 ... Checking: 15 Checking: 16 Has number > 15: true
all() — Cancels on First Non-Match
suspend fun main() {
val allSmall = flowOf(1, 2, 3, 4, 5, 15, 6, 7)
.all { it < 10 } // Cancels as soon as it finds first value >= 10
println("All numbers < 10: $allSmall")
}
none() — Cancels on First Match
suspend fun main() {
val noLargeNumbers = flowOf(1, 2, 3, 4, 5, 15, 6, 7)
.none { it > 10 } // Cancels as soon as it finds first value > 10
println("No numbers > 10: $noLargeNumbers")
}
Method 9: transformWhile — Advanced Conditional Transformation
transformWhile is a more powerful version of takeWhile that allows you to transform elements and has more flexible emission behavior:
suspend fun main() {
numberFlow()
.transformWhile { value ->
if (value < 5) {
emit("Value: $value")
emit("Double: ${value * 2}") // Can emit multiple times
true // Continue
} else {
emit("Final: $value") // Can emit the "stopping" element
false // Stop here
}
}
.collect { println(it) }
}
Output:
Emitting: 0 Value: 0 Double: 0 Emitting: 1 Value: 1 Double: 2 Emitting: 2 Value: 2 Double: 4 Emitting: 3 Value: 3 Double: 6 Emitting: 4 Value: 4 Double: 8 Emitting: 5 Final: 5
transformWhile vs takeWhile
suspend fun main() {
println("=== takeWhile ===")
flowOf(1, 2, 3, 4, 5, 6)
.takeWhile { it < 4 }
.collect { println("Received: $it") }
println("\n=== transformWhile ===")
flowOf(1, 2, 3, 4, 5, 6)
.transformWhile { value ->
if (value < 4) {
emit("Valid: $value")
true
} else {
emit("Stopping at: $value") // Can emit the stopping element!
false
}
}
.collect { println(it) }
}
Output:
=== takeWhile === Received: 1 Received: 2 Received: 3
=== transformWhile === Valid: 1 Valid: 2 Valid: 3 Stopping at: 4
Method 10: collectLatest — Cancel Previous Collection
collectLatest cancels the previous collection action whenever a new value is emitted:
suspend fun main() {
fastEmittingFlow()
.collectLatest { value ->
println("Processing $value...")
delay(1.seconds) // Slow processing
println("Finished processing $value")
}
}
fun fastEmittingFlow() = flow {
repeat(5) { i ->
emit(i)
delay(300.milliseconds) // Fast emission
}
}
Output:
Processing 0... Processing 1... Processing 2... Processing 3... Processing 4... Finished processing 4
Only the last value gets fully processed because each new emission cancels the previous processing.
Practical collectLatest Example: Search Implementation
class SearchManager {
private val _searchQuery = MutableStateFlow("")
suspend fun startSearching() {
_searchQuery
.filter { it.isNotBlank() }
.collectLatest { query ->
println("Searching for: $query")
delay(2.seconds) // Simulate API call
println("Results for: $query")
}
}
fun updateQuery(query: String) {
_searchQuery.value = query
}
}
suspend fun main() {
val searchManager = SearchManager()
val job = CoroutineScope(Dispatchers.Default).launch {
searchManager.startSearching()
}
// Simulate user typing
searchManager.updateQuery("k")
delay(500.milliseconds)
searchManager.updateQuery("ko")
delay(500.milliseconds)
searchManager.updateQuery("kot")
delay(500.milliseconds)
searchManager.updateQuery("kotlin")
delay(3.seconds)
job.cancel()
}
Output:
Searching for: k Searching for: ko Searching for: kot Searching for: kotlin Results for: kotlin
Only “kotlin” gets the full search result because previous searches were cancelled by new queries.
Method 11: Custom Cancellation with SharedFlow and StateFlow
For more complex scenarios, you might need custom cancellation logic:
class DataManager {
private val _dataFlow = MutableSharedFlow<String>()
val dataFlow = _dataFlow.asSharedFlow()
private var isActive = true
suspend fun startEmitting() {
while (isActive) {
_dataFlow.emit("Data at ${System.currentTimeMillis()}")
delay(1.seconds)
}
}
fun stop() {
isActive = false
}
}
suspend fun main() {
val dataManager = DataManager()
val job = CoroutineScope(Dispatchers.Default).launch {
dataManager.startEmitting()
}
val collectorJob = CoroutineScope(Dispatchers.Default).launch {
dataManager.dataFlow.collect { data ->
println("Received: $data")
}
}
delay(3.seconds)
println("Stopping data manager...")
dataManager.stop()
delay(1.seconds)
job.cancel()
collectorJob.cancel()
println("All jobs cancelled")
}
Best Practices and When to Use Each Technique
1. Job Cancellation
- Use when: You need to cancel entire coroutine scopes
- Best for: Activity/Fragment lifecycle management, repository cleanup
- Remember: Always cancel parent jobs to avoid memory leaks
2. withTimeout/withTimeoutOrNull
- Use when: Operations have time constraints
- Best for: Network requests, file operations, user input waiting
- Remember: Consider using
withTimeoutOrNullfor graceful handling
3. takeWhile
- Use when: You have a condition-based stopping criteria
- Best for: Monitoring systems, user input validation, threshold-based operations
- Remember: The flow stops as soon as the condition becomes false
4. take
- Use when: You need a specific number of items
- Best for: Pagination, sampling, testing scenarios
- Remember: Simple and predictable behavior
5. cancellable()
- Use when: Working with CPU-intensive flows without suspension points
- Best for: Mathematical computations, data processing
- Remember: Adds overhead, so use only when necessary
Performance Considerations
Different cancellation techniques have different performance implications:
suspend fun performanceComparison() {
println("=== Performance Test ===")
// Test 1: Job cancellation (fastest)
val time1 = measureTimeMillis {
val job = CoroutineScope(Dispatchers.Default).launch {
repeat(1000000) {
// Heavy work
}
}
job.cancel()
}
println("Job cancellation: ${time1}ms")
// Test 2: takeWhile (conditional overhead)
val time2 = measureTimeMillis {
flow {
repeat(1000) { emit(it) }
}.takeWhile { it < 500 }
.collect { }
}
println("takeWhile: ${time2}ms")
// Test 3: withTimeout (exception overhead)
val time3 = measureTimeMillis {
try {
withTimeout(1.milliseconds) {
repeat(1000000) {
// Some work
}
}
} catch (e: TimeoutCancellationException) {
// Expected
}
}
println("withTimeout: ${time3}ms")
}
Common Pitfalls and How to Avoid Them
Pitfall 1: Forgetting to Check for Cancellation in CPU-Intensive Operations
// ❌ Wrong - Won't respect cancellation
fun badFlow() = flow {
repeat(1000000) { i ->
heavyComputation()
emit(i)
}
}
// ✅ Correct - Checks for cancellation
fun goodFlow() = flow {
repeat(1000000) { i ->
ensureActive() // or yield()
heavyComputation()
emit(i)
}
}
Pitfall 2: Not Handling TimeoutCancellationException
// ❌ Wrong - Exception will crash the app
suspend fun badTimeout() {
withTimeout(1.seconds) {
longRunningOperation()
}
}
// ✅ Correct - Proper exception handling
suspend fun goodTimeout() {
try {
withTimeout(1.seconds) {
longRunningOperation()
}
} catch (e: TimeoutCancellationException) {
println("Operation timed out, handling gracefully")
}
}
Pitfall 3: Confusing takeWhile with filter
// Remember: takeWhile STOPS the flow, filter just skips values
flow { emit(1); emit(2); emit(3) }
.takeWhile { it < 2 } // Emits: 1 (then stops)
.filter { it < 2 } // This won't even run because takeWhile stopped the flow
Job Offers
Conclusion
Flow cancellation is a powerful feature that, when used correctly, can significantly improve your app’s performance and user experience. Here’s a quick recap:
- Job cancellation for lifecycle management
- withTimeout for time-bounded operations
- takeWhile for condition-based stopping
- take for count-based limiting
- cancellable() for making non-cooperative flows responsive
Remember, the key to effective flow cancellation is understanding your use case and choosing the right technique. Don’t overthink it — start with the simplest approach that solves your problem and optimize from there.
Happy flowing! 🌊
What’s your favorite flow cancellation technique? Have you encountered any interesting edge cases? Drop a comment below and let’s discuss! 💬
This article was previously published on proandroiddev.com.



