To get started, let’s see what ‘Structured Concurrency’ may look like in a real-world scenario:
Let’s say John decides to build a house. This requires the completion of several tasks, such as bricklaying, carpentry, plumbing, electrical installation and roofing. Some of these tasks can overlap and be carried out concurrently, but the house is considered complete only when all of them are done.
Now, let’s say John runs out of budget midway and decides to stop building the house. In this case, any task in progress, be it bricklaying, carpentry or plumbing, has to be halted.
This is the core concept of structured concurrency. It states that all sub-tasks shall complete before the completion of their parent task(s), i.e. no sub-task can outlive its parent task(s).
Definition (in the context of programming):
“Structured concurrency” refers to a way of structuring async/concurrent computations so that child operations are guaranteed to complete before their parents, i.e. no child operation is executed outside the scope of a parent operation.
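To make the definition concrete, here is a minimal sketch. It assumes kotlinx.coroutines is on the classpath, and the function name parentWaitsForChild is hypothetical. The sketch records the order of events: the parent's own body finishes first, yet join() on the parent returns only after its child has finished.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: returns the order in which events happened
fun parentWaitsForChild(): List<String> {
    val events = mutableListOf<String>()
    runBlocking {
        val parent = launch {
            launch {
                delay(100)                // the child outlives the parent's own body...
                events += "child done"
            }
            events += "parent body done"  // ...which returns right away
        }
        parent.join()                     // waits for the parent AND its child
        events += "parent completed"
    }
    return events
}

fun main() {
    println(parentWaitsForChild())
}
```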
Let’s have a look at the image below to understand the difference between structured and unstructured concurrency:
The benefit of structured concurrency is encapsulation. When a method is invoked to do a task, its caller should not care whether or not the method decomposes the work into several sub-tasks that are executed by hundreds or thousands of threads concurrently. When the method completes, it should guarantee that the requested work has been completed.
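Here is a small illustration of this encapsulation. The buildHouse function and its task names are hypothetical, borrowed from John's house analogy. The caller simply calls a suspending function. Internally, that function splits its work into concurrent sub-tasks using coroutineScope, yet by the time it returns, every sub-task has finished.

```kotlin
import kotlinx.coroutines.*

// Hypothetical function: decomposes the work into concurrent sub-tasks
suspend fun buildHouse(log: MutableList<String>) = coroutineScope {
    for (task in listOf("bricklaying", "plumbing", "roofing")) {
        launch {
            delay(50)   // simulate work; sub-tasks run concurrently
            log += task // inherits the caller's dispatcher (single-threaded runBlocking below)
        }
    }
    // coroutineScope returns only after every launched sub-task has completed
}

fun main() = runBlocking {
    val log = mutableListOf<String>()
    buildHouse(log) // the caller does not know (or care) how the work was split
    println("Finished tasks: ${log.sorted()}")
}
```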
Before going ahead, please make sure that you understand:
- What coroutines are and how to create them using coroutine builders in Kotlin.
- What a Job is and the different states a Job can be in.
(Each of the above points links to a resource that can help you understand it.)
Kotlin coroutines & structured concurrency
In Kotlin, we can create coroutines using builders such as launch and async, which return a Job instance (async returns a Deferred, which is itself a Job). This Job may further contain nested coroutine builders that create child Job instances, which run concurrently.
Let’s see a simple code example of structured concurrency using the above-mentioned builders:
import kotlinx.coroutines.*

// runBlocking provides the CoroutineScope that launch needs in this example
fun main() = runBlocking {
    val parentJob = launch {
        val childJob = launch {
            var count = 1
            while (count <= 5) {
                println("Count: $count")
                delay(100)
                count++
            }
        }
    }
    delay(250)
    println("Cancelling parent job")
    parentJob.cancel() // also cancels childJob
}
-----------
OUTPUT:
Count: 1
Count: 2
Count: 3
Cancelling parent job
In the above example, the code within childJob prints the value of count three times, with a delay of 100 milliseconds between prints. After a total delay of 250 milliseconds, parentJob is cancelled, which in turn cancels childJob. This is exactly what structured concurrency is all about.
Coroutine Job(s) have a parent-child relationship. All coroutine builders (such as launch and async) take a CoroutineContext parameter, which can contain the Job instance the coroutine is bound to. This enforces the discipline of structured concurrency through the propagation of cancellation.
fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    // ...
): Job
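The parent-child link can be observed directly. In this sketch, isRegisteredAsChild is a hypothetical name. The sketch looks up the Job stored in the enclosing scope's coroutineContext and checks that a nested launch registered itself in that Job's children.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: is the nested coroutine a child of the Job
// found in the enclosing scope's coroutineContext?
fun isRegisteredAsChild(): Boolean = runBlocking {
    var registered = false
    val parentJob = launch {
        val childJob = launch { delay(100) }
        val jobFromContext = coroutineContext[Job] // the Job element of this scope's context
        registered = jobFromContext != null && childJob in jobFromContext.children
    }
    parentJob.join()
    registered
}

fun main() {
    println("Child registered: ${isRegisteredAsChild()}")
}
```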
Understanding cancellation
So far, everything has been pretty straightforward. We saw a code sample in which cancelling a parent Job recursively cancelled its child Job and, in turn, halted the execution of the code within the child Job.
⚠️ However, that is not always the case: the code inside a coroutine may continue to execute even after cancel() is invoked on its Job instance.
Before discussing when and how that happens, let’s take note of the following two things:
- Whether the code in a coroutine should keep executing after cancel() is invoked is a decision for the developer, and it depends on the use case.
- A coroutine that keeps executing after cancel() is invoked does not violate structured concurrency. The parent Job still waits for its children Job(s) to reach a final state before reaching its own final state. A Job is in a final state when its isCompleted = true, and we can use join() in our code to wait for a Job to complete before executing any further code.
Read more about the states of a Job here.
⚠️ Note carefully how cancellation is different from completion.
When cancellation of a coroutine is requested, the isActive flag of its Job instance changes to false. The code inside the coroutine can check this flag to decide whether to keep executing. It is the developer's responsibility to add this check where the use case requires it.
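The gap between cancellation and completion can be made visible. In the sketch below, the function name is hypothetical. NonCancellable is used only to simulate a child that keeps running after cancellation, much like the busy-waiting child in the next example. Immediately after cancel(), the parent is no longer active but is not yet completed. It completes only once join() returns.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: snapshots of parentJob's flags before and after join()
fun cancellationVsCompletion(): List<String> {
    val snapshots = mutableListOf<String>()
    runBlocking {
        val parentJob = launch {
            launch {
                // NonCancellable lets this delay finish even after cancellation,
                // simulating a child that keeps running
                withContext(NonCancellable) { delay(200) }
            }
        }
        delay(50)
        parentJob.cancel()
        // Cancelled, but not yet completed: the child is still running
        snapshots += "after cancel: isActive=${parentJob.isActive}, isCompleted=${parentJob.isCompleted}"
        parentJob.join() // suspends until parentJob (and its child) reach a final state
        snapshots += "after join: isActive=${parentJob.isActive}, isCompleted=${parentJob.isCompleted}"
    }
    return snapshots
}

fun main() {
    cancellationVsCompletion().forEach(::println)
}
```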
P.S.: More often than not, use cases demand that coroutines stop executing on cancellation. However, this is not a rule and varies from use case to use case.
Okay, now let’s get on with it and see this in action. Here is another code sample that does the same thing as the one before, except that this time the code in the coroutine keeps executing even after cancellation is invoked.
import kotlinx.coroutines.*

fun main() = runBlocking {
    // Assumption: Dispatchers.Default (a thread pool) is used so that the busy-waiting
    // child does not block the thread that runs delay(250) below
    val parentJob = launch(Dispatchers.Default) {
        val childJob = launch {
            var count = 1
            val startTime = System.currentTimeMillis()
            var nextPrintTime = startTime
            while (count <= 5) {
                // Busy-waits without ever suspending, so cancellation is never observed
                if (System.currentTimeMillis() >= nextPrintTime) {
                    println("Count: $count")
                    nextPrintTime += 100L
                    count++
                }
            }
        }
    }
    delay(250)
    println("Cancelling parent job")
    parentJob.cancel()
    parentJob.join()
    println("Parent job completed")
}
-----------
OUTPUT:
Count: 1
Count: 2
Count: 3
Cancelling parent job // cancels parentJob and hence, cancels childJob recursively
Count: 4 // childJob continues executing its code
Count: 5
Parent job completed // parentJob completes only after childJob. (Structured Concurrency)
The key difference between the two implementations is how they wait. The first one uses delay(...), while the second one busy-waits by polling System.currentTimeMillis(). Execution halts in the first case because delay(...), like various other built-in kotlinx.coroutines suspending functions, is implemented with suspendCancellableCoroutine. When the Job is no longer active (isActive is false), suspendCancellableCoroutine throws a CancellationException instead of resuming normally.
💡 Always check the implementation details or documented behaviour of a built-in suspending function before using it, to avoid unexpected results.
To make the second example behave like the first one, we can explicitly add a check for isActive as follows:
while (count <= 5 && isActive)
Hence, it is important to check whether a Job is still active, so that long-running operations do not keep running when they are no longer needed.
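The same check can be seen in a complete, if contrived, program. In this sketch, the function name countUntilCancelled is hypothetical. It runs a CPU-bound loop with no suspension points on Dispatchers.Default. The loop stops, and cancelAndJoin() returns, only because it checks isActive.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicLong

// Hypothetical helper: a CPU-bound loop that cooperates with cancellation
fun countUntilCancelled(): Long = runBlocking {
    val iterations = AtomicLong()
    val job = launch(Dispatchers.Default) {
        // No suspension points here: without the isActive check this loop would never stop
        while (isActive) {
            iterations.incrementAndGet()
        }
    }
    delay(100)
    job.cancelAndJoin() // returns once the loop observes isActive == false
    iterations.get()
}

fun main() {
    println("Iterations before cancellation: ${countUntilCancelled()}")
}
```

An alternative is to call ensureActive() inside the loop. It throws a CancellationException as soon as the Job is cancelled, instead of just ending the loop.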
Parallel Decomposition
We have seen how the parent-child relationship between Job instances is handled. Another interesting point is the support for parallel decomposition when multiple child Job instances run concurrently within a parent Job. Let’s look at the code below:
suspend fun performTasks(task1: String, task2: String) {
    val job1 = async { doTask(task1) }
    val job2 = async { doTask(task2) }
    mergeTasks(job1.await(), job2.await())
}
❌ This code does not compile and we’ll quickly see why!
Let’s say that the first task, job1, fails and throws an exception. Ideally, job2 should then stop working in the background as well, because its result is no longer needed. This is what parallel decomposition is all about.
In Kotlin, coroutines built with launch or async must be created from a CoroutineScope. Otherwise, compilation fails, just as in the example above. For the same reason, these coroutine builders are defined as extension functions of CoroutineScope. For instance, the launch coroutine builder is defined as:
fun CoroutineScope.launch(
    // ...
    block: suspend CoroutineScope.() -> Unit
): Job
There are two ways to provide a CoroutineScope to a Job:
- The Job can be written and executed directly within the lambda block of another Job, implicitly using that Job as its scope. For instance:
val parentJob = launch {
    val childJob = launch { ... }
}
- In a suspending function, you can wrap your code in a coroutineScope { … } block that establishes a boundary for your operation: its scope.
suspend fun performTasks(task1: String, task2: String) {
    coroutineScope {
        val job1 = async { doTask(task1) }
        val job2 = async { doTask(task2) }
        mergeTasks(job1.await(), job2.await())
    }
}
As we can see from the above two ways, all the coroutines will implicitly become the children of the scope they are called from, and if the scope fails with an exception or is cancelled, all the children are cancelled too.
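The cancellation of a sibling can be checked with a runnable version of performTasks. This is a sketch under stated assumptions: doTask, mergeTasks and TaskFailedException are hypothetical stand-ins, since the original helpers are not shown, and an extra log parameter records what happened. A task named "fail" throws after 50 ms; any other task takes 500 ms. Because both async calls are children of coroutineScope, the failure of the first task cancels the second one, and coroutineScope rethrows the original exception to the caller.

```kotlin
import kotlinx.coroutines.*

// Hypothetical exception type used to signal a failed task
class TaskFailedException(message: String) : Exception(message)

// Hypothetical stand-in for doTask(): "fail" throws, any other task takes 500 ms
suspend fun doTask(task: String, log: MutableList<String>): String {
    if (task == "fail") {
        delay(50)
        throw TaskFailedException("$task failed")
    }
    try {
        delay(500) // long-running, cancellable work
    } catch (e: CancellationException) {
        log += "$task cancelled"
        throw e    // rethrow so that cancellation keeps propagating
    }
    log += "$task finished"
    return "$task result"
}

// Hypothetical stand-in for mergeTasks()
fun mergeTasks(a: String, b: String): String = "$a + $b"

suspend fun performTasks(task1: String, task2: String, log: MutableList<String>): String =
    coroutineScope {
        val job1 = async { doTask(task1, log) }
        val job2 = async { doTask(task2, log) }
        mergeTasks(job1.await(), job2.await())
    }

fun parallelDecompositionDemo(): List<String> {
    val log = mutableListOf<String>()
    runBlocking {
        try {
            performTasks("fail", "slow", log)
        } catch (e: TaskFailedException) {
            log += "caught: ${e.message}"
        }
    }
    return log
}

fun main() = println(parallelDecompositionDemo())
```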
CoroutineContext vs CoroutineScope
Did you notice the emphasis on the word `implicitly`? That is because a coroutine may explicitly define a CoroutineContext instead of implicitly being bound to the CoroutineContext of the CoroutineScope it is called from.
This sounds confusing, doesn’t it? Let’s simplify this by defining CoroutineContext and CoroutineScope.
CoroutineContext: Every coroutine in Kotlin has a context that is represented by an instance of the CoroutineContext interface. A context is a set of elements, and it is responsible for the coroutine’s lifecycle, cancellation, and parent-child relations.
We observed earlier in this blog that coroutine Job(s) have a parent-child relationship. All coroutine builders (such as launch and async) take a CoroutineContext parameter, which can contain the Job instance the coroutine is bound to. This enforces the discipline of structured concurrency through the propagation of cancellation. For instance:
fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    // ...
): Job
CoroutineScope: It is just a wrapper around the context. It is an interface whose sole property is val coroutineContext: CoroutineContext, and it contains nothing but that context.
public interface CoroutineScope {
    public val coroutineContext: CoroutineContext
}
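Because the interface has only this one property, any class can become a scope by providing a context. The sketch below uses a hypothetical HouseBuilder class whose whole scope is a Job plus a dispatcher. Cancelling the scope, which means cancelling the Job in its context, cancels every coroutine launched from it.

```kotlin
import kotlinx.coroutines.*
import kotlin.coroutines.CoroutineContext

// Hypothetical class: implements CoroutineScope by supplying its only property
class HouseBuilder : CoroutineScope {
    // The whole "scope" is just this context: a Job plus a dispatcher
    override val coroutineContext: CoroutineContext = Job() + Dispatchers.Default

    // launch here uses this scope, so the new coroutine becomes a child of the Job above
    fun startTask(): Job = launch { delay(Long.MAX_VALUE) }

    // CoroutineScope.cancel() cancels the Job stored in coroutineContext
    fun stopEverything() = cancel()
}

fun houseBuilderDemo(): Boolean = runBlocking {
    val builder = HouseBuilder()
    val task = builder.startTask()
    builder.stopEverything()
    task.join()
    task.isCancelled
}

fun main() {
    println("Task cancelled with its scope: ${houseBuilderDemo()}")
}
```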
So, why do they both exist, and how do they differ? The difference between a CoroutineContext and a CoroutineScope lies in their intended purpose.
The intended purpose of the CoroutineScope receiver is to reference the scope in which a new coroutine is launched.
On the other hand, the intended purpose of the CoroutineContext parameter is to provide elements that are responsible for the coroutine’s lifecycle, cancellation, and parent-child relations.
Important to know! By convention, the CoroutineContext of a CoroutineScope contains a Job that becomes the parent of a new coroutine, unless a CoroutineContext containing some other Job instance is explicitly passed. We do not usually pass a Job in the context parameter to launch. Doing so breaks the parent-child relationship and hence structured concurrency, so we should do it only when we consciously want to break it for a specific use case.
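The difference between passing ordinary context elements and passing a Job can be checked directly. In this sketch, the function name childrenCheck and the coroutine name are hypothetical. A CoroutineName is merged into the context and leaves the parent unchanged, whereas passing a fresh Job() detaches the new coroutine from parentJob. That detached coroutine must then be awaited separately.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: which launched coroutines end up as children of parentJob?
fun childrenCheck(): Pair<Boolean, Boolean> = runBlocking {
    var namedIsChild = false
    var detachedIsChild = false
    var detached: Job? = null
    val parentJob = launch {
        // A non-Job element is merged into the context; the parent stays the same
        val named = launch(CoroutineName("named-child")) { delay(100) }
        // A Job element replaces the parent: this coroutine is detached from parentJob
        val orphan = launch(Job()) { delay(100) }
        detached = orphan
        val myJob = coroutineContext[Job]!!
        namedIsChild = named in myJob.children
        detachedIsChild = orphan in myJob.children
    }
    parentJob.join()  // waits for the named child, but NOT for the detached one
    detached?.join()  // the detached coroutine has to be awaited separately
    namedIsChild to detachedIsChild
}

fun main() {
    val (named, detached) = childrenCheck()
    println("named is child: $named, detached is child: $detached")
}
```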
Let’s have a look at a code snippet in which a coroutine is created from a CoroutineScope but runs in a CoroutineContext other than the one provided by that CoroutineScope, so the parent-child relationship is broken. It does so by explicitly passing the context as a parameter to the launch coroutine builder:
import kotlinx.coroutines.*

fun main() = runBlocking {
    val parallelJob = launch {
        delay(1000)
    }
    val parentJob = launch {
        val childJob = launch(parallelJob) { // Notice how the context is explicitly provided
            var count = 1
            while (count <= 5) {
                println("Count: $count")
                delay(100)
                count++
            }
        }
    }
    delay(250)
    println("Cancelling parent job")
    parentJob.cancel()
    parentJob.join()
    println("Parent job completed")
}
-----------
OUTPUT:
Count: 1
Count: 2
Count: 3
Cancelling parent job
Parent job completed
Count: 4
Count: 5 // childJob continues after parentJob (CoroutineScope),
// and is bound to parallelJob (CoroutineContext)
⚠️ Notice how childJob continues to execute even after parentJob has completed. This is because parentJob is only the CoroutineScope from which childJob is launched, while parallelJob is the CoroutineContext to which the lifecycle of childJob is bound. As a result, childJob is a child of parallelJob rather than of parentJob, so cancelling parentJob does not affect it.
Therefore, make sure that your coroutine is bound to the correct CoroutineContext and CoroutineScope to avoid unexpected behaviour.
💡 If you’re curious and wish to dive deeper into CoroutineContext and CoroutineScope, here’s a great blog.
Great! You’ve just aced Structured Concurrency using Kotlin coroutines. Thanks for making it this far, and 👏 if this helped you.
Special thanks to Nishant Shah for sharing his learnings and helping me on this topic.
Feel free to reach out to me if you have any questions, suggestions or ideas.