Posted by: Stelios Frantzeskakis

In the first part of the series, we described how merging RxJava streams that run on parallel threads can crash the app if they emit an error at the same time, which commonly happens during network failures. The first error is caught by the error handler and disposes the downstream, so the second error has nowhere to go and is thrown as an UndeliverableException.

This was actually causing our #1 crash, and we tried a number of approaches before we finally resolved it.

Global Error Handler: the Naive Approach

We had stumbled upon the RxJava global error handler in the past, but we had never really thought of a use case for it. Now, however, it seemed like a good option for catching this exception globally across all our streams. The syntax is pretty straightforward: you just set it in your custom Application class, using RxJavaPlugins:

RxJavaPlugins.setErrorHandler {
    // Handle the throwable
}

This error handler will catch every exception thrown by a stream whose consumer is already disposed, or whose consumer does not specify its own error handler (the onError() callback). That way, you can inspect the exception and decide how to handle it.

In our case, we decided to catch all UndeliverableExceptions that wrap a TimeoutException, which indicates a network failure. Instead of crashing, we logged them as non-fatal errors in Crashlytics, and the crashes obviously dropped.
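
As a rough sketch of such a handler, assuming RxJava 3 package names (RxJava 2 uses io.reactivex.* instead): isUndeliverableTimeout, installGlobalErrorHandler and the logNonFatal callback (standing in for a crash reporter call) are hypothetical names, not code from our app.

```kotlin
import io.reactivex.rxjava3.exceptions.UndeliverableException
import io.reactivex.rxjava3.plugins.RxJavaPlugins
import java.util.concurrent.TimeoutException

// Hypothetical helper: true only for an UndeliverableException wrapping a TimeoutException.
fun isUndeliverableTimeout(throwable: Throwable): Boolean =
    throwable is UndeliverableException && throwable.cause is TimeoutException

// Hypothetical installer; logNonFatal is an assumed stand-in for a crash reporter.
fun installGlobalErrorHandler(logNonFatal: (Throwable) -> Unit) {
    RxJavaPlugins.setErrorHandler { throwable ->
        if (isUndeliverableTimeout(throwable)) {
            // Swallow the late network error and record it as non-fatal.
            logNonFatal(throwable)
        } else {
            // Everything else goes to the thread's uncaught exception handler,
            // so unexpected errors still surface as crashes.
            val thread = Thread.currentThread()
            thread.uncaughtExceptionHandler.uncaughtException(thread, throwable)
        }
    }
}
```

Keeping the check in a separate function makes the filtering rule easy to unit test without installing the handler.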

But that’s not good enough.

That way, it seems we are covering up a potential bug because we don’t know how to fix it. Furthermore, all of our RxJava observers define an explicit error handler, unless we don’t expect the stream to throw an error, which makes the global error handler redundant. If the stream throws an error and the observer is already disposed, we want to crash. We want to find the underlying issue and fix it.

Defining a global error handler can encourage the bad practice of swallowing exceptions that we don’t understand. It also decouples the error from the stream, which means that we’re back in the callback era rather than using reactive patterns.

Back to the drawing board.

Schedulers.single: Correct but Inefficient

Another (probably even more naive) idea we had was to use a different scheduler. As we described in the first part of the series, the issue affects the IO scheduler or any other scheduler that is backed by a thread-pool, because the crash will happen if two streams that run in parallel threads emit an error at the same time.

So let’s use a single thread:
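
A minimal sketch of what this could look like, assuming the two streams resemble the failing Completables used in the tests later in this post, and assuming RxJava 3 package names (RxJava 2 uses io.reactivex.*). The function name mergedOnSingleScheduler is hypothetical.

```kotlin
import io.reactivex.rxjava3.core.Completable
import io.reactivex.rxjava3.schedulers.Schedulers

// Assumed shape of the two failing streams, now subscribed on Schedulers.single().
fun mergedOnSingleScheduler(): Completable {
    val completable1 = Completable.error(Throwable("error 1"))
        .subscribeOn(Schedulers.single())
    val completable2 = Completable.error(Throwable("error 2"))
        .subscribeOn(Schedulers.single())
    return Completable.mergeArray(completable1, completable2)
}

fun main() {
    mergedOnSingleScheduler().subscribe(
        { println("complete") },
        { error -> println(error.message) }
    )
    // Give the background thread time to deliver the terminal event.
    Thread.sleep(100)
}
```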

These are the same streams we used in the quiz of the first part, but they now run on a different scheduler. We now subscribe on Schedulers.single(), which executes the work on a single, shared background thread.

Although the code above was crashing when subscribing on the IO scheduler, it will now always print “error 1” and it will never crash, because the work is executed in a strongly-sequential manner. That obviously fixes the issue, but it slows down execution, since the streams no longer run in parallel. Not to mention that this will block any other work that needs to run on the same scheduler, such as database operations, which normally run on a single background thread.

Surviving simultaneous errors with safeMergeArray

Neither of these approaches fixes the underlying issue. There’s got to be a better way to tackle this. We then had an idea: what if we run the streams in parallel, but instead of disposing the downstream as soon as one of them emits an error, we catch the error and wait? Once all streams have terminated (completed or failed), we complete the downstream if none of them failed, or signal an error otherwise.

That sounds like something that could work as a replacement for Completable.mergeArray(). We called it safeMergeArray(), and because we want to mimic the behavior of Completable.mergeArray(), it should take a number of Completables and merge them into one:

fun safeMergeArray(vararg completables: Completable): Completable {
    TODO("Not implemented")
}

Let’s start by defining the requirements of this function so that we can create the respective unit tests first, before we write the implementation.

Testing and documenting safeMergeArray

We are looking to implement a replacement for Completable.mergeArray() that won’t crash if more than one stream emits an error at the same time. With that in mind, it looks like we can split our tests into two parts:

  1. We need to test that our solution behaves the same as Completable.mergeArray().
  2. We need to test that Completable.mergeArray() indeed crashes when two streams emit an error simultaneously, while our custom solution prevents the crash.

Let’s start with the first part.

Testing the parity with Completable.mergeArray()

The documentation of Completable.mergeArray() hints at what we need to test:

Returns a Completable instance that subscribes to all sources at once and completes only when all source CompletableSources complete or one of them emits an error.

So basically we need to test all combinations of the streams either succeeding or throwing an error. For simplicity, we’ll merge two streams which gives us the following combinations:

  • Stream A succeeds, Stream B throws an error → Merged stream should throw an error
  • Stream A throws an error, Stream B succeeds → Merged stream should throw an error
  • Stream A throws an error, Stream B throws an error → Merged stream should throw an error
  • Stream A succeeds, Stream B succeeds → Merged stream should succeed

For our testing framework we’ll use JUnit 5. Our tests will be written in a BDD (behavior-driven development) style, and we’ll use the Gherkin syntax (Given / When / Then) to structure our test suite.

The scenarios above can be translated to the following tests:

[gist id="bb0b113a36b6b6cf6a22cd1b58cdefd2"]
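
As a sketch of what such an empty skeleton might look like, assuming the same class and test names as the filled-in tests further down (this is a reconstruction for illustration, not the original gist):

```kotlin
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Nested
import org.junit.jupiter.api.Test

@DisplayName("Given we merge two streams")
class SafeMergeArrayTest {

    @Nested
    @DisplayName("When the first stream throws an error")
    inner class FirstStreamError {

        @Nested
        @DisplayName("When the second stream succeeds")
        inner class SecondStreamSuccess {
            @Test
            fun `then the merged stream throws an error`() {
                // Empty on purpose: documents the requirement only.
            }
        }

        @Nested
        @DisplayName("When the second stream throws an error")
        inner class SecondStreamError {
            @Test
            fun `then the merged stream throws an error`() {
                // Empty on purpose: documents the requirement only.
            }
        }
    }

    @Nested
    @DisplayName("When the first stream succeeds")
    inner class FirstStreamSuccess {

        @Nested
        @DisplayName("When the second stream succeeds")
        inner class SecondStreamSuccess {
            @Test
            fun `then the merged stream succeeds`() {
                // Empty on purpose: documents the requirement only.
            }
        }

        @Nested
        @DisplayName("When the second stream throws an error")
        inner class SecondStreamError {
            @Test
            fun `then the merged stream throws an error`() {
                // Empty on purpose: documents the requirement only.
            }
        }
    }
}
```

The nested classes map one-to-one to the Given / When / Then wording of the four scenarios.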

As you may have noticed, all tests are empty for now, because we first want to document the requirements of our function. The next step is to make the tests fail, following the cycle known as Red-Green-Refactor in TDD, explained in detail by Uncle Bob in his blog post.

Writing the RxJava tests is pretty straightforward as long as you’re familiar with the TestObserver, so we won’t explain them in detail, but we’ll post them here for reference:

@TestInstance(TestInstance.Lifecycle.PER_METHOD)
@DisplayName("Given we merge two streams")
class SafeMergeArrayTest {

    @Nested
    @DisplayName("When the first stream throws an error")
    inner class FirstStreamError {
        private val completable1 = Completable.error(
            Throwable("error 1")
        ).subscribeOn(Schedulers.io())

        @Nested
        @DisplayName("When the second stream succeeds")
        inner class SecondStreamSuccess {
            private val completable2 = Completable.complete()
                .subscribeOn(Schedulers.io())

            @Test
            fun `then the merged stream throws an error`() {
                safeMergeArray(completable1, completable2).test().await().assertError {
                    it.message == "error 1"
                }
            }
        }

        @Nested
        @DisplayName("When the second stream throws an error")
        inner class SecondStreamError {
            private val completable2 = Completable.error(
                Throwable("error 2")
            ).subscribeOn(Schedulers.io())

            @Test
            fun `then the merged stream throws an error`() {
                safeMergeArray(completable1, completable2).test().await().assertError {
                    it.message == "error 1"
                }
            }
        }
    }

    @Nested
    @DisplayName("When the first stream succeeds")
    inner class FirstStreamSuccess {
        private val completable1 = Completable.complete()
            .subscribeOn(Schedulers.io())

        @Nested
        @DisplayName("When the second stream succeeds")
        inner class SecondStreamSuccess {
            private val completable2 = Completable.complete()
                .subscribeOn(Schedulers.io())

            @Test
            fun `then the merged stream succeeds`() {
                safeMergeArray(completable1, completable2).test().await().assertComplete()
            }
        }

        @Nested
        @DisplayName("When the second stream throws an error")
        inner class SecondStreamError {
            private val completable2 = Completable.error(
                Throwable("error 2")
            ).subscribeOn(Schedulers.io())

            @Test
            fun `then the merged stream throws an error`() {
                safeMergeArray(completable1, completable2).test().await().assertError {
                    it.message == "error 2"
                }
            }
        }
    }
}

At this point we should be happy that all of our tests are failing with a NotImplementedError, because it means that we’re on the right track with our TDD approach.

Let’s move on with the second part of our test suite.

Testing simultaneous errors of reactive streams

We mentioned that we’re trying to prevent the crash that occurs when two streams emit an error simultaneously. That can be translated to the following test:

@Nested
@DisplayName("When the first stream throws an error")
inner class FirstStreamError {
    private val completable1 = Completable.error(
        Throwable("error 1")
    ).subscribeOn(Schedulers.io())

    @Nested
    @DisplayName("When the second stream throws an error simultaneously")
    inner class SecondStreamError {
        private val completable2 = Completable.error(
            Throwable("error 2")
        ).subscribeOn(Schedulers.io())

        @Test
        fun `then the merged stream does not crash`() {
            // TODO
        }
    }
}

But first we need to confirm that using Completable.mergeArray() will indeed cause a crash, right? Or else how will we know that we resolved it? So for now let’s use Completable.mergeArray() in our test and see what happens:

@Test
fun `then the merged stream does not crash`() {
   Completable.mergeArray(completable1, completable2).test().await()
}

If we run this test, we’ll notice the following contradictory behavior:

The test passed although an UndeliverableException is displayed in the console!

That happened because, as we explained in the first part of the series, the first error that is thrown terminates the downstream. This means that our TestObserver is done with its work after receiving a terminal event, and the test succeeds because we’re not making any assertion. However, the second stream that runs in parallel also throws an error, which cannot be delivered to the downstream because it is already disposed, so the exception is only printed to the console.

But how can we test this asynchronicity?

After all, we want to prove that our custom solution fixes the crashes. That means that we need a failing test first, otherwise we’ll never know whether we fixed the issue or not.

Side note: This is a very good example that showcases the importance of TDD. The test is passing although an exception is thrown. If we had written our implementation before the test, even if our implementation wasn’t fixing the issue we would still see a “green” test and would naively assume that we fixed it.

It turns out there’s a library called ConcurrentUnit that serves exactly our purpose:

Usage

– Create a Waiter
– Use Waiter.await to block the main test thread.
– Use the Waiter.assert calls from any thread to perform assertions.

Once expected assertions are completed, use Waiter.resume call to unblock the awaiting thread.
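
The usage steps above can be sketched without RxJava. This is a minimal illustration, assuming the net.jodah.concurrentunit package of the ConcurrentUnit library and using Kotlin's thread helper for the background work:

```kotlin
import net.jodah.concurrentunit.Waiter
import kotlin.concurrent.thread

fun main() {
    val waiter = Waiter()

    // Assertions made through the waiter on another thread are reported
    // back to the thread that is blocked in await().
    thread {
        waiter.assertEquals(4, 2 + 2)
        waiter.resume() // unblock the awaiting thread
    }

    // Blocks until resume() is called; throws if an assertion failed
    // or if no resume arrives within 1000 ms.
    waiter.await(1000)
}
```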

That means that we just need to call Waiter.await after merging the Completables, and fail the test if we receive an exception after the downstream is disposed (which is the exception that crashes the app). But how can we observe exceptions if the downstream is disposed? Well, it looks like the RxJava global error handler will come in handy now:

@Test
fun `then the merged stream does not crash`() {
    val waiter = Waiter()
    RxJavaPlugins.setErrorHandler {
        // Fail the test if we receive an exception in the error handler
        waiter.fail("Error handler received an exception: $it")
    }
    Completable.mergeArray(completable1, completable2).test().await()
    Timer().schedule(10) {
        // Pass the test after 10ms if we don't receive an exception
        waiter.resume()
    }
    // Await for waiter.resume() or waiter.fail()
    waiter.await()
}

If we run this test, we can finally see it fail.

That’s exactly what we needed! A test that confirms the issue we are trying to resolve: if two streams that are merged using Completable.mergeArray() and run on parallel threads signal an error at the same time, an exception will be thrown that cannot be handled by the downstream, because it is already disposed, which will cause the app to crash.

Now let’s replace Completable.mergeArray() with our custom safeMergeArray() function in this test and run it. We’ll of course see it failing again, similarly to the previous tests, since our function is not implemented yet.

Tip: Schedulers.trampoline() prevents testing the real behavior of multi-threaded errors

If the asynchronous test that we wrote above is passing for you, then most likely you are overriding Schedulers.io() with Schedulers.trampoline() in your tests, probably using a JUnit 5 extension. This is a common pattern in RxJava tests that allows us to execute the streams sequentially, regardless of the scheduler that we subscribe on in the production code. If that’s the case, you’ll need to add a @BeforeEach and reset the schedulers to their default behavior:

@BeforeEach
fun setup() {
    RxJavaPlugins.reset()
}

That ensures that we’re testing the real asynchronous behavior without mocking the schedulers.
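
For reference, the kind of extension this tip refers to might look like the sketch below. It assumes RxJava 3 package names (RxJava 2 uses io.reactivex.*); TrampolineSchedulersExtension and useTrampolineSchedulers are hypothetical names, not code from our project.

```kotlin
import io.reactivex.rxjava3.plugins.RxJavaPlugins
import io.reactivex.rxjava3.schedulers.Schedulers
import org.junit.jupiter.api.extension.AfterEachCallback
import org.junit.jupiter.api.extension.BeforeEachCallback
import org.junit.jupiter.api.extension.ExtensionContext

// Hypothetical helper: route the io and computation schedulers to trampoline,
// so that work runs sequentially on the calling thread.
fun useTrampolineSchedulers() {
    RxJavaPlugins.setIoSchedulerHandler { Schedulers.trampoline() }
    RxJavaPlugins.setComputationSchedulerHandler { Schedulers.trampoline() }
}

// Hypothetical JUnit 5 extension that applies the override around every test.
class TrampolineSchedulersExtension : BeforeEachCallback, AfterEachCallback {
    override fun beforeEach(context: ExtensionContext) = useTrampolineSchedulers()

    // Restore the default schedulers (and any other RxJavaPlugins hooks).
    override fun afterEach(context: ExtensionContext) = RxJavaPlugins.reset()
}
```

JUnit 5 runs BeforeEachCallback extensions before the test class's own @BeforeEach methods, which is why calling RxJavaPlugins.reset() in @BeforeEach undoes an override like this one.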

“Red” part is done

All tests are failing, so that means that the “Red” part is done in our Red-Green-Refactor approach.

Up Next

Implementing safeMergeArray() to support multi-threaded merging of simultaneously failable streams.

About the author

Stelios Frantzeskakis is a Senior Software Engineer for Perry Street Software, publisher of the LGBTQ+ dating apps SCRUFF and Jack’d, with more than 20M members worldwide.

Thanks to Eric Silverberg.

 
