[RxJava 프로그래밍] 2장. Observable 처음 만들기
Chapter 2 Observable 처음 만들기
__2.1 Observable 클래스
____2.1.1 just() 함수
____2.1.2 subscribe() 함수와 Disposable 객체
____2.1.3 create() 함수
____2.1.4 fromArray() 함수
____2.1.5 fromIterable() 함수
____2.1.6 fromCallable() 함수
____2.1.7 fromFuture() 함수
____2.1.8 fromPubilsher() 함수
__2.2 Single 클래스
____2.2.1 just() 함수
____2.2.2 Observable에서 Single 클래스 사용
____2.2.3 Single 클래스의 올바른 사용 방법
__2.3 Maybe 클래스
__2.4 뜨거운 Observable
__2.5 Subject 클래스
____2.5.1 AsyncSubject 클래스
____2.5.2 BehaviorSubject 클래스
____2.5.3 PublishSubject 클래스
____2.5.4 ReplaySubject 클래스
__2.6 ConnectableObservable 클래스
__2.7 마치며

Observable클래스란 무엇일까??

이는 세분화하여 Observable, Maybe, Flowable 클래스로 구분되 사용.

Observable은 옵서버 패턴을 구현. 옵서버 패턴은 객체의 상태 변화를 관찰하는 관찰자 목록을 객체에 등록합니다. 그리고 상태 변화가 있을 때마다 메서드를 호출하여 객체가 직접 목록의 각 옵서버에게 변화를 알려줍니다. 라이프 사이클은 존재하지 않으며 보통 단일 함수를 통해 변화만 알립니다.

Observed라는 단어가 관찰을 통해서 얻은 결과를 의미한다면 Observable은 현재는 관찰되지 않았지만 이론을 통해서 앞으로 관찰할 가능성을 의미한다.

Observable은 세 가지의 알림을 구독자에게 전달.

1. just() 함수
2. subscribe() 함수와 Disposable 객체

RxJAVA는 내가 동작시키기 원하는 것을 사전에 정의해둔 다음 실제 그것이 실행되는 시점을 조절할 수 있습니다. 이때 사용하는 것이 subscribe()함수. Observable은 just()등의 팩토리 함수로 데이터 흐름을 정의한 후 subscribe()함수를 호출해야 실제로 데이터를 발행합니다.

@SchedulerSupport(SchedulerSupport.NONE)
  public final Disposable subscribe() {
      return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
  }

@CheckReturnValue
 @SchedulerSupport(SchedulerSupport.NONE)
 public final Disposable subscribe(Consumer<? super T> onNext) {
     return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
 }

@CheckReturnValue
 @SchedulerSupport(SchedulerSupport.NONE)
 public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
     return subscribe(onNext, onError, Functions.EMPTY_ACTION, Functions.emptyConsumer());
 }

 @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete) {
        return subscribe(onNext, onError, onComplete, Functions.emptyConsumer());
    }

    @CheckReturnValue
       @SchedulerSupport(SchedulerSupport.NONE)
       public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
               Action onComplete, Consumer<? super Disposable> onSubscribe) {
           ObjectHelper.requireNonNull(onNext, "onNext is null");
           ObjectHelper.requireNonNull(onError, "onError is null");
           ObjectHelper.requireNonNull(onComplete, "onComplete is null");
           ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

           LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);

           subscribe(ls);

           return ls;
       }

       @SchedulerSupport(SchedulerSupport.NONE)
       @Override
       public final void subscribe(Observer<? super T> observer) {
           ObjectHelper.requireNonNull(observer, "observer is null");
           try {
               observer = RxJavaPlugins.onSubscribe(this, observer);

               ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

               subscribeActual(observer);
           } catch (NullPointerException e) { // NOPMD
               throw e;
           } catch (Throwable e) {
               Exceptions.throwIfFatal(e);
               // can't call onError because no way to know if a Disposable has been set or not
               // can't call onSubscribe because the call might have set a Subscription already
               RxJavaPlugins.onError(e);

               NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
               npe.initCause(e);
               throw npe;
           }
 }

앞 함수 원형은 모두 Disposable 인터페이스의 객체를 리턴합니다.

Observable<String> source = Observable.just("RED", "GREEN", "YELLOW");
Disposable d= source.subscribe(
        v -> System.out.println("onNext(): value : " + v),
        err -> System.out.println("onError() : err :" + err.getMessage()),
        () -> System.out.println("onComplete()")
);

System.out.println("isDisposed() : " + d.isDisposed());
결과
onNext(): value : RED
onNext(): value : GREEN
onNext(): value : YELLOW
onComplete()
isDisposed() : true
3. create()

해당 함수는 onNext(), onComplete, onError 같은 알림을 개발자가 직접 호출해야 한다.

 Observable<Integer> source = Observable.create(
                (ObservableEmitter<Integer> emitter) -> {
                    emitter.onNext(100);
                    emitter.onNext(200);
                    emitter.onNext(300);
                    emitter.onComplete();
                }
        );
        source.subscribe(System.out::println);

사용자가 직접 호출이란 뜻이 이런거.
source.subscribe(System.out::println); 얘는 source.subscribe(data -> System.out.println("Result : " + data)); 를 줄인것

4. fromArray()

배열에서 int[]배열은 Integer[]로 변환해야 합니다. 다양한 방법 중 자바 8의 IntStream.of(intArray).boxed().toArray(Integer[]::new);

5. fromIterable()

Iterable 인터페이스를 구현하는 대표적인 클래스는 ArrayList(List 인터페이스), ArrayBlockingQueue(BlockingQueue 인터페이스), HashSet(Set 인터페이스), LinkedList, Stack, TreeSet, Vector 등이 있습니다.

List<String> names = new ArrayList<>();
        names.add("Jerry");
        names.add("Jerry2");
        names.add("Jerry3");

        Observable<String> source2 = Observable.fromIterable(names);
        source2.subscribe(System.out::println);
6. fromCallable()

기존 자바에서 제공하는 비동기 클래스나 인터페이스의 연동을 할 때 사용

자바 5에서 추가된 동시성 API Callable인터페이스.

 Callable<String> callable = () -> {
            Thread.sleep(1000);
            return "Hello Callable";
        };

        Observable<String> source = Observable.fromCallable(callable);
        source.subscribe(System.out::println);
7. fromFuture()

위와 동일하게 비동기 계산의 결과를 구할 때 사용되지만, 보통 Executor 인터페이스를 구현한 클래스에 Callable 객체에서 구현한 계산 결과가 나올 때까지 블로킹 됩니다.

 Future<String> future = Executors.newSingleThreadExecutor().submit(() -> {
            Thread.sleep(1000);
            return "Hello Future";
        });

        Observable<String> source = Observable.fromFuture(future);
        source.subscribe(System.out::println);
8. fromPubilsher()

Observable에서 제공하는 fromXXX() 계열 함수의 마지막. Pubilsher는 자바 9의 표준 Flow API의 일부.

Publisher<String> publisher = (Subscriber<? super String> s) -> {
            s.onNext("Hello Obserable.fromPublisher()");
            s.onComplete();
        };

        Observable<String> source = Observable.fromPublisher(publisher);
        source.subscribe(System.out::println);

Single 클래스

오직 1개의 데이터만 발생하도록 한정하는 것을 말합니다. Observable클래스는 데이터를 무한하게 발행 할 수 있지만 싱글은 오직 1개. 보통 결과가 유일한 서버 API를 호출할 떄 유용하게 사용할 수 있다.

        //1. 기존 Obserable에서 Single 객체로 변환하기
        Observable<String> source = Observable.just("Hello Single");
        Single.fromObservable(source)
                .subscribe(System.out::println);

        //2. single()함수를 호출해 Single 객체 생성하기
        Observable.just("Hello Single")
                .single("default item")
                .subscribe(System.out::println);

        //3. first()함수를 호출해 Single 객체 생성하기.
        String[] colors = {"RED", "Blue", "Gold"};
        Observable.fromArray(colors)
                .first("default value")
                .subscribe(System.out::println);

        //4. empty Observable에서 Single 객체 생성하기.
        Observable.empty()
                .single("default value")
                .subscribe(System.out::println);

        //5. take()함수에서 Single 객체 생성
        Observable.just("하하", "하하2")
                .take(1)
                .single("default order")
                .subscribe(System.out::println);

Maybe 클래스

maybe 클래스는 처음 도입된 Observable의 또 다른 특수 형태.

Single 클래스는 1개 완료, Maybe 클래스는 0혹은 1개완료 할 수도 있습니다.

#

뜨거운 Obserable

Observable에는 뜨거운 것과 차가운 것이 있습니다.

차가운 Observable은 마치 냉장고에 들어있는 냉동식품과 같다. Observable을 선언하고 just(), fromIterable() 함수를 호출해도 옵서버가 subscribe()함수를 호출하여 구독하지 않으면 데이터를 발행하지 않습니다. 다른 말로 게으른 접근법.

뜨거운 Observable은 구독자가 존재 여부와 관계없이 데이터를 발행하는 Observable입니다. 따라서 여러 구독자를 고려 할 수 있다. 단, 구독자로서는 Observable에서 발행하는 데이터를 처음부터 모두 수신할 것으로 보장할 수 없다. 즉, 차가운 Observable은 구독하면 준비된 데이터를 처음부터 발행합니다. 하지만 뜨거운 Observable은 구독한 시점부터 Observable에서 발행한 값을 받습니다.

차가운 Observable의 예는 웹 요청, 데이터베이스 쿼리와 파일 읽기 등입니다. 보통 내가 원하는 URL이나 데이터를 지정하면 그때부터 서버나 데이터베이스 서버에 요청을 보내고 결과를 받아옵니다. 지금까지 우리가 다룬 Observable은 모두 차가운 Observable입니다. 앞으로도 별도의 언급이 없으면 차가운 Observable이라고 생각하면 됩니다.

뜨거운 Observable의 예는 마우스 이벤트, 키보드 이벤트, 시스템 이벤트, 센서 데이터와 주식 가격등이 있습니다. 뜨거운 Observable에는 주의 할 점이 있습니다. 바로 배압(back pressure)을 고려.

배압은 Observable에서 데이터를 발행하는 속도와 구독자가 처리하는 속도의 차이가 클 때 발생합니다. Flowable이라는 특화 클래스에서 배압을 처리합니다.

차가운 Observable을 뜨거운 Observable 객체로 변환하는 방법은 Subject 객체를만들거나 ConnectableObservable 클래스를 활용하는 것!

2.5 Subject 클래스

Subject클래스는 차가운 Observable을 뜨거운 Observable로 바꿔준다고 소개했습니다. Subject 클래스의 특성은 Observable의 속성과 구독자의 속성이 모두 있다는 점입니다. Observable 처럼 데이터를 발행할 수도 있고 구독자처럼 발행된 데이터를 바로 처리할 수도 있습니다.

  1. 처음 구독자가 subscribe() 함수 호출
  2. 이후에 1, 2 발행된 후 두번째 구독자가 subscribe()함수를 호출
  3. 마지막으로 3이 발행되고 데이터 발행을 완료.
AsyncSubject<String> subject = AsyncSubject.create();
        subject.subscribe(data -> System.out.println("Subscriber #1 => " + data));
        subject.onNext("1");
        subject.onNext("3");
        subject.subscribe(data -> System.out.println("Subscriber #2 => " + data));
        subject.onNext("5");
        subject.onComplete();

Subscriber #1 => 5
Subscriber #2 => 5

BehaviorSubject<String> subject = BehaviorSubject.createDefault("6");
subject.subscribe(data -> System.out.println("Subscriber #1 => " + data));
subject.onNext("1");
subject.onNext("3");
subject.subscribe(data -> System.out.println("Subscriber #2 => " + data));
subject.onNext("5");
subject.onComplete();

Subscriber #1 => 6
Subscriber #1 => 1
Subscriber #1 => 3
Subscriber #2 => 3
Subscriber #1 => 5
Subscriber #2 => 5

PublishSubject<String> subject = PublishSubject.create();
        subject.subscribe(data -> System.out.println("Subscriber #1 => " + data));
        subject.onNext("1");
        subject.onNext("3");
        subject.subscribe(data -> System.out.println("Subscriber #2 => " + data));
        subject.onNext("5");
        subject.onComplete();

Subscriber #1 => 1
Subscriber #1 => 3
Subscriber #1 => 5
Subscriber #2 => 5

ReplaySubject<String> subject = ReplaySubject.create();
        subject.subscribe(data -> System.out.println("Subscriber #1 => " + data));
        subject.onNext("1");
        subject.onNext("3");
        subject.subscribe(data -> System.out.println("Subscriber #2 => " + data));
        subject.onNext("5");
        subject.onComplete();

Subscriber #1 => 1
Subscriber #1 => 3
Subscriber #2 => 1
Subscriber #2 => 3
Subscriber #1 => 5
Subscriber #2 => 5

ConnectableObservable

Subject클래스처럼 차가운 Observable을 뜨거운 Observable로 변환합니다. Observable을 여러 구독자에게 공유할 수 있으므로 원 데이터 하나를 여러 구독자에게 동시에 전달할 때 사용합니다. 특이한 점은 subscribe()함수를 호출해도 아무 동작이 일어나지 않는다는 점. 새로 추가된 connect() 함수는 호출한 시점부터 subscribe()함수를 호출한 구독자에게 데이터를 발행하기 때문입니다.

ConnectableObservable 객체를 생성하려면 먼저 Observable에 publish()함수를 호출해야합니다. 이 함수는 여러 구독자에게 데이터를 발행하기 위해 connect()함수를 호출하기 전까지 데이터 발행을 유예하는 역할.

String[] dt = {"1", "3", "5"};
        Observable<String> balls = Observable.interval(100L, TimeUnit.MILLISECONDS)
                .map(Long::intValue)
                .map(i -> dt[i])
                .take(dt.length);

        ConnectableObservable<String> source = balls.publish();
        source.subscribe(tester);
        source.subscribe(data -> System.out.println("Subscriber #1 => " + data));
        source.subscribe(data -> System.out.println("Subscriber #2 => " + data));
        source.connect();
        Thread.sleep(250);

        source.subscribe(data -> System.out.println("Subscriber #3 => " + data));
        Thread.sleep(100);