I often come with a scenario where I have a set of historical data and I want to replay and them and process using Rx reactive extensions.
Here is a recipe how to use
HistoricalScheduler in Python 3.
from rx import Observable from rx.concurrency import HistoricalScheduler from datetime import timedelta # tuples as our timestamped data, timestamps are in milliseconds data = [(200, 'a'), (250, 'b'), (400, 'c'), (450, 'd'), (1300, 'e'), (3000, 'f')] sched = HistoricalScheduler() # this converts items from the `data` collection # into observable of time-delayed observables, flat_mapped back to original tuples Observable.from_list(data).flat_map(lambda pair: Observable.timer( timedelta(milliseconds=pair), scheduler=sched).map(lambda _: pair)).subscribe(on_next=print) # This is executed on real time scheduler # and the interval determines the resolution # and the multiplier in .advance_by determines the replay speed Observable.interval(10).subscribe( on_next=lambda i: sched.advance_by(timedelta(milliseconds=4*i))) # this is just to keep the schedulers running while True: pass
To run this code you need to:
pip3 install Rx
There are two main elements in the code:
- an observable that is running with
HistoricalSchedulerusing virtual time
- an interval that advances the virtual time clock of the historical scheduler
This gives requires flexibility to control at which pace the historical data are played and what is the resolution of the virtual time for triggering various events.