[java] RxJava의 흐름 제어

RxJava는 Reactive Extensions의 Java 구현체로, 비동기적인 이벤트 기반 프로그래밍을 쉽게 구현할 수 있도록 도와줍니다. RxJava의 강력한 기능 중 하나는 흐름 제어입니다. 이를 통해 데이터의 흐름을 원하는대로 제어하고, 비동기 작업을 보다 간편하게 처리할 수 있습니다.

1. 스케줄러 활용

RxJava에서 스케줄러는 다양한 작업을 어떤 스레드에서 실행할지 결정하는 역할을 합니다. 스케줄러는 다음과 같은 종류가 있습니다.

Observable.just("Hello, RxJava!")
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, s);
            }
        });

위의 예시에서 just() 메소드를 사용하여 “Hello, RxJava!”라는 문자열을 발행하고, subscribeOn()observeOn()을 사용하여 실행할 스레드를 지정합니다. 이 경우 Schedulers.io() 스케줄러에서 작업을 처리하고, 결과를 AndroidSchedulers.mainThread() 스케줄러로 전달하여 메인 스레드에서 결과를 처리합니다.

2. 연산자의 활용

RxJava는 다양한 연산자를 제공하여 데이터 흐름을 제어할 수 있습니다. 몇 가지 예시를 살펴보겠습니다.

Observable.just("Hello")
        .map(new Function<String, String>() {
            @Override
            public String apply(String s) throws Exception {
                return s + ", RxJava!";
            }
        })
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, s);
            }
        });

위의 예시에서 just() 메소드를 사용하여 “Hello”라는 문자열을 발행하고, map() 연산자를 사용하여 문자열 뒤에 “, RxJava!”를 추가합니다. 그 결과로 “Hello, RxJava!”라는 문자열이 구독자에게 전달됩니다.

Observable.range(1, 10)
        .filter(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                return integer % 2 == 0;
            }
        })
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, String.valueOf(integer));
            }
        });

위의 예시에서 range() 메소드를 사용하여 1부터 10까지의 숫자를 발행하고, filter() 연산자를 사용하여 숫자를 필터링합니다. 이 예시에서는 짝수만을 구독자에게 전달합니다.

3. 조합 연산자의 활용

RxJava는 여러 Observable을 조합하여 새로운 Observable을 생성하는 다양한 조합 연산자를 제공합니다.

Observable<String> observable1 = Observable.just("Hello");
Observable<String> observable2 = Observable.just("RxJava!");

Observable.combineLatest(observable1, observable2, new BiFunction<String, String, String>() {
    @Override
    public String apply(String s1, String s2) throws Exception {
        return s1 + ", " + s2;
    }
})
.subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.d(TAG, s);
    }
});

위의 예시에서 combineLatest() 연산자를 사용하여 observable1observable2를 조합합니다. BiFunction을 사용하여 두 개의 문자열을 조합하여 새로운 문자열을 만들고, 구독자에게 전달합니다.

위의 예시들은 RxJava의 흐름 제어 기능 중 일부를 간단히 소개한 것입니다. RxJava는 많은 다른 연산자와 기능을 제공하므로, 필요에 따라 공식 문서나 예제 코드를 참고하여 자세히 알아보는 것을 추천합니다.