Blog Infos
Author
Published
Topics
Published
Topics

A guide about When/Why/How to create callbackFlow . It explains how to convert Firebase RealtimeDatabase callbacks to callbackFlow.

Photo by Luis Tosta on Unsplash

 

Content
  • Overview of Flow and channelFlow
  • callbackFlow characteristics
  • Why/When to use callbackFlow ?
  • callbackFlow UnderTheHood
  • Firebase Setup and Dependencies — ( this step is just in case if you want to try Firebase realtime database on your own using provided github project, If not you can skip this step and go to next step )
  • Firebase RealtimeDatabase callbacks
  • Converting RealtimeDatabase callbacks into callbackFlow
  • How to collect callbackFlow ?
  • Github project
Overview of Flow and channelFlow
  • Flow is the basic stream of data being emitted then collected and so on one after the other in a sequence. Once emitting lambda is completed Flow does not emit any more value and it is closed
  • channelFlow is similar to Flow but with buffer and buffer size of 64, it allows multiple values emitting before each of the emitted value is collected. It also behave similar way like Flow in its default usage i.e it stops emitting values once it goes out of the scope of the lambda, But in channelFlow still we can stop closure of lambda using awaitClose api inside lambda and send values later with predefined method. But this is not the ideal choice for the cases where we want to collect data from any callbacks keeping Flow alive and be able to send data later in time.
callbackFlow Characteristics
Why/When to use callbackFlow?
  • Reading Network Status on Device — (multi-shot)
  • Reading Device Location Data — (multi-shot)
  • Handling any Third Party library callbacks which is providing data real time e.g Firebase callbacks — (multi-shot)
callbackFlow UnderTheHood
public fun <T> callbackFlow(@BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T> = CallbackFlowBuilder(block)
private class CallbackFlowBuilder<T>(
private val block: suspend ProducerScope<T>.() -> Unit,
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = BUFFERED,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlowBuilder<T>(block, context, capacity, onBufferOverflow) {
override suspend fun collectTo(scope: ProducerScope<T>) {
super.collectTo(scope)
/*
* We expect user either call `awaitClose` from within a block (then the channel is closed at this moment)
* or being closed/cancelled externally/manually. Otherwise "user forgot to call
* awaitClose and receives unhelpful ClosedSendChannelException exceptions" situation is detected.
*/
if (!scope.isClosedForSend) {
throw IllegalStateException(
"""
'awaitClose { yourCallbackOrListener.cancel() }' should be used in the end of callbackFlow block.
Otherwise, a callback/listener may leak in case of external cancellation.
See callbackFlow API documentation for the details.
""".trimIndent()
)
}
}
override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> =
CallbackFlowBuilder(block, context, capacity, onBufferOverflow)
}
  1. callbackFlow uses CallbackFlowBuilder class to create
  2. CallbackFlowBuilder class is overriding ChannelFlowBuilder, that means it’s built on top of channelFlow .
  3. callbackFlow requires to provide awaitClose block inside in order to ensure flow is alive and active and be able to unregister listeners inside awaitClose block of code to make sure callbackFlow unsubscribe to listeners when its canceled/closed otherwise listener may leak.
  4. callbackFlow will raise IllegalStateException if we will not provide awaitClose , so it ensures awaitClose block exists unlike channelFlow.
Firebase Setup and Dependencies
  1. Go to Firebase Console and Create a Firebase project
  2. Register your Android app with the Firebase project using guidelines.
  3. After registering your Android app with Firebase project you will be able to download configuration file google-services.json , Add that file into your app level root directory
  4. To enable Google services add following Google services plugin in your root-level build.gradle file as build script dependency
buildscript {
///.....
repositories {
// Make sure that you have the following two repositories
google() // Google's Maven repository
mavenCentral() // Maven Central repository
}
dependencies {
// Add the dependency for the Google services Gradle plugin
classpath 'com.google.gms:google-services:4.3.15'
}
}

5. In app level build.gradle add Google services plugin

plugins {
id 'com.android.application'
// Add the Google services Gradle plugin
id 'com.google.gms.google-services'
}

Job Offers

Job Offers

There are currently no vacancies.

OUR VIDEO RECOMMENDATION

, ,

Mastering Flutter + Firebase: Architecting for Scale, Security & Savings

Discover the key principles and strategies for building scalable, secure Flutter apps with Firebase, while keeping costs in check! Join us for an eye-opening session where we’ll share actionable tips and best practices for architecting…
Watch Video

Mastering Flutter + Firebase: Architecting for Scale, Security & Savings

Tomas Piaggio & Bettina Carrizo
Director of Engineering & Senior Flutter Engineer
Very Good Ventures

Mastering Flutter + Firebase: Architecting for Scale, Security & Savings

Tomas Piaggio & Be ...
Director of Engineer ...
Very Good Ventures

Mastering Flutter + Firebase: Architecting for Scale, Security & Savings

Tomas Piaggio & ...
Director of Engineering & ...
Very Good Ventures

Jobs

6. Add Firebase SDK to your app level build.gradle

// Import the Firebase BoM
implementation platform('com.google.firebase:firebase-bom:31.5.0')
// Add the dependency for the Firebase SDK for Google Analytics
implementation 'com.google.firebase:firebase-database-ktx'
Firebase RealtimeDatabase Callbacks
// Create Database Object
val database = FirebaseDatabase.getInstance("<database path>")
val databaseRef = database.reference
// Read from the database
databaseRef.addValueEventListener(object : ValueEventListener {
override fun onDataChange(dataSnapshot: DataSnapshot) {
// This method is called once with the initial value and again
// whenever data at this location is updated.
val value = dataSnapshot.value
Log.d(TAG, "Value is: $value")
}
override fun onCancelled(error: DatabaseError) {
// Failed to read value
Log.w(TAG, "Failed to read value.", error.toException())
}
})
Converting RealtimeDatabase callbacks into callbackFlow
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.0-RC'

And the code looks like below

fun readFirebaseRealtimeDatabaseFlow(): Flow<String> = callbackFlow {
val database = FirebaseDatabase.getInstance("<path of realtime database>")
val databaseRef = database.reference
val firebaseDataListeners = object : ValueEventListener {
override fun onDataChange(dataSnapshot: DataSnapshot) {
// This method is called once with the initial value and again
// whenever data at this location is updated.
val value = dataSnapshot.value
Log.d(ContentValues.TAG, "Value is: $value")
trySend(value?.toString().orEmpty())
}
override fun onCancelled(error: DatabaseError) {
// Failed to read value
Log.w(ContentValues.TAG, "Failed to read value.", error.toException())
}
}
databaseRef.addValueEventListener(firebaseDataListeners)
awaitClose {
databaseRef.removeEventListener(firebaseDataListeners)
}
}
  • Using function return type as Flow<String> .
  • callbackFlow return block, inside block creating database reference and registering it to the database reference to listen for updates in callbacks.
  • trySend(value?.toString().orEmpty()) statement inside onDataChange callback will send value to the collector of callbackFlow .
  • awaitClose taking a block of code to unregister event listener when callbackFlow gets canceled.
How to Collect callbackFlow?
lifecycleScope.launch {
repeatOnLifecycle(Lifecycle.State.RESUMED) {
FirebaseDatabaseCallbackFlow.readFirebaseRealtimeDatabaseFlow()
.catch {
Log.d(TAG,"exception $it")
}.collect {
Log.d(TAG, "read value $it")
}
}
}
Sources
Github Project

GitHub – saqib-github-commits/RealtimeDatabaseCallbackFlow

— — — — — — — — — — —

This article was previously published on proandroiddev.com

YOU MAY BE INTERESTED IN

YOU MAY BE INTERESTED IN

blog
Nowadays authentication has become common in almost all apps. And many of us know…
READ MORE
blog
Yes! You heard it right. We’ll try to understand the complete OTP (one time…
READ MORE
blog
Firebase Remote Config is a cloud service that lets you change the behavior and…
READ MORE
blog
This series of articles will explain how to generate a code coverage report for…
READ MORE
Menu