Creating RxJava Subjects

RxJava subjects are observables and observers at the same time.
As an observer, a subject can subscribe to another observable; as an observable, it can pass through the items it observes by re-emitting them. It can also emit new items of its own.
As a result, a subject can subscribe to an observable, enrich the items, and re-emit them.
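
To make the dual role concrete, here is a minimal plain-Java sketch. It is only a teaching model and not part of RxJava: the class EnrichingSubject and its demo method are hypothetical names, and the "source observable" is reduced to a loop that pushes items into the subject.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical teaching model, not an RxJava class: it observes a source
// (onNext) and is observable itself (subscribe), enriching items in between.
class EnrichingSubject<T, R> {
    private final List<Consumer<R>> subscribers = new ArrayList<>();
    private final Function<T, R> enrich;

    EnrichingSubject(Function<T, R> enrich) {
        this.enrich = enrich;
    }

    // Observable side: register a downstream subscriber.
    void subscribe(Consumer<R> subscriber) {
        subscribers.add(subscriber);
    }

    // Observer side: receive an item, enrich it, and re-emit it.
    void onNext(T item) {
        R enriched = enrich.apply(item);
        for (Consumer<R> subscriber : subscribers) {
            subscriber.accept(enriched);
        }
    }

    static List<String> demo() {
        List<String> received = new ArrayList<>();
        EnrichingSubject<Integer, String> subject =
                new EnrichingSubject<>(i -> "item-" + i);
        subject.subscribe(received::add);
        // The upstream source is modelled as a plain loop feeding the subject.
        for (int i = 1; i <= 3; i++) {
            subject.onNext(i);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [item-1, item-2, item-3]
    }
}
```

With the real library, the same idea is usually expressed by subscribing a subject to a source and applying an operator such as map on the subject's output.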

There are 4 types of subjects:

  • Publish Subject
  • Replay Subject
  • Behavior Subject
  • Async Subject

Publish Subject

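// Imports for this example (RxJava 1.x). The later examples need the same
// imports, with the matching subject class from rx.subjects.
import java.util.stream.IntStream;

import org.junit.Test;

import rx.Observer;
import rx.subjects.PublishSubject;

@Test
public void testPublishSubject() {
    PublishSubject<String> publishSubject = PublishSubject.create();

    // Emitted before anyone has subscribed: these items are dropped.
    IntStream.range(0, 10).forEach(e -> publishSubject.onNext("" + e));

    publishSubject.subscribe(new Observer<String>() {

        @Override
        public void onCompleted() {
            System.out.println("Observable completed");
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("Error: " + e);
        }

        @Override
        public void onNext(String message) {
            System.out.println(message);
        }
    });

    // Emitted after the subscription: the observer prints these items.
    IntStream.range(20, 30).forEach(e -> publishSubject.onNext("" + e));
    IntStream.range(100, 105).forEach(e -> publishSubject.onNext("" + e));
}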

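A publish subject emits an item to its subscribers on every onNext call. Only observers that are already subscribed when onNext is called receive the item. In the test above, the observer prints 20 to 29 and 100 to 104; the items 0 to 9 were emitted before it subscribed and are lost.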

Replay Subject

@Test
public void testReplaySubject() {

    Observer<Integer> o1 = new Observer<Integer>() {

        @Override
        public void onCompleted() {
            System.out.println("Observable completed");
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("Error: " + e);
        }

        @Override
        public void onNext(Integer message) {
            System.out.println(message);
        }
    };

    // The argument is only an initial buffer capacity, not a size limit.
    ReplaySubject<Integer> replaySubject = ReplaySubject.create(1);

    // Emitted before the subscription, but buffered by the subject.
    IntStream.range(0, 10).forEach(e -> replaySubject.onNext(e));

    // The buffered items 0..9 are replayed to o1 as soon as it subscribes.
    replaySubject.subscribe(o1);

    IntStream.range(20, 30).forEach(e -> replaySubject.onNext(e));
}

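A replay subject buffers all items it observes and replays them to any observer
that subscribes, so every subscriber receives exactly the same items, regardless of when it subscribes. In the test above, the observer first receives the buffered items 0 to 9 and then 20 to 29. Note that the argument to ReplaySubject.create(1) is only an initial buffer capacity; to limit how many items are replayed, use ReplaySubject.createWithSize instead.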
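
The buffering can be sketched in a few lines of plain Java. This is a hypothetical model (ReplayModel is an invented name, not RxJava's implementation) that ignores completion, errors, and thread safety; it only shows why an early and a late subscriber end up with the same items.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical teaching model, not RxJava's ReplaySubject.
class ReplayModel<T> {
    private final List<T> buffer = new ArrayList<>();
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    // A new subscriber first receives everything buffered so far.
    void subscribe(Consumer<T> subscriber) {
        for (T item : buffer) {
            subscriber.accept(item);
        }
        subscribers.add(subscriber);
    }

    // Every item is buffered, then forwarded to the current subscribers.
    void onNext(T item) {
        buffer.add(item);
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(item);
        }
    }

    // Returns what an early and a late subscriber received, in that order.
    static List<List<Integer>> demo() {
        ReplayModel<Integer> subject = new ReplayModel<>();
        List<Integer> early = new ArrayList<>();
        List<Integer> late = new ArrayList<>();

        subject.subscribe(early::add);
        subject.onNext(1);
        subject.onNext(2);
        subject.subscribe(late::add); // subscribes after 1 and 2 were emitted
        subject.onNext(3);

        List<List<Integer>> result = new ArrayList<>();
        result.add(early);
        result.add(late);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [[1, 2, 3], [1, 2, 3]]
    }
}
```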

Behavior Subject

@Test
public void testBehaviorSubject() {
    // 1 is the default value, emitted to a subscriber only if the subject
    // has not observed any item before the subscription.
    BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.create(1);

    IntStream.range(0, 10).forEach(e -> behaviorSubject.onNext(e));
    IntStream.range(20, 30).forEach(e -> behaviorSubject.onNext(e));

    // On subscription the observer receives the most recent item, 29.
    behaviorSubject.subscribe(new Observer<Integer>() {

        @Override
        public void onCompleted() {
            System.out.println("Observable completed");
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("Error: " + e);
        }

        @Override
        public void onNext(Integer message) {
            System.out.println(message);
        }
    });

    // Items emitted after the subscription are delivered as usual.
    IntStream.range(100, 105).forEach(e -> behaviorSubject.onNext(e));
}

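A behavior subject emits to each new subscriber the most recent item it has observed (or the default value passed to create, if it has not observed anything yet), followed by all subsequently observed items. In the test above, the observer receives 29 and then 100 to 104.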
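
The "latest item" rule can be sketched in plain Java as follows. BehaviorModel is a hypothetical name and the class is only a model of the behavior described above, not RxJava's implementation; completion, errors, and thread safety are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical teaching model, not RxJava's BehaviorSubject.
class BehaviorModel<T> {
    private T latest;
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    // The default value stands in until the first item is observed.
    BehaviorModel(T defaultValue) {
        this.latest = defaultValue;
    }

    // A new subscriber immediately receives the most recent item.
    void subscribe(Consumer<T> subscriber) {
        subscriber.accept(latest);
        subscribers.add(subscriber);
    }

    // Remember the item as the latest one, then forward it.
    void onNext(T item) {
        latest = item;
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(item);
        }
    }

    static List<Integer> demo() {
        BehaviorModel<Integer> subject = new BehaviorModel<>(1);
        subject.onNext(10);
        subject.onNext(11);

        List<Integer> received = new ArrayList<>();
        subject.subscribe(received::add); // receives 11, the latest item
        subject.onNext(12);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [11, 12]
    }
}
```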

Async Subject

@Test
public void testAsyncSubject() {
    AsyncSubject<Integer> asyncSubject = AsyncSubject.create();

    IntStream.range(0, 10).forEach(e -> asyncSubject.onNext(e));
    // asyncSubject.onCompleted();

    asyncSubject.subscribe(new Observer<Integer>() {

        @Override
        public void onCompleted() {
            System.out.println("Observable completed");
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("Error: " + e);
        }

        @Override
        public void onNext(Integer message) {
            System.out.println(message);
        }
    });

    // Nothing is delivered to the observer while the subject is still active.
    IntStream.range(20, 30).forEach(e -> asyncSubject.onNext(e));
    // asyncSubject.onCompleted();
    IntStream.range(40, 50).forEach(e -> asyncSubject.onNext(e));

    // Only now does the observer receive 49, followed by the completion.
    asyncSubject.onCompleted();
}

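An async subject emits only the last item it received before onCompleted, and it does so only once onCompleted has been called. In the test above, the observer receives just 49, followed by the completion notification.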
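
As a rough illustration of that rule, here is a plain-Java sketch. AsyncModel is a hypothetical name, and the class is a simplified model rather than RxJava's implementation: it has no error handling or thread safety and does not forward the completion notification itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical teaching model, not RxJava's AsyncSubject.
class AsyncModel<T> {
    private T last;
    private boolean hasValue;
    private boolean completed;
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    // Before completion, subscribers just wait; after it, they get the last item.
    void subscribe(Consumer<T> subscriber) {
        if (completed) {
            if (hasValue) {
                subscriber.accept(last);
            }
        } else {
            subscribers.add(subscriber);
        }
    }

    // Items are only remembered, never forwarded directly.
    void onNext(T item) {
        if (!completed) {
            last = item;
            hasValue = true;
        }
    }

    // Completion releases the last remembered item to all waiting subscribers.
    void onCompleted() {
        if (completed) {
            return;
        }
        completed = true;
        if (hasValue) {
            for (Consumer<T> subscriber : subscribers) {
                subscriber.accept(last);
            }
        }
        subscribers.clear();
    }

    static List<Integer> demo() {
        AsyncModel<Integer> subject = new AsyncModel<>();
        List<Integer> received = new ArrayList<>();

        subject.onNext(1);
        subject.subscribe(received::add);
        subject.onNext(2);
        subject.onNext(3); // nothing has been delivered so far
        subject.onCompleted();
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [3]
    }
}
```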
