A guide to when, why, and how to create a callbackFlow. It explains how to convert Firebase RealtimeDatabase callbacks to callbackFlow.
Photo by Luis Tosta on Unsplash
This story takes Firebase RealtimeDatabase as an example to show the usage of callbackFlow with a real stream of data. By the end of the story, we can apply this knowledge to convert any library callbacks to Kotlin Flows using callbackFlow.
Let's see an overview of the page content.
Content
- Overview of Flow and channelFlow
- callbackFlow characteristics
- Why/When to use callbackFlow?
- callbackFlow UnderTheHood
- Firebase Setup and Dependencies (this step is only needed if you want to try Firebase Realtime Database on your own using the provided GitHub project; if not, you can skip it and go to the next step)
- Firebase RealtimeDatabase callbacks
- Converting RealtimeDatabase callbacks into callbackFlow
- How to collect callbackFlow?
- Github project
Overview of Flow and channelFlow
The story expects a basic understanding of Flow, channelFlow and callbackFlow.
Here is a quick overview of Flow and channelFlow to better understand callbackFlow and its usage.
- Flow is a basic stream of data: values are emitted and then collected one after another, in sequence. Once the emitting lambda completes, the Flow does not emit any more values and it is closed.
- channelFlow is similar to Flow but adds a buffer with a default size of 64, which allows multiple values to be emitted before each emitted value is collected. In its default usage it behaves like Flow, i.e. it stops emitting values once execution leaves the lambda. In channelFlow, however, we can keep the lambda from completing by calling the awaitClose API inside it, and send values later. Still, it is not the ideal choice for cases where we want to collect data from callbacks, keeping the Flow alive so that data can be sent later in time.
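The difference between the two builders can be sketched as follows. This is a minimal example, assuming only the kotlinx-coroutines-core dependency; the function names plainNumbers and channelNumbers are made up for illustration.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Plain Flow: values are emitted sequentially from the lambda itself,
// and the flow completes as soon as the lambda returns.
fun plainNumbers(): Flow<Int> = flow {
    for (i in 1..3) emit(i)
}

// channelFlow: values travel through a buffered channel, so send() may be
// called from other coroutines launched inside the block. The flow completes
// once the block and all coroutines launched in it have finished.
fun channelNumbers(): Flow<Int> = channelFlow {
    launch { send(1) }
    launch { send(2) }
}

fun main() = runBlocking {
    println(plainNumbers().toList()) // prints [1, 2, 3]
    // Sorted because the example does not rely on the order of the two senders.
    println(channelNumbers().toList().sorted()) // prints [1, 2]
}
```

Both flows above still complete on their own; neither stays open waiting for values that arrive later from a callback.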
callbackFlow Characteristics
callbackFlow ensures that awaitClose is called inside the lambda (awaitClose is mandatory in callbackFlow, unlike channelFlow). This keeps the Flow alive so that data can be sent at any later point, and makes it possible to unregister any listeners when the flow is canceled.
Why/When to use callbackFlow?
If you only want to read a single shot of data, you might not need callbackFlow. A single shot can still be obtained from a normal Flow or from plain coroutines, so there is no need to implement callbackFlow for that.
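For the single-shot case, one common option with plain coroutines is suspendCancellableCoroutine. The sketch below assumes a hypothetical one-shot API (ResultCallback and fetchOnce are invented for illustration and stand in for any library call that reports exactly one result).

```kotlin
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlin.coroutines.resume

// Hypothetical one-shot callback API (not part of any real library).
fun interface ResultCallback {
    fun onResult(value: String)
}

// Hypothetical library call that delivers exactly one result.
fun fetchOnce(callback: ResultCallback) {
    callback.onResult("hello")
}

// Wraps the single callback into a suspend function: the coroutine is
// resumed once with the delivered value, so no Flow is needed.
suspend fun fetchOnceSuspending(): String =
    suspendCancellableCoroutine { continuation ->
        fetchOnce { value -> continuation.resume(value) }
    }

fun main() = runBlocking {
    println(fetchOnceSuspending()) // prints hello
}
```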
callbackFlow is an ideal choice when we want to receive data repeatedly from callbacks. The following are some (but not all) applications of callbackFlow:
- Reading network status on a device (multi-shot)
- Reading device location data (multi-shot)
- Handling any third-party library callbacks that provide data in real time, e.g. Firebase callbacks (multi-shot)
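A multi-shot source like the ones listed above can be sketched with a made-up listener API. FakeNetworkMonitor, statusFlow and runDemo are hypothetical names used only for this illustration; a real network or location API would replace them.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical multi-shot source that notifies registered listeners.
class FakeNetworkMonitor {
    private val listeners = mutableListOf<(Boolean) -> Unit>()
    val listenerCount: Int get() = listeners.size

    fun register(listener: (Boolean) -> Unit) { listeners += listener }
    fun unregister(listener: (Boolean) -> Unit) { listeners -= listener }
    fun publish(connected: Boolean) { listeners.toList().forEach { it(connected) } }
}

// Converts the listener API into a Flow that stays open until the collector stops.
fun FakeNetworkMonitor.statusFlow(): Flow<Boolean> = callbackFlow {
    val listener: (Boolean) -> Unit = { connected -> trySend(connected) }
    register(listener)
    // Keeps the flow alive and unregisters the listener on cancellation/close.
    awaitClose { unregister(listener) }
}

// Collects two updates, then reports them with the remaining listener count.
fun runDemo(): Pair<List<Boolean>, Int> = runBlocking {
    val monitor = FakeNetworkMonitor()
    val collected = async { monitor.statusFlow().take(2).toList() }
    while (monitor.listenerCount == 0) yield() // wait until the listener is registered
    monitor.publish(true)
    monitor.publish(false)
    collected.await() to monitor.listenerCount
}

fun main() {
    println(runDemo()) // prints ([true, false], 0)
}
```

The listener count of 0 at the end shows the point of awaitClose: once the collector has taken its two values, the flow is canceled and the listener is unregistered.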
callbackFlow UnderTheHood
Let’s look at how callbackFlow is implemented.
    public fun <T> callbackFlow(@BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T> = CallbackFlowBuilder(block)

    private class CallbackFlowBuilder<T>(
        private val block: suspend ProducerScope<T>.() -> Unit,
        context: CoroutineContext = EmptyCoroutineContext,
        capacity: Int = BUFFERED,
        onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
    ) : ChannelFlowBuilder<T>(block, context, capacity, onBufferOverflow) {

        override suspend fun collectTo(scope: ProducerScope<T>) {
            super.collectTo(scope)
            /*
             * We expect user either call `awaitClose` from within a block (then the channel is closed at this moment)
             * or being closed/cancelled externally/manually. Otherwise "user forgot to call
             * awaitClose and receives unhelpful ClosedSendChannelException exceptions" situation is detected.
             */
            if (!scope.isClosedForSend) {
                throw IllegalStateException(
                    """
                        'awaitClose { yourCallbackOrListener.cancel() }' should be used in the end of callbackFlow block.
                        Otherwise, a callback/listener may leak in case of external cancellation.
                        See callbackFlow API documentation for the details.
                    """.trimIndent()
                )
            }
        }

        override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> =
            CallbackFlowBuilder(block, context, capacity, onBufferOverflow)
    }
Let's go over the important points.
- callbackFlow uses the CallbackFlowBuilder class to create the flow.
- The CallbackFlowBuilder class extends ChannelFlowBuilder, which means it is built on top of channelFlow.
- callbackFlow requires an awaitClose block to keep the flow alive and active. Listeners should be unregistered inside the awaitClose block so that callbackFlow unsubscribes from them when it is canceled/closed; otherwise a listener may leak.
- callbackFlow raises an IllegalStateException if we do not provide awaitClose, so it ensures that the awaitClose block exists, unlike channelFlow.
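The awaitClose check described above can be observed with a small experiment. This is a sketch, assuming only kotlinx-coroutines-core; collectWithoutAwaitClose is a made-up name, and the flow deliberately omits awaitClose to trigger the check.

```kotlin
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Collects a callbackFlow whose block returns without calling awaitClose
// and reports how the collection ended.
fun collectWithoutAwaitClose(): String = runBlocking {
    val broken = callbackFlow<Int> {
        trySend(1)
        // awaitClose { ... } is intentionally missing here.
    }
    try {
        broken.toList()
        "completed normally"
    } catch (e: IllegalStateException) {
        "failed with IllegalStateException"
    }
}

fun main() {
    println(collectWithoutAwaitClose())
}
```

With channelFlow the same block would simply complete, which is why a forgotten listener cleanup is easier to miss there.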
Firebase Setup and Dependencies
To try this on your own, you need to set up a Firebase account. If you don’t want to try it now, you can skip to the next part to see the code examples; otherwise, follow the steps below.
1. Go to the Firebase Console and create a Firebase project.
2. Register your Android app with the Firebase project following the guidelines.
3. After registering your Android app with the Firebase project, you will be able to download the configuration file google-services.json. Add that file to your app-level root directory.
4. To enable Google services, add the following Google services plugin to your root-level build.gradle file as a buildscript dependency.
    buildscript {
        ///.....
        repositories {
            // Make sure that you have the following two repositories
            google()        // Google's Maven repository
            mavenCentral()  // Maven Central repository
        }
        dependencies {
            // Add the dependency for the Google services Gradle plugin
            classpath 'com.google.gms:google-services:4.3.15'
        }
    }
5. In the app-level build.gradle, add the Google services plugin.
    plugins {
        id 'com.android.application'
        // Add the Google services Gradle plugin
        id 'com.google.gms.google-services'
    }
6. Add the Firebase SDK to your app-level build.gradle.
    // Import the Firebase BoM
    implementation platform('com.google.firebase:firebase-bom:31.5.0')
    // Add the dependency for the Firebase Realtime Database SDK
    implementation 'com.google.firebase:firebase-database-ktx'
7. Create a RealtimeDatabase in your Firebase console project. After creating it, you will get the database path. We will need that path to create a connection with RealtimeDatabase.
Firebase RealtimeDatabase Callbacks
Firebase RealtimeDatabase provides listener callbacks that deliver data in real time. We register for real-time data updates as shown in the code below.
Let’s look at the callbacks RealtimeDatabase provides for reading data from the database.
    // Create Database Object
    val database = FirebaseDatabase.getInstance("<database path>")
    val databaseRef = database.reference

    // Read from the database
    databaseRef.addValueEventListener(object : ValueEventListener {
        override fun onDataChange(dataSnapshot: DataSnapshot) {
            // This method is called once with the initial value and again
            // whenever data at this location is updated.
            val value = dataSnapshot.value
            Log.d(TAG, "Value is: $value")
        }

        override fun onCancelled(error: DatabaseError) {
            // Failed to read value
            Log.w(TAG, "Failed to read value.", error.toException())
        }
    })
In the code above, we create a FirebaseDatabase instance and then register a ValueEventListener to read data updates from the database in the onDataChange callback, which provides a DataSnapshot of the data at that moment. It delivers the first snapshot as soon as we register the ValueEventListener, and afterwards it delivers updates whenever data is updated, added or deleted. So in real use the callbacks can be called at any point in time.
Converting RealtimeDatabase callbacks into callbackFlow
We want to convert these callbacks to a Kotlin callbackFlow. Let's see the code below, which does that, and go through it afterwards. In order to use coroutine Flows, you need to add the following dependency.
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.0-RC'
The code looks like this:
    fun readFirebaseRealtimeDatabaseFlow(): Flow<String> = callbackFlow {

        val database = FirebaseDatabase.getInstance("<path of realtime database>")
        val databaseRef = database.reference

        val firebaseDataListeners = object : ValueEventListener {
            override fun onDataChange(dataSnapshot: DataSnapshot) {
                // This method is called once with the initial value and again
                // whenever data at this location is updated.
                val value = dataSnapshot.value
                Log.d(ContentValues.TAG, "Value is: $value")
                trySend(value?.toString().orEmpty())
            }

            override fun onCancelled(error: DatabaseError) {
                // Failed to read value
                Log.w(ContentValues.TAG, "Failed to read value.", error.toException())
            }
        }

        databaseRef.addValueEventListener(firebaseDataListeners)

        awaitClose {
            databaseRef.removeEventListener(firebaseDataListeners)
        }
    }
Let's see what changes we made to convert the callbacks into a callbackFlow:
- The function's return type is Flow<String>.
- The callbackFlow builder takes a block. Inside the block, we create the database reference and register the listener on it to listen for updates in the callbacks.
- The trySend(value?.toString().orEmpty()) statement inside the onDataChange callback sends the value to the collector of the callbackFlow.
- awaitClose takes a block of code that unregisters the event listener when the callbackFlow gets canceled.
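In the conversion above, onCancelled only logs the error, so the collector never learns about it. One possible variation is to close the flow with the error so that it reaches a catch operator on the collecting side. The sketch below shows the idea with a made-up listener API instead of Firebase (FakeValueListener, FakeDatabaseRef, values and runErrorDemo are hypothetical names); in the Firebase version the equivalent call would presumably be close(error.toException()), which is an assumption and not part of the original code.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical listener that mirrors the shape of a value/error callback pair.
interface FakeValueListener {
    fun onDataChange(value: String)
    fun onCancelled(error: Exception)
}

// Hypothetical data source holding a single listener.
class FakeDatabaseRef {
    private var listener: FakeValueListener? = null
    val hasListener: Boolean get() = listener != null

    fun addListener(l: FakeValueListener) { listener = l }
    fun removeListener(l: FakeValueListener) { if (listener === l) listener = null }
    fun pushValue(value: String) { listener?.onDataChange(value) }
    fun fail(error: Exception) { listener?.onCancelled(error) }
}

fun FakeDatabaseRef.values(): Flow<String> = callbackFlow {
    val l = object : FakeValueListener {
        override fun onDataChange(value: String) { trySend(value) }
        // Closing with a cause ends the flow and hands the error to the collector.
        override fun onCancelled(error: Exception) { close(error) }
    }
    addListener(l)
    awaitClose { removeListener(l) }
}

// Pushes one value and one error, and records what the collector observed.
fun runErrorDemo(): List<String> = runBlocking {
    val ref = FakeDatabaseRef()
    val events = mutableListOf<String>()
    val job = launch {
        ref.values()
            .catch { e -> events += "error: ${e.message}" }
            .collect { v -> events += "value: $v" }
    }
    while (!ref.hasListener) yield() // wait until the listener is registered
    ref.pushValue("a")
    ref.fail(IllegalStateException("permission denied"))
    job.join()
    events
}

fun main() {
    println(runErrorDemo()) // prints [value: a, error: permission denied]
}
```

Whether a database error should end the flow or only be logged is a design choice; closing with the cause makes the failure visible to whoever collects the flow.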
How to Collect callbackFlow?
Collection of a callbackFlow must be done in a lifecycle-aware manner, because otherwise we can still receive data in those callbacks when the app is in the background, or when a user has moved away from the screen and we are accidentally still listening to those flows.
In general, collecting all flows in a lifecycle-aware manner is the recommended approach.
The code below collects the callbackFlow while the view is in the Resumed state; otherwise it stops collecting.
    lifecycleScope.launch {
        repeatOnLifecycle(Lifecycle.State.RESUMED) {
            FirebaseDatabaseCallbackFlow.readFirebaseRealtimeDatabaseFlow()
                .catch {
                    Log.d(TAG, "exception $it")
                }.collect {
                    Log.d(TAG, "read value $it")
                }
        }
    }
Sources
Github Project
GitHub – saqib-github-commits/RealtimeDatabaseCallbackFlow
Thanks for reading and looking forward to the feedback.
This article was previously published on proandroiddev.com