Map Reduce implementations in RxJava

Scan functionality of RxJava is a map-reduce implementation. It applies the given function to the items of a stream.

In the example above, the behavior can be changed only by changing the given function:

scan((x,y) -> x+1) computes the count of elements.

scan((x,y) -> x+y) is implementation of Gauss law, which adds all the numbers.

scan(1, (x,y) -> x * y) computes the factorial to 10, given the list IntStream.range(1, 11).

 @Test
 public void testObservableScan() {
 List<Integer> list = new ArrayList<>();
 IntStream.range(0, 1000).forEach(list::add);

Observable<Integer> observableFromCollection = Observable.from(list).scan((x,y) -> x+1);

observableFromCollection.subscribe(new Observer<Integer>() {
 @Override
 public void onCompleted() {
 System.out.println("observableFromCollection completed");
 }

@Override
 public void onError(Throwable e) {
 System.out.println("Error: " + e);
 }

@Override
 public void onNext(Integer i) {
 System.out.println("total of items: " + i);
 }

});

 

De scan function can be combined with the groupBy function to group the stream and apply the scan function:

@Test
 public void testObservableScanGroupByBuffer() { 
 List<Integer> list = new ArrayList<>();
 IntStream.range(1, 10).forEach(list::add);
 System.out.println("=======================testObservableScanGroupByBuffer=======================");
 Observable.from(list)
 .groupBy((i) -> 0 == (int)i % 2 ? "EVEN" : "ODD") 
 .subscribe((GroupedObservable<String,Integer> group) -> {
      System.out.println("Key: " + group.getKey());
      group.scan((x,y) -> x + y).buffer(5).subscribe(e -> System.out.println(group.getKey() + ": " + e));
   });
 System.out.println("==============================================");
 }

 

which will result:

Key: ODD
ODD: 1
Key: EVEN
EVEN: 2
ODD: 4
EVEN: 6
ODD: 9
EVEN: 12
ODD: 16
EVEN: 20
ODD: 25

We can also buffer the results using buffer function:

 @Test
 public void testObservableScanGroupByBuffer() { 
 List<Integer> list = new ArrayList<>();
 IntStream.range(1, 10).forEach(list::add);
 System.out.println("=======================testObservableScanGroupByBuffer=======================");
 Observable.from(list)
 .groupBy((i) -> 0 == (int)i % 2 ? "EVEN" : "ODD") 
 .subscribe((GroupedObservable<String,Integer> group) -> { System.out.println("Key: " + group.getKey());
 group.scan((x,y) -> x + y).buffer(5).subscribe(e -> System.out.println(group.getKey() + ": " + e));
 });
 System.out.println("==============================================");
 }

which will result:

=======================testObservableScanGroupByBuffer=======================
Key: ODD
Key: EVEN
ODD: [1, 4, 9, 16, 25]
EVEN: [2, 6, 12, 20]
======================

 

Advertisements

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s