[java] RxJava와 데이터 흐름 프로그래밍

RxJava logo

소개

RxJava는 ReactiveX(Reactive Extensions) 라이브러리의 Java 구현체입니다. ReactiveX는 데이터 흐름과 변경에 대한 추상화를 제공하며, 비동기 이벤트 기반 프로그래밍에 많이 사용됩니다. 이러한 방식은 복잡한 비동기 코드를 간결하게 작성할 수 있도록 도와줍니다.

Observable과 Observer

RxJava의 핵심은 ObservableObserver입니다. Observable은 데이터의 흐름을 나타내는 객체이며, ObserverObservable에서 발생하는 이벤트를 처리하는 객체입니다. Observable은 데이터 스트림을 생성하고, Observer는 이 데이터 스트림을 구독하여 이벤트를 처리합니다.

Observable<String> observable = Observable.just("Hello, RxJava!");

Observer<String> observer = new Observer<String>() {
    @Override
    public void onNext(String s) {
        System.out.println(s);
    }

    @Override
    public void onError(Throwable e) {
        e.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("Completed");
    }
};

observable.subscribe(observer);

위 코드는 간단한 RxJava 예제입니다. Observable.just()는 데이터 스트림을 생성하고, ObserveronNext(), onError(), onComplete() 메서드를 통해 해당 이벤트를 처리합니다. observable.subscribe(observer)를 호출하여 ObservableObserver를 연결하고 데이터 흐름이 시작됩니다.

연산자

RxJava는 다양한 연산자를 제공하여 데이터 스트림을 변환하고 조작할 수 있습니다. 몇 가지 예시를 살펴보면:

Observable<Integer> numbers = Observable.just(1, 2, 3, 4, 5);

numbers
    .map(number -> number * 2)
    .filter(number -> number > 5)
    .subscribe(result -> System.out.println(result));

numbers라는 Observable에서 데이터를 받아와 map() 연산자로 각 데이터를 2배로 변환한 후, filter() 연산자를 통해 5보다 큰 데이터만 필터링하고, 최종 결과를 출력합니다.

스케줄러

RxJava는 스레드를 효율적으로 관리하기 위해 스케줄러를 제공합니다. 스케줄러는 작업을 실행할 스레드를 관리하고, 각 작업을 적절한 스레드에 할당합니다.

Observable.just("Hello, RxJava!")
    .observeOn(Schedulers.newThread())
    .map(s -> {
        System.out.println("Map: " + Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .observeOn(Schedulers.io())
    .subscribe(result -> {
        System.out.println("Result: " + Thread.currentThread().getName());
        System.out.println(result);
    });

위 코드에서 observeOn()을 사용하여 작업을 수행할 스레드를 지정할 수 있습니다. 첫 번째 observeOn()은 새로운 스레드에서 작업을 수행하고 (newThread()), 두 번째 observeOn()은 IO 스레드풀에서 작업을 수행합니다. 이를 통해 UI 스레드에서 비동기 작업을 수행할 수 있습니다.

마무리

RxJava는 데이터 흐름 프로그래밍을 위한 강력한 도구입니다. ObservableObserver를 활용하여 비동기 코드를 간결하게 작성할 수 있으며, 다양한 연산자와 스케줄러를 제공하여 데이터 스트림의 변환과 관리를 용이하게 할 수 있습니다.

더 자세한 내용은 RxJava 공식 문서를 참고하시기 바랍니다.