Blog Infos
Author
Published
Topics
, , , ,
Published

 

Hey folks! 👋

Today we’re diving deep into one of the most crucial aspects of Kotlin Flows — cancellation. If you’ve been working with flows, you’ve probably faced scenarios where you needed to stop a long-running flow operation, maybe because the user navigated away from a screen, or a network request is taking too long, or you simply want to prevent unnecessary resource consumption.

Flow cancellation isn’t just about calling cancel() and hoping for the best. There are multiple sophisticated techniques, each with its own use cases and nuances. So let’s pull back the curtain and explore the world of cancellable flows! 🚀

Why Flow Cancellation Matters

Before we jump into the techniques, let’s understand why cancellation is critical:

  • Resource Management: Prevents memory leaks and unnecessary CPU usage
  • User Experience: Stops outdated operations when users navigate away
  • Network Efficiency: Cancels pending requests that are no longer needed
  • Battery Life: Reduces background processing on mobile devices
Method 1: Job Cancellation — The Foundation

The most fundamental way to cancel a flow is through Job cancellation. Every coroutine has a Job, and when you cancel that job, all flows running within that coroutine scope get cancelled too.

Let’s see this in action:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.time.Duration.Companion.seconds

 

suspend fun main() {
    val job = CoroutineScope(Dispatchers.Default).launch {
        createNumberFlow()
            .collect { value ->
                println("Received: $value")
            }
    }
    
    delay(3.seconds)
    println("Cancelling job...")
    job.cancel()
    
    delay(1.seconds)
    println("Program finished")
}
fun createNumberFlow() = flow {
    repeat(10) { i ->
        println("Emitting: $i")
        emit(i)
        delay(1.seconds)
    }
}

 

Output:

Emitting: 0
Received: 0
Emitting: 1
Received: 1
Emitting: 2
Received: 2
Cancelling job...
Program finished

What’s happening under the hood?

When you call job.cancel(), it sends a cancellation signal to the coroutine. The flow builder (flow { }) is cancellation-cooperative, meaning it checks for cancellation at suspension points like delay() and emit(). Once cancelled, the flow stops emitting new values and the collector stops receiving them.

But here’s something interesting — what if your flow doesn’t have suspension points?

fun nonCancellableFlow() = flow {
    repeat(1000000) { i ->
        emit(i) // No suspension point!
        // This will keep running even after cancellation
    }
}

This flow won’t respect cancellation because there are no suspension points. To fix this, you can use ensureActive():

fun cancellableFlow() = flow {
    repeat(1000000) { i ->
        ensureActive() // Checks for cancellation
        emit(i)
    }
}
Advanced Job Cancellation with Structured Concurrency

Let’s explore a more complex scenario using structured concurrency:

class DataRepository {
    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
    
    fun fetchDataStream(): Flow<String> = flow {
        repeat(Int.MAX_VALUE) { i ->
            emit("Data item $i")
            delay(500.milliseconds)
        }
    }.flowOn(Dispatchers.IO)
    
    fun startFetching(): Job {
        return scope.launch {
            fetchDataStream()
                .catch { e -> println("Error: ${e.message}") }
                .collect { data ->
                    println("Processing: $data")
                }
        }
    }
    
    fun cleanup() {
        scope.cancel("Repository is being cleaned up")
    }
}

 

// Usage
suspend fun main() {
    val repository = DataRepository()
    val fetchJob = repository.startFetching()
    
    delay(3.seconds)
    println("Cleaning up repository...")
    repository.cleanup()
    
    delay(1.seconds)
    println("Done")
}

 

Output:

Processing: Data item 0
Processing: Data item 1
Processing: Data item 2
Processing: Data item 3
Processing: Data item 4
Processing: Data item 5
Cleaning up repository...
Done
Method 2: withTimeout — Time-Based Cancellation

Sometimes you want to cancel a flow operation if it takes too long. withTimeout is perfect for this scenario. It creates a time bomb ⏰ — if the operation doesn’t complete within the specified time, it throws a TimeoutCancellationException.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.time.Duration.Companion.seconds

 

suspend fun main() {
    try {
        withTimeout(5.seconds) {
            slowDataFlow()
                .collect { value ->
                    println("Received: $value")
                }
        }
    } catch (e: TimeoutCancellationException) {
        println("Operation timed out: ${e.message}")
    }
}
fun slowDataFlow() = flow {
    repeat(10) { i ->
        println("Emitting: $i")
        emit(i)
        delay(1.seconds) // Each emission takes 1 second
    }
}

 

Output:

Emitting: 0
Received: 0
Emitting: 1
Received: 1
Emitting: 2
Received: 2
Emitting: 3
Received: 3
Emitting: 4
Received: 4
Operation timed out: Timed out waiting for 5000 ms
withTimeoutOrNull — Graceful Timeout Handling

If you don’t want exceptions, use withTimeoutOrNull:

suspend fun main() {
    val result = withTimeoutOrNull(3.seconds) {
        fastDataFlow()
            .toList() // Collect all values into a list
    }
    
    when (result) {
        null -> println("Operation timed out")
        else -> println("Completed with results: $result")
    }
}

 

fun fastDataFlow() = flow {
    repeat(5) { i ->
        emit(i)
        delay(500.milliseconds)
    }
}

 

Output:

Completed with results: [0, 1, 2, 3, 4]
Real-World Example: Network Request with Timeout

Here’s a practical example of using timeout with network requests:

class ApiService {
    suspend fun fetchUserData(userId: String): Flow<UserData> = flow {
        // Simulate network delay
        delay(2.seconds)
        emit(UserData(userId, "John Doe", "john@example.com"))
        
        delay(1.seconds)
        emit(UserData(userId, "John Doe", "john.doe@example.com")) // Updated email
    }
}

 

data class UserData(val id: String, val name: String, val email: String)
suspend fun main() {
    val apiService = ApiService()
    
    try {
        withTimeout(4.seconds) {
            apiService.fetchUserData("123")
                .collect { userData ->
                    println("User data updated: $userData")
                }
        }
    } catch (e: TimeoutCancellationException) {
        println("Network request timed out. Please check your connection.")
    }
}

 

Output:

User data updated: UserData(id=123, name=John Doe, email=john@example.com)
User data updated: UserData(id=123, name=John Doe, email=john.doe@example.com)
Method 3: takeWhile — Conditional Cancellation

takeWhile is a powerful operator that cancels the flow based on a condition. It continues emitting values as long as the predicate returns true, and stops (cancels) as soon as it returns false

suspend fun main() {
    numberSequence()
        .takeWhile { it < 5 } // Stop when value >= 5
        .collect { value ->
            println("Received: $value")
        }
    
    println("Flow completed")
}

 

fun numberSequence() = flow {
    repeat(10) { i ->
        println("Emitting: $i")
        emit(i)
        delay(500.milliseconds)
    }
}

 

Output:

 

Emitting: 0
Received: 0
Emitting: 1
Received: 1
Emitting: 2
Received: 2
Emitting: 3
Received: 3
Emitting: 4
Received: 4
Emitting: 5
Flow completed

 

Notice how the flow emitted 5 but didn’t receive it, because takeWhile stopped the collection when the condition became false.

takeWhile vs filter — Understanding the Difference

Many developers confuse takeWhile with filter. Let’s see the difference:

suspend fun main() {
    println("=== Using filter ===")
    numberSequence()
        .filter { it < 5 } // Filters values but doesn't stop the flow
        .collect { value ->
            println("Received: $value")
        }
    
    println("\n=== Using takeWhile ===")
    numberSequence()
        .takeWhile { it < 5 } // Stops the flow completely
        .collect { value ->
            println("Received: $value")
        }
}

Output:

 

=== Using filter ===
Emitting: 0
Received: 0
Emitting: 1
Received: 1
Emitting: 2
Received: 2
Emitting: 3
Received: 3
Emitting: 4
Received: 4
Emitting: 5
Emitting: 6
Emitting: 7
Emitting: 8
Emitting: 9

 

 

=== Using takeWhile ===
Emitting: 0
Received: 0
Emitting: 1
Received: 1
Emitting: 2
Received: 2
Emitting: 3
Received: 3
Emitting: 4
Received: 4
Emitting: 5

 

Key differencefilter continues the flow but skips unwanted values, while takeWhile completely terminates the flow.

Real-World takeWhile Examples

Example 1: Battery Level Monitoring

fun batteryLevelFlow() = flow {
    var batteryLevel = 100
    while (true) {
        emit(batteryLevel)
        batteryLevel = (batteryLevel - (1..5).random()).coerceAtLeast(0)
        delay(1.seconds)
    }
}

 

suspend fun main() {
    batteryLevelFlow()
        .takeWhile { it > 20 } // Stop monitoring when battery is low
        .collect { level ->
            println("Battery level: $level%")
            if (level <= 30) {
                println("⚠️ Low battery warning!")
            }
        }
    
    println("🔋 Battery critically low! Stopping monitoring.")
}

 

Example 2: Stock Price Monitoring

data class StockPrice(val symbol: String, val price: Double)

 

fun stockPriceFlow(symbol: String) = flow {
    var price = 100.0
    while (true) {
        price += (-5.0..5.0).random()
        emit(StockPrice(symbol, price))
        delay(1.seconds)
    }
}
suspend fun main() {
    val stopLossPrice = 90.0
    
    stockPriceFlow("AAPL")
        .takeWhile { it.price > stopLossPrice } // Stop loss triggered
        .collect { stock ->
            println("${stock.symbol}: $${String.format("%.2f", stock.price)}")
        }
    
    println("🛑 Stop loss triggered! Stopping price monitoring.")
}

 

Method 4: take — Count-Based Cancellation

take cancels the flow after emitting a specific number of items:

suspend fun main() {
    infiniteFlow()
        .take(5) // Only take first 5 items
        .collect { value ->
            println("Received: $value")
        }
    
    println("Collected exactly 5 items")
}

 

fun infiniteFlow() = flow {
    var counter = 0
    while (true) {
        emit(counter++)
        delay(300.milliseconds)
    }
}

 

Method 5: cancellable() — Making Flows Cancellation-Aware

Sometimes you have flows that aren’t naturally cancellation-cooperative. The cancellable() operator makes them responsive to cancellation:

suspend fun main() {
    val job = CoroutineScope(Dispatchers.Default).launch {
        heavyComputationFlow()
            .cancellable() // Makes the flow cancellation-aware
            .collect { value ->
                println("Processed: $value")
            }
    }
    
    delay(2.seconds)
    println("Cancelling...")
    job.cancel()
    
    delay(500.milliseconds)
    println("Done")
}

 

fun heavyComputationFlow() = flow {
    repeat(1000) { i ->
        // Simulate heavy computation without suspension points
        val result = (1..1000).map { it * it }.sum()
        emit("Result $i: $result")
    }
}

 

Method 6: first() and Conditional Terminal Operators

You’re absolutely right about first { condition }! This is a powerful cancellation technique where the flow collects until it finds the first element matching the condition, then cancels the flow and returns that element.

suspend fun main() {
    val result = numberFlow()
        .first { it > 5 } // Cancels as soon as it finds first value > 5
    
    println("First value > 5: $result")
}

 

fun numberFlow() = flow {
    repeat(20) { i ->
        println("Emitting: $i")
        emit(i)
        delay(200.milliseconds)
    }
}

 

Output:

 

Emitting: 0
Emitting: 1
Emitting: 2
Emitting: 3
Emitting: 4
Emitting: 5
Emitting: 6
First value > 5: 6

 

Notice how the flow stopped emitting after finding the first value greater than 5. This is different from filter which would continue the entire flow.

first() vs firstOrNull()

 

suspend fun main() {
    // first() - throws exception if not found
    try {
        val result1 = shortFlow().first { it > 10 }
        println("Found: $result1")
    } catch (e: NoSuchElementException) {
        println("No element found matching condition")
    }
    
    // firstOrNull() - returns null if not found
    val result2 = shortFlow().firstOrNull { it > 10 }
    println("Result: $result2")
}

 

 

fun shortFlow() = flowOf(1, 2, 3, 4, 5)

 

Output:

 

No element found matching condition
Result: null

 

Real-World Example: Finding Available Server

 

data class Server(val name: String, val responseTime: Int)

 

 

fun checkServers() = flow {
    val servers = listOf("server1", "server2", "server3", "server4")
    
    servers.forEach { serverName ->
        println("Checking $serverName...")
        delay(500.milliseconds) // Simulate network check
        val responseTime = (100..800).random()
        emit(Server(serverName, responseTime))
    }
}
suspend fun main() {
    val fastServer = checkServers()
        .first { it.responseTime < 300 } // Find first fast server and stop
    
    println("Using fast server: ${fastServer.name} (${fastServer.responseTime}ms)")
}

 

Method 7: single() — Expecting Exactly One Element

single() is similar to first() but it expects exactly one element. It cancels after finding the first element, but throws an exception if there are more elements.

suspend fun main() {
    // This will work - exactly one element matches
    val result1 = flowOf(1, 2, 3, 4, 5)
        .single { it == 3 }
    println("Single result: $result1")
    
    // This will throw - multiple elements match
    try {
        val result2 = flowOf(1, 2, 3, 4, 5)
            .single { it > 2 } // 3, 4, 5 all match!
        println("Result: $result2")
    } catch (e: IllegalArgumentException) {
        println("Error: Multiple elements found - ${e.message}")
    }
}

Output:

 

Single result: 3
Error: Multiple elements found - Flow has more than one element matching the predicate.

 

Method 8: any(), all(), none() — Boolean Condition Cancellation

These operators provide early cancellation based on boolean conditions:

any() — Cancels on First Match

 

suspend fun main() {
    val hasLargeNumber = numberFlow()
        .any { it > 15 } // Cancels as soon as it finds first value > 15
    
    println("Has number > 15: $hasLargeNumber")
}

 

 

fun numberFlow() = flow {
    repeat(30) { i ->
        println("Checking: $i")
        emit(i)
        delay(100.milliseconds)
    }
}

 

Output:

 

Checking: 0
Checking: 1
...
Checking: 15
Checking: 16
Has number > 15: true

 

all() — Cancels on First Non-Match

 

suspend fun main() {
    val allSmall = flowOf(1, 2, 3, 4, 5, 15, 6, 7)
        .all { it < 10 } // Cancels as soon as it finds first value >= 10
    
    println("All numbers < 10: $allSmall")
}

 

none() — Cancels on First Match

 

suspend fun main() {
    val noLargeNumbers = flowOf(1, 2, 3, 4, 5, 15, 6, 7)
        .none { it > 10 } // Cancels as soon as it finds first value > 10
    
    println("No numbers > 10: $noLargeNumbers")
}

 

Method 9: transformWhile — Advanced Conditional Transformation

transformWhile is a more powerful version of takeWhile that allows you to transform elements and has more flexible emission behavior:

suspend fun main() {
    numberFlow()
        .transformWhile { value ->
            if (value < 5) {
                emit("Value: $value")
                emit("Double: ${value * 2}") // Can emit multiple times
                true // Continue
            } else {
                emit("Final: $value") // Can emit the "stopping" element
                false // Stop here
            }
        }
        .collect { println(it) }
}

Output:

 

Emitting: 0
Value: 0
Double: 0
Emitting: 1
Value: 1
Double: 2
Emitting: 2
Value: 2
Double: 4
Emitting: 3
Value: 3
Double: 6
Emitting: 4
Value: 4
Double: 8
Emitting: 5
Final: 5

 

transformWhile vs takeWhile

 

suspend fun main() {
    println("=== takeWhile ===")
    flowOf(1, 2, 3, 4, 5, 6)
        .takeWhile { it < 4 }
        .collect { println("Received: $it") }
    
    println("\n=== transformWhile ===")
    flowOf(1, 2, 3, 4, 5, 6)
        .transformWhile { value ->
            if (value < 4) {
                emit("Valid: $value")
                true
            } else {
                emit("Stopping at: $value") // Can emit the stopping element!
                false
            }
        }
        .collect { println(it) }
}

 

Output:

 

=== takeWhile ===
Received: 1
Received: 2
Received: 3

 

 

=== transformWhile ===
Valid: 1
Valid: 2
Valid: 3
Stopping at: 4

 

Method 10: collectLatest — Cancel Previous Collection

collectLatest cancels the previous collection action whenever a new value is emitted:

suspend fun main() {
    fastEmittingFlow()
        .collectLatest { value ->
            println("Processing $value...")
            delay(1.seconds) // Slow processing
            println("Finished processing $value")
        }
}

 

fun fastEmittingFlow() = flow {
    repeat(5) { i ->
        emit(i)
        delay(300.milliseconds) // Fast emission
    }
}

 

Output:

 

Processing 0...
Processing 1...
Processing 2...
Processing 3...
Processing 4...
Finished processing 4

 

Only the last value gets fully processed because each new emission cancels the previous processing.

Practical collectLatest Example: Search Implementation

 

class SearchManager {
    private val _searchQuery = MutableStateFlow("")
    
    suspend fun startSearching() {
        _searchQuery
            .filter { it.isNotBlank() }
            .collectLatest { query ->
                println("Searching for: $query")
                delay(2.seconds) // Simulate API call
                println("Results for: $query")
            }
    }
    
    fun updateQuery(query: String) {
        _searchQuery.value = query
    }
}

 

 

suspend fun main() {
    val searchManager = SearchManager()
    
    val job = CoroutineScope(Dispatchers.Default).launch {
        searchManager.startSearching()
    }
    
    // Simulate user typing
    searchManager.updateQuery("k")
    delay(500.milliseconds)
    searchManager.updateQuery("ko")
    delay(500.milliseconds)
    searchManager.updateQuery("kot")
    delay(500.milliseconds)
    searchManager.updateQuery("kotlin")
    
    delay(3.seconds)
    job.cancel()
}

 

Output:

Searching for: k
Searching for: ko
Searching for: kot
Searching for: kotlin
Results for: kotlin

Only “kotlin” gets the full search result because previous searches were cancelled by new queries.

Method 11: Custom Cancellation with SharedFlow and StateFlow

For more complex scenarios, you might need custom cancellation logic:

class DataManager {
    private val _dataFlow = MutableSharedFlow<String>()
    val dataFlow = _dataFlow.asSharedFlow()
    
    private var isActive = true
    
    suspend fun startEmitting() {
        while (isActive) {
            _dataFlow.emit("Data at ${System.currentTimeMillis()}")
            delay(1.seconds)
        }
    }
    
    fun stop() {
        isActive = false
    }
}

 

suspend fun main() {
    val dataManager = DataManager()
    
    val job = CoroutineScope(Dispatchers.Default).launch {
        dataManager.startEmitting()
    }
    
    val collectorJob = CoroutineScope(Dispatchers.Default).launch {
        dataManager.dataFlow.collect { data ->
            println("Received: $data")
        }
    }
    
    delay(3.seconds)
    println("Stopping data manager...")
    dataManager.stop()
    
    delay(1.seconds)
    job.cancel()
    collectorJob.cancel()
    println("All jobs cancelled")
}

 

Best Practices and When to Use Each Technique
1. Job Cancellation
  • Use when: You need to cancel entire coroutine scopes
  • Best for: Activity/Fragment lifecycle management, repository cleanup
  • Remember: Always cancel parent jobs to avoid memory leaks
2. withTimeout/withTimeoutOrNull
  • Use when: Operations have time constraints
  • Best for: Network requests, file operations, user input waiting
  • Remember: Consider using withTimeoutOrNull for graceful handling
3. takeWhile
  • Use when: You have a condition-based stopping criteria
  • Best for: Monitoring systems, user input validation, threshold-based operations
  • Remember: The flow stops as soon as the condition becomes false
4. take
  • Use when: You need a specific number of items
  • Best for: Pagination, sampling, testing scenarios
  • Remember: Simple and predictable behavior
5. cancellable()
  • Use when: Working with CPU-intensive flows without suspension points
  • Best for: Mathematical computations, data processing
  • Remember: Adds overhead, so use only when necessary
Performance Considerations

Different cancellation techniques have different performance implications:

suspend fun performanceComparison() {
    println("=== Performance Test ===")
    
    // Test 1: Job cancellation (fastest)
    val time1 = measureTimeMillis {
        val job = CoroutineScope(Dispatchers.Default).launch {
            repeat(1000000) {
                // Heavy work
            }
        }
        job.cancel()
    }
    println("Job cancellation: ${time1}ms")
    
    // Test 2: takeWhile (conditional overhead)
    val time2 = measureTimeMillis {
        flow {
            repeat(1000) { emit(it) }
        }.takeWhile { it < 500 }
         .collect { }
    }
    println("takeWhile: ${time2}ms")
    
    // Test 3: withTimeout (exception overhead)
    val time3 = measureTimeMillis {
        try {
            withTimeout(1.milliseconds) {
                repeat(1000000) {
                    // Some work
                }
            }
        } catch (e: TimeoutCancellationException) {
            // Expected
        }
    }
    println("withTimeout: ${time3}ms")
}
Common Pitfalls and How to Avoid Them
Pitfall 1: Forgetting to Check for Cancellation in CPU-Intensive Operations

 

// ❌ Wrong - Won't respect cancellation
fun badFlow() = flow {
    repeat(1000000) { i ->
        heavyComputation()
        emit(i)
    }
}

 

 

// ✅ Correct - Checks for cancellation
fun goodFlow() = flow {
    repeat(1000000) { i ->
        ensureActive() // or yield()
        heavyComputation()
        emit(i)
    }
}

 

Pitfall 2: Not Handling TimeoutCancellationException

 

// ❌ Wrong - Exception will crash the app
suspend fun badTimeout() {
    withTimeout(1.seconds) {
        longRunningOperation()
    }
}

 

 

// ✅ Correct - Proper exception handling
suspend fun goodTimeout() {
    try {
        withTimeout(1.seconds) {
            longRunningOperation()
        }
    } catch (e: TimeoutCancellationException) {
        println("Operation timed out, handling gracefully")
    }
}

 

Pitfall 3: Confusing takeWhile with filter

 

// Remember: takeWhile STOPS the flow, filter just skips values
flow { emit(1); emit(2); emit(3) }
    .takeWhile { it < 2 }  // Emits: 1 (then stops)
    .filter { it < 2 }     // This won't even run because takeWhile stopped the flow

 

Job Offers

Job Offers

There are currently no vacancies.

OUR VIDEO RECOMMENDATION

,

Kotlin Coroutine Mechanisms: A Surprisingly Deep Rabbithole

Sometimes you think you know coroutines, and then after a while, you’re like, “Wait, do I really know coroutines?”
Watch Video

Kotlin Coroutine Mechanisms: A Surprisingly Deep Rabbithole

Amanda Hinchman-Dominguez
Senior Android Developer
SpotOn

Kotlin Coroutine Mechanisms: A Surprisingly Deep Rabbithole

Amanda Hinchman-Do ...
Senior Android Devel ...
SpotOn

Kotlin Coroutine Mechanisms: A Surprisingly Deep Rabbithole

Amanda Hinchman- ...
Senior Android Developer
SpotOn

Jobs

Conclusion

Flow cancellation is a powerful feature that, when used correctly, can significantly improve your app’s performance and user experience. Here’s a quick recap:

  • Job cancellation for lifecycle management
  • withTimeout for time-bounded operations
  • takeWhile for condition-based stopping
  • take for count-based limiting
  • cancellable() for making non-cooperative flows responsive

Remember, the key to effective flow cancellation is understanding your use case and choosing the right technique. Don’t overthink it — start with the simplest approach that solves your problem and optimize from there.

Happy flowing! 🌊

What’s your favorite flow cancellation technique? Have you encountered any interesting edge cases? Drop a comment below and let’s discuss! 💬

 

This article was previously published on proandroiddev.com.

Menu