Blog Infos
Author
Published
Topics
Published

Most of us use coroutines, but who knows what the coroutine creation process looks like? The structure of the blog post is below:

  1. Definitions
  2. CPS — Continuation Passing Style
  3. Kotlin coroutine principle
    3.1 Coroutine construction
    — 3.1.1 launch()
    — 3.1.2 start()
    — 3.1.3 invoke()
    — 3.1.4 startCoroutineCancellable()
    — 3.1.5 resumeWithCancellable()
    — 3.1.6 resumeWith()
    — 3.1.7 invokeSuspend()
    — 3.1.8 Summary of coroutine construction
    3.2 Bytecode analyses
1. Definitions
What is a Coroutine?

coroutine is an instance of suspendable computation. It is conceptually similar to a thread, in the sense that it takes a block of code to run that works concurrently with the rest of the code. However, a coroutine is not bound to any particular thread. It may suspend its execution in one thread and resume in another one.

In asynchronous programs, tasks are executed in parallel on separate threads without waiting for other tasks to complete. Improper use of multithreading can lead to high CPU usage or increased CPU cycles and can drastically reduce the performance of your application and therefore threads are an expensive resource. Coroutines are a lightweight alternative to threads.

What is suspend function?

Suspend function is a function that could be startedpaused, and resume. One of the most important points to remember about the suspend functions is that they are only allowed to be called from a coroutine or another suspend function.

When a coroutine is suspended, that thread is free for other coroutines. The continuation of the coroutine doesn’t have to be on the same thread. Here we conclude that we can simultaneously run many coroutines with a small number of threads. You will see below how it all works.

Suspending vs Non-suspending/regular function:
  • A suspension function has zero or more suspension points, while a regular function has zero suspension points. A suspension point represents a statement in its body that can pause the execution of a function to resume at a later time
  • Non-suspended functions cannot directly call suspending functions, because they do not support suspension points
  • Suspending functions can call non-suspended functions because they have 0 suspension points

This is a simple suspend function:

suspend fun functionA(): String {
return "hello"
}

As we said above, we cannot call a suspended function inside a regular function. Let’s decompile the functionA() to see what happens.

Go to Tools->Kotlin->Show Kotlin Bytecode.

@Nullable
public static final Object functionA(@NotNull Continuation $completion) {
return "hello";
}

 

You’re probably wondering where the Continuation argument comes from and what it means. Now we will explain where it comes from, and later we will show what it is. Each suspend function goes through a CPS-Continuation Passing Style transformation.

We can observe the suspension of the coroutine through the following example. Let’s imagine that we are playing a game and we want to pause (suspend) it and later continue playing where we left off. In that case, we archive the game, and when we want to continue, we can restore from the archive the last paused point. When the process is suspended, it will return Continuation.

2. CPS — Continuation Passing Style

In the CPS transformation suspend function with n parameters (p_1, p_2, p_3,… p_n) and result type T, they get an additional parameter p_n+1 which is of type Continuation<T> and the return type is Any?. The return type is changed to Any? because:

  • If function returns a result, we get T
  • If the function suspend, it return signal value COROUTINE_SUSPENDED which means that is a suspended state.

 

 

3. Kotlin coroutine principle

Let’s go through the coroutine process with an example, understand how the coroutine is created and explain the flow.

class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
lifecycleScope.launch {
val randomNum = getRandomNum() // suspension point #1
val sqrt = getSqrt(randomNum.toDouble()) // suspension point #2
log(sqrt.toString())
}
}
private suspend fun getRandomNum(): Int {
delay(1000)
return (1..1000).shuffled().first()
}
private suspend fun getSqrt(num: Double): Double {
delay(2000)
return sqrt(num)
}
private fun log(text: String) {
Log.i(this@MainActivity::class.simpleName, text)
}
}

In the example above we have two suspended functions. The getRandomNum() function takes a random number in the range from 1 to 1000, passes it to another suspended function getSqrt() that calculates its root.

Let’s set a breakpoint at line 6 and run the code in debug mode to pick up the stack traces of what happened before executing the body of the coroutine. With this we want to see what the coroutine creation process looks like.

This is a stack trace:

invokeSuspend:75, MainActivity$startCoroutine$1 (me.aleksandarzekovic.exploringcoroutines)
resumeWith:33, BaseContinuationImpl (kotlin.coroutines.jvm.internal)
resumeCancellableWith:266, DispatchedContinuationKt (kotlinx.coroutines.internal)
startCoroutineCancellable:30, CancellableKt (kotlinx.coroutines.intrinsics)
startCoroutineCancellable$default:25, CancellableKt (kotlinx.coroutines.intrinsics)
invoke:110, CoroutineStart (kotlinx.coroutines)
start:126, AbstractCoroutine (kotlinx.coroutines)
launch:56, BuildersKt__Builders_commonKt (kotlinx.coroutines)
launch:1, BuildersKt (kotlinx.coroutines)
launch$default:47, BuildersKt__Builders_commonKt (kotlinx.coroutines)
launch$default:1, BuildersKt (kotlinx.coroutines)
startCoroutine:75, MainActivity (me.aleksandarzekovic.exploringcoroutines)
onCreate:71, MainActivity (me.aleksandarzekovic.exploringcoroutines)
...

Execution order is:

 

 

3.1 Coroutine construction

3.1.1 launch()

Launches a new coroutine without blocking the current thread and returns a reference to the coroutine as a Job.

public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
CoroutineScope

Defines a scope for new coroutines. Every coroutine builder (like launchasync, etc.) is an extension on CoroutineScope and inherits its coroutineContext to automatically propagate all its elements and cancellation.

CoroutineScope is an interface with only one property coroutineContext.

public interface CoroutineScope {
public val coroutineContext: CoroutineContext
}

Note: In our example we used launch coroutine builder and we will explain creating a coroutine through it. Other coroutine builders are quite intuitive to this example.

The launch coroutine builder has three parameters:

  • context — additional to CoroutineScope.coroutineContext context of the coroutine
  • start —coroutine start option. The default value is CoroutineStart.DEFAULT
  • block — the coroutine code which will be invoked in the context of the provided scope. The compiler generates an inner class for the suspended lambda function at compile time that extends the SuspendLambda and implements Function2:
final class me/aleksandarzekovic/exploringcoroutines/MainActivity$onCreate$1
extends kotlin/coroutines/jvm/internal/SuspendLambda
implements kotlin/jvm/functions/Function2

That means that Kotlin generates an SuspendLambda anonymous inner class for each coroutine at compile time. This is coroutine body class. Two methods are implemented internally:

  • invokeSuspend() — contains code in the body of our coroutine, which internally handles the value of the state. The most important state change logic in the coroutine, which is called the state machine, is contained here.
  • create() — method receives a Continuation object, then creates and returns an object of the coroutine body class.
Coroutine state machine

Kotlin implements suspendable functions as state machines, since such implementation does not require specific runtime support. This dictates the explicit suspend marking (function colouring) of Kotlin coroutines: the compiler has to know which function can potentially suspend, to turn it into a state machine.

The states from the state machine correspond to the suspension points. In our example:

There are three states for this block of code:

  • L0: until suspension point #1
  • L1: until suspension point #2
  • L2: until end

 

 

The pseudo-code of the generated state machine with these three states looks like this:

// The initial state of the state machine
int label = 0
A a = null
B b = null
void resumeWith(Object result) {
if (label == 0) goto L0
if (label == 1) goto L1
if (label == 2) goto L2
else throw IllegalStateException("call to 'resume' before 'invoke' with coroutine")
L0:
// result is expected to be `null` at this invocation
label = 1
// 'this' is passed as a continuation
result = getRandomNum(this)
if (result == COROUTINE_SUSPENDED) return
L1:
A a = (A) result
label = 2
val result = getSqrt(a, this) // 'this' is passed as a continuation
if (result == COROUTINE_SUSPENDED) return
L2:
B b = (B) result
log(String.valueOf(b))
label = -1 // No more steps are allowed
return
}

Continuation state-machine implementation pseudo-code

 

The logic of the coroutine is encapsulated in the invokeSuspend method, which we mentioned earlier.
SuspendLambda the inheritance relationship is:

Continuation

Interface representing a continuation after a suspension point that returns a value of type T.

public interface Continuation<in T> {
public val context: CoroutineContext
public fun resumeWith(result: Result<T>)
}

Continuations are so important because they allow the continuation of the coroutine. Every suspending function is associated with a generated Continuation subtype, which handles the suspension implementation.

  • context — the context of the coroutine that corresponds to this continuation
  • resumeWith() — is used to propagate the results in between suspension points. It is called with the result (or exception) of the last suspension point and resumes the coroutine execution
BaseContinuationImpl

The key source code of BaseContinuationImpl is:

internal abstract class BaseContinuationImpl (...) {
// Implement resumeWith of Continuation
// It is final and cannot be overridden!
public final override fun resumeWith (result: Result<Any?>) {
// ...
val outcome = invokeSuspend(param)
// ...
}
// For implementation
protected abstract fun invokeSuspend (result: Result<Any?>) : Any?
}

invokeSuspend() — are an abstract method which is implemented in coroutine body class created in compile time.

The implementation of the resumeWith() method always calls the invokeSuspend() method.

ContinuationImpl

ContinuationImpl inherits BaseContinuationImpl. Its function is to generate an DispatchedContinuation object using the interceptor, which is also a Continuation. We can talk more about this in section 3.2.4.

Let’s continue to analyze the body of the launch() function.

newCoroutineContext()

newCoroutineContext — Creates a context for a new coroutine. It installs Dispatchers.Default when no other dispatcher or ContinuationInterceptor is specified and adds optional support for debugging facilities (when turned on) and copyable-thread-local facilities on JVM.

The newCoroutineContext is an extension function on CoroutineScope. Its function is to merge the inherited context (scope context) from the CoroutineScope with the context of the passed parameter and return the new context.

Let’s briefly go over CoroutineContext.

CoroutineContext

CoroutineContext is an immutable indexed union set of Element instances like CoroutineName, CoroutineId,  CoroutineExceptionHandlerContinuationIntercepterCoroutineDispatcherJob. Every element in this set has a unique Key.

Imagine that we want to control which thread or thread pool will run our coroutine. Depending on whether we want to run a task on the main thread, or whether the tasks are CPU or IO bound, we will use dispatchers.

Dispatchers are thread schedulers provided in the coroutine, used to switch threads and specify the threads that the coroutine runs on. There are four types of schedulers in Dispatchers:

All these dispatchers are CoroutineDispatcher. We said above that CoroutineDispatcher is one of the elements of CoroutineContext, which means that these dispatchers are elements of CoroutineContext.

We said that CoroutineContext is like collection and that collection contains different types of Element. We can create a new context by adding/removing an element, or by merging two existing contexts. The plus operator works as an extension of Set.plus and returns the combination of two contexts with the elements on the right side of plus replacing the elements with the same key on the left.

Context without any element can be created as an instance of EmptyCoroutineContext.

import kotlinx.coroutines.*

fun main() {
    val coroutineName = CoroutineName("C#1") + CoroutineName("C#2")
    println(coroutineName)
}

// result
// CoroutineName(C#2)

Example#1

 

import kotlinx.coroutines.*

fun main() {
    val coroutineContext = CoroutineName("C#1") + Dispatchers.Default
    println(coroutineContext)
}

// result
// [CoroutineName(C#1), Dispatchers.Default]

Example#2

 

import kotlinx.coroutines.*

fun main() {
    val firstCoroutineContext = CoroutineName("C#1") + Dispatchers.Default
    println(firstCoroutineContext)
    
    val secondCoroutineContext = Job() + Dispatchers.IO
    println(secondCoroutineContext)
    
    val finalCoroutineContext = firstCoroutineContext + secondCoroutineContext
    println(finalCoroutineContext)
}

// result
// [CoroutineName(C#1), Dispatchers.Default]
// [JobImpl{Active}@39a054a5, Dispatchers.IO]
// [CoroutineName(C#1), JobImpl{Active}@39a054a5, Dispatchers.IO]

Example#3

 

Job Offers

Job Offers

There are currently no vacancies.

OUR VIDEO RECOMMENDATION

Jobs

A CoroutineContext is never overridden, but merged with an existing one.
Now that we’ve learned a few things about CoroutineContext, we can go back to where we left off, which is newCoroutineContext in the body of the launch builder function.

Let’s define different contexts to make it easier for us to understand:

  • scope context —the context defined in CoroutineScope
  • passed context — the builder function receives a CoroutineContext instance in its first parameter
  • parent context — The suspending block parameter in the builder function has a CoroutineScope receiver, which itself also provides a CoroutineContext. This context is not a new coroutine context!
  • The new coroutine creates its own child Job instance (using a job from this context as its parent) and defines its coroutine context(child context)as a parent context plus its Job. We will see later in more detail how we concluded this.

After defining a new coroutine context (parent context), we can proceed with creating a new coroutine.

StandaloneCoroutine

We use new context (parent context) for creating coroutine. For start parameter default value is CoroutineStart.DEFAULT. In this case, we create StandaloneCoroutine (inhered from AbstractCoroutine) with return type is a Job. StandaloneCoroutine is a coroutine object.

Note: If we set that start is lazy we will have LazyStandaloneCoroutineLazyStandaloneCoroutine inhered from StandaloneCoroutine, and StandaloneCoroutine inhered from AbstractCoroutine.

private open class StandaloneCoroutine(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, initParentJob = true, active = active) {
override fun handleJobException(exception: Throwable): Boolean {
handleCoroutineException(context, exception)
return true
}
}

Only the handleJobException method has been overridden in the StandaloneCoroutine class to handle exceptions not handled by the parent coroutine. The start method called here is a method of the parent class AbstractCoroutine.

@InternalCoroutinesApi
public abstract class AbstractCoroutine<in T>(
parentContext: CoroutineContext,
initParentJob: Boolean,
active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
// ...
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
start(block, receiver, this)
}
// ...
}

AbstractCoroutine class implements the JobSupport class and the JobContinuation and CoroutineScope interfaces. AbstractCoroutine class is mainly responsible for Coroutine recovery and result return.

 

 

JobSupport

JobSupport is the specific implementation of Job. AbstractCoroutine can be used as a Job to control the lifecycle of the coroutine, it can implement the Continuation interface and it can also be used as a Continuation.

Job

The context of AbstractCoroutine is the context we passed through the parameter (parentContext) plus the current coroutine, and since we know that AbstractCoroutine is both Job and CoroutineScope, we know that the context of our coroutine contains a Job element. That context is the coroutine context (child context).

@InternalCoroutinesApi
public abstract class AbstractCoroutine<in T>(
parentContext: CoroutineContext,
initParentJob: Boolean,
active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
// ...
/**
* The context of this coroutine that includes this coroutine as a [Job].
*/
@Suppress("LeakingThis")
public final override val context: CoroutineContext = parentContext + this
// ...
}

 

Second step in stack trace is coroutine.start(start, coroutine, block).

3.2.2. start()

 

 

Starts this coroutine with the given code block and start strategy. This function shall be invoked at most once on this coroutine.

@InternalCoroutinesApi
public abstract class AbstractCoroutine<in T>(
parentContext: CoroutineContext,
initParentJob: Boolean,
active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
// ...
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
start(block, receiver, this)
}
// ...
}

The AbstractCoroutine#start() method calls the start() method. CoroutineStart is an enum class, and the invoke() method is internally overridden. In that case, the start() method calls the CoroutineStart.invoke() method.

3.2.3. invoke()

 

 

Defines start options for coroutines builders. It is used in start parameter of launchasync, and other coroutine builder functions.

@InternalCoroutinesApi
public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit =
when (this) {
DEFAULT -> block.startCoroutineCancellable(completion)
ATOMIC -> block.startCoroutine(completion)
UNDISPATCHED -> block.startCoroutineUndispatched(completion)
LAZY -> Unit // will start lazily
}

CoroutineStart is an enumeration class with four types:

  • DEFAULT — immediately schedules coroutine for execution according to its context
  • LAZY — starts coroutine lazily, only when it is needed
  • ATOMIC — atomically (in a non-cancellable way) schedules coroutine for execution according to its context
  • UNDISPATCHED — immediately executes coroutine until its first suspension point in the current thread

Here, DEFAULT is used as an example.

3.2.4. startCoroutineCancellable()

 

 

Use this function to start coroutine in a cancellable way, so that it can be cancelled while waiting to be dispatched.

/**
* param completion is AbstractCoroutine
* return a Continuation
*/
internal fun <R , T> (suspend ( R ) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
runSafely(completion) {
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit))
}

runSafely() — runs given block and completes completion with its exception if it occurs. Rationale: startCoroutineCancellable is invoked when we are about to run coroutine asynchronously in its own dispatcher. Thus if dispatcher throws an exception during coroutine start, coroutine never completes, so we should treat dispatcher exception as its cause and resume completion.

private inline fun runSafely(completion: Continuation<*>, block: () -> Unit) {
try {
block()
} catch (e: Throwable) {
completion.resumeWith(Result.failure(e))
}
}

The startCoroutineCancellable implementation is a chained call. Let’s go through that chain:

  1. createCoroutineUnintercepted() — is extended function called throught coroutine body and coroutine body are compiled into a subclass of SuspendLambda (coroutine body class), so this is the BaseContinuationImpl.
// kotlin/libraries/stdlib/jvm/src/kotlin/coroutines/intrinsics/IntrinsicsJvm.kt
@SinceKotlin("1.3")
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
receiver: R,
completion: Continuation<T>
): Continuation<Unit> {
val probeCompletion = probeCoroutineCreated(completion)
return if (this is BaseContinuationImpl)
create(receiver, probeCompletion)
else {
createCoroutineFromSuspendFunction(probeCompletion) {
(this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
}
}
}

The create() method creates an instance of the coroutine body class and here we get an instance of the coroutine class.

@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
return var3;
}

2. intercepted() —intercepts this continuation with ContinuationInterceptor.

public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
(this as? ContinuationImpl)?.intercepted() ?: this

this is an example of a coroutine body class that inherits ContinuationImpl.

If intercepted is null, intercept the coroutine body class via the interceptor specified in the context and return the wrapped coroutine body class object.

context[ContinuationInterceptor] — gets the scheduler from the collection, and calls the interceptContinuation().
The interceptContinuation() method is used to wrap the coroutine body Continuation into a DispatchedContinuation.

// CoroutineDispatcher
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
= DispatchedContinuation(this, continuation)
DispatchedContinuation

DispatchedContinuation represents the Continuation object of the coroutine body and holds the thread scheduler. Its function is to use the thread scheduler to schedule the coroutine body to the specified thread for execution.

Notice that it takes a dispatcher and a continuation in the constructor, and it implements both Continuation<T> and DispatchedTask<T>.

3. resumeCancellableWith() — an extension method of Continuation

@InternalCoroutinesApi
public fun <T> Continuation<T>.resumeCancellableWith(
result: Result<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
else -> resumeWith(result)
}

If this is not an intercepted and wrapped object of the coroutine’s body class, resumeWith(result) will be called.
Otherwise, if this is the intercepted and wrapped DispatchedContinuation class object, then the resumeCancellableWith(result, onCancellation) function is called.

inline fun resumeCancellableWith(
result: Result<T>,
noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
val state = result.toState(onCancellation)
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_CANCELLABLE
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_CANCELLABLE) {
if (!resumeCancelled(state)) {
resumeUndispatchedWith(result)
}
}
}
}

If you look at the CoroutineDispatcher source code, the return value of dispatcher.isDispatchNeeded() is always true, only Dispatchers.Unconfined will override this to false.
If dispatcher.isDispatchNeeded() returns false, we call resumeWith() method directly from the coroutine body class.

dispatcher.dispatch(context, this) is actually equivalent to distributing the execution process of the code to the Default thread pool. The second parameter is Runnable, we pass this here, because DispatchedContinuation indirectly implements the Runnable interface.

Dispatchers.Default is DefaultScheduler and DefaultScheduler is a singleton class, so only one instance can be created and used everywhere.
DefaultScheduler is a subclass of SchedulerCoroutineDispatcher.

@JvmStatic
public actual val Default: CoroutineDispatcher = DefaultScheduler
internal object DefaultScheduler : SchedulerCoroutineDispatcher(
CORE_POOL_SIZE, MAX_POOL_SIZE,
IDLE_WORKER_KEEP_ALIVE_NS, DEFAULT_SCHEDULER_NAME
) {
...
}
internal open class SchedulerCoroutineDispatcher(
private val corePoolSize: Int = CORE_POOL_SIZE,
private val maxPoolSize: Int = MAX_POOL_SIZE,
private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
private val schedulerName: String = "CoroutineScheduler",
) : ExecutorCoroutineDispatcher() {
...
override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block)
...
}

Dispatchers.Default#dispatch() calls the dispatch() method of SchedulerCoroutineDispatcher, which calls coroutineScheduler.dispatch().

CoroutineScheduler

CoroutineScheduler is a thread pool implemented in Kotlin that provides threads on which coroutines can be run, which means it generates them.
CoroutineScheduler is a subclass of Executor and its execute() method is also forwarded to dispatch().

internal class CoroutineScheduler(
@JvmField val corePoolSize: Int,
@JvmField val maxPoolSize: Int,
@JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
@JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {
...
override fun execute(command: Runnable) = dispatch(command)
...
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
trackTask() // this is needed for virtual time support
val task = createTask(block, taskContext)
// try to submit the task to the local queue and act depending on the result
val currentWorker = currentWorker()
val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
if (notAdded != null) {
if (!addToGlobalQueue(notAdded)) {
// Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
throw RejectedExecutionException("$schedulerName was terminated")
}
}
val skipUnpark = tailDispatch && currentWorker != null
// Checking 'task' instead of 'notAdded' is completely okay
if (task.mode == TASK_NON_BLOCKING) {
if (skipUnpark) return
signalCpuWork()
} else {
// Increment blocking tasks anyway
signalBlockingWork(skipUnpark = skipUnpark)
}
}
internal inner class Worker private constructor() : Thread() { ... }
}

Inside the dispatch() method we see the following:

  • createTask() — We create a Task from the passed block of type Runnable which is actually a DispatchedContinuation.
  • currentWorker() — gets the currently executing thread. The worker is an internal class of CoroutineScheduler.
  • currentWorker.submitToLocalQueue() — adds the task to the local queue of the Worker thread and waits for execution.
Worker

Worker is the thread of Kotlin coroutine. The implementation of Worker inherits Thread, which is essentially an encapsulation of Java threads. We conclude that the Worker is a thread.

Let’s analyze how the Worker performs the task.

internal inner class Worker private constructor() : Thread() {
...
override fun run() = runWorker()
private fun runWorker() {
...
while (!isTerminated && state != WorkerState.TERMINATED) {
val task = findTask(mayHaveLocalTasks)
if (task != null) {
...
executeTask(task)
continue
}
...
}
...
}
...
}

The Worker will override the Thread run() method and then the runWorker() method will be invoked. In a while loop, it will always try to fetch the Task from the local Worker queue. If there is a task that needs to be executed, executeTask(task) will be called to execute it.

Let’s see what the executeTask(task) method does:

internal inner class Worker private constructor() : Thread() {
...
private fun executeTask(task: Task) {
val taskMode = task.mode
idleReset(taskMode)
beforeTask(taskMode)
runSafely(task)
afterTask(taskMode)
}
...
fun runSafely(task: Task) {
try {
task.run()
}
...
}
}
internal abstract class Task(
@JvmField var submissionTime: Long,
@JvmField var taskContext: TaskContext
) : Runnable {
constructor() : this(0, NonBlockingContext)
inline val mode: Int get() = taskContext.taskMode // TASK_XXX
}

Inside the runSafely() method we call task.run(). Task is a Runnable and Runnable#run() actually means that our coroutine task is actually running.

DispatchedContinuation inherits from the DispatchedTask class, DispatchedTask inherits SchedulerTask and SchedulerTask implements the Runnable interface. We see that DispatchedTask finally implements the Runable interface, so let’s look at the run() implementation of DispatchedTask.

internal abstract class DispatchedTask<in T>(
@JvmField public var resumeMode: Int
) : SchedulerTask() {
public final override fun run() {
// ...
try {
val delegate = delegate as DispatchedContinuation<T>
val continuation = delegate.continuation
withContinuationContext(continuation, delegate.countOrElement) {
// ...
val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
if (job != null && !job.isActive) {
val cause = job.getCancellationException()
cancelCompletedResult(state, cause)
continuation.resumeWithStackTrace(cause)
} else {
if (exception != null) {
continuation.resumeWithException(exception)
} else {
continuation.resume(getSuccessfulResult(state))
}
}
}
}
// ...
}
}

In the run() method, the original coroutine class Continuation object is obtained through DispatchedContinuation.
resumeWithStackTraceresumeWithException, and resume are extension methods that trigger resumeWith() method.

/**
* Resumes the execution of the corresponding coroutine passing [value] as the return value of the last suspension point.
*/
@SinceKotlin("1.3")
@InlineOnly
public inline fun <T> Continuation<T>.resume(value: T): Unit =
resumeWith(Result.success(value))
/**
* Resumes the execution of the corresponding coroutine so that the [exception] is re-thrown right after the
* last suspension point.
*/
@SinceKotlin("1.3")
@InlineOnly
public inline fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit =
resumeWith(Result.failure(exception))
@Suppress("NOTHING_TO_INLINE")
internal inline fun Continuation<*>.resumeWithStackTrace(exception: Throwable) {
resumeWith(Result.failure(recoverStackTrace(exception, this)))
}
3.2.5. resumeWith

 

The final implementation of resumeWith() is in the BaseContinuationImpl class:

public final override fun resumeWith(result: Result<Any?>) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var current = this
var param = result
while (true) {
// Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
// can precisely track what part of suspended callstack was already resumed
probeCoroutineResumed(current)
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
// unrolling recursion via loop
current = completion
param = outcome
} else {
// top-level completion reached -- invoke and return
completion.resumeWith(outcome)
return
}
}
}
}
protected abstract fun invokeSuspend(result: Result<Any?>): Any?

Let’s pay attention to the invokeSuspend() function.

3.2.6. invokeSuspend

 

 

The coroutine body will be executed sequentially through the state machine until the suspend function is called. When we call it, the function will return the COROUTINE_SUSPEND flag and it will directly return to exit the loop as well as the coroutine body. In that case we will not get a thread blocking.
When a function needs to be suspended, the state machine will store the previous result as a continuationin member variable. When the suspend function resumes, the Continuation resumeWith() method is called, and then invokeSuspend() is called. That way, the remaining code from the coroutine body will also continue to execute.

 

 

3.1.8. Summary of coroutine creation
  1. The coroutine body (suspended lambda function) is compiled into an inner class at compile time that extends SuspendLambda and implements Function2. The specific inheritance chain is
    SuspendLambda -> ContinuationImpl -> BaseContinuationImpl -> Continuation
  2. CoroutineScope#launch() creates a coroutine, according to the default launch mode CoroutineStart.DEFAULT, creates a coroutine object StandaloneCoroutine and launches StandaloneCoroutine#start(start, coroutine, block)
  3. StandaloneCoroutine is a subclass of the AbstractCoroutine class, and the implementation of StandaloneCoroutine#start() is found in AbstractCoroutine (AbstractCoroutine#start()). AbstractCoroutine#start() triggers CoroutineStart#invoke()
  4. Since in our example the dispatcher is Dispatchers.Default, we call the startCoroutineCancellable() method of the coroutine body to the processing logic CoroutineStart#invoke()
  5. startCoroutineCancellable() is a chain call:
    createCoroutineUnintercepted().intercepted().resumeCancellableWith()
  6. createCoroutineUnintercepted() creates a coroutine body class object.
  7. intercepted() uses an interceptor/scheduler to wrap the coroutine body class object into DispatchedContinuation. DispatchedContinuation represents a Continuation object of the coroutine body class and contains the scheduler.
  8. Since the dispatcher is is Dispatchers.Default and the isDispatchNeeded() function returned true , DispatchedContinuation#resumeCancellableWith() use the thread scheduler to run dispatcher#dispatch(context, this) for scheduling.
  9. Dispatchers.Default#dispatch() calls the dispatch() method of SchedulerCoroutineDispatcher, which calls CoroutineScheduler#dispatch()
  10. CoroutineScheduler allocates a thread Worker and triggers the run() of DispatchedContinuation (DispatchedTask#run()) in the Worker#run() method.
  11. The run method triggers resumeWith() method. The execution of the coroutine body is actually a call to the resumeWith() method.

 

 

3.2 Bytecode analysis

Let’s analyze the bytecodes of our example
.Go to Tools->Kotlin->Show Kotlin Bytecode. Than click on Decompile to generate the corresponding decompiled java code.

BuildersKt.launch$default((CoroutineScope)LifecycleOwnerKt.getLifecycleScope(this), (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
Object var10000;
label17: {
Object var5 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
MainActivity var6;
switch(this.label) {
case 0:
ResultKt.throwOnFailure($result);
var6 = MainActivity.this;
this.label = 1;
var10000 = var6.getRandomNum(this);
if (var10000 == var5) {
return var5;
}
break;
case 1:
ResultKt.throwOnFailure($result);
var10000 = $result;
break;
case 2:
ResultKt.throwOnFailure($result);
var10000 = $result;
break label17;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
int randomNum = ((Number)var10000).intValue();
var6 = MainActivity.this;
double var10001 = (double)randomNum;
this.label = 2;
var10000 = var6.getSqrt(var10001, this);
if (var10000 == var5) {
return var5;
}
}
double sqrt = ((Number)var10000).doubleValue();
MainActivity.this.log(String.valueOf(sqrt));
return Unit.INSTANCE;
}
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
return var3;
}
public final Object invoke(Object var1, Object var2) {
return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
}), 3, (Object)null);

Let’s look at the invokeSuspend function:

  1. lines 15,16 — We check if var10000 is equal to COROUTINE_SUSPENDED. If so, that means we don’t have an available result and need to suspend and wait until an available result is returned. In our case, the getRandomNum() method will return COROUTINE_SUSPENDED. Before this we set the label to 1.

Why did the getRandomNum() method return COROUTINE_SUSPENDED?

private final Object getRandomNum(Continuation var1) {
Object $continuation;
label20: {
// ...
}
Object $result = ((<undefinedtype>)$continuation).result;
Object var5 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(((<undefinedtype>)$continuation).label) {
case 0:
ResultKt.throwOnFailure($result);
((<undefinedtype>)$continuation).label = 1;
if (DelayKt.delay(1000L, (Continuation)$continuation) == var5) {
return var5;
}
break;
case 1:
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
byte var2 = 1;
return CollectionsKt.first(CollectionsKt.shuffled((Iterable)(new IntRange(var2, 1000))));
}

In line 13, delay() is called. Let’s see its implementation.

public suspend fun delay(timeMillis: Long) {
if (timeMillis <= 0) return // don't delay
return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
// if timeMillis == Long.MAX_VALUE then just wait forever like awaitCancellation, don't schedule.
if (timeMillis < Long.MAX_VALUE) {
cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
}
}
}
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
postDelayed(Runnable {
with(continuation) { resumeUndispatched(Unit) }
}, timeMillis)
}

Returning suspendCancellableCoroutine is COROUTINE_SUSPENDED. It needs to suspend and wait for the result to return. It can be seen that the logic of delay here is similar to the Handle mechanism (Handler.postDelayed). After the execution is completed, continuation.resume() -> BaseContinuationImpl.resumeWith()-> SuspendLambda.invokeSuspend() will be called to restore.

When the execution of the getRandomNum() method is complete and the available result is returned, the invokeSuspend() method is called. At this point label = 1. First we call the throwOnFailure() method which throws an exception if the result is a failure. If the result is successful, we assign the result to var10000, and then break executes the following logic (lines 32-39).

2. lines 15,16 — Executing the getSqrt() method is analogous to executing the getRandomNum() method. At this point label = 2.
When the execution of the getRandomNum() method is complete and the available result is returned, the invokeSuspend() method is called again. We call the throwOnFailure() method again which throws an exception if the result is a failure. If the result is successful, we assign the result to var10000, then execute break label17 and go to line 101 to execute the rest of the logic.

3. When the execution of the getRandomNum() method is complete and the available result is returned, the invokeSuspend() method is called again. We call the throwOnFailure() method again which throws an exception if the result is a failure. If the result is successful, we assign the result to var10000, then execute break label17 and go to line 42 to execute the rest of the logic.

Now you can explore other coroutine builders and see exactly what the differences are. If you have any doubts, you can write to me on LinkedIn.

I hope you liked this article. Stay tuned for more!

This article was originally published on proandroiddev.com on September 19, 2022

YOU MAY BE INTERESTED IN

YOU MAY BE INTERESTED IN

blog
In the first part of this series (🔗 read here), we took a deep…
READ MORE
blog
Recently, we have been making an effort to improve the observability of our mobile…
READ MORE
blog
Grab your toolkit, let’s prise open some common coroutine patterns you’ve probably seen a…
READ MORE
blog
When I first started learning Kotlin Coroutines and its dispatchers, I was more in…
READ MORE

Leave a Reply

Your email address will not be published. Required fields are marked *

Fill out this field
Fill out this field
Please enter a valid email address.

Menu