[java] RxJava의 병렬 처리

RxJava는 리액티브 프로그래밍을 위한 자바 라이브러리로, 비동기 작업을 쉽게 처리할 수 있도록 도와줍니다. 병렬 처리는 RxJava에서 효율적인 비동기 작업을 수행하는 데 중요한 역할을 합니다. 여기에서 RxJava의 병렬 처리에 대해 알아보겠습니다.

1. Parallel 프로세싱 모델

RxJava에서는 Parallel 타입을 통해 병렬 처리를 지원합니다. Parallel 타입은 Flowable과 유사하게 작동하지만, 여러 개의 스트림을 동시에 처리하는 능력을 가지고 있습니다. 따라서 Parallel 타입을 사용하여 여러 스레드에서 작업을 병렬로 처리할 수 있습니다.

Parallel 타입은 parallel() 메서드를 통해 생성할 수 있습니다. 다음은 Parallel 타입을 생성하고 사용하는 예시입니다.

Flowable<Integer> source = Flowable.range(1, 1000);

ParallelFlowable<Integer> parallel = source.parallel();

parallel
    .runOn(Schedulers.computation())
    .map(num -> num * 2)
    .sequential()
    .subscribe(System.out::println);

위의 예시에서는 range() 메서드를 통해 1부터 1000까지의 숫자를 발행하는 Flowable을 생성하고, 이를 parallel() 메서드를 통해 ParallelFlowable로 변환합니다. 그리고나서 runOn() 메서드를 사용하여 병렬 처리에 사용할 스레드를 지정하고, map() 메서드를 통해 각 숫자를 2배로 변환합니다. 최종적으로 sequential() 메서드를 호출하여 순차적으로 처리하도록 변경하고, 결과를 출력합니다.

2. 병렬 스트림의 처리

Parallel 타입을 사용하면 여러 개의 스트림을 병렬로 처리할 수 있는데, 이는 flatMap() 메서드를 사용하여 구현할 수 있습니다. 다음은 Parallel 타입을 사용하여 병렬 스트림을 처리하는 예시입니다.

Flowable<Integer> source = Flowable.range(1, 1000);

ParallelFlowable<Integer> parallel = source.parallel();

parallel
    .runOn(Schedulers.computation())
    .flatMap(num -> Flowable.just(num).map(n -> n * 2))
    .sequential()
    .subscribe(System.out::println);

위의 예시에서는 flatMap() 메서드를 사용하여 Parallel 타입의 각 요소에 대해 별도의 비동기 처리를 수행할 수 있습니다. flatMap() 메서드 내부에서는 Flowable.just() 메서드를 사용하여 요소 각각을 처리하고, 결과를 Flowable로 반환합니다.

3. 쓰레드 풀 선택하기

RxJava의 병렬 처리에서는 스레드 풀을 지정할 수 있습니다. Schedulers 클래스의 여러 가지 스케줄러 중에서 원하는 스레드 풀을 선택할 수 있습니다. 가장 많이 사용되는 스레드 풀은 다음과 같습니다.

Flowable<Integer> source = Flowable.range(1, 1000);

ParallelFlowable<Integer> parallel = source.parallel();

parallel
    .runOn(Schedulers.computation()) // 또는 io(), newThread()
    .map(num -> num * 2)
    .sequential()
    .subscribe(System.out::println);

위의 예시에서는 병렬 처리에 Schedulers.computation()을 사용하였습니다. 필요에 따라 알맞은 스레드 풀을 선택하여 사용하면 됩니다.

4. 결론

RxJava는 병렬 처리를 위한 Parallel 타입을 지원하여 비동기 작업을 효율적으로 처리할 수 있습니다. 병렬 스트림 처리를 위해 flatMap() 메서드를 사용하고, 필요한 스레드 풀을 선택하여 작업을 처리할 수 있습니다.

더 자세한 내용은 RxJava 공식 문서를 참고하세요.

참고 자료: