[java] RxJava에서의 동적 스트림 생성

Reactive Extensions for Java (RxJava)은 비동기 및 이벤트 기반 프로그래밍을 위한 라이브러리입니다. RxJava은 데이터 스트림을 조작하고 변환하는 일련의 연산자를 제공하여 비동기 작업을 간편하게 처리할 수 있게 해줍니다. 이번 블로그 포스트에서는 RxJava에서 동적으로 스트림을 생성하는 방법에 대해 알아보겠습니다.

스트림 생성 연산자

RxJava에서는 다양한 스트림 생성 연산자를 제공합니다. 이러한 연산자를 사용하여 동적으로 스트림을 생성할 수 있습니다. 주요한 스트림 생성 연산자로는 Observable.create()Observable.defer()가 있습니다.

Observable.create()

Observable.create() 연산자는 Observable을 직접 생성할 수 있는 방법을 제공합니다. 이 연산자를 사용하면 스트림에 원하는 데이터를 생성하고, 에러나 완료 신호를 보낼 수 있습니다. 예를 들면 다음과 같습니다.

Observable<Integer> dynamicStream = Observable.create(emitter -> {
    // 원하는 데이터를 생성
    emitter.onNext(1);
    emitter.onNext(2);
    emitter.onNext(3);

    // 완료 신호 전송
    emitter.onComplete();
});

Observable.defer()

Observable.defer() 연산자는 스트림을 생성하는 로직을 나중에 실행할 수 있는 방법을 제공합니다. 이 연산자를 사용하면 매번 스트림을 생성할 때마다 로직을 실행할 수 있습니다. 예를 들면 다음과 같습니다.

Observable<Integer> dynamicStream = Observable.defer(() -> {
    // 스트림을 생성하는 로직을 나중에 실행
    List<Integer> numbers = new ArrayList<>();
    numbers.add(1);
    numbers.add(2);
    numbers.add(3);

    return Observable.fromIterable(numbers);
});

예제: 동적으로 스트림 생성하기

다음은 RxJava를 사용하여 1부터 시작하여 지정된 개수의 정수를 동적으로 생성하는 예제입니다.

import io.reactivex.Observable;

public class DynamicStreamExample {
    public static void main(String[] args) {
        int count = 5;

        Observable<Integer> dynamicStream = Observable.create(emitter -> {
            for (int i = 1; i <= count; i++) {
                emitter.onNext(i);
            }
            emitter.onComplete();
        });

        dynamicStream.subscribe(System.out::println);
    }
}

위 예제는 Observable.create()를 사용하여 동적으로 스트림을 생성하고, Observable.subscribe()를 사용하여 생성된 스트림을 구독합니다. 실행 결과는 1부터 5까지의 정수가 출력됩니다.

결론

RxJava에서 동적으로 스트림을 생성하는 방법에 대해 알아보았습니다. Observable.create()Observable.defer()와 같은 스트림 생성 연산자를 사용하여 원하는 데이터를 생성하고, 스트림을 비동기적으로 처리할 수 있습니다. 이러한 기능을 활용하여 복잡한 비동기 작업을 쉽게 다룰 수 있습니다.

참고 자료