CHAPTER15 - CompletableFuture 와 리액티브 프로그래밍 기초(1)

2025. 5. 25. 14:36카테고리 없음

 

목차.

1. Thread, Future,  자바가 풍부한 동시성 API를 제공하도록 강요하는 진화

2. 비동기 API

 

1. 동시성을 구현하는 자바 지원의 진화

처음에 자바는 Runnable과 Thread를 동기화된 클래스와 메서드를 이용해 공유 자원에 대한 접근을 제어했다.

2004년 자바 5는 동시성을 지원하고 스레드 생성과 실행을 분리하는 ExecutorService 인터페이스, Runnable, Thread 의 변형을 반환하는 Callable<T>, Future<T>, 제네릭 등을 지원했다. ExecutorService는 Runnable과 Callable 둘 다 실행할 수 있다. 이런 기능들 덕분에 다음 해부터 등장한 멀티코어 CPU에서 쉽게 병렬 프로그래밍을 구현할 수 있게 되었다.

 

멀티코어 CPU에서 효과적으로 프로그래밍을 실행할 필요성이 커지면서 이후 자바 버전에서는 개선되 동시성 지원이 추가되었다. 7장에서 살펴본 것처럼 분할/정복 알고리즘의 포크/조인 구현을 지원하는 RecursiveTask가 추가되었고 자바 8에서 스트림과 새로 추가된 람다 지원에 기반한 병렬 프로세싱이 추가 되었다.

 

 

1.1 스레드와 높은 수준의 추상화

 

멀티코어 설정(한 사용자 프로세스만 실행하는 한 명의 사용자 노트북)에서는 스레드의 도움 없이 프로그램이 노트북의 컴퓨팅 파워를 모두 활용 할 수 없다. 각 코어는 한 개 이상의 프로세스나 스레드에 할당될 수 있지만 프로그램이 스레드를 사용하지 않는다면 효율성을 고려해 여러 프로세서 코어 중 한 개만을 사용한다.

 

실제로 네 개의 코어를 가진 CPU에서 이론적으로는 프로그램을 네 개의 코어에서 병렬로 실행함으로 실행 속도를 네 배까지 향상시킬 수 있다.(오버헤드 때문에 사실 어렵다.) 숫자 1,000,000 개를 저장한 배열을 처리하는 다음 예제를 살펴보자

public long sequentialSum() {

    long sum = 0;
    for (int i = 0; i < 1000000000; i++) {
        sum += stats[i];
    }

    return sum;
}
    
=>
SequentialSum.sequentialSum  avgt   10  265.849 ± 1.756  ms/op

 

위 코드는 한 개의 코어로 작업을 수행한다. 반면 아래 코드는 네 개의 스레드가 작업을 나누어 처리한다.

@Benchmark
public long parallelSum() throws InterruptedException {
    final long[] results = new long[4];

    Thread thread0 = new Thread(() -> {
        long sum0 = 0;
        for (int i = 0; i < 2_500_000; i++) {
            sum0 += stats[i];
        }
        results[0] = sum0;
    });

    Thread thread1 = new Thread(() -> {
        long sum1 = 0;
        for (int i = 2_500_000; i < 5_000_000; i++) {
            sum1 += stats[i];
        }
        results[1] = sum1;
    });

    Thread thread2 = new Thread(() -> {
        long sum2 = 0;
        for (int i = 5_000_000; i < 7_500_000; i++) {
            sum2 += stats[i];
        }
        results[2] = sum2;
    });

    Thread thread3 = new Thread(() -> {
        long sum3 = 0;
        for (int i = 7_500_000; i < 10_000_000; i++) {
            sum3 += stats[i];
        }
        results[3] = sum3;
    });

    // 모든 스레드 시작
    thread0.start();
    thread1.start();
    thread2.start();
    thread3.start();

    // 모든 스레드가 완료될 때까지 대기
    thread0.join();
    thread1.join();
    thread2.join();
    thread3.join();

    return results[0] + results[1] + results[2] + results[3];
}


=>
ParallelSum.parallelSum  avgt   10  0.884 ± 0.010  ms/op

 

메인 프로그램은 네 개의 스레드를 완성하고 자바의 .start() 로 실행한 다음 .join()으로 완료 될 때까지 기다렸다가 다음을 계산한다.

sum0 + sum1 + sum2 + sum3

 

이를 각 루프로 처리하는 것은 귀찮고 쉽게 에러가 발생할 수 있다. 루프가 아닌 코드라면 어떻게 처리할지 난감해질 수 있다.

 

7장에서 자바 스트림으로 외부 반복 대신 내부 반복을 통해 쉽게 병렬성을 달성했었다.

public long parallelSumWithStream() {
    return Arrays.stream(stats)
            .parallel()
            .sum();
}

 

결론적으로 병렬 스트림 반복은 명시적으로 스레드를 사용하는 것에 비해 높은 수준의 개념이라는 사실을 알 수 있다. 즉, 스트림을 이용해 스레드 사용 패턴을 추상화할 수 있다. 쓸모 없는 코드가 라이브러리 내부로 구현되면서 복잡성도 줄어드는 장점이 있다. 7장에서 자바 7의 RecursiveTask 지원 덕분에 포크/조인 스레드 추상화로 분할 그리고 정복 알괼즘을 병렬화하면서 멀티코어 머신에서 배열의 합을 효율적으로 계산하는 높은 수준의 방식을 제공하는 방법을 살펴봤다.

 

 

1.2 Executor와 스레드 풀

 

자바 5는 Executor 프레임워크와 스레드 풀을 통해 스레드의 힘을 높은 수준으로 끌어올리는, 태스크 제출과 실행을 분리할 수 있는 기능을 제공한다.

 

스레드의 문제

자바 스레드는 직접 운영체제 스레드에 접근한다. 운영체제 스레드를 만들고 종료하려면 비싼 비용을 치러야 하며 운영체제 스레드의 숫자는 제한되어 있다.

 

보통 운영체제와 자바의 스레드 개수가 하드웨어 스레드 개수보다 많으므로 일부 운영 체제 스레드가 블록되거나 자고 있는 상황에서 모든 하드웨어 스레드가 코드를 실행하도록 할당된 상황에 놓을 수 있다.

 

스레드 풀 그리고 스레드 풀이 더 좋은 이유

자바 ExecutorService는 태스크를 제출하고 나중에 결과를 수집할 수 있는 인터페이스를 제공한다. 프로그램은 newFixedThreadPool 같은 팩토리 메서드 중 하나를 이용해 스레드 풀을 만들어 사용할 수 있다.

 

ExecutorService newFixedThreadPool(int nThreads)

 

이 메서드는 워커 스레드라 불리는 nThreads를 포함하는 ExecutorService를 만들고 이들을 스레드 풀에 저장한다. 스레드 풀에서 사용되지 않은 스레드로 제출된 태스크를 먼저 온 순서대로 실행한다.이들 태스크 실행이 종료되면 이들 스레드를 풀로 반환한다. 이 방식의 장점은 하드웨어에 맞는 수의 태스크를 유지함과 동시에 수 천개의 태스크를 스레드 풀에 아무 오버헤드 없이 제출할 수 있다는 점이다. 큐의 크기 조정, 거부 정책, 태스크 종류에 따른 우선순위 등 다양한 설정을 할 수 있다.

 

프로그래머는 태스크(Runnable, Callable) 을 제공하면 스레드가 이를 실행한다.

 

스레드 풀 그리고 스레드 풀이 나쁜 이유

거의 모든 관점에서 스레드를 직접 사용하는 것보다 스레드 풀을 이용하는 것이 바람직하지만 두 가지 사항을 주의해야 한다.

 

1. k 스레드를 가진 스레드 풀은 오직 k만틈의 스레드를 동시에 실행할 수 있다. 초과로 제출된 태스크는 큐에 저장되며 이전에 태스크 중 하나가 종료되기 전까지는 스레드에 할당하지 않는다. 불필요하게 많은 스레드를 만드는 일을 피할 수 있으므로 보통 이 상황은 아무 문제가 없다. 하지만 잠을 자거나 I/O 를 기다리는 블록 상황에서 이들 태스크가 워커 스레드에 할당된 상태를 유지하지만 아무 작업도 하지 않게 된다.

아래 그림에서 보여주는 것처럼 네 개의 하드웨어 스레드와 5개의 스레드를 갖는 스레드 풀에 20개의 태스크를 제출했다고 가정하자. 모든 태스크가 병렬로 실행되면서 20개의 태스크를 실행할 것이라 예상할 수 있다. 하지만 처음 제출한 세 스레드가 잠을 자거나 I/O를 기다린다고 가정하자. 그러면 나머지 15개의 태스크를 두 스레드가 실행해야 하므로 작업 효율성이 예상보다 절반으로 떨어진다. 처음 제출된 태스크가 자신보다 나중에 제출된 태스크의 실행 결과를 기다리는 상황이 발생하면, 데드락에 빠질 수 있다. 핵심은 블록 할 수 있는 태스크는 스레드 풀에 제출하지 말아야 한다는 것이지만 항상 이를 지킬 수 있는 것은 아니다.

 

2. 중요한 코드를 실행하는 스레드가 죽는 일이 발생하지 않도록 보통 자바 프로그램은 main이 반환하기 전에 모든 스레드의 작업이 끝나길 기다린다. 따라서 프로그램을 종료하기 전에 모든 스레드 풀을 종료하는 습관을 가져야 한다. 보통 장기간 실행하는 인터넷 서비스를 관리하도록 오래 실행되는 ExecutorService를 갖는 것은 흔한 일이다. 자바는 이런 상황을 다룰 수 있도록 Thread.setDaemon 메서드를 제공한다.

 

 

 

1.3 스레드의 다른 추상화 : 중첩되지 않은 메서드 호출

 

7장(병렬 스트림 처리와 포크/조인 프레임워크)에서 설명한 동시성과 지금 설명하는 동시성은 조금 다르다. 7장에서 사용한 동시성에서는 한 개의 특별한 속성(블로킹) 즉, 태스크나 스레드가 메서드 호출 안에서 시작되면 그 메서드 호출은 반환하지 않고 작업이 끝나기를 기다렸다.(동시성은 있지만 블로킹된다.) 스레드 생성과 join() 이 한 쌍처럼 중첩된 메서드 호출 내에 추가되었다.

 

시작된 태스크를 내부 호출이 아니라 외부 호출에서 종료하도록 기다리는 좀 더 여유로운 방식의 포크/조인을 사용해도 비교적 안전하다.

 

15장에서는 사용자의 메서드 호출에 의해 스레드가 생성되고 메서드를 벗어나 계속 실행되는 동시성 형태에 초점을 둔다.

 

메서드 호출자에 기능을 제공하도록 메서드가 반환된 후에도 만들어진 태스크 실행이 계속되는 메서드를 비동기 메서드라고 한다. 이들 메서드를 사용할 때는 약간의 위험성이 따른다.

 

1. 스레드 실행은 메서드를 호출한 다음의 코드와 동시에 실행되므로 데이터 경쟁 문제를 일으키지 않도록 주의해야 한다.

2. 기존 실행 중이던 스레드가 종료되지 않은 상황에서 자바의 main() 메서드가 반환하면 아래와 같은 상황이 발생한다.

  - 애플리케이션을 종료하지 못하고 모든 스레드가 실행을 끝낼 때까지 기다린다.

  - 애플리케이션 종료를 방해하는 스레드를 강제종료시키고 애플리케이션을 종료한다.

 

첫 번재 방법에서는 잊고서 종료를 못한 스레드에 의해 애플리케이션이 크래시될 수 있다. 또 다른 문제로 디스크에 쓰기 I/O 작업을 시도하는 일련의 작업을 중단했을 때 이로 인해 외부 데이터의 일관성이 파괴될 수 있다. 이들 문제를 피하려면 애플리케이션에서 만든 모든 스레드를 추적하고 애플리케이션을 종료하기 전에 스레드 풀을 포함한 모든 스레드를 종료하는 것이 좋다.

 

자바 스레드는 setDaemon() 메서드를 이용해 데몬 또는 비데몬으로 구분시킬 수 있다. 데몬 스레드는 애플리케이션이 종료될 때 강제 종료되므로 데이터 일관성을 파괴하지 않는 동작을 수행할 때 유용하게 활용할 수 있는 반면, main() 메서드는 모든 비데몬 스레드가 종료될 때가지 프로그램을 종료시키지 않고 기다린다.

 

 

1.4 스레드에 무엇을 바라는가?

 

일반적으로 모든 하드웨어 스레드를 활용해 병렬성의 장점을 극대화하도록 프로그램 구조를 만드는 것 즉, 프로그램을 작은 태스크 단위로 구조화하는 것이 목표다. 7장에서는 병려려 스트림 처리와 포크/조인을 for 루프와 분할 그리고 정복 알고리즘을 처리하는 방법을 살펴봤는데 이 장의 나머지 부분과 16장, 17장에서는 스레드를 조작하는 복잡한 코드를 구현하지 않고 메서드를 호출하는 방법을 공부한다.

 

 

2. 동기 API와 비동기 API

 

7장에서 자바 8 스트림을 이용해 명시적으로 병렬 하드웨어를 이용할 수 있음을 알았다. 두 가지 단계로 병렬성을 이용할 수 있다.

 

1. 외부 반복(for 루프)을 내부 반복(stream 메서드)으로 바꾼다.

2. 스트림에 parallel() 메서드를 이용하여 JRE가 복잡한 스레드 작업을 하지 않고 병렬로 요소가 처리되도록 한다.

 

루프가 실행될 때 개발자가 스레드 갯수를 추측해야 하지만 런타임 시스템은 사용할 수 있는 스레드를 정확하게 알고 있다는 장점도 있다.

 

다음과 같은 시그니처를 갖는 f, g 두 메서드의 호출을 합하는 예제를 보자.

private static int f(int x) {
    return x+1;
}

private static int g(int x) {
    return x*2;
}

public static void main(String[] args) {
    int x = 10;
    
    int y = f(x);
    int z = g(x);
    System.out.println(y + z);
}

 

이들 메서드는 물리적 결과를 반환하므로 동기 API라고 부른다. f 와 g 를 실행하는데 오랜 시간이 걸린다고 가정하자. f, g 의 작업을 컴파일러가 완전하게 이해하기 어려우므로 보통 자바 컴파일러는 코드 최적화와 관련한 아무 작업도 수항하지 않을 수 있다. f와 g 가 서로 상호작용하지 않는다는 사실을 알고 있거나 상호작용을 전혀 신경쓰지 않는다면 f 와 g 를 별도의 CPU 코어로 실행함으로 f 와 g 중 오래 걸리는 작업의 시간으로 합계 구하는 시간을 단축할 수 있다. 별도의 스레드로 f 와 g 를 실행해 이를 구현할 수 있다. 의도는 좋지마 ㄴ이전의단순했던 코드가 다음처럼 복잡하게 변한다.

public class ThreadExample {

    public static void main(String[] args) throws InterruptedException {
        int x = 1337;
        Result result = new Result();

        Thread t1 = new Thread(() -> result.left = f(x));
        Thread t2 = new Thread(() -> result.right = g(x));

        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(result.left + result.right);
    }

    private static class Result {
        private int left;
        private int right;
    }
}

 

Runnable 대신 Future API 인터페이스를 이용해 코드를 더 단순화할 수 있다.

public class ExecutorServiceExample {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        int x = 1337;

        ExecutorService executorService = Executors.newFixedThreadPool(2);
        Future<Integer> y = executorService.submit(() -> f(x));
        Future<Integer> z = executorService.submit(() -> g(x));
        System.out.println(y.get() + z.get());
        
        executorService.shutdown();
    }
}

 

여전히 이 코드도 명시적인 submit 메서드 호출 같은 불필요한 코드로 오염되었다. 명시적 반복으로 병렬화를 수행하던 코드를 스트림을 이용해 내부 반복으로 바꾼것처럼 비슷한 방법으로 이 문제를 해결해야 한다.

 

문제의 해결은 비동기 API 라는 기능으로 API를 바꿔서 해결할 수 있다.

 

1. 자바 8의 CompletableFuture로 이들을 조합할 수 있게 되면서 더욱 기능이 풍부해졌다.

2. 발행-구독 프로토콜에 기반한 자바 9의 Flow 인터페이스를 이용하는 방법이다.

 

 

2.1 Future 형식 API

 

대안을 이용하면 f, g의 시그니처가 다음처럼 바뀐다.

public static void main(String[] args) throws ExecutionException, InterruptedException {
    Future<Integer> f = f(10);
    Future<Integer> g = g(10);

    System.out.println(f.get() + g.get());
}

 

메서드 f 는 호출 즉시 자신의 원래 바디를 평가하는 태스크를 포함하는 Future 를 반환한다. 마찬가지로 메소드 g도 Future를 반환하며 세 번째 코드는 get() 메서드를 이용해 두 Future가 완료되어 결과가 합쳐지기를 기다린다.

 

 

2.2 리액티브 형식 API

 

두 번째 대안에서 핵심은 f, g의 시그니처를 바꿔서 콜백 형식의 프로그래밍을 이용하는것이다.

public static void main(String[] args) {

    int x = 1337;
    Result result = new Result();

    f(x, (int y) -> {
        result.left = y;
        System.out.println(result.left + result.right);
    });

    g(x, (int z) -> {
        result.right = z;
        System.out.println(result.left + result.right);
    });
}

void f(int x, IntConsumer consumer);

 

하지만 결과가 달라졌다. f 와 g 의 호출 합계를 정확하게 출력하지 않고 상황에 따라 먼저 계산된 결과를 출력한다. 락을 사용하지 않으므로 값을 두 번 출력할 수 있을 뿐더러 때로는 +에 제공된 두 피연산자가 println 호출되기 전에 업데이트 될 수 있다.  아래처럼 두 가지 방법으로 이 문제를 보완할 수 있다.

 

1. if-then-else 와 적절한 락을 이용해 두 콜백이 모두 호출되었는지 확인한 다음 println을 호출해 원하는 기능을 수행할 수 있다.

public static void main(String[] args) {
    int x = 1337;
    Result result = new Result();
    Object lock = new Object();
    boolean[] completed = new boolean[2];

    f(x, (int y) -> {
        synchronized (lock) {
            result.left = y;
            completed[0] = true;
            if (completed[0] && completed[1]) {
                System.out.println(result.left + result.right);
            }
        }
    });

    g(x, (int z) -> {
        synchronized (lock) {
            result.right = z;
            completed[1] = true;
            if (completed[0] && completed[1]) {
                System.out.println(result.left + result.right);
            }
        }
    });
}

 

2. 리액티브 형식의 API는 보통 한 결과가 아니라 일련의 이벤트에 반응하도록 설계되었으므로 Future를 이용하는 것이 더 적절하다.

int x = 1337;
CompletableFuture<Integer> future1 = new CompletableFuture<>();
CompletableFuture<Integer> future2 = new CompletableFuture<>();

f(x, future1::complete);
g(x, future2::complete);

future1.thenCombine(future2, (y, z) -> {
    System.out.println(y + z);
    return y + z;
});

 

리액티브 형식의 프로그래밍으로 메서드 f 와 g 는 dealWithResult 콕백을 여러 번 호출할 수 있다. 원래의 f, g 함수는 오직 한 번만 return을 사용하도록 되어있다. 마찬가지로 Future도 한 번만 완료되며 그 결과를 get()으로 얻을 수 있다. 리액티브 형식의 비동기 API는 자연스럽게 일련의 값(나중에 스트림으로 연결)을, Future 형식의 API는  일회성의 값을 처리하는 데 적합하다.

 

 

2.3 잠자기(기타 블로킹 동작)는 쓰지 말 것

 

사람과 상호작용하거나 어떤 일이 일정 속도로 제한되어 일어나는 상황의 애플리케이션을 만들 때 자연스럽게 sleep() 메서드를 사용할 수 있다. 하지만 스레드는 잠들어도 여전히 시스템 자원을 점유한다. 스레드를 단지 몇 개 사용하는 상황에서는 큰 문제가 아니지만 스레드가 많아지고 그 중 대부분이 잠을 잔다면 문제가 심각해진다.

 

스레드 풀에서 잠을 자는 태스크는 다른 태스크가 시작되지 못하게 막으므로 자원을 소비한다. 잠자는 스레드 뿐 아니라 모든 블록 동작도 마찬가지다. 다른 태스크가 어떤 동작을 완료하기를 기다리는 동작(Future.get())과 외부 상호작용(네트워크, 데이터베이스 등) 을 기다리는 동작 두 가지로 구분할 수 있다.

 

이 상황에서 이상적으로는 절대 태스크에서 기다리는 일을 만들지 말거나 아니면 코드에서 예외를 일으키는 방법으로 이를 처리할 수 있다. 태스크를 앞과 뒤로 나누고 블록되지 않을 때만 뒷 부분을 자바가 스케줄링하도록 요청할 수도 있다.

 

다음은 한 개의 작업을 갖는 코드 A다.

work1();
Thread.sleep(1000);
work2();

 

이를 코드 B 와 비교해보자

public static void main(String[] args) {
    ScheduledExecutorService service =
            Executors.newScheduledThreadPool(1);
    
    work1();
    service.schedule(
            ScheduledExecutorServiceExample::work2, 10, TimeUnit.SECONDS);
    
    service.shutdown();
}

 

두 태스크 모두 스레드 풀에서 실행된다고 가정하자.

코드 A 와 코드 B의 실행 결과는 같은데 코드 B가 더 좋은 점은 무엇일까? 두 코드의 다른 점은 A가 자는 동안 귀중한 스레드 자원을 점유하고 있다. 반면 B는 다른 작업이 실행될 수 있도록 스레드 점유를 10초 동안 하지 않는다.

 

태스크를 만들 때는 이런 특징을 잘 활용해야 한다. 태스크가 실행되면 귀중한 자원을 점유하므로 태스크가 끝나서 자원을 해제하기 전까지 태스크를 계속 실행해야 한다. 태스크를 블록하는 것보다는 다음 작업을 태스크로 제출하고 현재 태스크를 종료하는 것이 바람직하다.

 

가능하다면 I/O 작업에도 이 원칙을 적용하는 것이 좋다. 고전적으로 읽기 작업을 기다리는 것이 아니라 블록하지 않는 '읽기 시작' 메서드를 호출하고 읽기 작업이 끝나면 이를 처리할 다음 태스크를 런타임 라이브러리에 스케줄하도록 요청하고 종료한다.

 

이런 디자인 패턴을 따르려면 코드가 복잡해질 수 있지만 후술할 CompletableFuture 인터페이스는 이전에 살펴본 Future.get() 을 이용해 명시적으로 블록하지 않고 콤비네이터를 이용해 이런 형식의 코드를 런타임 라이브러리 내부에서 추상화한다.