스트림의 병렬 처리 - 모던 자바 인 액션
자바에서 데이터 컬렉션을 병렬로 처리하는 방법
자바 7 이전 – 수동 병렬 처리
- 데이터를 수작업으로 여러 서브파트로 나누고, 각 파트를 별도 스레드에 할당
- 레이스 컨디션을 방지하기 위해 적절한 동기화 코드 필요
- 작업 결과를 직접 합치는 로직을 구현해야 함
자바 7 – Fork/Join 프레임워크
- 자바 7에서는 ForkJoinPool 기반의 Fork/Join 프레임워크가 도입됨
- 재귀적으로 작업을 분할하고, 각각을 병렬로 수행 후 결과를 합침
자바 8 – 병렬 스트림 (Parallel Stream)
- 내부적으로 ForkJoinPool을 사용
- 기본적으로 CPU 코어 수만큼 스레드가 생성됨
병렬 스트림은 내부적으로 ForkJoinPool 을 사용한다.
ForkJoinPool 은 프로세서 수에 상응하는 스레드를 갖는데, 아래와 같이 사용가능한 프로세서 수를 확인할 수 있다.
전역 설정 코드이므로 모든 병렬 스트림 연산에 영향은 준다.
Runtime runtime = Runtime.getRuntime();
int availableProcessors = runtime.availableProcessors();
System.out.println("availableProcessor = "+availableProcessors);
//availableProcessor = 4
잘못된 병렬 처리 예 – Stream.iterate
public long parallelSum(long n){
return Stream.iterate(1L, i->i+1)
.limit(n)
.parallel()
.reduce(0L,Long::sum);
}
왜 비효율적인가?
- Stream.iterate()는 순차적 특성을 가지고 있어 분할이 어려움
이전 연산의 결과에 따라 다음 함수의 입력이 달라지기 때문에 iterate 연산을 청크로 분할하기 어렵다.
리듀싱 과정을 시작하는 시점에 전체 숫자리스트가 준비되어 있지 않아 스트림을 병렬로 처리할 수 있게끔 청크 분할이 가능하지 않다.스트림이 병렬로 처리되도록 지시해서 각각 합계가 다른 스레드에서 수행되었지만
결국 순차처리 방식과 크게 다른 점이 없어 스레드를 할당하는 오버헤드만 증가하게 된다. - 박싱된 Long을 사용하므로 박싱/언박싱 비용이 존재
이처럼 병렬 프로그래밍을 오용하면 성능이 나빠질 수 있다.
멀티코어 프로세서를 활용해 효과적으로 합계 연산을 병렬로 실행하려면 어떻게 해야 할까.
public long parallelSum2(int n){
return LongStream.rangeClosed(1L,n)
.parallel()
.reduce(0L,Long::sum);
}
LongStream.rangeClosed 는 iterate 에 비해 다음과 같은 장점을 제공한다.
- 기본형 long 사용 → 박싱/언박싱 비용 없음
- 범위 기반 스트림이라 청크 분할이 쉬움
예를 들어 1-20 범위의 숫자를 각각 1-5, 6-10, 11-15,16-20 범위의 숫자로 분할할 수 있다. - 병렬 처리가 효율적으로 수행됨 → 성능 향상
LongStream 을 이용하면 iterate 보다 빠른 처리가 가능하고 parallel 을 더하면 순차적 처리보다 빠른 속도를 보인다.
병렬 스트림 사용 시 주의할 점
1. 공유 상태 변경 금지
병렬 스트림을 잘못 사용하면서 발생하는 많은 문제는 공유된 상태를 바꾸는 알고리즘을 사용하기 때문에 일어난다.
다음은 n 까지의 자연수를 더하며 공유된 누적자를 바꾸는 프로그램 코드이다.
public long sideEffectSum2(long n){
Accumulator accumulator = new Accumulator();
LongStream.rangeClosed(1,n).parallel().forEach(accumulator::add);
return accumulator.total;
}
호출하는 n이 커지면 결과가 다르게 나온다.
2. 병렬화가 항상 빠른 것은 아니다
- 병렬 처리에는 스레드 생성 및 스케줄링, 코어 간 데이터 전송 등의 오버헤드가 존재합니다.
- 처리 시간이 짧은 작업에서는 오히려 병렬 처리가 성능 저하를 유발할 수 있습니다.
- 병렬화는 충분히 큰 작업에만 사용하는 것이 좋습니다.
자바 Fork/Join 프레임워크와 병렬 스트림 이해 및 활용법
Fork/Join 프레임워크란?
- 자바 7부터 도입된 병렬 처리 프레임워크
- 큰 작업을 재귀적으로 작은 작업으로 분할(fork) → 각 작업을 병렬로 수행 → 결과를 합침(join)
- 내부적으로 ForkJoinPool(스레드 풀)을 이용해 작업자 스레드에 분산 할당하는 ExecutorService 인터페이스를 구현
핵심 구성요소
- RecursiveTask<R>: 반환값이 있는 작업 정의용 클래스
- RecursiveAction: 반환값이 없는 작업 정의용 클래스
- compute(): 작업을 분할하고 처리하는 핵심 로직
- fork(): 새로운 태스크를 비동기 실행 큐에 추가
- join(): fork된 태스크의 결과가 준비될 때까지 블로킹
스레드풀을 이용하려면 RecursiveTask<R> 의 서브클래스를 만들어야한다.
R은 병렬화된 태스크가 생성하는 결과 형식 또는 결과가 없을 때는 RecursiveAction 형식이다.
RecursiveTask 를 정의하려면 추상 메서드 compute 를 구현해야 한다.
compute 메서드는 태스크를 서브태스크를 분할하는 로직과 더 이상 분할할 수 없을 때 개별 서브태스크의 결과를 생산할 알고리즘을 정의한다.
fork 는 크기가 충분히 작아질 때까지 재귀적으로 수행된다.
이 알고리즘은 분할 후 정복 알고리즘의 병렬화 버전이다.
package stream.parallel;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;
public class ForkJoinSumCalculator extends RecursiveTask<Long> {
private final long[] numbers; //더할 숫자 배열
private final int start; //서브태스크에서 처리할 배열의 초기 위치
private final int end;//서브태스크에서 처리할 배열의 최종 위치
public static final long THRESHOLD=10_000; //서브태스크 분할 기준. 이 값 이하로 분할 불가
/***
* 메인 태스크 생성용 생성자
*/
public ForkJoinSumCalculator(long[] numbers) {
this(numbers,0, numbers.length);
}
/**
* 서브태스크를 재귀적으로 만들 때 사용할 비공개 생성자
* */
private ForkJoinSumCalculator(long[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
int length = end - start;
if(length <= THRESHOLD){
return computeSequentially();
}
//배열의 첫번째 절반을 더하도록 서브태스크 생성
ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length / 2);
//다른 스레드로 새로 생성한 태스크를 비동기 실행한다.
leftTask.fork();
//배열의 나머지 절반을 더하도록 서브태스크 생성
ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length / 2, end);
//두번째 서브태스크를 동시 실행한다. 이때 추가로 분할이 일어날 수 있다.
Long rightResult = rightTask.compute();
//첫번째 서브태크스의 결과를 읽거나 아직 결과가 없으면 기다린다.
Long leftResult = leftTask.join();
//두 서브태스크를 조합한 값이 결과이다.
return leftResult+rightResult;
}
private long computeSequentially() {
long sum=0;
for(int i=start;i<end;i++){
sum += numbers[i];
}
return sum;
}
public static long forkJoinSum(int n){
long[] numbers = LongStream.rangeClosed(1, n).toArray();
ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
return new ForkJoinPool().invoke(task);
}
}
leftTask 는 fork() 하며 작업 Que 로 추가되는데, 이 역시 compute 에 의해 반복해서 나뉜다.
자신의 작업 큐가 비어있는 스레드는 다른 스레드의 작업 큐에서 작업을 가져와 수행한다.
이것을 작업 훔쳐오기라 하며 스레드풀에 의해 자동적으로 이뤄진다.
작업훔치기는 비효율적인 분할 기법이나, 디스크, 외부 서비스와 협력과정에서 생기는 지연으로 각각의 서브태스크의 작업완료 시간이 크게 달라질 수 있는 상황에서 작업자 스레드 간의 작업 부하를 비슷하게 유지할 수 있게 해준다.
포크/조인 프레임워크를 제대로 사용하는 방법
- join()은 fork() 이후에 호출할 것
- 그렇지 않으면 블로킹이 발생해 성능이 떨어짐
(각각의 서브태스크가 다른 테스크가 끝나길 기다리며, 순차 알고리즘보다 느리고 복잡해질 수 있다.)
- 그렇지 않으면 블로킹이 발생해 성능이 떨어짐
- invoke()는 RecursiveTask 내부에서 사용 금지
- main() 또는 외부에서 호출할 때만 사용
- 순차코드에서 병렬 계산을 시작할 때만 invoke 를 사용한다.
- 왼쪽만 fork(), 오른쪽은 compute()로 처리
- 둘 다 fork() 하면 풀에 부하가 많아지고 오버헤드 증가
- 두 서브태스크의 한 태스크에는 같은 스레드를 재사용할 수 있어 풀에서 불필요한 태스크를 할당하는 오버헤드를 피할 수 있다.
- 작업 단위는 포킹 비용보다 충분히 커야 함
- 너무 자잘한 작업은 오히려 병렬화가 느려짐
- I/O + 계산 병렬 처리
- 한 작업은 디스크/네트워크, 다른 작업은 CPU 계산을 맡게 하면 효율적
Spliterator와 병렬 스트림
Spliterator란?
- 스트림 분할에 특화된 반복자
- 병렬 처리 시 스트림을 효율적으로 나누는 역할
SplitIterator는 iterator 처럼 소스의 요소 탐색 기능을 제공하지만 병렬 작업에 특화되어있다는 점에서 다르다.
SplitIterator 가 어떻게 동작하는지 이해하면 병렬 스트림 동작과 관련한 통찰을 얻을 수 있다.
SplitIterator 인터페이스의 메서드
tryAdvance(Consumer) | 요소를 하나씩 순차적으로 소비하며 탐색해야할 요소가 남아있으면 참을 반환 (기존 iterator와 비슷) |
trySplit() | 스트림 일부를 분할해서 두번째 SplitIterator 를 생성. null 은 반환하면 더 이상 분할 불가능 |
estimateSize() | 탐색해야 할 요소 수 정보 제공 |
characteristics() | 정렬 여부, 순차성 등 특징 제공 (SplitIterator 자체의 특성 집합을 포함하는 int 반환) |
WordCounter 예제 – 단어 수 세기
요구사항
- 문자열에서 단어 수 세기
- 공백 기준으로 나눔
- 병렬 스트림 사용 시 단어 중간에서 분할되지 않도록 주의
package stream.parallel;
public class WordCounter {
private final int counter;
private final boolean lastSpace;
public WordCounter(int counter, boolean lastSpace) {
this.counter = counter;
this.lastSpace = lastSpace;
}
public WordCounter accumulate(Character c){
if(Character.isWhitespace(c)){
return lastSpace ? this : new WordCounter(counter,true);
} else {
return lastSpace ? new WordCounter(counter+1,false) : this ;
}
}
public WordCounter combine(WordCounter wordCounter){
return new WordCounter(counter+ wordCounter.counter,wordCounter.lastSpace);
}
public int getCounter() {
return counter;
}
}
병렬 처리 시 문제
- 스트림이 임의 위치에서 분할되면 한 단어가 두 파트로 나뉘어 잘못된 결과가 나올 수 있음
- 해결 방법: 단어 경계(공백) 기준으로만 분할되는 Spliterator 구현 필요 (단어 끝에서 문자열을 분할)