본문 바로가기
인프런/김영한의 실전 자바 - 고급 1편, 멀티스레드와 동시성

[인프런] 김영한의 실전 자바 - 고급 1편, 멀티스레드와 동시성 / 11. 스레드 풀과 Executor 프레임워크 1

by hxxyeoniii 2025. 4. 1.

스레드를 직접 사용할 때의 문제점

실무에서 스레드를 직접 생성해 사용하면 다음과 같은 문제가 있다.

 

1. 스레드 생성 비용으로 인한 성능 문제

1.1 메모리 할당 : 각 스레드는 자신만의 호출 스택을 가지고 있어야 함 -> 스레드가 실행되는 동안 사용하는 메모리 공간

1.2 스레드 관리 문제 : 스레드 생성 작업은 OS 커널 수준에서 이뤄지며 시스템 콜을 통해 처리됨 -> CPU와 메모리 리소스 소모

1.3 운영체제 스케줄러 설정 : 새로운 스레드 생성 시 OS 스케줄러는 이 스레드를 관리하고 실행 순서를 조장해야 함 -> 스케줄링 알고리즘에 따라 오버헤드가 발생할 수 있음

 

-> 단순 자바 객체 생성과는 비교할 수 없을 정도로 스레드 생성은 큰 작업

 

2. 스레드 관리 문제

서버의 CPU, 메모리 자원은 한정적이기에 스레드를 무한히 만들 수 없음

 

3. Runnable 인터페이스의 불편함

3.1 반환 값이 없음 : run() 메서드는 반환 값이 없어 실행 결과를 얻기 위해 별도 메커니즘이 필요

3.2 예외 처리 : run() 메서드는 체크 예외를 던질 수 없다.

 

 

 

해결 = 스레드 풀

: 스레드를 생성하고 관리하는 풀

: 스레드를 재사용할 수 있어 스레드 생성 시간을 절약할 수 있음

: 처리할 작업이 없다면 스레드는 스레드 풀 안에 WAITING 상태로 관리되고, 작업 요청이 오면 RUNNABLE 상태로 변경됨

 

-> 자바의 Executor 프레임워크는 스레드 풀, 스레드 관리, Runnable의 문제점, 생산자 소비자 문제를 해결해주는 자바 멀티스레드 최고의 도구


Executor 프레임워크 소개

Executor 인터페이스

 

-> 가장 단순한 작업 실행 인터페이스로 execute() 메서드 하나를 가지고 있음

 

ExecutorService 인터페이스

 

-> Executor 인터페이스를 확장해 작업 제출과 제어 기능을 추가로 제공

-> 대부분 이 인터페이스를 사용

 

주요 메서드

1. execute : Runnable 작업 제출

void execute(Runnable command

 

2. submit(Callable<T> task) : Callable 작업을 제출하고 결과를 반환받음

<T> Future<T> submit(Callable<T> task)

 

3. submit(Runnable task) : Runnable 작업을 제출하고 결과를 반환받음

Future<?> submit(Runnable task)

-> Runnable은 반환 값이 없기 때문에 future.get() 호출 시 null을 반환한다.

-> 결과가 없을 뿐 나머지 작업은 똑같음

 

 

 

로그 출력 유틸리티 생성

ExecutorUtils.java : ThreadPoolExecutor 구현체가 넘어오면 구성한 로그를 출력하고 아니면 인스턴스 자체 출력

package thread.executor;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

import static util.MyLogger.log;

public abstract class ExecutorUtils {

    public static void printState(ExecutorService executorService) {
        if(executorService instanceof ThreadPoolExecutor poolExecutor) {
            int pool = poolExecutor.getPoolSize();
            int active = poolExecutor.getActiveCount();
            int queuedTasks = poolExecutor.getQueue().size();
            long completedTask = poolExecutor.getCompletedTaskCount();
            log("[pool=" + pool + ", active=" + active + ", queuedTasks=" +
                    queuedTasks + ", completedTasks=" + completedTask + "]");
        } else {
            log(executorService);
        }
    }

}

ExecutorService 코드로 시작

RunnableTask.java : 1초간 대기하는 작업 수행

package thread.executor;

import static util.MyLogger.log;
import static util.ThreadUtils.sleep;

public class RunnableTask implements Runnable {

    private final String name;
    private int sleepMs = 1000;

    public RunnableTask(String name) {
        this.name = name;
    }

    public RunnableTask(String name, int sleepMs) {
        this.name = name;
        this.sleepMs = sleepMs;
    }

    @Override
    public void run() {
        log(name + " 시작");
        sleep(sleepMs);
        log(name + " 완료");
    }
}

 

ExecutorBasicMain.java

package thread.executor;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static thread.executor.ExecutorUtils.printState;
import static util.MyLogger.log;
import static util.ThreadUtils.sleep;

public class ExecutorBasicMain {

    public static void main(String[] args) {
        ExecutorService es = new ThreadPoolExecutor(2, 2, 0, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>());

        log("== 초기 상태 ==");
        printState(es);
        es.execute(new RunnableTask("taskA"));
        es.execute(new RunnableTask("taskB"));
        es.execute(new RunnableTask("taskC"));
        es.execute(new RunnableTask("taskD"));
        log("== 작업 수행 중 ==");
        printState(es);

        sleep(3000);
        log("== 작업 수행 완료 ==");
        printState(es);

        // es.close(); // 자바 19부터 지원
        es.shutdown();
        log("== shutdown 완료 ==");
        printState(es);
    }
}

 

실행 결과

 

 

 

ThreadPoolExecutor : ExecutorService의 가장 대표적인 구현체

크게 2가지 요소로 구성되어 있다.

1. 스레드 풀 : 스레드 관리

2. BlockingQueue : 작업 보관, 생산자 소비자 문제 해결을 위해 단순 큐가 아닌 BlockingQueue 사용

 

생성자

new ThreadPoolExecutor(2, 2, 0, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>());

-> corePoolSize : 스레드 풀에서 관리되는 기본 스레드 수

-> maximumPoolSize : 스레드 풀에서 관리되는 최대 스레드 수

-> keepAliveTime, TimeUnit unit : 기본 스레드 수를 초과해 만들어진 스레드가 생존할 수 있는 대기 시간

-> BlockingQueue workQueue : 작업을 보관할 블로킹 큐

 

생산자(main 스레드)가 es.execute(new RunnableTask("xx")를 호출하면, RunnableTask("xx")의 인스턴스가 BlockingQueue에 보관된다.

> 생산자 : es.execute(작업) 호출 시 내부에서 BlockingQueue에 작업 보관

> 소비자 : 스레드 풀에 있는 스레드가 소비자, 이후 소비자 중 하나가 BlockingQueue의 작업을 받아 처리

 

 

 

실행 결과 분석

1. ThreadPoolExecutor 생성

 

-> ThreadPoolExecutor 생성 시점에 스레드 풀에 스레드를 미리 만들어두지는 않는다.

 

2. es.execute("taskA ~ taskD") 호출

 

 

-> main 스레드는 작업을 큐에 보관까지만 하고 바로 다음 코드를 수행한다.

-> taskA ~ taskD 요청이 블로킹 큐에 들어온다.

-> 최초 작업이 들어오면 이때 스레드를 만든다. 

-> taskA가 들어오는 시점에 스레드1을, taskB가 들어오는 시점에 스레드2를 생성한다.

-> corePoolSize 크기까지만 스레드를 만든다. 이후에는 스레드를 생성하지 않고 앞서 만든 스레드를 재사용한다.

 

3. 작업 완료

 

-> 작업이 완료되면 스레드는 다시 스레드 풀에서 대기

 

4. close() 호출

 

-> close() 호출 시 ThreadPoolExecutor가 종료된다. 이때 스레드 풀에 대기하는 스레드도 제거된다.

 

* close()는 자바 19부터 지원하는 메서드로 shutdown() 대신 호출


Runnable의 불편함

RunnableMain.java : 스레드에서 무작위 값을 하나 구현하는 코드

package thread.executor.future;

import java.util.Random;

import static util.MyLogger.log;
import static util.ThreadUtils.sleep;

public class RunnableMain {
    public static void main(String[] args) throws InterruptedException {
        MyRunnable task = new MyRunnable();
        Thread thread = new Thread(task, "Thread-1");
        thread.start();
        thread.join(); // main이 Thread-1 스레드가 종료될 때까지 대기
        int result = task.value;
        log("result value = " + result);
    }
    
    static class MyRunnable implements Runnable {

        int value;
        
        @Override
        public void run() {
            log("Runnable 시작");
            sleep(2000);
            value = new Random().nextInt(10);
            log("create value = " + value);
            log("Runnable 완료");
        }
    }
}

 

실행 결과

 

별도 스레드에서 만든 랜덤 값을 하나 받아오는 과정이 복잡하다.

작업 스레드(Thread-1)은 어딘가에 값을 보관해두어야 하고, 요청 스레드(main 스레드)는 작업이 끝날 때까지 join으로 대기한 후 보관된 값을 찾아 꺼내야만 한다.

 

-> 이런 문제를 해결하기 위해 Executor 프레임워크는 Callable과 Future라는 인터페이스를 도입했다.


Future1 - 시작

Callable 인터페이스

 

-> java.util.concurrent에서 제공되는 기능

-> call()의 반환 타입이 제네릭 V

-> Exception을 던지고 있음 = 해당 인터페이스를 구현하는 모든 메서드는 체크 예외인 Exception과 하위 예외를 던질 수 있음

 

CallableMainV1.java

package thread.executor.future;

import java.util.Random;
import java.util.concurrent.*;

import static util.MyLogger.log;
import static util.ThreadUtils.sleep;

public class CallableMainV1 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(1);
        Future<Integer> future = es.submit(new MyCallable());
        Integer result = future.get();
        log("result value = " + result);
        es.shutdown(); // es.close();
    }

    static class MyCallable implements Callable<Integer> {

        @Override
        public Integer call() {
            log("Callable 시작");
            sleep(2000);
            int value = new Random().nextInt(10);
            log("Callable 완료");
            return value;
        }
    }
}

 

실행 결과

 

 

 

submit()

 

> ExecuterService가 제공하는 submit()으로 Callable을 작업으로 전달

> MyCallable 인스턴스가 블로킹 큐에 전달되고 스레드 풀의 스레드 중 하나가 이 작업을 실행

> 작업의 처리 결과는 Future라는 인터페이스를 통해 반환

 

 

 

Executor 프레임워크 강점

요청 스레드가 결과를 받아야 한다면, Callable이 훨씬 더 편리 -> 스레드를 생성하거나 join()으로 스레드 제어하는 코드가 없음

마치 싱글 스레드 방식으로 개발한다는 느낌이 든다.

 

하지만 만약, future.get()을 호출했을 때 MyCallable 작업을 처리하는 스레드 풀의 스레드가 작업을 완료하지 않았다면?


Future2 - 분석

Future는 미래의 결과를 받을 수 있는 객체라는 뜻

Future<Integer> future = es.submit(new MyCallable());

 

> sumbit()의 호출로 MyCallable의 인스턴스 전달

> MyCallable.call()이 반환하는 무작위 숫자 대신 Future를 반환

> MyCallable.call()은 호출 스레드가 실행하는 것이 아닌, 스레드 풀의 다른 스레드가 실행하기에 언제 실행이 완료되어 결과를 반환할지 알 수 없음

> 결과를 즉시 받는 것이 불가능하기에 결과를 나중에 받을 수 있는 Future라는 객체를 대신 제공

 

 

 

CallableMainV1.java에 로그 추가

public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(1);
        log("submit() 호출");

        Future<Integer> future = es.submit(new MyCallable());
        log("future 즉시 반환, future = " + future);

        log("future.get() [블로킹] 메서드 호출 시작 -> main 스레드 WAITING");
        Integer result = future.get();
        log("future.get() [블로킹] 메서드 호출 완료 -> main 스레드 RUNNABLE");

        log("result value = " + result);
        log("future 완료, future = " + future);
        es.shutdown(); // es.close();
    }

 

실행 결과

 

실행 결과 분석

* 편의성 스레드 풀에 스레드가 1개 있다고 가정, MyCallable 인스턴스를 taskA라고 가정

 

1. submit()을 호출해 ExecutorService에 taskA 전달

 

2. Future 생성

 

> 요청 스레드는 es.submit(taskA)를 호출하고 있는 중

> ExecutorService는 taskA의 Future 객체 생성 -> 이때 생성되는 실제 구현체는 FutureTask

> 생성한 Future 객체 안에 taskA 인스턴스 보관

> Future는 내부에 taskA 작업 완료 여부와 작업의 결과 값을 가짐

 

3. Future는 값을 즉시 반환

 

> sumbit() 호출 시 Future가 만들어지고, taskA가 바로 블로킹 큐에 담기는 것이 아닌 taskA를 감싸고 있는 Future가 블로킹 큐에 담김

 

14::50::25.489 [     main] future 즉시 반환, 
future = java.util.concurrent.FutureTask@5674cd4d[Not completed, 
task = thread.executor.future.CallableMainV2$MyCallable@85ede7b]

-> 로그를 보면, Future의 구현체는 FutureTask

-> Future의 상태는 "Not completed"이고, 연관된 작업은 전달한 taskA(MyCallable 인스턴스)

-> 생성한 Future를 즉시 반환하기에 요청 스레드는 대기하지 않고 즉시 다음 코드 호출

 

4. 스레드 풀의 스레드1이 작업 시작

 

> 큐의 Future[taskA]를 꺼내 스레드 풀의 스레드1이 작업 시작

> Future의 FutureTask는 Runnable 인터페이스도 구현하고 있음

> 스레드1이 FutureTask의 run() 메서드 수행

> run()이 taskA의 call()을 호출하고 결과를 받아 처리

> FutureTask.run() -> MyCallable.call()

 

14::50::25.489 [     main] future.get() [블로킹] 메서드 호출 시작 -> main 스레드 WAITING
14::50::27.495 [pool-1-thread-1] Callable 완료
14::50::27.497 [     main] future.get() [블로킹] 메서드 호출 완료 -> main 스레드 RUNNABLE

-> 요청 스레드는 Future 인스턴스 참조를 가지고 있어 필요할 때 Future.get()을 호출해 taskA의 미래 결과를 받을 수 있음

-> 요청 스레드가 Future.get()을 호출하면 Future가 완료 상태가 될 때 까지 대기(RUNNABLE -> WAITING)

 

* Future.get() 호출 시

1) Future 완료 : Future에 결과가 포함되어 있어 요청 스레드는 즉시 값을 반환 받음

2) Future 진행중 : 요청 스레드가 마치 락을 얻을 때처럼, 결과를 얻기 위해 대기 = 블로킹

 

5. Future 완료

 

> taskA 작업을 완료하면 Future에 taskA 반환 결과를 담고 상태를 완료로 변경

> 요청 스레드를 깨운다 -> WAITING에서 RUNNABLE

> 작업을 마친 스레드1은 스레드 풀로 반환된다 -> RUNNABLE에서 WAITING


Future3 - 활용

SumTaskMainV2.java : 1~50까지 더하는 스레드1과 51~100까지 더하는 스레드2

package thread.executor.future;

import java.util.concurrent.*;

import static util.MyLogger.log;

public class SumTaskMainV2 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        SumTask task1 = new SumTask(1, 50);
        SumTask task2 = new SumTask(51, 100);

        ExecutorService es = Executors.newFixedThreadPool(2);

        Future<Integer> future1 = es.submit(task1);
        Future<Integer> future2 = es.submit(task2);

        Integer sum1 = future1.get();
        Integer sum2 = future2.get();

        log("task1.result = " + sum1);
        log("task2.result = " + sum2);

        int sumAll = sum1 + sum2;
        log("sumAll = " + sumAll);
        log("End");

        es.shutdown();
    }

    static class SumTask implements Callable<Integer> {
        int startValue;
        int endValue;

        public SumTask(int startValue, int endValue) {
            this.startValue = startValue;
            this.endValue = endValue;
        }

        @Override
        public Integer call() throws Exception {
            log("작업 시작");
            Thread.sleep(2000);
            int sum = 0;
            for(int i=startValue; i<=endValue; i++) {
                sum += i;
            }
            log("작업 완료 result = " + sum);
            return sum;
        }
    }
}

-> 코드가 직관적이고 깔끔함

 

실행 결과


Future4 - 이유

요청 스레드가 task1, task2를 동시에 요청할 수 있다.

 

<task1>

요청 스레드는 task1을 ExecutorService에 요청

요청 스레드는 즉시 Future를 반환받고 작업 스레드1이 task1 수행

 

<task2>

요청 스레드는 task2을 ExecutorService에 요청

요청 스레드는 즉시 Future를 반환받고 작업 스레드2이 task2 수행

 

=> 두 작업은 동시에 수행

 

요청 스레드가 future1.get() 호출 후 2초 대기

이후 future2.get() 호출 시에는 이미 2초간 작업을 완료했기에 즉시 결과 반환

 

 

 

Future을 잘못 활용한 예시들

 

Future<Integer> future1 = es.submit(task1); // non-blocking 
Integer sum1 = future1.get(); // blocking, 2초 대기

Future<Integer> future2 = es.submit(task2); // non-blocking 
Integer sum2 = future2.get(); // blocking, 2초 대기

-> 총 4초 소요

 

Integer sum1 = es.submit(task1).get(); // get()에서 블로킹 
Integer sum2 = es.submit(task2).get(); // get()에서 블로킹

-> 총 4초 소요

 

 

정리

1. Future라는 개념이 없다면 결과를 받을 때 까지 요청 스레드는 아무 일도 못하고 대기

2. Future 덕분에 요청 스레드는 대기하지 않고 다른 작업 수행 가능

3. 필요한 요청을 한 다음 Future.get()을 통해 블로킹 상태로 대기하며 결과를 받으면 된다!


Future5 - 주요 메서드

1. cancel

boolean cancel(boolean mayInterruptIfRunning)

기능 : 아직 완료되지 않은 작업 취소

반환값 : 작업이 성공적으로 취소된 경우 true, 이미 완료되었거나 취소할 수 없는 경우 false

설명 : 작업이 실행 중이 아니거나 아직 시작되지 않았으면 취소하고 실행 중인 작업의 경우 mayInterruptIfRunning이 true면 중단 시도

 

2. isCancelled()

boolean isCancelled()

기능 : 작업 취소 여부 확인

반환값 : 작업이 취소된 경우 true, 그렇지 않은 경우 false

설명 : 작업이 cancel() 메서드에 의해 취소된 경우 true 반환

 

3. isDone()

boolean isDone()

기능 : 작업 완료 여부 확인

반환값 : 작업이 완료된 경우 true, 그렇지 않은 경우 false

설명 : 작업이 정상적으로 완료되었거나 취소되었거나 예외가 발생해 종료된 경우 true 반환

 

4. state()

State state()

기능 : Future의 상태 반환, 자바 19부터 지원

 

> RUNNING : 작업 실행중

> SUCCESS : 성공 완료

> FAILED : 실패 완료

> CANCELLED : 취소 완료

 

5. get()

V get()

기능 : 작업이 완료될 때까지 대기하고 완료되면 결과 반환

반환값 : 작업 결과

설명 : 작업이 완료될 때까지 get()을 호출한 현재 스레드를 대기(블로킹)

 

6. get(long timeout, TimeUnit unit)

V get(long timeout, TimeUnit unit)

기능 : get()과 같으나 시간이 초과되면 TimeoutException 발생


Future6 - 취소

cancel()의 동작을 살펴보자.

 

FutureCancelMain.java

package thread.executor.future;

import java.util.concurrent.*;

import static util.MyLogger.log;
import static util.ThreadUtils.sleep;

public class FutureCancelMain {

    private static boolean mayInterruptIfRunning = true;

    public static void main(String[] args) {
        ExecutorService es = Executors.newFixedThreadPool(1);
        Future<String> future = es.submit(new MyTask());

        sleep(3000);

        log("future.cancel(" + mayInterruptIfRunning + ") 호출");
        boolean cancelResult = future.cancel(mayInterruptIfRunning);
        log("cancel" + mayInterruptIfRunning + ") result : " + cancelResult);

        try {
            log("Future result: " + future.get());
        } catch(CancellationException e) {
            log("Future는 이미 취소되었습니다.");
        } catch(InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

        es.shutdown();
    }

    static class MyTask implements Callable<String> {

        @Override
        public String call() throws Exception {
            try {
                for (int i = 0; i < 10; i++) {
                    log("작업중 : " + i);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                    log("인터럽트 발생");
                    return "Interrupted";
                }

            return "Completed";
            }
        }
}

 

mayInterruptIfRunning = true 결과

-> 작업이 실행중이라면 interrupt()를 호출해 작업 중단

 

mayInterruptIfRunning = false 결과

-> 이미 실행 중인 작업을 중단하지 않음

-> 하지만 cancel()을 호출했기에 Future은 CANCEL 상태가 되고 이후 get() 호출 시 CancellationException 예외 발생


ExecutorService - 작업 컬렉션 처리

ExecutorService는 여러 작업을 한 번에 편리하게 처리하는 invokeAll(), invokeAny() 기능을 제공한다.

 

invokeAll()

: 모든 Callable 작업을 제출하고, 모든 작업이 완료될 때까지 기다림

 

invokeAny()

: 지정된 시간 내에 하나의 Callable 작업이 완료될 때까지 기다리고, 가장 먼저 완료된 작업의 결과 반환

: 완료되지 않은 나머지 작업은 취소