2025. 6. 1. 20:24ㆍBook/모던 자바 인 액션
목차.
1. 비동기 작업을 만들고 결과 얻기
2. 비블록 동작으로 생산성 높이기
3. 비동기 API 설계와 구현
4. 동기 API를 비동기적으로 소비하기
1. Future의 단순 활용
Future 인터페이스는 미래의 어느 시점에 결과를 얻는 모델에 활용할 수 있다. 시간이 걸릴 수 있는 작업을 Future 내부로 설정하면 호출자 스레드가 결과를 기다리는 동안 다른 유용한 작업을 수행할 수 있다. Future 는 저수준의 스레드에 비해 직관적으로 이해하기 쉽다는 장점이 있다. Future를 이용하려면 시간이 오래 걸리는 작업을 Callable 객체 내부로 감싼 다음에 ExecutorService 에 제출해야 한다.
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
Future<Double> future = executor.submit(new Callable<Double>() {
public Double call() {
return doSomeLongComputation();
}
});
doSomethingElse();
try {
System.out.println("Result: " + future.get());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
위 코드에서 보여주는 것처럼 이와 같은 유형의 프로그래밍에서는 ExecutorService에서 제공하는 스레드가 시간이 오래 걸리는 작업을 처리하는 동안 우리 스레드로 다른 작업을 동시에 실행할 수 있다. 다른 작업을 처리하다가 시간이 오래 걸리는 작업의 결과가 필요한 시점이 되었을 때 Future의 get 메서드로 결과를 가져올 수 있다. get 메서드를 호출했을 때 이미 계산이 완료되어 결과가 준비되었다면 즉시 결과를 반환하지만 결과가 준비되지 않았다면 작업이 완료될 때까지 스레드를 블록시킨다.
위 사진에서는 오래 걸리는 작업이 영원히 끝나지 않으면 우리 스레드가 영원히 블록 될 위험이 있다. 따라서 future.get() 메서드를 오버로드해서 우리 스레드가 대기할 최대 타임아웃 시간을 설정하는 것이 좋다.
System.out.println("Result: " + future.get(1, TimeUnit.SECONDS));
1.1 Future 제한
Future 인터페이스에는 비동기 계산이 끝났는지 확인하는 메서드, 계산이 끝나길 기다리는 메서드, 결과 회수 메서드 등을 제공한다.
계산이 완료되었는지 확인하는 메서드:
- isDone() : 작업이 완료되었는지 여부를 반환
계산이 끝나길 기다리고 결과를 받는 메서드들:
- get() : 계산이 완료될 때까지 blocking하며 대기하고 결과를 반환
- get(long timeout, TimeUnit unit) : 지정된 시간 동안만 대기하며 결과를 반환
그외 메서드들:
- isCancelled() : 작업이 취소되었는지 확인
- cancel(boolean mayInterruptIfRunning) : 작업을 취소하려고 시도
하지만 이런 메서드만으로는 간결한 동시 실행 코드를 구현하기에 부족하다. 예를 들어
'오래 걸리는 A라는 계산이 끝나면 그 결과를 다른 오래 걸리는 계산 B로 전달하시오. 그리고 B의 결과가 나오면 다른 질의의 겨로가와 B 의 결과를 조합하시오'
와 같은 요구사항을 쉽게 구현할 수 있어야 한다. future로 이와 같은 동작을 구현하는 것은 쉽지 않다. 다음과 같은 선언형 기능이 있다면 유용할 것이다.
- 두 개의 비동기 계산 결과를 하나로 합친다. 두 가지 계산 결과는 서로 독립적일 수 있으며 또는 두 번째 결과가 첫 번째 결과에 의존하는 상황일 수 있다.
- Future 집합이 실행하는 모든 태스크의 완료를 기다린다.
- Future 집합에서 가장 빨리 완료되는 태스크를 기다렸다가 결과를 얻는다.(예를 들어 여러 태스크가 다양한 방식으로 같은 결과를 구하는 상황)
- 프로그램적으로 Future를 완료시킨다(즉, 비동기 동작에 수동으로 결과 제공)
- Future 완료 동작에 반응한다.(즉, 결과를 기다리면서 블록되지 않고 결과가 준비되었다는 알림을 받은 다음에 Future 의 결과로 원하는 추가 동작을 수행할 수 있음)
이 장에서는 지금까지 설명한 기능을 선언형으로 이용할 수 있도록 자바 8에서 새로 제공하는 CompletableFuture 클래스를 살펴본다. Stream 과 CompletableFuture 는 비슷한 패턴, 즉 람다 표현식과 파이프라이닝을 활용한다. 따라서 Future와 CompletableFuture 의 관계를 Collections과 Stream 의 관계에 비유할 수 있다.
1.2 CompletableFuture 로 비동기 애플리케이션 만들기
어떤 제품이나 서비스를 이용해야 하는 상황이라고 가정하자. 예산을 줄일 수 있도록 여러 온라인상점 중 가장 저렴한 가격을 제시하는 상점을 찾는 애플리케이션을 완성해가는 예제를 이용해서 CompletableFuture 의 기능을 살펴보자. 이 애플리케이션을 만드는 동안 아래와 같은 기술을 배우고 사용한다.
- 고객에게 비동기 API를 제공하는 방법을 배운다.
- 동기 API를 사용해야 할 때 코드를 비블록으로 만드는 방법을 배운다. 두 개의 비동기 동작을 파이프라인으로 만드는 방법과 두 개의 동작 결과를 하나의 비동기 계산으로 합치는 방법을 살펴본다. 예를 들어 온라인상점에서 우리가 사려는 물건에 대응하는 할인 코드를 반환한다고 가정하자. 우리는 다른 원격 할인 서비스에 접근해서 할인 코드에 해당하는 할인율을 찾아야 한다.
- 비동기 동작의 완료에 대응하는 방법을 배운다. 모든 상점에서 가격 정보를 얻을 때까지 기다리는 것이 아니라 각 상점에서 가격 정보를 얻을 때마다 즉시 최저가격을 찾는 애플리케이션을 갱신하는 방법을 설명한다.
2. 비동기 API 구현
최저가격 검색 애플리케이션을 구현하기 위해 먼저 각각의 상점에서 제공해야 하는 API부터 정의하자. 다음은 제품명에 해당하는 가격을 반환하는 메서드 정의 코드다.
public class Shop {
public double getPrice(String product) {
// 구현해야 함
}
}
getPrice 메서드는 상점의 데이터베이스를 이용해서 가격 정보를 얻는 동시에 다른 이부 서비스에도 접근할 것이다.(프로모션, 할인 등). 우리가 당장 실제로 호출할 서비스까지 구현할 수 없으므로 delay 메서드를 이용하자.
public static void delay(long seconds) {
try {
Thread.sleep(seconds * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
위에서 구현한 delay를 이용해서 지연을 흉내 낸 다음에 임의의 계산값을 반환하도록 getPrice를 구현할 수 있다. 계산값은 제품명에 charAt을 적용해서 임의의 계산값을 반환한다.
public double getPrice(String product) {
return calculatePrice(product);
}
private double calculatePrice(String product) {
delay(1000);
return random.nextDouble() * product.charAt(0) + product.charAt(1);
}
사용자가 이 API를 호출하면 비동기 동작이 완료될 때까지 1초 동안 블록된다. 최저가격 검색 애플리케이션에서 위 메서드를 사용해서 네트워크상의 모든 온라인상점의 가격을 검색해야 하므로 블록 동작은 바람직하지 않다.
2.1 동기 메서드를 비동기 메서드로 변환
동기 메서드 getPrice를 비동기 메서드로 변환하려면 다음 코드처럼 먼저 이름과 반환값을 바꿔야 한다.
public Future<Double> getPriceAsync(String product) {
...
}
이 장을 시작하면서 설명한 것처럼 자바 5부터 비동기 계산의 결과를 표현할 수 있는 Future 인터페이스를 제공한다.(호출자 스레드가 블록되지 않고 다른 작업을 실행). Future는 결과값의 핸들일 뿐이며 계산이 완료되면 get 메서드로 결과를 얻을 수 있다. getPriceAsync 메서드는 즉시 반환되므로 호출자 스레드는 다른 작업을 수행할 수 있다. 자바 8의 새로운 CompletableFuture 클래스는 아래 예제에서 보여주는 것처럼 getPriceAsync를 쉽게 구현하는 데 도움이 되는 기능을 제공한다.
public Future<Double> getPriceAsync(String product) {
// 계산 결과를 포함할 Future 객체를 생성하고 비동기적으로 계산을 수행
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
new Thread(() -> {
// 다른 스레드에서 비동기적으로 계산을 수행
double price = calculatePrice(product);
// 계산이 끝나면 결과를 Future에 담아줌
futurePrice.complete(price);
}).start();
// 결과를 기다리지 않고 Future 객체를 반환
return futurePrice;
}
위 코드에서 비동기 계산과 완료 결과를 포함하는 CompletableFuture 인스턴스를 만들었다. 그리고 실제 가격을 계산할 다른 스레드를 만든 다음에 오래 걸리는 계산 결과를 기다리지 않고 결과를 포함할 Future 인스턴스를 바로 반환했다. 요청한 제품의 가격 정보가 도착하면 complete 메서드를 이용해서 CompletableFuture 를 종료할 수 있다. 아래 코드에서 보여주는 것처럼 클라이언트는 getPriceAsync를 활용할 수 있다.
public static void main(String[] args) {
Shop shop = new Shop();
long start = System.currentTimeMillis();
Future<Double> futurePrice = shop.getPriceAsync("product");
long invocationTime = System.currentTimeMillis() - start;
System.out.println("Invocation returned after " + invocationTime + " ms");
// 제품의 가격을 계산하는 동안
doSomethingElse();
try {
double price = futurePrice.get();
System.out.println("Price is " + price);
} catch (Exception e) {
throw new RuntimeException(e);
}
long end = System.currentTimeMillis() - start;
System.out.println("Price returned after " + end + " ms");
}
=>
Invocation returned after 1 ms
Price is 200.84049662621658
Price returned after 1008 ms
위 코드에서 클라이언트는 특정 제품의 가격 정보를 상점에 요청한다. 상점은 비동기 API를 제공하므로 즉시 Future 를 반환한다. 클라이언트는 반환된 Future를 이용해서 나중에 결과를 얻을 수 있다. 그 사이 클라이언트는 다른 상점에 가격 정보를 요청하는 등 첫 번재 상점의 결과를 기다리면서 대기하지 않고 다른 작업을 처리할 수 있다. 나중에 클라이언트가 특별히 할일이 없으면 Future의 get 메서드를 호출한다. 이때 Future가 결과값을 가지고 있다면 Future에 포함된 값을 읽거나 아니면 값이 계산될 때까지 블록한다.
4절에서는 클라이언트가 블록되는 상황을 회피하고 Future의 작업이 끝났을 때 이를 통지받으면서 람다 표현식이나 메서드 참조로 정의된 콜백 메서드를 실행하는 방법을 살펴본다.
2.2 에러 처리 방법
위 코드는 문제없이 작동한다. 그런데 가격을 계산하는 동안 에러가 발생하면 어떻게 될까? 예외가 발생하면 해당 스레드에만 영향을 미친다. 즉, 에러가 발생해도 가격 계산은 계속 진행되며 일의 순서가 꼬인다. 결과적으로 클라이언트는 get 메서드가 반환될 때까지 영원히 기다리게 될 수도 있다.
클라이언트는 타임아웃값을 받는 get 메서드의 오버로드 버전을 만들어 이 문제를 해결할 수 있다. 이처럼 블록 문제가 발생할 수 있는 상황에서는 탕미아웃을 활용하는 것이 좋다. 하지만 이때 제품가격 계산에 왜 에러가 발생했는지 알 수 있는 방법이 없다. 따라서 completeExceptionally 메서드를 이용해서 CompletableFuture 내부에서 발생한 예외를 클라이언트로 전달해야 한다.
public Future<Double> getPriceAsync(String product) {
// 계산 결과를 포함할 Future 객체를 생성하고 비동기적으로 계산을 수행
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
new Thread(() -> {
try {
// 다른 스레드에서 비동기적으로 계산을 수행
double price = calculatePrice(product);
// 계산이 끝나면 결과를 Future에 담아줌
futurePrice.complete(price);
} catch (Exception e) {
// 도중에 문제가 발생하면 에러를 포함시켜 Future를 종료한다.
futurePrice.completeExceptionally(e);
}
}).start();
// 결과를 기다리지 않고 Future 객체를 반환
return futurePrice;
}
이제 클라이언트는 가격 계산 메서드에서 발생한 예외 파라미터를 포함하는 ExecutionException을 받게 된다.
팩토리 메서드 supplyAsync로 CompletableFuture 만들기
지금까지 CompletableFuture를 직접 만들었다. 하지만 좀 더 간단하게 CompletableFuture를 만드는 방법이 있다.
public Future<Double> getPriceAsync(String product) {
return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}
SupplyAsync 메서드는 Supplier를 인수로 받아서 CompletableFuture를 반환한다. CompletableFuture는 Supplier를 실행해서 비동기적으로 결과를 생성한다. ForkJoinPool의 Executor 중 하나가 Supplier를 실행할 것이다. 하지만 두 번재 인수를 받는 오버로드 버전의 supplyAsync 메서드를 이용해서 다른 Executor를 지정할 수 있다. 결국 모든 다른 CompletableFuture의 팩토리 메서드에 Executor를 선택적으로 전달 할 수 있다.
지금부터는 Shop 클래스에서 구현한 API를 제어할 권한이 우리에게 없는 상황이며 모든 API는 동기 방식의 블록 메서드라고 가정할 것이다. 블록 메서드를 사용할 수밖에 없는 상황에서 비동기적으로 여러 상점에 질의하는 방법, 즉 한 요청의 응답을 기다리며 블록하는 상황을 피해 최저가격 검색 애플리케이션의 성능을 높일 수 잇는 방법을 살펴보자.
3. 비블록 코드 만들기
우리는 동기 API를 이용해서 최저가격 검색 애플리케이션을 개발해야 한다. 다음과 같은 상점 리스트가 있다고 가정하자.
private final List<Shop> shops = Arrays.asList(new Shop("BestPrice"),
new Shop("LetsSaveBig"),
new Shop("BuyItAll"),
new Shop("MyFavoriteShop"));
그리고 다음처럼 제품명을 입력하면 상점 이름과 제품가격 문자열 정보를 포함하는 List를 반환하는 메서드를 구현해야 한다.
public List<String> findPrices(String product) {
return shops.stream()
.map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))
.toList();
이제 findPrices 메서드로 원하는 제품의 가격을 검색할 수 있다. 또한 나중에 프로그램을 고치면서 성능이 얼마나 개선되었는지 확인할 수 있도록 가격을 찾는 데 소요된 시간도 측정했다.
long start = System.currentTimeMillis();
System.out.println(findPrices("myPhone"));
long duration = System.currentTimeMillis() - start;
System.out.println("Time taken: " + duration);
=>
[BestPrice price is 215.29,
LetsSaveBig price is 185.92,
BuyItAll price is 191.32,
MyFavoriteShop price is 214.11]
Time taken: 4026
예상대로 네 개의 상점에서 가격을 검색하는 동안 각각 1초의 대기시간이 있으므로 전체 가격 검색 결과는 4초보다 조금 더 걸린다. 성능 개선이 필요하다.
3.1 병렬 스트림으로 요청 별렬화하기
병렬 스트림을 이용해서 순차 계산을 병렬로 처리해서 성능을 개선할 수 있다.
public static List<String> parallelFindPrices(String product) {
return shops.parallelStream()
.map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))
.toList();
}
=>
[BestPrice price is 126.85, LetsSaveBig price is 201.97, BuyItAll price is 142.73, MyFavoriteShop price is 130.50]
Time taken: 1022
간단하게 성능을 개선했다. 이제 네 개의 상점에서 병렬로 검색이 진행되므로 1초 남짓의 시간에 검색이 완료된다. 이를 더 개선하는 것도 가능하다. CompletableFuture 기능을 활용해서 findPrices 메서드의 동기 호출을 비동기 호출로 바꿔보자
3.2 CompletableFuture로 비동기 호출 구현하기
팩토리 메서드 supplyAsync로 CompletableFuture를 만들 수 있음을 알았다. 이 지식을 활용하자
// 비동기 호출
public static List<String> asyncFindPricesAsync(String product) {
List<CompletableFuture<String>> priceFutures = shops.stream()
// CompletableFuture로 각각의 가격을 비동기적으로 계산한다.
.map(shop -> CompletableFuture.supplyAsync(
() -> String.format("%s price is %.2f",
shop.getName(), shop.getPrice(product)))
).toList();
return priceFutures.stream()
// 모든 비동기 동작이 끝나길 기다린다.
.map(CompletableFuture::join)
.toList();
}
=>
[BestPrice price is 156.24,
LetsSaveBig price is 142.40,
BuyItAll price is 221.21,
MyFavoriteShop price is 162.23]
Time taken: 1029
두 map 연산을 하나의 스트림 처리 파이프라인으로 처리하지 않고 두 개의 스트림 파이프라인으로 처리했다는 부분을 보자. 스트림 연산은 게으른 특성이 있으므로 하나의 파이프라인으로 연산을 처리했다면 모든 가격 정보 요청 동작이 동기적, 순차적으로 이루어지는 결과가 된다.
CompletableFuture로 각 상점의 정보를 요청할 때 기존 요청 작업이 완료되어야 join이 결과를 반환하면서 다음 상점으로 정보를 요청할 수 있기 때문이다.
그림의 윗 부분은 순차적으로 평가를 진행하는 단일 파이프라인 스트림 처리 과정을 보여준다.(점선) 즉, 이저 ㄴ요청의 처리가 완전히 끝난 다음에 새로 만든 CompletableFuture가 처리된다.
반면, 아래쪽은 우선 CompletableFuture를 리스트로 모은 다음에 다른 작업과는 독립적으로 각자의 작업을 수행하는 모습이다.
3.3 더 확장성이 좋은 해결 방법
병렬 스트림 버전의 코드는 정확히 네 개의 상점에 하나의 스레드를 할당해서 네 개의 작업을 병렬로 수행하면서 검색 시간을 최소화할 수 있었다. 만약 검색해야 할 다섯 번재 상점이 추가되었다면 (스레드 풀에서 제공하는 스레드 수가 네 개라고 가정) 총 2초가 걸린다.
[BestPrice price is 218.58,
LetsSaveBig price is 181.58,
BuyItAll price is 164.55,
BuyItAll2 price is 163.66,
MyFavoriteShop price is 123.66]
Time taken: 2128
CompletableFuture 버전도 살펴보자
[BestPrice price is 165.88,
LetsSaveBig price is 140.15,
BuyItAll price is 183.89,
BuyItAll2 price is 139.96,
MyFavoriteShop price is 166.67]
Time taken: 2027
CompletableFuture 버전이 병렬 스트림 버전보다 아주 조금 빠르다. 하지만 만족할 수 있는 수준은 아니다. 9개의 상접이 있다고 가정하면 병렬 스트림은 3143밀리초, CompletableFuture 버전은 3029 밀리초가 소요된다. 결과적으로 비슷하지만 CompletableFuture는 병렬 스트림 버전에 비해 작업에 이용하 ㄹ수 있는 다양한 Executor를 지정할 수 있다는 장점이 있다. 따라서 Executor로 스레드 풀의 크기를 조절하는 등 애플리케이션에 맞는 최적화된 설정을 만들 수 있다.
'Book > 모던 자바 인 액션' 카테고리의 다른 글
CHAPTER18 - 함수형 관점으로 생각하기 (5) | 2025.06.06 |
---|---|
CHAPTER16 - CompletableFuture : 안정적 비동기 프로그래밍(2) (1) | 2025.06.01 |
CHAPTER15 - CompletableFuture 와 리액티브 프로그래밍 기초(2) (0) | 2025.05.25 |
CHAPTER13 - 디폴트 메서드 (1) | 2025.04.30 |
CHAPTER9 - 리팩터링, 테스팅 (2) (0) | 2025.03.25 |