RxJava는 Reactive Extensions의 Java 구현체로, 비동기적인 이벤트 기반 프로그래밍을 쉽게 구현할 수 있도록 도와줍니다. RxJava의 강력한 기능 중 하나는 흐름 제어입니다. 이를 통해 데이터의 흐름을 원하는대로 제어하고, 비동기 작업을 보다 간편하게 처리할 수 있습니다.
1. 스케줄러 활용
RxJava에서 스케줄러는 다양한 작업을 어떤 스레드에서 실행할지 결정하는 역할을 합니다. 스케줄러는 다음과 같은 종류가 있습니다.
Schedulers.io()
: I/O 작업에 최적화된 스케줄러로, 네트워크 요청이나 파일 입출력 등의 작업에 적합합니다.Schedulers.computation()
: CPU 연산에 최적화된 스케줄러로, 계산 작업이나 복잡한 작업에 사용됩니다.Schedulers.newThread()
: 새로운 스레드를 생성하여 작업을 처리하는 스케줄러입니다.AndroidSchedulers.mainThread()
: Android 애플리케이션의 메인 스레드에서 작업을 처리하는 스케줄러입니다.
Observable.just("Hello, RxJava!")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
});
위의 예시에서 just()
메소드를 사용하여 “Hello, RxJava!”라는 문자열을 발행하고, subscribeOn()
과 observeOn()
을 사용하여 실행할 스레드를 지정합니다. 이 경우 Schedulers.io()
스케줄러에서 작업을 처리하고, 결과를 AndroidSchedulers.mainThread()
스케줄러로 전달하여 메인 스레드에서 결과를 처리합니다.
2. 연산자의 활용
RxJava는 다양한 연산자를 제공하여 데이터 흐름을 제어할 수 있습니다. 몇 가지 예시를 살펴보겠습니다.
map()
: Observable이 발행하는 데이터를 변환하는 연산자입니다.
Observable.just("Hello")
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
return s + ", RxJava!";
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
});
위의 예시에서 just()
메소드를 사용하여 “Hello”라는 문자열을 발행하고, map()
연산자를 사용하여 문자열 뒤에 “, RxJava!”를 추가합니다. 그 결과로 “Hello, RxJava!”라는 문자열이 구독자에게 전달됩니다.
filter()
: Observable이 발행하는 데이터를 필터링하는 연산자입니다.
Observable.range(1, 10)
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer % 2 == 0;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, String.valueOf(integer));
}
});
위의 예시에서 range()
메소드를 사용하여 1부터 10까지의 숫자를 발행하고, filter()
연산자를 사용하여 숫자를 필터링합니다. 이 예시에서는 짝수만을 구독자에게 전달합니다.
3. 조합 연산자의 활용
RxJava는 여러 Observable을 조합하여 새로운 Observable을 생성하는 다양한 조합 연산자를 제공합니다.
Observable<String> observable1 = Observable.just("Hello");
Observable<String> observable2 = Observable.just("RxJava!");
Observable.combineLatest(observable1, observable2, new BiFunction<String, String, String>() {
@Override
public String apply(String s1, String s2) throws Exception {
return s1 + ", " + s2;
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
});
위의 예시에서 combineLatest()
연산자를 사용하여 observable1
과 observable2
를 조합합니다. BiFunction
을 사용하여 두 개의 문자열을 조합하여 새로운 문자열을 만들고, 구독자에게 전달합니다.
위의 예시들은 RxJava의 흐름 제어 기능 중 일부를 간단히 소개한 것입니다. RxJava는 많은 다른 연산자와 기능을 제공하므로, 필요에 따라 공식 문서나 예제 코드를 참고하여 자세히 알아보는 것을 추천합니다.