[java] RxJava의 실시간 데이터 처리

RxJava는 자바에서 사용할 수 있는 리액티브 프로그래밍 라이브러리로, 비동기적인 이벤트 기반 프로그래밍을 쉽게 처리할 수 있도록 도와줍니다. 이번 글에서는 RxJava를 사용하여 실시간 데이터 처리를 하는 방법에 대해 알아보겠습니다.

1. Observable과 Observer

RxJava의 핵심 개념은 Observable과 Observer입니다. Observable은 이벤트 스트림을 생성하는 객체이고, Observer는 이벤트 스트림을 처리하는 객체입니다. Observable은 데이터를 발행하고, Observer는 이 데이터를 받아서 처리합니다.

예를 들어, 실시간 주식 시세 데이터를 처리하는 경우를 생각해보겠습니다. Observable은 주식 시세를 실시간으로 발행하고, Observer는 이 시세를 처리하여 화면에 출력하거나 다른 작업을 수행할 수 있습니다.

2. 데이터 스트림 생성과 처리

RxJava에서는 데이터 스트림을 생성할 때는 Observable을 사용하고, 이 데이터를 처리할 때는 Observer를 사용합니다. 예를 들어, 아래의 코드는 1부터 10까지의 숫자를 1초마다 발행하는 Observable을 생성하고, 이를 Observer가 처리하는 예제입니다.

Observable.interval(1, TimeUnit.SECONDS)
    .take(10)
    .subscribe(new Observer<Long>() {
        @Override
        public void onNext(Long value) {
            System.out.println("Received value: " + value);
        }

        @Override
        public void onError(Throwable e) {
            System.err.println("Error occurred: " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("Completed");
        }
    });

위의 예제에서는 interval 함수를 사용하여 1초마다 숫자를 발행하도록 설정하고, take 함수를 사용하여 총 10개의 숫자만 발행하도록 제한하였습니다. subscribe 메서드를 호출하여 Observer를 등록하고, onNext 메서드에서는 발행된 값을 받아 처리하고, onError 메서드는 에러가 발생한 경우 처리하며, onComplete 메서드는 데이터 발행이 완료되었을 때 호출됩니다.

3. 데이터 처리 연산자

RxJava는 다양한 데이터 처리 연산자를 제공하여 데이터를 변환하거나 조합하는 작업을 쉽게 처리할 수 있도록 도와줍니다. 몇 가지 대표적인 연산자에 대해 알아보겠습니다.

3.1. map

map 연산자는 Observable이 발행하는 각각의 데이터를 변환하는 작업을 수행합니다. 예를 들어, 아래의 코드는 1부터 10까지의 숫자를 제곱하여 발행하는 Observable을 생성하는 예제입니다.

Observable.range(1, 10)
    .map(number -> number * number)
    .subscribe(System.out::println);

위의 예제에서는 range 함수를 사용하여 1부터 10까지의 숫자를 발행하는 Observable을 생성하고, map 함수를 사용하여 각 숫자를 제곱하여 발행하도록 설정하였습니다. subscribe 메서드를 호출하여 결과를 출력합니다.

3.2. filter

filter 연산자는 조건에 맞는 데이터만을 발행하는 작업을 수행합니다. 예를 들어, 아래의 코드는 1부터 10까지의 숫자 중에서 짝수만을 발행하는 Observable을 생성하는 예제입니다.

Observable.range(1, 10)
    .filter(number -> number % 2 == 0)
    .subscribe(System.out::println);

위의 예제에서는 range 함수를 사용하여 1부터 10까지의 숫자를 발행하는 Observable을 생성하고, filter 함수를 사용하여 짝수만을 발행하도록 설정하였습니다. 결과는 화면에 출력됩니다.

4. 결론

위에서는 RxJava를 사용하여 실시간 데이터 처리를 하는 방법에 대해 알아보았습니다. RxJava는 Observable과 Observer를 이용하여 데이터 스트림을 생성하고 처리하는 방식으로 편리하게 비동기적인 작업을 처리할 수 있습니다. 또한, 다양한 데이터 처리 연산자를 제공하여 데이터를 변환하거나 조합하는 작업을 쉽게 처리할 수 있습니다.

더 자세한 내용은 RxJava 공식 홈페이지를 참고하시기 바랍니다.

이미지 출처: Pixabay