[java] RxJava에서의 중첩된 Observable 처리

RxJava는 비동기 이벤트 기반 프로그래밍을 위한 라이브러리로 많이 사용됩니다. 이러한 비동기 작업을 처리하기 위해 중첩된 Observable을 사용할 수 있습니다. 중첩된 Observable은 한 Observable이 다른 Observable을 생성하고, 그 결과를 사용하는 구조입니다.

중첩된 Observable의 작성

RxJava에서는 flatMap 또는 concatMap 함수를 사용하여 중첩된 Observable을 작성할 수 있습니다. 아래의 예제 코드에서는 Observable.from() 함수를 사용하여 1부터 5까지의 숫자를 방출하는 Observable을 생성합니다. 이후 flatMap 함수를 사용하여 각 숫자에 대해 새로운 Observable을 생성하고, 그 Observable에서 “Hello”와 해당 숫자를 조합하여 방출하는 예제입니다.

Observable<Integer> source = Observable.from(Arrays.asList(1, 2, 3, 4, 5));

source.flatMap(number -> {
        return Observable.just("Hello " + number);
    })
    .subscribe(message -> {
        System.out.println(message);
    });

위의 코드를 실행하면 다음과 같은 결과가 출력됩니다:

Hello 1
Hello 2
Hello 3
Hello 4
Hello 5

중첩된 Observable의 결합

중첩된 Observable에서 각각의 Observable이 비동기적으로 실행되면서 결과를 방출하게 됩니다. 이렇게 나온 결과들을 결합하여 처리하고 싶은 경우, flatMap 함수 대신 concatMap 함수를 사용할 수 있습니다. concatMap 함수는 각 Observable을 순차적으로 실행하고, 이전 Observable의 결과가 처리된 후에 다음 Observable을 실행하는 특징을 가지고 있습니다.

아래의 예제 코드에서는 1부터 5까지의 숫자를 방출하는 Observable을 생성한 후, concatMap 함수를 사용하여 각 숫자에 대해 1초의 딜레이를 가지는 Observable을 생성합니다. 이후 각 숫자에 대한 결과값을 모두 출력하는 예제입니다.

Observable<Integer> source = Observable.from(Arrays.asList(1, 2, 3, 4, 5));

source.concatMap(number -> {
        return Observable.just(number)
            .delay(1, TimeUnit.SECONDS);
    })
    .subscribe(result -> {
        System.out.println(result);
    });

Thread.sleep(6000);

위의 코드를 실행하면 결과값이 1부터 5까지 1초씩 딜레이되어 출력됩니다:

1
2
3
4
5

참고 자료