Blog Infos
Author
Published
Topics
Published

Hello Kotlin Developers, let’s implement / operator for Flow. This operator is similar to of or of . You can check the documents below for more details.

is used when we have multiple s, we all of them, as soon as one of emits a //, it becomes the “winner”, others are canceled and the result will forward all events (including and events) from the “winner”.

race (source: https://rxjs.dev/api/index/function/race)race (source: https://rxjs.dev/api/index/function/race)

https://rxjs.dev/api/index/function/race

Returns an observable that mirrors the first source observable to emit an item.

https://reactivex.io/documentation/operators/amb.html

When you pass a number of source Observables to Amb, it will pass through the emissions and notifications of exactly one of these Observables: the first one that sends a notification to Amb, either by emitting an item or sending an or notification. Amb will ignore and discard the emissions and notifications of all of the other source Observables.

Use and example

This operator is useful when you have multiple resources that provide values, for example, API resources but due to network conditions, the latency is unpredictable and varies significantly. In this case, we want to get the “fastest” value and ignore the “slower” values.

Here is an example:

@ExperimentalCoroutinesApi
public fun <T> race(flows: Iterable<Flow<T>>): Flow<T> = TODO("Will implement")
fun firstSource(): Flow<String> = flow {
delay(400)
repeat(10) {
emit("[1] - $it")
delay(100)
}
}
fun secondSource(): Flow<String> = flow {
delay(100)
repeat(10) {
emit("[2] - $it")
delay(100)
}
}
fun thirdSource(): Flow<String> = flow {
delay(500)
repeat(10) {
emit("[3] - $it")
delay(100)
}
}
suspend fun main() {
race(
firstSource(),
secondSource(),
thirdSource()
).collect { println(it) }
// CONSOLE
// [2] - 0
// [2] - 1
// [2] - 2
// [2] - 3
// [2] - 4
// [2] - 5
// [2] - 6
// [2] - 7
// [2] - 8
// [2] - 9
}
view raw race_example.kt hosted with ❤ by GitHub

Only values from the second are emitted since it starts emitting first.

Implementation

works in the following way:

  1. Collect to all source s
  2. When a new event arrives from a source , pass it down to a collector.
  3. Cancel all other s.
  4. Forward all events from the winner .

Here is the implementation of

import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ChannelResult
import kotlinx.coroutines.channels.onFailure
import kotlinx.coroutines.channels.onSuccess
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.selects.select
import kotlinx.coroutines.yield
@ExperimentalCoroutinesApi
public fun <T> race(flows: Iterable<Flow<T>>): Flow<T> = flow {
coroutineScope {
// 1. Collect to all source Flows
val channels = flows.map { flow ->
// Produce the values using the default (rendezvous) channel
produce {
flow.collect {
send(it)
yield() // Emulate fairness, giving each flow chance to emit
}
}
}
// If channels List is empty, just return and complete result Flow.
if (channels.isEmpty()) {
return@coroutineScope
}
// If channels List has single element, just forward all events from it.
channels
.singleOrNull()
?.let { return@coroutineScope emitAll(it) }
// 2. When a new event arrives from a source Flow, pass it down to a collector.
// Select expression makes it possible to await multiple suspending functions simultaneously
// and select the first one that becomes available.
val (winnerIndex, winnerResult) = select<Pair<Int, ChannelResult<T>>> {
channels.forEachIndexed { index, channel ->
channel.onReceiveCatching {
index to it
}
}
}
// 3. Cancel all other Flows.
channels.forEachIndexed { index, channel ->
if (index != winnerIndex) {
channel.cancel()
}
}
// 4. Forward all events from the winner Flow .
winnerResult
.onSuccess {
emit(it)
emitAll(channels[winnerIndex])
}
.onFailure {
it?.let { throw it }
}
}
}
view raw race.kt hosted with ❤ by GitHub

By using “select expression”, we can select the first event that becomes available from s (https://kotlinlang.org/docs/select-expression.html#selecting-from-channels).

Bonus, we can add a variant function that accepts variable arguments and as an extension function on

@ExperimentalCoroutinesApi
public fun <T> race(flow1: Flow<T>, flow2: Flow<T>, vararg flows: Flow<T>): Flow<T> =
race(
buildList(capacity = 2 + flows.size) {
add(flow1)
add(flow2)
addAll(flows)
}
)
@ExperimentalCoroutinesApi
public fun <T> Flow<T>.raceWith(flow: Flow<T>, vararg flows: Flow<T>): Flow<T> = race(
buildList(capacity = 2 + flows.size) {
add(this@raceWith)
add(flow)
addAll(flows)
}
)
view raw race_with.kt hosted with ❤ by GitHub

Job Offers

Job Offers


    Mobile Developer Android (m/w/d) Firebase

    REWE digital
    Köln / Cologne
    • Full Time
    apply now

    Senior Android Engineer (m/f/d)

    Flink
    Berlin
    • Full Time
    apply now

    Android Manager (all genders)

    Babbel
    Berlin
    • Full Time
    apply now
Load more listings

OUR VIDEO RECOMMENDATION

, ,

The Evolution of Android Graphics in Android 12/13

Android 12 and 13 both added significant new capabilities to Android platform graphics, including RenderEffect, RuntimeShader, and more. At the same time, RenderScript has been deprecated and we’ve introduced the RenderScript Intrinsics Replacement Toolkit. This…
Watch Video

The Evolution of Android Graphics in Android 12/13

Daniel Galpin
Android Developer Advocate and Fast Talking YouTuber
Google

The Evolution of Android Graphics in Android 12/13

Daniel Galpin
Android Developer Ad ...
Google

The Evolution of Android Graphics in Android 12/13

Daniel Galpin
Android Developer Advocat ...
Google

Jobs

Conclusion

We have already implemented operator. Using and “select expression” make the backpressure and concurrency problem a lot easier.

You can find full implementation and others operators here https://github.com/hoc081098/FlowExt, with the entire source code, the documentation, and setup info to add the library to your project.

FlowExt is a Kotlin Multiplatform library, that provides many operators and extensions to Kotlin Coroutines Flow.

Thanks for reading ❤. If you like my article, please follow me on Medium, Github and Twitter.

This article was originally published on proandroiddev.com on March 24, 2022

YOU MAY BE INTERESTED IN

YOU MAY BE INTERESTED IN

blog
Just like my previous article about coroutines, this is an attempt to group most…
READ MORE
blog
I’m not going to explain how important Clean Architecture or Hilt are, there are…
READ MORE

Leave a Reply

Your email address will not be published.

Fill out this field
Fill out this field
Please enter a valid email address.

Menu