We had an interesting requirement for an observable in work recently. We want to wrap an MSMQ with an IObservable. It’s an interesting problem, especially if you are reading off a transactional queue.
We’re using transactional queues to guarantee delivery, so we need to make sure we have at least 1 recipient for a message: We never want to do a transactional read of a message off the queue, only to find that there’s no one to handle it.
How we wanted our observable to behave:
- Only start reading messages from MSMQ if we have at least 1 subscriber
- Multiple subscribers should not cause multiple calls to read messages from the queue – they should all share the same subscription
- If all subscribers have unsubscribed, stop reading from the queue
At first, I was thinking a combination of Observable.Defer (to lazily create an observable that reads from MSMQ) with Observable.Publish (to share the same subscription amongst multiple subscribers). But, that didn’t quite work out.
Then I came across the very useful RefCount operator. Unlike most other RX operators, this one is an extension over IConnectableObservable<T>, as opposed to just IObservable<T>. From the docs:
Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.
This seems to be exactly what we need. We can now set up our observable as follows:
var o = Observable.Create(observer =>
{
// TODO: work to start pulling messages off queue.
return () =>
{
// TODO: Work to disconnect from queue
};
})
.Publish()
.RefCount();
Read more: Nascent Code
QR: