[RxJava] Flowable 과 Observable 의 차이

Rx가 버전 1->2로 업그레이드 되면서 추가 된 것중에 하나가 Flowable인데, Flowable은 Observable과의 차이는 backpressure buffer의 기본으로 달려 있다.

backpressure??

생산자는 미친듯이 element 를 생산해 내는데 소비자가 처리하는 속도가 이를 따라가지 못한다면 busy waiting 또는 out of memory exception 이 발생할 것이다. publish / subscribe 모델에서도 발생 가능 이에 대한 버퍼가 바로 backpressure buffer 다. 버퍼가 가득 차면 어차피 소비자는 element 를 처리할 여유가 없는 상태이므로 더 이상 publish 를 하지 않는다.

생산자의 생산 속도를 소비자가 따라가지 못하는 시나리오다. Flowable 을 사용하면 default buffer size(128) 이상 backpressure buffer 에 element 가 쌓일 경우 흐름제어를 한다.

기존에 없던 개념이 새로 추가된 것은 아니다. 기존 rxJava 1.xx 의 경우 Observable 에 backpressure buffer 를 직접 생성해 주면 사용이 가능하다.

//흐름제어를 통해 정상 동작하는 코드
public class example01 {
 
    public static void main(String... args) throws InterruptedException {
 
        final String tmpStr = Arrays.stream(new String[10_000_000]).map(x->"*").collect(Collectors.joining());
        Flowable foo = Flowable.range(0, 1000_000_000)
                .map(x-> {
                    System.out.println("[very fast sender] i'm fast. very fast.");
                    System.out.println(String.format("sending id: %s %d%50.50s", Thread.currentThread().getName(), x, tmpStr));
                    return x+tmpStr;
                });
 
        foo.observeOn(Schedulers.computation()).subscribe(x->{
            Thread.sleep(1000);
            System.out.println("[very busy receiver] i'm busy. very busy.");
            System.out.println(String.format("receiving id: %s %50.50s", Thread.currentThread().getName(), x));
        });
 
        while (true) {
            Thread.sleep(1000);
        }
    }
}
//Observable을 backpressure buffer 생성 없이 사용하면 OutOfMemoryException
public class example02 {
 
    public static void main(String... args) throws InterruptedException {
 
        final String tmpStr = Arrays.stream(new String[10_000_000]).map(x->"*").collect(Collectors.joining());
        Observable foo = Observable.range(0, 1000_000_000)
                .map(x-> {
                    System.out.println("[very fast sender] i'm fast. very fast.");
                    System.out.println(String.format("sending id: %s %d%50.50s", Thread.currentThread().getName(), x, tmpStr));
                    return x+tmpStr;
                });
 
        foo.observeOn(Schedulers.computation()).subscribe(x->{
            Thread.sleep(1000);
            System.out.println("[very busy receiver] i'm busy. very busy.");
            System.out.println(String.format("receiving id: %s %50.50s", Thread.currentThread().getName(), x));
        });
 
        while (true) {
            Thread.sleep(1000);
        }
    }
}