CHAPTER7 - 병렬 데이터 처리와 성능(1)

2025. 2. 28. 22:30Book/모던 자바 인 액션

목차.

1. 병렬 스트림으로 데이터를 병렬 처리하기

2. 병렬 스트림의 성능 분석

 

 

1. 병렬 스트림

 

컬렉션에 parallelStream을 호출하면 병렬 스트림이 생성된다. 병렬 스트림이란 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림이다. 따라서 병렬 스트림을 이용하면 모든 멀티코어 프로세서가 각각의 청크를 처리하도록 할당할 수 있다. 예제로 살펴보자

 

숫자 n을 인수로 받아서 1부터 n까지의 모든 숫자의 합계를 반환하는 메서드를 구현한다고 가정하자. 숫자로 이뤄진 무한 스트림을 만든다음에 인수로 주어진 크기로 스트림을 제한하고, 두 숫자를 더하는 BinaryOperator로 리듀싱 작업을 해보자

public long sequentialSum(long n) {
    return Stream.iterate(1L, i -> i + 1)
            .limit(n)
            .reduce(0L, Long::sum);
}

 

전통적인 자바에서는 다음과 같이 반복문으로 이를 구현할 수 있다.

public long iterativeSum(long n) {
    long sum = 0;
    for (long i = 1; i <= n; i++) {
        sum += i;
    }
    return sum;
}

 

특히 n이 커진다면 이 연산을 병렬로 처리하는 것이 좋을 것이다. 전통적인 방법으로 병렬로 처리하려면 변수 동기화, 스레드 갯수 등 고려할 것이 많아지지만 병렬 스트림을 이용한다면 이런 고려는 필요 없다.

 

 

1.1 순차 스트림을 병렬 스트림으로 변환하기

 

순차 스트림에 parallel 메서드를 호출하면 기존의 함수형 리듀싱 연산이 병렬로 처리된다.

public long sequentialParallelSum(long n) {
    return Stream.iterate(1L, i -> i + 1)
            .limit(n)
            .parallel()
            .reduce(0L, Long::sum);
}

 

위 코드가 이전 코드와 다른 점은 스트림이 여러 청크로 분할되어 있다는 것이다. 마지막으로 리듀싱 연산으로 생성된 부분 결과를 다시 리듀싱 연산으로 합쳐서 전체 스트림의 리듀싱 결과를 도출한다.

 

순차 스트림에 parallel 을 호출해도 스트림 자체에는 아무 변화가 없다. 내부적으로 parallel을 호출하면 이후 연산이 병렬로 수행해야 함을 의미하는 불리언 플래그가 설정된다. 반대로 sequential로 병렬 스트림을 순차 스트림으로 바꿀 수 있다. 이 두 메서드를 이용해서 어떤 연산을 병렬로 실행하고 어떤 연산을 순차로 실행할지 제어할 수 있다.

stream().parallel()
        .filter(...)
        .sequential()
        .map(...)
        .parallel()
        .reduce();

 

parallel과 sequential 두 메서드 중 최종적으로 호출된 메서드가 전체 파이프라인에 영향을 미친다. 이 에제에서 파이프라인의 마지막 호출은 parallel이므로 파이프라인은 전체적으로 병렬로 실행된다.

 

 

1.2 스트림 성능 측정

 

병렬화를 이용하면 순차나 반복 형식에 비해 성능이 더 좋아질 것이라 추측할 수 있다. 하지만 이건 추측일 뿐이다. 성능을 최적화할 때는 측정이 매우매우 중요하다. 따라서 자바 마이크로벤치마크 하니스(JVH)라는 라이브러리를 이용해 작은 벤치마크를 구현할 것이다. 

plugins {
    id 'java'
    //jmh 플러그인
    id "me.champeau.jmh" version "0.7.3"
}

group 'org.example'
version '1.0-SNAPSHOT'

repositories {
    mavenCentral()
}

// jar로 빌드할때 jar 이름
jmhJar {
    archiveFileName = 'benchmark.jar'
}

dependencies {

    //jmh 라이브러리
    jmh 'org.openjdk.jmh:jmh-core:1.37'
    //jar로 빌드하는데 도움을 주는 어노테이션 프로세서
    jmh 'org.openjdk.jmh:jmh-generator-annprocess:1.37'

    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.6.0'
    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
}

test {
    useJUnitPlatform()
}

 

@BenchmarkMode(Mode.AverageTime) //벤치마크 대상 메서드를 실행하는 데 걸링 평균 시간 측정
@OutputTimeUnit(TimeUnit.MILLISECONDS) //벤치마크 결과를 밀리초 단위로 출력
@State(Scope.Benchmark)
@Fork(value = 2, jvmArgs = {"-Xms4G", "-Xmx4G"}) //4Gb의 힙 공간을 제공한 환경에서 두 번 벤치마크를 수행
@Measurement(iterations = 5, time = 1)
@Warmup(iterations = 5, time = 1)
public class SequentialStreamBenchmark {

    private static final long N = 10_000_000L;

    @Benchmark
    public long sequentialSum() {
        return Stream.iterate(1L, i->i+1).limit(N)
                .reduce(0L, Long::sum);
    }

    @TearDown(Level.Invocation) // 매 번 벤치마크 실행한 다음에는 가비지 컬렉터 동작 시도
    public void tearDown() {
        System.gc();
    }
}

 

클래스를 컴파일하면 benchmark.jar라는 파일이 빌드 디렉토리 하위에 생성된다. 이 파일을 다음처럼 실행할 수 있다.

java -jar ./build/libs/benchmark.jar SequentialStreamBenchmark

 

벤치마크가 가능한 가비지 컬렉터의 영향을 받지 않도록 힙의 크기를 충분하게 설정하고 벤치마크가 끝날 때마다 가비지 컬렉터가 실행되도록 강제했다.

 

이 코드를 실행할 때 JMH 명령은 핫스팟이 코드를 최적화 할 수 있도록 20 번을 실행하며 벤치마크를 준비한 다음 20번을 더 실행해 최종 결과를 계산한다. JMH의 특정 어노테이션(@Fork, @Warmup, @Mesurement)이나 -w, -i 플래그를 명령행에 추가해서 기본 동작 횟수를 조절할 수 있다.

Benchmark                                Mode  Cnt   Score   Error  Units
SequentialStreamBenchmark.sequentialSum  avgt   10  56.456 ± 0.146  ms/op

 

전통적인 for 루프를 사용해 반복하는 방법이 더 저순으로 동작할 뿐 아니라 특히 기본깞을 박싱하거나 언박싱할 피룡가 없으므로 더 빠를 것이다.

@Benchmark
public long sequentialSum() {
    long sum = 0;

    for (long i = 1L; i <= N; i++)
        sum += i;

    return sum;
}

 

같은 컴퓨터로 두 번째 벤치마크 코드를 돌려보자 아래는 결과이다.

Benchmark                             Mode  Cnt  Score   Error  Units
SequentialForBenchmark.sequentialSum  avgt   10  3.799 ± 0.006  ms/op

 

전통적인 for 문이 순차 스트림 연산보다 10배 넘게 빠른걸 확인할 수 있다.

병렬 스트림의 속도도 확인해보자

Benchmark                              Mode  Cnt   Score   Error  Units
ParallelStreamBenchmark.sequentialSum  avgt   10  49.635 ± 0.108  ms/op

 

책에 쓰인 예제에서는 병렬 스트림이 순차 스트림보다 4배 느리게 나왔는데 직접 테스트한 결과는 순차 스트림보다 미세하게 빠르다. 아마 jdk 버전, 컴퓨터 코어수에 따른 차이같다. 하지만 결론은 같다. 왜 병렬 스트림이 예상보다 더 좋은 퍼포먼스를 보여주지 못하느냐? 두 가지 문제가 있다.

 

1. 반복 결과로 박싱된 객체가 만들어지므로 숫자를 더하려면 언박싱을 해야 한다.

2. 반복 작업은 병렬로 수행할 수 있는 독립 단위로 나누기가 어렵다.

 

두 번째 문제는 특히 중요하다. 우리에겐 병렬로 수행될 수 있는 스트림 모델이필요하기 때문이다. 특히 이전 연산의 결과에 따라 다음 함수의 입력이 달라지기 때문에 iterate 연산을 청크로 분할하기가 어렵다.

 

이와 같은 상황에서 리듀싱 과정을 시작하는 시점에 전체 숫자 리스트가 준비되지 않았으므로 스트림을 병렬로 처리할 수 있도록 청크로 분할할 수 없다.(미리 준비된 리스트에서는 청크로 나눠 연산이 가능하다.)

 

더 특화된 메서드 사용

멀티코어 프로세서를 활용해서 효과적으로 합계 연산을 병렬로 실행하려면 어떻게 해야 할까? 5챕터에서 LongStream.rangeClosed라는 메서드를 소개했다. 이 메서드는 iterate에 비해 다음과 같은 장점을 제공한다.

 

1. LongStream.rangeClosed는 기본형 long을 직접 사용하므로 박싱과 언박싱 오버헤드가 사라진다.

2. LongStream.rangeClosed는 는 쉽게 청크로 분할할 수 있는 숫자 범위를 생산한다.

 

언박싱과 관련한 오버헤드가 얼마나 될까? 다음 순차 스트림을 처리하는 시간을 측정해보자.

Benchmark                                              Mode  Cnt  Score   Error  Units
SequentialStreamBenchmark.sequentialSumWithLongStream  avgt   10  3.811 ± 0.010  ms/op

 

기존의 iterate 팩토리 메서드로 생성한 순차 버전과 속도가 비슷해졌다. 특화되지 않은 스트림을 처리할 때는 오토박싱, 언박싱 등의 오버헤드를 수반하기 때문이다.  상황에 따라서는 알고리즘을 병렬화하는 것보다 적절한 자료구조를 선택하는 것이 더 중요하다. 병렬 스트림에도 적용해보자.

Benchmark                                            Mode  Cnt  Score   Error  Units
ParallelStreamBenchmark.sequentialSumWithLongStream  avgt   10  0.356 ± 0.002  ms/op

 

확연히 빨라진 결과를 볼 수 있다. 이전 iterate 연산과 다르게 리듀싱 연산이 병렬로 수행되고 박싱 언박싱 오버헤드가 사라졌기 때문이다.

 

하지만 병렬화가 완전 공짜는 아니다. 병렬화를 이용하려면 스트림을 재귀적으로 분할해야 하고, 각 서브스트림을 서로 다른 스레드의 리듀싱 연산으로 할당하고, 이들 결과를 하나의 값으로 합쳐야 한다. 멀티코어 간의 데이터 이동은 생각보다 비싸다. 따라서 코어 간에 데이터 전송 시간보다 훨씬 오래 걸리는 작업만 병렬로 다른 코어에서 수행하는 것이 바람직하다.

 

 

1.3 병렬 스트림의 올바른 사용법

 

병렬 스트림을 잘못 사용하면서 발생하는 많은 문제는 공유된 상태를 바꾸는 알고리즘을 사용하기 때문에 일어난다. 다음 예제 코드를 보자 n까지의 자연수를 더하면서 공유된 누적자를 바꾸는 프로그램을 구현한 코드다.

public long sideEffectSum(long n) {
    Accumulator accumulator = new Accumulator();
    LongStream.rangeClosed(0L, n).forEach(accumulator::add);
    return accumulator.total;
}

private class Accumulator {
    public long total = 0;

    private void add(long value) { total += value; }
}

 

명령형 프로그래밍 패러다임에 익숙한 개발자라면 위와 같은 코드를 자주 구현할 것이다. 리스트의 숫자를 반복할 때의 코드와 비슷하다. 즉, 누적자를 초기화하고, 리스트의 각 요소를 하나씩 탐색하면서 누적자에 숫자를 추가할 수 있다.

 

코드에 문제가 보이는가? 위 코드는 본질적으로 순차 실행할 수 있도록 구현되어 있으므로 병렬로 실행하면 대참사다. 특히 total을 접근할 때마다 데이터 레이스 문제가 일어난다. 동기화로 문제를 해결하다보면 결국 병렬화라는 특성이 없어져 버린다. 스트림을 병렬로 만들어서 문제를 확인해보자.

public static void main(String[] args) {
    measureAverage(ProperUseOfParallelStream::sideEffectSum, 10, N);
}

private static long measurePerf(Function<Long, Long> function, long n) {
    long start = System.currentTimeMillis();
    System.out.println("Result: " + function.apply(n));
    return System.currentTimeMillis() - start;
}

public static void measureAverage(Function<Long, Long> function, int iterate, long n) {
    long result = 0;
    for (int i = 0; i < iterate; i++) {
        result += measurePerf(function, n);
    }
    result /= iterate;
    System.out.println("SideEffect parallel sum done in: " + result + " msecs");
}

=>

Result: 8590045127553
Result: 5501421763173
Result: 4616724525000
Result: 5660762987175
Result: 6333440107512
Result: 8450667294252
Result: 6096106108764
Result: 10193322253863
Result: 5302766856998
Result: 7797107925285
SideEffect parallel sum done in: 3 msecs

 

올바른 결과값(50000005000000)이 나오지 않는다. 여러 스레드에서 동시에 누적자, total += value를 실행하면서 이런 문제가 발생한다. 얼핏 보면 아토믹 연산같지만 total += value는 아토믹 연산이 아니다. 결국 여러 스레드에서 공유하는 객체의 상태를 바꾸는 forEach 블록 내부에서 add 메서드를 호출하면서 이 같은 문제가 발생한다. 이처럼 병렬 스트림을 사용했을 때 이상한 결과를 피하려면 상태 공유에 따른 부작용을 피해야 한다.

 

병렬 스트림과 병렬 계산에서는 공유된 가변 상태를 피해야 한다. 이후 18, 19에서 함수형 프로그래밍을 공부하면서 상태 변화를 피하는 방법을 자세히 알아보자. 우선 병렬 스트림이 올바로 동작하려면 공유된 가변 상태를 피해야 한다는 사실만 알고있으면 된다.

 

 

1.4 병렬 스트림 효과적으로 사용하기

 

'천 개 이상의 요소가 있을 때만 병렬 스트림을 사용하라'같이 양을 기준으로 병렬 스트림 사용을 결정하는 것은 부적절하다. 정해진 기기에서 정해진 연산을 수행할 때는 이와 같은 기준을 적용할 수 있지만 상황이 달라지면 이와 같은 기준이 제 역할을 하지 못한다. 그래도 어떤 상황에서 병렬 스트림을 사용할 것인지 약간의 수량적 힌트를 정하는 것이 도움이 될 때도 있다.

 

1. 확신이 서지 않으면 직접 측정하라.

스트림간 순차-병렬 변환이 쉽지만 무조건 바꾸는 것이 능사는 아니다. 특히 병렬 스트림의 수행 과정은 투명하지 않을 때가 많으므로 순차 스트림과 병렬 스트림 중 어떤 것이 좋을지 모르겠다면 측정하라

 

2. 박싱을 주의하라.

오토박싱과 언박싱은 성능을 크게 저하시킨다. 기본형 특화 스트림을 사용하자

 

3. 순차 스트림보다 병렬 스트림에서 성능이 떨어지는 연산이 있다.

특히 limit나 findFirst처럼 요소의 순서에 의존하는 연산은 병렬 스트림에서 수행하려면 비싼 비용을 치러야 한다. findAny는 요소의 순서와 상관없이 연산하므로 findFirst보다 성능이 좋다. (정렬된)스트림에 N개 요소가 있을 때 요소의 순서가 상관없다면 unordered를 호출하고 (비정렬된) 스트림에서 limit를 호출하는 것이 더 효율적이다.

 

4. 스트림에서 수행하는 전체 파이프라인 연산 비용을 고려하라. 

처리해야 할 요소 수가 N이고 하나의 요소를 처리하는 데 드는 비용을 Q라 하면 전체 스트림 파이프라인 처리비용을 N*Q로 예상할 수 있다. Q가 높아진다는 것은 병렬 스트림으로 성능을 개선할 수 있는 가능성이 있다.

 

5. 소량의 데이터에서는 병렬 스트림이 도움 되지 않는다.

병렬화 과정에서 생기는 부가 비용이 더 크다.

 

6. 스트림을 구성하는 자료구조가 적절한지 확인하라

예를 들어 ArrayList를 LinkedList보다 효율적으로 분할할 수 있다. LinkedList를 분할하려면 모든 요소를 탐색해야 하지만 ArrayList(인덱스로 접근해 서브리스트를 바로 추출할 수 있다.)는 요소를 탐색하지 않고도 리스트를 분할할 수 있기 때문이다. range팩토리 메서드로 만든 기본형 스트림도 쉽게 분해할 수 있다. 또한 커스텀 Spliterator를 구현해서 분해 과정을 완벽하게 제어할 수 있다.

 

7. 스트림의 특성과 파이프라인의 중간 연산이 스트림의 특성을 어떻게 바꾸는지에 따라 분해 과정의 성능이 달라질 수 있다.

예를 들어 SIZED 스트림은 정확히 같은 크기의 두 스트림으로 분할할 수 있으므로 효과적으로 스트림을 병렬 처리할 수 있다. 반면 필터 연산이 있으면 스트림의 길이를 예측할 수 없으므로 효과적으로 스트림을 병렬 처리할 수 있을지 알 수 없다.(하나라도 필터링되면 UNSIZED임)

 

8. 최종 연산의 병합 과정(ex. Collector.combiner 메서드) 비용을 살펴보라.

병합 과정의 비용이 비싸다면 병렬 스트림으로 얻은 성능의 이익이 서브스트림의 부분 결과를 합치는 과정에서 상쇄될 수 있다.

 

마지막으로 병렬 스트림이 수행되는 내부 인프라구조도 살펴봐야 한다. 병렬 합계 예제에서는 병렬 스트림을 제대로 사용하려면 병렬 스트림의 내부 구조를 잘 알아야한다.