RxJava Operations on Observables

Filtering and other operations on Observables

If only a subset of elements in a stream is needed, a filter can be used to eliminate the items not needed and let the observable emit only desired elements.

@Test
 public void testCreateObservableFromCollectionWithFilter() {
 List<Integer> list = new ArrayList<>();
 IntStream.range(0, 1000).forEach(list::add);

Observable<Integer> observableFromCollection = Observable.from(list).filter(e -> e%2==0);

observableFromCollection.subscribe(new Observer<Integer>() {
 @Override
 public void onCompleted() {
 System.out.println("observableFromCollection completed");
 }

@Override
 public void onError(Throwable e) {
 System.out.println("Error: " + e);
 }

@Override
 public void onNext(Integer i) {
 System.out.println("Filtered item: " + i);
 }

});
 }

 

If only N elements are needed from the begin of a sequence, take() method can be used:

@Test
 public void testCreateObservableFromCollectionWithTake() {
 List<Integer> list = new ArrayList<>();
 IntStream.range(0, 1000).forEach(list::add);

Observable<Integer> observableFromCollection = Observable.from(list).take(2);

observableFromCollection.subscribe(new Observer<Integer>() {
 @Override
 public void onCompleted() {
 System.out.println("observableFromCollection completed");
 }

@Override
 public void onError(Throwable e) {
 System.out.println("Error: " + e);
 }

@Override
 public void onNext(Integer i) {
 System.out.println("Taken item: " + i);
 }

});
 }

 

If only the last N items are needed from the end of a sequence, takeLast() method can be used

@Test
 public void testCreateObservableFromCollectionWithTakelast() {
 List<Integer> list = new ArrayList<>();
 IntStream.range(0, 1000).forEach(list::add);

Observable<Integer> observableFromCollection = Observable.from(list).takeLast(2);

observableFromCollection.subscribe(new Observer<Integer>() {
 @Override
 public void onCompleted() {
 System.out.println("observableFromCollection completed");
 }

@Override
 public void onError(Throwable e) {
 System.out.println("Error: " + e);
 }

@Override
 public void onNext(Integer i) {
 System.out.println("Taken last item: " + i);
 }

});
 }

 

If duplicate elements are not allowed, the distinct() method can be used to filter the duplicate elements

@Test
 public void testCreateObservableFromCollectionWithDistinct() {
 List<Integer> list = new ArrayList<>();
 IntStream.range(0, 1000).forEach(list::add);

Observable<Integer> observableFromCollection = Observable.from(list).repeat(2);

observableFromCollection.distinct().subscribe(new Observer<Integer>() {
 @Override
 public void onCompleted() {
 System.out.println("observableFromCollection completed");
 }

@Override
 public void onError(Throwable e) {
 System.out.println("Error: " + e);
 }

@Override
 public void onNext(Integer i) {
 System.out.println("Taken last item: " + i);
 }

});
 }

 

Repeating items after each other can be filtered until a different value is emitted

@Test
 public void testCreateObservableFromCollectionWithDistinctUntilChanged() {
 List<Integer> list = new ArrayList<>();
 list.add(1);
 list.add(1);
 list.add(1);
 list.add(1);
 list.add(2);
 list.add(2);
 list.add(3);




Observable<Integer> observableFromCollection = Observable.from(list).repeat(2);

observableFromCollection.distinctUntilChanged()().subscribe(new Observer<Integer>() {
 @Override
 public void onCompleted() {
 System.out.println("observableFromCollection completed");
 }

@Override
 public void onError(Throwable e) {
 System.out.println("Error: " + e);
 }

@Override
 public void onNext(Integer i) {
 System.out.println("Taken last item: " + i);
 }

});
 }

 

Using first() and last() methods, the first and last item in a sequence can be taken

@Test
 public void testCreateObservableFromCollectionWithTakelast() {
 List<Integer> list = new ArrayList<>();
 IntStream.range(0, 1000).forEach(list::add);

Observable<Integer> observableFromCollection = Observable.from(list));

observableFromCollection.first().subscribe(new Observer<Integer>() {
 @Override
 public void onCompleted() {
 System.out.println("observableFromCollection completed");
 }

@Override
 public void onError(Throwable e) {
 System.out.println("Error: " + e);
 }

@Override
 public void onNext(Integer i) {
 System.out.println("Taken last item: " + i);
 }

});
 }

 

To skip N items from the begin or from the end of a stream, skip() and skipLast() methods can be used

@Test
 public void testCreateObservableFromCollectionWithTakelast() {
 List<Integer> list = new ArrayList<>();
 IntStream.range(0, 1000).forEach(list::add);

Observable<Integer> observableFromCollection = Observable.from(list));

observableFromCollection.skip(2).subscribe(new Observer<Integer>() {
 @Override
 public void onCompleted() {
 System.out.println("observableFromCollection completed");
 }

@Override
 public void onError(Throwable e) {
 System.out.println("Error: " + e);
 }

@Override
 public void onNext(Integer i) {
 System.out.println("Taken last item: " + i);
 }

});
 }

 

If only an element at a specific position is wanted, elementAt() method can be used

@Test
 public void testCreateObservableFromCollectionWithTakelast() {
 List<Integer> list = new ArrayList<>();
 IntStream.range(0, 1000).forEach(list::add);

Observable<Integer> observableFromCollection = Observable.from(list));

observableFromCollection.elementAt(2).subscribe(new Observer<Integer>() {
 @Override
 public void onCompleted() {
 System.out.println("observableFromCollection completed");
 }

@Override
 public void onError(Throwable e) {
 System.out.println("Error: " + e);
 }

@Override
 public void onNext(Integer i) {
 System.out.println("Taken last item: " + i);
 }

});
 }

Sampling to emit elements on timeout

If we are not interested in all items but want the observe the behavior roughly or if we have an observable which emits not frequently changing values like the room temperature, we can use sampling to take the values on intervals:

@Test
 public void testCreateObservableFromCollectionWithTakelast() {
 List<Integer> list = new ArrayList<>();
 IntStream.range(0, 100000).forEach(list::add);

Observable<Integer> observableFromCollection = Observable.from(list));

observableFromCollection.sample(1,TimeUnit.SECONDS).subscribe(new Observer<Integer>() {
 @Override
 public void onCompleted() {
 System.out.println("observableFromCollection completed");
 }

@Override
 public void onError(Throwable e) {
 System.out.println("Error: " + e);
 }

@Override
 public void onNext(Integer i) {
 System.out.println("Taken last item: " + i);
 }

});
 }

 

The sample() method will emit the last item when a timeout occurs. If we want the first item in every timeout period, we should use throttleFirst() method.

The sample method doesn’t do anything if in a time period no items has been emitted. If it is a requirement that in every period, the required count of items should be emitted, the timeout() method should be used. If there are no items received in a period, timeout() method will fire an exception. To emit only the first item in a time window and throw the rest of the items away, debounce() method can be used.

Advertisements

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s