CHAPTER16 - CompletableFuture : 안정적 비동기 프로그래밍(2)

2025. 6. 1. 23:30Book/모던 자바 인 액션

1. 두 개 이상의 비동기 연산을 파이프라인으로 만들고 합치기

2. 비동기 작업 완료에 대응하기

 

 

1. 비동기 작업 파이프라인 만들기

 

우리와 계약을 맺은 모든 상점이 하나의 할인 서비스를 사용하기로 했다고 가정하자. 할인 서비스에서는 서로 다른 할인율을 제공하는 다섯 가지 코드를 제공한다.

public class Discount {

    public enum Code {
        NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20);

        private final int percent;

        Code(int percent) {
            this.percent = percent;
        }

        public int getPercent() {
            return percent;
        }
    }

    public static String applyDiscount(Quote quote) {
        return quote.getShopName() + " price is " +
                Discount.apply(quote.getPrice(), quote.getDiscountCode());
    }
    
    public static double apply(double price, Code discountCode) {
        delay(1);
        return price * (100 - discountCode.getPercent()) / 100;
    }
}

 

또한 상점에서 getPrice 메서드의 결과 형식도 바꾸자. 이제 getPrice는 ShopName:price:DiscountCode 형식의 문자열을 반환한다.

public String getPrice(String product) {
    double price = calculatePrice(product);
    // 임의로 마지막 코드 적용
    Discount.Code code = Discount.Code.values()[random.nextInt(Discount.Code.values().length)];
    return String.format("%s:%.2f:%s", name, price, code);
}

private double calculatePrice(String product) {
    delay(1);
    return random.nextDouble() * product.charAt(0) + product.charAt(1);
}

 

 

1.1 할인 서비스 구현

 

이제 우리의 최저가격 검색 애플리케이션은 여러 상점에서 가격 정보를 얻어오고, 결과 문자열을 파싱하고, 할인 서버에 질의를 보낼 준비가 되었다. 할인 서버에서 할인율을 확인해서 최종 가격을 계산할 수 있다. 상점에서 제공한 문자열 파싱은 다음처럼 Quote 클래스로 캡슐화할 수 있다.

public class Quote {
    private final String shopName;
    private final double price;
    private final Discount.Code discountCode;

    public Quote(String shopName, double price, Discount.Code discountCode) {
        this.shopName = shopName;
        this.price = price;
        this.discountCode = discountCode;
    }

    public static Quote parse(String s) {
        String[] split = s.split(":");
        String shopName = split[0];
        double price = Double.parseDouble(split[1]);
        Discount.Code discountCode = Discount.Code.valueOf(split[2]);
        return new Quote(shopName, price, discountCode);
    }

    public String getShopName() {
        return shopName;
    }

    public double getPrice() {
        return price;
    }

    public Discount.Code getDiscountCode() {
        return discountCode;
    }
}

 

상점에서 얻은 정적 팩토리 메서드 parse로 넘겨주면 상점 이름, 할인전 가격, 할인된 가격 정보를 포함하는 Quote  클래스 인스턴스가 생성된다.

 

Discount 서비스에서는 Quote 객체를 인수로 받아 할인된 가격 문자열을 반환하는 applyDiscount 메서드도 제공된다.

 

 

1.2 할인 서비스 사용

 

Discount는 원격 서비스이므로 다음 코드에서 보여주는 것처럼 1초의 지연을 추가한다. 일단은 가장 쉬운방법인 순차적 동기 방식으로 findPrices 메서드를 구현하자.

public static List<String> findPrices(String product) {
    return shops.stream()
            // 각 상점에서 할인전 가격 얻기
            .map(shop -> shop.getPrice(product))
            // 상점에서 반환한 문자열을 Quote 객체로 파싱
            .map(Quote::parse)
            // Discount 서비스를 이용해서 각 Quote 에 할인을 적용
            .map(Discount::applyDiscount)
            .toList();
}

 

세 개의 map 연산을 상점 스트림에 파이프라인으로 연결해서 원하는 결과를 얻었다.

  • 첫 번째 연산에서는 각 상점을 요청한 제품의 가격과 할인 코드로 변환한다.
  • 두 번째 연산에서는 이들 문자열을 파싱해서 Quote 객체를 만든다.
  • 세 번재 연산에서는 원격 Discount 서비스에 접근해서 최종 할인가격을 계산하고 가격에 대응하는 상점 이름을 포함하는 문자열을 반환한다.

예상대로 성능은 아주 좋지 않다.

[BestPrice price is 146.6515,
 LetsSaveBig price is 125.93,
 BuyItAll price is 131.53,
 BuyItAll2 price is 121.55,
 MyFavoriteShop price is 151.49]
Time taken: 10062

 

순차적으로 다섯 상점에 가격 정보를 요청하느라 5초가 소요되었고, 다섯 상점에서 반환한 가격 정보에 할인 코드를 적용할 수 잇도록 할인 서비스에 5초가 소요되었다. 병렬 스트림을 이용하면 성능을 쉽게 개선할 수 있다. 하지만 병렬 스트림에서는 스트림이 사용하는 스레드 풀의 크기가 고정되어 있어서 상점 수가 늘어났을 때처럼 검색 대상이 확장되었을 때 유연하게 대응할 수 없다는 사실이 있다. 따라서 CompletableFuture에서 수행하는 태스크를 설정할 수 있는 커스텀 Executor를 정의함으로써 CPU 사용을 극대화할 수 있다.

 

 

1.3 동기 작업과 비동기 작업 조합하기

 

CompletableFuture에서 제공하는 기능으로 findPrices 메서드를 비동기적으로 재구현하자.

// 동기 + 비동기
public static List<String> asyncAndSyncFindPrices(String product) {
    List<CompletableFuture<String>> priceFutures =
            shops.stream()
                 .map(shop -> CompletableFuture.supplyAsync(
                         () -> shop.getPrice(product), executor))
                 .map(future -> future.thenApply(Quote::parse))
                 .map(future -> future.thenCompose(quote ->
                         CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote))))
                 .toList();

    return priceFutures.stream()
            .map(CompletableFuture::join)
            .toList();
}

 

위 코드는 세 가지 변환 과정을 보여준다.

 

1. 가격 정보 얻기

첫 번째 연산은 팩토리 메서드 supplyAsync에 람다 표현식을 전달해서 비동기적으로 상점에서 정보를 조회했다.

.map(shop -> CompletableFuture.supplyAsync(
        () -> shop.getPrice(product), executor))

 

첫 번째 변환의 결과는 Stream<CompletableFuture<String>> 이다. 각 CompletableFuture는 작업이 끝났을 때 해당 상점에서 반환하는 문자열 정보를 포함한다.

 

2. Quote 파싱하기

두 번재 변환 과정에서는 첫 번째 결과 문자열을 Quote로 변환한다. 첫 번째 과정에서 생성된 CompletableFuture에 thenApply 메서드를 호출한 다음에 문자열을  Quote 인스턴스로 변환하는 Function으로 전달한다.

.map(future -> future.thenApply(Quote::parse))

 

thenApply 메서드는 CompletableFuture 가 끝날 때까지 블록하지 않는다는 점을 주의하자. 즉, CompletableFuture가 동작을 완전히 완료한 다음에 thenApply 메서드로 절달된 람다 표현식(콜백)을 적용할 수 있다.

따라서 CompletableFuture<String>을 CompletableFuture<Quote>로 변환할 것이다.

 

3. CompletableFuture를 조합해서 할인된 가격 계산하기

세 번재 map 연산에서는 상점에서 받은 할인전 가격에 원격 Discount 서비스에서 제공하는 할인율을 적용해야 한다.

.map(future -> future.thenCompose(quote ->
        CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote))))

 

이번에는 원격 실행이 포함되므로 이전 두 변환과 다르며 동기적으로 작업을 수행해야 한다.

 

람다 표현식으로 이 동작을 팩토리 메서드 supplyAsync에 전달할 수 있다. 그러면 다른 CompletableFuture가 반환된다. 결국 두 가지 CompletableFuture로 이뤄진 연쇄적으로 수행되는 두 개의 비동기 동작을 만들 수 있다.

  • 상점에서 가격 정보를 얻어 와서 Quote로 변환하기
  • 변환된 Quote를 Discount 서비스로 전달해서 할인된 최종가격 획득하기

자바 8의 CompletableFuture API는 이와 같은 두 비동기 연산을 파이프라인으로 만들 수 있도록 thenCompose 메서드를 제공한다. thenCompose 메서드는 첫 번째 연산의 결과를 두 번째 연산으로 전달한다.

// 첫 번재 CompletableFuture에 thenCompose 메서드를 호출 후
// Function에 리턴값(quote)을 넘겨줌
.map(future -> future.thenCompose(quote ->
        // 두 번째 CompletableFuture 반환
        CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote))))

즉 첫 번재 CompletableFuture에 thenCompose 메서드를 호출하고 Function에 넘겨주는 식으로 두 CompletableFuture를 조합할 수 있다. Function은 첫 번째 CompletableFuture 반환 결과를 인수로 받고 두 번째 CompletableFuture를 반환하는데, 두 번째 CompletableFuture는 첫 번째 CompletableFuture의 결과를 계산의 입력으로 사용한다.

따라서 Future가 여러 상점에서 Quote를 얻는 동안 메인 스레드는 UI 이벤트에 반응하는 등 유용한 작업을 수행할 수 있다.

 

세 개의 map 연산 결과 스트림의 요소를 리스트로 수집하면 List<CompletableFuture<String>> 형식의 자료를 얻을 수 있다. 마지막으로 CompletableFuture가 완료되기를 기다렸다가 예제에서 그랬듯이 join으로 값을 추출할 수 있다.

[BestPrice price is 126.85,
 LetsSaveBig price is 194.4,
 BuyItAll price is 140.47,
 ShopEasy price is 97.1,
 MyFavoriteShop price is 205.22]
Time taken: 2037

 

CompletableFuture 클래스의 다른 메서드 처럼 thenCompose 메서드도 Async로 끝나는 버전이 존재한다. Async 로 끝나지 않는 메서드는 이전 작업을 수행한 스레드와 같은 스레드에서 작업을 실행함을 의미하며 Async로 끝나는 메서드는 다음 작업이 다른 스레드에서 실행되도록 스레드 풀로 작업을 제출한다.

 

 

1.4 독립 CompletableFuture와 비독립 CompletableFuture 합치기

 

1.3 절에서 살펴본 예제에서 첫 번째 CompletableFuture에 thenCompose 메서드를 실행한 다음에 실행 결과를 첫 번째 실행 결과를 입력으로 받는 두 번재 CompletableFuture로 전달했다. 실전에서는 독립적으로 실행된 두 개의 CompletableFuture 결과를 합쳐야 하는 상황이 발생할 수 있다. 물론 첫 번째 CompletableFuture의 동작 완료와 관계없이 두 번째 CompletableFuture를 실행할 수 있어야 한다.

 

이런 상황에서는 thenCombine 메서드를 사용한다. thenCombine 메서드는 BiFunction을 두 번째 인수로 받는다. BiFunction은 두 개의 CompletableFuture 결과를 어떻게 합칠지 정의한다. thenCompose와 마찬가지로 thenCombine 메서드에도 Async 버전이 존재한다. thenCombineAsync 메서드에서는 BiFunction 이 정의하는 조합 동작이 스레드 풀로 제출되면서 별도의 태스크에서 비동기적으로 수행된다.

 

예제로 살펴보자 온라인 상점이 유료 가격 정보를 제공하는데, 달러 가격도 보여줘야 한다고 가정하자. 우리는

1. 주어진 상품의 가격을 상점에 요청하는 한편 원격 환율 교환 서비스를 이용해서 유로와 달러의 현재 환율을 비동기적으로 요청한다.

2. 두 가지 데이터를 얻었으면 가격에 환율을 곱해서 결과를 합칠 수 있다.

이렇게 해서 두 CompletableFuture의 결과각 생성되고 BiFunction으로 합쳐진 다음에 세 번째 CompletableFuture를 얻을 수 있다.

CompletableFuture<Double> futurePrice =
        CompletableFuture
                .supplyAsync(() -> shop.getPrice(product))
                .thenApply(Quote::parse)
                .thenApply(Quote::getPrice)
                .thenCombine(
                        CompletableFuture.supplyAsync(
                                () -> getRate("EUR", "USD")),
                        (price, rate) -> price * rate
                );

 

여기서 합치는 연산은 단순한 곱셉이므로 별도의 태스크에서 수행하여 자원을 낭비할 필요가 없다. 따라서 thenCombineAsync 대신 thenCombine 메서드를 사용한다. 위 그림은 태스크가 풀의 스레드에서 어떻게 실행되고 결과가 합쳐지는지 보여준다.

 

 

1.5 타임아웃 효과적으로 사용하기

 

이전에 설명했던 것처럼 Future 의 계산을 무한정 기다리는 것은 좋지 않다. 자바 9에서는 CompletableFuture에서 제공하는 몇 가지 기능을 이용해 이런 문제를 해결할 수 있다.

 

orTimeout : 지정된 시간이 지나면 TimeoutException 발생

CompletableFuture<Double> futurePrice =
        CompletableFuture
                .supplyAsync(() -> shop.getPrice(product))
                .thenApply(Quote::parse)
                .thenApply(Quote::getPrice)
                .thenCombine(
                        CompletableFuture.supplyAsync(
                                () -> getRate("EUR", "USD")),
                        (price, rate) -> price * rate
                ).orTimeout(3, TimeUnit.SECONDS);

 

completeOnTimeout : 지정된 시간이 지나면 서버에서 미리 정한 default 값 대체

.supplyAsync(() -> shop.getPrice(product))
.thenApply(Quote::parse)
.thenApply(Quote::getPrice)
.thenCombine(
        CompletableFuture.supplyAsync(
                () -> getRate("EUR", "USD"))
                // 디폴트 값, 시간, 시간 단위
                .completeOnTimeout(1.1, 2, TimeUnit.SECONDS),
        (price, rate) -> price * rate
).orTimeout(3, TimeUnit.SECONDS);

 

 

이제 최저 가격 애플리케이션을 거의 완료했으나 한 가지 기능이 부족하다. 모든 검색 결과가 완료될 때까지 사용자를 기다리게 만들지 말고, 이용할 수 있는 가격 정보는 즉시 사용자에게 보여줄 수 있어야 한다. 이제 다음 절에서는 get이나 join으로 CompletableFuture가 완료될 때까지 블록하지 않고 다른 방식으로 CompletableFuture의 종료에 대응하는 방법을 알아본다.

 

 

 

2. CompletableFuture의 종료에 대응하는 방법

이 장에서 살펴본 모든 코드 예제에서는 원격 메서드 응답을 1초의 지연으로 흉내 냈다. 하지만 실전에서 사용하는 다양한 원격 서비스는 얼마나 지연될지 예측하기 어렵다. 서버 부하에서 네트워크 문제에 이르기까지 다양한 지연 요소가 있기 때문이다. 또한 질의당 얼마를 더 지불하느냐에 따라 우리 애플리케이션이 제공하는 서비스의 질이 달라질 수도 있다.

 

여러 상점에 정보를 제공했을 때 몇몇 상점은 다른 상점보다 훨씬 먼저 결과를 제공할 가능성이 크다. 0.5초에서 2.5초 사이의 임의의 지연으로 이를 시뮬리이션하자.

public static void randomDelay() {
    int delay = 500 + random.nextInt(2000);
    try {
        Thread.sleep(delay);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

 

findPrices는 다양항 상점에서 물건의 가격 정보를 얻어왔는데, 모든 정보를 불러온 후에야 그것을 사용할 수 있었다. 이제는 상점에서 가격 정보를 불러올 때마다 각 상점에서 가격 정보를 즉시 보여줄 수 있는 최저가격 검색 애플리케이션을 만들어보자.

 

 

2.1 최저가격 검색 애플리케이션 리팩터링

 

먼저 모든 가격 정보를 포함할 때까지 리스트 생성을 기다리지 않도록 프로그램을 고쳐야 한다. 그러려면 상점에 필요한 일련의 연산 실행 정보를 포함하는 CompletableFuture의 스트림을 직접 제어해야 한다.

public Stream<CompletableFuture<String>> findPricesAsync(String product) {
    return shops.stream()
            .map(shop -> CompletableFuture.supplyAsync(
                    () -> shop.getPrice(product), executor))
            .map(future -> future.thenApply(Quote::parse))
            .map(future -> future.thenCompose(quote ->
                    CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)));

 

이제 findPricesStream 메서드 내부에서 세 가지 map 연산을 적용하고 반환하는 스트림에 네 번째 map 연산을 적용하자. 새로 추가한 연산은 단순하게 각 CompletableFuture에 동작을 등록한다. CompletableFuture에 등록된 동작은 CompletableFuture의 계산이 끝나면 값을 소비한다. 자바 8의 CompletableFuture API 는 thenAccept라는 메서드로 이 기능을 제공한다. thenAccept 메서드는 연산 결과를 소비하는 Consumer 를 인수로 받는다. 우리 예제에서는 할인 서비스에서 반환하는 문자열이 값이다. 이 문자열은 상점 이름과 할인율을 적용한 제품의 가격을 포함한다. 우리가 원하는 동작은 이 문자열을 출력하는 것이다.

findPricesAsync("myPhone")
        .map(f -> f.thenAccept(System.out::println));

 

thenAccept 메서드는 CompletableFuture가 생성한 결과를 어떻게 소비할지 미리 지정했으므로 CompletableFuture<Void>를 반환한다. 이제 CompletableFuture<Void> 가 동작을 끝낼 때까지 딱히 할 수 잇는 일이 없다. 또한 가장 느린 상점에서 응답을 받아서 반환된 가격을 출력할 기회를 제공하고 싶다고 가정하자. 그러기 위해서는 아래 코드처럼 스트림의 모든 CompletableFuture<Void>를 배열로 추가하고 실행 결과를 기다려야 한다.

CompletableFuture[] futures = findPricesAsync("myPhone")
        .map(f -> f.thenAccept(System.out::println))
        .toArray(size -> new CompletableFuture[size]);

CompletableFuture.allOf(futures).join();

 

팩토리 메서드 allOf는 CompletableFuture 배열을 입력으로 받아 CompletableFuture<Void>를 반환한다. 전달된 모든 CompletableFuture가 완료되어야 CompletableFuture<Void>가 완료된다. 따라서 allOf 메서드가 반환하는 CompletableFuture에 join을 호출하면 원래 스트림의 모든 CompletableFuture의 실행 완료를 기다릴 수 있다. 이를 이용해서 최저가격 검색 애플리케이션은 '모든 상점이 결과를 반환했거나 타임아웃되었음' 같은 메시지를 사용자에게 보여줄 수 있다.

 

반면 배열의 CompletableFuture 중 하나만 작업이 끝나면 모두 끝나는 상황이 있을 수 있다. 이때는 팩토리 메서드 anyOf를 사용한다.