Creating Observables and Subscribers in RxJava

RxJava is a very useful library for asynchronous and event-based programming, also called reactive programming.

There are two main actors in RxJava:

Observables — An object that emits a stream of data or events

Observers/Subscribers — An object that acts upon the emitted data.

 

An observer is an object which know how to act upon 3 sort of events:

  1. onNext : this event provides a new item each time.
  2. onCompleted: this event notifies that the event stream is finished.
  3. onError: this event notifies an error.

We can see this event sorts as 3 separate pipelines. When an observer starts observing data, it gets 1 item every time onNext method is called. When there is no more data, it gets notified by the onCompleted method. If an error occurs below the pilepine, the error gets propogated by the onError event.

It is important to notice that, once a stream has changed the pipeline, it wil never switch back. It means, once an error occurs, the processing stops. There comes no data after an error event. Also, after an finished event, no data will be processed.

To use the RxJava, first the dependency has to be add. For release 1, the maven dependency is:

<dependency>
    <groupId>io.reactivex</groupId>
    <artifactId>rxjava</artifactId>
    <version>1.1.6</version>
</dependency>

Once the dependency has been set, we can use the library.

@Test
 public void testObservable() {
 /*
 * myObservable is an observable object. When an observer subscribes to
 * it, it will call the 'call' method for every item in the stream
 */
 Observable<Integer> myObservable = Observable.create(new Observable.OnSubscribe<Integer>() {
 @Override
 public void call(Subscriber<? super Integer> observer) {
 try {
 if (!observer.isUnsubscribed()) {
 for (int i = 1; i <= 5; i++) {
 observer.onNext(i);
 }
 observer.onCompleted();
 }
 } catch (Exception e) {
 observer.onError(e);
 }
 }
 });

myObservable.subscribe(new Subscriber<Integer>() {
 @Override
 public void onNext(Integer item) {
 System.out.println("The next item * item is : " + item * item);
 }

@Override
 public void onError(Throwable error) {
 System.err.println("Error: " + error.getMessage());
 }

@Override
 public void onCompleted() {
 System.out.println("Stream completed.");
 }
 }); 
 
 for (int x = 0; x < 5; x++) {

myObservable.subscribe(new Subscriber<Integer>() {
 @Override
 public void onNext(Integer item) {
 System.out.println("onNext: " + item);
 }

@Override
 public void onError(Throwable error) {
 System.err.println("Error: " + error.getMessage());
 }

@Override
 public void onCompleted() {
 System.out.println("Stream completed.");
 }
 }); 
 }
 }

In RxJava, the main object is an Observable.
Every subscriber can define it’s own operations like onNext, OnError and OnCompleted.

The observable methods get executed in the same thread as the subscriber.
The operations of obsevable’s can be chained. In this case, all operations will be executed from the first thread.

If there is already an object as data source, the reference of that object can be used when creating an observable.

In the example below, an existing list is used to feed the observable. Once an object subscribes to the observable, the list items will be pushed to the subscriber.

 @Test
 public void testCreateObservableFromCollection() {
   List<Integer> list = new ArrayList<>();
   IntStream.range(0, 1000).forEach(list::add);

   Observable<Integer> observableFromCollection = Observable.from(list);

   observableFromCollection.subscribe(new Observer<Integer>() {
     @Override
     public void onCompleted() {
       System.out.println("observableFromCollection completed");
     }

     @Override
     public void onError(Throwable e) {
       System.out.println("Error: " + e);
     }

     @Override
     public void onNext(Integer i) {
       System.out.println("Collection item: " + i);
     }

  });
}

Most of the time, the data will be computed using a method call. In RxJava, such a method can be used to create an Obsevable:

@Test
public void testCreateObservableFromMethod() {
  Observable<String[]> observableFromMethod = Observable.just(helloWorld());

  observableFromMethod.subscribe(new Observer<String[]>() {
  @Override
  public void onCompleted() {
    System.out.println("observableFromMethod completed");
  }

  @Override
  public void onError(Throwable e) {
    System.out.println("Error: " + e);
  }

  @Override
  public void onNext(String[] item) {
    Arrays.asList(item).forEach(e -> System.out.println("Item is: \"" + e + "\""));
  }
 });
}

private String[] helloWorld() {
  return new String[]{"Hello World", "This is an example"};
}

An observable can also be created from final values:

@Test
public void testObservableFromFinal() {
  Observable<String> observableFromFinal = Observable.just("Hello World!");

  observableFromFinal.subscribe(new Observer<String>() {
    @Override
    public void onCompleted() {
      System.out.println("observableFromFinal completed");
    }

    @Override
    public void onError(Throwable e) {
      System.out.println("Error: " + e);
    }

    @Override
    public void onNext(String item) {
      System.out.println("Item: \"" + item + "\"");
    }
  });
}

 

There are also Observable.empty(), Observable.never(), and Observable.throw() methods for special cases.

Observable.empty() emits nothing and terminates normally.

Observable.never() emits nothing and terminates also never.

Observable.throw()emits nothing and terminates with an exception.

Advertisements

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s