Have you ever wondered which multithreading framework is the fastest? I have, from time to time. One day, fate led me to investigate the question, so I tested and compared several frameworks. If you are curious too, here are the results.

Problem

Firstly, let’s figure out why I did this. I was faced with a simple task: there was a system that invoked several hundred callbacks. All callbacks should be completed as quickly as possible.

I didn’t know the complexity of each callback or how long it would take. A callback might simply create an object, or it might make a long request to a server or database. Because of those blocking requests, we both can and need to use multithreading here.

While trying to solve this problem, I realized that I wasn’t sure which multithreading framework was faster in which situations.

Motivation

But there’s an immediate question: “Hasn’t anyone tested it out yet?” To my surprise, I couldn’t find any tests that satisfied me.

Some of the tests were simple comparisons between two technologies, such as Rx vs Coroutines. Others had test cases that were, in my opinion, too specific, for example, only testing arithmetic operations or database requests.

Anyway, I wasn’t very happy with this and decided to do my own testing.

Of course, I cannot cover every use case for multithreading frameworks. I’ll only test the ones that are relevant to my task. So it’s not an exhaustive test, but it’s pretty thorough.

Tools

Let’s start with the measurement tool. I’ll use Android tools because I’m an Android developer. In theory, running multithreading tests on the JVM on a computer may differ from running them on an Android device… However, in practice, there is no significant difference and it doesn’t affect the overall test results.

Jetpack provides the Microbenchmark library, which can be used to measure code performance on an Android device.

The Microbenchmark measurement test is similar to an ordinary instrumentation test, with the only difference being the use of a specific Rule — BenchmarkRule.

@get:Rule
val benchmarkRule = BenchmarkRule()

@Test
fun sampleTest() {
   benchmarkRule.measureRepeated {
       // We're going to measure this
   }
}

As a result, we get a JSON report with information about the execution: the minimum and maximum times and, most importantly, the median time.

    "benchmarks": [
        {
            "name": "sampleTest",
            "params": {},
            "className": "com.test.benchmark.ExampleBenchmark",
            "totalRunTimeNs": 85207217833,
            "metrics": {
                "timeNs": {
                    "minimum": 9.82149833E8,
                    "maximum": 1.019338584E9,
                    "median": 1.004151917E9,
                    "runs": [...]
                },
                "allocationCount": {
                    "minimum": 324.0,
                    "maximum": 324.0,
                    "median": 324.0,
                    "runs": [...]
                }
            },
            "sampledMetrics": {},
            "warmupIterations": 3200,
            "repeatIterations": 5000,
            "thermalThrottleSleepSeconds": 0
        }
    ]

The JSON also contains other information, such as the number of iterations and the number of object allocations, but that is not a priority for us at the moment.
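The timeNs values are reported in nanoseconds, so a small helper makes them easier to read. This is only a sketch: nanosToMillis and relativeSpread are hypothetical helper names, not part of the Microbenchmark library, and the numbers are copied from the sample report above.

```kotlin
import java.util.Locale

// Hypothetical helpers for reading a report; not part of the Microbenchmark library.
fun nanosToMillis(nanos: Double): Double = nanos / 1_000_000.0

// Spread between the fastest and the slowest run, relative to the median.
fun relativeSpread(minNs: Double, maxNs: Double, medianNs: Double): Double =
    (maxNs - minNs) / medianNs

fun main() {
    // Values taken from the "timeNs" section of the sample report.
    val minNs = 9.82149833E8
    val maxNs = 1.019338584E9
    val medianNs = 1.004151917E9

    println(String.format(Locale.ROOT, "median: %.1f ms", nanosToMillis(medianNs)))
    println(String.format(Locale.ROOT, "spread: %.1f %%", relativeSpread(minNs, maxNs, medianNs) * 100))
}
```

A small spread relative to the median suggests that the runs were stable enough to compare.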

Benchmark test cases

The main differences between the frameworks are:

  • the time taken by the framework to create a single thread;
  • the framework’s ability to distribute tasks efficiently and quickly between threads.

We will explore the results of test cases to assess the differences between these frameworks.

Now, we have to decide what we want to test. Let’s start with the test data.

Test data

Nothing fancy here. We create a list of 100 items, ranging from 0 to 99.

private fun createTestList(): List<Int> {
   return List(100) { it }
}

Then we will perform some actions on each item.

Now let’s take a look at the test cases.

A single thread

Let’s start with single thread test cases. When comparing the results of these test cases, we will carefully examine the first difference between the frameworks — the time taken by the framework to create a single thread. Since there is only one thread, we can see the time taken by the framework to initialize and create the thread.

Direct invoke

The first test case will be to simply invoke the method directly. No frameworks.

@Test
fun directInvoke() {
   val list = createTestList()
   benchmarkRule.measureRepeated {
       list.forEach { action(it) }
   }
}

RxJava

The second test case will be Rx. Let’s perform an action within Completable. There will be a separate Completable for each action. The Scheduler will be Schedulers.single(), which ensures that all our actions are executed on a single thread.

And since we need to wait for the completion, we will call blockingAwait on the resulting Completable.

@Test
fun rxOne() {
   val list = createTestList()
   val scheduler = Schedulers.single()
   benchmarkRule.measureRepeated {
       val completables = list.map {
           Completable.fromAction {
               action(it)
           }.subscribeOn(scheduler)
       }
       Completable.merge(completables).blockingAwait()
   }
}

Kotlin Coroutines

Well, where would we be without the young and promising Kotlin Coroutines framework? To wait for the coroutine to execute, we’ll run the action via async and wait for its completion via await. So, there will be a separate coroutine for each action. To have everything run on a single thread, we simply use runBlocking.

@Test
fun coroutineOne() {
   val list = createTestList()
   benchmarkRule.measureRepeated {
       runBlocking {
           list.map {
               async {
                   action(it)
               }
           }.forEach { it.await() }
       }
   }
}

Kotlin Flow

Let’s also include Flow in the comparison. Directly comparing coroutines and Rx may not be the fairest approach, because the two technologies are used differently and are built on different concepts, even though both deal with multithreading.

We create a separate Flow for each action, merge them all together and then simply collect the result. We don’t specify a Dispatcher anywhere, so everything happens on the same thread.

@Test
fun flowOne() {
   val list = createTestList()
   benchmarkRule.measureRepeated {
       runBlocking {
           val flows = list.map {
               flow {
                   val result = action(it)
                   emit(result)
               }
           }
           flows.merge().collect()
       }
   }
}

The number of threads is equal to the number of CPU threads

Of course, using only one thread doesn’t seem like the best option, as we have a framework for multithreading.

Let’s start by considering a situation where the number of threads in a pool is equal to the number of CPU threads. Ideally, we can see the second difference (the framework’s ability to distribute tasks efficiently and quickly between threads) by comparing the results of these test cases. Due to the lack of sufficient threads for each action, the framework will need to distribute them anyway.

You should not expect an even distribution of tasks across all processor cores. The operating system and the CPU determine:

  • what will be executed,
  • when it will be executed,
  • who will execute it.

Modern “big” CPUs use SMT/Hyper-Threading, which exposes several logical threads on each CPU core. A CPU core is a physical unit that can execute instructions independently. A thread, on the other hand, is a logical unit of work that runs on a single core at a time. In addition, mobile CPUs and newer Intel CPUs combine different types of cores: large cores are suitable for complex tasks, while medium and energy-efficient cores differ significantly in performance.

So, we can consider this pool to be a pool with a small number of threads.

There is an excellent method for determining the number of CPU threads:

Runtime.getRuntime().availableProcessors()

This value is eight for my test device.
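As a quick illustration, the value can be read once and reused as the pool size. This is a minimal sketch, and cpuThreadCount is a hypothetical helper name.

```kotlin
// Number of logical CPU threads visible to the runtime.
fun cpuThreadCount(): Int = Runtime.getRuntime().availableProcessors()

fun main() {
    println("CPU threads available: ${cpuThreadCount()}")
}
```

According to the Javadoc, this value may change during the lifetime of the process, which is one more reason to read it once and pass it along instead of querying it in several places.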

RxJava

Let’s start with Rx. Rx has Schedulers.computation() to implement the desired behavior. In fact, the code remains the same as it was for a single thread. The only difference is that, for each individual Completable, we call subscribeOn with Schedulers.computation().

@Test
fun rxCPU() {
   val list = createTestList()
   val scheduler = Schedulers.computation()
   benchmarkRule.measureRepeated {
       val completables = list.map {
           Completable.fromAction {
               action(it)
           }.subscribeOn(scheduler)
       }
       Completable.merge(completables).blockingAwait()
   }
}

Kotlin Coroutines

Now let’s take a look at the coroutines. They have the Dispatchers.Default whose documentation says:

“It is backed by a shared pool of threads on JVM. By default, the maximal level of parallelism used by this dispatcher is equal to the number of CPU cores, but is at least two.”

So, by default, the maximum level of “parallelism” is equal to the number of CPU cores. That’s exactly what we need.

The code is similar to the single-thread version. Actions are now executed inside withContext with Dispatchers.Default.

@Test
fun coroutineCPU() {
   val list = createTestList()
   val dispatcher = Dispatchers.Default
   benchmarkRule.measureRepeated {
       runBlocking {
           list.map {
               async {
                   withContext(dispatcher) {
                       action(it)
                   }
               }
           }.forEach { it.await() }
       }
   }
}

But there’s an important detail. If we take a closer look at Dispatchers.Default, we will find this declaration.

internal object DefaultScheduler : SchedulerCoroutineDispatcher(
   CORE_POOL_SIZE, MAX_POOL_SIZE,
   IDLE_WORKER_KEEP_ALIVE_NS, DEFAULT_SCHEDULER_NAME
)

It seems suspicious that two constants are passed into it: CORE_POOL_SIZE, which is equal to the number of CPU threads in this case, and MAX_POOL_SIZE, which, by the way, is about two million.

Further examination shows that these variables are used to create CoroutineScheduler.

private fun createScheduler() = CoroutineScheduler(
  corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName
)

And that’s what the documentation says:

“The scheduler does not limit the count of pending blocking tasks, potentially creating up to maxPoolSize threads. End users do not have access to the scheduler directly and can dispatch blocking tasks only with a LimitingDispatcher that does control concurrency level by its own mechanism.”

It follows that the Dispatchers.Default isn’t limited by the number of CPU threads.

It’s not quite fair to compare Dispatchers.Default and Schedulers.computation() directly, because Dispatchers.Default may use additional threads in some cases. Fortunately, the documentation also tells us how to make the comparison fairer: we just need to use the LimitingDispatcher. To do this, we call the limitedParallelism method of Dispatchers.Default with the number of CPU threads.

@Test
fun coroutineCPULimit() {
   val list = createTestList()
   val threadCount = Runtime.getRuntime().availableProcessors()
   val dispatcher = Dispatchers.Default.limitedParallelism(threadCount)
   benchmarkRule.measureRepeated {
       runBlocking {
           list.map {
               async {
                   withContext(dispatcher) {
                       action(it)
                   }
               }
           }.forEach { it.await() }
       }
   }
}

Kotlin Flow

Let’s do the same for flow. Just use Dispatchers.Default.

@Test
fun flowCPU() {
   val list = createTestList()
   val dispatcher = Dispatchers.Default
   benchmarkRule.measureRepeated {
       runBlocking {
           val flows = list.map {
               flow {
                   val result = action(it)
                   emit(result)
               }.flowOn(dispatcher)
           }
           flows.merge().collect()
       }
   }
}

And use Dispatchers.Default with limit.

@Test
fun flowCPULimit() {
   val list = createTestList()
   val threadCount = Runtime.getRuntime().availableProcessors()
   val dispatcher = Dispatchers.Default.limitedParallelism(threadCount)
   benchmarkRule.measureRepeated {
       runBlocking {
           val flows = list.map {
               flow {
                   val result = action(it)
                   emit(result)
               }.flowOn(dispatcher)
           }
           flows.merge().collect()
       }
   }
}

Java Executor

Since we had to dig into the internals of coroutines anyway, why not go one level lower? Let’s try using an Executor directly.

First, let’s create an Executor using Executors.newFixedThreadPool. This simply creates an Executor limited by the number of threads, which in our case is the number of CPU cores.

I create the Executor before measureRepeated, because creating an Executor isn’t a trivial task and shouldn’t be part of the measurement. Then we call submit on the Executor with our task and use the get method to wait for completion.

@Test
fun executorFixedCPU() {
   val list = createTestList()
   val threadCount = Runtime.getRuntime().availableProcessors()
   val executorService = Executors.newFixedThreadPool(threadCount)
   benchmarkRule.measureRepeated {
       val futures = list.map { executorService.submit { action(it) } }
       futures.forEach { it.get() }
   }
   // Release the pool's threads once the measurement is done
   executorService.shutdown()
}

Using the Executors.newWorkStealingPool method, we are going to create a second type of Executor that is more interesting. Indeed, it also creates an Executor with a limited number of threads. But the word “Stealing” in the name of the method isn’t merely a coincidence. The threads of this Executor are able to steal tasks from other threads if the current thread becomes idle and there are tasks in the queue for another thread. This can help to clear our common queue at the end.

In terms of code, everything is the same as before, except that the Executor is created with a different method.

@Test
fun executorStealCPU() {
   val list = createTestList()
   val threadCount = Runtime.getRuntime().availableProcessors()
   val executorService = Executors.newWorkStealingPool(threadCount)
   benchmarkRule.measureRepeated {
       val futures = list.map { executorService.submit { action(it) } }
       futures.forEach { it.get() }
   }
   // Release the pool's threads once the measurement is done
   executorService.shutdown()
}

Thread per action

Now, let’s explore the possibility of allocating an unlimited number (almost unlimited, just a lot) of threads. Since we have only one hundred actions, we can actually allocate one hundred threads. We just don’t need any more of them.

RxJava

We will start with Rx. Schedulers.io() is responsible for these things. It keeps a cache of threads: if a free thread is available, it takes one from the cache, and if not, it creates a new thread. The code is the same as for Schedulers.computation(), but with a different scheduler.

@Test
fun rxIo() {
   val list = createTestList()
   val scheduler = Schedulers.io()
   benchmarkRule.measureRepeated {
       val completables = list.map {
           Completable.fromAction {
               action(it)
           }.subscribeOn(scheduler)
       }
       Completable.merge(completables).blockingAwait()
   }
}

Kotlin Coroutines

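In coroutines, similar tasks are the responsibility of Dispatchers.IO, so we simply insert it into the familiar code. Note that, by default, Dispatchers.IO limits itself to 64 threads (or the number of CPU cores, if that is larger), so it is not strictly a thread per action.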

@Test
fun coroutineIo() {
   val list = createTestList()
   val dispatcher = Dispatchers.IO
   benchmarkRule.measureRepeated {
       runBlocking {
           list.map {
               async {
                   withContext(dispatcher) {
                       action(it)
                   }
               }
           }.forEach { it.await() }
       }
   }
}

Kotlin Flow

Repeat the same for Flow.

@Test
fun flowIo() {
   val list = createTestList()
   val dispatcher = Dispatchers.IO
   benchmarkRule.measureRepeated {
       runBlocking {
           val flows = list.map {
               flow {
                   val result = action(it)
                   emit(result)
               }.flowOn(dispatcher)
           }
           flows.merge().collect()
       }
   }
}

Java Executor

Unfortunately, Executor doesn’t have such a nice API. Therefore, we just allocate a hundred threads to it. Since there are only one hundred actions, it won’t need more than that.

@Test
fun executorFixedIo() {
   val list = createTestList()
   val executorService = Executors.newFixedThreadPool(100)
   benchmarkRule.measureRepeated {
       val futures = list.map { executorService.submit { action(it) } }
       futures.forEach { it.get() }
   }
   // Release the pool's threads once the measurement is done
   executorService.shutdown()
}

I will do the same for newWorkStealingPool.

@Test
fun executorStealIo() {
   val list = createTestList()
   val executorService = Executors.newWorkStealingPool(100)
   benchmarkRule.measureRepeated {
       val futures = list.map { executorService.submit { action(it) } }
       futures.forEach { it.get() }
   }
   // Release the pool's threads once the measurement is done
   executorService.shutdown()
}
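The submit-and-wait pattern used in the Executor test cases can also be tried outside the benchmark harness. The following is a minimal sketch using only the JDK. runAll is a hypothetical helper, and the doubling lambda merely stands in for a real action.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

// Hypothetical helper: submits one task per item and blocks until every task has finished.
fun runAll(executor: ExecutorService, items: List<Int>, action: (Int) -> Int): List<Int> {
    val futures = items.map { item -> executor.submit(Callable { action(item) }) }
    return futures.map { it.get() }
}

fun main() {
    val threadCount = Runtime.getRuntime().availableProcessors()
    val items = List(100) { it }

    val fixed = Executors.newFixedThreadPool(threadCount)
    val stealing = Executors.newWorkStealingPool(threadCount)
    try {
        // Both pools return results in submission order because we wait on the futures in order.
        println(runAll(fixed, items) { it * 2 }.sum())
        println(runAll(stealing, items) { it * 2 }.sum())
    } finally {
        // A fixed pool uses non-daemon threads, so it must be shut down explicitly.
        fixed.shutdown()
        stealing.shutdown()
    }
}
```

Only the factory method differs between the two pools. How tasks are queued and picked up is decided inside the Executor.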

This is a table of all the test cases by framework.

And finally, let’s run the tests.

Tests

Ha, that was a little deception. First of all, let’s look at the categories of actions.

There are only five of them:

  1. arithmetic — simple arithmetic operations;
  2. listsManipulation — object manipulations;
  3. storage — simulation of work with storages;
  4. network — simulation of work with network;
  5. mixed — combination of all the previous scripts, since in the original task we have no idea how difficult the actions will be.

Let us begin with the simple one.

arithmetic

I’d like to remind you that we are going to perform actions on each item in the list from 0 to 99. In this case, to make the operation a little more complicated, we need to convert the number to Float and raise it to the power of its own value. From 0 raised to the power of 0 to 99 raised to the power of 99. Of course, at some point we’ll reach the Float’s limit, but that’s okay.

private fun arithmetic(seed: Int): Int {
   return seed.toFloat().pow(seed.toFloat()).toInt()
}
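To make the remark about the limit concrete, here is a small sketch that runs the same expression for a few seeds. It assumes only the Kotlin standard library. The conversion to Int saturates at Int.MAX_VALUE well before the Float itself overflows to Infinity.

```kotlin
import kotlin.math.pow

// Same body as the arithmetic action above.
fun arithmetic(seed: Int): Int = seed.toFloat().pow(seed.toFloat()).toInt()

fun main() {
    for (seed in listOf(0, 2, 3, 10, 99)) {
        val asFloat = seed.toFloat().pow(seed.toFloat())
        // toInt() clamps values above Int.MAX_VALUE, including Infinity.
        println("seed=$seed float=$asFloat int=${arithmetic(seed)}")
    }
}
```

So for larger seeds the function returns the same clamped value, which is fine for the benchmark because only the cost of the computation matters, not its result.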

In general, actions that can be classified into this category include the creation of objects using a constructor or accessing properties. These actions do not take long to complete.

What are the results?

As expected, directInvoke came out on top. It was 70 times faster than its closest competitor. This makes sense: these operations are so simple that the overhead of any multithreading framework is many times higher than the cost of the operations themselves.

However, the second and third places were a surprise: both went to Executors with a fixed number of threads. Initially, I expected directInvoke to be first, followed by the single-thread test cases, then CPU, and finally IO at the very end. But the Executor with a fixed number of threads completely breaks this logic. At the same time, the Executor created using Executors.newWorkStealingPool shows poorer performance with an “infinite” number of threads.

You can also see that the times for Dispatchers.Default with and without the limit differ, so setting the limit was not pointless.

It’s interesting to note that Flow has the highest overhead for processing a single action. It is slower than all its competitors. And while coroutines may be better than Rx on one thread, as the number of threads increases, Rx becomes the leader.

In summary, we can see that in this category of actions, directInvoke is the most effective. Therefore, it is better not to use a multithreading framework for such tasks. Alternatively, use a pool with a single thread or a small number of threads.

listsManipulation

Now, let’s take a look at something a little more complicated — object manipulations. It’s easier to work with objects if you add them to a list. This category of actions may include, for example, POJO mapping.

Within the action, we simply create a new list with the size equal to the number received as a parameter. Next, we just perform some operations, convert the list to a map, then perform some operations again and finally filter this collection… In general, we manipulate the list.

The basis of complexity here isn’t even the operation itself. This is a regular immutable list, not a Sequence. This means that a new list is created after each operation.

private fun listsManipulation(seed: Int): Int {
   List(seed) { it }
       .map { it.toFloat() }
       .map { it + 0.3f }
       .associateBy { it.toString() }
       .mapValues { it.value * it.value }
       .filter { it.value > 5f }
   return seed
}
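For contrast, here is a sketch of what a Sequence-based variant could look like. This is not what the benchmark measures. listsManipulationEager and listsManipulationLazy are hypothetical names, and both return the resulting map only so that they can be compared.

```kotlin
// Eager version: every step materializes a new List or Map, as in the benchmark action.
fun listsManipulationEager(seed: Int): Map<String, Float> =
    List(seed) { it }
        .map { it.toFloat() }
        .map { it + 0.3f }
        .associateBy { it.toString() }
        .mapValues { it.value * it.value }
        .filter { it.value > 5f }

// Lazy version: elements flow through the whole chain one by one, and only the final Map is built.
fun listsManipulationLazy(seed: Int): Map<String, Float> =
    (0 until seed).asSequence()
        .map { it.toFloat() }
        .map { it + 0.3f }
        .map { value -> value.toString() to value * value }
        .filter { (_, squared) -> squared > 5f }
        .toMap()

fun main() {
    println(listsManipulationEager(10) == listsManipulationLazy(10))
    println(listsManipulationLazy(10).keys)
}
```

The eager chain is kept in the benchmark on purpose: the intermediate collections are exactly the allocation work that this category is meant to simulate.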

How did it go?

directInvoke quickly relinquished the lead. Even basic tasks such as object manipulation are already heavy enough for a single thread to struggle.

The lead is taken by an Executor created with Executors.newWorkStealingPool. It is more efficient than its simpler counterpart — Executors.newFixedThreadPool.

Rx moves slightly forward, with Schedulers.computation() ahead. In theory, it is the preferred choice for such operations because it uses fewer resources.

Single thread test cases are at the end of the list, but directInvoke is still left in the middle.

flowCPU and coroutineCPU are at the bottom of the list. These are the test cases that use Dispatchers.Default without the explicit limit. I ran the tests again, but the results were the same. I thought it might be a bug, so I updated coroutines, but nothing changed.

Overall, CPU test cases are faster than IO in terms of execution time for this level of complexity.

storage

Since accessing a database or small file is a blocking operation, we simply put the thread to sleep within the action. The sleep time will be between 500 and 1,490 microseconds, or 0.5 to 1.5 milliseconds. However, the usual Thread.sleep method produces incorrect results due to “busy waiting”. Therefore, we’ll use LockSupport.parkNanos to put the thread into a sleep state.

private fun storage(seed: Int): Int {
   val timeInMicroseconds = 500 + 10 * seed.toLong()
   LockSupport.parkNanos(TimeUnit.MICROSECONDS.toNanos(timeInMicroseconds))
   return seed
}
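To check how long a thread actually stays parked on a given device, the requested and the measured delay can be compared. This is a rough sketch: storageDelayMicros and measureParkMicros are hypothetical helpers, and the measured values depend on the device and the scheduler.

```kotlin
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.LockSupport

// Delay requested by the storage action for a given seed, in microseconds.
fun storageDelayMicros(seed: Int): Long = 500 + 10 * seed.toLong()

// Parks the current thread and returns how long the call actually took, in microseconds.
fun measureParkMicros(requestedMicros: Long): Long {
    val start = System.nanoTime()
    LockSupport.parkNanos(TimeUnit.MICROSECONDS.toNanos(requestedMicros))
    return TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - start)
}

fun main() {
    for (seed in listOf(0, 50, 99)) {
        val requested = storageDelayMicros(seed)
        println("seed=$seed requested=${requested}us actual=${measureParkMicros(requested)}us")
    }
}
```

Keep in mind that parkNanos is allowed to return earlier than requested, so the measured time is an observation rather than a guarantee.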

Finally, the results.

All the IO test cases come first, then the CPU ones, and finally the single-threaded ones. That sounds like something straight out of a textbook, right? “IO thread pools are best for IO operations”.

The rxIo test case is faster than flowIo and coroutineIo. Rx probably uses the fewest resources to create a thread, but that is not very important for such tasks. The difference is just a millisecond. The distinctions between the frameworks have almost disappeared.

network

Well, how can we not test the network cases? Everything is the same as it was in the previous test. The thread sleeps for a while, between 0 and 99 milliseconds in this case. In this test, delays caused by “busy waiting” don’t play such a disruptive role any more. So we are going to use Thread.sleep.

private fun network(seed: Int): Int {
   TimeUnit.MILLISECONDS.sleep(seed.toLong())
   return seed
}
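A bit of arithmetic shows why thread count dominates in this category. The sketch below uses hypothetical helper names and only adds up the sleep times: it ignores all framework overhead and assumes that every action can get its own thread.

```kotlin
// Sleep time of the network action for a given seed, in milliseconds.
fun networkDelayMillis(seed: Int): Long = seed.toLong()

// Total time if all actions run one after another on a single thread.
fun sequentialMillis(seeds: List<Int>): Long = seeds.sumOf { networkDelayMillis(it) }

// Best case if every action runs on its own thread: the longest single action.
fun parallelLowerBoundMillis(seeds: List<Int>): Long =
    seeds.maxOfOrNull { networkDelayMillis(it) } ?: 0L

fun main() {
    val seeds = List(100) { it }
    println("sequential: ${sequentialMillis(seeds)} ms")
    println("parallel lower bound: ${parallelLowerBoundMillis(seeds)} ms")
}
```

With almost five seconds of sleeping to spread across threads, a few hundred microseconds of framework overhead are hard to notice.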

Here are the results.

And this is where the line between the frameworks becomes completely blurred. Yes, rxIo is first, but by less than 1%, which is generally within the margin of error.

mixed

Finally, the last test is the most important one for me and for the original task. We perform actions of mixed complexity.

private fun mixed(seed: Int): Int {
   return when {
       seed % 5 == 0 -> network(seed)
       seed % 3 == 0 -> storage(seed)
       seed % 2 == 0 -> listsManipulation(seed)
       else -> arithmetic(seed)
   }
}

The longest action we will have is 95 milliseconds: the network branch handles every seed divisible by 5, and the largest such seed below 100 is 95.
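To see how the hundred seeds are spread across the categories, the branch order of mixed can be replayed without running the actions. This is a sketch: Category and categoryOf are hypothetical names that mirror the when expression above.

```kotlin
enum class Category { NETWORK, STORAGE, LISTS, ARITHMETIC }

// Mirrors the branch order of mixed(): the first matching condition wins.
fun categoryOf(seed: Int): Category = when {
    seed % 5 == 0 -> Category.NETWORK
    seed % 3 == 0 -> Category.STORAGE
    seed % 2 == 0 -> Category.LISTS
    else -> Category.ARITHMETIC
}

fun main() {
    val seeds = List(100) { it }
    // Number of seeds that end up in each category.
    println(seeds.groupingBy(::categoryOf).eachCount())
    // Largest seed routed to the network action, i.e. the longest sleep in milliseconds.
    println(seeds.filter { categoryOf(it) == Category.NETWORK }.maxOrNull())
}
```

Roughly a fifth of the actions sleep like network calls, and the rest are split almost evenly between the other three categories.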

So here are the results.

In general, there is the same pattern of division into three blocks: IO, CPU, One.

The only difference is that, once small tasks are added to the mix, the Executor created with Executors.newWorkStealingPool takes the lead. Apparently, this is due to the “stealing” of tasks.

Conclusion

As you can see, the difference between multithreading frameworks almost disappears when working with file systems and networks. Most uses of multithreading are for IO operations. Therefore, choosing a multithreading framework based on performance alone makes little sense. It’s better to be guided by criteria such as convenience and other important factors.

However, if you’re doing something simpler than blocking a thread for 0.5 milliseconds, it’s better to use a small pool of threads. For example, you could use the number of CPU cores as the size of the pool.

Executors and Rx are best suited for such tasks.

The exceptions are very simple operations, such as arithmetic, object creation, etc. In these cases, it’s better not to use multithreading at all, because the time the framework needs to create threads will be more than the time required to complete all the tasks.

To solve my problem, I followed these steps:

  • I separated all simple tasks into their own group and executed them directly, as in directInvoke.
  • All other tasks are executed on coroutines using Dispatchers.IO, because coroutines make it easier to combine synchronous and asynchronous code.
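The split described above can be sketched roughly as follows. To keep the example free of extra dependencies, a plain cached thread pool stands in for Dispatchers.IO. The Callback class, its isSimple flag and the choice to start the heavy callbacks first are all assumptions made for illustration.

```kotlin
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical model of a callback: isSimple marks cheap, non-blocking work.
class Callback(val isSimple: Boolean, val body: () -> Unit)

fun runCallbacks(callbacks: List<Callback>, pool: ExecutorService) {
    val (simple, heavy) = callbacks.partition { it.isSimple }
    // Assumption: start the potentially blocking callbacks first so they overlap with the simple ones.
    val futures = heavy.map { callback -> pool.submit(Runnable { callback.body() }) }
    // Cheap callbacks run directly on the calling thread, without any framework.
    simple.forEach { it.body() }
    // Wait until all heavy callbacks have finished.
    futures.forEach { it.get() }
}

fun main() {
    val completed = AtomicInteger()
    val callbacks = List(10) { index ->
        Callback(isSimple = index % 2 == 0) { completed.incrementAndGet() }
    }
    val pool = Executors.newCachedThreadPool() // stand-in for an IO-style pool
    try {
        runCallbacks(callbacks, pool)
    } finally {
        pool.shutdown()
    }
    println("completed: ${completed.get()}")
}
```

In practice, the hard part is deciding which callbacks count as simple. That decision is outside the scope of this sketch.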

And what do you think? Which framework do you currently use?

This article was previously published on proandroiddev.com
