Blog Infos
Author
Published
Topics
Published

Hello Kotlin Developers, let’s implement / operator for Flow. This operator is similar to of or of . You can check the documents below for more details.

is used when we have multiple s, we all of them, as soon as one of emits a //, it becomes the “winner”, others are canceled and the result will forward all events (including and events) from the “winner”.

race (source: https://rxjs.dev/api/index/function/race)race (source: https://rxjs.dev/api/index/function/race)

https://rxjs.dev/api/index/function/race

Returns an observable that mirrors the first source observable to emit an item.

https://reactivex.io/documentation/operators/amb.html

When you pass a number of source Observables to Amb, it will pass through the emissions and notifications of exactly one of these Observables: the first one that sends a notification to Amb, either by emitting an item or sending an or notification. Amb will ignore and discard the emissions and notifications of all of the other source Observables.

Use and example

This operator is useful when you have multiple resources that provide values, for example, API resources but due to network conditions, the latency is unpredictable and varies significantly. In this case, we want to get the “fastest” value and ignore the “slower” values.

Here is an example:

@ExperimentalCoroutinesApi
public fun <T> race(flows: Iterable<Flow<T>>): Flow<T> = TODO("Will implement")
fun firstSource(): Flow<String> = flow {
delay(400)
repeat(10) {
emit("[1] - $it")
delay(100)
}
}
fun secondSource(): Flow<String> = flow {
delay(100)
repeat(10) {
emit("[2] - $it")
delay(100)
}
}
fun thirdSource(): Flow<String> = flow {
delay(500)
repeat(10) {
emit("[3] - $it")
delay(100)
}
}
suspend fun main() {
race(
firstSource(),
secondSource(),
thirdSource()
).collect { println(it) }
// CONSOLE
// [2] - 0
// [2] - 1
// [2] - 2
// [2] - 3
// [2] - 4
// [2] - 5
// [2] - 6
// [2] - 7
// [2] - 8
// [2] - 9
}
view raw race_example.kt hosted with ❤ by GitHub

Only values from the second are emitted since it starts emitting first.

Implementation

works in the following way:

  1. Collect to all source s
  2. When a new event arrives from a source , pass it down to a collector.
  3. Cancel all other s.
  4. Forward all events from the winner .

Here is the implementation of

import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ChannelResult
import kotlinx.coroutines.channels.onFailure
import kotlinx.coroutines.channels.onSuccess
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.selects.select
import kotlinx.coroutines.yield
@ExperimentalCoroutinesApi
public fun <T> race(flows: Iterable<Flow<T>>): Flow<T> = flow {
coroutineScope {
// 1. Collect to all source Flows
val channels = flows.map { flow ->
// Produce the values using the default (rendezvous) channel
produce {
flow.collect {
send(it)
yield() // Emulate fairness, giving each flow chance to emit
}
}
}
// If channels List is empty, just return and complete result Flow.
if (channels.isEmpty()) {
return@coroutineScope
}
// If channels List has single element, just forward all events from it.
channels
.singleOrNull()
?.let { return@coroutineScope emitAll(it) }
// 2. When a new event arrives from a source Flow, pass it down to a collector.
// Select expression makes it possible to await multiple suspending functions simultaneously
// and select the first one that becomes available.
val (winnerIndex, winnerResult) = select<Pair<Int, ChannelResult<T>>> {
channels.forEachIndexed { index, channel ->
channel.onReceiveCatching {
index to it
}
}
}
// 3. Cancel all other Flows.
channels.forEachIndexed { index, channel ->
if (index != winnerIndex) {
channel.cancel()
}
}
// 4. Forward all events from the winner Flow .
winnerResult
.onSuccess {
emit(it)
emitAll(channels[winnerIndex])
}
.onFailure {
it?.let { throw it }
}
}
}
view raw race.kt hosted with ❤ by GitHub

By using “select expression”, we can select the first event that becomes available from s (https://kotlinlang.org/docs/select-expression.html#selecting-from-channels).

Bonus, we can add a variant function that accepts variable arguments and as an extension function on

@ExperimentalCoroutinesApi
public fun <T> race(flow1: Flow<T>, flow2: Flow<T>, vararg flows: Flow<T>): Flow<T> =
race(
buildList(capacity = 2 + flows.size) {
add(flow1)
add(flow2)
addAll(flows)
}
)
@ExperimentalCoroutinesApi
public fun <T> Flow<T>.raceWith(flow: Flow<T>, vararg flows: Flow<T>): Flow<T> = race(
buildList(capacity = 2 + flows.size) {
add(this@raceWith)
add(flow)
addAll(flows)
}
)
view raw race_with.kt hosted with ❤ by GitHub

Job Offers

Job Offers

There are currently no vacancies.

OUR VIDEO RECOMMENDATION

,

Tests unitaires avec KotlinFlow

Lorsque nous développons une fonctionnalité, nous devons écrire les tests unitaires. C’est une partie essentielle du développement. Cela assure le bon fonctionnement du code lors de futurs changements ou refactorisations. Kotlin Flow ne fait pas…
Watch Video

Tests unitaires avec KotlinFlow

Florent Blot
Android Developer
Geev

Tests unitaires avec KotlinFlow

Florent Blot
Android Developer
Geev

Tests unitaires avec KotlinFlow

Florent Blot
Android Developer
Geev

Jobs

Conclusion

We have already implemented operator. Using and “select expression” make the backpressure and concurrency problem a lot easier.

You can find full implementation and others operators here https://github.com/hoc081098/FlowExt, with the entire source code, the documentation, and setup info to add the library to your project.

FlowExt is a Kotlin Multiplatform library, that provides many operators and extensions to Kotlin Coroutines Flow.

Thanks for reading ❤. If you like my article, please follow me on Medium, Github and Twitter.

This article was originally published on proandroiddev.com on March 24, 2022

YOU MAY BE INTERESTED IN

YOU MAY BE INTERESTED IN

blog
Just like my previous article about coroutines, this is an attempt to group most…
READ MORE
blog
I’m not going to explain how important Clean Architecture or Hilt are, there are…
READ MORE
blog
I know there are plenty of articles about flow and channelFlow, often highlighting that…
READ MORE
blog
Inthe second part of this series of articles, we will continue discussing best practices…
READ MORE
Menu