RxJava - SyncOnSubscribe and fromCallable

RxJava is impressive, but it's a fairly overwhelming framework so I want to share some of the lesser known parts.

Note: Code examples will be in Kotlin.

Backpressure

Simply; "backpressure" is when your Observable emits items quicker than a downstream Observer/Operator can consume them.

For example:

Observable  
    .create<Int> { s ->
        for (i in 0..999) {
            s.onNext(i)
        }
    }
    .observeOn(Schedulers.io())
    .subscribe()

This will eventually throw a MissingBackpressureException once the scheduler runs out of workers to assign the upstream elements to.

On Android this would translate to observeOn(AndroidSchedulers.mainThread()) where changing UI state is taking longer than the incoming IO.

This can be fixed with adding an onBackpressureX() before the observeOn operator. But I would only use this approach if you don't have control over the source emits elements.

OnSubscribe helpers

The RxJava team realised writing backpressure friendly Observable's is pretty tricky, at first the Producer API was added, but it's pretty cumbersome and difficult to roll your own.

Thankfully the new backpressure friendly SyncOnSubscribe and AsyncOnSubscribe classes were added fairly recently.

These provide much simpler callback methods to generateState, get the next item and optionally what to do if unSubscribe is called.

Let's convert the above example:

class IntSyncSubscribe(val countToo: Int = 999) : SyncOnSubscribe<Int, Int>() {  
    override fun generateState(): Int = 0
    override fun next(state: Int, observer: Observer<in Int>): Int {
        observer.onNext(state)
        return (state + 1).apply {
            if (this > countToo) observer.onCompleted()
        }
    }

}

(N.B. I am using some Kotlin HOF if you are unsure what .apply {} is.)

This is then constructed as:

Observable  
    .create(IntSyncSubscribe())
    .observeOn(Schedulers.io())
    .subscribe()

Now the downstream Operator (observeOn) will request the next item from upstream source.

Too much boilerplate

What if you are only emitting one item? SyncOnSubscribe seems like overkill.

Then use Observable.fromCallable(Callable):

Observable.fromCallable { doSomeWork() }  

This is backpressure friendly and the recommended way to create simple Observables now.

If you are currently doing:
Observable.defer { Observable.just(doSomeWork()) } or worse:
Observable.just(doSomeWork()). I recommend you change to fromCallable().

Practical Example

This is a wrapped OkHttp call pulled from a project:

class RequestSyncOnSubscriber(val okHttp: OkHttpClient, val httpRequest: HttpRequest)  
: SyncOnSubscribe<Call, Response>() {

    override fun generateState(): Call { return okHttp.newCall(httpRequest) }

    override fun next(state: Call, observer: Observer<in Response>): Call {
        if (state.isCanceled || state.isExecuted) {
            observer.onCompleted()
            return state
        }
        try {
            val response = state.execute()
            observer.onNext(response)
        } catch (e: IOException) {
            Exceptions.throwOrReport(e, observer)
        }
        observer.onCompleted()
        return state
    }

    override fun onUnsubscribe(state: Call) { state.cancel() }
}

Let me know feedback in the comments!

comments powered by Disqus