본문 바로가기
인프런/김영한의 실전 자바 - 고급 3편, 람다, 스트림, 함수형 프로그래밍

[인프런] 김영한의 실전 자바 - 고급 3편, 람다, 스트림, 함수형 프로그래밍 / 12. 병렬 스트림

by hxxyeoniii 2025. 7. 3.

Fork / Join 패턴

분할(Fork), 처리(Execute), 모음(Join)

스레드는 한 번에 하나의 작업을 처리할 수 있다.

> 따라서 하나의 큰 작업을 여러 스레드가 처리할 수 있는 작은 단위의 작업으로 분할(Fork)해야 함

> 이렇게 분할한 작업을 각각 스레드가 처리(Execute)

> 각 스레드의 분할된 작업 처리가 끝나면 분할된 결과를 하나로 모아야(Join)

 

이런 멀티스레딩 패턴을 Fork/Join 패턴이라고 함


Fork / Join 프레임워크 1 - 소개

자바의 Fork/Join 프레임워크는 자바 7부터 도입된 java.util.concurrent 패키지의 일부로, 멀티코어 프로세서를 효율적으로 활용하기 위한 병렬 처리 프레임워크이다.

 

<주요 개념>

1. 분할 정복 전략(Divide and Conquer)

> 큰 작업을 작은 단위로 재귀적 분할(fork)

> 각 작은 작업의 결과를 합쳐(join) 최종 결과 생성

> 멀티코어 환경에서 작업을 효율적으로 분산 처리

 

2. 작업 훔치기 알고리즘(Work Stealing)

> 각 스레드는 자신의 작업 큐를 가짐

> 작업이 없는 스레드는 다른 바쁜 스레드의 큐에서 작업을 훔쳐 대신 처리

> 부하 균형을 자동으로 조절해 효율성 향상

 

 

 

<주요 클래스>

1. ForkJoinPool

   > Fork/Join 작업을 실행하는 특수한 ExecutorService 스레드 풀

   > 작업 스케줄링 및 스레드 관리 담당

   > 기본적으로 사용 가능한 프로세서 수 만큼 스레드 생성

   > 분할 정복과 작업 훔치기에 특화된 스레드 풀

 

2. ForkJoinTask

   > Future 구현

   > 개발자는 주로 다음 두 하위 클래스를 구현해 사용 : RecursiveTask & RecursiveAction

 

 

 

<예시>

작업의 크기가 8이고, 임계값이 4라고 가정

1. Fork : 임계값을 넘었으므로 작업을 절반으로 분할

2. Execute : 작업의 크기가 4이므로 임계값 범위 안에 들어옴 -> 처리

3. Join : 최종 결과를 합침

 

 

 

SumTask.java

package parallel.forkjoin;

import parallel.HeavyJob;
import util.MyLogger;

import java.util.List;
import java.util.concurrent.RecursiveTask;

import static util.MyLogger.log;

public class SumTask extends RecursiveTask<Integer> {

    private static final int THRESHOLD = 4; // 임계값

    private final List<Integer> list;

    public SumTask(List<Integer> list) {
        this.list = list;
    }

    @Override
    protected Integer compute() {
        if(list.size() < 4) {
            // 작업 범위가 작으면 직접 계산
            log("처리 시작 " + list);
            int sum = list.stream()
                    .mapToInt(HeavyJob::heavyTask)
                    .sum();
            log("처리 완료 " + list + " -> " + sum);
            return sum;
        } else {
            // 작업 범위가 크면 반으로 나눠 병렬 처리
            int mid = list.size() / 2;
            List<Integer> leftList = list.subList(0, mid);
            List<Integer> rightList = list.subList(mid, list.size());

            SumTask leftTask = new SumTask(leftList);
            SumTask rightTask =new SumTask(rightList);
            
            // 왼쪽 작업은 다른 스레드에서 처리
            leftTask.fork();
            
            // 오른쪽 작업은 직접 현재스레드에서 처리
            Integer rightResult = rightTask.compute();
        
            // 왼쪽 작업 결과 기다림
            Integer leftResult = leftTask.join();
            int joinSum = leftResult + rightResult;
            log("left : " + leftResult + ", right : " + rightResult);
            return joinSum;
        }
    }
}

 

* RecursiveTask, RecursiveAction 구현 방법

: compute() 메서드를 재정의해 필요한 작업 로직 작성

: 일반적으로 일정 기준(임계값)을 두고, 작업 범위가 작으면 직접 처리하고 크면 작업을 둘로 분할해 각각 병렬로 처리하도록 구현

 

* fork() : 현재 스레드에서 다른 스레드로 작업을 분할해 보내는 동작

 

* join() : 분할된 작업이 끝날 때까지 기다린 후 결과를 가져오는 동작

 

 

 

ForkJoinMain1.java

package parallel.forkjoin;

import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

import static util.MyLogger.log;

public class ForkJoinMain1 {
    public static void main(String[] args) {
        List<Integer> data = IntStream.rangeClosed(1, 8)
                .boxed()
                .toList();

        log("생성 : " + data);

        // ForkJoinPool 생성 및 작업 수행
        long startTime = System.currentTimeMillis();

        ForkJoinPool pool = new ForkJoinPool(10);
        SumTask task = new SumTask(data); // 1 ~ 8

        // 병렬로 합을 구한 후 결과 출력
        Integer result = pool.invoke(task);
        pool.close();

        long endTime = System.currentTimeMillis();

        log("time: " + (endTime - startTime) + "ms, sum: " + result);
        log("pool: " + pool);


    }
}

 

실행 결과

 

결과 분석


Fork / Join 프레임워크 2 - 작업 훔치기

Q. 임계값이 더 낮아진다면?

-> 더 많은 스레드가 병렬로 작업을 처리하게 됨

 

 

작업 훔치기 알고리즘(* 참고로만 알아두자)

> Fork/Join 풀의 스레드는 각자 자신의 작업 큐를 가짐

> 덕분에 작업을 큐에서 가져가기 위한 스레드간 경합이 줄어듬

> 자신의 작업이 없는 경우, 다른 스레드의 작업 큐에 대기중인 작업을 훔쳐서 대신 처리

 

 

 

위 예제에서 임계값이 2라고 가정

1. pool.invoke(task) : ForkJoinPool에 작업 요청 시 내부에 있는 외부 작업 큐에 작업이 저장

 

2. w1 스레드가 처리할 일이 없어 외부 작업 큐의 작업을 훔쳐 대신 처리한다.

 

3. w1이 훔친 작업의 compute()를 호출해 작업 시작

: 작업의 크기가 크기에 둘로 분할

: [1,2,3,4]는 fork 호출 & [5,6,7,8]은 스스로 compute()로 처리

: 사실, fork()는 자신의 작업 큐에 작업을 넣어두는 것!

 

4. w1은 [5,6,7,8] 작업을 분할 & w2는 w1의 작업을 훔침

 

...

-> 결과적으로 4개의 작업이 4개의 스레드에 분할되어 동시에 수행됨

 

 

 

정리

임계값을 낮춤으로 작업이 더 잘게 분할되고 더 많은 스레드가 병렬로 작업을 처리할 수 있게 됨

이는 Fork/Join 프레임워크의 핵심 개념인 분할 정복 전략을 명확히 보여줌

 

* 적절한 작업 크기 선택

너무 작은 단위로 분할 시 스레드 생성 및 관리에 드는 오버헤드가 생길 수 있고, 너무 큰 단위로 분할 시 병렬 처리 이점을 충분히 활용하지 못할 수 있음

 

만약, 1000개의 작업이 있다면?

1) 1개 단위로 쪼갤 때 : 1000개의 분할과 결합이 필요, 한 스레드당 100개의 작업 처리

2) 10개 단위로 쪼갤 때 : 100개의 분할과 ..

3) 100개 단위로 쪼갤 때 : 10개의 분할과 ..

4) 500개 단위로 쪼갤 때 : 2개의 분할과 ..

 

-> 이상적으로는 한 스레드당 1개의 작업을 처리하는 것이 좋다.

-> 스레드를 100% 사용하며 분할과 결합의 오버헤드도 최소화할 수 있기 때문

 

하지만, 작업시간이 다른 경우를 고려한다면 한 스레드당 1개의 작업 보다 잘게 쪼개어 두는 것이 좋음, 훔치기 기능을 제공하기에 쉬는 스레드 없이 최대한 많은 스레드를 활용할 수 있도록


Fork / Join 프레임워크 3 - 공용 풀(Common Pool)

자바 8에서는 공용 풀이라는 개념이 도입되었는데, 이는 Fork/Join 작업을 위한 자바가 제공하는 기본 스레드 풀이다.

ForkJoinPool commonPool = ForkJoinPool.commonPool();

 

1. 시스템 전체에서 공유 : 애플리케이션 내에서 단일 인스턴스로 공유되어 사용

2. 자동 생성 : 별도로 생성하지 않아도 ForkJoinPool.commonPool()로 접근 가능

3. 편리한 사용 : 별도 풀을 만들지 않고도 RecursiveTask/RecursiveAction을 사용할 때 기본적으로 이 공용 풀이 사용됨

4. 병렬 스트림 활용 : 자바 8의 병렬 스트림은 내부적으로 이 공용 풀을 사용

5. 자원 효율성 : 여러 곳에서 별도의 풀을 생성하는 대신 공용 풀을 사용함으로 시스템 자원을 효율적으로 관리

6. 병렬 수준 자동 설정 : 기본적으로 시스템의 가용 프로세서 수에서 1을 뺀 값으로 병렬 수준이 설정됨

 

 

 

이전에는 직접 풀을 만들어(ForkJoinPool) 작업을 요청함

ForkJoinPool pool = new ForkJoinPool(10);
SumTask task = new SumTask(data);
int result = pool.invoke(task);

 

이를 공용 풀을 사용하게 바꾸면 다음과 같다.

SumTask task = new SumTask(data);
Integer result = task.invoke(); // 공용 풀 사용

 

실행 결과

공용 풀로 변경 후 로그

 

-> 풀에 작업을 요청하는 것이 아닌, 작업에 있는 invoke()를 직접 호출

-> 메인 스레드가 스레드 풀이 아닌 RecursiveTask의 invoke()를 직접 호출하면 메인 스레드가 작업의 compute()를 호출하게 됨, 그리고 내부에서 fork() 호출 시 공용 풀의 워커 스레드로 작업이 분할됨

-> 메인 스레드는 최종 결과가 나올 때 까지 대기해야 함

   > invoke() : 호출 스레드가 작업을 도우며 대기함

   > fork() : 작업을 비동기로 호출하려면 invoke() 대신 fork()를 호출하면 됨(Future를 반환 받음)

 

 

 

정리

1. 공용 풀은 JVM이 종료될 때까지 계속 유지되므로 별도로 풀을 종료(shutdown())하지 않아도 됨

2. 공용 풀을 활용하면, 별도로 풀을 생성/관리하는 코드를 작성하지 않아도 됨

 

 

 

공용 풀이 "CPU - 1"만큼 스레드를 생성하는 이유

기본적으로 자바의 Fork/Join 공용 풀은 시스템의 가용 CPU 코어 수에서 1을 뺀 값을 병렬 수준으로 사용함

 

1. 메인 스레드의 참여

Fork/Join 작업은 공용 풀의 워커 스레드뿐 아니라 메인 스레드도 연산에 참여할 수 있음

메인 스레드도 작업을 도와주기에 공용 풀에서 최대로 스레드를 생성하지 않아도 충분히 CPU 코어를 활용할 수 있음

 

2. 다른 프로세스와 자원 경쟁 고려

애플리케이션이 실행되는 환경에서 OS나 다른 애플리케이션, 혹은 GC 같은 내부 작업들도 CPU를 사용해야 함

모든 코어를 최대치로 점유하도록 설정하면 다른 중요한 작업이 지연되거나 컨텍스트 스위칭 비용이 증가함

하나의 코어를 여유분으로 남겨 시스템 성능을 안정적으로 유지하려는 목적이 있음

 

3. 효율적인 자원 활용

CPU 코어 수와 동일하게 스레드를 만들어도 성능상 큰 문제는 없지만, 특정 상황에서도 병목을 일으키지 않는 선에서 효율적으로 CPU를 활용할 수 있음


자바 병렬 스트림

자바의 병렬 스트림 : parallel()을 사용해보자

package parallel;

import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

import static util.MyLogger.log;

public class ParallelMain4 {
    public static void main(String[] args) {
        int processorCount = Runtime.getRuntime().availableProcessors();
        ForkJoinPool commonPool = ForkJoinPool.commonPool();
        log("processorCount = " + processorCount + ", commonPool = " + commonPool.getParallelism());

        long startTime = System.currentTimeMillis();

        int sum = IntStream.rangeClosed(1, 8)
                .parallel() // 추가!!!
                .map(HeavyJob::heavyTask)
                .reduce(0, (a, b) -> a + b);

        long endTime = System.currentTimeMillis();
        log("time: " + (endTime - startTime) + "ms, sum: " + sum);
    }
}

 

실행 결과

-> parallel() 사용 전에는 소요 시간이 10초, 사용 후에는 소요 시간이 1초 정도로 단축됨

 

 

> 로그를 보면 commonPool의 스레드들이 동시에 일을 처리하고 있음

> 스트림에 parallel() 메서드 호출 시 직접 스레드를 만들 필요 없이 자동으로 병렬 처리

> 스트림에 parallel()을 선언하면 스트림은 공용 ForkJoinPool을 사용하고, 내부족으로 병렬 처리 가능한 스레드 숫자와 작업의 크기 등을 확인하며 Spliterator를 통해 데이터를 자동으로 분할

> 분할 방식은 데이터 소스의 특성에 따라 최적화 되어 있음

> 이것이 람다 스트림을 활용한 선언적 프로그래밍 방식의 큰 장점


병렬 스트림 사용시 주의점

Fork/Join 프레임워크는 CPU 바운드 작업에만 사용해라!

 

1. 스레드 블로킹에 따른 CPU 낭비

: ForkJoinPool은 CPU 코어 수에 맞춰 제한된 개수의 스레드를 사용함(특히 공용 풀)

: I/O 작업으로 스레드가 블로킹되면 CPU가 놀게 되어, 전체 병렬 처리 효율이 크게 떨어짐

 

2. 컨텍스트 스위칭 오버헤드 증가

: I/O 작업 때문에 스레드를 늘리면, 실제 연산보다 대기 시간이 길어지는 상황 발생

: 스레드가 많아질수록 컨텍스트 스위칭 비용도 증가해 성능이 떨어짐

 

3. 작업 훔치기 기법 무력화

: ForkJoinPool이 제공하는 작업 훔치기 알고리즘은 CPU 바운드 작업에서 빠르게 작업 단위를 계속 처리하도록 설계되었음

: I/O 대기 시간이 많은 작업은 스레드가 대기하는 경우가 많아 작업 훔치기가 빛을 발휘하기 어려움

 

4. 분할정복 이점 감소

: 작업을 잘게 나누어도, I/O 병목 발생 시 CPU 병렬화 이점이 줄어듬

 

 

 

예시) 공용 풀 병렬 수준이 3일 경우

-> 요청 스레드도 자신의 작업에 참여하므로 각 작업당 4개의 스레드가 사용됨

-> 12개 요청을 처리하는데 필요한 스레드 자원이 부족함..

 

 

 

특히 실무에서는 여러 요청을 동시에 처리하는 애플리케이션 서버를 사용하는데 수 많은 요청이 공용 풀을 사용하는 것은 매우 위험!

공용 풀은 절대!! I/O 바운드 작업을 하면 안됨