CHAPTER15 - CompletableFuture 와 리액티브 프로그래밍 기초(2)

2025. 5. 25. 18:02Book/모던 자바 인 액션

목차.

1. 동시 컴퓨팅의 박스와 채널 뷰

2. CompletableFuture 콤비네이터로 박스를 동적으로 연결

3. 리액티브 프로그래밍용 자바 9 플로 API의 기초를 이루는 발행 구독 픅로토콜

4. 리액티브 프로그래밍과 리액티브 시스템

 

1. 박스와 채널 모델

 

동시성 모델을 가장 잘 설계하고 개념화하려면 그림이 필요하다. 우리는 이 기법을 박스와 채널 모델이라고 부른다.

이전 예제인 f(x) + g(x) 의 계산을 일반화해서 정수와 관련된 간단한 상황이 있다고 가정하자. f나 g를 호출하거나 p 함수에 인수 x 를 이용해 호출하고 그 결과를 q1과 q2에 전달하며 다시 이 두 호출의 결과로 함수 r을 호출한 다음 결과를 출력한다. 편의상 클래스 C 의 메서드와 연상함수 C::m을 수분하지 않는다.

 

지비러 위 그림을 두 가지 방법으로 구현해 문제를 확인해보자

 

아래는 첫 번재 방법이다.

int x = 10;
int t = p(x);

System.out.println(r(q1(t), q2(t)));

 

겉보기엔 깔끔해 보이는 코드지만 자바가 q1, q2를 차례로 호출하는데 이는 하드웨어 병렬성을 활용하지 않는다.

 

Future를 이용해 q1, q2 를 병렬로 평가해보자

int x = 10;
int t = p(x);

Future<Integer> a1 = executor.submit(() -> q1(t));
Future<Integer> a2 = executor.submit(() -> q2(t));
System.out.println(r(a1.get(), a2.get()));

 

이 예제에서는 박스와 채널 다이어그램의 모양상 p와 r을 Future로 감싸지 않았다. p는 다른 어떤 작업보다 먼저 처리해야 하며 r은 모든 작업이 끝난 다음 가장 마지막으로 처리해야 한다.

 

위 코드의 병렬성을 극대화혀려면 모든 함수를 Future 로 감싸야 한다.

 

시스템에서 많은 작업이 동시에 실행되고 있지 않다면 이 방법도 잘 동작할 수 있다. 하지만 시스템이 커지고 각각의 많은 박스와 채널 다이어그램이 등장하고 각각의 많은 박스와 채널 다이어그램이 등장하고 각각의 박스는 내부적으로 자신만의 박스와 채널을 사용한다면 문제가 달라진다. 이런 상황에서는 많은 태스크가 get() 메서드를 호출해 Future가 끝나기를 기다리는 상태에 빠질 수 있다.

 

자바 8에서는 CompletableFuture와 콤비네이터를 이용해 문제를 해결한다.

 

 

2. CompletableFuture와 콤비네이터를 이용한 동시성

 

자바 8에서 추가된 Future 인터페이스의 구현인 CompletableFuture를 이용해 Future를 조합할 수 있는 기능을 추가했다. 그럼 ComposableFuture 가 아니라 CompletableFuture라고 부르는 이유는 무엇일까. 일반적으로 Future 는 실행해서 get()으로 결과를 얻을 수 있는 Callable로 만들어진다. 하지만 CompletableFuture는 실행할 코드 없이 Future를 만들 수 있도록 허용하며 complete() 메서드를 이용해 나중에 어떤 값을 이용해 다른 스레드가 이를 완료할 수 있고 get()으로 값을 얻을 수 있도록 허용한다.

public static void main(String[] args) throws InterruptedException, ExecutionException {
    ExecutorService service = Executors.newFixedThreadPool(10);

    int x = 1337;
    CompletableFuture<Integer> a = new CompletableFuture<>();
    service.submit(() -> a.complete(f(x)));
    int b = g(x);

    System.out.println(a.get() + b);

    service.shutdown();
}

 

위 코드는 f(x)의 실행이 끝나지 않는 상황에서 get()을 기다려야 하므로 프로세싱 자원을 낭비할 수 있다. 자바 8의 CompletableFuture를 이용하면 이 상황을 해결할 수 있다.

 

CompletableFuture<T> 에 thenCombine 메서드를 사용함으로 두 연산 결과를 더 효과적으로 더할 수 있다.

public <U,V> CompletableFuture<V> thenCombine(
    CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn)

 

이 메서드는 두 개의 CompletableFuture 값(T, U 결과 형식)을 받아 한 개의 새 값을 만든다.

처음 두 작업이 끝나면 두 결과 모두에 fn을 적용하고 블록하지 않은 상태로 결과 Future를 반환한다.

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService service = Executors.newFixedThreadPool(10);
    int x = 1337;

    CompletableFuture<Integer> a = new CompletableFuture<>();
    CompletableFuture<Integer> b = new CompletableFuture<>();
    CompletableFuture<Integer> c = a.thenCombine(b, (x1, x2) -> x1 + x2);

    service.submit(() -> a.complete(f(x)));
    service.submit(() -> b.complete(g(x)));

    System.out.println(c.get());

    service.shutdown();
}

 

thenCombine 행이 핵심이다. Future a 와 Future b의 결과를 알지 못한 상태에서 thenCombine은 두 연산이 끝났을 때 쓰레드 풀에서 실행된 연산을 만든다. 결과를 추가하는 세 번재 연산 c 는 두 작업이 끝날 때까지는 스레드에서 실행되지 않는다(먼저 시작해서 블록되지 않는 것이 특징). 따라서 기존의 두 가지 버전의 코드에서 발생했던 블록 문제가 어디서도 일어나지 않는다. Future의 연산이 두 번째로 종료되는 상황에서 실제 필요한 스레드는 한 개지만 스레드 풀의 두 스레드가 여전히 활성 상태다.

 

이전의 두 버전에서 y + z 연산은 f(x) 또는 g(x) 를 실행(블록될 가능성이 있는)한 같은 스레드에서 수행했다. 반면 thenCombine을 이용하면 f(x) 와 g(x)가 끝난 다음에야 덧셈 계산이 실행된다.

 

 

2. 발행-구독 그리고 리액티브 프로그래밍

 

Future와 CompletableFuture은 독립적 실행과 병렬성이라는 정식적 모델에 기반한다. 연산이 끝나면 get()으로 Future의 결과를 얻을 수 있다. 따라서 Future는 한 번만 실행해 결과를 제공한다.

 

반면 리액티브 프로그래밍은 시간이 흐르면서 여러 Future 같은 객체를 통해 여러 결과를 제공한다. 먼저 온도계 객체를 예로 생각해보자. 이 객체는 매 초마다 온도 값을 반복적으로 제공한다. 또 다른 예로 웹 서버 컴포넌트 응답을 기다리는 리스너 객체를 생각할 수 있다. 이 객체는 네트워크에 HTTP 요청이 발생하길 기다렸다가 이후에 결과 데이터를 생산한다. 그리고 다른 코드에서 온도 값 또는 네트워크 결과를 처리한다. 그리고 온도계와 리스너 객체는 다음 결과를 처리할 수 있도록 온도 결과나 다른 네트워크 요청을 기다린다.

 

눈여겨봐야 할 두 가지 사실이 있다. 이 두 예제에서는 Future 같은 동작이 모두 사용되었지만 한 에제에서는 한 번의 결과가 아니라 여러 번의 결과가 필요하다. 두 번째 예제에서 눈여겨봐야할 또 다른 점은 모든 결과가 똑같이 중요한 반면 온도계 예제에서는 대부분의 사람에게 가장 최근의 온도만 중요하다. 이런 종류의 프로그래밍을 리액티브라고 부른다.(예를 들어 온도가 내려가면 히터가 켜짐)

 

여기서 스트림을 떠올릴 수 있다(시간에 따라 연속적으로 데이터가 발생). 프로그램이 스트림 모델에 잘 맞는 상황이라면 좋은 구현이 될 수 있다. 하지만 보통 리액티브 프로래밍 패러다임은 비싼 편이다. 주어진 자바 스트림의 한 번의 단말 동작으로 소비될 수 있다. 박스 채널 모델에서 본 것 같이 스트림 패러다임은 두 개의 파이프라인으로 값을 분리(fork) 하기 어려우며 두 개의 분리된 스트림에서 다시 결과를 합치기도 (join) 어렵다. 스트림은 선형적인 파이프라인 처리 기법에 알맞다.

 

자바 9에서는 Flow 인터페이스에 발행-구독 모델을 적용해 리액티브 프로그래밍을 제공한다. 간단하게 세 가지 플로 API를 정리할 수 있다.

1. 구독자가 구독할 수 있는 발행자

2. 이 연결을 구독(Subscription) 이라 한다.

3. 이 연결을 이용해 메시지(이벤트)를 전송한다.

 

구독을 채널로 발행자와 구독자를 박스로 표현한 그림을 보자. 여러 컴포넌트가 한 구독자로 구독할 수 있고 한 컴포넌트는 여러 개별 스트림을 발행할 수 있으며 한 컴포넌트는 여러 구독자에 가입할 수 있다.

 

 

 

2.1 두 플로를 합치는 예제

 

두 정보 소스로부터 발생하는 이벤트를 합쳐서 다른 구독자가 볼 수 있도록 발행하는 예를 통해 발행-구족의 특징을 간단하게 확인할 수 있다. 이 기능은 수식을 포함하는 스프레드 시트의 셀에서 제공되는 동작이다."=C1+C2" 라는 공식을 포함하는 스프레드 시트 셀 C3을 만들어보자.

public class SimpleCell {
    private int value = 0;
    private String name;
    public SimpleCell(String name) { this.name = name; }
}

 

아직은 코드가 단순한 편이며 아래 처럼 몇 개의 셀을 초기화할 수 있다.

SimpleCell c1 = new SimpleCell("C1");
SimpleCell c2 = new SimpleCell("C2");

 

 

c1 이나 c2 의 값이 바뀌었을 때 c3가 두 값을 더하도록 지정해보자. 먼저 c1, c2 에 이벤트가 발생했을 때 c3 를 구독하도록 만들어야 한다. 그러려면 아래와 같은 인터페이스가 필요하다.

public interface Publisher <T>{
    void subscribe(Subscriber<? super T> subscriber);
}

public interface Subscriber<T> {
    void onNext(T t);
}

 

Cell은 이벤트 발행자이며 구독자임을 알 수 있다.

public class SimpleCell implements
        Publisher<Integer>,
        Subscriber<Integer> {
    private int value = 0;
    private String name;
    private List<Subscriber<? super Integer>> subscribers = new ArrayList<>();

    public SimpleCell(String name) { this.name = name; }

    @Override
    public void subscribe(Subscriber<? super Integer> subscriber) {
        subscribers.add(subscriber);
    }

    private void notifyAllSubscribers() {
        subscribers.forEach(subscriber -> subscriber.onNext(this.value));
    }

    @Override
    public void onNext(Integer newValue) {
        this.value = newValue;
        System.out.println(name + " : " + value);
        notifyAllSubscribers();
    }
}

 

다음 간단한 예제를 보자

SimpleCell c1 = new SimpleCell("C1");
SimpleCell c2 = new SimpleCell("C2");
SimpleCell c3 = new SimpleCell("C3");

c1.subscribe(c3);

c1.onNext(10);
c2.onNext(20);

=>

C1 : 10
C3 : 10
C2 : 20

 

'C3=C1+C2' 은 어떻게 구현할까? 왼쪽과 오른쪽의 연산 결과를 저장할 수 있는 별도의 클래스가 필요하다.

public class ArithmeticalCell extends SimpleCell{
    private int left;
    private int right;
    
    public ArithmeticalCell(String name) {
        super(name);
    }
    
    public void setLeft(int left) {
        this.left = left;
        onNext(left + this.right);
    }
    
    public void setRight(int right) {
        this.right = right;
        onNext(this.left + right);
    }
}

 

public static void main(String[] args) {
    SimpleCell c1 = new SimpleCell("C1");
    SimpleCell c2 = new SimpleCell("C2");
    ArithmeticalCell c3 = new ArithmeticalCell("C3");

    c1.subscribe(c3::setLeft);
    c2.subscribe(c3::setRight);

    c1.onNext(10);
    c2.onNext(20);
    c1.onNext(30);
}

=>

C1 : 10
C3 : 10
C2 : 20
C3 : 30
C1 : 30
C3 : 50

 

 

 

2.2 역압력

 

Subscriber 객체를 어떻게 Publisher에 전달해 발행자가 필요한 메서드를 호출할 수 있는지 살펴봤다. 이 객체는 Publisher에서 Subscriber로 정보를 전달한다. 정보의 흐름 속도를 구독자가 발행자에게 요청해야 할 필요가 있을 수 있다. 발행자는 여러 구독자를 갖고 있으므로 역압력 요청이 한 연결에만 영향을 미쳐야 한다는 것이 문제가 될 수 있다. 자바 9 플로 API의 Subscriber 인터페이스는 네 번째 메서드를 포함한다.

void onSubscribe(Subscription subscription)

 

Publisher와 Subscriber 사이에 채널이 연결되면 첫 이벤트로 위 메서드를 호출된다. Subscription 객체는 다음처럼 Subscriber 와 Publisher 와 통신할 수 있는 메서드를 포함한다.

interface Subscription {
    void cancel();
    void request(Long n);
}

 

콜백을 통한 역방향 소통 효과에 주목하자. Publisher는 Subscription 객체를 만들어 Subscriber로 전달하면 Subscriber는 이를 이용해 Publisher로 정보를 보낼 수 있다.

 

 

2.3 실제 역압력의 간단한 형태

 

한 번에 한 개의 이벤트를 처리하도록 발행-구독 연결을 구성하려면 다음과 같은 작업이 필요하다.

 

- Subscriber가 OnSubscribe로 전달된 Subscription 객체를 subscription 같은 필드에 로컬로 저장한다.

 

- Subscriber가 수많은 이벤트를 받지 않도록 onSubscribe, onNext, onError 의 마지막 동작에 channel.request(1)을 추가해 오직 한 이벤트만 요청한다.

 

- 요청을 보낸 채널에만 onNext, onError 이벤트를 보내도록 Publisher의 notifyAllSubscribers 코드를 바꾼다.(여러 Subscriber가 자신만의 속도를 유지할 수 있도록 Publisehr는 새 Subscription을 만들어 각 Subscriber 와 연결한다.)

 

구현이 간단해 보일 수 있지만 역압력을 구현하려면 여러가지 장단점을 생각해야 한다.

 

- 여러 Subscriber가 있을 때 이벤트를 가장 느린 속도로 보낼 것인가? 아니면 각 Subscriber에게 보내지 않은 데이터를 저장할 별도의 큐를 가질 것인가?

 

- 큐가 너무 커지면 어떻게 해야 할까?

 

- Subscriber 가 준비가 안 되었다면 큐의 데이터를 폐기할 것인가?

 

 

위 질문에 대한 답변은 데이터의 성격에 따라 달라진다. 잃어버려도 되는 데이터는 그리 대수로운 것이 아니지만 은행의 돈과 같은 데이터는 큰 일이다.