참고자료:
저번 포스팅에서 다뤘던 생산자 소비자 문제의 두번째 포스팅이다. 이 포스팅에선 어떻게 저번 포스팅에 말했던 문제를 해결하는지를 알아보자. 우선 저번 포스팅에서 말했던 문제는 생산자가 생산자를 깨워버릴 수 있고, 소비자가 소비자를 깨워버릴 수 있다는 문제였다. 그렇게 되면 결국 깨어난 스레드는 아무것도 하지 못하고 다시 기다리는 상태로 돌아가야 한다.
Lock Condition
이제, synchronized를 사용하지 않겠다. 이전에 synchronized의 단점을 극복하기 위해 사용했던 ReentrantLock을 사용하면 이 문제에서도 역시나 도움을 준다.
어떻게 도움을 주냐? 기존의 문제는 생산자가 생산자를 깨울 가능성이 있고 반대로 소비자가 소비자를 깨울 가능성이 있기 때문에 비효율이 발생하는 것이다. 그리고 그 근본 원인은? 스레드 대기 집합이 딱 하나이기 때문이다. 이 스레드 대기 집합은 모든 객체가 기본으로 가지고 있다. 모니터 락과 synchronized와 같이 사용되는 것이다.
그럼 스레드 대기 집합을 둘로 나누면 된다. 생산자용 대기 집합과 소비자용 대기 집합으로. 그리고 깨울땐 소비자라면 생산자용 스레드 대기 집합에 알리면 되고, 생산자라면 소비자용 스레드 대기 집합에 알리면 된다!
우선, 대기 집합을 분리하기 전에 앞에서 사용했던 synchronized, wait(), notify()를 통해 작성한 코드를 Lock 인터페이스와 ReentrantLock 구현체를 이용해서 다시 구현해보자!
BoundedQueueV4
package thread.bounded;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static util.MyLogger.log;
public class BoundedQueueV4 implements BoundedQueue {
private final Queue<String> queue = new ArrayDeque<>();
private final int max;
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
public BoundedQueueV4(int max) {
this.max = max;
}
@Override
public void put(String data) {
lock.lock();
try {
while (queue.size() == max) {
log("[put] 큐가 가득 찼습니다. 생산자는 대기합니다.");
try {
condition.await();
log("[put] 생산자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
queue.offer(data);
log("[put] 생산자 데이터 저장, notify() 호출");
condition.signal();
} finally {
lock.unlock();
}
}
@Override
public String take() {
lock.lock();
try {
while (queue.isEmpty()) {
log("[take] 큐에 데이터가 없습니다. 소비자는 대기합니다");
try {
condition.await();
log("[take] 소비자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
String data = queue.poll();
log("[take] 소비자 데이터 획득, notify() 호출");
condition.signal();
return data;
} finally {
lock.unlock();
}
}
@Override
public String toString() {
return queue.toString();
}
}
- synchronized 대신 Lock lock = new ReentrantLock을 사용한다.
- Condition이 등장한다. 이 Condition은 ReentrantLock을 사용하는 스레드가 대기하는 스레드 대기 공간이다.
- lock.newCondition() 메서드를 호출하면 스레드 대기 공간이 만들어진다. Lock(ReentrantLock)의 스레드 대기 공간은 이렇게 만들 수가 있다.
- 참고로 synchronized, 모니터 락, wait(), notify()에서 사용한 스레드 대기 공간은 모든 객체 인스턴스가 내부에 기본으로 가지고 있다. 반면에 Lock(ReentrantLock)을 사용하는 경우 이렇게 스레드 대기 공간을 직접 만들어서 사용해야 한다.
condition.await()
Object.wait()과 유사한 기능이다. 지정한 condition에 현재 스레드를 대기(WAITING)상태로 보관한다. 이때 ReentrantLock에서 획득한 락을 반납하고 대기 상태로 condition에 보관된다.
condition.signal()
Object.notify()와 유사한 기능이다. 지정한 condition에서 대기중인 스레드를 하나 깨운다. 깨어난 스레드는 condition에서 빠져나온다.
Lock lock = new ReentrantLock()
이 그림에서 lock은 synchronized에서 사용하는 객체 내부에 모니터 락이 아니라, ReentrantLock 락을 뜻한다. ReentrantLock은 내부에 락과, 락 획득을 대기하는 스레드를 관리하는 대기 큐가 있다.
이 그림에서 스레드 대기 공간은 synchronized에서 사용하는 스레드 대기 공간이 아니라, 다음 코드를 뜻한다.
Condition condition = lock.newCondition()
ReentrantLock을 사용하면, condition이 스레드 대기 공간이다.
지금까지는 synchronized, wait(), notify()를 사용한 이전 코드와 거의 비슷하다. 아직 생산자용, 소비자용 스레드 대기 공간을 따로 분리하지 않았기 때문에 기존 방식과 같다고 보면 된다. 다만 구현을 synchronized로 했는가 아니면 ReentrantLock을 사용해서 했는가에 차이가 있을 뿐이다.
이대로 실행해보면 실행결과는 기존과 똑같을 것이다. 이제 스레드 대기 공간을 생산자용과 소비자용으로 분리해보자!
생산자, 소비자 대기 공간 분리
이런 그림을 만들어보자! 생각보다 엄청 간단하다.
BoundedQueueV5
package thread.bounded;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static util.MyLogger.log;
public class BoundedQueueV5 implements BoundedQueue {
private final Queue<String> queue = new ArrayDeque<>();
private final int max;
private final Lock lock = new ReentrantLock();
private final Condition producerCond = lock.newCondition();
private final Condition consumerCond = lock.newCondition();
public BoundedQueueV5(int max) {
this.max = max;
}
@Override
public void put(String data) {
lock.lock();
try {
while (queue.size() == max) {
log("[put] 큐가 가득 찼습니다. 생산자는 대기합니다.");
try {
producerCond.await();
log("[put] 생산자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
queue.offer(data);
log("[put] 생산자 데이터 저장, notify() 호출");
consumerCond.signal();
} finally {
lock.unlock();
}
}
@Override
public String take() {
lock.lock();
try {
while (queue.isEmpty()) {
log("[take] 큐에 데이터가 없습니다. 소비자는 대기합니다");
try {
consumerCond.await();
log("[take] 소비자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
String data = queue.poll();
log("[take] 소비자 데이터 획득, notify() 호출");
producerCond.signal();
return data;
} finally {
lock.unlock();
}
}
@Override
public String toString() {
return queue.toString();
}
}
여기서 핵심은 lock.newCondition()을 두 번 호출해서 ReentrantLock을 사용하는 스레드 대기 공간을 2개 만드는 것이다.
private final Lock lock = new ReentrantLock();
private final Condition producerCond = lock.newCondition();
private final Condition consumerCond = lock.newCondition();
Condition 분리
- producerCond: 생산자를 위한 스레드 대기 공간
- consumerCond: 소비자를 위한 스레드 대기 공간
이렇게 하면 생산자 스레드, 소비자 스레드를 정확하게 나누어 관리하고 깨울 수 있다!
@Override
public void put(String data) {
lock.lock();
try {
while (queue.size() == max) {
log("[put] 큐가 가득 찼습니다. 생산자는 대기합니다.");
try {
producerCond.await();
log("[put] 생산자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
queue.offer(data);
log("[put] 생산자 데이터 저장, notify() 호출");
consumerCond.signal();
} finally {
lock.unlock();
}
}
- put(data) 메서드는 결국 생산자가 큐에 데이터를 생산해내는 것이다. 그렇다면 이 메서드에서 만약 큐가 가득찼다면 어디로 들어가면 될까? producerCond 안으로 들어가면 된다. 그래서 producerCond.await()을 호출한다.
- 그리고 큐가 가득차지 않아서 데이터를 잘 넣었다면 누굴 깨우면 될까? 소비자 스레드를 깨우면 된다. 그래서 consumerCond.signal()을 호출한다.
@Override
public String take() {
lock.lock();
try {
while (queue.isEmpty()) {
log("[take] 큐에 데이터가 없습니다. 소비자는 대기합니다");
try {
consumerCond.await();
log("[take] 소비자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
String data = queue.poll();
log("[take] 소비자 데이터 획득, notify() 호출");
producerCond.signal();
return data;
} finally {
lock.unlock();
}
}
- take() 메서드는 소비자 스레드가 사용하는 메서드이다. 만약 큐에 데이터가 비었다면 어디로 들어가면 될까? 소비자용 스레드 대기 공간으로 들어가면 된다. 그래서 consumerCond.await()을 호출한다.
- 큐에 데이터가 있어서 데이터를 잘 소비했다면 누굴 깨우면 될까? 생산자 스레드를 깨우면 된다. 그래서 producerCond.signal()을 호출한다.
이렇게 굉장히 간단하게 딱 필요한 스레드 대기 공간에 알리고, 자기가 들어갈 곳을 잘 들어갈 수 있게 됐다. 그리고 실제로 이 코드로 실행해보면 비효율은 하나도 발생하지 않는다! 왜냐!? 생산자 스레드는 소비자 스레드만을 깨우고, 소비자 스레드는 생산자 스레드만을 깨우기 때문에.
실행 결과 로그는 따로 작성하지 않겠다. 직접 실행해보면 될 것 같다.
한번 이 큰 그림을 그림으로 하나씩 분석해보자.
참고로, 지금 그림을 가지고 하는 설명은 위 예제 실행 결과와 살짝 다르다. 이해하기 쉽게 조금 변형했다고 보면 된다
생산자 먼저 실행
- c1, c2, c3는 소비자 스레드 전용 대기 공간(consumerCond)에 대기중이다.
- p1, p2, p3는 생산자 스레드 전용 대기 공간(producerCond)에 대기중이다.
- 큐에 데이터가 비어있다.
- 생산자인 p0 스레드가 실행 예정이다.
- p0 스레드는 ReentrantLock의 락을 획득하고 큐에 데이터를 보관한다.
- 생산자 스레드가 큐에 데이터를 보관했기 때문에, 소비자 스레드가 가져갈 데이터가 추가되었다. 따라서 소비자 대기 공간(consumerCond)에 signal()을 통해 알려준다.
- 소비자 스레드 중에 하나가 깨어난다. c1이 깨어났다고 가정하자.
- c1은 락 획득까지 잠시 대기하다가 이후에 p0가 반납한 ReentrantLock의 락을 획득한다. 그리고 큐의 데이터를 획득한 다음에 완료된다.
소비자 먼저 실행
이 설명 역시 예제 결과와 살짝 다르다.
- c1, c2, c3는 소비자 스레드 전용 대기 공간(consumerCond)에 대기중이다.
- p1, p2, p3는 생산자 스레드 전용 대기 공간(producerCond)에 대기중이다.
- 큐에 데이터가 가득 차 있다.
- 소비자인 c0 스레드가 실행 예정이다.
- c0 스레드는 ReentrantLock의 락을 획득하고 큐에 있는 데이터를 획득한다.
- 큐에 데이터를 획득했기 때문에, 큐에 데이터를 생산할 수 있는 빈 공간이 생겼다. 생산자 대기 공간(producerCond)에 signal()을 통해 알려준다.
- 생산자 스레드 중에 하나가 깨어난다. p3가 깨어났다고 가정하자.
- p3는 이후에 c0가 반납한 ReentrantLock의 락을 획득하고, 큐의 데이터를 저장한 다음에 완료된다.
Object.notify() vs Condition.signal()
- Object.notify()
- 대기 중인 스레드 중 임의의 하나를 선택해 깨운다. 스레드가 깨어나는 순서는 보장되어 있지 않으며, JVM 구현에 따라 다르다. 보통은 먼저 들어온 스레드가 먼저 수행되지만 구현에 따라 다를 수 있다.
- synchronized 블록 내에서 모니터 락을 가지고 있는 스레드가 호출해야 한다.
- Condition.signal()
- 대기중인 스레드 중 하나를 깨우며, 일반적으로는 FIFO 순서로 깨운다. 이 부분은 자바 버전과 구현에 따라 달라질 수 있지만, 보통 Condition의 구현은 Queue 구조를 사용하기 때문에 FIFO 순서로 깨운다.
- ReentrantLock의 락을 가지고 있는 스레드가 호출해야 한다.
스레드의 대기
사실 지금까지 얘기하지 않고 있던 부분이 하나 있다.
이제는 synchronized, ReentrantLock의 대기 상태에 대해 이야기할 차례가 됐다.
먼저 synchronized의 대기 상태부터 얘기해보자면, 잘 생각해보면 2가지 단계의 대기 상태가 존재한다.
synchronized 대기
- 대기1: 락 획득 대기
- BLOCKED 상태로 모니터 락 획득 대기
- synchronized를 시작할 때 락이 없으면 대기
- 다른 스레드가 synchronized를 빠져나갈 때 모니터 락 획득 시도
- 대기2: wait() 대기
- WAITING 상태로 대기
- wait()을 호출했을 때 스레드 대기 집합에서 대기
- 다른 스레드가 notify()를 호출했을 때 빠져나감
- 소비자 스레드: c1, c2, c3
- 생산자 스레드: p1, p2, p3
- 소비자 스레드 c1, c2, c3가 먼저 동시에 실행된다고 가정하자.
- 소비자 스레드 c1이 가장 먼저 락을 획득한다.
- c2, c3는 락 획득을 대기하며 BLOCKED 상태가 된다.
c2, c3는 락 획득을 시도하지만, 모니터 락이 없기 때문에 락을 대기하며 BLOCKED 상태가 된다. c1은 나중에 락을 반납할 것이다. 그러면 c2, c3 중에 하나가 락을 획득해야 한다. 그런데 잘 생각해보면 락을 기다리는 c2, c3도 어딘가에서 관리가 되어야 한다. 그래야 락이 반환되었을 때 자바가 c2, c3중에 하나를 선택해서 락을 제공할 수 있다. 예를 들어서 List, Set, Queue와 같은 자료구조에 관리가 되어야 한다. 그림에서는 c2, c3가 단순히 BLOCKED 상태로 변경만 되었다. 그래서 관리되는 것처럼 보이지는 않는다.
사실은 BLOCKED 상태의 스레드도 자바 내부에서 따로 관리된다. 다음 그림을 보자.
- 이 그림은 이전 그림과 같은 상태를 좀 더 자세히 그린 그림이다.
- 그림을 보면 락 대기 집합이라는 곳이 있다. 이곳은 락을 기다리는 BLOCKED 상태의 스레드들을 관리한다.
- 락 대기 집합은 자바 내부에 구현되어 있기 때문에 모니터 락과 같이 개발자가 확인하기는 어렵다.
- 여기서는 BLOCKED 상태의 스레드 c2, c3가 관리된다.
- 언젠가 c1이 락을 반납하면 락 대기 집합에서 관리되는 스레드 중 하나가 락을 획득한다.
락 대기 집합이 지금에서야 나온 이유는,
단순하게 설명하기 위해 BLOCKED 상태에서 사용하는 락 대기 집합을 일부러 얘기하지 않았다. 지금쯤이면 이 내용을 말해도 이해하는데 어려움이 없을 것이다.
- c1이 큐에 데이터가 없으므로 wait()을 호출하고 스레드 대기 집합에 들어간다.
- c1은 락을 반납 후 스레드 대기 집합에 들어가고 WAITING 상태가 된다.
- 이후에 락이 반납됐으니 락을 기다리고있는 BLOCKED 상태의 스레드 중 하나가 락을 획득한다. 어차피 c2, c3 둘 다 소비자 스레드이기 때문에 락을 획득하고 임계 영역에 들어와도 아무것도 하지 못한채 wait()을 호출하고 둘 다 스레드 대기 집합으로 들아간 모습이다.
- p1이 락을 획득하고 데이터를 저장한 다음 notify()를 호출하여 스레드 대기 집합에 이 사실을 알린다.
- 스레드 대기 집합에 있는 c1이 스레드 대기 집합을 빠져나간다.
- 하지만 아직 끝난 것이 아니다. 락을 얻지 못한 상태이니까 BLOCKED 상태가 된다. 그리고 락을 기다리는 BLOCKED 상태의 스레드들은 락 대기 집합에서 대기한다.
그러니까 사실 2번의 대기 과정이 있고 대기 장소가 있는것이다. 그래서 이 그림이 된다.
- c1은 BLOCKED 상태에서 락을 얻을 때까지 락 대기 집합에서 대기한다.
- 드디어 p1이 락을 반납한다.
- 락이 반납되면 락 대기 집합에 있는 스레드 중 하나가 락을 획득한다. 여기서는 c1이 획득한다.
- c1은 드디어 락 대기 집합까지 탈출하고 임계 영역을 수행할 수 있다.
정리를 하자면,
자바의 모든 객체 인스턴스는 멀티 스레드와 임계 영역을 다루기 위해 내부에 3가지 기본 요소를 가진다.
- 모니터 락
- 락 대기 집합(모니터 락 대기 집합)
- 스레드 대기 집합
여기서 락 대기 집합이 1차 대기소이고, 스레드 대기 집합이 2차 대기소라 생각하면 된다. 2차 대기소에 들어간 스레드는 2차, 1차 대기소를 모두 빠져나와야 임계 영역을 수행할 수 있다. 이 3가지 요소는 서로 맞물려 들어간다.
- synchronized를 사용한 임계 영역에 들어가려면 모니터 락이 필요하다.
- 모니터 락이 없으면 락 대기 집합에 들어가서 BLOCKED 상태로 락을 기다린다.
- 모니터 락을 반납하면 락 대기 집합에 있는 스레드 중 하나가 락을 획득하고 BLOCKED → RUNNABLE 상태가 된다.
- wait()을 호출해서 스레드 대기 집합에 들어가기 위해선 모니터 락이 필요하다.
- 스레드 대기 집합에 들어가면 모니터 락을 반납한다.
- 스레드가 notify()를 호출하면 스레드 대기 집합에 있는 스레드 중 하나가 스레드 대기 집합을 빠져나온다. 그리고 모니터 락 획득을 시도한다.
- 모니터 락을 획득하면 임계 영역을 수행한다.
- 모니터 락을 획득하지 못하면 락 대기 집합에 들어가서 BLOCKED 상태로 락을 기다린다.
여기까지가 synchronized를 사용한 스레드 대기의 전체 과정이다. 그럼 ReentrantLock은 크게 다를까? 똑같다. 살짝살짝만 다른 부분이 있고 이 메커니즘은 똑같다고 보면 된다.
ReentrantLock 대기
- 대기1: ReentrantLock 락 획득 대기
- ReentrantLock의 대기 큐에서 관리
- WAITING 상태로 락 획득 대기
- lock.lock()을 호출 했을 때 락이 없으면 대기
- 다른 스레드가 lock.unlock()을 호출 했을 때 락 획득을 시도, 락을 획득하면 대기 큐를 빠져나감
- 대기2: await() 대기
- condition.await()을 호출 했을 때, condition 객체의 스레드 대기 공간에서 관리
- WAITING 상태로 대기
- 다른 스레드가 condition.signal()을 호출 했을 때 condition 객체의 스레드 대기 공간에서 빠져나감
이 ReentrantLock도 synchronized와 마찬가지로 대기소가 2단계로 되어 있다. 2단계 대기소인 condition 객체의 스레드 대기 공간을 빠져나간다고 바로 실행되는 것이 아니다. 임계 영역 안에서는 항상 락이 있는 하나의 스레드만 실행될 수 있다. 여기서는 ReentrantLock의 락을 획득해야 RUNNABLE 상태가 되면서 그 다음 코드를 실행할 수 있다. 락을 획득하지 못하면 WAITING 상태로 락을 획득할 때 까지 ReentrantLock의 대기 큐에서 대기한다.
차이점이라고 하면, synchronized는 락 대기 집합에서 스레드들은 BLOCKED 상태라는 거고 ReentrantLock의 대기 큐에서 스레드들은 WAITING 상태라는 것이다. 그래서, 이런 사소한 차이가 있고 결국 대기소는 둘 다 2개가 있다라는 점이다. 물론 ReentrantLock을 사용하면 더 유연하고 많은 장점이 있다. 무한 대기를 하지 않아도 되는 점과 공정성을 해결할 수 있다는 것과 스레드 대기 집합을 여러개로 분리할 수 있다는 것이다.
중간 정리
여기까지 잘 익혔다면, 진짜 어지간한 멀티 스레드 환경과 흐름을 이해했다고 봐도 무방하다. 실무에서도 사용할 수 있는 정도의 수준이라고 말할 수 있다. 사실 이 예제는 이미 자바가 만들어 둔 java.util.concurrent.BlockingQueue를 사용했으면 되는 문제였다. 정확히 이 한정된 버퍼에 더 채울 수 있는 공간이 없으면 데이터 추가를 공간이 생길 때까지 차단해주고, 한정된 버퍼에 데이터가 없으면 데이터가 생길 때까지 데이터를 소비하는 것을 차단해주는 큐이다.
근데, 하나씩 풀어가면서 왜 이런 상황이 발생하고, 왜 생산자 스레드 입장에서 데이터가 비워질때까지(한정된 버퍼에 데이터를 넣을 공간이 생길때까지) 단순히 기다리기만 한다면 다른 스레드들이 BLOCKED 상태에서 빠져나오지 못하는지, 왜 synchronized, wait(), notify()를 사용해야 하고 이것들이 가진 한계가 무엇이길래 ReentrantLock, Condition을 사용하는지 이해해야 BlockingQueue라는 이미 잘 만들어진 큐를 사용할 자격이 생기는 것이다.
이제 위 내용을 다 이해했다. 그럼 이미 만들어놓은 기능인 BlockingQueue를 한번 사용해보자!
BlockingQueue
자바는 생산자 소비자 문제를 해결하기 위해 java.util.concurrent.BlockingQueue라는 특별한 멀티스레드 자료 구조를 제공한다.
이것은 이름 그대로 스레드를 차단(Blocking)할 수 있는 큐다.
- 데이터 추가 차단: 큐가 가득 차면 데이터 추가 작업을 시도하는 스레드는 공간이 생길때까지 차단된다.
- 데이터 획득 차단: 큐가 비어 있으면 획득 작업을 시도하는 스레드는 큐에 데이터가 들어올 때까지 차단된다.
BlockingQueue는 인터페이스이고, 다음과 같은 다양한 기능을 제공한다.
package java.util.concurrent;
public interface BlockingQueue<E> extends Queue<E> {
boolean add(E e);
boolean offer(E e);
void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit) throws InterruptedException;
boolean remove(Object o);
...
}
주요 메서드만 정리했다.
- 데이터 추가 메서드: add(), offer(), put(), offer(타임아웃)
- 데이터 획득 메서드: take(), poll(타임아웃), remove()
- Queue를 상속 받는다. 큐를 상속 받았기 때문에 추가로 큐의 기능들도 사용할 수 있다.
보면 데이터 추가와 획득에서 메서드가 굉장히 많다는 것을 알 수 있다. 이게 왜 이렇게 여러개가 있는지도 이후에 설명한다.
BlockingQueue 인터페이스의 대표적인 구현체
- ArrayBlockingQueue: 배열 기반으로 구현되어 있고, 버퍼의 크기가 고정되어 있다.
- LinkedBlockingQueue: 링크 기반으로 구현되어 있고, 버퍼의 크기를 고정할 수도, 무한하게 사용할 수도 있다.
이제 BlockingQueue를 사용하도록 기존 코드를 변경해보자.
BoundedQueueV6_1
package thread.bounded;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BoundedQueueV6_1 implements BoundedQueue {
private final BlockingQueue<String> queue;
public BoundedQueueV6_1(int max) {
this.queue = new ArrayBlockingQueue<String>(max);
}
@Override
public void put(String data) {
try {
queue.put(data);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public String take() {
try {
return queue.take();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public String toString() {
return queue.toString();
}
}
- ArrayBlockingQueue를 사용한다.
- BlockingQueue.put(data): 데이터를 추가한다.
- BlockingQueue.take(): 데이터를 뽑아온다.
여기서 BlockingQueue.put(data) 메서드를 한번 어떻게 구현했는지 봐보자.
다음 코드가 java.util.concurrent.BlockingQueue를 구현한 ArrayBlockingQueue의 put() 메서드이다.
public void put(E e) throws InterruptedException {
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
- 앞에서 만들어본 BoundedQueueV5와 굉장히 유사하게 생긴것을 알 수 있다. 다른건 lock.lock()이 아니라 lock.lockInterruptibly()를 사용했다 정도의 차이가 있다.
"어? signal()은 호출 안 하는데요!?" → enqueue(e)에 있다.
private void enqueue(E e) {
final Object[] items = this.items;
items[putIndex] = e;
if (++putIndex == items.length) putIndex = 0;
count++;
notEmpty.signal();
}
그러니까 결론은 자바에서 제공하는 멀티 스레드 용 자료구조가 이제 눈에 들어온다는 것이다.
이제 이 BoundedQueueV6_1를 사용해서 실행해보면 같은 결과를 얻을것이다.
BlockingQueue 기능 설명
아까 보니까 데이터를 추가하는 것도 많은 메서드가 있던것을 알 수 있었다. 왜 그럴까? 실무에서 멀티스레드를 사용할땐 응답성이 중요하다. 예를 들어, 대기 상태에 있어도 고객이 중지 요청을 하거나, 또는 너무 오래 대기한 경우 포기하고 빠져나갈 수 있는 방법이 필요하다.
생산자가 무언가 데이터를 생산하는데, 버퍼가 빠지지 않아서 너무 오래 대기해야 한다면 무한정 기다리는 것보다는 작업을 포기하고 사용자에게는 "죄송합니다. 현재 시스템에 문제가 있습니다. 나중에 다시 시도해주세요."라고 하는 것이 더 나은 선택일 것이다.
즉, 멀티스레드 세상에서는 정말 여러 상황과 결론이 만들어질 수 있다는 얘기고 그렇기에 이 BlockingQueue는 각 상황에 맞는 다양한 메서드를 제공하고 있다.
BlockingQueue의 다양한 기능
Operation | Throws Exception | Special Value | Blocks | Times Out |
Insert | add(e) | offer(e) | put(e) | offer(e, time, unit) |
Remove | remove() | poll() | take() | poll(time, unit) |
- Throws Exception (대기 시 예외를 터트림)
- add(e): 지정된 요소를 큐에 추가하며, 큐가 가득 차면 IllegalStateException 예외를 던진다.
- remove(): 큐에서 요소를 제거하며 반환한다. 큐가 비어 있으면 NoSuchElementException 예외를 던진다.
- Special Value (대기 시 즉시 반환)
- offer(e): 지정된 요소를 큐에 추가하려고 시도하며, 큐가 가득 차면 false를 반환한다.
- poll(): 큐에서 요소를 제거하고 반환한다. 큐가 비어 있으면 null을 반환한다.
- Blocks (대기)
- put(e): 지정된 요소를 큐에 추가할 때까지 대기한다. 큐가 가득 차면 공간이 생길 때까지 대기한다.
- take(): 큐에서 요소를 제거하고 반환한다. 큐가 비어 있으면 요소가 준비될 때까지 대기한다.
- Times Out (시간만큼만 대기)
- offer(e, time, unit): 지정된 요소를 큐에 추가하려고 시도하며, 지정된 시간 동안 큐가 비워지기를 기다리다가 시간이 초과되면 false를 반환한다.
- poll(time, unit): 큐에서 요소를 제거하고 반환한다. 큐에 요소가 없다면 지정된 시간 동안 요소가 준비되기를 기다렸다가 시간이 초과되면 null을 반환한다.
참고로, BlockingQueue의 모든 대기, 시간 대기 메서드는 인터럽트를 제공한다. 대기하는 put(e), take()는 바로 위에서 예제로 직접 만들어보았다. 나머지도 하나씩 코드로 확인해보자.
BlockingQueue 즉시 반환
BoundedQueueV6_2
package thread.bounded;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import static util.MyLogger.log;
public class BoundedQueueV6_2 implements BoundedQueue {
private final BlockingQueue<String> queue;
public BoundedQueueV6_2(int max) {
this.queue = new ArrayBlockingQueue<String>(max);
}
@Override
public void put(String data) {
boolean result = queue.offer(data);
log("저장 시도 결과 = " + result);
}
@Override
public String take() {
return queue.poll();
}
@Override
public String toString() {
return queue.toString();
}
}
실행결과
2024-07-25 18:04:39.998 [ main] == [생산자 먼저 실행] 시작, BoundedQueueV6_2 ==
2024-07-25 18:04:40.001 [ main] 생산자 시작
2024-07-25 18:04:40.010 [producer1] [생산 시도] data1 -> []
2024-07-25 18:04:40.010 [producer1] 저장 시도 결과 = true
2024-07-25 18:04:40.011 [producer1] [생산 완료] data1 -> [data1]
2024-07-25 18:04:40.111 [producer2] [생산 시도] data2 -> [data1]
2024-07-25 18:04:40.111 [producer2] 저장 시도 결과 = true
2024-07-25 18:04:40.111 [producer2] [생산 완료] data2 -> [data1, data2]
2024-07-25 18:04:40.215 [producer3] [생산 시도] data3 -> [data1, data2]
2024-07-25 18:04:40.215 [producer3] 저장 시도 결과 = false
2024-07-25 18:04:40.216 [producer3] [생산 완료] data3 -> [data1, data2]
2024-07-25 18:04:40.320 [ main] 현재 상태 출력, 큐 데이터: [data1, data2]
2024-07-25 18:04:40.321 [ main] producer1: TERMINATED
2024-07-25 18:04:40.321 [ main] producer2: TERMINATED
2024-07-25 18:04:40.321 [ main] producer3: TERMINATED
2024-07-25 18:04:40.321 [ main] 소비자 시작
2024-07-25 18:04:40.322 [consumer1] [소비 시도] ? <- [data1, data2]
2024-07-25 18:04:40.322 [consumer1] [소비 완료] data1 <- [data2]
2024-07-25 18:04:40.427 [consumer2] [소비 시도] ? <- [data2]
2024-07-25 18:04:40.427 [consumer2] [소비 완료] data2 <- []
2024-07-25 18:04:40.530 [consumer3] [소비 시도] ? <- []
2024-07-25 18:04:40.530 [consumer3] [소비 완료] null <- []
2024-07-25 18:04:40.631 [ main] 현재 상태 출력, 큐 데이터: []
2024-07-25 18:04:40.631 [ main] producer1: TERMINATED
2024-07-25 18:04:40.632 [ main] producer2: TERMINATED
2024-07-25 18:04:40.632 [ main] producer3: TERMINATED
2024-07-25 18:04:40.632 [ main] consumer1: TERMINATED
2024-07-25 18:04:40.633 [ main] consumer2: TERMINATED
2024-07-25 18:04:40.633 [ main] consumer3: TERMINATED
2024-07-25 18:04:40.634 [ main] == [생산자 먼저 실행] 종료, BoundedQueueV6_2 ==
실행 결과를 보면 생산 시 큐가 꽉찼을때 저장 시도 결과가 false가 찍히는 것을 알 수 있고, 데이터 소비 시 데이터가 없는 경우 null을 반환했음을 알 수 있다.
BlockingQueue 시간 대기
BoundedQueueV6_3
package thread.bounded;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import static util.MyLogger.log;
public class BoundedQueueV6_3 implements BoundedQueue {
private final BlockingQueue<String> queue;
public BoundedQueueV6_3(int max) {
this.queue = new ArrayBlockingQueue<String>(max);
}
@Override
public void put(String data) {
try {
boolean result = queue.offer(data, 1, TimeUnit.NANOSECONDS);
log("저장 시도 결과 = " + result);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public String take() {
try {
return queue.poll(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public String toString() {
return queue.toString();
}
}
실행 결과
2024-07-25 18:08:09.956 [ main] == [생산자 먼저 실행] 시작, BoundedQueueV6_3 ==
2024-07-25 18:08:09.959 [ main] 생산자 시작
2024-07-25 18:08:09.968 [producer1] [생산 시도] data1 -> []
2024-07-25 18:08:09.969 [producer1] 저장 시도 결과 = true
2024-07-25 18:08:09.969 [producer1] [생산 완료] data1 -> [data1]
2024-07-25 18:08:10.069 [producer2] [생산 시도] data2 -> [data1]
2024-07-25 18:08:10.069 [producer2] 저장 시도 결과 = true
2024-07-25 18:08:10.069 [producer2] [생산 완료] data2 -> [data1, data2]
2024-07-25 18:08:10.172 [producer3] [생산 시도] data3 -> [data1, data2]
2024-07-25 18:08:10.173 [producer3] 저장 시도 결과 = false
2024-07-25 18:08:10.173 [producer3] [생산 완료] data3 -> [data1, data2]
2024-07-25 18:08:10.278 [ main] 현재 상태 출력, 큐 데이터: [data1, data2]
2024-07-25 18:08:10.278 [ main] producer1: TERMINATED
2024-07-25 18:08:10.278 [ main] producer2: TERMINATED
2024-07-25 18:08:10.278 [ main] producer3: TERMINATED
2024-07-25 18:08:10.278 [ main] 소비자 시작
2024-07-25 18:08:10.279 [consumer1] [소비 시도] ? <- [data1, data2]
2024-07-25 18:08:10.279 [consumer1] [소비 완료] data1 <- [data2]
2024-07-25 18:08:10.381 [consumer2] [소비 시도] ? <- [data2]
2024-07-25 18:08:10.381 [consumer2] [소비 완료] data2 <- []
2024-07-25 18:08:10.486 [consumer3] [소비 시도] ? <- []
2024-07-25 18:08:10.591 [ main] 현재 상태 출력, 큐 데이터: []
2024-07-25 18:08:10.592 [ main] producer1: TERMINATED
2024-07-25 18:08:10.592 [ main] producer2: TERMINATED
2024-07-25 18:08:10.592 [ main] producer3: TERMINATED
2024-07-25 18:08:10.592 [ main] consumer1: TERMINATED
2024-07-25 18:08:10.593 [ main] consumer2: TERMINATED
2024-07-25 18:08:10.593 [ main] consumer3: TIMED_WAITING
2024-07-25 18:08:10.594 [ main] == [생산자 먼저 실행] 종료, BoundedQueueV6_3 ==
2024-07-25 18:08:12.488 [consumer3] [소비 완료] null <- []
저장 시도 시 기다리는 시간을 1 나노초로 했기 때문에 그냥 뭐 안 기다리는 수준으로 기다리게 설정했다. 그러니까 실행 결과를 보면 저장 시도 결과는 false가 출력되고 저장을 마지막에 하지 못했으니 소비도 마지막 스레드는 할 수 없어 null이 반환됐다. 만약 기다리는 시간을 넉넉하게 잡고 awaitNanos(timeout)으로 잠시 대기하게 한 후 소비자 스레드가 들어와서 큐에 데이터를 소비해서 공간이 생긴후에도 지정한 시간을 지나지 않았다면 아마 데이터가 추가도, 데이터 소비도 정상적으로 될 것이다.
아, 참고로 여기서 awaitNanos(timeout)은 그냥 내가 이렇게 사용하자가 아니라 실제로 BlockingQueue가 offer(e, time, unit) 메서드에서 사용하고 있는 것이다. 아래 실제 구현 코드 참고!
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
Objects.requireNonNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0L)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
BlockingQueue 예외 터트리기
BoundedQueueV6_4
package thread.bounded;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import static util.MyLogger.log;
public class BoundedQueueV6_4 implements BoundedQueue {
private final BlockingQueue<String> queue;
public BoundedQueueV6_4(int max) {
this.queue = new ArrayBlockingQueue<String>(max);
}
@Override
public void put(String data) {
queue.add(data);
}
@Override
public String take() {
return queue.remove();
}
@Override
public String toString() {
return queue.toString();
}
}
실행결과
2024-07-25 18:14:03.964 [ main] == [생산자 먼저 실행] 시작, BoundedQueueV6_4 ==
2024-07-25 18:14:03.968 [ main] 생산자 시작
2024-07-25 18:14:03.978 [producer1] [생산 시도] data1 -> []
2024-07-25 18:14:03.978 [producer1] [생산 완료] data1 -> [data1]
2024-07-25 18:14:04.073 [producer2] [생산 시도] data2 -> [data1]
2024-07-25 18:14:04.073 [producer2] [생산 완료] data2 -> [data1, data2]
2024-07-25 18:14:04.176 [producer3] [생산 시도] data3 -> [data1, data2]
Exception in thread "producer3" java.lang.IllegalStateException: Queue full
at java.base/java.util.AbstractQueue.add(AbstractQueue.java:98)
at java.base/java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:329)
at thread.bounded.BoundedQueueV6_4.put(BoundedQueueV6_4.java:19)
at thread.bounded.ProducerTask.run(ProducerTask.java:18)
at java.base/java.lang.Thread.run(Thread.java:1595)
2024-07-25 18:14:04.276 [ main] 현재 상태 출력, 큐 데이터: [data1, data2]
2024-07-25 18:14:04.277 [ main] producer1: TERMINATED
2024-07-25 18:14:04.277 [ main] producer2: TERMINATED
2024-07-25 18:14:04.277 [ main] producer3: TERMINATED
2024-07-25 18:14:04.277 [ main] 소비자 시작
2024-07-25 18:14:04.278 [consumer1] [소비 시도] ? <- [data1, data2]
2024-07-25 18:14:04.278 [consumer1] [소비 완료] data1 <- [data2]
2024-07-25 18:14:04.378 [consumer2] [소비 시도] ? <- [data2]
2024-07-25 18:14:04.378 [consumer2] [소비 완료] data2 <- []
2024-07-25 18:14:04.483 [consumer3] [소비 시도] ? <- []
Exception in thread "consumer3" java.util.NoSuchElementException
at java.base/java.util.AbstractQueue.remove(AbstractQueue.java:117)
at thread.bounded.BoundedQueueV6_4.take(BoundedQueueV6_4.java:24)
at thread.bounded.ConsumerTask.run(ConsumerTask.java:16)
at java.base/java.lang.Thread.run(Thread.java:1595)
2024-07-25 18:14:04.583 [ main] 현재 상태 출력, 큐 데이터: []
2024-07-25 18:14:04.583 [ main] producer1: TERMINATED
2024-07-25 18:14:04.584 [ main] producer2: TERMINATED
2024-07-25 18:14:04.584 [ main] producer3: TERMINATED
2024-07-25 18:14:04.584 [ main] consumer1: TERMINATED
2024-07-25 18:14:04.584 [ main] consumer2: TERMINATED
2024-07-25 18:14:04.584 [ main] consumer3: TERMINATED
2024-07-25 18:14:04.585 [ main] == [생산자 먼저 실행] 종료, BoundedQueueV6_4 ==
실행 결과를 보면 데이터를 추가할 때 공간이 없으면 IllegalStateException을, 데이터 소비할 때 소비할 데이터가 없으면 NoSuchElementException을 발생시키고 있음을 알 수 있다.
참고로, 지금 V6_1, V6_2, V6_3, V6_4 모두 BoundedQueue 인터페이스를 구현해서 만들고 있는데 이래야만 가능한게 아니라 기존에 작성한 코드들(producerFirst, consumerFirst, ...)이 전부 BoundedQueue를 의존하고 있기 때문에 이 인터페이스를 구현한 구현체를 만들어서 그 안에서 BlockingQueue를 사용하는 식으로 만든거고 그게 아니라면 그냥 바로 BlockingQueue를 사용해도 상관없다!
정리
이렇듯, 기존에 아주 아주 잘 만들어진 BlockingQueue를 사용하면 훨씬 더 다양한 상황을 더 유연하게 대처할 수 있음을 알게됐다.
무작정 기다릴수도, 정해진 시간만큼만 기다릴수도, 아예 바로 결과를 반환할수도, 예외를 터트릴수도 있다. 그리고 내부가 어떻게 구현됐는지도 이제 이해할 수 있는 레벨이 됐다!
'JAVA의 가장 기본이 되는 내용' 카테고리의 다른 글
멀티스레드 Part.11 동시성 컬렉션 (0) | 2024.07.27 |
---|---|
멀티스레드 Part.10 원자적 연산 (CAS) (0) | 2024.07.26 |
멀티스레드 Part.8 생산자 소비자 문제 (0) | 2024.07.24 |
멀티스레드 Part.7 고급 동기화 (0) | 2024.07.22 |
멀티스레드 Part.6 동기화 (0) | 2024.07.19 |