[java] RxJava의 비동기 이벤트 처리

소개

RxJava는 자바로 구현된 리액티브 프로그래밍 라이브러리입니다. 이벤트 기반 프로그래밍을 보다 간편하게 처리할 수 있도록 도와줍니다. 비동기 이벤트 처리는 일반적으로 콜백 기반의 프로그래밍 모델보다 훨씬 간결하고 읽기 쉽게 작성할 수 있습니다.

RxJava의 장점

기본 개념

Observable

Observable은 이벤트를 발생시키는 데이터 소스입니다. 이벤트 스트림을 생성하고 처리할 수 있습니다. create() 메서드를 사용하여 Observable을 생성하고 subscribe() 메서드를 사용하여 이벤트를 구독할 수 있습니다.

Observable<String> observable = Observable.create(emitter -> {
    emitter.onNext("Hello");
    emitter.onNext("World");
    emitter.onComplete();
});

observable.subscribe(s -> System.out.println(s));

Observer

Observer는 Observable로부터 발생하는 이벤트를 처리하는 객체입니다. subscribe() 메서드를 사용하여 Observable에 Observer를 등록합니다. 이벤트가 발생할 때마다 Observer에게 데이터를 전달합니다.

Observer<String> observer = new Observer<String>() {
    @Override
    public void onNext(String s) {
        System.out.println(s);
    }

    @Override
    public void onError(Throwable e) {
        System.err.println(e.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Completed");
    }
};

observable.subscribe(observer);

Operators

Operators는 Observable에서 발생하는 이벤트를 변환, 변형 또는 필터링하는 데 사용되는 함수입니다. 예를 들어, map() 연산자를 사용하면 발행되는 각 이벤트 값을 변환할 수 있습니다.

Observable<Integer> observable = Observable.just(2, 4, 6)
        .map(x -> x * x);

observable.subscribe(System.out::println); // 4, 16, 36

Schedulers

Schedulers는 스레드 관리를 담당하여 Observable의 이벤트가 어떤 스레드에서 실행될지 지정할 수 있습니다. 예를 들어, observeOn() 메서드를 사용하여 발생한 이벤트를 특정 스레드에서 처리하도록 지정할 수 있습니다.

Observable.just("Hello")
        .observeOn(Schedulers.io())
        .map(s -> s + " World")
        .observeOn(Schedulers.computation())
        .map(String::toUpperCase)
        .observeOn(Schedulers.single())
        .subscribe(System.out::println);

결론

RxJava를 사용하면 이벤트 기반 비동기 프로그래밍을 훨씬 효율적으로 처리할 수 있습니다. Observable, Observer, Operators, Schedulers 등의 개념을 잘 이해하고 활용하면 복잡한 비동기 로직도 간결하게 작성할 수 있습니다.

참고 자료