Alex Arlievsky February 2016

Reactive Extensions: Is it possible to subscribe on Sum of the result of Buffer operation?

I am trying to get notifications on the result of Aggregate functions (i.e. Sum), which operates on the partial sequence of the infinite sequence (topmost, data source sequence never completes). The problem can be seen here:

var seq = Observable.Interval(TimeSpan.FromMilliseconds(20)).Buffer(10);
seq.Sum(l => l.Sum())
            .Subscribe(n =>
                s_log.DebugFormat("Got {0}", n));

Lambda l.Sum() is called as expected (partial sums are calculated), but "Got ..." line is never printed, because subscriber is never called. I suspected it is related somehow to "never ending" character of the original sequence. The finite sequence:

 Observable.Range(1,100).Buffer(10);

works as expected. So question is simple: how to "mark" partial fragments of the infinite sequence to be "complete", so aggregate functions would work on them separately (and will push their results to subscribers) ?

Answers


Shlomo February 2016

Scan is your friend:

seq.Scan(0L, (l1, l2) => l1 + l2.Sum())
   .Subscribe(n => Console.WriteLine("Got {0}", n));

Post Status

Asked in February 2016
Viewed 2,195 times
Voted 12
Answered 1 times

Search




Leave an answer