생산자 소비자 문제 - 소개
멀티스레드 프로그래밍에서 자주 등장하는 동시성 문제 중 하나
앞에서 살펴본 프린터 예제

1. 생산자(Producer)
: 데이터를 생성하는 역할
: 파일에서 데이터를 읽어오거나 네트워크에서 데이터를 받아오는 스레드
: 프린터 예제에서, 사용자 입력을 프린터 큐에 전달하는 스레드
2. 소비자(Consumer)
: 생성된 데이터를 사용하는 역할
: 데이터를 처리하거나 저장하는 스레드
: 프린터 예제에서, 프린터 큐에 전달된 데이터를 받아 출력하는 스레드
3. 버퍼(Buffer)
: 생산자가 생성한 데이터를 일시적으로 저장하는 공간
: 한정된 크기를 가지며, 생산자와 소비자가 이 버퍼를 통해 데이터를 주고 받음
: 프린터 예제에서, 프린터 큐가 버퍼 역할
생산자 소비자 문제 - 예제 1
BoundQueue 인터페이스 : 한정된 자원을 표현
package thread.bounded;
public interface BoundedQueue {
void put(String data);
String take();
}
BoundQueueV1.java
package thread.bounded;
import java.util.ArrayDeque;
import java.util.Queue;
import static util.MyLogger.log;
public class BoundedQueueV1 implements BoundedQueue {
private final Queue<String> queue = new ArrayDeque<>();
private final int max; // 한정된 버퍼
public BoundedQueueV1(int max) {
this.max = max;
}
@Override
public synchronized void put(String data) {
if(queue.size() == max) {
log("[put] 큐가 가득 참, 버림: " + data);
return;
}
queue.offer(data);
}
@Override
public synchronized String take() {
if(queue.isEmpty()) {
return null;
}
return queue.poll();
}
@Override
public String toString() {
return queue.toString();
}
}
* 원칙적으로는 toString()에도 synchronized를 적용해야 하지만, 예제 코드를 단순히 유지하기 위해 붙이지 않음
ProducerTask.java : 소비자 스레드
package thread.bounded;
import static util.MyLogger.log;
public class ProducerTask implements Runnable {
private BoundedQueue queue;
private String request;
public ProducerTask(BoundedQueue queue, String request) {
this.queue = queue;
this.request = request;
}
@Override
public void run() {
log("[생산 시도]" + request + "->" + queue);
queue.put(request);
log("[생산 완료]" + request + "->" + queue);
}
}
ConsumerTask.java : 소비자 스레드
package thread.bounded;
import static util.MyLogger.log;
public class ConsumerTask implements Runnable {
private BoundedQueue queue;
public ConsumerTask(BoundedQueue queue) {
this.queue = queue;
}
@Override
public void run() {
log("[소비 시도] ? <-" + queue);
String data = queue.take();
log("[소비 완료]" + data + "<-" + queue);
}
}
BoundedMain.java
package thread.bounded;
import java.util.ArrayList;
import java.util.List;
import static util.MyLogger.log;
import static util.ThreadUtils.sleep;
public class BoundedMain {
public static void main(String[] args) {
// 1. BoundedQueue 선택
BoundedQueue queue = new BoundedQueueV1(2);
// 2. 생산자, 소비자 실행 순서 선택
producerFirst(queue); // 생산자 먼저 실행
//consumerFirst(queue); // 소비자 먼저 실행
}
private static void producerFirst(BoundedQueue queue) {
log("== [생산자 먼저 실행] 시작, " + queue.getClass().getSimpleName() + "==");
List<Thread> threads = new ArrayList<>();
startProducer(queue, threads);
printAllState(queue, threads);
startConsumer(queue, threads);
printAllState(queue, threads);
log("== [생산자 먼저 실행] 종료, " + queue.getClass().getSimpleName() + "==");
}
private static void consumerFirst(BoundedQueue queue) {
log("== [소비자 먼저 실행] 시작, " + queue.getClass().getSimpleName() + "==");
List<Thread> threads = new ArrayList<>();
startConsumer(queue, threads);
printAllState(queue, threads);
startProducer(queue, threads);
printAllState(queue, threads);
log("== [소비자 먼저 실행] 종료, " + queue.getClass().getSimpleName() + "==");
}
private static void startProducer(BoundedQueue queue, List<Thread> threads) {
System.out.println(); log("생산자 시작");
for (int i = 1; i <= 3; i++) {
Thread producer = new Thread(new ProducerTask(queue, "data" + i),
"producer" + i);
threads.add(producer);
producer.start();
sleep(100);
}
}
private static void startConsumer(BoundedQueue queue, List<Thread> threads) {
System.out.println(); log("소비자 시작");
for (int i = 1; i <= 3; i++) {
Thread consumer = new Thread(new ConsumerTask(queue), "consumer" + i);
threads.add(consumer);
consumer.start();
sleep(100);
}
}
private static void printAllState(BoundedQueue queue, List<Thread> threads) {
System.out.println();
log("현재 상태 출력, 큐 데이터: " + queue);
for (Thread thread : threads) {
log(thread.getName() + ": " + thread.getState());
}
}
}
1. 생산자 먼저 실행하는 producerFirst 메서드
> startPropducer : 생산자 스레드 3개를 만들고 0.1초 간격으로 실행
> startConsumer : 소비자 스레드 3개를 만들고 0.1초 간격으로 실행


1. p1, p2가 각각 락을 얻고 data1과 data2를 큐에 집어넣는다.
1. p3은 data3을 큐에 저장하려고 하지만 큐가 가득차있기 때문에 데이터를 추가할 수 없다.
데이터를 버리지 않으려면?
data3을 버리지 않는 데안은, 큐가 가득 찼을 때 큐에 빈 공간이 생길 때까지 생산자 스레드가 기다리면 된다.
2. 소비자 먼저 실행하는 consumerFirst 메서드

문제점
1. 생산자 먼저 실행 : p3이 보관하는 data3은 버려지고 c3은 null을 받게 됨
2. 소비자 먼저 실행 : c1, c2, c3은 데이터를 받지 못하고 p3이 보관하는 data3은 버려짐
-> 버퍼가 가득 찬 경우와 버퍼가 빈 경우 문제가 발생함
-> 문제의 해결 방법은 스레드가 기다리면 되는 것
생산자 소비자 문제 - 예제 2
BoundedQueueV2.java
큐에 데이터를 집어넣거나 뺄때 sleep()으로 기다리면 되지 않을까?
put() 수정 : 큐가 가득찼다면 sleep()으로 대기

take() 수정 : 큐에 데이터가 없다면 sleep()으로 대기

실행 결과(생산자 먼저 실행)

실행 분석

1. 생산자 스레드인 p3는 임계 영역에 들어가기 위해 락을 획득함
2. 큐에 data3을 저장하려고 시도하지만 큐가 꽉 차있음
3. p3는 sleep(1000)으로 대기하고 이때 RUNNABLE -> TIMED_WAITING 상태가 됨
4. 1초마다 큐에 빈자리가 있는지 확인한다.
하지만, 이때 p3가 락을 가지고 있는 상태에서 큐에 빈 자리가 나올 때까지 대기한다는 것!

c1이 임계 영역에 들어가기 위해 락을 획득하려 하지만, p3이 락을 가지고 있기에 들어갈 수 없음
-> 심각한 무한 대기 문제 발생
-> 결국 c1, c2, c3은 BLOCKED 상태로 무한 대기
실행 결과(소비자 먼저 실행)

-> c1이 락을 획득하고 큐의 데이터를 획득하려 하지만, 데이터가 없어 RUNNABLE -> TIMED_WAITING 상태가 됨
-> c1이 락을 가지고 있기에 다른 스레드는 임계 영역에 들어올 수 없음
-> c2, c3, p1, p2, p3 모두 BLOCKED 상태가 되고 무한 대기함
생산자 소비자 문제 - 예제 3 : Object - wait, notify
락을 가지고 대기하는 스레드가 대기하는 동안 다른 스레드에게 락을 양보할 수 있다면?
Object 클래스는 이런 문제를 해결할 수 있는 wait(), notify() 메서드를 제공한다.
Object는 모든 자바 객체의 부모이기에 자바 기본 기능이라고 생각하면 된다.
Object.wait()
> 현재 스레드가 가진 락을 반납하고 대기(WAITING)함
> 현재 스레드가 synchronized 블록이나 메서드에서 락을 소유하고 있을 때만 호출 가능
> 호출한 스레드는 락을 반납하고, 다른 스레드가 해당 락을 획득할 수 있게 함
> 이렇게 대기하고 있는 스레드는 다른 스레드가 notify()나 notifyAll()을 호출할 때까지 대기 상태 유지
Object.notify()
> 대기 중인 스레드 중 하나를 깨움
> synchronized 블록이나 메서드에서 호출되어야 함
> 깨운 스레드는 락을 다시 획득할 기회를 얻게 됨
> 대기 중인 스레드가 여러 개라면, 그 중 하나만이 깨워지게 됨
Object.notifyAll()
> 대기 중인 모든 스레드를 깨움
> synchronized 블록이나 메서드에서 호출되어야 함
> 모든 대기 중인 스레드가 락을 획득할 기회를 얻게 됨
> 모든 스레드를 깨워야 할 때 유용
BoundedQueueV3.java
sleep() 대신 wait(), notify()를 사용해 put()과 take() 메서드 수정
put() 수정

-> 락을 획득한 생산자 스레드는 큐에 빈 공간이 생기는지 주기적으로 체크
-> 빈 공간이 없다면, Object.wait()로 대기하며 대기할 때 락을 반납하고 대기한다.
-> wait() 호출로 대기할 시, RUNNABLE -> WAITING 상태가 됨
-> 큐에 데이터가 없어 대기하는 소비자 스레드가 있을 때, notify() 호출로 소비자 스레드는 깨어나 저장된 데이터를 획득할 수 있음
take() 수정

-> 락을 획득한 소비자 스레드는 큐에 데이터가 있는지 주기적으로 체크
-> 큐에 데이터가 없다면, Object.wait()로 대기하며 대기할 때 락을 반납하고 대기
-> 대기하는 경우 RUNNABLE -> WAITING 상태가 됨
-> 큐에 데이터가 가득 차 대기하는 생산자 스레드가 있을 때, notify() 호출로 생산자 스레드가 깨어나 큐에 데이터를 저장할 수 있음
=> 생산자는 생산을 완료하면 notify()로 대기하는 스레드를 깨워 생산된 데이터를 가져가게 하고, 소비자는 소비를 완료하면 notify()로 대기하는 스레드를 깨워 데이터를 생산하라고 하게 됨!!
실행 결과(생산자 먼저 실행)

* 스레드 대기 집합(wait set)
: synchronized 임계 영역 안에서 Object.wati() 호출 시 스레드는 대기 상태에 들어감
: 대기 상태에 들어간 스레드를 관리하는 것을 대기 집합이라 한다. -> 모든 객체는 각자의 대기 집합을 가지고 있음
: 모든 객체는 모니터 락과 대기 집합을 가지고 있으며 둘은 한 쌍으로 사용된다.
-> 위 예제에서는 BoundedQueue(x001) 구현 인스턴스의 락과 대기 집합을 사용하는 것
실행 분석

1. p3이 데이터를 생산하려고 하는데 큐가 가득차 wait()를 호출한다.
2. wait()를 호출하면
> 락 반납
> 스레드 상태가 RUNNABLE -> WAITING으로 변경
> 스레드 대기 집합에서 관리
3. 스레드 대기 집합에서 관리되는 스레드는 이후 다른 스레드가 notify()로 신호를 주면 깨어날 수 있음

1. 이후 소비자 스레드가 데이터를 획득해 큐에 빈자리가 생김
2. 소비자 스레드는 notify() 호출로 스레드 대기 집합에 이 사실을 알려주고 대기 집합에 있는 스레드 중 하나를 깨움
3. 임계 영역에 있는 코드를 실행하려면 락이 필요!!
-> p3은 대기 집합에서는 나가지만, 여전히 임계 영역에 있으므로 락을 획득하기 위해 BLOCKED 상태로 대기
-> p3 : WAITING -> BLOCKED
4. 이후 p3이 락을 획득하면 대기 집합에 들어오게 된 wait() 이후의 코드 실행
실행 결과(소비자 먼저 실행)


실행 분석

1. 소비자 스레드 c1, c2, c3은 모두 WAITING 상태로 대기하며 스레드 대기 집합에 들어감
2. p1이 data1을 큐에 집어넣고 notify()로 대기 집합에 알려줌
3. 여기서 어떤 스레드가 먼저 깨어날지는 알 수 없음 : JVM 버전 환경 등에 따라 달라짐

1. 대기 집합에 있던 c1이 락을 획득하고 notify()로 대기 집합에 알려줌
2. 대기 집합에는 소비자 스레드만 있음 -> 의도와 다르게 소비자 스레드가 깨어남
3. c1이 c2를 깨웠으나 큐가 비어있어 c2는 wait()를 호출해 대기 상태로 변함
4. c1이 c2나 c3을 깨우고 이때 큐에 데이터가 없다면, 깨어난 소비자 스레드는 CPU 자원만을 소비하고 다시 대기 집합에 들어가게 됨
-> 비효율적이나 결과적으로는 문제가 없음
-> 약간 돌아서 갈뿐..
Object - wait, notify의 한계
앞에서 살펴본 것 처럼 큐에 데이터가 없는데 소비자를 깨우거나, 큐에 데이터가 가득 찼는데 생산자를 깨우는 비효율이 발생할 수 있음
1. 같은 종류의 스레드를 깨울 때 비효율이 발생한다.
: 생산자가 생산자를 깨우거나 소비자가 소비자를 깨울 때
2. 스레드 기아(thread starvation)
: notify()는 어떤 스레드가 깨어날 지 알 수 없음

-> 큐가 비어있는데, c1 ~ c5가 반복해서 깨어난다면 p1은 실행 순서를 얻지 못하다가 아주 나중에 깨어날 수 있음
-> 이를 해결하기 위해 notifyAll()을 사용할 수 있다.
notifyAll()
: 스레드 대기 집합에 있는 모든 스레드를 한번에 다 깨움

1. 대기 집합에 있는 모든 스레드가 깨어남
2. 모든 스레드는 임계 영역 안에 있어 락을 획득해야 함
3. 락을 획득하지 못하면 BLOCKED 상태가 됨
4. c1 ~ c5이 락을 획득한다면 큐에 데이터가 없어 다시 스레드 대기 집합에 들어감
5. p1이 가장 늦게 락을 획득해도 c1 ~ c5가 모두 스레드 대기 집합에 들어가므로 결국 락을 획득하게 됨
-> notifyAll()로 스레드 기아 문제는 막을 수 있지만 비효율을 막지는 못한다.
'인프런 > 김영한의 실전 자바 - 고급 1편, 멀티스레드와 동시성' 카테고리의 다른 글
| [인프런] 김영한의 실전 자바 - 고급 1편, 멀티스레드와 동시성 / 9. CAS - 동기화와 원자적 연산 (0) | 2025.03.31 |
|---|---|
| [인프런] 김영한의 실전 자바 - 고급 1편, 멀티스레드와 동시성 / 8. 생산자 소비자 문제 2 (0) | 2025.03.27 |
| [인프런] 김영한의 실전 자바 - 고급 1편, 멀티스레드와 동시성 / 7. 고급 동기화 - concurrent.Lock (0) | 2025.03.20 |
| [인프런] 김영한의 실전 자바 - 고급 1편, 멀티스레드와 동시성 / 6. 동기화 - synchronized (0) | 2025.03.13 |
| [인프런] 김영한의 실전 자바 - 고급 1편, 멀티스레드와 동시성 / 5. 메모리 가시성 (0) | 2025.02.26 |