본문 바로가기
인프런/김영한의 실전 자바 - 고급 1편, 멀티스레드와 동시성

[인프런] 김영한의 실전 자바 - 고급 1편, 멀티스레드와 동시성 / 8. 생산자 소비자 문제 2

by hxxyeoniii 2025. 3. 27.

Lock Condition

생산자용, 소비자용 대기 집합을 서로 나누어 분리한다면 비효율 문제를 해결할 수 있다.

 

BoundedQueueV4.java

우선 대기 집합 분리 전, synchronized 대신 Lock 인터페이스와 ReentrantLock 구현체를 사용하도록 코드 수정

private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition(); // 대기 집합

 

 

 

Condition

1. ReentrantLock을 사용하는 스레드가 대기하는 스레드 대기 공간

2. lock.condition()으로 스레드 대기 공간이 만들어짐

3. Object.wait()의 스레드 대기 공간은 모든 객체 인스턴스가 내부에 기본으로 가지고 있으나 Lock(ReentrantLock)을 사용하는 경우는 스레드 대기 공간을 직접 만들어 사용해야 함

 

 

 

put() 수정

 

-> condition.await() : 지정한 condition에 현재 스레드를 대기(WAITING) 상태로 보관

-> condition.signal() : notify()랑 유사, 지정한 condition에서 대기중인 스레드를 하나 깨움

 

 

 

take() 수정


생산자 소비자 대기 공간 분리

아래와 같이 생산자 스레드를 위한 스레드 대기 공간과 소비자 스레드를 위한 스레드 대기 공간을 분리해 따로 만들어보자.

 

 

 

BoundedQueueV5.java

private final Condition producerCond = lock.newCondition(); // 생산자 대기 집합
private final Condition consumerCond = lock.newCondition(); // 소비자 대기 집합

 

 

 

put() 수정

 

-> producerCond.await()로 생산자 스레드를 생산자 전용 스레드 대기 공간에 보관

-> consumerCond.signal()로 대기중인 소비자 스레드를 깨움

 

 

 

take() 수정

 

-> consumerCond.await()로 소비자 스레드를 소비자 전용 스레드 대기 공간에 보관

-> producerCond.signal()로 생산자 스레드를 깨움

 

=> 생산자는 소비자를 깨우고, 소비자는 생산자를 깨움으로써 비효율 제거

 

 

 

Object.notify() vs Condition.signal()

1. Object.notify()

   > 대기 중인 스레드 중 임의의 하나를 깨움

   > 스레드가 깨어나는 순서는 정의되어 있지 않으며 JVM 구현에 따라 다르다. -> 보통은 먼저 들어온 스레드가 먼저 수행되지만 구현에 따라 다를 수 있음

   > synchronized 블록 내에 모니터 락을 가지고 있는 스레드가 호출해야 함

 

2. Condition.signal()

   > 대기 중인 스레드 중 하나를 깨우며, 일반적으로는 FIFO 순서로 깨움

   > 이는 자바 버전과 구현에 따라 달라질 수 있지만, 보통 Condition 구현은 Queue 구조를 사용하기에 FIFO 순서로 깨움

   > ReentrantLock을 가지고 있는 스레드가 호출해야 함


스레드의 대기 - 1. synchronized 대기

1.1 락 획득 대기

   > BLOCKED 상태로 락 획득 대기

   > synchronized를 시작할 때 락이 없으면 대기

   > 다른 스레드가 synchronized를 빠져나갈 때 대기가 풀리며 락 획득 시도

 

1.2 wait() 대기

   > WAITING 상태로 대기

   > wait() 호출 시 스레드 대기 집합에서 대기

   > 다른 스레드가 notify() 호출 시 빠져나감

 

 

 

락 대기 집합

사실은 BLOCKED 상태의 스레드도 자바 내부에서 따로 관리되고 있음!

 

1. 락 대기 집합에서 락을 기다리는 BLOCKED 상태의 스레드들 관리

2. 락 대기 집합은 모니터 락처럼 자바 내부에 구현되어 있어 개발자가 확인하기 어려움

3. 언젠가 c1이 락을 반납하면, 락 대기 집합에서 관리되는 스레드 중 하나가 락을 획득

 

 

1. 이후 p1이 큐에 데이터를 넣고 notify() 수행 시, c1이 스레드 대기 집합을 빠져나간다고 가정

2. c1은 스레드 대기 집합소에서 나가, 락 대기 집합으로 들어감

   -> 락 대기 집합이 1차 대기소 & 스레드 대기 집합이 2차 대기소

   -> c1은 2차 대기소 뿐 아니라, 1차 대기소까지 빠져나와야 임계 영역에서 로직을 수행할 수 있음(= 2중 감옥) 

 

 

 

정리

자바의 모든 객체 인스턴스는 멀티스레드와 임계 영역을 다루기 위해 내부에 3가지 기본 요소를 가짐

1. 모니터 락

2. 락 대기 집합(1차 대기소)

3. 스레드 대기 집합(2차 대기소)

 

1. synchronized를 사용한 임계 영역에 들어가려면 모니터 락이 필요

2. 모니터 락이 없으면 락 대기 집합에서 BLOCKED 상태로 락을 기다림

3. 모니터 락을 반납하면 락 대기 집합의 스레드 중 하나가 락을 획득하고 BLOCKED -> RUNNABLE이 됨

4. wait()를 호출해 스레드 대기 집합에 들어가기 위해서는 모니터 락이 필요

5. 스레드 대기 집합에 들어가면 모니터 락 반납

6. 스레드가 notify() 호출 시 스레드 대기 집합의 스레드 중 하나가 스레드 대기 집합을 빠져나오며 모니터 락 획득 시도

   -> 모니터 락을 획득하면 임계 영역 수행

   -> 모니터 락을 획득하지 못하면 락 대기 집합에 들어가 BLOCKED 상태로 기다림

 

 

 

스레드의 대기 - 2. ReentrantLck 대기

 

1.1 ReentrantLock 락 획득 대기

   > ReentrantLock의 대기 큐에서 관리

   > WAITING 상태로 락 획득 대기

   > lock.lock() 호출 시 락이 없으면 대기

   > 다른 스레드가 lock.unlock() 호출 시 대기가 풀리며 락 획득 시도, 락을 획득하면 대기 큐를 빠져나감

 

1.2 await() 대기

   > condition.await() 호출 시 condition 객체의 스레드 대기 공간에서 관리

   > WAITING 상태로 대기

   > 다른 스레드가 condition.signal() 호출 시 condition 객체의 스레드 대기 공간에서 빠져나감

 

 

 

정리

synchronized와 마찬가지로 ReentrantLock도 대기소가 2단계로 되어있다.

2차 대기소인 condition 객체의 스레드 대기 공간을 빠져나온다고 바로 실행되는 것이 아니라 ReentrantLock의 락을 획득해야 RUNNABLE이 되며 다음 코드를 실행할 수 있다.


BlockingQueue

지금까지 구현한 BoundedQueueV5는 단순한 큐의 기능을 넘어 스레드를 효과적으로 제어하는 기능도 포함한다.

그리고 이미 이는 BlockingQueue로 구현되어 있다.

 

BlockingQueue

: 스레드 관점에서 보면 큐가 특정 조건이 만족될 때까지 스레드의 작업을 차단(blocking)한다.

 

1. 데이터 추가 차단 : 큐가 가득 차면 추가 작업(put)을 시도하는 스레드는 공간이 생길 때까지 차단됨

2. 데이터 획득 차단 : 큐가 비어 있으면 획득 작업(take)을 시도하는 스레드는 큐에 데이터가 들어올 때까지 차단됨

 

=> 자바는 생산자 소비자 문제, 한정된 버퍼라고 불리는 문제들을 해결하기 위해 java.util.concurrent.BlocingQueue라는 인터페이스와 구현체들을 제공한다.

 

 

 

java.util.concurrent.BlockingQueue

주요 메서드들

 

-> 데이터 추가 메서드 : add(), offer(), put(), offer(타임아웃)

-> 데이터 획득 메서드 : take(), poll(타임아웃), remove(..)

-> Queue를 상속 받았기에 추가로 큐의 기능들을 사용할 수 있음

 

 

 

BlockingQueue 인터페이스의 대표적 구현체

1. ArrayBlockingQueue : 배열 기반 구현, 버퍼의 크기가 고정

2. LinkedBlockingQueue : 링크 기반 구현, 버퍼의 크기를 고정 또는 무한히 사용 가능

 

 

 

BoundedQueueV6_1.java

package thread.bounded;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedQueueV6_1 implements BoundedQueue {

    private BlockingQueue<String> queue;

    public BoundedQueueV6_1(int max) {
        this.queue = new ArrayBlockingQueue<>(max);
    }

    @Override
    public void put(String data) {
        try {
            queue.put(data);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public String take() {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public String toString() {
        return "BoundedQueueV6_1{" +
                "queue=" + queue +
                '}';
    }
}

-> 실제 기능은 BoundedAueueV5와 같음

 

 

 

ArrayBlockingQueue.put() 구현 내용 확인

 

-> notEmpty : 소비자 스레드가 대기하는 condition

-> notFull : 생산자 스레드가 대기하는 condition

 

 

 

 

-> 내부에서 ReentrantLock을 사용

-> 버퍼가 가득 차면 생산자 스레드는 생산자 대기실에서 await()

-> 생산자가 생산을 완료하면 소비자 대기실에 signal()으로 신호 전달


BlockingQueue 기능 설명

실무에서 멀티스레드를 사용할 때는 응답성이 중요

 

예를 들어, 생산자는 서버에 상품을 주문하는 고객

고객이 상품을 주문하면 고객 요청을 생산자 스레드가 큐에 넣는다고 가정

소비자 스레드는 주문 요청을 꺼내 주문을 처리하는 스레드

 

주문이 폭주하고 큐의 한계가 1000개라면, 그리고 소비자 스레드는 한 번에 겨우 10개의 주문만 처리할 수 있다면?

소비가 생산을 따라가지 못하고 큐가 가득 차게 됨

고객은 무한 대기하거나 나중에 다시 시도해달라는 요청을 받을 수 있음

 

 

 

큐가 가득 찼을 때, 선택지는 4가지

1. 예외를 던지고 예외를 받아 처리

2. 대기하지 않고 즉시 false 반환

3. 대기

4. 특정 시간 만큼 대기

 

 

 

BlockingQueue의 다양한 기능(from 공식 API)

Operation Throws Exception Special Value Blocks Times Out
Insert(추가) add(e) offer(e) put(e) offer(e, time, unit)
Remove(제거) remove(e) poll() take() poll(time, unit)
Examine(관찰) element() peek() not applicable not applicable

 

 

1. Throws Exception : 대기 시 예외

   > add(e) : 지정된 요소를 큐에 추가, 가득 차면 IllegalStateException 예외를 던짐

   > remove() : 큐에서 요서 제거해 반환, 큐가 비어 있으면 NoSuchElementException 예외를 던짐

   > element() : 큐의 머리 요소 반환(제거는 X), 큐가 비어 있으면 NoSuchElementException 예외를 던짐

 

2. Special Value : 대기시 즉시 반환

   > offer(e) : 지정된 요소를 큐에 추가, 가득 차면 false 반환

   > poll() : 큐에서 요소 제거하고 반환, 비어 있으면 null 반환

   > peek() : 큐의 머리 요소 반환(제거 X), 비어 있으면 null 반환

 

3. Blocks : 대기

   > put(e) : 지정된 요소를 큐에 추가할 때까지 대기, 가득 차면 공간이 생길 때까지 대기

   > take() : 큐에서 요소 제거하고 반환, 비어 있으면 요소가 준비될 때까지 대기

   > Excamine(관찰)

 

4. Times Out : 시간 대기

   > offer(e, time, unit) : 지정된 요소를 큐에 추가, 지정된 시간 동안 큐가 비워지기를 기다리다가 시간 초과 시 false 반환

   > poll(time, unit) : 큐에서 요소 제거하고 반환, 요소가 없다면 지정된 시간 동안 요소가 준비되길 기다리다가 시간 초과 시 null 반환

   > Examine(관찰)


정리

자바 1.5에 추가된 java.util.concurrent 패키지가 제공하는 Lock, ReentrantLock, Condition, BlockingQueue 등을 보면 참 견고하게 잘 만들어진 라이브러리라는 생각이 든다.

이는 다양한 동시성 시나리오를 대응할 수 있고, 개발자가 쉽고 편리하게 복잡한 동시성 문제를 다룰 수 있게 해준다.