A guide to when, why, and how to create a callbackFlow. It explains how to convert Firebase RealtimeDatabase callbacks into a callbackFlow.
This story takes Firebase RealtimeDatabase as an example to show how callbackFlow is used with a real stream of data. By the end of the story, we can apply this knowledge to convert any library's callbacks to Kotlin Flows using callbackFlow.
Let's look at an overview of the page content.
Content
- Overview of Flow and channelFlow
- callbackFlow characteristics
- Why/When to use callbackFlow?
- callbackFlow Under the Hood
- Firebase Setup and Dependencies (this step is only needed if you want to try the Firebase Realtime Database yourself using the provided GitHub project; otherwise you can skip it and go to the next step)
- Firebase RealtimeDatabase callbacks
- Converting RealtimeDatabase callbacks into callbackFlow
- How to collect callbackFlow?
- GitHub project
Overview of Flow and channelFlow
This story assumes a basic understanding of Flow, channelFlow, and callbackFlow.
What follows is a quick overview of Flow and channelFlow to help understand callbackFlow and its usage.
- Flow is the basic stream of data: values are emitted and collected one after another, in sequence. Once the emitting lambda completes, the Flow emits no more values and is closed.
- channelFlow is similar to Flow but adds a buffer (default size 64), which allows several values to be emitted before each previously emitted value has been collected. In its default usage it behaves like Flow, i.e. it stops emitting once execution leaves the lambda. In channelFlow we can still keep the lambda from finishing by calling awaitClose inside it and send values later through a predefined method. However, it is not the ideal choice when we want to collect data from callbacks, keep the Flow alive, and send data at a later time.
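To make the difference concrete, here is a minimal sketch of both builders. The function names numbersFlow and numbersChannelFlow are made up for illustration; only the flow and channelFlow builders come from kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// A plain Flow: values are emitted one by one from the builder's own coroutine.
fun numbersFlow(): Flow<Int> = flow {
    for (i in 1..3) emit(i)
    // The flow completes as soon as this lambda returns.
}

// A channelFlow: values travel through a buffered channel, so child
// coroutines launched inside the builder may send concurrently.
fun numbersChannelFlow(): Flow<Int> = channelFlow {
    launch { send(1) }
    launch { send(2) }
    // The builder waits for its child coroutines, then the flow completes.
}

fun main() = runBlocking {
    println(numbersFlow().toList())
    // Sorted because the arrival order of concurrent senders is not guaranteed.
    println(numbersChannelFlow().toList().sorted())
}
```

In both cases the flow ends once the lambda (and its children) finish, which is the behavior callbackFlow is designed to avoid.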
callbackFlow Characteristics
callbackFlow ensures that awaitClose is called inside the lambda (awaitClose is mandatory in callbackFlow, unlike channelFlow). This keeps the Flow alive so that it can send data at any later point, and it lets us unregister any listeners when the Flow is cancelled.
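These characteristics can be sketched without any third-party library. The TemperatureSensor class below is a hypothetical callback-based API invented for this example; it stands in for whatever library you need to wrap.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical callback-based API, standing in for any third-party library.
class TemperatureSensor {
    private val listeners = mutableListOf<(Int) -> Unit>()
    val listenerCount: Int get() = listeners.size

    fun addListener(listener: (Int) -> Unit) { listeners += listener }
    fun removeListener(listener: (Int) -> Unit) { listeners -= listener }

    // Simulates the library pushing a new reading to every registered listener.
    fun publish(value: Int) = listeners.toList().forEach { it(value) }
}

fun TemperatureSensor.readings(): Flow<Int> = callbackFlow {
    val listener: (Int) -> Unit = { value -> trySend(value) }
    addListener(listener)
    // Suspends until the flow is cancelled or closed, then unregisters.
    awaitClose { removeListener(listener) }
}

fun main() = runBlocking {
    val sensor = TemperatureSensor()
    val collector = launch {
        println(sensor.readings().take(2).toList())
    }
    // Wait until the flow has registered its listener.
    while (sensor.listenerCount == 0) yield()
    // Values arrive after the builder lambda has already reached awaitClose.
    sensor.publish(21)
    sensor.publish(22)
    collector.join()
    println("listeners left: ${sensor.listenerCount}")
}
```

Here take(2) ends the collection after two readings, which cancels the flow and runs the awaitClose block, so no listener stays registered.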
Why/When to use callbackFlow?
If you only need to read a single shot of data, you probably do not need callbackFlow; a single value can be obtained from a normal Flow or a plain coroutine.
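For the single-shot case, one common option is to wrap the callback in a suspend function with suspendCancellableCoroutine. The sketch below assumes a hypothetical ProfileClient whose callback fires exactly once; it is not part of Firebase or of the project in this story.

```kotlin
import kotlin.coroutines.resume
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine

// Hypothetical one-shot callback API: the callback fires exactly once.
class ProfileClient {
    fun fetchUserName(onResult: (String) -> Unit) {
        onResult("Alice") // delivered synchronously to keep the sketch simple
    }
}

// Wraps the one-shot callback in a suspend function; no Flow is needed.
suspend fun ProfileClient.awaitUserName(): String =
    suspendCancellableCoroutine { continuation ->
        fetchUserName { name -> continuation.resume(name) }
    }

fun main() = runBlocking {
    println(ProfileClient().awaitUserName())
}
```

A continuation can be resumed only once, which is exactly why this approach fits single-shot callbacks and not streams of updates.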
callbackFlow is an ideal choice when we want to receive data repeatedly over time from callbacks. Some example applications of callbackFlow (among others):
- Reading network status on the device (multi-shot)
- Reading device location data (multi-shot)
- Handling any third-party library callbacks that provide data in real time, e.g. Firebase callbacks (multi-shot)
callbackFlow Under the Hood
Let's look at how callbackFlow is implemented.
```kotlin
public fun <T> callbackFlow(@BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T> = CallbackFlowBuilder(block)

private class CallbackFlowBuilder<T>(
    private val block: suspend ProducerScope<T>.() -> Unit,
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = BUFFERED,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlowBuilder<T>(block, context, capacity, onBufferOverflow) {

    override suspend fun collectTo(scope: ProducerScope<T>) {
        super.collectTo(scope)
        /*
         * We expect user either call `awaitClose` from within a block (then the channel is closed at this moment)
         * or being closed/cancelled externally/manually. Otherwise "user forgot to call
         * awaitClose and receives unhelpful ClosedSendChannelException exceptions" situation is detected.
         */
        if (!scope.isClosedForSend) {
            throw IllegalStateException(
                """
                    'awaitClose { yourCallbackOrListener.cancel() }' should be used in the end of callbackFlow block.
                    Otherwise, a callback/listener may leak in case of external cancellation.
                    See callbackFlow API documentation for the details.
                """.trimIndent()
            )
        }
    }

    override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> =
        CallbackFlowBuilder(block, context, capacity, onBufferOverflow)
}
```
Let's go over the important points:
- callbackFlow uses the CallbackFlowBuilder class to create the flow.
- CallbackFlowBuilder extends ChannelFlowBuilder, which means it is built on top of channelFlow.
- callbackFlow requires an awaitClose block inside the lambda to keep the flow alive and active. Listeners should be unregistered inside that awaitClose block so that callbackFlow unsubscribes from them when it is cancelled or closed; otherwise a listener may leak.
- callbackFlow raises an IllegalStateException if the block finishes without awaitClose having been called and the channel has not been closed in some other way. This is how it ensures the awaitClose block exists, unlike channelFlow.
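The check on isClosedForSend can be observed with a small experiment. This is only a sketch: withoutAwaitClose and withAwaitClose are illustrative names, and the flows send a fixed value instead of wrapping a real listener.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// The block returns without awaitClose and without closing the channel.
fun withoutAwaitClose(): Flow<Int> = callbackFlow {
    trySend(1)
}

// Same block, but it suspends in awaitClose until the collector is done.
fun withAwaitClose(): Flow<Int> = callbackFlow {
    trySend(1)
    awaitClose { /* unregister listeners here */ }
}

fun main() = runBlocking {
    // Collecting the first flow fails with the IllegalStateException shown above.
    val failure = runCatching { withoutAwaitClose().toList() }.exceptionOrNull()
    println(failure is IllegalStateException)

    // The second flow stays open until first() cancels it after one value.
    println(withAwaitClose().first())
}
```

The exception only appears at collection time, because the builder lambda does not run until the flow is collected.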
Firebase Setup and Dependencies
To try this yourself, you need to set up a Firebase account. If you don't want to try it now, you can skip to the next part to see the code examples; otherwise follow these steps.
1. Go to the Firebase Console and create a Firebase project.
2. Register your Android app with the Firebase project using the guidelines.
3. After registering your Android app with the Firebase project, you will be able to download the configuration file google-services.json. Add that file to your app-level root directory.
4. To enable Google services, add the following Google services plugin to your root-level build.gradle file as a buildscript dependency:
```groovy
buildscript {
    // ...
    repositories {
        // Make sure that you have the following two repositories
        google()        // Google's Maven repository
        mavenCentral()  // Maven Central repository
    }
    dependencies {
        // Add the dependency for the Google services Gradle plugin
        classpath 'com.google.gms:google-services:4.3.15'
    }
}
```
5. In the app-level build.gradle, add the Google services plugin:
```groovy
plugins {
    id 'com.android.application'
    // Add the Google services Gradle plugin
    id 'com.google.gms.google-services'
}
```
6. Add the Firebase SDK to your app-level build.gradle:
```groovy
// Import the Firebase BoM
implementation platform('com.google.firebase:firebase-bom:31.5.0')
// Add the dependency for the Firebase Realtime Database SDK
// (no version needed, the BoM manages it)
implementation 'com.google.firebase:firebase-database-ktx'
```
7. Create a Realtime Database in your Firebase console project. After creating it, you will get the database path. We will need that path to connect to the RealtimeDatabase.
Firebase RealtimeDatabase Callbacks
Firebase RealtimeDatabase provides listener callbacks that deliver data in real time. We will register for real-time data updates as shown in the code below.
Let's look at the callbacks RealtimeDatabase provides for reading data from the database.
```kotlin
// Create Database Object
val database = FirebaseDatabase.getInstance("<database path>")
val databaseRef = database.reference

// Read from the database
databaseRef.addValueEventListener(object : ValueEventListener {
    override fun onDataChange(dataSnapshot: DataSnapshot) {
        // This method is called once with the initial value and again
        // whenever data at this location is updated.
        val value = dataSnapshot.value
        Log.d(TAG, "Value is: $value")
    }

    override fun onCancelled(error: DatabaseError) {
        // Failed to read value
        Log.w(TAG, "Failed to read value.", error.toException())
    }
})
```
In the code above we create a FirebaseDatabase instance and then register a ValueEventListener to read data updates from the database in the onDataChange callback, which provides a DataSnapshot of the data at that moment. It delivers the first snapshot as soon as we register the ValueEventListener, and later it delivers updates whenever data is updated, added, or deleted. So in real cases the callbacks can be invoked at any point in time.
Converting RealtimeDatabase callbacks into callbackFlow
We want to convert these callbacks into a Kotlin callbackFlow. Let's see the code that does that and then walk through it. To use coroutine Flows, you need to add the following dependency:
```groovy
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.0-RC'
```
And the code looks like this:
```kotlin
import android.content.ContentValues
import android.util.Log
import com.google.firebase.database.DataSnapshot
import com.google.firebase.database.DatabaseError
import com.google.firebase.database.FirebaseDatabase
import com.google.firebase.database.ValueEventListener
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow

fun readFirebaseRealtimeDatabaseFlow(): Flow<String> = callbackFlow {
    val database = FirebaseDatabase.getInstance("<path of realtime database>")
    val databaseRef = database.reference

    val firebaseDataListeners = object : ValueEventListener {
        override fun onDataChange(dataSnapshot: DataSnapshot) {
            // This method is called once with the initial value and again
            // whenever data at this location is updated.
            val value = dataSnapshot.value
            Log.d(ContentValues.TAG, "Value is: $value")
            trySend(value?.toString().orEmpty())
        }

        override fun onCancelled(error: DatabaseError) {
            // Failed to read value
            Log.w(ContentValues.TAG, "Failed to read value.", error.toException())
            // Close the flow with the error so that a collector's catch operator can see it
            close(error.toException())
        }
    }
    databaseRef.addValueEventListener(firebaseDataListeners)

    awaitClose {
        databaseRef.removeEventListener(firebaseDataListeners)
    }
}
```
Let's see what we changed to convert the callbacks into a callbackFlow:
- The function's return type is Flow<String>.
- The function returns a callbackFlow block. Inside the block we create the database reference and register the listener on it to receive updates in the callbacks.
- The trySend(value?.toString().orEmpty()) statement inside the onDataChange callback sends the value to the collector of the callbackFlow.
- awaitClose takes a block of code that unregisters the event listener when the callbackFlow is cancelled.
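An error reported by a callback reaches the collector's catch operator only if the flow is closed with that error. The sketch below illustrates this with a hypothetical MessageSource and MessageListener, loosely shaped like ValueEventListener but invented for this example so that it runs without Firebase.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical listener with a success and an error callback.
interface MessageListener {
    fun onMessage(message: String)
    fun onError(error: Exception)
}

// Hypothetical source that delivers one message and then fails.
class MessageSource {
    var listener: MessageListener? = null

    fun start() {
        listener?.onMessage("hello")
        listener?.onError(IllegalStateException("permission denied"))
    }
}

fun MessageSource.messages(): Flow<String> = callbackFlow {
    val callback = object : MessageListener {
        override fun onMessage(message: String) { trySend(message) }
        // Closing with a cause ends the flow and rethrows the error downstream.
        override fun onError(error: Exception) { close(error) }
    }
    listener = callback
    start()
    awaitClose { listener = null }
}

fun main() = runBlocking {
    val received = MessageSource().messages()
        .catch { emit("error: ${it.message}") }
        .toList()
    println(received)
}
```

Values sent before the close are still delivered first; the error arrives only after them.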
How to Collect callbackFlow?
A callbackFlow must be collected in a lifecycle-aware manner. Otherwise we can keep receiving data in those callbacks while the app is in the background, or after the user has left the screen, because we are accidentally still listening to those flows.
In general, collecting every flow in a lifecycle-aware manner is the recommended approach.
The code below collects the callbackFlow while the view is in the RESUMED state and stops collecting otherwise.
```kotlin
lifecycleScope.launch {
    repeatOnLifecycle(Lifecycle.State.RESUMED) {
        FirebaseDatabaseCallbackFlow.readFirebaseRealtimeDatabaseFlow()
            .catch {
                Log.d(TAG, "exception $it")
            }.collect {
                Log.d(TAG, "read value $it")
            }
    }
}
```
Sources
GitHub Project
GitHub – saqib-github-commits/RealtimeDatabaseCallbackFlow
Thanks for reading. I look forward to your feedback.
This article was previously published on proandroiddev.com



