New reactive operators and sources

Intro

For some time now, reactive programming has been one of my favourite fields of modern software engineering.

I’ve decided to share the idea of introducing a few new general-purpose reactive operators and observables. As far as I can tell, the candidates are not part of any major or popular Rx library (as documented on https://reactivex.io), but IMO they would be useful additions. Some of them offer functionality that cannot easily be built by simple composition of existing operators. Others are proposed for clarity and convenience, because they offer better semantics than the alternative of not-so-obvious compositions.

With index

Similar to timestamp, except that it attaches an ordinal (an index) to each emitted item.

Example:

With-index operator diagram

The operator is inspired by Python’s enumerate function, which iterates over a collection and returns pairs of an index and a value.
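A minimal sketch of the idea in Python, modelling the observable as a plain iterable (a simplification made for illustration, not how an Rx library would implement it). The `start` parameter is a hypothetical extra, borrowed from `enumerate`:

```python
from typing import Iterable, Iterator, Tuple, TypeVar

T = TypeVar("T")


def with_index(source: Iterable[T], start: int = 0) -> Iterator[Tuple[int, T]]:
    """Pair every item with its ordinal position in the stream."""
    index = start
    for item in source:
        yield index, item
        index += 1


indexed = list(with_index(["a", "b", "c"]))
print(indexed)
```

Each emitted element is an (index, value) pair, so downstream operators can filter or map on the position as well as on the value.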

Tuplewise

An operator that groups the most recent N (and exactly N) items into a tuple (or buffer).

Tuplewise operator
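One possible reading of this description is a sliding window: nothing is emitted until N items have arrived, and from then on every new item produces a tuple of the N most recent ones. The sketch below assumes that interpretation and models the source as a plain Python iterable:

```python
from collections import deque
from typing import Iterable, Iterator, Tuple, TypeVar

T = TypeVar("T")


def tuplewise(source: Iterable[T], n: int) -> Iterator[Tuple[T, ...]]:
    """Emit a tuple of the n most recent items once n items are available."""
    if n < 1:
        raise ValueError("n must be a positive integer")
    window = deque(maxlen=n)  # the oldest item falls out automatically
    for item in source:
        window.append(item)
        if len(window) == n:
            yield tuple(window)


pairs = list(tuplewise([1, 2, 3, 4], 2))
print(pairs)
```

With N = 2 this behaves like the pairwise operator found in some Rx libraries.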

Every N-th

This operator takes an integer N as an argument and emits the first item from the source and then every N-th item after it, skipping the N-1 items in between.

Every nth operator diagram

This operator provides a simple way of throttling or sampling source items based on a trivial underlying mechanism (a built-in counter will do), so it might be particularly useful for resource-constrained Rx implementations.

The functionality of this operator is equivalent to the chain:

source.buffer(N).map(b->b[0])

or could be expressed using the with_index operator introduced above, in the form of:

source.with_index().filter(tuple -> tuple.first % N == 0).map(tuple -> tuple.second)

However, a dedicated implementation would be much simpler and smaller, with no temporary tuples or buffers to construct.
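To illustrate how little state a dedicated implementation needs, here is a sketch in Python that models the source as a plain iterable and keeps only a single counter. The name `every_nth` is an assumption. The behaviour follows the `index % N == 0` formulation, so the first item is always emitted:

```python
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def every_nth(source: Iterable[T], n: int) -> Iterator[T]:
    """Emit items at positions 0, n, 2n, ... and skip everything else."""
    if n < 1:
        raise ValueError("n must be a positive integer")
    counter = 0  # the only state the operator keeps
    for item in source:
        if counter == 0:
            yield item
        counter = (counter + 1) % n


sampled = list(every_nth(range(10), 3))
print(sampled)
```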

Sort

This operator collects all incoming items and, when the source completes, re-emits them in sorted order.

Example:

Sort operator

An alternative variant taking a user-supplied binary comparison function (an item comparer) could be introduced as well.
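Both variants can be sketched in a few lines of Python, again modelling the source as a plain iterable whose exhaustion plays the role of completion. The `comparer` parameter is a hypothetical name for the binary comparison function, assumed here to return a negative, zero or positive number:

```python
from functools import cmp_to_key
from typing import Callable, Iterable, Iterator, Optional, TypeVar

T = TypeVar("T")


def sort(source: Iterable[T],
         comparer: Optional[Callable[[T, T], int]] = None) -> Iterator[T]:
    """Buffer all items until the source completes, then emit them sorted."""
    items = list(source)  # nothing can be emitted before completion
    key = cmp_to_key(comparer) if comparer is not None else None
    yield from sorted(items, key=key)


ascending = list(sort([3, 1, 2]))
descending = list(sort([3, 1, 2], comparer=lambda a, b: b - a))
print(ascending, descending)
```

Note that such an operator must hold every item in memory and never emits anything for a source that does not complete.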

Reverse

This operator re-emits all items in reverse order once the source completes.

Example:

Reverse operator

Tokenize

This operator takes an observable of characters as input and emits a string each time it detects one of the given string termination markers.

Example:

Tokenize operator
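The description leaves a few details open, so the Python sketch below makes some assumptions of its own: the termination markers are not included in the emitted strings, empty tokens are skipped, and any pending characters are flushed as a final token when the source completes. The source of characters is modelled as a plain iterable:

```python
from typing import Iterable, Iterator


def tokenize(chars: Iterable[str], terminators: Iterable[str] = " \n") -> Iterator[str]:
    """Collect characters into strings, emitting one at each terminator."""
    markers = set(terminators)
    token = []
    for ch in chars:
        if ch in markers:
            if token:  # assumption: consecutive markers yield no empty tokens
                yield "".join(token)
                token = []
        else:
            token.append(ch)
    if token:  # assumption: flush the unfinished token on completion
        yield "".join(token)


tokens = list(tokenize("hello  world\nfoo"))
print(tokens)
```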

Cartesian product

This operator combines two sources and, whenever an item arrives on either of them, emits the pairs that are still missing from the complete Cartesian product of the two sources. By the time both sources have completed, all items of the product have been emitted.

Example:

Cartesian product operator

The example above shows a Cartesian product of two observables, but in general the operator can produce products of more than two sources (an N-dimensional product).
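The incremental behaviour for the two-source case can be sketched in Python as follows. To keep the example free of threads and schedulers, the interleaving of the two sources is modelled as a single list of hypothetical `("left", value)` and `("right", value)` events in arrival order. This is an illustration device, not a proposed API:

```python
from typing import Any, Iterable, Iterator, Tuple


def cartesian_product(events: Iterable[Tuple[str, Any]]) -> Iterator[Tuple[Any, Any]]:
    """For every arriving item, emit the pairs not yet produced."""
    left_seen, right_seen = [], []
    for side, value in events:
        if side == "left":
            left_seen.append(value)
            # pair the new left item with every right item seen so far
            for right_item in right_seen:
                yield (value, right_item)
        elif side == "right":
            right_seen.append(value)
            # pair every left item seen so far with the new right item
            for left_item in left_seen:
                yield (left_item, value)
        else:
            raise ValueError(f"unknown source: {side!r}")


arrivals = [("left", 1), ("right", "a"), ("left", 2), ("right", "b")]
product = list(cartesian_product(arrivals))
print(product)
```

Each pair is emitted exactly once, at the moment its later component arrives, so the operator has to remember all items seen on both sides.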

MScan

The name of this operator stands for multiple-scan. It is a generalisation of the scan operator.

mscan, in contrast to scan, takes any number of observable sources (rather than just one) together with a dedicated accumulation function for each of them. Whenever any of the sources emits an item, mscan applies that source’s accumulation function and emits the accumulated value.

The existing scan operator can be seen as a special case of the proposed mscan, where only one source and its dedicated accumulation function are given.

mscan is somewhat similar in concept to a specially crafted merge chained with a specially prepared scan, except that the observables it operates on can have different value types.
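As a rough illustration only, the sketch below assumes that all accumulation functions share a single accumulated state and that the interleaved sources can be modelled as a list of hypothetical `(source_name, value)` events. The signature `mscan(seed, accumulators, events)` is invented for this example. Two sources of different value types (integers and strings) feed one running total:

```python
from typing import Any, Callable, Dict, Iterable, Iterator, Tuple, TypeVar

S = TypeVar("S")


def mscan(seed: S,
          accumulators: Dict[str, Callable[[S, Any], S]],
          events: Iterable[Tuple[str, Any]]) -> Iterator[S]:
    """Fold every event into the shared state with its source's accumulator."""
    state = seed
    for source_name, value in events:
        state = accumulators[source_name](state, value)
        yield state  # one accumulated value per incoming item


accumulators = {
    "numbers": lambda total, n: total + n,            # source of ints
    "words": lambda total, word: total + len(word),   # source of strings
}
arrivals = [("numbers", 5), ("words", "abc"), ("numbers", 2)]
totals = list(mscan(0, accumulators, arrivals))
print(totals)
```

With a single entry in `accumulators` this reduces to an ordinary scan.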

I actually consider it quite a powerful operator. I’m not entirely sure this description presents it adequately, so I might give it a dedicated post with some examples and comparisons soon.

Play

This observable source takes (similarly to iterate) a collection of time-stamped items (pairs made of a time point and a value to emit) and emits them sequentially at the specified times.

play({2,A}, {4,B}, {5,C}, {8,D})

time   -1-2-3-4-5-6-7-8-9-...
result ---A---B-C-----D|

The name comes from an analogy to playing music (playing values, like notes, in sequence at specific points in time).

Reasons:

  • Having such a source turns out to be extremely useful in writing unit tests, to simulate events that happen over time.
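A blocking sketch of such a source in Python, under a few assumptions: time points are seconds relative to the moment iteration starts, the source is modelled as a generator, and the `sleep` parameter is a hypothetical hook that lets a test replace real waiting with a recorder:

```python
import time
from typing import Any, Callable, Iterable, Iterator, Tuple


def play(timeline: Iterable[Tuple[float, Any]],
         sleep: Callable[[float], Any] = time.sleep) -> Iterator[Any]:
    """Emit each value once its time point (relative to the start) is reached."""
    elapsed = 0.0
    for at, value in sorted(timeline, key=lambda pair: pair[0]):
        sleep(max(0.0, at - elapsed))  # wait for the gap to the next item
        elapsed = at
        yield value


# In a test, record the requested delays instead of actually waiting.
delays = []
notes = list(play([(2, "A"), (4, "B"), (5, "C"), (8, "D")], sleep=delays.append))
print(notes, delays)
```

Waiting for relative gaps ignores the time spent by the consumer between items, so a real implementation would more likely schedule against an absolute clock.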

RTC source

This observable source reports the current real-time clock (RTC) time. By default the source emits an item every full second, but additional parameters can tune that according to the user’s needs.
