2025. 3. 3. 17:55ㆍ카테고리 없음
목차.
1. 포크/조인 프레임워크
2. Spliterator 인터페이스
1. 포크/조인 프레임워크
포크/조인 프레임워크는 병렬화할 수 있는 작업을 재귀적으로 작은 작업으로 분할한 다음에 서브태스크 각각의 결과를 합쳐서 전체 결과를 만들도록 설계되었다. 포크/조인 프레임워크에서는 서브태스크를 스레드 풀의 작업자 스레드에 분산 할당하는 ExecutorService 인터페이스를 구현한다.
1.1 RecursiveTask 활용
스레드 풀을 이용하려면 RecursiveTask<R>의 서브 클래스를 만들어야 한다. 여기서 R은 병렬화된 태스크가 생성하는 결과 형식 또는 결과가 없을 때는 RecursiveAction 형식이다. RecursiveTask를 정의하려면 추상 메서드 compute를 구현해야 한다.
protected abstract R compute();
compute 메서드는 태스클 서브태스크로 분할하는 로직과 더 이상 분할할 수 없을 때 개별 서브태스크의 결과를 생산할 알고리즘을 정의한다. 따라서 대부분의 compute 메서드 구현은 다음과 같은 의사코드 형식을 유지한다.
if (태스크가 충분히 작거나 더 이상 분할할 수 없으면) {
순차적으로 태스크 계산
} else {
태스크를 두 서브태스크로 분할
태스크가 다시 서브태스크로 분할되도록 이 메서드를 재귀적으로 호출함
모든 서브태스크의 연산이 완료될 때까지 기다림
각 서브태스크의 결과를 합침
}

포크/조인 프레임워크를 이용해서 범위의 숫자를 더하는 예제를 통해 살펴보자
public class ForkJoinSumCalculator extends RecursiveTask<Long> {
//더할 숫자 배열
private final long[] numbers;
//이 서브태스크에서 처리할 배열의 초기 위치와 최종 위치
private final int start;
private final int end;
//분리할 수 없는 서브태스크의 기준
private static final long THRESHOLD = 10_000;
public ForkJoinSumCalculator(long[] numbers) {
this(numbers, 0, numbers.length);
}
private ForkJoinSumCalculator(long[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
int length = end - start;
if (length <= THRESHOLD) {
return computeSequentially();
}
ForkJoinSumCalculator left =
new ForkJoinSumCalculator(numbers, start, start + length / 2);
//ForkJoinPool 의 다른 스레드로 새로 생성한 태스크를 비동기 실행
left.fork();
ForkJoinSumCalculator right = new ForkJoinSumCalculator(numbers, start + length / 2, end);
//두 번째 서브태스크를 동기 실행
Long rightResult = right.compute();
Long leftResult = left.join();
return leftResult + rightResult;
}
private Long computeSequentially() {
long sum = 0;
for (int i = start; i < end; i++) {
sum += numbers[i];
}
return sum;
}
}
위 RecursiveTask 구현체의 가장 중요한 부분은 compute 메서드이다. 기준점(THRESHOLD) 보다 작은 값은 반으로 나뉘어 왼쪽은 비동기로 실행하고 오른쪽은(왼쪽 태스크가 실행되는 동안) 현재 스레드로 바로 실행한 뒤 (추가로 분할이 일어날 수 있다.) 왼쪽 태스크가 모두 연산될 때까지 기다리고(left.join()) 이후 더한 값을 반환한다.
private static final long N = 10_000_000L;
public static void main(String[] args) {
measureAverage(ForkJoinSum_example::forkJoinSum, 10, N);
}
public static long forkJoinSum(long n) {
long[] numbers = LongStream.rangeClosed(1, n).toArray();
ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
return new ForkJoinPool().invoke(task);
}
실행순서.
1. ForkJoinPool 의 스레드가 ForkJoinSumCalculator.compute 실행
2. 스레드가 태스크의 크기를 확인하고 병렬로 실행할지 작업을 나눌지 확인
3. 배열을 반으로 분할해서 두 개의 새로운 ForkJoinSumCalculator로 할당
4. ForkJoinPool이 새로 생성된 ForkJoinSumCalculator 실행
5. 태스크를 수행하는 조건에 맞을 때까지 1. 반복
6. 각 서브태스크는 순차적으로 처리
7. 포킹 프로세스로 만들어진 이진트리를 루트에서 역순으로 방문(서브 태스크의 부분 결과 합침)

Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
Result: 50000005000000
done in: 26 msecs
병렬 스트림을 이용할 때보다 성능이 나빠졌다. 이는 ForkJoinSumCalculator 태스크에서 사용할 수 있도록 전체 스트림을 long[]으로 변환했기 때문이다.
public static long forkJoinSum(long n) {
long[] numbers = LongStream.rangeClosed(1, n).toArray();
ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
return new ForkJoinPool().invoke(task);
}
1.2. 포크/조인 프레임워크를 제대로 사용하는 방법
포크/조인 프레임워크는 쉽게 사용할 수 있는 편이지만 주의를 요한다.
1. join 메서드를 태스크에 호출하면 태스크가 생산하는 결과가 준비될 때까지 호출자를 블록시킨다.
따라서 두 서브태스크가 모두 시작된 다음에 join을 호출해야 한다. 그렇지 않으면 각각의 서브태스크가 다른 태스크가 끝나길 기다리는 일이 발생한다.
2. RecursiveTask 내에서는 ForkJoinPool의 invoke 메서드를 사용하지 말라.
대신 compute나 fork 메서드를 직접 호출할 수 있다. 순차 코드에서 병렬 계산을 시작할 때만 invoke를 사용한다. (invoke 는 내부적으로 compute를 실행시키면서 다른 유휴 스레드에게 left 태스크를 넘겨주지 않을 가능성이 높음. 자세한건 ForkJoinPool의 work-stealing 알고리즘 참고)
3. 서브태스크에 fork 메서드를 호출해서 ForkJoinPool의 일정을 조절할 수 있다.
왼쪽 작업과 오른쪽 작업 모두에 fork 메서드를 호출하는 것이 자연스러울 것 같지만 한쪽 작업에는 fork를 호출하는 것보다는 compute 를 호출하는 것이 효율적이다. 그러면 두 서브 태스크의 한 태스크에는 같은 스레드를 재사용할 수 있으므로 풀에서 불필요한 태스크를 할당하는 오버헤드를 피할 수 있다.
4. 포크/조인 프레임워크를 이용하는 병렬 계산은 디버깅하기 어렵다.
보통 IDE로 디버깅할 때 스택 트레이트로 문제가 일어난 과정을 쉽게 확인할 수 있는데, 포크/조인 프레임워크에서는 fork라 불리는 다른 스레드에서 compute를 호출하므로 스택 트레이스가 도움이 되지 않는다.
5. 병렬 스트림에서 살펴본 것처럼 멀티코어에 포크/조인 프레임워크를 사용하는 것이 항상 옳은 건 아니다.
병렬 처리로 성능을 개선하려면 태스크를 여러 독립적인 서브태스크로 분할할 수 있어야 한다.
각 서브태스크의 실행시간은 새로운 태스크를 포킹하는 데 드는 시간보다 길어야 한다.
1.3 작업 훔치기(work-stealing)
ForkJoinSumCalculator 예제에서는 덧셈을 수행할 숫자가 만 개 이하면 서브태스크 분할을 중단했다. 기준값을 바꿔가면서 실험해보는 방법 외에는 좋은 기준을 찾을 방법이 없다.
이론적으로는 코어 개수만큼 병렬화된 태스크로 작업부하를 분할하면 모든 CPU 코어에서 태스크를 실행할 것이고 크기가 같은 각각의 태스크는 같은 시간에 종료될 것이라 생각할 수 있다. 하지만 복잡한 시나리오가 사용되는 현실에서는 각각의 서브태스크의 작업완료 시간이 크게 달라질 수 있다. 분할 기법이 효율적이지 않기 때문일 수도 있고 외부 서비스와 협력하는 과정에서 지연이 생길 수 있다.
포크/조인 프레임워크에서 작업 훔치기(work stealing)라는 기법으로 이 문제를 해결한다. 작업 훔치기 기법에서 ForkJoinPool의 모든 스레드를 거의 공정하게 분할한다. 각각의 스레드는 자신에게 할당된 태스크를 포함하는 이중 연결 리스트를 참조하면서 작업이 끝날 때마다 큐의 헤드에서 다른 태스클 가져와서 작업을 처리한다. 이때 한 스레드는 다른 스레드보다 자신에게 할당된 태스크를 더 빨리 처리할 수 있다. 다른 스레드는 바쁘게 일하고 있는데 한 스레드는 할일이 다 떨어진 상황이다. 이때 할일이 없어진 스레드는 유휴 상태로 바뀌는 것이 아니라 다른 스레드의 큐의 꼬리에서 작업을 훔쳐온다. 모든 태스크가 작업을 끝낼 때까지, 이 과정을 반복한다.
풀에 있는 작업자 스레드의 태스크를 재분배하고 균형을 맞출 때 작업 훔치기 알고리즘을 사용한다. 작업자의 큐에 있는 태스크를 두 개의 서브 태스크로 분할했을 때 둘 중 하나의 태스크를 다른 유휴 작업자가 훔쳐갈 수 있다. 그리고 주어진 태스크를 순차 싱행할 단계가 될 때까지 이 과정을 재귀적으로 반복한다.

2. Spliterator 인터페이스
자바 8은 Spliterator라는 새로운 인터페이스를 제공한다. Spliterator는 '분할할 수 있는 반복자'라는 의미다. Iterator처럼 Spliterator는 소스의 요소 탐색 기능을 제공한다는 점은 같지만 Spliterator는 병렬 작업에 특화되어 있다. 커스텀 Spliteraot를 꼭 직접 구현해야 하는 것은 아니지만 Spliterator가 어떻게 동작하는지 이해한다면 병렬 스트림 동작과 관련한 통찰을 얻을 수 있다.
public interface Spliterator<T> {
boolean tryAdvance(Consumer<? super T> action);
Spliterator<T> trySplit();
long estimateSize();
int characteristics();
}
여기서 T는 Spliterator에서 탐색하는 요소의 형식이다.
1. tryAdvance
- Spliterator의 요소를 하나씩 순차적으로 소비하면서 탐색해야 할 요소가 남아있으면 참을 반환한다.(일반적인 iterator 동작과 같음)
2. trySplit
- Spliterator의 일부 요소를 분할해서 두 번재 Spliterator를 생성하는 메서드.
3. estimateSize
- 탐색해야 할 요소 수 정보를 제공. 특히 탐색해야 할 요소 수가 정확하진 않더라도 제공된 값을 이용해서 더 쉽고 공평하게 Spliterator를 분할 가능.
2.1 분할 과정

그림에서 보여주는 것처럼 분할은 재귀적으로 일어난다.
1단계 : 첫 번째 Spliterator 에서 trySplit으로 두 번재 Spliterator를 생성
2단계 : 두 개의 Spliterator 에서 trySplit을 다시 호출하면 네 개의 Spliterator가 생성
3단계 : trySplit의 결과가 null이 될 때까지 이 과정을 반복
4단계 : 모든 Spliterator의 trySplit의 결과가 null이면 재귀 분할 과정이 종료
이 분할 과정은 characteristics 메서드로 정의하는 Spliterator의 특성에 영향을 받는다.
Spliterator 특성
Spliterator는 characteristics라는 추상 메서드를 정의하고 이 메서드는 Spliterator 자체의 특성 집합을 포함하는 int를 반환한다.
특성 | 의미 |
ORDERED | 리스트처럼 요소에 정해진 순서가 있으므로 분할할 때 유의 |
DISTINCT | 요소 중복x |
SORTED | 탐색된 요소는 정렬 순서를 따름 |
SIZED | 크기가 알려진 소스로 생성됨 -> esticatedSize()는 정확한 값을 반환 |
NON-NULL | 탐색하는 모든 요소는 NULL x |
IMMUTABLE | Spliterator의 소스는 불변이므로 요소를 추가, 수정, 삭제 할 수 없다. |
CONCURRENT | 동기화 없이 Spliterator의 소스를 여러 스레드에서 동시에 고칠 수 있다. |
SUBSIZED | 이 Spliterator 그리고 분할되는 모든 Spliterator는 SIZED 특성을 갖음 |
2.2 커스텀 Spliterator 구현하기
문자열의 단어 수를 계산하는 메서드를 구현해보자. 다음은 반복 버전으로 메서드를 구현한 예제다.
public static void main(String[] args) {
System.out.println(countWordsIteratively("DS RHeLLO World rice shrimp bookmark"));
}
public static int countWordsIteratively(String s) {
int count = 0;
boolean lastSpace = true;
for (char c : s.toCharArray()) {
if (Character.isWhitespace(c)) {
lastSpace = true;
} else if (lastSpace) {
count++;
lastSpace = false;
}
}
return count;
}
=>
6
이제 스레드를 동기화하지 않고 병렬 스트림으로 작업을 병렬로 수행할 수 있게 함수형으로 바꿔보자
함수형으로 단어 수를 세는 메서드 재구현하기
우선 String을 스트림으로 변환해야 한다. 스트림은 int, long, doule 기본형만 제공하므로 Stream<Character>를 사용해야 한다.
Stream<Character> stream = IntStream.range(0, sentence.length())
.mapToObj(sentence::charAt);
스트림에 리듀싱 연산을 실행하면서 단어 수를 계산할 수 있다. 이때 지금까지 발견한 단어 수를 계산하는 int 변수와 마지막 문자가 공백이었는지 여부를 기억하는 Boolean 변수 등 두 가지 변수가 필요하다. 자바에는 튜플(래퍼 객체 없이 다형 요소의 정렬 리스트를 표현할 수 있는 구조체)이 없으므로 캡슐화하는 새로운 WorkCounter를 만들어야 한다.
public class WordCounter {
private final int counter;
private final boolean lastSpace;
public WordCounter(int counter, boolean lastSpace) {
this.counter = counter;
this.lastSpace = lastSpace;
}
//문자열을 하나씩 탐색한다.
public WordCounter accumulate(Character c) {
if (Character.isWhitespace(c)) {
return lastSpace ?
this :
new WordCounter(counter, true);
} else {
return lastSpace ?
new WordCounter(counter + 1, false) :
this;
}
}
public WordCounter combine(WordCounter other) {
return new WordCounter(counter + other.counter, other.lastSpace);
}
public int getCounter() {
return counter;
}
}
accumulate 메서드는 새로운 WordCounter 클래스를 어떤 상태로 생성할 것인지 정의한다. 스트림을 탐색하면서 새로운 문자를 찾을 때마다 accumulator 메서드를 호출한다. 비공백 문자를 탐색한 다음에 마지막 문자가 공백이면 counter를 증가시킨다.
combine 메서드는 문자열 서브 스트림을 처리한 WordCounter의 결과를 합친다.
이제 문자 스트림의 리듀싱 연산을 구현한다.
private static int countWords(Stream<Character> stream) {
WordCounter reduce = stream.reduce(new WordCounter(0, true),
WordCounter::accumulate,
WordCounter::combine);
return reduce.getCounter();
}
Stream<Character> stream = IntStream.range(0, sentence.length())
.mapToObj(sentence::charAt);
System.out.println(countWords(stream.parallel()));;
=>
31
왜 단어가 6개가 아닐까? 원래 문자열을 임의의 위치에서 둘로 나누다 보니 하나의 단어를 둘로 게산하는 상황이 발생했을 것이다. 이 문제를 해결하려면 문자열을 임의의 위치에서 분할하지 말고 단어가 끝나는 위치에서만 분할하는 방법을 사용해야 한다. 그러려면 단어 끝에서 문자열을 분할하는 문자 Spliterator가 필요하다. 문자 Spliterator를 구현한 다음 병렬 스트림으로 전달해보자.
public class WordCounterSpliterator implements Spliterator<Character> {
private final String string;
private int currentChar = 0;
public WordCounterSpliterator(String string) {
this.string = string;
}
@Override
public boolean tryAdvance(Consumer<? super Character> action) {
//현재 문자를 소비
action.accept(string.charAt(currentChar++));
//소비할 문자가 남아있으면 true 를 반환한다.
return currentChar < string.length();
}
@Override
public Spliterator<Character> trySplit() {
int currentSize = string.length() - currentChar;
if (currentSize < 10) {
return null;
}
for (int splitPos = currentSize / 2 + currentChar;
//파싱할 문자열의 중간을 분할 위치로 설정
splitPos < string.length(); splitPos++) {
//다음 공백이 나올 때까지 분할 위치를 뒤로 이동
if (Character.isWhitespace(string.charAt(splitPos))) {
Spliterator<Character> spliterator =
new WordCounterSpliterator(string.substring(currentChar, splitPos));
//시작 위치를 분할 위치로 설정
currentChar = splitPos;
return spliterator;
}
}
return null;
}
@Override
public long estimateSize() {
return string.length() - currentChar;
}
@Override
public int characteristics() {
return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE;
}
}
분석 대상 문자열로 Spliterator를 생성한 다음에 현재 탐색 중인 문자를 가리키는 인덱스를 이용해서 모든 문자를 반복 탐색한다.
tryAdvance
문자열에서 현재 인덱스에 해당하는 문자를 Consumer에 제공한 다음 인덱스를 증가
trySplit
반복될 자료 구조를 분할하는 로직을 포함하는 가장 중요한 메서드이다. RecursiveTask의 compute 메서드에서 했던 것처럼 분할 동작을 중단할 한계를 설정해야 한다. 실제로는 너무 많은 태스크를 만들지 않도록 10이 아닌 더 높은 값을 설정해야 한다. 단어 중간을 분할하지 않도록 빈 문자가 나올때까지 분할 위치를 이동시키는 부분이 핵심이다.
estimatedSize
탐색해야 할 요소의 개수는 Spliterator가 파싱할 문자열 전체 길이(string.length())와 현재 반복 중인 위치(currentChar)의 차다
characteristics
프레임워크에 Spliterator의 특성을 알려준다.
Spliterator<Character> spliterator = new WordCounterSpliterator(sentence);
Stream<Character> streamWithSpliterator = StreamSupport.stream(spliterator, true);
System.out.println(countWords(streamWithSpliterator));
=>
6