728x90
반응형
SMALL

참고자료:

 

김영한의 실전 자바 - 고급 1편, 멀티스레드와 동시성 강의 | 김영한 - 인프런

김영한 | 멀티스레드와 동시성을 기초부터 실무 레벨까지 깊이있게 학습합니다., 국내 개발 분야 누적 수강생 1위, 제대로 만든 김영한의 실전 자바[사진][임베딩 영상]단순히 자바 문법을 안다?

www.inflearn.com

 

생산자 소비자 문제

생산자 소비자 문제는 멀티스레드 프로그래밍에서 자주 등장하는 동시성 문제 중 하나로, 여러 스레드가 동시에 데이터를 생산하고 소비하는 상황을 다룬다.

 

멀티스레드의 핵심을 제대로 이해하려면 반드시 생산자 소비자 문제를 이해하고, 올바른 해결 방안도 함께 알아두어야 한다. 생산자 소비자 문제를 제대로 이해하면 멀티스레드를 제대로 이해했다고 볼 수 있다. 그만큼 중요한 내용이다.

 

 

  • 생산자(Producer): 데이터를 생성하는 역할을 한다. 예를 들어, 파일에서 데이터를 읽어오거나 네트워크에서 데이터를 받아오는 스레드가 생산자 역할을 할 수 있다.
    • 위 프린터 예제에서 사용자의 입력을 프린터 큐에 전달하는 스레드가 생산자의 역할이다.
  • 소비자(Consumer): 생성된 데이터를 사용하는 역할을 한다. 예를 들어, 데이터를 처리하거나 저장하는 스레드가 소비자 역할을 할 수 있다.
    • 위 프린터 예제에서 프린터 큐에 전달된 데이터를 받아서 출력하는 스레드가 소비자 역할이다.
  • 버퍼(Buffer): 생산자가 생성한 데이터를 일시적으로 저장하는 공간이다. 이 버퍼는 한정된 크기를 가지며, 생산자와 소비자가 이 버퍼를 통해 데이터를 주고 받는다.
    • 위 프린터 예제에서 프린터 큐가 버퍼 역할이다.

 

그럼 이게 왜 문제가 된다는 것일까?

문제 상황

  • 생산자가 너무 빠를 때: 버퍼가 가득 차서 더 이상 데이터를 넣을 수 없을 때까지 생산자가 데이터를 생성한다. 버퍼가 가득 찬 경우 생산자는 버퍼에 빈 공간이 생길 때까지 기다려야 한다.
  • 소비자가 너무 빠를 때: 버퍼가 비어서 더 이상 소비할 데이터가 없을 때까지 소비자가 데이터를 처리한다. 버퍼가 비어있을 때 소비자는 버퍼에 새로운 데이터가 들어올 때까지 기다려야 한다.
예를 들면, 초밥집에 가서 주방장님이 초밥을 하나씩 만들어서 서빙 카운터에 초밥을 하나씩 올려 놓는데 더 이상 올려 놓을 곳이 없어 초밥을 만들지 못하는 경우가 생산자가 너무 빠른 경우이고, 반대로 서빙 카운터에 초밥을 내려놓자마자 손님이 초밥을 다 먹어버려서 음식이 나올때까지 기다려야 하는 경우가 소비자가 너무 빠른 경우이다. 이때 서빙 카운터는 버퍼라고 볼 수 있다.

 

이 문제는 다음 두 용어로도 불린다.

  • 생산자 소비자 문제(producer-consumer problem): 생산자 소비자 문제는, 생산자 스레드와 소비자 스레드가 특정 자원을 함께 생산하고 소비하면서 발생하는 문제이다.
  • 한정된 버퍼 문제(bounded-buffer problem): 이 문제는 결국 중간에 있는 버퍼의 크기가 한정되어 있기 때문에 발생한다. 따라서 한정된 버퍼 문제라고도 한다.

 

이 내용에 대해 예제를 만들어보고 직접 눈으로 보면서 이해해보자!

생산자 소비자 문제 - 예제 코드

BoundedQueue

package thread.bounded;

public interface BoundedQueue {
    void put(String data);

    String take();
}
  • BoundedQueue: 버퍼 역할을 하는 큐의 인터페이스이다.
  • put(data): 버퍼에 데이터를 보관한다. (생산자 스레드가 호출하고, 데이터를 생산한다.)
  • take(): 버퍼에 보관된 값을 가져간다. (소비자 스레드가 호출하고, 데이터를 소비한다.)

BoundedQueueV1

package thread.bounded;

import java.util.ArrayDeque;
import java.util.Queue;

import static util.MyLogger.log;

public class BoundedQueueV1 implements BoundedQueue {

    private final Queue<String> queue = new ArrayDeque<>();
    private final int max;

    public BoundedQueueV1(int max) {
        this.max = max;
    }

    @Override
    public synchronized void put(String data) {
        if (queue.size() == max) {
            log("[put] 큐가 가득 찼습니다. 버립니다." + data);
            return;
        }
        queue.offer(data);
    }

    @Override
    public synchronized String take() {
        if (queue.isEmpty()) {
            return null;
        }
        return queue.poll();
    }

    @Override
    public String toString() {
        return queue.toString();
    }
}
  • BoundedQueueV1: 한정된 버퍼 역할을 하는 가장 단순한 구현체이다. 이후에 버전이 점점 올라가면서 코드를 개선한다.
  • Queue, ArrayDeque: 데이터를 중간에 보관하는 버퍼로 큐(Queue)를 사용한다. 구현체로는 ArrayDeque를 사용한다.
  • int max: 한정된(Bounded) 버퍼이므로, 버퍼에 저장할 수 있는 최대 크기를 지정한다.
  • put(): 큐에 데이터를 저장한다. 큐가 가득 찬 경우, 더는 데이터를 보관할 수 없으므로 데이터를 버린다.
  • take(): 큐의 데이터를 가져간다. 큐에 데이터가 없는 경우 null을 반환한다.
  • toString(): 버퍼 역할을 하는 queue 정보를 출력한다.
주의! 원칙적으로는 toString()에도 synchronized를 적용해야 맞다. 그래야 toString()을 통한 조회 시점에도 정확한 데이터를 조회할 수 있다. 예를 들어, toString()을 호출하는 시점에 모니터 락을 사용하지 않으면 다른 스레드가 put을 하는 동시에 이 toString()을 호출해서 실제 데이터와 다른 데이터가 출력될 수 있으니까. 그러나, 예제 코드를 단순하게 유지하고 목적에 부합한 결과를 출력하기 위해 의도적으로 synchronized를 넣지 않았다.

 

임계 영역

여기서 핵심 공유 자원은 바로 queue이다. 여러 스레드가 접근할 예정이므로 synchronized를 사용해서 한번에 하나의 스레드만 put() 또는 take()를 실행할 수 있도록 안전한 임계 영역을 만든다. 

 

ProducerTask

package thread.bounded;

import static util.MyLogger.log;

public class ProducerTask implements Runnable {

    private BoundedQueue queue;
    private String request;

    public ProducerTask(BoundedQueue queue, String request) {
        this.queue = queue;
        this.request = request;
    }

    @Override
    public void run() {
        log("[생산 시도] " + request + " -> " + queue);
        queue.put(request);
        log("[생산 완료] " + request + " -> " + queue);
    }
}
  • ProducerTask: 데이터를 생성하는 생성자 스레드가 실행하는 클래스, Runnable을 구현한다.
  • 스레드를 실행하면 queue.put(request)를 호출해서 전달된 데이터(request)를 큐에 보관한다.

ConsumerTask

package thread.bounded;

import static util.MyLogger.log;

public class ConsumerTask implements Runnable {

    private BoundedQueue queue;

    public ConsumerTask(BoundedQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        log("[소비 시도]     ? <- " + queue);
        String data = queue.take();
        log("[소비 완료] " + data +  " <- " + queue);
    }
}
  • ConsumerTask: 데이터를 소비하는 소비자 스레드가 실행하는 클래스, Runnable을 구현한다.
  • 스레드를 실행하면 queue.take()를 호출해서 큐의 데이터를 가져와서 소비한다.

 

BoundedMain

package thread.bounded;

import java.util.ArrayList;
import java.util.List;

import static util.MyLogger.log;
import static util.ThreadUtils.sleep;

public class BoundedMain {

    public static void main(String[] args) {
        //1. BoundedQueue 선택
        BoundedQueue queue = new BoundedQueueV1(2);

        //2. 생산자, 소비자 실행 순서 선택, 반드시 하나만 선택!
        //producerFirst(queue);
        consumerFirst(queue);
    }

    private static void producerFirst(BoundedQueue queue) {
        log("== [생산자 먼저 실행] 시작, " + queue.getClass().getSimpleName() + " ==");
        List<Thread> threads = new ArrayList<>();
        startProducer(queue, threads);
        printAllState(queue, threads);
        startConsumer(queue, threads);
        printAllState(queue, threads);
        log("== [생산자 먼저 실행] 종료, " + queue.getClass().getSimpleName() + " ==");
    }

    private static void consumerFirst(BoundedQueue queue) {
        log("== [소비자 먼저 실행] 시작, " + queue.getClass().getSimpleName() + " ==");
        List<Thread> threads = new ArrayList<>();
        startConsumer(queue, threads);
        printAllState(queue, threads);
        startProducer(queue, threads);
        printAllState(queue, threads);
        log("== [소비자 먼저 실행] 종료, " + queue.getClass().getSimpleName() + " ==");
    }

    private static void startProducer(BoundedQueue queue, List<Thread> threads) {
        System.out.println();
        log("생산자 시작");

        for (int i = 1; i <= 3; i++) {
            Thread producer = new Thread(new ProducerTask(queue, "data" + i), "producer" + i);
            threads.add(producer);
            producer.start();
            sleep(100);
        }
    }

    private static void startConsumer(BoundedQueue queue, List<Thread> threads) {
        System.out.println();
        log("소비자 시작");
        for (int i = 1; i <= 3; i++) {
            Thread consumer = new Thread(new ConsumerTask(queue), "consumer" + i);
            threads.add(consumer);
            consumer.start();
            sleep(100);
        }
    }

    private static void printAllState(BoundedQueue queue, List<Thread> threads) {
        System.out.println();
        log("현재 상태 출력, 큐 데이터: " + queue);
        for (Thread thread : threads) {
            log(thread.getName() + ": " + thread.getState());
        }
    }
}
  • 조금 길지만 하나씩 살펴보면, 우선 BoundedQueue를 구현한 특정 구현체를 선택하는 queue가 있다. V1이기 때문에 최대 사이즈를 생성자에 넣어준다. 
  • 생산자, 소비자의 실행 순서를 선택할 수 있어야 한다. 이건 의도적으로 생산자를 먼저 실행하거나 소비자를 먼저 실행하도록 구현했다. (producerFirst(queue), consumerFirst(queue))
    • 이렇게 만들어 두었으면 반드시 둘 중 하나만 사용해서 실행해야 한다. 의도적으로 소비자 또는 생산자가 먼저 실행될 수 있도록 만들었다.
private static void producerFirst(BoundedQueue queue) {
    log("== [생산자 먼저 실행] 시작, " + queue.getClass().getSimpleName() + " ==");
    List<Thread> threads = new ArrayList<>();
    startProducer(queue, threads);
    printAllState(queue, threads);
    startConsumer(queue, threads);
    printAllState(queue, threads);
    log("== [생산자 먼저 실행] 종료, " + queue.getClass().getSimpleName() + " ==");
}
  • threads: 스레드의 결과 상태를 한꺼번에 출력하기 위해 생성한 스레드를 보관해두는 리스트이다. startProducer, startConsumer에 파라미터로 전달되어 각 메서드에서 만들어지는 생산자 스레드와 소비자 스레드가 담기게 된다.
  • startProducer: 생산자 스레드를 만들어서 실행하는 메서드이다.
  • printAllState: 모든 스레드의 상태를 출력한다.
  • startConsumer: 소비자 스레드를 만들어서 실행하는 메서드이다.
private static void consumerFirst(BoundedQueue queue) {
    log("== [소비자 먼저 실행] 시작, " + queue.getClass().getSimpleName() + " ==");
    List<Thread> threads = new ArrayList<>();
    startConsumer(queue, threads);
    printAllState(queue, threads);
    startProducer(queue, threads);
    printAllState(queue, threads);
    log("== [소비자 먼저 실행] 종료, " + queue.getClass().getSimpleName() + " ==");
}
  • producerFirst 메서드와 완전히 동일하고 먼저 실행되는 메서드가 startProducer가 아니라 startConsumer라는 차이만 있다.
private static void startProducer(BoundedQueue queue, List<Thread> threads) {
    System.out.println();
    log("생산자 시작");

    for (int i = 1; i <= 3; i++) {
        Thread producer = new Thread(new ProducerTask(queue, "data" + i), "producer" + i);
        threads.add(producer);
        producer.start();
        sleep(100);
    }
}
  • 3개의 생산자 스레드를 만들고 각 스레드들을 실행한다. 로그 출력을 조금 알아보기 쉽게 하기 위해 sleep(100)을 추가했다. 의도적으로 추가한 것이고 실제 업무라면 없는게 맞다. 이렇게 잠시 0.1초 동안 대기하게 만들어 두면 로그로 producer1 → producer2producer3 순으로 이쁘게 출력될 것이다.
  • 만들어지는 각각의 생산자 스레드 모두 파라미터로 넘어온 threads에 추가된다.
private static void startConsumer(BoundedQueue queue, List<Thread> threads) {
    System.out.println();
    log("소비자 시작");
    for (int i = 1; i <= 3; i++) {
        Thread consumer = new Thread(new ConsumerTask(queue), "consumer" + i);
        threads.add(consumer);
        consumer.start();
        sleep(100);
    }
}
  • startProducer와 완전히 동일한 코드이지만 생산자 스레드를 만드는 게 아니라 소비자 스레드를 만들어 실행한다.
private static void printAllState(BoundedQueue queue, List<Thread> threads) {
    System.out.println();
    log("현재 상태 출력, 큐 데이터: " + queue);
    for (Thread thread : threads) {
        log(thread.getName() + ": " + thread.getState());
    }
}
  • 파라미터로 넘어온 threads에 담긴 모든 스레드(생산자, 소비자)의 상태를 출력한다.
  • 파라미터로 넘어온 queue의 상태를 출력한다.

 

생산자 먼저 실행하는 코드로 실행

public static void main(String[] args) {
    //1. BoundedQueue 선택
    BoundedQueue queue = new BoundedQueueV1(2);

    //2. 생산자, 소비자 실행 순서 선택, 반드시 하나만 선택!
    producerFirst(queue);
    //consumerFirst(queue);
}

실행 결과

2024-07-24 13:14:14.019 [     main] == [생산자 먼저 실행] 시작, BoundedQueueV1 ==

2024-07-24 13:14:14.023 [     main] 생산자 시작
2024-07-24 13:14:14.033 [producer1] [생산 시도] data1 -> []
2024-07-24 13:14:14.033 [producer1] [생산 완료] data1 -> [data1]
2024-07-24 13:14:14.129 [producer2] [생산 시도] data2 -> [data1]
2024-07-24 13:14:14.129 [producer2] [생산 완료] data2 -> [data1, data2]
2024-07-24 13:14:14.229 [producer3] [생산 시도] data3 -> [data1, data2]
2024-07-24 13:14:14.230 [producer3] [put] 큐가 가득 찼습니다. 버립니다.data3
2024-07-24 13:14:14.230 [producer3] [생산 완료] data3 -> [data1, data2]

2024-07-24 13:14:14.330 [     main] 현재 상태 출력, 큐 데이터: [data1, data2]
2024-07-24 13:14:14.330 [     main] producer1: TERMINATED
2024-07-24 13:14:14.331 [     main] producer2: TERMINATED
2024-07-24 13:14:14.331 [     main] producer3: TERMINATED

2024-07-24 13:14:14.331 [     main] 소비자 시작
2024-07-24 13:14:14.332 [consumer1] [소비 시도]     ? <- [data1, data2]
2024-07-24 13:14:14.333 [consumer1] [소비 완료] data1 <- [data2]
2024-07-24 13:14:14.437 [consumer2] [소비 시도]     ? <- [data2]
2024-07-24 13:14:14.438 [consumer2] [소비 완료] data2 <- []
2024-07-24 13:14:14.539 [consumer3] [소비 시도]     ? <- []
2024-07-24 13:14:14.539 [consumer3] [소비 완료] null <- []

2024-07-24 13:14:14.643 [     main] 현재 상태 출력, 큐 데이터: []
2024-07-24 13:14:14.643 [     main] producer1: TERMINATED
2024-07-24 13:14:14.643 [     main] producer2: TERMINATED
2024-07-24 13:14:14.644 [     main] producer3: TERMINATED
2024-07-24 13:14:14.644 [     main] consumer1: TERMINATED
2024-07-24 13:14:14.644 [     main] consumer2: TERMINATED
2024-07-24 13:14:14.645 [     main] consumer3: TERMINATED
2024-07-24 13:14:14.645 [     main] == [생산자 먼저 실행] 종료, BoundedQueueV1 ==

 

소비자 먼저 실행하는 코드로 실행

public static void main(String[] args) {
    //1. BoundedQueue 선택
    BoundedQueue queue = new BoundedQueueV1(2);

    //2. 생산자, 소비자 실행 순서 선택, 반드시 하나만 선택!
    //producerFirst(queue);
    consumerFirst(queue);
}

실행 결과

2024-07-24 13:15:15.838 [     main] == [소비자 먼저 실행] 시작, BoundedQueueV1 ==

2024-07-24 13:15:15.840 [     main] 소비자 시작
2024-07-24 13:15:15.845 [consumer1] [소비 시도]     ? <- []
2024-07-24 13:15:15.850 [consumer1] [소비 완료] null <- []
2024-07-24 13:15:15.950 [consumer2] [소비 시도]     ? <- []
2024-07-24 13:15:15.950 [consumer2] [소비 완료] null <- []
2024-07-24 13:15:16.052 [consumer3] [소비 시도]     ? <- []
2024-07-24 13:15:16.053 [consumer3] [소비 완료] null <- []

2024-07-24 13:15:16.157 [     main] 현재 상태 출력, 큐 데이터: []
2024-07-24 13:15:16.158 [     main] consumer1: TERMINATED
2024-07-24 13:15:16.158 [     main] consumer2: TERMINATED
2024-07-24 13:15:16.159 [     main] consumer3: TERMINATED

2024-07-24 13:15:16.159 [     main] 생산자 시작
2024-07-24 13:15:16.160 [producer1] [생산 시도] data1 -> []
2024-07-24 13:15:16.160 [producer1] [생산 완료] data1 -> [data1]
2024-07-24 13:15:16.265 [producer2] [생산 시도] data2 -> [data1]
2024-07-24 13:15:16.266 [producer2] [생산 완료] data2 -> [data1, data2]
2024-07-24 13:15:16.367 [producer3] [생산 시도] data3 -> [data1, data2]
2024-07-24 13:15:16.367 [producer3] [put] 큐가 가득 찼습니다. 버립니다.data3
2024-07-24 13:15:16.368 [producer3] [생산 완료] data3 -> [data1, data2]

2024-07-24 13:15:16.471 [     main] 현재 상태 출력, 큐 데이터: [data1, data2]
2024-07-24 13:15:16.471 [     main] consumer1: TERMINATED
2024-07-24 13:15:16.471 [     main] consumer2: TERMINATED
2024-07-24 13:15:16.472 [     main] consumer3: TERMINATED
2024-07-24 13:15:16.472 [     main] producer1: TERMINATED
2024-07-24 13:15:16.472 [     main] producer2: TERMINATED
2024-07-24 13:15:16.473 [     main] producer3: TERMINATED
2024-07-24 13:15:16.473 [     main] == [소비자 먼저 실행] 종료, BoundedQueueV1 ==

 

이 결과에 대해 자세히 분석해보자!

 

생산자 소비자 문제 - 생산자 우선 결과 분석

  • p1: producer1 생산자 스레드를 뜻한다.
  • c1: consumer1 소비자 스레드를 뜻한다.
  • 임계 영역은 synchronized 영역을 뜻한다. 스레드가 이 영역에 들어가려면? 모니터 락(lock)이 필요하다.
  • 물론 스레드가 처음부터 모두 생성되어 있는 것은 아니지만, 모두 그려두고 시작한다.

  • 먼저 생산자 스레드가 하나씩 생성되면서 큐에 데이터를 하나씩 추가한다. 이 데이터를 추가하는 부분은 임계 영역이므로 모니터 락이 필요하다.

  • producer1, producer2 스레드가 모두 큐에 데이터를 넣고 TERMINATED 상태가 되었다. 이제 producer3 스레드가 데이터를 큐에 넣으려고 시도하지만 최대 크기가 2이므로 더 이상 데이터를 넣지 못하고 버리게 된다. 

이 상태의 시점의 로그는 다음 부분이다.

2024-07-24 13:14:14.330 [     main] 현재 상태 출력, 큐 데이터: [data1, data2]
2024-07-24 13:14:14.330 [     main] producer1: TERMINATED
2024-07-24 13:14:14.331 [     main] producer2: TERMINATED
2024-07-24 13:14:14.331 [     main] producer3: TERMINATED

 

이제 소비자 스레드를 하나씩 만들면서 데이터를 소비한다.

  • consumer1, consumer2 스레드가 모두 한번씩 실행되면서 큐에 있는 데이터를 소비했다. 이제 consumer3이 실행될 차례다.

  • consumer3이 데이터를 소비하려고 했지만 데이터가 없다. 아무것도 가져올 수 없다. 아무런 데이터도 얻지 못한 채 쓸쓸히 돌아가게 된다.

  • 이 시점이 마지막 시점이다. 결과적으로 버퍼가 가득차서 p3 스레드가 생성한 데이터는 버려졌고, c3 스레드는 버퍼에 아무런 데이터도 없기 때문에 어떠한 데이터도 얻지 못한 채 돌아갔다. 이 부분에서 문제점이 보인다. 만약 이걸 해결하려면 가장 간단한 대안으로는 기다리는 대안이 있을 것 같다. p3 입장에서는 버퍼에 남는 공간이 생길 때 까지 기다렸다가 데이터를 넣고, c3 입장에서는 버퍼에 데이터가 생길 때 까지 기다렸다가 생기면 데이터를 가져오는 식으로 말이다.

생산자 소비자 문제 - 소비자 우선 결과 분석

이번엔 반대로 소비자 먼저 실행한 결과를 분석해보자.

  • c1, c2, c3 순으로 스레드가 큐에서 데이터를 가져오려고 시도하지만, 데이터가 아무것도 없다. 결국 모든 소비자 스레드는 빈털털이로 돌아오게 된다.

  • 이제 생산자 스레드가 하나씩 생성되면서 큐에 데이터를 넣는다.

  • 마지막 p3 스레드는 역시나 버퍼에 데이터가 꽉 차있기 때문에 데이터를 넣지 못하고 버리게 된다.

 

  • 이게 마지막 시점의 모습이다. 부분에서도 역시나 문제점이 많다. 소비자 스레드는 모든 스레드가 다 데이터를 가지지 못했고, 생산자 스레드는 결국 p3 스레드는 여전히 데이터를 추가하지 못했다.

 

총평을 내려보자면, 

  • 생산자 스레드 먼저 실행의 경우, p3이 보관하는 데이터는 버려지고 c3은 데이터를 받지 못한다.
  • 소비자 스레드 먼저 실행의 경우, c1, c2, c3 모두 데이터를 받지 못하고 p3이 보관하는 데이터는 버려진다.

 

결국 생산자 소비자 문제는 이런 결과를 발생시킨다. 어디 한쪽이 너무 빠르면 뭐가 됐건 문제가 생기고 그 근본 원인은 사실 버퍼의 사이즈다. 버퍼가 가득 찬 경우는 생산자 입장에서 버퍼에 여유가 생길 때 까지 조금만 기다리면 되는데 기다리지 못하고 데이터를 버리는 것이 아쉽고, 버퍼가 빈 경우는 소비자 입장에서 버퍼에 데이터가 채워질 때 까지 조금만 기다리면 되는데 기다리지 못하고 데이터를 못 얻는게 아쉽다. 

 

예제 변경 - 대기하는 코드로 바꿔보기

가장 간단한 해결 방법은 기다리는 것이다! 한번 기다리게 해서 문제를 해결해보자!

 

BoundedQueueV2

package thread.bounded;

import java.util.ArrayDeque;
import java.util.Queue;

import static util.MyLogger.log;
import static util.ThreadUtils.sleep;

public class BoundedQueueV2 implements BoundedQueue {

    private final Queue<String> queue = new ArrayDeque<>();
    private final int max;

    public BoundedQueueV2(int max) {
        this.max = max;
    }

    @Override
    public synchronized void put(String data) {
        while (queue.size() == max) {
            log("[put] 큐가 가득 찼습니다. 생산자는 대기합니다.");
            sleep(1000);
        }
        queue.offer(data);
    }

    @Override
    public synchronized String take() {
        while (queue.isEmpty()) {
            log("[take] 큐에 데이터가 없습니다. 소비자는 대기합니다");
            sleep(1000);
        }
        return queue.poll();
    }

    @Override
    public String toString() {
        return queue.toString();
    }
}
  • BoundedQueueV2를 만들고 여기서 put(data), take() 메서드에서 데이터가 꽉차거나 없다면 빠져나가는게 아니라 기다리는 것이다.
  • 그래서, put(String data)를 보면 큐의 사이즈가 꽉 찼다면 1초 정도 대기한 후 다시 확인해보는 것이다. 데이터가 빠져나갈 때까지.
  • 또한, take()도 큐에 아무런 데이터가 없다면 1초 정도 대기한 후 다시 확인해보는 것이다. 데이터가 들어올 때까지.

 

이렇게 하면 과연 잘 될까? 생산자 우선 코드로 실행해보자.

실행결과

2024-07-24 14:12:07.563 [     main] == [생산자 먼저 실행] 시작, BoundedQueueV2 ==

2024-07-24 14:12:07.565 [     main] 생산자 시작
2024-07-24 14:12:07.573 [producer1] [생산 시도] data1 -> []
2024-07-24 14:12:07.574 [producer1] [생산 완료] data1 -> [data1]
2024-07-24 14:12:07.674 [producer2] [생산 시도] data2 -> [data1]
2024-07-24 14:12:07.674 [producer2] [생산 완료] data2 -> [data1, data2]
2024-07-24 14:12:07.777 [producer3] [생산 시도] data3 -> [data1, data2]
2024-07-24 14:12:07.777 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.

2024-07-24 14:12:07.881 [     main] 현재 상태 출력, 큐 데이터: [data1, data2]
2024-07-24 14:12:07.882 [     main] producer1: TERMINATED
2024-07-24 14:12:07.882 [     main] producer2: TERMINATED
2024-07-24 14:12:07.882 [     main] producer3: TIMED_WAITING

2024-07-24 14:12:07.883 [     main] 소비자 시작
2024-07-24 14:12:07.884 [consumer1] [소비 시도]     ? <- [data1, data2]
2024-07-24 14:12:07.985 [consumer2] [소비 시도]     ? <- [data1, data2]
2024-07-24 14:12:08.087 [consumer3] [소비 시도]     ? <- [data1, data2]

2024-07-24 14:12:08.188 [     main] 현재 상태 출력, 큐 데이터: [data1, data2]
2024-07-24 14:12:08.189 [     main] producer1: TERMINATED
2024-07-24 14:12:08.189 [     main] producer2: TERMINATED
2024-07-24 14:12:08.190 [     main] producer3: TIMED_WAITING
2024-07-24 14:12:08.190 [     main] consumer1: BLOCKED
2024-07-24 14:12:08.190 [     main] consumer2: BLOCKED
2024-07-24 14:12:08.191 [     main] consumer3: BLOCKED
2024-07-24 14:12:08.191 [     main] == [생산자 먼저 실행] 종료, BoundedQueueV2 ==
2024-07-24 14:12:08.778 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:12:09.783 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:12:10.787 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:12:11.791 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.

 

실행 결과를 봤더니 뭔가 이상하다. 분명히 기다리는 코드를 작성했고, 소비자 스레드가 버퍼에서 데이터를 빼가기만 하면 넣을 수 있을 것 같은데 소비자 스레드는 작동하지 않고 있다. 왜 그럴까? p3 스레드가 모니터 락을 가지고 안 놔주고 있기 때문이다.

 

그리고 take(), put() 모두 synchronized 메서드이기 때문에 이 메서드에 진입하려면 모니터 락이 필요하다. 그 결과아주 심각한 무한대기 현상이 발생한다.

 

p3 입장에선 소비자 스레드 하나만이라도 버퍼에서 데이터를 가져가면 내가 데이터를 추가하고 빠져나올 수 있는데 내가 가지고 있는 락 때문에 어떤 소비자든 생산자든 어떤 스레드도 접근이 불가능한 것이다. 

 

그럼 소비자 우선 코드로 실행하면 어떻게 될까? 가관이다.

2024-07-24 14:15:52.311 [     main] == [소비자 먼저 실행] 시작, BoundedQueueV2 ==

2024-07-24 14:15:52.313 [     main] 소비자 시작
2024-07-24 14:15:52.318 [consumer1] [소비 시도]     ? <- []
2024-07-24 14:15:52.319 [consumer1] [take] 큐에 데이터가 없습니다. 소비자는 대기합니다
2024-07-24 14:15:52.419 [consumer2] [소비 시도]     ? <- []
2024-07-24 14:15:52.522 [consumer3] [소비 시도]     ? <- []

2024-07-24 14:15:52.626 [     main] 현재 상태 출력, 큐 데이터: []
2024-07-24 14:15:52.632 [     main] consumer1: TIMED_WAITING
2024-07-24 14:15:52.633 [     main] consumer2: BLOCKED
2024-07-24 14:15:52.633 [     main] consumer3: BLOCKED

2024-07-24 14:15:52.633 [     main] 생산자 시작
2024-07-24 14:15:52.635 [producer1] [생산 시도] data1 -> []
2024-07-24 14:15:52.739 [producer2] [생산 시도] data2 -> []
2024-07-24 14:15:52.839 [producer3] [생산 시도] data3 -> []

2024-07-24 14:15:52.944 [     main] 현재 상태 출력, 큐 데이터: []
2024-07-24 14:15:52.945 [     main] consumer1: TIMED_WAITING
2024-07-24 14:15:52.945 [     main] consumer2: BLOCKED
2024-07-24 14:15:52.945 [     main] consumer3: BLOCKED
2024-07-24 14:15:52.945 [     main] producer1: BLOCKED
2024-07-24 14:15:52.945 [     main] producer2: BLOCKED
2024-07-24 14:15:52.946 [     main] producer3: BLOCKED
2024-07-24 14:15:52.946 [     main] == [소비자 먼저 실행] 종료, BoundedQueueV2 ==
2024-07-24 14:15:53.322 [consumer1] [take] 큐에 데이터가 없습니다. 소비자는 대기합니다
2024-07-24 14:15:54.323 [consumer1] [take] 큐에 데이터가 없습니다. 소비자는 대기합니다
2024-07-24 14:15:55.323 [consumer1] [take] 큐에 데이터가 없습니다. 소비자는 대기합니다
2024-07-24 14:15:56.329 [consumer1] [take] 큐에 데이터가 없습니다. 소비자는 대기합니다

 

c1 스레드가 버퍼에 데이터가 아무것도 없기 때문에 데이터가 들어올때까지 기다리고 있어서 락을 놔주고 있지 않다. 그 결과 어떤 스레드도 접근이 불가능하다. 그래서 c1 스레드를 제외한 모든 스레드가 다 BLOCKED 상태가 돼버렸다.

 

"어? 그럼 sleep()말고 yield()를 사용해서 다른 스레드에게 양보하면 되지 않아요?"

 

진짜 좋은 생각이다. 한번 그렇게 해볼까? 다음 코드를 보자.

@Override
public synchronized void put(String data) {
    while (queue.size() == max) {
        log("[put] 큐가 가득 찼습니다. 생산자는 대기합니다.");
        // sleep(1000);
        Thread.yield();
    }
    queue.offer(data);
}

 

이번엔 yield()를 사용해서 욕심 부리지 말고 다른 스레드에게 양보하자! 과연 잘 될까?

생산자 스레드 먼저 실행 코드로 실행해보면 다음과 같은 결과를 얻는다.

...
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
...

 

결국 같은 현상이다. 대신 1초의 대기 시간이 사라지니까 너무 빠르게 출력이 된 것 뿐이다. 왜 같은 현상이 일어날까?

Thread.yield()synchronized의 락을 반납하는게 아니다.

그저 다른 스레드에게 CPU 사용을 양보하는 것 뿐이지 synchronized의 락은 여전히 본인이 가지고 있다. 언제까지? synchronized 블록이 끝날 때까지. 그래서 결국 다른 스레드들은 같은 모니터 락이 필요한 synchronized 메서드나 블록에 접근하지 못하게 된다.

 

 

그럼 이 생산자 소비자 문제는 도대체 어떻게 해결할까?

양보하는게 맞다! 근데 양보를 할 때 락도 양보하면 된다. 

자바의 Object.wait(), Object.notify()를 사용하면, 락을 가지고 대기하는 스레드가 대기하는 동안 다른 스레드에게 락을 양보할 수 있다. 바로 알아보자!

 

Object.wait(), Object.notify()

저번에도 말했지만, 자바는 처음부터 멀티스레드를 고려하고 탄생한 언어이다. 앞서 설명한 synchronized를 사용한 임계 영역 안에서 락을 가지고 무한 대기하는 문제는 흥미롭게도 Object 클래스에 해결 방안이 있다. Object 클래스는 이런 문제를 해결할 수 있는 wait(), notify()라는 메서드를 제공한다. Object는 모든 자바 객체의 부모이기 때문에, 여기 있는 기능들은 모두 자바 언어의 기본 기능이라 생각하면 된다.

 

  • Object.wait()
    • 현재 스레드가 가진 락을 반납하고 대기(WAITING)한다.
    • 현재 스레드를 대기 상태로 전환한다. 이 메서드는 현재 스레드가 synchronized 블록이나 메서드에서 락을 소유하고 있을 때만 호출할 수 있다. 호출한 스레드는 락을 반납하고, 다른 스레드가 해당 락을 획득할 수 있도록 한다. 이렇게 대기 상태로 전환된 스레드는 다른 스레드가 notify() 또는 notifyAll()을 호출할 때까지 대기 상태를 유지한다.
  • Object.notify()
    • 대기 중인 스레드 중 하나를 깨운다.
    • 이 메서드는 synchronized 블록이나 메서드에서 호출되어야 한다. 깨운 스레드는 락을 다시 획득할 기회를 얻게 된다. 만약, 대기 중인 스레드가 여러개라면 그 중 하나만이 깨워지게 된다.
  • Object.notifyAll()
    • 대기 중인 모든 스레드를 깨운다.
    • 이 메서드 역시 synchronized 블록이나 메서드에서 호출되어야 한다. 모든 대기중인 스레드가 락을 획득할 수 있는 기회를 얻게 된다. 이 방법은 모든 스레드를 깨워야 할 필요가 있는 경우에 유용하다.

wait(), notify() 메서드를 적절하게 사용하면, 멀티스레드 환경에서 발생할 수 있는 문제를 효율적으로 해결할 수 있다. 이 기능을 활용해서 스레드가 락을 가지고 임계 영역안에서 무한 대기하는 문제를 해결해보자!

 

BoundedQueueV3

package thread.bounded;

import java.util.ArrayDeque;
import java.util.Queue;

import static util.MyLogger.log;
import static util.ThreadUtils.sleep;

public class BoundedQueueV3 implements BoundedQueue {

    private final Queue<String> queue = new ArrayDeque<>();
    private final int max;

    public BoundedQueueV3(int max) {
        this.max = max;
    }

    @Override
    public synchronized void put(String data) {
        while (queue.size() == max) {
            log("[put] 큐가 가득 찼습니다. 생산자는 대기합니다.");
            try {
                wait(); // RUNNABLE -> WAITING, 락 반납
                log("[put] 생산자 깨어남");
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        queue.offer(data);
        log("[put] 생산자 데이터 저장, notify() 호출");
        notify(); // 대기 스레드에게 WAIT -> BLOCKED
    }

    @Override
    public synchronized String take() {
        while (queue.isEmpty()) {
            log("[take] 큐에 데이터가 없습니다. 소비자는 대기합니다");
            try {
                wait();
                log("[take] 소비자 깨어남");
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        String data = queue.poll();
        log("[take] 소비자 데이터 획득, notify() 호출");
        notify(); // 대기 스레드에게 WAIT -> BLOCKED
        return data;
    }

    @Override
    public String toString() {
        return queue.toString();
    }
}
  • 이제 V3이다. 앞서 사용한 sleep() 코드는 제거하고, Object.wait()을 사용하자. Object는 모든 클래스의 부모이므로 자바의 모든 객체는 해당 기능을 사용할 수 있다.
@Override
public synchronized void put(String data) {
    while (queue.size() == max) {
        log("[put] 큐가 가득 찼습니다. 생산자는 대기합니다.");
        try {
            wait(); // RUNNABLE -> WAITING, 락 반납
            log("[put] 생산자 깨어남");
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
    queue.offer(data);
    log("[put] 생산자 데이터 저장, notify() 호출");
    notify(); // 대기 스레드에게 WAIT -> BLOCKED
}
  • synchronized를 통해 임계 영역을 설정한다. 생산자 스레드는 락 획득을 시도한다.
  • 락을 획득한 생산자 스레드는 반복문을 통해서 큐에 빈 공간이 생기는지 주기적으로 체크한다. 만약, 빈 공간이 없다면 Object.wait()을 사용해서 대기한다. 참고로 대기할 때 락을 반납하고 대기한다. 그리고 대기 상태에서 깨어나면 다시 반복문에서 큐의 빈 공간을 체크한다.
  • wait()을 호출해서 대기하는 경우 RUNNABLE → WAITING 상태가 된다.
  • 생산자가 데이터를 큐에 저장하고 나면 notify()를 통해 대기하는 스레드에게 저장된 데이터가 있다고 알려주어야 한다. 예를 들어서 큐에 데이터가 없어서 대기하는 소비자 스레드가 있다고 가정하자. 이때 notify()를 호출하면 소비자 스레드는 깨어나서 저장된 데이터를 가져갈 수 있다.
@Override
public synchronized String take() {
    while (queue.isEmpty()) {
        log("[take] 큐에 데이터가 없습니다. 소비자는 대기합니다");
        try {
            wait();
            log("[take] 소비자 깨어남");
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
    String data = queue.poll();
    log("[take] 소비자 데이터 획득, notify() 호출");
    notify(); // 대기 스레드에게 WAIT -> BLOCKED
    return data;
}
  • synchronized를 통해 임계 영역을 설정한다. 소비자 스레드는 락 획득을 시도한다.
  • 락을 획득한 소비자 스레드는 반복문을 사용해서 큐에 데이터가 있는지 주기적으로 체크한다. 만약, 데이터가 없다면 Object.wait()을 사용해서 대기한다. 참고로 대기할 때 락을 반납하고 대기한다. 그리고 대기 상태에서 깨어나면, 다시 반복문에서 큐에 데이터가 있는지 체크한다. 
  • 대기하는 경우 RUNNABLE → WAITING 상태가 된다.
  • 소비자가 데이터를 획득하고 나면 notify()를 통해 대기하는 생산자 스레드에게 큐에 저장할 여유 공간이 생겼다고 알려주어야 한다. 예를 들어, 큐에 데이터가 꽉차서 데이터를 넣지 못해 대기하는 생산자 스레드가 있다고 가정하자. 이때 notify()를 호출하면 생산자 스레드는 깨어나서 저장된 데이터를 획득할 수 있다. 

wait()으로 대기 상태에 빠진 스레드는 notify()를 사용해야 깨울 수 있다. 생산자는 생산을 완료하면 notify()로 대기하는 스레드를 깨워서 생산된 데이터를 가져가게 하고, 소비자는 소비를 완료하면 notify()로 대기하는 스레드를 깨워서 데이터를 생산하라고 하면 된다. 여기서 중요한 핵심은 wait()을 호출해서 대기 상태에 빠질 땐 락을 반납하고 대기 상태에 빠진다는 것이다. 대기 상태에 빠지면 어차피 아무일도 하지 않으므로 락도 필요하지 않다.

 

V3로 변경하고 생산자 먼저 실행 코드로 변경 하기

public static void main(String[] args) {
    //1. BoundedQueue 선택
    BoundedQueue queue = new BoundedQueueV3(2);

    //2. 생산자, 소비자 실행 순서 선택, 반드시 하나만 선택!
    producerFirst(queue);
    //consumerFirst(queue);
}

 

실행 결과

2024-07-24 15:29:08.277 [     main] == [생산자 먼저 실행] 시작, BoundedQueueV3 ==

2024-07-24 15:29:08.279 [     main] 생산자 시작
2024-07-24 15:29:08.287 [producer1] [생산 시도] data1 -> []
2024-07-24 15:29:08.287 [producer1] [put] 생산자 데이터 저장, notify() 호출
2024-07-24 15:29:08.288 [producer1] [생산 완료] data1 -> [data1]
2024-07-24 15:29:08.384 [producer2] [생산 시도] data2 -> [data1]
2024-07-24 15:29:08.384 [producer2] [put] 생산자 데이터 저장, notify() 호출
2024-07-24 15:29:08.384 [producer2] [생산 완료] data2 -> [data1, data2]
2024-07-24 15:29:08.487 [producer3] [생산 시도] data3 -> [data1, data2]
2024-07-24 15:29:08.487 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.

2024-07-24 15:29:08.590 [     main] 현재 상태 출력, 큐 데이터: [data1, data2]
2024-07-24 15:29:08.591 [     main] producer1: TERMINATED
2024-07-24 15:29:08.591 [     main] producer2: TERMINATED
2024-07-24 15:29:08.592 [     main] producer3: WAITING

2024-07-24 15:29:08.592 [     main] 소비자 시작
2024-07-24 15:29:08.593 [consumer1] [소비 시도]     ? <- [data1, data2]
2024-07-24 15:29:08.593 [consumer1] [take] 소비자 데이터 획득, notify() 호출
2024-07-24 15:29:08.593 [producer3] [put] 생산자 깨어남
2024-07-24 15:29:08.594 [consumer1] [소비 완료] data1 <- [data2]
2024-07-24 15:29:08.594 [producer3] [put] 생산자 데이터 저장, notify() 호출
2024-07-24 15:29:08.594 [producer3] [생산 완료] data3 -> [data2, data3]
2024-07-24 15:29:08.696 [consumer2] [소비 시도]     ? <- [data2, data3]
2024-07-24 15:29:08.697 [consumer2] [take] 소비자 데이터 획득, notify() 호출
2024-07-24 15:29:08.697 [consumer2] [소비 완료] data2 <- [data3]
2024-07-24 15:29:08.797 [consumer3] [소비 시도]     ? <- [data3]
2024-07-24 15:29:08.798 [consumer3] [take] 소비자 데이터 획득, notify() 호출
2024-07-24 15:29:08.798 [consumer3] [소비 완료] data3 <- []

2024-07-24 15:29:08.897 [     main] 현재 상태 출력, 큐 데이터: []
2024-07-24 15:29:08.898 [     main] producer1: TERMINATED
2024-07-24 15:29:08.898 [     main] producer2: TERMINATED
2024-07-24 15:29:08.898 [     main] producer3: TERMINATED
2024-07-24 15:29:08.899 [     main] consumer1: TERMINATED
2024-07-24 15:29:08.899 [     main] consumer2: TERMINATED
2024-07-24 15:29:08.900 [     main] consumer3: TERMINATED
2024-07-24 15:29:08.900 [     main] == [생산자 먼저 실행] 종료, BoundedQueueV3 ==

 

V3로 변경하고 소비자 먼저 실행 코드로 변경 하기

public static void main(String[] args) {
    //1. BoundedQueue 선택
    BoundedQueue queue = new BoundedQueueV3(2);

    //2. 생산자, 소비자 실행 순서 선택, 반드시 하나만 선택!
    //producerFirst(queue);
    consumerFirst(queue);
}

 

실행 결과 

2024-07-24 15:30:27.418 [     main] == [소비자 먼저 실행] 시작, BoundedQueueV3 ==

2024-07-24 15:30:27.421 [     main] 소비자 시작
2024-07-24 15:30:27.425 [consumer1] [소비 시도]     ? <- []
2024-07-24 15:30:27.425 [consumer1] [take] 큐에 데이터가 없습니다. 소비자는 대기합니다
2024-07-24 15:30:27.528 [consumer2] [소비 시도]     ? <- []
2024-07-24 15:30:27.529 [consumer2] [take] 큐에 데이터가 없습니다. 소비자는 대기합니다
2024-07-24 15:30:27.629 [consumer3] [소비 시도]     ? <- []
2024-07-24 15:30:27.629 [consumer3] [take] 큐에 데이터가 없습니다. 소비자는 대기합니다

2024-07-24 15:30:27.731 [     main] 현재 상태 출력, 큐 데이터: []
2024-07-24 15:30:27.736 [     main] consumer1: WAITING
2024-07-24 15:30:27.736 [     main] consumer2: WAITING
2024-07-24 15:30:27.736 [     main] consumer3: WAITING

2024-07-24 15:30:27.737 [     main] 생산자 시작
2024-07-24 15:30:27.738 [producer1] [생산 시도] data1 -> []
2024-07-24 15:30:27.738 [producer1] [put] 생산자 데이터 저장, notify() 호출
2024-07-24 15:30:27.738 [consumer1] [take] 소비자 깨어남
2024-07-24 15:30:27.739 [producer1] [생산 완료] data1 -> [data1]
2024-07-24 15:30:27.739 [consumer1] [take] 소비자 데이터 획득, notify() 호출
2024-07-24 15:30:27.739 [consumer2] [take] 소비자 깨어남
2024-07-24 15:30:27.739 [consumer1] [소비 완료] data1 <- []
2024-07-24 15:30:27.739 [consumer2] [take] 큐에 데이터가 없습니다. 소비자는 대기합니다
2024-07-24 15:30:27.840 [producer2] [생산 시도] data2 -> []
2024-07-24 15:30:27.841 [producer2] [put] 생산자 데이터 저장, notify() 호출
2024-07-24 15:30:27.841 [consumer3] [take] 소비자 깨어남
2024-07-24 15:30:27.841 [producer2] [생산 완료] data2 -> [data2]
2024-07-24 15:30:27.841 [consumer3] [take] 소비자 데이터 획득, notify() 호출
2024-07-24 15:30:27.842 [consumer2] [take] 소비자 깨어남
2024-07-24 15:30:27.842 [consumer3] [소비 완료] data2 <- []
2024-07-24 15:30:27.842 [consumer2] [take] 큐에 데이터가 없습니다. 소비자는 대기합니다
2024-07-24 15:30:27.942 [producer3] [생산 시도] data3 -> []
2024-07-24 15:30:27.942 [producer3] [put] 생산자 데이터 저장, notify() 호출
2024-07-24 15:30:27.942 [producer3] [생산 완료] data3 -> [data3]
2024-07-24 15:30:27.942 [consumer2] [take] 소비자 깨어남
2024-07-24 15:30:27.943 [consumer2] [take] 소비자 데이터 획득, notify() 호출
2024-07-24 15:30:27.943 [consumer2] [소비 완료] data3 <- []

2024-07-24 15:30:28.047 [     main] 현재 상태 출력, 큐 데이터: []
2024-07-24 15:30:28.047 [     main] consumer1: TERMINATED
2024-07-24 15:30:28.048 [     main] consumer2: TERMINATED
2024-07-24 15:30:28.048 [     main] consumer3: TERMINATED
2024-07-24 15:30:28.049 [     main] producer1: TERMINATED
2024-07-24 15:30:28.049 [     main] producer2: TERMINATED
2024-07-24 15:30:28.049 [     main] producer3: TERMINATED
2024-07-24 15:30:28.050 [     main] == [소비자 먼저 실행] 종료, BoundedQueueV3 ==

 

로그만 보면 결국 잘 저장하고 잘 사용한것 같아 보인다. 근데 로그만으로는 이해하기 쉽지 않다. 그림으로 하나씩 분석해보자!

 

wait(), notify() 생산자 우선 분석

  • 우선 못보던게 하나 생겼다. 스레드 대기 집합
  • 스레드 대기 집합 (wait set)
    • synchronized 임계 영역 안에서 Object.wait()을 호출하면 스레드는 대기 상태에 들어간다. 이렇게 대기 상태에 들어간 스레드를 관리하는 것을 대기 집합이라 한다. 참고로 모든 객체는 각자 자기의 대기 집합과 모니터 락을 가지고 있다. 그리고 이 둘은 한 쌍으로 사용된다. 따라서 락을 획득한 객체의 대기 집합을 사용해야 한다. 여기서는 BoundedQueue(x001) 구현 인스턴스의 락과 대기 집합을 사용한다. 
      • synchronized를 메서드에 적용하면 해당 인스턴스의 락을 사용한다. 여기서는 BoundedQueue(x001)의 구현체이다.
      • wait() 호출은 앞에 this를 생략할 수 있다. this는 해당 인스턴스를 뜻한다. 여기서는 BoundedQueue(x001)의 구현체이다.

 

이제 순서대로 흐름을 분석해보자!

  • p1이 락을 획득하고 큐에 데이터를 저장한다.
  • 큐에 데이터가 추가 되었기 때문에 스레드 대기 집합에 이 사실을 알려야 한다. (코드 흐름이 그렇다)
  • notify()를 호출하면 스레드 대기 집합에서 대기하는 스레드 중 하나를 깨운다.
  • 현재 대기 집합에 스레드가 없으므로 아무일도 발생하지 않는다. 만약 소비자 스레드가 대기 집합에 있었다면 깨어나서 큐에 들어있는 데이터를 소비했을 것이다.

  • p1은 할일을 다 끝내고 락을 반납한다. 

  • p2도 큐에 데이터를 저장하고 생산을 완료했다.

  • p3가 데이터를 생산하려고 하는데, 큐가 가득 찼다. wait()을 호출한다.

  • wait()을 호출하면 락을 반납한다.
  • wait()을 호출하면 스레드의 상태가 RUNNABLE → WAITING으로 변경된다.
  • wait()을 호출하면 스레드 대기 집합에서 관리된다.
  • 스레드 대기 집합에서 관리되는 스레드는 이후에 다른 스레드가 notify()를 통해 스레드 대기 집합에 신호를 주면 깨어날 수 있다.

  • 이제 소비자 스레드들이 움직일 차례가 됐다.

  • c1이 데이터를 획득했다. 그래서 큐에 데이터를 보관할 빈자리 생겼다.
  • c1notify()를 호출한다 (코드 흐름이 그렇다)
  • 스레드 대기 집합에 있는 p3를 깨운다.

  • 스레드 대기 집합은 notify() 신호를 받으면 대기 집합에 있는 스레드 중 하나를 깨운다.
  • 그런데 대기 집합에 있는 스레드가 깨어난다고 바로 작동하는 것이 아니다. 왜냐하면 이 스레드는 여전히 임계 영역 안에 있기 때문이다.
  • 임계 영역에 있는 코드를 실행하려면 가장 먼저 락이 필요하다. p3는 대기 집합에서는 나가지만 여전히 임계 영역에 있으므로 락을 획득하기 위해 BLOCKED 상태로 대기한다. p3: WAITING → BLOCKED 
  • 참고로 이때 임계 영역의 코드를 처음으로 돌아가서 실행하는 게 아니다. 대기 집합에 들어오게 된 wait()을 호출한 부분부터 다시 실행된다. 락을 획득하면 wait() 이후의 코드를 실행한다. 

  • c1은 데이터 소비를 완료하고 락을 반납하고 임계 영역을 빠져나간다.

  • p3가 락을 획득한다.
    • BLOCKED → RUNNABLE
    • wait() 코드에서 대기했기 때문에 이후의 코드를 실행한다.
    • data3을 큐에 저장한다.
    • notify()를 호출한다. 데이터를 저장했기 때문에 혹시 스레드 대기 집합에 소비자가 대기하고 있다면 소비자를 하나 깨워줘야 한다. 물론 지금은 대기 집합에 스레드가 없기 때문에 아무일도 일어나지 않는다.

  • p3는 락을 반납하고 임계 영역을 빠져나간다.
  • 이제 c2, c3가 하나씩 데이터를 원래대로 가져갈 것이다.

  • c2, c3가 실행됐고 데이터가 있으므로 둘 다 데이터를 소비하고 완료한다.
  • 둘 다 notify()를 호출했지만 대기 집합에 스레드가 없으므로 아무일도 일어나지 않는다.

wait(), notify() 덕분에 스레드가 락을 놓고 대기하고, 또 대기하는 스레드를 필요한 시점에 깨울 수 있었다. 생산자 스레드가 큐가 가득차서 대기해도 소비자 스레드가 큐의 데이터를 소비하고 나면 알려주기 때문에 최적의 타이밍에 깨어나서 데이터를 생산할 수 있었다. 

덕분에 최종 결과를 보면 p1, p2, p3 모두 데이터를 정상 생산하고 c1, c2, c3 모두 데이터를 정상 소비할 수 있었다. 

 

wait(), notify() 소비자 우선 분석

  • 이제 소비자 우선 코드로 시작해보자. 최초의 상태이다.

  • c1이 락을 얻고 임계 영역에 들어왔지만, 데이터가 없다. wait()을 호출하고 대기 집합에 대기하게 된다.

  • 큐에 데이터가 없기 때문에 c1, c2, c3 모두 스레드 대기 집합에서 대기하게 된다.
  • 이후에 생산자가 큐에 데이터를 생산하면 notify()를 통해 이 스레드들을 하나씩 깨워서 데이터를 소비할 수 있을것이다.

  • p1은 락을 획득하고 큐에 데이터를 생산한다. 큐에 데이터가 있기 때문에 소비자를 하나 깨울 수 있다. notify()를 통해 스레드 대기 집합에 이 사실을 알려준다.

  • notify()를 받은 스레드 대기 집합은 스레드 중에 하나를 깨운다.
  • 여기서 c1, c2, c3 중에 어떤 스레드가 깨어날지는 알 수 없다.
    • 어떤 스레드가 깨워질지는 JVM 스펙에 명시되어 있지 않다. 따라서 JVM 버전 및 환경에 따라 달라진다.
  • 그런데 대기 집합에 있는 스레드가 깨어난다고 바로 작동하는 것은 아니다. 깨어난 스레드는 여전히 임계 영역 안에 있다.
  • 임계 영역 안에 있는 코드를 실행하려면 먼저 락이 필요하다. 대기 집합에서는 나가지만 여전히 임계 영역 안에 있으므로 락을 획득하기 위해 BLOCKED 상태로 대기한다. WAITING → BLOCKED

  • p1이 락을 반납하고 임계 영역에서 나간다.
  • c1은 락을 획득한다.

  • c1은 락을 획득하고 임계 영역 안에서 실행되며 데이터를 획득한다.
  • c1이 데이터를 획득했으므로 큐에 데이터를 넣을 공간이 있다는 것을 대기 집합에 알려준다. 만약 대기 집합에 생산자 스레드가 대기하고 있다면 큐에 데이터를 넣을 수 있을 것이다.

  • c1notify()로 스레드 대기 집합에 알렸지만, 생산자 스레드가 아니라 소비자 스레드만 있다. 따라서 의도와는 다르게 소비자 스레드인 c2가 대기상태에서 깨어난다. (물론 대기 집합에 있는 어떤 스레드가 깨어날지는 알 수 없다. 여기서는 c2가 깨어난다고 가정한다. 심지어 생산자와 소비자 스레드가 함께 대기 집합에 있어도 어떤 스레드가 깨어날지는 알 수 없다.)

  • c1은 작업을 완료한다.
  • c1c2를 깨웠지만, 문제가 있다. 바로 큐에 데이터가 없다는 것이다.
  • c2는 락을 획득하고 큐에 데이터를 소비하려고 시도한다. 그런데 큐에 데이터가 없으므로 c2는 결국 wait()을 호출해서 다시 대기 상태로 변하며 대기 집합에 들어간다.

  • 이처럼 소비자인 c1이 같은 소비자인 c2를 깨우는것은 상당히 비효율적이다.
  • c1 입장에서 c2를 깨우게 되면 아무 일도 하지 않고 그냥 다시 스레드 대기 집합에 들어갈 수 있다. 결과적으로 CPU만 사용하고, 아무 일도 하지 않은 상태로 다시 대기 상태가 되어버린다.
  • 그렇다고 c1이 스레드 대기 집합에 있는 어떤 스레드를 깨울지 선택할 수는 없다. notify()는 스레드 대기 집합에 있는 스레드 중 임의의 하나를 깨울뿐이다.
  • 물론 이게 비효율적이라는 것이지 문제가 되는 것은 아니다. 결과에는 아무런 문제가 없다. 살짝 돌아갈 뿐이다.

  • p2가 락을 획득하고, 데이터를 저장한 다음에 notify()를 호출한다. 데이터가 있으므로 소비자 스레드가 깨어난다면 데이터를 소비할 수 있다.

  • 스레드 대기 집합에 있는 c3가 깨어난다. 참고로 어떤 스레드가 깨어날지는 알 수 없다.
  • c3는 임계 영역 안에 있으므로 락을 획득하기 위해 대기(BLOCKED) 한다.

  • p2가 작업을 끝마치고 락을 반납하고 나간다. 
  • c3는 락을 획득하고 BLOCKED → RUNNABLE 상태가 된다.
  • c3는 데이터를 획득한 다음에 notify()를 통해 스레드 대기 집합에 알린다. 큐에 여유 공간이 생겼기 때문에 생산자 스레드가 대기 중이라면 데이터를 생산할 수 있다.

  • notify()를 호출했지만, 스레드 대기 집합에는 소비자인 c2만 존재한다. 
  • c2가 깨어나지만 임계 영역 안에 있으므로 락을 기다리는 BLOCKED 상태가 된다.

  • c3는 락을 반납하고 임계 영역을 나간다.
  • c2가 락을 획득하고, 큐에서 데이터를 획득하려 하지만 데이터가 없다.
  • c2는 다시 wait()을 호출해서 대기(WAITING)상태에 들어가고, 다시 대기 집합에서 관리된다.

  • 물론 c2의 지금 이 사이클은 CPU 자원만 소모하고 다시 대기 집합에 들어갔기 때문에 비효율적이다.
  • 만약 소비자인 c3 입장에서 생산자, 소비자 스레드를 선택해서 깨울 수 있다면, 소비자인 c2를 깨우지는 않았을 것이다. 하지만 notify()는 이런 선택을 할 수 없다.

  • p3가 락을 얻고 데이터를 저장한다. notify()를 통해 스레드 대기 집합에 알린다.
  • 스레드 대기 집합에는 소비자 c2가 있으므로 생산한 데이터를 잘 소비할 수 있다.

  • c2notify()를 통해 깨어나고 BLOCKED 상태로 대기하고 있다가 락을 획득하면 큐에 데이터를 잘 소비해서 임계영역을 빠져나오고 종료된다.

정리를 하자면

최종 결과를 보면 p1, p2, p3 모두 데이터를 잘 생산하고 c1, c2, c3 모두 데이터를 잘 소비했다. 하지만 소비자인 c1이 같은 소비자인 c2, c3를 깨울 수 있었다. 이 경우 큐에 데이터가 없을 가능성이 있다. 이땐 깨어난 소비자 스레드가 CPU 자원만 소모하고 다시 대기 집합에 들어갔기 때문에 비효율적이다. 

 

만약, 소비자인 c1 입장에서 생산자, 소비자 스레드를 선택해서 깨울 수 있다면, 소비자인 c2를 깨우지는 않았을 것이다. 예를 들어 소비자는 생산자만 깨우고, 생산자는 소비자만 깨울 수 있다면 더 효율적으로 작동할 수 있을 것 같다. 하지만 notify()는 이런 선택을 할 수 없다. 

 

비효율적이지만 문제는 없다. 좀 돌아갈 뿐이다. 

 

Object - wait(), notify() 한계

지금까지 살펴본 Object.wait(), Object.notify() 방식은 스레드 대기 집합 하나에 생산자, 소비자 스레드를 모두 관리한다. 그리고 notify()를 호출할 때 임의의 스레드가 선택된다. 따라서 앞서 살펴본 것 처럼 큐에 데이터가 없는 상황에 소비자가 같은 소비자를 깨우는 비효율이 발생할 수 있다. 또는 큐에 데이터가 가득 차있는데 생산자가 같은 생산자를 깨우는 비효율도 발생할 수 있다.

 

  • 다음과 같은 상황을 연출해보자.
  • 큐에 dataX가 보관되어 있다.
  • 스레드 대기 집합에는 다음 스레드가 대기하고 있다.
    • 소비자: c1, c2, c3
    • 생산자: p1, p2, p3
  • p0 스레드가 data0 생산을 시도한다.

  • p0 스레드가 실행되면서 data0을 큐에 저장한다. 이때 큐에 데이터가 가득찬다.
  • notify()를 통해 대기 집합의 스레드를 하나 깨운다.

  • 만약, notify()의 결과로 소비자 스레드가 깨어나게 되면 소비자 스레드는 큐의 데이터를 획득하고 완료된다.

  • 그러나 notify()의 결과로 생산자 스레드를 깨우게 되면, 이미 큐에 데이터는 가득 차 있다. 따라서 데이터를 생산하지 못하고 다시 대기 집합으로 이동하는 비효율이 발생한다.

 

이번엔 반대의 경우로 소비자에 대해서도 이야기 해보자. 아래와 같은 상황이 있다.

  • c0 스레드가 실행되고 data0을 획득한다.
  • 이제 큐에 데이터는 비어있게 된다.
  • c0 스레드는 notify()를 호출한다.

  • 스레드 대기 집합에서 소비자 스레드가 깨어나면 큐에 데이터가 없기 때문에 다시 대기 집합으로 이동하는 비효율이 발생한다.

 

결국, 같은 종류의 스레드를 깨울 때 비효율이 발생한다.

이 내용을 통해서 알 수 있는 사실은 생산자가 생산자를 깨우거나, 소비자가 소비자를 깨울 때 비효율이 발생한다. 생산자가 소비자를 깨우고 반대로 소비자가 생산자를 깨운다면 이런 비효율은 발생하지 않을 것이다.

 

또 하나의 문제가 있다. 바로 스레드 기아(thread starvation) 문제점이다.

notify()의 또 다른 문제점으로는 어떤 스레드가 깨어날 지 알 수 없기 때문에 발생할 수 있는 스레드 기아 문제가 있다.

  • notify()가 어떤 스레드를 깨울지는 알 수 없다. 최악의 경우 c1 - c5 스레드가 반복해서 깨어날 수 있다.
    • c1 - c5 스레드가 깨어나도 큐에 소비할 데이터가 없다. 따라서 다시 스레드 대기 집합에 들어간다.
    • notify()로 다시 깨우는데 어떤 스레드를 깨울지 알 수 없다. 따라서 c1 - c5 스레드가 반복해서 깨어날 수 있다.
  • 이렇게 대기 상태의 스레드가 실행 순서를 계속 얻지 못해서 실행되지 않는 상황을 스레드 기아 상태라 한다. 
  • 여기서 깨어나야 할 이상적인 스레드는 바로 생산자 스레드인 p1이다. 

이 스레드 기아를 해결하는 방법 중 하나인 notifyAll()이 있다. 

 

notifyAll()

이 메서드는 스레드 대기 집합에 있는 모든 스레드를 한번에 다 깨울 수 있다.

  • 데이터를 획득한 c0 스레드가 notifyAll()을 호출한다.

  • 대기 집합에 있는 모든 스레드가 깨어난다.
  • 모든 스레드는 다 임계 영역 안에 있다. 따라서 먼저 락을 획득해야 한다.
  • 락을 획득하지 못하면 BLOCKED 상태에서 머무르게 된다.
  • 만약, c1이 먼저 락을 획득한다면 큐에 데이터가 없으므로 다시 스레드 대기 집합에 들어간다.
  • c2 - c5도 마찬가지다.
  • 따라서, p1이 가장 늦게 락 획득을 시도해도 c1 - c5 모두 스레드 대기 집합에 들어가있으므로 결과적으로 p1만 남게되고 결국 락을 획득하게 된다.

 

그러나, 이 경우에 스레드 기아 문제를 해결한다 하더라도 비효율은 해결하지 못한다. 결국 가장 좋은 방법은 소비자는 생산자를, 생산자는 소비자를 깨우는 방법이다. 

 

정리

  • 생산자 - 소비자 문제란?
    • 생산자가 너무 빠를 경우 버퍼에 데이터가 꽉 차서 더 이상 데이터를 생산해낼 수 없다.
    • 소비자가 너무 빠를 경우 버퍼에 남은 데이터가 없어 데이터를 소비할 수 없다.

이 문제를 해결하는 방법은 생산자의 경우 버퍼가 빈 공간이 생길때까지 기다리는 것이고, 소비자의 경우 버퍼에 데이터가 생길때까지 기다리는 것이다. 결국 기다리는 것이다.

 

그러나, 단순히 기다릴 순 없다. 왜냐하면 임계 영역은 딱 하나의 스레드만 작업할 수 있게 설계되었다. 아무리 특정 스레드가 하루종일 기다린다해도 본인이 락을 들고 있는 상태에서 놔주지 않으면 다른 스레드는 진입할 수 없다.

 

그래서 단순히 기다린 게 아니라 락을 반납하고 기다린다.

이때 사용할 수 있는 것이 Object.wait()이다. 이 wait(), notify(), notifyAll()synchronized와 같이 사용할 수 있다. 

 

wait()으로 락을 반납하고 해당 스레드는 스레드 대기 집합에 들어간다. 그리고 락을 반납했으니 다른 스레드가 진입할 수 있게 된다. 다른 스레드가 작업을 다 마치고 notify()로 스레드 대기 집합에 알린다. 그럼 스레드 대기 집합에 있는 임의의 스레드 하나가 튀어나온다.

그땐 스레드는 BLOCKED 상태이다. 아직 락을 얻지 못한 상태이니까. 그리고 최종적으로 락을 반납하고 스레드가 나가면 튀어나온 스레드가 이 락을 얻어 작업을 진행할 수 있게 됐다. 

 

근본적인 생산자 소비자 문제를 해결했지만, 여기서 파생되는 비효율이 발생했다.

생산자가 생산자를 계속 깨우거나, 소비자가 소비자를 계속 깨우면 의미없이 CPU 자원만 소모하고 아무것도 할 수 없다. 

 

즉, 가장 좋은 방법은 깨울때 생산자는 소비자를, 소비자는 생산자를 깨우는 게 가장 좋은 방법이다. 이 방법을 다음 포스팅에서 알아보자!

 

728x90
반응형
LIST

+ Recent posts