예외 공통처리는 어떤 프레임워크를 사용하건 심지어 서블릿만 사용하더라도 잘 알아야 하는 부분이다.
개인적으로 중요한 이유 중 가장 큰 이유는, 비즈니스 로직이 깔끔해지고 관심사가 분리된다는 점인것 같다.
JIRA DC 플러그인 개발은 JAX-RS를 사용한다.
이 JAX-RS는 ExceptionMapper를 사용해야 한다.
긴 말 필요없이 바로 코드를 보면 굉장히 간단하다.
IOExceptionMapper
package kr.osci.aijql.exception;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.ws.rs.core.Response;
import javax.ws.rs.ext.ExceptionMapper;
import javax.ws.rs.ext.Provider;
import java.io.IOException;
@Slf4j
@Provider
@Component
public class IOExceptionMapper implements ExceptionMapper<IOException> {
@Override
public Response toResponse(IOException e) {
log.error("[toResponse] IOException, root cause = ", e);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(e.getMessage()).build();
}
}
우선, @Provider 애노테이션을 사용해야 한다. 이 애노테이션은 JAX-RS 런타임에 특정 클래스를 자동으로 등록해서 공통으로 사용할 수 있게 해준다. 대표적으로 이렇게 ExceptionMapper를 구현한 구현체를 등록해서 이 지정된 예외가 발생 시 이 클래스가 호출되도록 말이다.
그리고 보면, @Component 애노테이션도 달려있다. 이전 포스팅에서 설명했듯 스프링 스캐너를 사용한다. 그래서 이 클래스 자체가 빈으로 자동 주입이 되어야 한다. 그래야 이 공통 클래스를 사용할수가 있으니까.
그리고 어떤 예외를 처리할지를 제네릭에 넣어준다. 위 코드는 IOException을 처리하는 클래스이다.
그래서 이 IOException이 어디선가 발생하고 그걸 잡아주지 않는다면 외부로 던져질때 이 클래스를 통한다.
그래서, 위 코드는 개발자가 나중에 알아볼 수 있는 에러로그가 찍히고 500 에러 상태 코드를 가진 반환을 한다. 응답 바디는 에러 메시지가 들어가게 된다. 이렇게 공통 처리할 예외 클래스를 ExceptionMapper로 등록하여 공통 처리할 수 있다.
이 코드에 대해 조금 설명을 하자면, 어떤 상품 등록을 하는 폼이 있고 그 폼에서 상품 등록 버튼을 클릭하면 호출되는 @PostMapping이다. 그럼 상품 등록을 폼으로 한다고 하면 폼으로부터 전달되는 데이터가 있을 것인데 그 데이터가 저 @ModelAttribute로 담기게 된다.
그리고 타입은 Item 이라는 클래스인데 다음과 같이 생겼다.
Item
package hello.itemservice.domain.item;
import lombok.Data;
@Data
public class Item {
private Long id;
private String itemName;
private Integer price;
private Integer quantity;
public Item() {
}
public Item(String itemName, Integer price, Integer quantity) {
this.itemName = itemName;
this.price = price;
this.quantity = quantity;
}
}
그래서 폼으로부터 들어오는 데이터가 이 클래스의 필드로 다 만족을 하면 스프링 MVC는 자동으로 폼으로 들어오는 데이터를 저렇게 바인딩 할 수 있다. 그래서 정확히 저 코드는 사실 이렇게 생긴것이다.
@PostMapping("/add")
public String save(@RequestParam String itemName,
@RequestParam Integer price,
@RequestParam Integer quantity,
Model model) {
// @ModelAttribute가 대신 해주는 작업
Item item = new Item();
item.setItemName(itemName);
item.setPrice(price);
item.setQuantity(quantity);
model.addAttribute("item", item);
// @ModelAttribute가 대신 해주는 작업 끝
itemRepository.save(item);
return "basic/item";
}
"어? Item 객체를 만들어서 데이터를 넣어주는 것까진 알겠는데 model.addAttribute("item", item); 까지 해준다고?!" 그렇다. 저 @ModelAttribute는 Model 객체에 데이터를 담아주는 것까지 해준다. 그리고 그때 key값은 클래스의 앞글자만 소문자로 바꾼 형태가 된다. (Item → item)
Redirect
이번엔 리다이렉트를 하는 방법이다. 간단하다.
@PostMapping("/{itemId}/edit")
public String edit(@ModelAttribute Item item, @PathVariable Long itemId) {
itemRepository.update(itemId, item);
return "redirect:/basic/items/{itemId}";
}
저렇게 "redirect:/redirect할 경로" 를 입력하면 된다. 그리고 리다이렉트 할 경로에 PathVariable이 있는 경우 코드처럼 문자열로 {pathVariable}를 입력하면 된다. 그럼 스프링이 알아서 이 메서드의 @PathVariable에 있는 값을 매핑시켜준다.
당연히 이렇게도 가능하다. 오히려 @PathVariable을 받지 않는 메서드에서 itemId가 있어야 하는 경로로 리다이렉트 할 땐 아래 코드처럼만 해야한다.
이렇게 파라미터로 RedirectAttributes를 받으면 이 녀석을 사용할 수가 있는데, addAttribute()로 원하는 key/value를 넣으면 그 key를 가지고 {PathVariable}을 사용할 수가 있다.
그리고 "status"라는 키도 있는데 이렇게 addAttribute()로 key/value를 저장하고 PathVariable로 사용하지 않는건 쿼리 파라미터로 들어간다. 그리고 이 키는 저장이 잘 됐다면 리다이렉트된 화면에서 뭔가 잘 저장됐다는 표시를 보여주고 싶어서 플래그를 사용했다고 생각하면 된다. 그리고 그 플래그를 Thymeleaf랑 같이 사용할 때 이렇게 param이라는 키로 받을수가 있다.
이런 방식이 훨씬 더 깔끔하고 인코딩도 다 해주기 때문에 더 좋은 접근방법이다. 이렇게 사용하자.
PRG - Post/Redirect/Get
이건 실무에서도 자주 사용되는 방식인데, 폼을 통해 POST 요청을 하고 보여지는 화면에서 사용자가 새로 고침을 누르면 POST 요청이 계속 들어간다. 그런 경우에 POST 요청이 계속 들어오면 만약 그게 상품 저장 기능이었다면 새로 고침한만큼 상품 저장이 되는 문제가 발생한다. 그 것을 방지하기 위해 폼을 통해 POST 요청을 처리하는 컨트롤러에서는 그 요청의 반환으로 Redirect를 해서 GET으로 최종 목적지를 변경해줘야 한다.
그래서 위 코드도 POST로 요청이 들어온 상품 저장 기능에 반환으로 리다이렉트를 통해 저장된 상품의 상세 목록으로 페이지를 이동시킨다. 그래야 사용자는 저장한 후 보여지는 화면에서 새로고침을 눌러도 POST 요청이 계속 발생하지 않는다. 그러니까 새로고침은 가장 마지막에 한 행위를 다시 하는것이다. 그래서 새로고침을 누르더라도 POST 요청이 다시 일어나지 않도록 리다이렉트로 마지막에 요청한 행위는 그저 상품 상세 화면을 보고 있는 GET 요청으로 바꿔줘야 한다.
이때 서버 애플리케이션이 고객의 주문을 처리하고 있는 도중에 갑자기 재시작 된다면, 해당 고객의 주문이 제대로 진행되지 못할 것이다. 가장 이상적인 방향은 새로운 주문 요청은 막고, 이미 진행중인 주문은 모두 완료한 다음에 서버를 재시작 하는 것이 가장 좋을 것이다. 이처럼 서비스를 안정적으로 종료하는 것도 매우 중요하다. 이렇게 문제 없이 우아하게 종료하는 방식을 graceful shutdown이라 한다.
이런 관점에서 ExecutorService의 종료에 대해서 알아보자.
ExecutorService의 종료 메서드 관련
void shutdown()
새로운 작업을 받지 않고, 이미 제출된 작업을 모두 완료한 후에 종료한다.
논 블로킹 메서드(이 메서드를 호출한 스레드는 대기하지 않고 즉시 다음 코드를 호출한다)
List<Runnable> shutdownNow()
실행 중인 작업을 중단하고, 대기 중인 작업(블로킹 큐에 있는 대기 작업들을 의미)을 반환하며 즉시 종료한다.
실행 중인 작업을 중단하기 위해 인터럽트를 발생시킨다.
논 블로킹 메서드(이 메서드를 호출한 스레드는 대기하지 않고 즉시 다음 코드를 호출한다)
boolean isShutdown()
ExecutorService가 종료되었는지 확인한다.
boolean isTerminated()
shutdown(), shutdownNow() 호출 후, 모든 작업이 완료되었는지 확인한다.
블로킹 메서드(이 메서드를 호출한 스레드는 이 메서드가 종료될 때까지 대기 상태가 된다.)
void close()
자바 19부터 지원하는 서비스 종료 메서드이다. 이 메서드는 shutdown()과 같다고 생각하면 된다. 더 정확히는 shutdown()을 호출하고, 하루를 기다려도 작업이 완료되지 않으면 shutdownNow()를 호출한다.
호출한 스레드에 인터럽트가 발생해도 shutdownNow()를 호출한다.
ExecutorService에 아무런 작업이 없고, 스레드만 2개 대기하고 있다.
shutdown()을 호출한다.
ExecutorService는 새로운 요청을 거절한다.
거절 시 기본적으로 java.util.concurrent.RejectedExecutionException 예외가 발생한다.
스레드 풀의 자원을 정리한다.
shutdown()을 호출한다.
ExecutorService는 새로운 요청을 거절한다.
스레드 풀의 스레드는 처리중인 작업을 완료한다.
스레드 풀의 스레드는 큐에 남아있는 작업도 모두 꺼내서 완료한다.
모든 작업을 완료하면 자원을 정리한다.
결과적으로 처리중이던 taskA, taskB는 물론이고, 큐에 대기중이던 taskC, taskD도 완료된다.
shutdownNow()를 호출한다.
ExecutorService는 새로운 요청을 거절한다.
큐를 비우면서, 큐에 있는 작업을 모두 꺼내서 컬렉션으로 반환한다.
List<Runnable> runnables = es.shutdownNow()
작업 중인 스레드에 인터럽트가 발생한다.
작업 중인 taskA, taskB는 인터럽트가 걸린다.
큐에 대기중인 taskC, taskD는 수행되지 않는다.
작업을 완료하면 자원을 정리한다.
ExecutorService 우아한 종료 - 구현
shutdown()을 호출해서 이미 들어온 모든 작업을 다 처리하고 서비스를 우아하게 종료(graceful shutdown)하는 것이 가장 이상적이지만, 갑자기 요청이 너무 많이 들어와서 큐에 대기중인 작업이 너무 많아 작업 완료가 어렵거나, 작업이 너무 오래 걸리거나, 또는 버그가 발생해서 특정 작업이 끝나지 않을 수 있다. 이렇게 되면 서비스가 너무 늦게 종료되거나 종료되지 않는 문제가 발생할 수 있다.
이럴 때는 보통 우아하게 종료하는 시간을 정한다. 예를 들어서 60초까지는 작업을 다 처리할 수 있게 기다리는 것이다. 그리고 60초가 지나면, 무언가 문제가 있다고 가정하고 shutdownNow()을 호출해서 작업들을 강제로 종료한다.
close()
close()의 경우 이렇게 구현되어 있다. shutdown()을 호출하고, 하루를 기다려도 작업이 완료되지 않으면 shutdownNow()를 호출한다. 그런데 대부분 하루를 기다릴 수는 없을 것이다.
방금 설명한대로 우선은 shutdown()을 통해 우아한 종료를 시도하고, 10초간 종료되지 않으면 shutdownNow()를 통해 강제 종료하는 방식을 구현해보자. (예제에서 60초는 너무 길다..) 참고로 구현할 shutdownAndAwaitTermination()은 ExecutorService 공식 API 문서에서 제안하는 방식이다.
ExecutorShutdownMain
package thread.executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static util.MyLogger.log;
public class ExecutorShutdownMain {
public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(2);
es.execute(new RunnableTask("taskA"));
es.execute(new RunnableTask("taskB"));
es.execute(new RunnableTask("taskC"));
es.execute(new RunnableTask("longTask", 100_000));
ExecutorUtils.printState(es);
log("== shutdown 시작==");
shutdownAndAwaitTermination(es);
log("== shutdown 완료==");
ExecutorUtils.printState(es);
}
private static void shutdownAndAwaitTermination(ExecutorService es) {
es.shutdown();
try {
if (!es.awaitTermination(10, TimeUnit.SECONDS)) {
log("서비스 정상 종료 실패 -> 강제 종료 시도");
es.shutdownNow();
if (!es.awaitTermination(10, TimeUnit.SECONDS)) {
log("서비스가 종료되지 않았습니다.");
}
}
} catch (InterruptedException e) {
es.shutdownNow();
}
}
}
실행 결과
2024-07-30 13:01:53.014 [pool-1-thread-2] taskB 시작
2024-07-30 13:01:53.014 [ main] [pool= 2, active=2, queuedTasks=2, completedTask=0]
2024-07-30 13:01:53.014 [pool-1-thread-1] taskA 시작
2024-07-30 13:01:53.018 [ main] == shutdown 시작==
2024-07-30 13:01:54.020 [pool-1-thread-2] taskB 완료
2024-07-30 13:01:54.020 [pool-1-thread-1] taskA 완료
2024-07-30 13:01:54.022 [pool-1-thread-2] taskC 시작
2024-07-30 13:01:54.022 [pool-1-thread-1] longTask 시작
2024-07-30 13:01:55.023 [pool-1-thread-2] taskC 완료
2024-07-30 13:02:03.024 [ main] 서비스 정상 종료 실패 -> 강제 종료 시도
2024-07-30 13:02:03.026 [pool-1-thread-1] 인터럽트 발생, null
2024-07-30 13:02:03.029 [ main] == shutdown 완료==
2024-07-30 13:02:03.030 [ main] [pool= 0, active=0, queuedTasks=0, completedTask=4]
Exception in thread "pool-1-thread-1" java.lang.RuntimeException: java.lang.InterruptedException
at util.ThreadUtils.sleep(ThreadUtils.java:12)
at thread.executor.RunnableTask.run(RunnableTask.java:23)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1595)
Caused by: java.lang.InterruptedException
at java.base/java.lang.Thread.sleepImpl(Native Method)
at java.base/java.lang.Thread.sleep(Thread.java:516)
at util.ThreadUtils.sleep(ThreadUtils.java:9)
... 4 more
작업 처리에 필요한 시간
taskA, taskB, taskC: 1초
longTask: 100초
서비스 종료
es.shutdown();
새로운 작업을 받지 않는다. 처리 중이거나, 큐에 이미 대기중인 작업은 처리한다. 이후에 풀의 스레드를 종료한다.
shutdown()은 블로킹 메서드가 아니다. ExecutorService가 종료될 때까지 main 스레드가 대기하지 않는다. main 스레드는 바로 다음 코드를 호출한다.
if (!es.awaitTermination(10, TimeUnit.SECONDS) { ... }
블로킹 메서드이다.
main 스레드는 대기하며 서비스 종료를 10초간 기다린다.
만약 10초안에 모든 작업이 끝나면 true를 반환한다.
여기서 taskA, taskB, taskC의 수행이 완료된다. 그런데 longTask는 10초가 지나도 완료되지 않았다. 따라서 false를 반환한다.
서비스 정상 종료 실패 → 강제 종료 시도
if (!es.awaitTermination(10, TimeUnit.SECONDS)) {
log("서비스 정상 종료 실패 -> 강제 종료 시도");
es.shutdownNow();
if (!es.awaitTermination(10, TimeUnit.SECONDS)) {
log("서비스가 종료되지 않았습니다.");
}
}
정상 종료가 10초 이상 너무 오래 걸렸다.
shutdownNow()를 통해 강제 종료에 들어간다. shutdown()과 마찬가지로 블로킹 메서드가 아니다.
강제 종료를 하면 작업 중인 스레드에 인터럽트가 발생한다. 다음 로그를 통해 인터럽트를 확인할 수 있다.
인터럽트가 발생하면서 스레드도 작업을 종료하고, shutdownNow()를 통한 강제 shutdown도 완료된다.
2024-07-30 13:02:03.024 [ main] 서비스 정상 종료 실패 -> 강제 종료 시도
2024-07-30 13:02:03.026 [pool-1-thread-1] 인터럽트 발생, null
2024-07-30 13:02:03.029 [ main] == shutdown 완료==
서비스 종료 실패
그런데 마지막에 강제 종료인 es.shutdownNow()를 호출한 다음에 왜 10초간 또 기다릴까?
shutdownNow()가 작업 중인 스레드에 인터럽트를 호출하는 것은 맞다. 인터럽트를 호출하더라도 여러가지 이유로 작업에 시간이 걸릴 수 있다. 인터럽트 이후에 자원을 정리하는 어떤 간단한 작업을 수행할 수도 있다. 이런 시간을 기다려주는 것이다. 극단적으로 최악의 경우 스레드가 다음과 같이 인터럽트를 받을 수 없는 코드를 수행중일 수 있다. 이 경우 인터럽트 예외가 발생하지 않고, 스레드도 계속 수행된다.
인터럽트를 받을 수 없는 코드
while(true) {}
이런 스레드는 자바를 강제 종료해야 제거할 수 있다.
이런 경우를 대비해서 강제 종료 후 10초간 대기해도 작업이 완료되지 않으면 "서비스가 종료되지 않았습니다"라고 개발자가 인지할 수 있는 로그를 남겨두어야 한다. 그래야 개발자가 나중에 문제를 찾아서 코드를 수정도 할 수 있고 이 종료되지 않는 프로그램을 자바를 강제 종료 시켜서라도 종료할 수 있다.
이 부분은 왜 있을까? InterruptedException을 왜 catch로 잡았을까? 잡은 후 왜 또 shutdownNow()를 호출할까?
그 이유는 아래 코드를 호출할 때, 다른 스레드에서 이 awaitTermination()을 호출한 스레드에 인터럽트를 걸 수도 있기 때문이다.
그 경우에도 무사히 종료할 수 있도록 catch로 잡아서 shutdownNow()를 호출한다.
es.awaitTermination(10, TimeUnit.SECONDS)
정리
서비스를 종료할 때 생각보다 고려해야 할 점이 많다는 것을 이해했을 것이다. 기본적으로 우아한 종료를 선택하고, 우아한 종료가 되지 않으면 무한정 기다릴 수는 없으니, 그 다음으로 강제 종료를 하는 방식으로 접근하는 것이 좋다.
Executor 스레드 풀 관리 - 코드
이번 시간에는 Executor 프레임워크가 어떤식으로 스레드를 관리하는지 깊이있게 알아보자. 이 부분을 알아두면 실무에서 대량의 요청을 별도의 스레드에서 어떤식으로 처리해야 하는지에 대한 기본기를 쌓을 수 있을 것이다.
ExecutorService의 기본 구현체인 ThreadPollExecutor의 생성자는 다음 속성을 지원한다.
corePoolSize: 스레드 풀에서 관리되는 기본 스레드의 수
maximumPoolSize: 스레드 풀에서 관리되는 최대 스레드 수
keepAliveTime, TimeUnit unit: 기본 스레드 수를 초과해서 만들어진 초과 스레드가 생존할 수 있는 대기 시간, 이 시간 동안 처리할 작업이 없다면 초과 스레드는 제거된다.
BlockingQueue workQueue: 작업을 보관할 블로킹 큐
corePoolSize와 maximumPoolSize의 차이를 알아보기 위해 간단한 예제를 만들어보자.
먼저 예제를 좀 더 쉽게 확인하기 위해 ExecutorUtils에 메서드를 하나 추가하자.
ExecutorUtils
package thread.executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import static util.MyLogger.log;
public abstract class ExecutorUtils {
public static void printState(ExecutorService executorService) {
if (executorService instanceof ThreadPoolExecutor poolExecutor) {
int pool = poolExecutor.getPoolSize();
int active = poolExecutor.getActiveCount();
int queuedTasks = poolExecutor.getQueue().size();
long completedTask = poolExecutor.getCompletedTaskCount();
log("[pool= " + pool + ", active=" + active + ", queuedTasks=" + queuedTasks + ", completedTask=" + completedTask + "]");
} else {
log(executorService);
}
}
public static void printState(ExecutorService executorService, String taskName) {
if (executorService instanceof ThreadPoolExecutor poolExecutor) {
int pool = poolExecutor.getPoolSize();
int active = poolExecutor.getActiveCount();
int queuedTasks = poolExecutor.getQueue().size();
long completedTask = poolExecutor.getCompletedTaskCount();
log(taskName + " -> [pool= " + pool + ", active=" + active + ", queuedTasks=" + queuedTasks + ", completedTask=" + completedTask + "]");
} else {
log(executorService);
}
}
}
printState() 메서드를 하나 오버로딩했다. 단순히 taskName을 출력하는 부분이 추가되었다.
중복된 부분을 제거할 수 있지만, 기본 코드를 유지하기 위해 그대로 복사해서 약간만 수정했다.
추가로 이전에 만든 RunnableTask를 사용한다. 다음 코드는 앞서 만든 코드이니 참고만 하자. RuunableTask는 기본 1초 정도 작업을 수행한다고 가정한다.
RunnableTask
package thread.executor;
import static util.MyLogger.log;
import static util.ThreadUtils.sleep;
public class RunnableTask implements Runnable {
private final String name;
private int sleepMs = 1000;
public RunnableTask(String name) {
this.name = name;
}
public RunnableTask(String name, int sleepMs) {
this.name = name;
this.sleepMs = sleepMs;
}
@Override
public void run() {
log(name + " 시작");
sleep(sleepMs);
log(name + " 완료");
}
}
이제 이것들을 활용해서 한번 maximumPoolSize의 비밀을 파헤쳐보자.
PoolSizeMainV1
package thread.executor.poolsize;
import thread.executor.ExecutorUtils;
import thread.executor.RunnableTask;
import java.util.concurrent.*;
import static util.MyLogger.log;
import static util.ThreadUtils.sleep;
public class PoolSizeMainV1 {
public static void main(String[] args) {
ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
ExecutorService es = new ThreadPoolExecutor(
2,
4,
3000,
TimeUnit.MILLISECONDS,
workQueue);
ExecutorUtils.printState(es);
es.execute(new RunnableTask("task1"));
ExecutorUtils.printState(es, "task1");
es.execute(new RunnableTask("task2"));
ExecutorUtils.printState(es, "task2");
es.execute(new RunnableTask("task3"));
ExecutorUtils.printState(es, "task3");
es.execute(new RunnableTask("task4"));
ExecutorUtils.printState(es, "task4");
es.execute(new RunnableTask("task5"));
ExecutorUtils.printState(es, "task5");
es.execute(new RunnableTask("task6"));
ExecutorUtils.printState(es, "task6");
try {
es.execute(new RunnableTask("task7"));
} catch (RejectedExecutionException e) {
log("task7 실행 거절 예외 발생: " + e);
}
sleep(3000);
log("== 작업 수행 완료 ==");
ExecutorUtils.printState(es);
sleep(3000);
log("== maximumPoolSize 대기 시간 초과 ==");
ExecutorUtils.printState(es);
es.close();
log(" == shutdown 완료 ==");
ExecutorUtils.printState(es);
}
}
ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
ExecutorService es = new ThreadPoolExecutor(
2,
4,
3000,
TimeUnit.MILLISECONDS,
workQueue);
작업을 보관할 블로킹 큐의 구현체로 ArrayBlockingQueue(2)를 사용했다. 사이즈를 2로 설정했으므로 최대 2개까지 작업을 큐에 보관할 수 있다.
corePoolSize = 2, maximumPoolSize = 4 를 사용해서 기본 스레드 2개, 최대 스레드는 4개로 설정했다.
스레드 풀에 기본 2개의 스레드를 운영한다. 요청이 너무 많거나 급한 경우 스레드 풀은 최대 4개까지 스레드를 증가시켜서 사용할 수 있다. 이렇게 기본 스레드 수를 초과해서 만들어진 스레드를 초과 스레드라고 하겠다.
3000, TimeUnit.MILLISECONDS
초과 스레드가 생존할 수 있는 대기 시간을 뜻한다. 이 시간 동안 초과 스레드가 처리할 작업이 없다면 초과 스레드는 제거한다.
여기서는 3000 밀리초(3초)를 설정했으므로, 초과 스레드가 3초간 작업을 하지 않고 대기한다면 초과 스레드는 스레드 풀에서 제거된다.
근데, 이 maximumPoolSize가 그냥 단순하게 기본 스레드가 다 없으면 바로 이 최대 스레드 수까지 스레드 수를 늘려서 작업을 처리하는 게 아니다.
우선 아래 코드를 실행해보자.
package thread.executor.poolsize;
import thread.executor.ExecutorUtils;
import thread.executor.RunnableTask;
import java.util.concurrent.*;
import static util.MyLogger.log;
import static util.ThreadUtils.sleep;
public class PoolSizeMainV1 {
public static void main(String[] args) {
ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
ExecutorService es = new ThreadPoolExecutor(
2,
4,
3000,
TimeUnit.MILLISECONDS,
workQueue);
ExecutorUtils.printState(es);
es.execute(new RunnableTask("task1"));
ExecutorUtils.printState(es, "task1");
es.execute(new RunnableTask("task2"));
ExecutorUtils.printState(es, "task2");
es.close();
log(" == shutdown 완료 ==");
ExecutorUtils.printState(es);
}
}
결과를 보니, 이제 작업중인 스레드가 2개고 큐에 들어간 작업들도 꽉 찬 상태인 2개이다. 이렇듯 maximumPoolSize는 기본 스레드 수보다 작업이 더 많이 들어온다고 바로 스레드를 생성하는 게 아니다. 큐에 대기할 수 있는 공간이 있을때까지 더 들어온 작업들은 큐에 대기상태로 보관된다.
그럼? 큐에 대기 상태로 보관될 공간까지도 부족하면? 이때 바로 최대 스레드 수까지 스레드가 생성된다.
이렇듯 RejectedExecutionExcpetion 예외가 터진다. 그래서 적절하게 corePoolSize, maximumPoolSize, 블로킹 큐 사이즈를 설정해야 한다.
그래서 우선 이 예외를 try - catch로 잡아보자.
package thread.executor.poolsize;
import thread.executor.ExecutorUtils;
import thread.executor.RunnableTask;
import java.util.concurrent.*;
import static util.MyLogger.log;
import static util.ThreadUtils.sleep;
public class PoolSizeMainV1 {
public static void main(String[] args) {
ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
ExecutorService es = new ThreadPoolExecutor(
2,
4,
3000,
TimeUnit.MILLISECONDS,
workQueue);
ExecutorUtils.printState(es);
es.execute(new RunnableTask("task1"));
ExecutorUtils.printState(es, "task1");
es.execute(new RunnableTask("task2"));
ExecutorUtils.printState(es, "task2");
es.execute(new RunnableTask("task3"));
ExecutorUtils.printState(es, "task3");
es.execute(new RunnableTask("task4"));
ExecutorUtils.printState(es, "task4");
es.execute(new RunnableTask("task5"));
ExecutorUtils.printState(es, "task5");
es.execute(new RunnableTask("task6"));
ExecutorUtils.printState(es, "task6");
try {
es.execute(new RunnableTask("task7"));
} catch (RejectedExecutionException e) {
log("task7 실행 거절 예외 발생: " + e);
}
sleep(3000);
log("== 작업 수행 완료 ==");
ExecutorUtils.printState(es);
sleep(3000);
log("== maximumPoolSize 대기 시간 초과 ==");
ExecutorUtils.printState(es);
es.close();
log(" == shutdown 완료 ==");
ExecutorUtils.printState(es);
}
}
그런 다음 3초 정도 대기한 후 현재 상태를 찍어보자. 이 시기에 딱 작업이 모든 작업이 끝날 것이다. 그리고 나서 또 3초를 대기해보자. 그러면 keepAliveTime을 3초로 설정했기 때문에 기본 스레드 수보다 더 많이 생성된 스레드들이 정해진 시간 이상으로 놀고 있기 때문에 스레드를 지울것이다. 그리고 바로 아래가 그 결과다.
2024-07-30 15:36:01.310 [ main] task7 실행 거절 예외 발생: java.util.concurrent.RejectedExecutionException: Task thread.executor.RunnableTask@c415607b rejected from java.util.concurrent.ThreadPoolExecutor@da251ccc[Running, pool size = 4, active threads = 4, queued tasks = 2, completed tasks = 0]
task7 작업을 요청한다.
큐가 가득찼다.
스레드 풀의 스레드도 max 사이즈만큼 가득찼다.
RejectedExecutionException이 발생한다.
이 경우 큐에 넣을 수도 없고, 작업을 수행할 스레드도 만들 수 없다. 따라서 작업을 거절한다.
작업들이 수행된다.
스레드1이 task1을 스레드3이 task5의 작업을 완료하고 스레드 풀에 대기 상태로 돌아간다.
스레드 풀의 스레드는 큐의 데이터를 획득하기 위해 대기한다.
스레드1: task3을 획득한다.
스레드3: task4를 획득한다.
계속 작업을 수행한다.
2024-07-30 15:36:04.316 [ main] == 작업 수행 완료 ==
2024-07-30 15:36:04.317 [ main] [pool= 4, active=0, queuedTasks=0, completedTask=6]
모든 작업이 완료된다.
2024-07-30 15:36:07.323 [ main] == maximumPoolSize 대기 시간 초과 ==
2024-07-30 15:36:07.325 [ main] [pool= 2, active=0, queuedTasks=0, completedTask=6]
스레드3, 스레드4와 같은 초과 스레드들은 지정된 시간까지 작업을 하지 않고 대기하면 제거된다. 긴급한 작업들이 끝난 것으로 이해하면 된다.
여기서는 지정한 3초간 스레드3, 스레드4가 작업을 진행하지 않았기 때문에 스레드 풀에서 제거된다.
참고로 초과 스레드가 작업을 처리할 때마다 시간은 계속 초기화가 된다.
작업 요청이 계속 들어온다면 긴급한 상황이 끝난 것이 아니다. 따라서 긴급한 상황이 끝날 때 까지는 초과 스레드를 살려두는 것이 많은 스레드를 사용해서 작업을 더 빨리 처리할 수 있다.
초과 스레드가 제거된 모습이다.
이후에 shutdown()이 진행되면 풀의 스레드도 모두 제거된다.
정리를 하자면,
작업을 요청하면 core 사이즈 만큼 스레드를 만든다.
core 사이즈를 초과하면 큐에 작업을 넣는다.
큐를 초과하면 max 사이즈 만큼 스레드를 만든다. 임시로 사용되는 초과 스레드가 생성된다.
큐가 가득차서 큐에 넣을 수도 없다. 초과 스레드가 바로 수행해야 한다.
max 사이즈를 초과하면 요청을 거절한다. 예외가 발생한다.
Executor에 큐도 가득차고, 풀 최대 생성 가능한 스레드 수도 가득 찼다. 작업을 받을 수 없다.
스레드 미리 생성하기 (팁)
응답 시간이 아주 중요한 서버라면, 서버가 고객의 처음 요청을 받기 전에 스레드를 스레드 풀에 미리 생성해두고 싶을 수 있다. 스레드를 미리 생성해두면, 처음 요청에서 사용되는 스레드의 생성 시간을 줄일 수 있다. ThreadPoolExecutor.prestartAllCoreThreads()를 사용하면 기본 스레드를 미리 생성할 수 있다. 참고로 ExecutorService는 이 메서드를 제공하지 않는다.
PreStartPoolMain
package thread.executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
public class PreStartPoolMain {
public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(1000);
ExecutorUtils.printState(es);
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) es;
threadPoolExecutor.prestartAllCoreThreads();
ExecutorUtils.printState(es);
}
}
모든 작업이 대기하지 않고 작업의 수 만큼 스레드가 생기면서 바로 실행되는 것을 확인할 수 있다.
"maximumPoolSize 대기 시간 초과" 로그를 통해 초과 스레드가 대기 시간이 지나서 모두 사라진 것을 확인할 수 있다.
이 전략은 다음과 같은 특징이 있다.
특징
캐시 스레드 풀 전략은 매우 빠르고, 유연한 전략이다.
이 전략은 기본 스레드도 없고, 대기 큐에 작업도 쌓이지 않는다. 대신에 작업 요청이 오면 초과 스레드로 작업을 바로바로 처리한다. 따라서 빠른 처리가 가능하다. 초과 스레드의 수도 제한이 없기 때문에 CPU, 메모리 자원만 허용한다면 시스템의 자원을 최대로 사용할 수 있다.
추가로 초과 스레드는 60초간 생존한다. 그래서 작업 수에 맞추어 적절한 수의 스레드가 재사용된다. 이런 특징 때문에 요청이 갑자기 증가하면 스레드도 갑자기 증가하고, 요청이 줄어들면 스레드도 점점 줄어든다. 이 전략은 작업의 요청 수에 따라서 스레드도 증가하고 감소하므로, 매우 유연한 전략이다.
그런데 어떻게 기본 스레드 없이 초과 스레드만 만들 수 있을까? Executor 스레드 풀 기본 관리 정책을 다시 확인해보자.
Executor 스레드 풀 관리
작업을 요청하면 core 사이즈 만큼 스레드를 만든다.
core 사이즈가 없다. 바로 core 사이즈를 초과한다.
core 사이즈를 초과하면 큐에 작업을 넣는다.
큐에 작업을 넣을 수 없다. (SynchronousQueue는 큐의 저장 공간이 0인 특별한 큐이다)
큐를 초과하면 max 사이즈 만큼 스레드를 만든다. 임시로 사용되는 초과 스레드가 생성된다.
초과 스레드가 생성된다. 물론 풀에 대기하는 초과 스레드가 있으면 재사용된다.
max 사이즈를 초과하면 요청을 거절한다. 예외가 발생한다.
참고로 max 사이즈가 무제한이다. 따라서 초과 스레드를 무제한으로 만들 수 있다.
결과적으로 이 전략의 모든 작업은 초과 스레드가 처리한다.
주의
이 방식은 작업 수에 맞추어 스레드 수가 변하기 때문에, 작업의 처리 속도가 빠르고, CPU, 메모리를 매우 유연하게 사용할 수 있다는 장점이 있다. 하지만 상황에 따라 장점이 가장 큰 단점이 되기도 한다.
상황1 - 점진적인 사용자 확대
개발한 서비스가 잘 되어서 사용자가 점점 늘어난다.
캐시 스레드 전략을 사용하면 이런 경우 크게 문제가 되지 않는다.
캐시 스레드 전략은 이런 경우에는 문제를 빠르게 찾을 수 있다. 사용자가 점점 증가하면서 스레드 사용량도 함께 늘어난다. 따라서 CPU 메모리의 사용량도 자연스럽게 증가한다.
물론 CPU, 메모리 자원은 한계가 있기 때문에 적절한 시점에 시스템을 증설해야 한다. 그렇지 않으면 CPU, 메모리 같은 시스템 자원을 너무 많이 사용하면서 시스템이 다운될 수 있다.
상황2 - 갑작스런 요청 증가
마케팅 팀의 이벤트가 대성공 하면서 갑자기 사용자가 폭증했다.
고객은 응답을 받지 못한다고 항의한다.
상황2 - 확인
개발자는 급하게 CPU, 메모리 사용량을 확인해보는데, CPU 사용량이 100%이고, 메모리 사용량도 지나치게 높아져 있다.
스레드 수를 확인해보니 스레드가 수 천개 실행되고 있다. 너무 많은 스레드가 작업을 처리하면서 시스템 전체가 느려지는 현상이 발생한다.
캐시 스레드 풀 전략은 스레드가 무한으로 생성될 수 있다.
수 천개의 스레드가 처리하는 속도보다 더 많은 작업이 들어온다.
시스템은 너무 많은 스레드에 잠식 당해서 거의 다운된다. 메모리도 거의 다 사용되어 버린다.
시스템이 멈추는 장애가 발생한다.
고정 스레드 풀 전략은 서버 자원은 여유가 있는데, 사용자만 점점 느려지는 문제가 발생할 수 있다. 반면에 캐시 스레드 풀 전략은 서버의 자원을 최대한 사용하지만, 서버가 감당할 수 있는 임계점을 넘는 순간 시스템이 다운될 수 있다.
Executor 전략 - 사용자 정의 풀 전략
상황1 - 점진적인 사용자 확대
개발한 서비스가 잘 되어서 사용자가 점점 늘어난다.
상황2 - 갑작스런 요청 증가
마케팅 팀의 이벤트가 대성공 하면서 갑자기 사용자가 폭증했다.
다음과 같이 세분화된 전략을 사용하면 상황1, 상황2를 모두 어느정도 대응할 수 있다.
일반: 일반적인 상황에는 CPU, 메모리 자원을 예측할 수 있도록 고정 크기의 스레드로 서비스를 안정적으로 운영한다.
긴급: 사용자의 요청이 갑자기 증가하면 긴급하게 스레드를 추가로 투입해서 작업을 빠르게 처리한다.
거절: 사용자의 요청이 폭증해서 긴급 대응도 어렵다면 사용자의 요청을 거절한다.
이 방법은 평소에는 안정적으로 운영하다가, 사용자의 요청이 갑자기 증가하면 긴급하게 스레드를 더 투입해서 급한 불을 끄는 방법이다. 물론 긴급 상황에는 CPU, 메모리 자원을 더 사용하기 때문에 적정 수준을 찾아야 한다. 일반적으로는 여기까지 대응이 되겠지만, 시스템이 감당할 수 없을 정도로 사용자의 요청이 폭증하면, 처리 가능한 수준의 사용자 요청만 처리하고 나머지 요청은 거절해야 한다. 어떤 경우도 시스템이 다운되는 최악의 상황은 피해야 한다.
사용자 정의 풀 전략은 다음과 같이 적용할 수 있다.
ExecutorService es = new ThreadPoolExecutor(100, 200, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000));
100개의 기본 스레드를 사용한다.
추가로 긴급 대응 가능한 긴급 스레드 100개를 사용한다. 긴급 스레드는 60초의 생존 주기를 가진다.
1000개의 작업이 큐에 대기할 수 있다.
PoolSizeMainV4
package thread.executor.poolsize;
import thread.executor.RunnableTask;
import java.util.concurrent.*;
import static thread.executor.ExecutorUtils.printState;
import static util.MyLogger.log;
public class PoolSizeMainV4 {
public static final int TASK_SIZE = 1100;
// public static final int TASK_SIZE = 1200; // 긴급
// public static final int TASK_SIZE = 1201; // 거절
public static void main(String[] args) {
ExecutorService es = new ThreadPoolExecutor(
100,
200,
60L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1000));
printState(es);
long startMs = System.currentTimeMillis();
for (int i = 1; i <= TASK_SIZE; i++) {
String taskName = "task" + i;
try {
es.execute(new RunnableTask(taskName));
printState(es, taskName);
} catch (RejectedExecutionException e) {
log(taskName + " -> " + e);
}
}
es.close();
long endMs = System.currentTimeMillis();
log("time : " + (endMs - startMs) + "ms");
}
}
이 전략은 다음과 같이 작동한다. 하나의 작업을 처리하는데 약 1초가 걸린다고 가정해보자.
일반: 1000개 이하의 작업이 큐에 담겨있다. → 100개의 기본 스레드가 처리한다.
긴급: 큐에 담긴 작업이 이미 1000개인데 그 이상으로 계속 작업이 들어온다 → 1000개의 기본 스레드 + 100개의 초과 스레드가 처리한다.
거절: 초과 스레드를 투입했지만, 큐에 담긴 작업이 1000개이고 또 초과 스레드도 넘어간 상황이다. → 이 경우 예외를 발생시킨다.
코드상에 작성해 둔 TASK_SIZE를 각각 한번씩 주석을 변경해서 실행해보자. 아래와 같은 실행 결과를 얻는다.
긴급 투입한 스레드로도 작업이 빠르게 소모되지 않는다는 것은, 시스템이 감당하기 어려운 많은 요청이 들어오고 있다는 의미이다.
여기서는 큐에 대기하는 작업 1000개 + 스레드가 처리 중인 작업 200개 → 총 1200개의 작업을 초과하면 예외가 발생한다.
따라서 1201번에서 예외가 발생한다.
이런 경우 요청을 거절한다. 고객 서비스라면 시스템에 사용자가 너무 많으니 나중에 다시 시도해달라고 해야 한다.
나머지 1200개의 작업들은 긴급 상황과 같이 정상 처리된다.
이런 실수를 하면 안된다!
new ThreadPoolExecutor(100, 200, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue());
기본 스레드 100개
최대 스레드 200개
큐 사이즈: 무한대
이렇게 설정하면 절대로 최대 스레드가 동작하지 않는다. 왜냐하면 큐가 가득차야 긴급 상황으로 인지 되는데, LinkedBlockingQueue를 기본 생성자를 통해 무한대의 사이즈로 사용하게 되면, 큐가 가득찰 수가 없다. 결국 기본 스레드 100개만으로 무한대의 작업을 처리해야 하는 문제가 발생한다. 실무에서 자주 하는 실수 중에 하나이다.
Executor 예외 정책
생산자 소비자 문제를 실무에서 사용할 때는, 결국 소비자가 처리할 수 없을 정도로 생산 요청이 가득 차면 어떻게 할지를 정해야 한다. 개발자가 인지할 수 있게 로그도 남겨야 하고, 사용자에게 현재 시스템에 문제가 있다고 알리는 것도 필요하다. 이런 것을 위해 예외 정책이 필요하다.
ThreadPoolExecutor에 작업을 요청할 때, 큐도 가득차고, 초과 스레드도 더는 할당할 수 없다면 작업을 거절한다.
ThreadPoolExecutor는 작업을 거절하는 다양한 정책을 제공한다.
AbortPolicy: 새로운 작업을 제출할 때 RejectedExecutionException을 발생시킨다. 기본 정책이다.
DiscardPolicy: 새로운 작업을 조용히 버린다.
CallerRunsPolicy: 새로운 작업을 제출한 스레드가 대신해서 직접 작업을 실행한다.
사용자 정의(RejectedExecutionHandler): 개발자가 직접 정의한 거절 정책을 사용할 수 있다.
참고로, ThreadPoolExecutor를 shutdown()하면 이후에 요청하는 작업을 거절하는데, 이때도 같은 정책이 적용된다.
AbortPolicy
작업이 거절되면 RejectedExecutionException을 던진다. 기본적으로 설정되어 있는 정책이다.
RejectMainV1
package thread.executor.rejected;
import thread.executor.RunnableTask;
import java.util.concurrent.*;
import static util.MyLogger.log;
public class RejectMainV1 {
public static void main(String[] args) {
ExecutorService es = new ThreadPoolExecutor(
1,
1,
0,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
new ThreadPoolExecutor.AbortPolicy());
es.execute(new RunnableTask("task1"));
try {
es.execute(new RunnableTask("task2"));
} catch (RejectedExecutionException e) {
log("요청 초과");
// 포기, 다시 시도 등 다양한 고민을 하면 됨
log(e);
}
es.close();
}
}
ThreadPoolExecutor 생성자 마지막에 new ThreadPoolExecutor.AbortPolicy()를 제공하면 된다.
참고로 이것이 기본 정책이기 때문에 생략해도 된다.
스레드는 1개만 사용한다. 예제를 단순하게 만들기 위해 큐에 작업을 넣지 않도록 SynchronousQueue를 사용한다.
task2를 요청하면 허용 작업을 초과한다. 따라서 RejectedExecutionException이 발생한다.
RejectedExecutionException 예외를 잡아서 작업을 포기하거나, 사용자에게 알리거나, 다시 시도하면 된다. 이렇게 예외를 잡아서 필요한 코드를 직접 구현해도 되고 아니면 다음에 설명한 다른 정책들을 사용해도 된다.
RejectedExecutionHandler
마지막에 전달한 AbortPolicy는 RejectedExecutionHandler의 구현체이다. ThreadPoolExecutor 생성자는 RejectedExecutionHandler의 구현체를 전달 받는다.
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
ThreadPoolExecutor는 거절해야 하는 상황이 발생하면 RejectedExecutionHandler가 가지고 있는 rejectExecution()을 호출한다.
DiscardPolicy
거절된 작업을 무시하고 아무런 예외도 발생시키지 않는다.
RejectMainV2
package thread.executor.rejected;
import thread.executor.RunnableTask;
import java.util.concurrent.*;
import static util.MyLogger.log;
public class RejectMainV2 {
public static void main(String[] args) {
ExecutorService es = new ThreadPoolExecutor(
1,
1,
0,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
new ThreadPoolExecutor.DiscardPolicy());
es.execute(new RunnableTask("task1"));
es.execute(new RunnableTask("task2"));
es.execute(new RunnableTask("task3"));
es.close();
}
}
ThreadPoolExecutor 생성자 마지막에 new ThreadPoolExecutor.DiscardPolicy()를 제공하면 된다.
실행 결과
2024-07-31 12:47:28.663 [pool-1-thread-1] task1 시작
2024-07-31 12:47:29.668 [pool-1-thread-1] task1 완료
task2, task3은 거절된다. DiscardPolicy는 조용히 버리는 정책이다.
다음 구현 코드를 보면 왜 조용히 버리는 정책인지 이해가 될 것이다. (아무것도 없다)
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
CallerRunsPolicy
이제 이게 재밌는데, 호출한 스레드가 직접 작업을 수행하게 한다. 그러니까 예를 들어, 피자를 먹으러 온 손님한테 주인이 "내가 두 팔을 다 사용중이라 더 피자를 못 만드니까 당신이 알아서 해 먹으세요!" 라고 직접 시키는 것과 같다. 이로 인해 새로운 작업을 제출하는 스레드의 속도가 느려질 수 있다. 왜냐? 원래 같으면 새로운 작업을 제출하는 스레드는 execute() 또는 submit()으로 계속 작업을 요청하는데 요청했더니 나보고 이걸 하라고 시키면? 직접 작업을 처리하는 시간동안 요청을 못하게 된다.
RejectMainV3
package thread.executor.rejected;
import thread.executor.RunnableTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class RejectMainV3 {
public static void main(String[] args) {
ExecutorService es = new ThreadPoolExecutor(
1,
1,
0,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
new ThreadPoolExecutor.CallerRunsPolicy());
es.execute(new RunnableTask("task1"));
es.execute(new RunnableTask("task2"));
es.execute(new RunnableTask("task3"));
es.close();
}
}
이 정책의 특징은 생산자 스레드가 소비자 대신 일을 수행하는 것도 있지만, 생산자 스레드가 대신 일을 수행하는 덕분에 작업의 생산 자체가 느려진다는 점이다. 덕분에 작업의 생산 속도가 너무 빠르다면, 생산 속도를 조절할 수 있다. 원래대로 하면 main 스레드가 task1, task2, task3, task4를 연속해서 바로 생산해야 한다. CallerRunsPolicy 정책 덕분에 main 스레드는 task2를 본인이 직접 완료하고 나서야 task3을 생산할 수 있다. 결과적으로 생산 속도가 조절되었다.
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
r.run() 코드를 보면 별도의 스레드에서 수행하는 것이 아니라 main 스레드가 직접 수행하는 것을 알 수 있다. Runnable이 가진 run()을 직접 호출하면 절대 안된다고 예전 포스팅에서 말했다. 그 이유는 직접 호출하면 그 호출한 스레드가 이 작업을 수행하는 것이지 새로운 스레드가 작업을 수행하는게 아니라고 말했다. 그 이유 그대로가 여기서 사용된다. 재밌다.
참고로 ThreadPoolExecutor를 shutdown()을 하면 이후에 요청하는 작업을 거절하는데, 이때도 생성할 때 설정한 예외 정책이 그대로 적용된다. 그래서 CallerRunsPolicy 정책은 shutdown() 이후에도 받은 작업을 수행하게 될테니 shutdown() 조건을 체크해서 이 경우에는 작업을 수행하지 않도록 해두었다.
사용자 정의
사용자가 직접 RejectedExecutionHandler 인터페이스를 구현해서 자신만의 예외 처리 전략을 정의할 수 있다. 이를 통해 특정 요구사항에 맞는 작업 거절 방식을 설정할 수 있다.
RejectMainV4
package thread.executor.rejected;
import thread.executor.RunnableTask;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import static util.MyLogger.log;
public class RejectMainV4 {
public static void main(String[] args) {
ExecutorService es = new ThreadPoolExecutor(
1,
1,
0,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
new MyRejectedExecutionHandler());
es.execute(new RunnableTask("task1"));
es.execute(new RunnableTask("task2"));
es.execute(new RunnableTask("task3"));
es.close();
}
static class MyRejectedExecutionHandler implements RejectedExecutionHandler {
AtomicInteger count = new AtomicInteger();
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
int counted = count.incrementAndGet();
log("[경고]: 누적된 거절 작업 수 : " + counted);
}
}
}
실행 결과
2024-07-31 13:01:29.707 [ main] [경고]: 누적된 거절 작업 수 : 1
2024-07-31 13:01:29.707 [pool-1-thread-1] task1 시작
2024-07-31 13:01:29.710 [ main] [경고]: 누적된 거절 작업 수 : 2
2024-07-31 13:01:30.712 [pool-1-thread-1] task1 완료
정리
길고 긴 멀티스레드 세상이 끝났다! 이 포스팅에서는 Executor 프레임워크를 조금 더 실무 관점에서 다루는 방식에 대해 알아보았다. 그래서 어떻게 우아한 종료를 할 수 있을지에 대한 고민과 스레드 풀의 corePoolSize, maximumPoolSize가 어떤 방식으로 동작하는지 이해해보았다. 그래서 풀 전략도 세가지로 나뉘어졌었다.
고정 스레드 풀 전략: 트래픽이 일정하고, 시스템 안정성이 가장 중요
캐시 스레드 풀 전략: 일반적인 성장하는 서비스
사용자 정의 풀 전략: 다양한 상황에 대응
그리고 마지막으로, 예외 정책에 대해서도 알아봤다. 개인적으로 가장 재밌는건 CallerRunsPolicy였다. 사용하진 않을 것 같다ㅋㅋ
이제 진짜 재밌어진다. 지금까지는 직접 스레드를 만들고 관리해서 사용했다. 그러나 스레드를 직접 사용하면 여러 문제가 있다.
스레드를 직접 사용할 때의 문제점
스레드 생성 시간으로 인한 성능 문제
스레드 관리 문제
Runnable 인터페이스의 불편함
1. 스레드 생성 시간으로 인한 성능 문제
스레드를 사용하려면 먼저 스레드를 생성해야 한다. 그런데 스레드는 다음과 같은 이유로 매우 무겁다.
메모리 할당: 각 스레드는 자신만의 호출 스택(call stack)을 가지고 있어야 한다. 이 호출 스택은 스레드가 실행되는 동안 사용하는 메모리 공간이다. 따라서 스레드를 생성할 때는 이 호출 스택을 위한 메모리를 할당해야 한다.
운영체제 자원 사용: 스레드를 생성하는 작업은 운영체제 커널 수준에서 이루어지며, 시스템 콜을 통해 처리된다. 이는 CPU와 메모리 리소스를 소모하는 작업이다.
운영체제 스케쥴러 설정: 새로운 스레드가 생성되면 운영체제의 스케쥴러는 이 스레드를 관리하고 실행 순서를 조정해야 한다. 이는 운영체제의 스케쥴링 알고리즘에 따라 추가적인 오버헤드가 발생할 수 있다.
참고로 스레드 하나는 보통 1MB 이상의 메모리를 사용한다.
스레드를 생성하는 작업은 상대적으로 무겁다. 단순히 자바 객체를 하나 생성하는 것과는 비교할 수 없을 정도로 큰 작업이다. 예를 들어, 어떤 작업 하나를 수행할 때 마다 스레드를 각각 생성하고 실행한다면, 스레드의 생성 비용 때문에 이미 많은 시간이 소모된다. 근데 그 작업이 아주 가벼운 작업이라면 작업의 실행 시간보다 스레드의 생성 시간이 더 오래 걸릴 수도 있다.
이런 문제를 해결하려면 생성한 스레드를 재사용하는 방법을 고려해볼 수 있다. 스레드를 재사용하면 처음 생성할 때를 제외하고는 생성을 위한 시간이 들지 않는다. 따라서 스레드가 아주 빠르게 작업을 수행할 수 있다.
2. 스레드 관리 문제
서버의 CPU, 메모리 자원은 한정되어 있기 때문에, 스레드는 무한하게 만들 수 없다. 예를 들어, 사용자의 주문을 처리하는 서비스라고 가정하자. 그리고 사용자의 주문이 들어올 때 마다 스레드를 만들어서 요청을 처리한다고 가정하겠다. 서비스 마케팅을 위해 선착순 할인 이벤트를 진행한다고 가정해보자. 그러면 사용자가 갑자기 몰려들 수 있다. 평소 동시에 100개 정도의 스레드면 충분했는데 갑자기 10000개의 스레드가 필요한 상황이 된다면 CPU, 메모리 자원이 버티지 못할 것이다. 이런 문제를 해결하려면 우리 시스템이 버틸 수 있는, 최대 스레드의 수 까지만 스레드를 생성할 수 있게 관리해야 한다.
또한 이런 문제도 있다. 예를 들어 애플리케이션을 종료한다고 가정해보자. 이때 안전한 종료를 위해 실행 중인 스레드가 남은 작업은 모두 수행한 다음에 프로그램을 종료하고 싶다거나, 또는 급하게 종료해야 해서 인터럽트 등의 신호를 주고 스레드를 종료하고 싶다고 가정해보자. 이런 경우에도 스레드가 어딘가에 관리가 되어 있어야 한다.
3. Runnable 인터페이스의 불편함
public interface Runnable {
void run();
}
Runnable 인터페이스는 다음과 같은 이유로 불편하다.
반환값이 없다: run()메서드는 반환값을 가지지 않는다. 따라서 실행 결과를 얻기 위해서는 별도의 메커니즘을 사용해야 한다. 쉽게 이야기해서 스레드의 실행 결과를 직접 받을 수 없다. 앞에서 공부한 SumTask의 예를 생각해보자. 스레드가 실행한 결과를 멤버 변수에 넣어두고, join()등을 사용해서 스레드가 종료되길 기다린 다음에 멤버 변수에 보관한 값을 받아야 한다.
예외 처리: run() 메서드는 체크 예외(Checked Exception)를 던질 수 없다. 체크 예외의 처리는 메서드 내부에서 처리해야 한다.
이런 문제를 해결하려면 반환 값도 받을 수 있고, 예외도 좀 더 쉽게 처리할 수 있는 방법이 필요하다. 추가로 반환 값 뿐만 아니라 해당 스레드에서 발생한 예외도 받을 수 있다면 더 좋을 것이다.
해결 방안
지금까지 설명한 1번과 2번 문제(스레드 생성 시간으로 인한 성능 문제, 스레드 관리 문제)를 해결하려면 스레드를 생성하고 관리하는 풀(Pool)이 필요하다.
스레드를 관리하는 스레드 풀(스레드가 모여서 대기하는 수영장 풀 같은 개념)에 스레드를 미리 필요한 만큼 만들어둔다.
스레드는 스레드 풀에서 대기하며 쉰다.
작업 요청이 온다.
스레드 풀에서 이미 만들어진 스레드를 하나 조회한다.
조회한 스레드1로 작업을 처리한다.
스레드1은 작업을 완료한다.
작업을 완료한 스레드는 종료하는게 아니라, 다시 스레드 풀에 반납한다. 스레드1은 이후에 다시 재사용 될 수 있다.
이렇게 스레드 풀이라는 개념을 이용하면 스레드를 재사용할 수 있어서 재사용 시 스레드의 생성 시간을 절약할 수 있다. 그리고 스레드 풀에서 스레드가 관리되기 때문에 필요한 만큼만 스레드를 만들 수 있고 또 관리할 수 있다.
사실 스레드 풀이라는 것이 별 것이 아니다. 그냥 컬렉션에 스레드를 보관하고 재사용할 수 있게 하면 된다. 하지만 스레드 풀에 있는 스레드는 처리할 작업이 없다면 대기(WAITING)상태로 관리해야 하고, 작업 요청이 오면 RUNNABLE 상태로 변경해야 한다. 막상 구현하려고 하면 생각보다 매우 복잡하다는 사실을 알게 될 것이다. 여기에 생산자 소비자 문제까지 겹친다. 잘 생각해보면 어떤 생산자가 작업(task)를 만들 것이고, 우리의 스레드 풀에 있는 스레드가 소비자가 되는 것이다.
이런 문제를 한방에 해결해주는 것이 바로 자바가 제공하는 Executor 프레임워크이다.
Executor 프레임워크는 스레드 풀, 스레드 관리, Runnable 인터페이스의 문제점은 물론이고, 생산자 소비자 문제까지 한방에 해결해주는 자바 멀티스레드 최고의 도구이다. 지금까지 우리가 배운 멀티 스레드 기술의 총 집합이 여기에 들어있다.
참고로 앞서 설명한 이유와 같이 스레드를 사용할 때는 생각보다 고려해야 할 일이 많다. 그래서 실무에서는 스레드를 직접 하나하나 생성해서 사용하는 일이 거의 없다. 대신에 지금부터 설명할 Executor 프레임워크를 주로 사용하는데, 이 기술을 사용하면 매우 편리하게 멀티스레드 프로그래밍을 할 수 있다.
Executor 프레임워크 소개
자바의 Executor 프레임워크는 멀티스레딩 및 병렬 처리를 쉽게 사용할 수 있도록 돕는 기능의 모음이다. 이 프레임워크는 작업 실행의 관리 및 스레드 풀 관리를 효율적으로 처리해서 개발자가 직접 스레드를 생성하고 관리하는 복잡함을 줄여준다.
Executor 프레임워크 주요 구성 요소
Executor 인터페이스
package java.util.concurrent;
public interface Executor {
void execute(Runnable command);
}
가장 단순한 작업 실행 인터페이스로, execute(Runnable command) 메서드 하나를 가지고 있다.
ExecutorService 인터페이스의 기본 구현체는 ThreadPoolExecutor이다. 우선은 이런것이 있구나 정도만 보면 된다. 직접 코드로 실행하면서 학습해보자.
로그 출력 유틸리티 만들기
ExecutorUtils
package thread.executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import static util.MyLogger.log;
public abstract class ExecutorUtils {
public static void printState(ExecutorService executorService) {
if (executorService instanceof ThreadPoolExecutor poolExecutor) {
int pool = poolExecutor.getPoolSize();
int active = poolExecutor.getActiveCount();
int queuedTasks = poolExecutor.getQueue().size();
long completedTask = poolExecutor.getCompletedTaskCount();
log("[pool= " + pool + ", active=" + active + ", queuedTasks=" + queuedTasks + ", completedTask=" + completedTask + "]");
} else {
log(executorService);
}
}
}
Executor 프레임워크의 상태를 확인하기 위한 로그 출력 유틸리티이다.
pool: 스레드 풀에서 관리되는 스레드의 숫자
active: 작업을 수행하는 스레드의 숫자
queuedTasks: 큐에 대기중인 작업의 숫자
completedTask: 완료된 작업의 숫자
참고로, ExecutorService 인터페이스는 getPoolSize(), getActiveCount() 같은 자세한 기능은 제공하지 않는다. 이 기능은 ExecutorService의 대표 구현체인 ThreadPoolExecutor를 사용해야 한다. printState() 메서드에 ThreadPoolExecutor 구현체가 넘어오면 우리가 구성한 로그를 출력하고, 그렇지 않은 경우에는 인스턴스 자체를 출력한다.
그리고 다음 코드는 한 줄로 캐스팅을 한 것이다.
if (executorService instanceof ThreadPoolExecutor poolExecutor) {...}
이 코드 한 줄은 원래는 없던 기능인데 자바 16부터 추가가 됐다. 그래서 만약 executorService가 ThreadPoolExecutor의 인스턴스면 poolExecutor 라는 이름을 가지는 변수로 캐스팅할 수 있게 된다.
ExecutorService 코드로 시작하기
먼저 1초간 대기하는 아주 간단한 작업을 만들자.
RunnableTask
package thread.executor;
import static util.MyLogger.log;
import static util.ThreadUtils.sleep;
public class RunnableTask implements Runnable {
private final String name;
private int sleepMs = 1000;
public RunnableTask(String name) {
this.name = name;
}
public RunnableTask(String name, int sleepMs) {
this.name = name;
this.sleepMs = sleepMs;
}
@Override
public void run() {
log(name + " 시작");
sleep(sleepMs);
log(name + " 완료");
}
}
Runnable 인터페이스를 구현한다. 1초의 작업이 걸리는 간단한 작업으로 가정하자.
ExecutorBasicMain
package thread.executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static thread.executor.ExecutorUtils.*;
import static util.MyLogger.log;
import static util.ThreadUtils.sleep;
public class ExecutorBasicMain {
public static void main(String[] args) {
ExecutorService es = new ThreadPoolExecutor(
2,
2,
0,
TimeUnit.MICROSECONDS,
new LinkedBlockingQueue<>());
log("== 초기 상태 ==");
printState(es);
es.execute(new RunnableTask("taskA"));
es.execute(new RunnableTask("taskB"));
es.execute(new RunnableTask("taskC"));
es.execute(new RunnableTask("taskD"));
log("== 작업 수행 중 ==");
printState(es);
sleep(3000);
log("== 작업 수행 완료==");
printState(es);
es.close();
log("== shutdown 완료 ==");
printState(es);
}
}
ExecutorService의 가장 대표적인 구현체는 ThreadPoolExecutor이다.
ThreadPoolExecutor(ExecutorService)는 크게 두 가지 요소로 구성되어 있다.
스레드 풀: 스레드를 관리한다.
BlockingQueue: 작업을 보관한다. 생산자 소비자 문제를 해결하기 위해 그냥 일반적인 큐를 사용하는 게 아니고 BlockingQueue를 사용한다.
close()를 호출하면 ThreadPoolExecutor가 종료된다. 이때 스레드 풀에 대기하는 스레드도 함께 제거된다.
참고로, close()는 자바 19부터 지원되는 메서드이다. 자바 19 미만 버전을 사용한다면 shutdown()을 호출하자. 둘의 차이는 뒤에서 설명하겠다.
이렇게 간단하게 ExecutorService를 사용해서 직접 스레드를 만들지 않고 멀티 스레드 환경으로 어떤 작업을 처리해보았다. 스레드 생성과 스레드 관리를 개발자 대신 다 해주니 편리한 것 같다. 계속해서 알아보자!
Runnable의 불편함
앞서 Runnable 인터페이스는 다음과 같은 불편함이 있다고 설명했다.
public interface Runnable {
void run();
}
반환값이 없다:run()메서드는 반환값을 가지지 않는다. 따라서 실행 결과를 얻기 위해서는 별도의 메커니즘을 사용해야 한다. 쉽게 이야기해서 스레드의 실행 결과를 직접 받을 수 없다. 앞에서 공부한SumTask의 예를 생각해보자. 스레드가 실행한 결과를 멤버 변수에 넣어두고,join()등을 사용해서 스레드가 종료되길 기다린 다음에 멤버 변수에 보관한 값을 받아야 한다.
예외 처리:run()메서드는 체크 예외(Checked Exception)를 던질 수 없다. 체크 예외의 처리는 메서드 내부에서 처리해야 한다.
Executor프레임워크는 어떤 방식으로 이런 불편함을 해결하는지 알아보자.
Runnable 사용
이해를 돕기 위해 먼저 Runnable을 통해 별도의 스레드에서 무작위 값을 하나 구하는 간단한 코드를 작성해보자.
RunnableMain
package thread.executor;
import java.util.Random;
import static util.MyLogger.log;
import static util.ThreadUtils.sleep;
public class RunnableMain {
public static void main(String[] args) throws InterruptedException {
MyRunnable task = new MyRunnable();
Thread thread = new Thread(task, "Thread-1");
thread.start();
thread.join();
int result = task.value;
log("result value = " + result);
}
static class MyRunnable implements Runnable {
int value;
@Override
public void run() {
log("Runnable 시작");
sleep(2000);
value = new Random().nextInt(10);
log("create value = " + value);
log("Runnable 종료");
}
}
}
실행 결과
2024-07-28 14:31:38.547 [ Thread-1] Runnable 시작
2024-07-28 14:31:40.569 [ Thread-1] create value = 3
2024-07-28 14:31:40.570 [ Thread-1] Runnable 종료
2024-07-28 14:31:40.571 [ main] result value = 3
프로그램이 시작되면 Thread-1 이라는 별도의 스레드를 하나 만든다.
Thread-1이 수행하는 MyRunnable은 무작위 값을 하나 구한 다음에 value 필드에 보관한다.
클라이언트인 main 스레드가 이 별도의 스레드에서 만든 무작위 값을 얻어오려면 Thread-1스레드가 종료될때까지 기다려야 한다. 그래서 main 스레드는 join()을 호출해서 대기한다.
이후에 main 스레드에서 MyRunnable 인스턴스의 value 필드를 통해 최종 무작위 값을 획득한다.
별도의 스레드에서 만든 무작위 값 하나를 받아오는 과정이 이렇게 복잡하다. 작업 스레드(Thread-1)는 값을 어딘가에 보관해야 하고, 요청 스레드(main)는 작업 스레드의 작업이 끝날 때까지 join()을 호출해서 대기한 다음에, 어딘가에 보관된 값을 찾아서 꺼내야 한다.
작업 스레드는 간단히 값을 return을 통해 반환하고, 요청 스레드는 그 반환 값을 바로 받을 수 있다면 코드가 훨씬 더 간결해질 것이다. 이런 문제를 해결하기 위해 Executor 프레임워크는 Callable과 Future라는 인터페이스를 도입했다.
Future - 시작
Runnable 인터페이스는 다음과 같다.
public interface Runnable {
void run();
}
Runnable의 run()은 반환 타입이 void이다. 따라서 값을 반환할 수 없다.
예외가 선언되어 있지 않다. 따라서 해당 인터페이스를 구현하는 모든 메서드는 체크 예외를 던질 수 없다.
자식은 부모의 예외 범위를 넘어설 수 없다. 부모에 예외가 선언되어 있지 않으므로 예외를 던질 수 없다.
물론 런타임 예외는 제외다.
Callable 인터페이스는 다음과 같다.
package java.util.concurrent;
public interface Callable<V> {
V call() throws Exception;
}
java.util.concurrent에서 제공되는 기능이다.
Callable의 call()은 반환 타입이 제네릭 V이다. 따라서 값을 반환할 수 있다.
throws Exception 예외가 선언되어 있다. 따라서 해당 인터페이스를 구현하는 모든 메서드는 체크 예외인 Exception과 그 하위 예외를 모두 던질 수 있다.
Callable을 실제 어떻게 사용하는지 알아보자.
CallableMainV1
package thread.executor.future;
import java.util.Random;
import java.util.concurrent.*;
import static util.MyLogger.log;
import static util.ThreadUtils.sleep;
public class CallableMainV1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(1);
Future<Integer> future = es.submit(new MyCallable());
Integer result = future.get();
log("result value = " + result);
es.close();
}
static class MyCallable implements Callable<Integer> {
@Override
public Integer call() {
log("Callable 시작");
sleep(2000);
int value = new Random().nextInt(10);
log("Callable 완료");
return value;
}
}
}
java.util.concurrent.Executors 가 제공하는 newFixedThreadPool(size)를 사용하면 편리하게 ExecutorService를 생성할 수 있다.
기존 코드
ExecutorService es = new ThreadPoolExecutor(1,1,0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
편의 코드
ExecutorService es = Executors.newFixedThreadPool(1);
실행 결과
2024-07-28 14:57:01.055 [pool-1-thread-1] Callable 시작
2024-07-28 14:57:03.060 [pool-1-thread-1] Callable 완료
2024-07-28 14:57:03.069 [ main] result value = 7
먼저 MyCallable을 구현하는 부분을 보자.
숫자를 반환하므로 반환할 제네릭 타입을 <Integer>로 선언했다.
구현은 Runnable 코드와 비슷한데, 유일한 차이는 결과를 필드에 담아두는 게 아니라 결과를 반환한다는 점이다. 따라서 결과를 보관할 필드를 별도로 만들지 않아도 된다.
submit()
<T> Future<T> submit(Callable<T> task);
ExecutorService가 제공하는 submit()을 통해 Callable을 작업으로 전달할 수 있다. (기존에는 execute())
MyCallable 인스턴스가 블로킹 큐에 전달되고, 스레드 풀의 스레드 중 하나가 이 작업을 실행할 것이다. 이때 작업의 처리 결과는 직접 반환되는 것이 아니라 Future라는 특별한 인터페이스를 통해 반환된다.
Integer result = future.get();
future.get()을 호출하면 MyCallable의 call()이 반환한 결과를 받을 수 있다.
참고로, Future.get()은 InterruptedException, ExecutionException 체크 예외를 던진다. 여기서는 잡지말고 간단하게 밖으로 던지자. 예외에 대한 부분은 뒤에서 설명한다.
Executor 프레임워크의 강점
요청 스레드가 결과를 받아야 하는 상황이라면, Callable을 사용한 방식은 Runnable을 사용하는 방식보다 훨씬 편리하다. 코드만 보면 복잡한 멀티 스레드를 사용한다는 느낌보다는, 단일 스레드 방식으로 개발한다는 느낌이 들 것이다.
이 과정에서 내가 스레드를 생성하거나, join()으로 스레드를 제어하거나 한 코드는 전혀 없다. 심지어 Thread라는 코드도 없다. 단순하게 ExecutorService에 필요한 작업을 요청하고 결과를 받아서 쓰면 된다! 복잡한 멀티 스레드를 매우 편리하게 사용할 수 있는 것이 바로 Executor 프레임워크의 큰 강점이다.
하지만 편리한 것은 편리한 것이고, 기반 원리를 제대로 이해해야 문제없이 사용할 수 있다. 여기서 잘 생각해보면 한가지 애매한 점이 있다.
future.get()을 호출하는 요청 스레드(main)는 future.get()을 호출했을 때 2가지 상황으로 나뉘게 된다.
MyCallable 작업을 처리하는 스레드 풀의 스레드가 작업을 완료했다.
MyCallable 작업을 처리하는 스레드 풀의 스레드가 아직 작업을 완료하지 못했다.
future.get()을 호출했을 때 스레드 풀의 스레드가 작업을 완료했다면, 반환 받을 결과가 있을 것이다. 그런데 아직 작업을 처리 중이라면 어떻게 될까?
이런 의문도 들 것이다. 왜 결과를 바로 반환하지 않고 불편하게 Future라는 객체를 대신 반환할까? 이 부분을 제대로 이해해야 한다.
Future 분석
Future를 번역하면 미래라는 뜻이고, 여기서는 미래의 결과를 받을 수 있는 객체라는 뜻이다. 그렇다면 누구의 미래를 말하는 것일까?
Future는 내부에 작업의 완료 여부와 작업의 결과값을 가진다. 작업이 완료되지 않았기 때문에 아직은 결과 값이 없다.
로그를 보면 Future의 구현체는 FutureTask이다.
Future의 상태는 Not completed이고, 연관된 작업은 전달한 taskA(MyCallable 인스턴스)이다.
여기서 중요한 핵심이 있는데 작업을 전달할 때 생성된 Future는 즉시 반환된다는 점이다.
다음 로그를 보자.
2024-07-28 17:45:15.178 [ main] future 즉시 반환, future = java.util.concurrent.FutureTask@edb64911[Not completed, task = thread.executor.future.CallableMainV2$MyCallable@5cfe400b]
2024-07-28 17:45:15.178 [ main] future.get() [블로킹] 메서드 호출 시작 -> main 스레드 WAITING
생성한 Future를 즉시 반환하기 때문에 요청 스레드는 대기하지 않고, 자유롭게 본인의 다음 코드를 호출할 수 있다. 이것은 마치 Thread.start()를 호출한 것과 비슷하다. Thread.start()를 호출하면 스레드의 작업 코드가 별도의 스레드에서 실행된다. 요청 스레드는 대기하지 않고, 즉시 다음 코드를 호출할 수 있다.
2024-07-28 17:45:15.177 [pool-1-thread-1] Callable 시작
큐에 들어있는 Future[taskA]를 꺼내서 스레드 풀의 스레드1이 작업을 시작한다.
참고로 Future의 구현체인 FutureTask는 Runnable 인터페이스도 함께 구현하고 있다.
스레드1은 FutureTask의 run() 메서드를 수행한다.
그리고 run() 메서드가 taskA의 call() 메서드를 호출하고 그 결과를 받아서 처리한다. FutureTask.run() → MyCallable.call()
2024-07-28 17:45:15.178 [ main] future.get() [블로킹] 메서드 호출 시작 -> main 스레드 WAITING
스레드1
스레드1은 taskA의 작업을 아직 처리중이다. 아직 완료하지는 않았다.
요청 스레드
요청 스레드는 Future 인스턴스의 참조를 가지고 있다.
그리고 언제든지 본인이 필요할 때 Future.get()을 호출해서 taskA 작업의 미래 결과를 받을 수 있다.
요청 스레드는 작업의 결과가 필요해서 future.get()을 호출한다.
Future에는 완료 상태가 있다. taskA의 작업이 완료되면 Future의 상태도 완료로 변경된다.
그런데 여기서 taskA의 작업이 아직 완료되지 않았다. 따라서 Future도 완료 상태가 아니다.
요청 스레드가 future.get()을 호출하면 Future가 완료 상태가 될 때 까지 대기한다. 이때 요청 스레드의 상태는 RUNNABLE → WAITING 상태가 된다.
future.get()을 호출했을 때
Future가 완료 상태: Future가 완료 상태면 Future에 결과도 포함되어 있다. 이 경우 요청 스레드는 대기하지 않고 값을 즉시 반환받을 수 있다.
Future가 완료 상태가 아님: taskA가 아직 수행되지 않았거나 또는 수행 중이라는 뜻이다. 이때는 어쩔 수 없이 요청 스레드가 결과를 받기 위해 대기해야 한다. 요청 스레드가 마치 락을 얻을 때처럼, 결과를 얻기 위해 대기한다. 이처럼 스레드가 어떤 결과를 얻기 위해 대기하는 것을 블로킹이라 한다.
참고: 블로킹 메서드
Thread.join(), Future.get()과 같은 메서드는 스레드가 작업을 바로 수행하지 않고, 다른 작업이 완료될 때까지 기다리게 하는 메서드이다. 이러한 메서드를 호출하면 호출한 스레드는 지정한 작업이 완료될 때까지 블록(대기)되어 다른 작업을 수행할 수 없다.
submit() 호출 시 Future는 즉시 반환된다. 덕분에 요청 스레드는 블로킹 되지 않고 필요한 작업을 수행할 수 있다.
Integer result = future.get();
작업의 결과가 필요하면 Future.get()을 호출하면 된다.
Future가 완료 상태: Future가 완료 상태면 Future에 결과도 포함되어 있다. 이 경우 요청 스레드는 대기하지 않고 값을 즉시 반환받을 수 있다.
Future가 완료 상태가 아님: 작업이 아직 수행되지 않았거나 또는 수행 중이라는 뜻이다. 이때는 어쩔 수 없이 요청 스레드가 결과를 받기 위해 블로킹 상태로 대기해야 한다.
Future가 필요한 이유?
이제 Future, Callable이 어떻게 동작하고 어떻게 결과를 주고 받는지 이해하게 됐다. 그런데 생각해보면 한 가지 의문이 들 수 있다. 다음 코드를 보자.
Future를 반환하는 코드
Future<Integer> future = es.submit(new MyCallable()); // 여기는 블로킹 아님
future.get(); // 여기서 블로킹
ExecutorService를 설계할 때 지금처럼 복잡하게 Future를 반환하는 게 아니라 다음과 같이 결과를 직접 받도록 설계하는게 더 단순하고 좋지 않았을까?
결과를 직접 반환 하는 코드 (가정)
Integer result = es.submit(new MyCallable()); // 여기서 블로킹
물론 이렇게 설계하면 submit()을 호출할 때, 작업의 결과가 언제 나올지 알 수 없다. 따라서 작업의 결과를 받을 때까지 요청 스레드는 대기해야 한다. 그런데 이것은 Future를 사용할 때도 마찬가지다. Future만 즉시 반환 받을 뿐, 작업의 결과를 얻으려면 결국 Future.get()을 호출해야 한다. 그리고 이 시점에는 작업의 결과를 받을 때 까지 대기해야 한다.
다음 활용 예제를 보면 Future라는 개념이 왜 필요한지 이해가 될 것이다.
Future활용
이번에는 숫자를 나누어 더하는 기능을 멀티스레드로 수행해보자.
1 - 100까지 더하는 경우를 스레드를 사용해서 1 - 50, 51 - 100으로 나누어 처리해보자.
예전에 했던 Runnable을 이용했던 코드는 다음과 같다.
SumTaskMainV1
package thread.executor.future;
import thread.control.join.JoinMainV3;
import util.ThreadUtils;
import static util.MyLogger.log;
public class SumTaskMainV1 {
public static void main(String[] args) throws InterruptedException {
log("start");
SumTask task1 = new SumTask(1, 50);
SumTask task2 = new SumTask(51, 100);
Thread thread1 = new Thread(task1, "thread-1");
Thread thread2 = new Thread(task2, "thread-2");
thread1.start();
thread2.start();
log("join(500) - main 스레드가 thread1, thread2 종료까지 대기");
thread1.join(2500);
thread2.join(2500);
log("main 스레드 대기 완료");
log("task1.result = " + task1.result);
log("task2.result = " + task2.result);
int sumAll = task1.result + task2.result;
log("task1 + task2 = " + sumAll);
log("end");
}
static class SumTask implements Runnable {
int startValue;
int endValue;
int result;
public SumTask(int startValue, int endValue) {
this.startValue = startValue;
this.endValue = endValue;
}
@Override
public void run() {
log("작업 시작");
ThreadUtils.sleep(2000);
int sum = 0;
for (int i = startValue; i <= endValue; i++) {
sum += i;
}
result = sum;
log("작업 완료 result = " + result);
}
}
}
실행 결과
2024-07-28 20:53:50.797 [ main] start
2024-07-28 20:53:50.801 [ main] join(500) - main 스레드가 thread1, thread2 종료까지 대기
2024-07-28 20:53:50.801 [ thread-1] 작업 시작
2024-07-28 20:53:50.801 [ thread-2] 작업 시작
2024-07-28 20:53:52.819 [ thread-2] 작업 완료 result = 3775
2024-07-28 20:53:52.819 [ thread-1] 작업 완료 result = 1275
2024-07-28 20:53:52.819 [ main] main 스레드 대기 완료
2024-07-28 20:53:52.820 [ main] task1.result = 1275
2024-07-28 20:53:52.820 [ main] task2.result = 3775
2024-07-28 20:53:52.820 [ main] task1 + task2 = 5050
2024-07-28 20:53:52.821 [ main] end
우선, Runnable을 사용하는 코드는 Thread를 만들고, SumTask를 만들고, start()를 하고, join()을 하고 할 게 굉장히 많다. 그리고 결과는 또 SumTask라는 Runnable 구현체의 필드에 접근해서 가져와야 한다. 그리고 또 하나는 run() 메서드는 체크 예외를 던질 수 없어서 ThreadUtils.sleep()을 따로 만들었었다.
근데 Callable과 ExecutorService로 처리해보자.
SumTaskMainV2
package thread.executor.future;
import java.util.concurrent.*;
import static util.MyLogger.log;
public class SumTaskMainV2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
SumTask task1 = new SumTask(1, 50);
SumTask task2 = new SumTask(51, 100);
ExecutorService es = Executors.newFixedThreadPool(2);
Future<Integer> future1 = es.submit(task1);
Future<Integer> future2 = es.submit(task2);
Integer sum1 = future1.get();
Integer sum2 = future2.get();
log("task1.result = " + sum1);
log("task2.result = " + sum2);
int sumAll = sum1 + sum2;
log("task1 + task2 = " + sumAll);
log("End");
es.close();
}
static class SumTask implements Callable<Integer> {
int startValue;
int endValue;
public SumTask(int startValue, int endValue) {
this.startValue = startValue;
this.endValue = endValue;
}
@Override
public Integer call() throws Exception {
log("작업 시작");
Thread.sleep(2000);
int sum = 0;
for (int i = startValue; i <= endValue; i++) {
sum += i;
}
log("작업 완료 result = " + sum);
return sum;
}
}
}
실행 결과
2024-07-28 20:56:57.658 [pool-1-thread-1] 작업 시작
2024-07-28 20:56:57.658 [pool-1-thread-2] 작업 시작
2024-07-28 20:56:59.670 [pool-1-thread-1] 작업 완료 result = 1275
2024-07-28 20:56:59.670 [pool-1-thread-2] 작업 완료 result = 3775
2024-07-28 20:56:59.671 [ main] task1.result = 1275
2024-07-28 20:56:59.671 [ main] task2.result = 3775
2024-07-28 20:56:59.672 [ main] task1 + task2 = 5050
2024-07-28 20:56:59.672 [ main] End
ExecutorService와 Callable을 사용한 덕분에, 이전 코드보다 훨씬 직관적이고 깔끔하게 코드를 작성할 수 있었다.
예를 들면, Thread를 직접 만드는 일도 없고, ExecutorService를 통해서 스레드 풀에 원하는 사이즈만큼 스레드를 생성해 놓으면 스레드 관리도 해주고 스레드 생성도 해주고, 만든 Callable Task를 submit()으로 넘겨주기만 하면 된다. 그리고 값을 얻고 싶으면 그 submit()을 호출하고 받는 반환 객체 Future의 get()만 호출해주면 된다. 다른것보다 엄청 직관적이다.
그리고 또 하나의 디테일인 Callable은 call() 메서드가 Exception을 던지기 때문에 재정의 하는 자식들은 Exception과 그 하위 예외를 모두 던질 수 있다. 그래서 지저분한 try - catch도 없다.
그럼 Callable과 ExecutorService를 사용하면, 사용하기 편하고 직관적인 코드를 작성할 수 있다는 것을 알았다. 그래서 Future가 왜 필요한건데?에 대한 질문을 이제 답해보자.
Future가 필요한 이유
이제 Future가 필요한 이유를 이번 코드를 통해 알아보자.
Future를 반환하는 코드
Future<Integer> future1 = es.submit(task1); // 여기는 블로킹 아님
Future<Integer> future2 = es.submit(task2); // 여기는 블로킹 아님
Integer sum1 = future1.get(); // 여기서 블로킹
Integer sum2 = future2.get(); // 여기서 블로킹
Future가 없는 세상
Integer sum1 = es.submit(task1); // 여기서 블로킹
Integer sum2 = es.submit(task2); // 여기서 블로킹
먼저 ExecutorService가 Future 없이 결과를 직접 반환한다고 가정해보자.
요청 스레드는 task1을 ExecutorService에 요청하고 결과를 기다린다.
작업 스레드가 작업을 수행하는데 2초가 걸린다.
요청 스레드는 결과를 받을 때 까지 2초간 대기한다.
요청 스레드는 2초 후에 결과를 받고 다음 라인을 수행한다.
요청 스레드는 task2을 ExecutorService에 요청하고 결과를 기다린다.
작업 스레드가 작업을 수행하는데 2초가 걸린다.
요청 스레드는 결과를 받을 때 까지 2초간 대기한다.
결과를 받고 요청 스레드가 다음 라인을 수행한다.
Future를 사용하지 않는 경우, 결과적으로 task1의 결과를 기다린 다음에 task2를 요청한다 따라서 총 4초의 시간이 걸렸다. 이것은 마치 단일 스레드가 작업을 한 것과 비슷한 결과이다.
이번에는 Future를 반환한다고 가정해보자.
요청 스레드는 task1을 ExecutorService에 요청한다.
요청 스레드는 즉시 Future를 반환받는다.
작업 스레드1은 task1을 수행한다.
요청 스레드는 task2를 ExecutorService에 요청한다.
요청 스레드는 즉시 Future를 반환 받는다.
작업 스레드2는 task2를 수행한다.
요청 스레드는 task1, task2를 동시에 수행할 수 있다. 따라서 두 작업은 동시에 수행된다.
이후에 요청 스레드는 future1.get()을 호출하며 대기한다.
작업 스레드1이 작업을 진행하는 약 2초간 대기하고 결과를 받는다.
이후에 요청 스레드는 future2.get()을 호출하며 즉시 결과를 받는다.
작업 스레드2는 이미 2초간 작업을 완료했다. 따라서 future2.get()은 거의 즉시 결과를 반환한다.
즉 Future를 사용하면 이 경우, 총 대기 시간을 절반으로 줄일 수가 있게 된다. 왜냐하면 하나를 호출하고 바로 블로킹된 상태로 기다리는게 아니라 호출은 즉시 Future를 반환받기 때문에 요청 스레드는 다음 코드를 아무런 막힘 없이 수행할 수 있기 때문이다.
Future를 잘못 사용하는 예
앞서 설명한 문제 상황과 같은 원리로 Future를 호출하자 마자 바로 get()을 호출해도 문제가 될 수 있다.
Future를 잘못 활용한 예1과 똑같은 코드이다. 대신에 submit()을 호출하고 그 결과를 변수에 담지 않고 바로 연결해서 get()을 호출한다.
총 4초의 시간이 걸린다.
정리를 하자면,
Future라는 개념이 없다면 결과를 받을 때 까지 요청 스레드는 아무일도 못하고 대기해야 한다. 따라서 다른 작업을 동시에 수행할 수도 없다.
Future라는 개념 덕분에 요청 스레드는 대기하지 않고, 다른 작업을 수행할 수 있다. 예를 들어서 다른 작업을 더 요청할 수 있다. 그리고 모든 작업이 끝난 다음에, 본인이 필요할 때 Future.get()을 호출해서 최종적으로 결과를 받을 수 있다.
Future를 사용하는 경우 결과적으로 task1, task2를 동시에 요청할 수 있다. 두 작업을 바로 요청했기 때문에 작업을 동시에 제대로 수행할 수 있다.
Future는 요청 스레드를 블로킹(대기) 상태로 만들지 않고, 필요한 요청을 모두 수행할 수 있게 해준다. 필요한 모든 요청을 한 다음에 Future.get()을 통해 블로킹 상태로 대기하며 결과를 받으면 된다. (물론 그 전에 결과를 다 뽑아낸 상태면 그마저도 기다리지 않아도 된다)
이런 이유로 ExecutorService는 결과를 직접 반환하지 않고, Future를 반환한다.
Future 주요 메서드
Future는 작업의 미래 계산의 결과를 나타내며, 계산이 완료되었는지 확인하고, 완료될 때까지 기다릴 수 있는 기능을 제공한다.
Future
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
enum State {
RUNNING,
SUCCESS,
FAILED,
CANCELLED
}
default State state() {}
}
주요 메서드
boolean cancel(boolean mayInterruptIfRunning)
기능: 아직 완료되지 않은 작업을 취소한다.
매개변수: mayInterruptIfRunning
cancel(true): Future를 취소 상태로 변경한다. 이때 작업이 실행중이라면 Thread.Interrupt()를 호출해서 작업을 중단한다.
cancel(false): Future를 취소 상태로 변경한다. 단, 이미 실행 중인 작업을 중단하지는 않는다.
반환값: 작업이 성공적으로 취소된 경우 true, 이미 완료되었거나 취소를 할 수 없는 경우 false
설명: 작업이 실행중이 아니거나 아직 시작되지 않았으면 취소하고, 실행 중인 작업의 경우 mayInterruptIfRunning이 true이면 중단을 시도한다.
참고: cancel(false)든, cancel(true)든 취소 상태의 Future에 Future.get()을 호출하면 CancellationException 런타임 예외가 발생한다.
boolean isCancelled()
기능: 작업이 취소되었는지 여부를 확인한다.
반환값: 작업이 취소된 경우 true, 그렇지 않은 경우 false
설명: 이 메서드는 작업이 cancel()메서드에 의해 취소된 경우에 true를 반환한다.
boolean isDone()
기능: 작업이 완료되었는지 여부를 확인한다.
반환값: 작업이 완료된 경우 true, 그렇지 않은 경우 false
설명: 작업이 정상적으로 완료되었거나, 취소되었거나, 예외가 발생하여 종료된 경우에 true를 반환한다.
State state()
기능: Future의 상태를 반환한다. 자바 19부터 지원한다.
RUNNING: 작업 실행 중
SUCCESS: 성공 완료
FAILED: 실패 완료
CANCELED: 취소 완료
V get()
기능: 작업이 완료될 때까지 대기하고, 완료되면 결과를 반환한다.
반환값: 작업의 결과
예외
InterruptedException: 대기 중에 현재 스레드가 인터럽트된 경우 발생
ExecutionException: 작업 계산 중에 예외가 발생한 경우 발생
설명: 작업이 완료될 때 까지 get()을 호출한 현재 스레드를 대기(블로킹)한다. 작업이 완료되면 결과를 반환한다.
V get(long timeout, TimeUnit unit)
기능: get()과 같은데, 시간 초과되면 예외를 발생시킨다.
매개변수:
timeout: 대기할 최대 시간
unit: timeout 매개변수의 시간 단위 지정
반환값: 작업의 결과
예외
InterruptedException: 대기 중에 현재 스레드가 인터럽트된 경우 발생
ExecutionException: 계산 중에 예외가 발생한 경우 발생
TimeoutException: 주어진 시간 내에 작업이 완료되지 않은 경우 발생
설명: 지정된 시간동안 결과를 기다린다. 시간이 초과되면 TimeoutException을 발생시킨다.
매개변수 mayInterruptIfRunning을 변경하면서 어떻게 작동하는지 차이를 확인해보자.
cancel(true): Future를 취소 상태로 변경한다. 이때 작업이 실행중이라면 Thread.interrupt()를 호출해서 작업을 중단한다.
cancel(false): Future를 취소 상태로 변경한다. 단, 이미 작업이 실행중이라면 작업을 중단하지는 않는다.
실행 결과 (mayInterruptIfRunning = true)
2024-07-30 08:06:19.043 [ main] Future.state: RUNNING
2024-07-30 08:06:19.043 [pool-1-thread-1] 작업 중: 0
2024-07-30 08:06:20.047 [pool-1-thread-1] 작업 중: 1
2024-07-30 08:06:21.048 [pool-1-thread-1] 작업 중: 2
2024-07-30 08:06:22.049 [pool-1-thread-1] 작업 중: 3
2024-07-30 08:06:22.053 [ main] future.cancel(true) 호출
2024-07-30 08:06:22.058 [pool-1-thread-1] 인터럽트 발생
2024-07-30 08:06:22.068 [ main] cancel(true) result = true
2024-07-30 08:06:22.068 [ main] Future는 이미 취소되었습니다.
cancel(true)를 호출했다.
mayInterruptIfRunning = true 를 사용하면 실행중인 작업에 인터럽트가 발생해서 실행중인 작업을 중지 시도한다.
이후 Future.get()을 호출하면, CancellationException 런타임 예외가 발생한다.
실행 결과 (mayInterruptIfRunning = false)
2024-07-30 08:08:29.032 [ main] Future.state: RUNNING
2024-07-30 08:08:29.032 [pool-1-thread-1] 작업 중: 0
2024-07-30 08:08:30.036 [pool-1-thread-1] 작업 중: 1
2024-07-30 08:08:31.038 [pool-1-thread-1] 작업 중: 2
2024-07-30 08:08:32.040 [pool-1-thread-1] 작업 중: 3
2024-07-30 08:08:32.042 [ main] future.cancel(false) 호출
2024-07-30 08:08:32.054 [ main] cancel(false) result = true
2024-07-30 08:08:32.054 [ main] Future는 이미 취소되었습니다.
2024-07-30 08:08:33.042 [pool-1-thread-1] 작업 중: 4
2024-07-30 08:08:34.044 [pool-1-thread-1] 작업 중: 5
2024-07-30 08:08:35.046 [pool-1-thread-1] 작업 중: 6
2024-07-30 08:08:36.048 [pool-1-thread-1] 작업 중: 7
2024-07-30 08:08:37.050 [pool-1-thread-1] 작업 중: 8
2024-07-30 08:08:38.051 [pool-1-thread-1] 작업 중: 9
cancel(true)를 호출했다.
mayInterruptIfRunning = false 를 사용하면 실행중인 작업은 그냥 실행을 계속하도록 둔다.
실행중인 작업은 그냥 두더라도 cancel()을 호출했기 때문에 Future의 상태는 CANCEL이 된다.
이후 Future.get()을 호출하면 CancellationException 런타임 예외가 발생한다.
Future 예외 터트려보기
Future.get()을 호출하면 작업의 결과값 뿐만 아니라, 작업 중에 발생한 예외도 받을 수 있다.
FutureExceptionMain
package thread.executor.future;
import java.util.concurrent.*;
import static util.MyLogger.log;
import static util.ThreadUtils.sleep;
public class FutureExceptionMain {
public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(1);
log("작업 전달");
Future<Integer> future = es.submit(new ExCallable());
sleep(1000);
try {
log("future.get() 호출 시도, future.state(): " + future.state());
Integer result = future.get();
log("result value = " + result);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
log("e = " + e);
Throwable cause = e.getCause();
log("cause = " + cause);
}
es.close();
}
static class ExCallable implements Callable<Integer> {
@Override
public Integer call() throws Exception {
log("Callable 실행, 예외 발생");
throw new IllegalStateException("Ex!");
}
}
}
실행 결과
2024-07-30 08:18:29.619 [ main] 작업 전달
2024-07-30 08:18:29.624 [pool-1-thread-1] Callable 실행, 예외 발생
2024-07-30 08:18:30.634 [ main] future.get() 호출 시도, future.state(): FAILED
2024-07-30 08:18:30.635 [ main] e = java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Ex!
2024-07-30 08:18:30.636 [ main] cause = java.lang.IllegalStateException: Ex!
요청 스레드: es.submit(new ExCallable())을 호출해서 작업을 전달한다.
작업 스레드: ExCallable을 실행하는데, IllegalStateException 예외가 발생한다.
작업 스레드는 Future에 발생한 예외를 담아둔다. 참고로 예외도 객체이다. 잡아서 필드에 보관할 수 있다.
예외가 발생했으므로 Future의 상태는 FAILED가 된다.
요청 스레드: 결과를 얻기 위해 future.get()을 호출한다.
Future의 상태가 FAILED이면 ExecutionException 예외를 던진다.
이 예외는 내부에 앞서 Future에 저장해둔 IllegalStateException을 포함하고 있다.
e.getCause()를 호출하면 작업에서 발생한 원본 예외를 받을 수 있다.
참고로, 어떻게 IllegalStateException을 던졌는데 ExecutionException으로 받을 수 있는지 의문이 든다면 Future 내부 코드에는 작업 중 어떤 예외가 발생하면 ExecutionException으로 받은 예외를 감싸는 코드가 있다. 그래서 다음 코드와 같다.
catch (IllegalException e) {
throw new ExecutionException(e);
}
이렇게 받은 예외를 새로운 예외로 감싸서 Future가 작업 중 예외가 터지면 언제나 ExecutionException으로 던지는 것이다.
Future.get()은 작업의 결과 값을 받을 수도 있고 예외를 받을 수도 있다. 마치 싱글 스레드 상황에서 일반적인 메서드를 호출하는 것 같다. Executor 프레임워크가 얼마나 잘 설계되어 있는지 알 수 있는 부분이다.
ExecutorService - 작업 컬렉션 처리
ExecutorService는 여러 작업을 한번에 편리하게 처리하는 invokeAll(), invokeAny() 기능을 제공한다.
java.util 패키지에 있는 컬렉션 프레임워크는 원자적 연산을 제공할까? 예를 들어 하나의 ArrayList 인스턴스에 여러 스레드가 동시에 접근해도 괜찮을까? 참고로 여러 스레드가 동시에 접근해도 괜찮은 경우를 스레드 세이프(Thread Safe)하다고 한다. 그렇다면 ArrayList는 스레드 세이프 할까?
컬렉션에 데이터를 추가하는 add() 메서드를 생각해보면, 단순히 컬렉션에 데이터를 하나 추가하는 것 뿐이다. 따라서 이것은 마치 연산이 하나만 있는 원자적인 연산처럼 느껴진다. 원자적 연산은 쪼갤 수 없기 때문에 멀티스레드 상황에 문제가 되지 않는다. 물론 멀티스레드는 중간에 스레드의 실행 순서가 변경될 수 있으므로 [A, B] 또는 [B, A]로 데이터의 저장 순서는 변경될 수 있지만 결과적으로 데이터는 모두 안전하게 저장될 것 같다.
하지만, 컬렉션 프레임워크가 제공하는 대부분의 연산은 원자적인 연산이 아니다.
컬렉션을 아주 간단하게 직접 만들어보자.
SimpleList
package thread.collections.simple.list;
public interface SimpleList {
int size();
void add(Object o);
Object get(int index);
}
직접 만들 컬렉션의 인터페이스이다.
크기 조회, 데이터 추가, 데이터 조회의 3가지 메서드만 가진다.
BasicList
package thread.collections.simple.list;
import java.util.Arrays;
import static util.ThreadUtils.sleep;
public class BasicList implements SimpleList {
private static final int DEFAULT_CAPACITY = 5;
private Object[] elements;
private int size = 0;
public BasicList() {
elements = new Object[DEFAULT_CAPACITY];
}
@Override
public int size() {
return size;
}
@Override
public void add(Object o) {
elements[size] = o;
sleep(100); // 멀티 스레드 문제를 쉽게 확인하는 코드
size++;
}
@Override
public Object get(int index) {
return elements[index];
}
@Override
public String toString() {
return Arrays.toString(Arrays.copyOf(elements, size)) + ", size = " + size + ", capacity = " + elements.length;
}
}
가장 간단한 컬렉션의 구현이다. 내부에서는 배열을 사용해서 데이터를 보관한다.
ArrayList의 최소 구현 버전이라 생각하면 된다.
DEFAULT_CAPACITY: 최대 5개의 데이터를 저장할 수 있다.
size: 저장한 데이터의 크기를 나타낸다.
add(): 컬렉션에 데이터를 추가한다.
sleep(100): 잠시 대기한다. 이렇게 하면 멀티스레드 상황에 발생하는 문제를 확인하기 쉽다.
일단은 단일 스레드로 잘 동작하는지 확인해보자.
SimpleListMainV1
package thread.collections.simple;
import thread.collections.simple.list.BasicList;
import thread.collections.simple.list.SimpleList;
public class SimpleListMainV1 {
public static void main(String[] args) {
SimpleList basicList = new BasicList();
basicList.add("A");
basicList.add("B");
System.out.println("basicList = " + basicList);
}
}
실행 결과
basicList = [A, B], size = 2, capacity = 5
단일 스레드로 실행했기 때문에 전혀 문제 없이 잘 동작한다. 이제 멀티 스레드로 이 자료구조에 데이터를 추가해보자!
SimpleListMainV2
package thread.collections.simple;
import thread.collections.simple.list.BasicList;
import thread.collections.simple.list.SimpleList;
import static util.MyLogger.log;
public class SimpleListMainV2 {
public static void main(String[] args) throws InterruptedException {
test(new BasicList());
}
private static void test(SimpleList list) throws InterruptedException {
log(list.getClass().getSimpleName());
Runnable addA = new Runnable() {
@Override
public void run() {
list.add("A");
log("Thread-1: list.add(A)");
}
};
Runnable addB = new Runnable() {
@Override
public void run() {
list.add("B");
log("Thread-2: list.add(B)");
}
};
Thread thread1 = new Thread(addA, "Thread-1");
Thread thread2 = new Thread(addB, "Thread-2");
thread1.start();
thread2.start();
thread1.join();
thread2.join();
log(list);
}
}
실행 결과를 보면, size는 2인데, 데이터는 B 하나만 입력되어 있다. 어떻게 된 것일까?
참고로, 어떤 스레드가 먼저 실행됐냐에 따라 [A, null]이 결과가 될 수 있다.
@Override
public void add(Object o) {
elements[size] = o;
sleep(100); // 멀티 스레드 문제를 쉽게 확인하는 코드
size++;
}
스레드1, 스레드2가 element[size] = o; 이 코드를 동시에 수행한다. 여기서는 스레드1이 약간 빠르게 수행했다.
스레드1 수행: element[0] = A, element[0]의 값은 A가 된다.
스레드2 수행: element[0] = B, element[0]의 값은 A → B가 된다.
결과적으로 element[0]의 값은 B가 된다.
스레드1, 스레드2가 sleep()에서 잠시 대기한다. 여기서 sleep()을 사용한 이유는 동시성 문제를 쉽게 확인하기 위해서다. 이 코드를 제거하면 size++이 너무 빨리 호출되기 때문에 스레드1이 add()메서드를 완전히 수행하고 나서 스레드2가 add()메서드를 수행할 가능성이 높다. 당연한 이야기지만 sleep() 코드를 제거해도 멀티스레드 동시성 문제는 여전히 발생하고 있다. (확률을 더 높였을 뿐이다)
결론은 무엇이냐면 컬렉션 프레임워크 대부분은 스레드 세이프 하지 않다는 것이다.
우리가 일반적으로 자주 사용하는 ArrayList, LinkedList, HashSet, HashMap 등 수 많은 자료 구조들은 단순한 연산을 제공하는 것처럼 보인다. 예를 들어, 데이터를 추가하는 add()와 같은 연산은 마치 원자적 연산처럼 느껴진다. 하지만 그 내부에서는 수 많은 연산들이 함께 사용된다. 배열에 데이터를 추가하고, 사이즈를 변경하고, 배열을 새로 만들어서 배열의 크기도 늘리고, 노드를 만들어서 링크에 연결하는 등 수 많은 복잡한 연산이 함께 사용된다.
따라서, 일반적인 컬렉션들은 절대로! 스레드 세이프 하지 않다!
단일 스레드가 컬렉션에 접근하는 경우라면 아무런 문제가 되지 않지만, 멀티 스레드 상황에서 여러 스레드가 동시에 컬렉션에 접근하는 경우라면 java.util 패키지가 제공하는 일반적인 컬렉션들은 사용하면 안된다. (물론 일부 예외도 있다. 뒤에서 알아보자.)
동시성 컬렉션이 필요한 이유
컬렉션이 수많은 복잡한 연산으로 이루어져 있기 때문이다. 따라서 여러 스레드가 접근해야 한다면 synchronized, Lock등을 통해 안전한 임계 영역을 적절히 만들면 문제를 해결할 수 있다.
SyncList
package thread.collections.simple.list;
import java.util.Arrays;
import static util.ThreadUtils.sleep;
public class SyncList implements SimpleList {
private static final int DEFAULT_CAPACITY = 5;
private Object[] elements;
private int size = 0;
public SyncList() {
elements = new Object[DEFAULT_CAPACITY];
}
@Override
public synchronized int size() {
return size;
}
@Override
public synchronized void add(Object o) {
elements[size] = o;
sleep(100); // 멀티 스레드 문제를 쉽게 확인하는 코드
size++;
}
@Override
public synchronized Object get(int index) {
return elements[index];
}
@Override
public synchronized String toString() {
return Arrays.toString(Arrays.copyOf(elements, size)) + ", size = " + size + ", capacity = " + elements.length;
}
}
앞서 만든 BasicList를 복사해서 만든 SyncList에 synchronized 키워드만 추가했다.
아주 아주 잘 실행됐다. 이제 동시에 여러 스레드가 접근하더라도 걱정없이 사용할 수 있다. 근데! 문제가 있다.
BasicList 코드가 있는데, 이 코드를 거의 그대로 복사해서 synchronized 기능만 추가한 SyncList를 만들었다. 하지만 이렇게 되면 모든 컬렉션을 다 복사해서 동기화 용으로 새로 구현해야 한다. 이게 매우 비효율적이다.
프록시 도입
위에서 말한 문제를 다시 상기해보면, 고작 synchronized 키워드 하나를 추가하기 위해 같은 코드를 복사해서 새로운 클래스를 만들어내야 한다는 점이다. 그럼 다른 자료구조를 사용한다고 하면 그것 역시 또 새로운 클래스를 만들어야 한다. 즉, 단일 스레드용 클래스와 멀티 스레드용 클래스가 나뉘어진다는 점이다. 다음과 같이 말이다.
ArrayList → SyncArrayList
LinkedList → SyncLinkedList
원하는건 기존 코드를 그대로 사용하되 synchronized 기능만 살짝 추가하고 싶은 것이다. 이럴때 프록시를 사용하면 좋다.
프록시(Proxy)
대리자, 대체자라는 뜻으로 스프링에서도 굉장히 자주 등장하고 많이 사용된다. 요청을 하는 클라이언트와 요청을 받는 서버가 원래는 이런 형태였다면,
클라이언트 → 서버
다음과 같은 형태로 변형되는 것이다.
클라이언트 → 프록시 → 서버
중요한건 이렇게 변경이 되어도 클라이언트 코드는 바꿀게 하나도 없다는 것. 이게 바로 핵심이다.
SyncProxyList
package thread.collections.simple.list;
public class SyncProxyList implements SimpleList {
private SimpleList target;
public SyncProxyList(SimpleList target) {
this.target = target;
}
@Override
public synchronized int size() {
return target.size();
}
@Override
public synchronized void add(Object o) {
target.add(o);
}
@Override
public synchronized Object get(int index) {
return target.get(index);
}
@Override
public String toString() {
return target.toString() + " by " + this.getClass().getSimpleName();
}
}
프록시 역할을 하는 클래스이다.
SyncProxyList는 BasicList와 같은 SimpleList 인터페이스를 구현한다.
이 클래스는 생성자를 통해 SimpleList target을 주입받는다. 여기에 실제 호출되는 대상이 들어간다.
이 클래스는 빈 껍데기다. 이 클래스의 역할은 모든 메서드에 synchronized를 걸어주는 일 뿐이다. 그리고나서 target에 있는 같은 기능을 호출한다.
이 프록시 클래스는 synchronized만 걸고, 그 다음에 바로 실제 호출해야 하는 원본 대상(target)을 호출한다.
그리고 다음과 같이 호출하는 test() 메서드에 파라미터로 SyncProxyList를 던져주면 된다.
test() 메서드를 클라이언트라고 가정하면 test()메서드는 SimpleList라는 인터페이스에만 의존한다. 이것을 추상화에 의존한다고 표현한다.
덕분에 SimpleList 인터페이스의 구현체인 BasicList, SyncList, SyncProxyList 중에 어떤 것을 사용하든 클라이언트인 test()의 코드는 전혀 변경하지 않아도 된다.
클라이언트인 test() 입장에서 생각해보면 BasicList가 넘어올지, SyncProxyList가 넘어올지 알 수 없다. 단순히 SimpleList의 구현체 중 하나가 넘어와서 실행된다는 정도만 알 수 있다. 그래서 클라이언트인 test()는 매우 유연하다. SimpleList의 어떤 구현체든지 다 받아들일 수 있다.
런타임 의존 관계 - BasicList
먼저 BasicList를 사용하는 예를 보자.
그림과 같이 실제 런타임에 발생하는 인스턴스의 의존 관계를 런타임 의존 관계라 한다. 먼저 간단한 BasicList를 직접 사용하는 경우부터 알아보자.
test() 메서드에서 스레드를 만들고, 스레드에 있는 run()에서 list.add()를 호출한다.
그림은 간단하게 test()에서 호출하는 것으로 표현하겠다.
BasicList(x001) 인스턴스에 있는 add()가 호출된다.
런타임 의존 관계 - SyncProxyList
이번엔 BasicList가 아니라 SyncProxyList를 사용하는 예를 보자.
test(new SyncProxyList(new BasicList()));
먼저 BasicList(x001) 인스턴스가 만들어진다.
앞서 만든 BasicList(x001)의 참조를 SyncProxyList의 생성자에 전달하여 SyncProxyList(x002)가 만들어진다.
내부에는 원본 대상을 가르키는 target 변수를 포함하고 있다. 이 변수는 BasicList(x001)의 참조를 보관한다.
test()메서드는 SyncProxyList(x002) 인스턴스를 사용하게 된다.
SyncProxyList - add() 호출 과정
test()메서드에서 스레드를 만들고, 스레드에 있는 run()에서 list.add()를 호출한다.
SyncProxyList(x002)에 있는 add()가 호출된다.
그림은 간단하게 test()에서 호출하는 것으로 표현하겠다.
프록시인 SyncProxyList는 synchronized를 적용한다. 그리고 나서 target에 있는 add()를 호출한다.
원본 대상인 BasicList(x001)의 add()가 호출된다.
원본 대상의 호출이 끝나면 결과를 반환한다.
SyncProxyList에 있는 add()로 흐름이 돌아온다. 메서드를 반환하면서 synchronized를 해제한다.
test()로 흐름이 돌아온다.
프록시 정리
프록시인 SyncProxyList는 원본인 BasicList와 똑같은 SimpleList를 구현한다. 따라서 클라이언트인 test() 입장에서는 원본 구현체가 전달되든, 아니면 프록시 구현체가 전달되든 아무런 상관이 없다. 단지 수많은 SimpleList의 구현체 중 하나가 전달되었다고 생각할 뿐이다.
클라이언트 입장에서 보면 프록시는 원본과 똑같이 생겼고, 호출할 메서드도 똑같다. 단지 SimpleList의 구현체일 뿐이다.
프록시는 내부에 원본을 가지고 있다. 그래서 프록시가 필요한 일부의 일을 처리하고, 그 다음에 원본을 호출하는 구조를 만들 수 있다. 여기서 프록시는 synchronized를 통한 동기화를 적용한다.
프록시가 동기화를 적용하고 원본을 호출하기 때문에 원본 코드도 이미 동기화가 적용된 상태로 호출된다.
여기서 핵심은 원본 코드인 BasicList를 전혀 손대지 않고 프록시인 SyncProxyList를 통해 동기화 기능을 적용했다는 점이다. 또한 이후에 SimpleList를 구현한 BasicLinkedList 같은 연결 리스트를 만들더라도 서로 같은 인터페이스를 사용하기 때문에 SyncProxyList를 그대로 활용할 수 있다. 쉽게 이야기해서 SyncProxyList 프록시 하나로 SimpleList 인터페이스의 모든 구현체를 동기화 할 수 있다.
이런 프록시를 사용하는 걸 프록시 패턴이라고 하고 정말 자주 종종 사용되는 패턴이다. 특히 스프링의 AOP는 프록시의 끝판왕으로 생각하면 된다. 프록시 패턴의 주요 목적은 다음과 같다.
접근 제어: 실제 객체에 대한 접근을 제한하거나 통제할 수 있다.
성능 향상: 실제 객체의 생성을 지연시키거나 캐싱하여 성능을 최적화할 수 있다.
부가 기능 제공: 실제 객체에 추가적인 기능(로깅, 인증, 동기화 등)을 투명하게 제공할 수 있다.
그래서 자바는 어떤 동시성 컬렉션을 제공해왔을까? 알아보자!
자바 동시성 컬렉션 - synchronized
자바가 제공하는 java.util 패키지에 있는 컬렉션 프레임워크들은 대부분 스레드 안전하지 않다. 우리가 일반적으로 사용하는 ArrayList, LinkedList, HashSet, HashMap 등 수 많은 자료 구조들은 내부에서 수많은 연산들이 함께 사용된다. 그렇다면 처음부터 모든 자료 구조에 synchronized를 사용해서 동기화를 해두면 어떨까? synchronized, Lock, CAS등 모든 방식은 정도의 차이가 있지만 성능과 트레이드 오프가 있다. 결국 동기화를 사용하지 않는 것이 가장 빠르다.
그리고 컬렉션이 항상 멀티스레드에서 사용되는것도 아니다. 미리 동기화를 해둔다면 단일 스레드에서 사용할 때 동기화로 인해 성능이 저하된다. 따라서 동기화의 필요성을 정확히 판단하고 꼭 필요한 경우에만 동기화를 적용하는 것이 필요하다.
좋은 대안으로는 우리가 앞서 배운 것처럼 synchronized를 대신 적용해 주는 프록시를 만드는 방법이 있다. List, Set, Map 등 주요 인터페이스를 구현해서 synchronized를 적용할 수 있는 프록시를 만들면 된다. 이 방법을 사용하면 기존 코드를 유지하면서 필요한 경우에만 동기화를 적용할 수 있다.
자바는 컬렉션을 위한 프록시 기능을 제공한다.
SynchronizedListMain
package thread.collections.java;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class SynchronizedListMain {
public static void main(String[] args) {
List<String> list = Collections.synchronizedList(new ArrayList<>());
list.add("data1");
list.add("data2");
list.add("data3");
System.out.println(list.getClass());
System.out.println("list = " + list);
}
}
실행 결과
class java.util.Collections$SynchronizedRandomAccessList
list = [data1, data2, data3]
위에서 프록시를 직접 만들어 본 것처럼 이것도 역시 파리미터로 건네주는 자료 구조를 동기화 해주는 프록시 클래스이다.
예를 들어 이 클래스의 add() 메서드를 보면, synchronized 블록을 적용하고 그 다음에 원본 대상의 add()를 호출하는 것을 알 수 있다.
Collections는 다음과 같이 다양한 synchronized 동기화 메서드를 지원한다. 이 메서드를 사용하면 List, Collection, Map, Set 등 다양한 동기화 프록시를 만들 수 있다.
synchronizedList()
synchronizedCollection()
synchronizedMap()
synchronizedSet()
synchronizedNavigableMap()
synchronizedNavigableSet()
synchronizedSortedMap()
synchronizedSortedSet()
Collections가 제공하는 동기화 프록시 기능 덕분에 스레드 안전하지 않은 수많은 컬렉션들을 매우 편리하게 스레드 안전한 컬렉션으로 변경해서 사용할 수 있다.
synchronized 프록시 방식의 단점
하지만 synchronized 프록시를 사용하는 방식은 다음과 같은 단점이 있다.
첫째, 동기화 오버헤드가 발생한다. 비록 synchronized 키워드가 멀티 스레드 환경에서 안전한 접근을 보장하지만, 각 메서드 호출 시마다 동기화 비용이 추가된다. 이로 인해 성능 저하가 발생할 수 있다.
둘째, 전체 컬렉션에 대해 동기화가 이루어지기 때문에, 잠금 범위가 넓어질 수 있다. 이는 잠금 경합(lock contention)을 증가시키고, 병렬 처리의 효율성을 저하시키는 요인이 된다. 모든 메서드에 대해 동기화를 적용하다 보면, 특정 스레드가 컬렉션을 사용하고 있을 때 다른 스레드들이 대기해야 하는 상황이 빈번해질 수 있다.
셋째, 정교한 동기화가 불가능하다. synchronized 프록시를 적용하면 컬렉션 전체에 대한 동기화가 이루어지지만, 특정 부분이나 메서드에 대해 선택적으로 동기화를 적용하는 것은 어렵다. 이는 과도한 동기화로 이어질 수 있다.
쉽게 이야기해서 이 방식은 단순 무식하게 모든 메서드에 synchronized를 걸어버리는 것이다. 따라서 동기화에 대한 최적화가 이루어지지 않는다. 자바는 이런 단점을 보완하기 위해 java.util.concurrent 패키지에 동시성 컬렉션을 제공한다.
자바가 제공하는 동시성 컬렉션
위 자바가 제공하는 Collections.synchronizedXxx() 프록시 방식은 여러 스레드가 동시에 접근해도 동시성 문제가 발생하지 않도록 해준다. 그래서 안전하게 멀티 스레드 환경에서 사용할 수 있다. 그러나, 단점이 있는데 모든 메서드가 다 synchronized가 걸려있어서 필요없을 때 조차도 병렬 처리가 불가능하고 한마디로 무겁고 비용이 많이 든다.
그래서 java.util.concurrent 패키지에는 고성능 멀티 스레드 환경을 지원하는 다양한 동시성 컬렉션 클래스들을 제공한다. 예를 들어, ConcurrentHashMap, CopyOnWriteArrayList, BlockingQueue 등이 있다. 이 컬렉션들은 더 정교한 잠금 메커니즘을 사용하여 동시 접근을 효율적으로 처리하며, 필요한 경우 일부 메서드에 대해서만 동기화를 적용하는 등 유연한 동기화 전략을 제공한다.
여기에 다양한 성능 최적화 기법들이 적용되어 있는데, synchronized, Lock(ReentrantLock), CAS, 분할 잠금 기술(segment lock)등 다양한 방법을 섞어서 매우 정교한 동기화를 구현하면서 동시에 성능도 최적화했다. 각각의 최적화는 매우 어렵게 구현되어 있기 때문에, 자세히 구현을 이해하는 것보다는 멀티스레드 환경에 필요한 동시성 컬렉션들을 잘 선택해서 사용할 수 있으면 충분하다.
동시성 컬렉션의 종류
List
CopyOnWriteArrayList → ArrayList의 대안
Set
CopyOnWriteArraySet → HashSet의 대안
ConcurrentSkipListSet → TreeSet의 대안(정렬된 순서 유지, Comparator 사용 가능)
Map
ConcurrentHashMap → HashMap의 대안
ConcurrentSkipListMap → TreeMap의 대안(정렬된 순서 유지, Comparator 사용 가능)
Queue
ConcurrentLinkedQueue: 동시성 큐, 비 차단 큐이다.
Deque
ConcurrentLinkedDeque: 동시성 데크, 비 차단 큐이다.
참고로, LinkedHashSet, LinkedHashMap 처럼 입력 순서를 유지하는 동시에 멀티 스레드 환경에서 사용할 수 있는 Set, Map 구현체는 제공하지 않는다. 필요하다면 Collections.synchronizedXxx()를 사용해야 한다.
스레드를 차단하는 블로킹 큐도 알아보자.
BlockingQueue
ArrayBlockingQueue
크기가 고정된 블로킹 큐
공정(fair) 모드를 사용할 수 있다. 공정 모드를 사용하면 성능이 저하될 수 있다.
LinkedBlockingQueue
크기가 무한하거나 고정된 블로킹 큐
PriorityBlockingQueue
우선순위가 높은 요소를 먼저 처리하는 블로킹 큐
SynchronousQueue
데이터를 저장하지 않는 블로킹 큐로, 생산자가 데이터를 추가하면 소비자가 그 데이터를 받을 때까지 대기한다. 생산자 - 소비자 간 직접적인 핸드오프 메커니즘을 제공한다. 쉽게 이야기해서 중간에 큐 없이 생산자 - 소비자가 직접 거래한다.
DelayQueue
지연된 요소를 처리하는 블로킹 큐로, 각 요소는 지정된 지연 시간이 지난 후에야 소비될 수 있다. 일정 시간이 지난 후 작업을 처리해야 하는 스케쥴링 작업에 사용된다.
ListMain (List 예시)
package thread.collections.java;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
public class ListMain {
public static void main(String[] args) {
List<String> list = new CopyOnWriteArrayList<>();
list.add("a");
list.add("b");
System.out.println("list = " + list);
}
}
실행 결과
list = [a, b]
물론, 지금 실행 결과는 단일 스레드의 실행 결과이지만 이 CopyOnWriteArrayList는 ArrayList에 대한 동시성 접근을 잘 처리한 자료 구조이다. 지금처럼 이렇게 자바가 잘 만들어놓은 자료 구조를 사용하면 이렇게 list.add("a"), list.add("b") 와 같은 메서드만 있다면 굳이 synchronized 같은 동기화 기법을 사용할 필요가 없다. 내부적으로 동기화 기법이 잘 적용된 상태니까.
그러니까 쉽게 말해서, 자료 구조에 대한 작업을 위해서는 동기화 작업을 직접적으로 개발자가 따로 걸어줄 필요가 없다는 소리다. 당연히 그게 아니라 로직상에 원자적 연산이 아닌 코드가 있다면 그 부분에 대해서는 개발자가 직접 동기화 작업을 위한 코드를 작성해야 겠지만 위 코드처럼 이미 아주 효율적으로 동기화 작업이 되어 있는 자료 구조에 데이터를 추가하고 뭐 하고 하는 부분만 있으면 동기화 작업이 따로 필요 없다.
SetMain (Set 예시)
package thread.collections.java;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArraySet;
public class SetMain {
public static void main(String[] args) {
Set<Integer> copySet = new CopyOnWriteArraySet<>();
copySet.add(1);
copySet.add(2);
copySet.add(3);
System.out.println("copySet = " + copySet);
ConcurrentSkipListSet<Object> skipSet = new ConcurrentSkipListSet<>();
skipSet.add(2);
skipSet.add(1);
skipSet.add(3);
System.out.println("skipSet = " + skipSet);
}
}
실행 결과
copySet = [1, 2, 3]
skipSet = [1, 2, 3]
CopyOnWriteArraySet은 HashSet의 대안이다.
ConcurrentSkipListSet은 TreeSet의 대안이다. 데이터의 정렬 순서를 유지한다. (Comparator 사용 가능)
ConcurrentSkipListMap은 TreeMap의 대안이다. 데이터의 정렬 순서를 유지한다. (Comparator 사용 가능)
정리
자바가 제공하는 동시성 컬렉션은 멀티 스레드 상황에 최적의 성능을 낼 수 있도록 다양한 최적화 기법이 적용되어 있다. 따라서 Collections.synchronizedXxx를 사용하는 것보다 더 좋은 성능을 제공한다. 당연한 이야기지만 동시성은 결국 성능과 트레이드 오프가 있다. 따라서 단일 스레드가 컬렉션을 사용하는 경우에는 동시성 컬렉션이 아닌 일반 컬렉션을 사용해야 한다.
반대로 멀티 스레드 상황에서 일반 컬렉션을 사용하면 정말 해결하기 어려운 버그를 만날 수 있다. 세상에서 가장 해결하기 어려운 버그가 멀티스레드로 인해 발생한 버그이다. 이러한 이유로 멀티스레드 환경에서는 동시성 컬렉션을 적절히 활용해서 버그를 예방하고 성능을 최적화하는 것이 중요하다. 동시성 컬렉션을 사용하면 코드의 안정성과 효율성을 높일 수 있으며, 예상치 못한 동시성 문제도 방지할 수 있다.
컴퓨터 과학에서 사용하는 원자적 연산(atomic operation)의 의미는 해당 연산이 더 이상 나눌 수 없는 단위로 수행된다는 것을 의미한다. 즉, 원자적 연산은 중단되지 않고 다른 연산과 간섭 없이 완전히 실행되거나 전혀 실행되지 않는 성질을 가지고 있다. 쉽게 이야기해서 멀티 스레드 환경에서 다른 스레드의 간섭 없이 안전하게 처리되는 연산이라는 뜻이다.
예를 들어 다음과 같은 필드가 있을 때,
volatile int i = 0;
다음 연산은 둘로 쪼갤 수 없는 원자적 연산이다.
i = 1
왜냐하면 이 연산은 다음 단 하나의 순서로 실행되기 때문이다.
오른쪽에 있는 1의 값을 왼쪽의 i변수에 대입한다.
하지만 다음 연산은 원자적 연산이 아니다.
i = i + 1
왜냐하면 이 연산은 다음 순서로 나누어 실행되기 때문이다.
오른쪽에 있는 i의 값을 읽는다. (i의 값을 10이라고 가정)
읽은 값에 1을 더해서 11을 만든다.
더한 11을 왼쪽 i 변수에 대입한다.
원자적 연산은 멀티 스레드 상황에서 아무런 문제가 발생하지 않는다. 하지만 원자적 연산이 아닌 경우에는 synchronized 블록이나 Lock등을 사용해서 안전한 임계 영역을 만들어야 한다.
원자적 연산 시작
원자적이지 않은 연산을 멀티스레드 환경에서 실행하면 어떤 문제가 발생하는지 코드로 알아보자.
IncrementInteger는 숫자 값을 하나씩 증가시키는 기능을 제공한다. 예를 들어서 지금까지 접속한 사용자의 수 등을 계산할 때 사용할 수 있다.
IncrementInteger
package thread.cas.increment;
public interface IncrementInteger {
void increment();
int get();
}
IncrementInteger는 값을 증가하는 기능을 가진 숫자 기능을 제공하는 인터페이스이다.
increment(): 값을 하나 증가
get(): 값을 조회
BasicInteger
package thread.cas.increment;
public class BasicInteger implements IncrementInteger {
private int value;
@Override
public void increment() {
value++;
}
@Override
public int get() {
return value;
}
}
IncrementInteger 인터페이스의 가장 기본 구현이다.
increment()를 호출하면 value++를 통해서 값을 하나 증가한다.
value 값은 인스턴스의 필드이기 때문에, 여러 스레드가 공유할 수 있다. 이렇게 공유 가능한 자원에 ++와 같은 원자적이지 않은 연산을 사용하면 멀티스레드 상황에 문제가 발생할 수 있다.
IncrementThreadMain
package thread.cas.increment;
import java.util.ArrayList;
import java.util.List;
import static util.ThreadUtils.sleep;
public class IncrementThreadMain {
public static final int THREAD_COUNT = 1000;
public static void main(String[] args) throws InterruptedException {
test(new BasicInteger());
}
private static void test(IncrementInteger incrementInteger) throws InterruptedException {
Runnable runnable = new Runnable() {
@Override
public void run() {
sleep(10);
incrementInteger.increment();
}
};
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < THREAD_COUNT; i++) {
Thread thread = new Thread(runnable);
threads.add(thread);
thread.start();
}
for (Thread thread : threads) {
thread.join();
}
int result = incrementInteger.get();
System.out.println(incrementInteger.getClass().getSimpleName() + " result: " + result);
}
}
THREAD_COUNT 수 만큼 스레드를 생성하고 incrementInteger.increment()를 호출한다.
스레드를 1000개 생성했다면, increment() 메서드도 1000번 호출하기 때문에 결과는 1000이 되어야 한다.
참고로 스레드가 너무 빨리 실행되기 때문에 여러 스레드가 동시에 실행되는 상황을 확인하기 어렵다. 그래서 run() 메서드에 sleep(10)을 두어, 최대한 많은 스레드가 동시에 increment()를 호출하도록 한다.
실행결과
BasicInteger result: 950
실행결과를 보면 기대한 1000이 아니라 다른 숫자가 보인다. 이 문제는 앞서 설명한 것처럼 여러 스레드가 동시에 원자적이지 않은 value++을 호출했기 때문이다. 물론 멀티 스레드 환경에서는 공유 자원에 여러 스레드가 아무런 안전 장치 없이 자원에 쓰기 작업을 하면 문제가 발생하는것을 이제는 너무 잘 알지만 원자적 연산 관점으로 한번 생각을 해보자. 결국 공유 가능한 자원에 원자적이지 않은 연산을 하면 멀티스레드 환경에선 문제가 될 수 있다가 핵심이다!
참고로, value++은 value = value + 1;이다.
그럼 volatile, synchronized를 적용해보면 어떻게 나올지 결과를 보자!
VolatileInteger
package thread.cas.increment;
public class VolatileInteger implements IncrementInteger {
volatile private int value;
@Override
public void increment() {
value++;
}
@Override
public int get() {
return value;
}
}
SyncInteger
package thread.cas.increment;
public class SyncInteger implements IncrementInteger {
private int value;
@Override
public synchronized void increment() {
value++;
}
@Override
public synchronized int get() {
return value;
}
}
이 결과도 예측 가능한 결과였다. 이젠 volatile은 동시성 문제에 아무런 해결 방안이 되지 않는다는 것을 알고 있기 때문에. 그래서 synchronized 블록을 사용했을 때 드디어 원하는 결과가 나왔다. 근데 이럴때 그냥 원자적 연산을 가능하게 해주는 기능이 따로 있으면 편하지 않을까?
AtomicInteger
이거 얘기하려고 이만큼 빌드업했다..! 자바는 앞서 만든 SyncInteger와 같이 멀티 스레드 환경에서 안전하게 증가 연산을 수행할 수 있는 AtomicInteger라는 클래스를 제공한다. 이름 그대로 원자적인 Integer라는 뜻이다. 다음과 같이 MyAtomicInteger 클래스를 만들고, 자바가 제공하는 AtomicInteger를 사용해보자.
MyAtomicInteger
package thread.cas.increment;
import java.util.concurrent.atomic.AtomicInteger;
public class MyAtomicInteger implements IncrementInteger {
AtomicInteger atomicInteger = new AtomicInteger(0);
@Override
public void increment() {
atomicInteger.incrementAndGet();
}
@Override
public int get() {
return atomicInteger.get();
}
}
실행 결과를 보면 AtomicInteger를 사용하면 이 결과 역시 1000이 잘 찍힌것을 알 수 있다. 1000개의 스레드가 안전하게 증가 연산을 수행한 것이다. AtomicInteger는 멀티스레드 상황에 안전하고 또 다양한 값 증가, 감소 연산을 제공한다. 특정 값을 증가하거나 감소해야 하는데 여러 스레드가 해당 값을 공유해야 한다면, AtomicInteger를 사용하면 된다.
참고로, AtomicInteger, AtomicLong, AtomicBoolean 등 다양한 AtomicXXX 클래스가 존재한다.
원자적 연산 성능 테스트
AtomicInteger의 비밀을 하나씩 파헤쳐보자. 우선 한번 지금까지 만든 클래스들의 성능을 비교해보자.
IncrementPerformanceMain
package thread.cas.increment;
public class IncrementPerformanceMain {
public static final long COUNT = 100_000_000_0;
public static void main(String[] args) {
test(new BasicInteger());
test(new VolatileInteger());
test(new SyncInteger());
test(new MyAtomicInteger());
}
private static void test(IncrementInteger incrementInteger) {
long startMs = System.currentTimeMillis();
for (int i = 0; i < COUNT; i++) {
incrementInteger.increment();
}
long endMs = System.currentTimeMillis();
System.out.println(incrementInteger.getClass().getSimpleName() + ": ms= " + (endMs - startMs));
}
}
안전한 임계 영역도 없고, volatile도 사용하지 않기 때문에 멀티 스레드 상황에는 사용할 수 없다.
단일 스레드가 사용하는 경우엔 효율적이다.
VolatileInteger
volatile을 사용해서 CPU 캐시를 사용하지 않고 메인 메모리를 사용한다.
안전한 임계 영역이 없기 때문에 멀티 스레드 상황에는 사용할 수 없다.
단일 스레드가 사용하기엔 BasicInteger보다 느리다. 그리고 멀티 스레드 환경에서도 안전하지 않다.
SyncInteger
synchronized를 사용한 안전한 임계 영역이 있기 때문에 멀티 스레드 환경에서도 안전하게 사용할 수 있다.
MyAtomicInteger보다 성능이 느리다.
MyAtomicInteger
자바가 제공하는 AtomicInteger를 사용한다. 멀티 스레드 상황에 안전하게 사용할 수 있다.
성능도 synchronized, Lock(ReentrantLock)을 사용하는 경우보다 1.5 ~ 2배 정도 빠르다.
SyncInteger 처럼 락을 사용하는 경우보다, AtomicInteger가 더 빠른 이유는 무엇일까? i++ 연산은 원자적인 연산이 아니다. 따라서 분명히 synchronized, Lock(ReentrantLock)과 같은 락을 통해 안전히 임계 영역을 만들어야 할 것 같다. 놀랍게도 AtomicInteger가 제공하는 incrementAndGet() 메서드는 락을 사용하지 않고, 원자적 연산을 만들어 낸다.
CAS 연산
락 기반 방식의 문제점
SyncInteger와 같은 클래스는 데이터를 보호하기 위해 락을 사용한다. 여기서 말하는 락은 synchronized, Lock(ReentrantLock)등을 사용하는 것을 말한다. 락은 특정 자원을 보호하기 위해 스레드가 해당 자원에 대해 접근하는 것을 제한한다. 락이 걸려 있는 동안 다른 스레드들은 해당 자원에 접근할 수 없고, 락이 해제될 때까지 기다린다. 또한 락 기반 접근에서는 락을 획득하고 해제하는 데 시간이 소요된다.
예를 들어 락을 사용하는 연산이 있다고 하면 이런 흐름으로 동작한다.
락이 있는지 확인한다.
락을 획득하고 임계 영역에 들어간다.
작업을 수행한다.
락을 반납한다.
여기서 락을 획득하고 반납하는 과정이 계속 반복된다. 10000번의 연산이 있다면 10000번의 연산 모두 같은 과정을 반복한다. 이렇듯 락을 사용하는 방식은 직관적이지만 상대적으로 무겁다는 단점이 있다.
CAS
이런 문제를 해결하기 위해 락을 걸지 않고 원자적인 연산을 수행할 수 있는 방법이 있는데 이것을 CAS(Compare-And-Swap, Compare-And-Set) 연산이라고 한다. 이 방법은 락을 사용하지 않기 때문에 락 프리(lock-free) 기법이라고 한다. 참고로 CAS 연산은 락을 완전히 대체하는 게 아니라 작은 단위의 일부 영역에 적용할 수 있다. 기본은 락을 사용하고 특별한 경우에 CAS를 적용할 수 있다고 생각하면 된다.
다음 코드를 보자.
CasMainV1
package thread.cas;
import java.util.concurrent.atomic.AtomicInteger;
public class CasMainV1 {
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(0);
System.out.println("start value = " + atomicInteger.get());
boolean result1 = atomicInteger.compareAndSet(0, 1);
System.out.println("result1 = " + result1 + ", value = " + atomicInteger.get());
boolean result2 = atomicInteger.compareAndSet(0, 1);
System.out.println("result2 = " + result2 + ", value = " + atomicInteger.get());
}
}
new AtomicInteger(0); 내부에 있는 기본 숫자 값을 0으로 설정한다.
자바는 AtomicXxx의 compareAndSet() 메서드를 통해 CAS 연산을 지원한다.
실행 결과
start value = 0
result1 = true, value = 1
result2 = false, value = 1
compareAndSet(0, 1)
atomicInteger가 가지고 있는 값이 현재 0이면 이 값을 1로 변경하라는 매우 단순한 메서드이다.
만약 atomicInteger의 값이 현재 0이라면 atomicInteger의 값은 1로 변경된다. 이 경우 true를 반환한다.
만약 atomicInteger의 값이 현재 0이 아니라면 atomicInteger의 값은 변경되지 않는다. 이 경우 false를 반환한다.
여기서 가장 중요한 내용이 있는데, 이 메서드는 원자적으로 실행된다는 점이다. 그리고 이 메서드가 제공하는 기능이 바로 CAS(compareAndSet)연산이다.
여기서는 AtomicInteger 내부에 있는 value의 값이 0이라면, 1로 변경하고 싶다.
그런데 생각해보면 이 명령어는 2개로 나누어진 명령어이다. 따라서 원자적이지 않은 연산처럼 보인다.
1. 먼저 메인 메모리에 있는 값을 확인
2. 해당 값이 기대하는 값이라면 원하는 값으로 변경
CPU 하드웨어의 지원
원자적이지 않은 연산도 원자적 연산으로 할 수 있는 이유는 바로 CPU의 지원 때문이다!
CAS 연산은 이렇게 원자적이지 않은 두 개의 연산을 CPU 하드웨어 차원에서 특별하게 하나의 원자적인 연산으로 묶어서 제공하는 기능이다. 이것은 소프트웨어가 제공하는 기능이 아니라 하드웨어가 제공하는 기능이다. 대부분의 현대 CPU들은 CAS 연산을 위한 명령어를 제공한다.
CPU는 다음 두 과정을 묶어서 하나의 원자적인 명령으로 만들어버린다. 따라서 중간에 다른 스레드가 개입할 수 없다.
x001의 값을 확인한다.
읽은 값이 0이면 1로 변경한다.
CPU는 두 과정을 하나의 원자적인 명령으로 만들기 위해 1번과 2번 사이에 다른 스레드가 x001의 값을 변경하지 못하게 막는다. 참고로 1번과 2번 사이의 시간은 CPU 입장에서 보면 진짜 아주 잠깐 찰나의 순간이다. 생각을 해보자. 1초에 몇번의 연산을 CPU가 할 수 있는지? 수억번이다 수억번. 그러니까 저 1번과 2번 사이에 다른 스레드가 변경하지 못하게 막는 행위가 성능에 큰 영향을 끼치지도 않는다.
value의 값이 0 → 1이 되었다.
CAS 연산으로 값을 성공적으로 변경하고 나면 true를 반환한다.
여기까지 듣고보면 CAS 연산도 별 게 아니다. 그냥 CPU 차원에서 하드웨어적으로 원자적 연산을 가능하게 해준다는 것이다. CPU 입장에서 그 찰나의 순간은 사람이 느끼지도 못할 정도의 시간이니까 성능의 차이도 딱히 없다. 그럼 결국 1. 값을 확인하고, 2. 값을 변경하는 두 연산을 하나로 묶어 원자적으로 제공한다는 것을 이해했을 것이다. 그런데 이 기능이 어떻게 락을 일부 대체할 수 있다는 걸까?
어떤 값을 하나 증가하는 value++ 연산은 원자적 연산이 아니다. 이 연산은 다음과 같다.
value = value + 1;
이 연산은 다음 순서로 실행된다. value의 초기값은 0으로 가정하겠다.
오른쪽에 있는 value의 값을 읽는다. value의 값은 0이다.
읽은 0에 1을 더해서 1을 만든다.
더한 1을 왼쪽에 value 변수에 대입한다.
1번과 3번 연산 사이에 다른 스레드가 value의 값을 변경할 수 있기 때문에, 문제가 될 수 있다. 따라서 value++ 연산을 여러 스레드에서 사용한다면 락을 건 다음에 값을 증가해야 한다.
CAS 연산을 활용해서 락 없이 값을 증가하는 기능을 만들어보자. AtomicInteger가 제공하는 incrementAndGet()메서드가 어떻게 CAS 연산을 활용해서 락 없이 만들어졌는지 직접 구현해보자.
CasMainV2
package thread.cas;
import java.util.concurrent.atomic.AtomicInteger;
import static util.MyLogger.log;
public class CasMainV2 {
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(0);
System.out.println("start value = " + atomicInteger.get());
int resultValue1 = incrementAndGet(atomicInteger);
System.out.println("resultValue1 = " + resultValue1);
int resultValue2 = incrementAndGet(atomicInteger);
System.out.println("resultValue1 = " + resultValue2);
}
private static int incrementAndGet(AtomicInteger atomicInteger) {
int getValue;
boolean result;
do {
getValue = atomicInteger.get();
log("getValue : " + getValue);
result = atomicInteger.compareAndSet(getValue, getValue + 1);
log("result: " + result);
} while (!result);
return getValue + 1;
}
}
여기서 만든 incrementAndGet()은 atomicInteger 내부의 value 값을 하나 증가하는 메서드이다. 사실 atomicInteger도 이 메서드를 제공하지만 여기서는 이해를 위해 직접 구현해보자.
CAS 연산을 사용하면 여러 스레드가 같은 값을 사용하는 상황에서도 락을 걸지 않고, 안전하게 값을 증가할 수 있다. 여기서는 락을 걸지 않고 CAS 연산을 사용해서 값을 증가했다.
getValue = atomicInteger.get()을 사용해서 value값을 읽는다.
compareAndSet(getValue, getValue + 1)을 사용해서 방금 읽은 value값이 메모리의 value값과 같다면 value 값을 하나 증가한다. 여기서 CAS연산을 사용한다.
보면 결과가 false가 여러번 나오는 것을 확인할 수 있다. 그렇지만 결국 결과는 완벽하게 100을 찍었다. (스레드 개수를 100으로 했을 때 결과)
당연히 멀티 스레드 환경에서는 공유 자원에 대해 여러 스레드가 동시에 값을 읽고 쓰고 할 수 있기 때문에 위 실행 결과에서 Thread-69 입장에서 본인이 읽었을 때 시점과 0.1초 후에 다시 CAS 연산을 시도했을 때 예상한 원래 값이 달라질 수 있을 것이라는 추측이 가능하다. 그럼에도 원하는 결과를 정확히 찍을 수 있는 이유는 연산 시점에서는 적어도? 다른 스레드가 접근하지 못하도록 CPU 차원에서 막고 있기 때문이다. 이게 바로 CAS 연산이다.
정리를 하자면,
AtomicInteger가 제공하는 incrementAndGet() 코드가 우리가 직접 작성한 incrementAndGet() 메서드와 똑같이 CAS를 활용하도록 작성되어 있다. 그리고 조건에 맞을때까지 루프를 돌면서 확인하는 코드도 똑같다. CAS를 활용하면 락을 사용하지 않지만, 대신에 다른 스레드가 값을 먼저 증가해서 문제가 발생하는 경우 루프를 다시 돌아 재시도를 하는 방식으로 사용한다.
이 방식은 다음과 같이 동작한다.
현재 변수의 값을 읽어온다.
변수의 값을 1 증가시킬 때, 원래 값이 같은지 확인한다. (CAS 연산 활용)
동일하다면 증가된 값을 변수에 저장하고 종료한다.
동일하지 않다면 다른 스레드가 값을 중간에 변경한 것이므로, 다시 처음으로 돌아가 위 과정을 반복한다.
두 스레드가 동시에 실행되면서 문제가 발생하는 상황을 스레드가 충돌했다고 표현한다.
이 과정에서 충돌이 발생할 때 마다 반복해서 다시 시도하므로, 결과적으로 락 없이 데이터를 안전하게 변경할 수 있다. CAS 연산을 사용하는 방식은 충돌이 드물게 발생하는 환경에서는 락을 사용하지 않으므로 높은 성능을 발휘할 수 있다. 이는 락을 사용하는 방식과 비교했을 때, 스레드가 락을 획득하기 위해 대기하지 않기 때문에 대기 시간과 오버헤드가 줄어드는 장점이 있다.
그러나, 충돌이 빈번하게 발생하는 환경에서는 성능에 문제가 될 수 있다. 여러 스레드가 자주 동시에 동일한 변수의 값을 변경하려고 시도할 때, CAS는 자주 실패하고 재시도해야 하므로 성능 저하가 발생할 수 있다. 이런 상황에서는 반복문을 계속 돌기 때문에 CPU 자원을 많이 소모하게 된다.
CAS(Compare-And-Set)와 락(Lock)방식의 비교
락 방식
비관적(pessimistic) 접근법 (기본적으로 "여기선 충돌이 날거야!" 라고 생각하고 아예 입구를 틀어막는 관점)
데이터에 접근하기 전에 항상 락을 획득
다른 스레드의 접근을 막음
"다른 스레드가 방해할 것이다"라고 가정
CAS 방식
낙관적(optimistic) 접근법
락을 사용하지 않고 데이터에 바로 접근
충돌이 발생하면 그때 재시도
"대부분의 경우 충돌이 없을 것이다"라고 가정
정리하면, 충돌이 많이 없는 경우에 CAS 연산이 빠른 것을 확인할 수 있다.
그럼 충돌이 많이 발생하지 않는 연산은 어떤 것이 있을까? 언제 CAS 연산을 사용하면 좋을까?
사실 간단한 CPU 연산은 너무 빨리 처리되기 때문에 충돌이 자주 발생하지 않는다. 충돌이 발생하기도 전에 이미 연산을 완료하는 경우가 더 많다.
즉, 간단한 CPU 연산에는 락 보단 CAS 연산을 사용하면 더 효율적이고 복잡하고 어려운 과정이 들어가있는, 시간이 많이 소모되는 연산에 대해서는 락 방식으로 완전히 안전한 임계 영역을 만들어 사용하면 될 것 같다.
CAS 락
CAS는 단순 연산 뿐만 아니라, 락을 구현하는데도 사용할 수 있다. synchronized, Lock(ReentrantLock)없이 CAS를 활용해서 락을 구현해보자. 먼저 CAS의 필요성을 이해하기 위해 CAS없이 직접 락을 구현해보자.
SpinLockBad
package thread.cas.spinlock;
import static util.MyLogger.log;
import static util.ThreadUtils.sleep;
public class SpinLockBad {
private volatile boolean lock = false;
public void lock() {
log("락 획득 시도");
while (true) {
if (!lock) { // 1. 락 사용 여부 확인
sleep(100);
lock = true; // 2. 락의 값 변경
break;
} else {
log("락 획득 실패 - 스핀 대기");
}
}
log("락 획득 완료");
}
public void unlock() {
lock = false;
log("락 반납 완료");
}
}
구현 원리는 매우 단순하다.
스레드가 락을 획득하면 lock의 값이 true가 된다.
스레드가 락을 반납하면 lock의 값이 false가 된다.
스레드가 락을 획득하면 while문을 빠져나온다.
스레드가 락을 획득하지 못하면 락을 획득할 때까지 while문을 계속 반복 실행한다.
"어?왜 스핀 락인가요?" 락이 해제되기를 기다리면서 계속해서 반복문을 통해 확인하는 모습이 마치 계속 빙글빙글 돌고 있는것 같다고 해서 스핀 락이라고 불린다.
SpinLockMain
package thread.cas.spinlock;
import static util.MyLogger.log;
public class SpinLockMain {
public static void main(String[] args) {
SpinLockBad spinLock = new SpinLockBad();
Runnable task = new Runnable() {
@Override
public void run() {
spinLock.lock();
try {
log("비즈니스 로직 실행");
} finally {
spinLock.unlock();
}
}
};
Thread t1 = new Thread(task, "Thread-1");
Thread t2 = new Thread(task, "Thread-2");
t1.start();
t2.start();
}
}
실행 결과
2024-07-26 15:43:41.447 [ Thread-1] 락 획득 시도
2024-07-26 15:43:41.447 [ Thread-2] 락 획득 시도
2024-07-26 15:43:41.553 [ Thread-1] 락 획득 완료
2024-07-26 15:43:41.553 [ Thread-2] 락 획득 완료
2024-07-26 15:43:41.553 [ Thread-1] 비즈니스 로직 실행
2024-07-26 15:43:41.553 [ Thread-2] 비즈니스 로직 실행
2024-07-26 15:43:41.554 [ Thread-1] 락 반납 완료
2024-07-26 15:43:41.554 [ Thread-2] 락 반납 완료
실행 결과를 보면 기대와 다르게 Thread-0, Thread-1 모두 둘 다 동시에 락을 획득했다. 이제는 왜 그런지 안다. 동시성 문제를 해결하지 못한 코드이기 때문이다. volatile은 동시성 문제를 해결하는 방안이 아니다.
그럼 여기서 어떤 부분이 문제일까? 바로 다음 두 부분이 원자적이지 않다는 것이다.
락 사용 여부 확인
락의 값 변경
이 둘은 한번에 하나의 스레드만 실행해야 한다. 따라서 synchronized 또는 Lock을 사용해서 두 코드를 동기화해서 안전한 임계 영역을 만들어야 한다. 여기서! 다른 해결 방안도 있다. 바로 두 코드를 하나로 묶어서 원자적 연산처리를 하는 것이다.
CAS 연산을 사용하면 두 연산을 하나로 묶어서 하나의 원자적인 연산으로 처리할 수 있다.
락의 사용 여부를 확인하고, 그 값이 기대하는 값과 같다면 변경하는 것이다. CAS 연산에 딱 들어 맞는다!
SpinLock
package thread.cas.spinlock;
import java.util.concurrent.atomic.AtomicBoolean;
import static util.MyLogger.log;
import static util.ThreadUtils.sleep;
public class SpinLock {
private final AtomicBoolean lock = new AtomicBoolean(false);
public void lock() {
log("락 획득 시도");
while (!lock.compareAndSet(false, true)) {
log("락 획득 실패 - 스핀 대기");
}
log("락 획득 완료");
}
public void unlock() {
lock.set(false);
log("락 반납 완료");
}
}
CAS 연산을 지원하는 AtomicBoolean을 사용했다.
구현 원리는 단순하다.
스레드가 락을 획득하면 lock의 값이 true가 된다.
스레드가 락을 반납하면 lock의 값이 false가 된다.
스레드가 락을 획득하면 while문을 빠져나온다.
스레드가 락을 획득하지 못하면 락을 획득할 때까지 while문을 계속 반복 실행한다.
락을 획득할 때 매우 중요한 부분이 있다. 바로 다음 두 연산을 하나로 만들어야 한다는 것이다.
락 사용 여부 확인
락의 값 변경
여기에 딱 맞는 방법이 바로 다음 코드 한 줄이다.
lock.compareAndSet(false, true);
→ 현재 lock의 값이 false라면 true로 변경해라.
이것은 CAS 연산으로 수행된다.
이 SpinLock을 사용해서 다시 실행해보자!
package thread.cas.spinlock;
import static util.MyLogger.log;
public class SpinLockMain {
public static void main(String[] args) {
SpinLock spinLock = new SpinLock();
Runnable task = new Runnable() {
@Override
public void run() {
spinLock.lock();
try {
log("비즈니스 로직 실행");
} finally {
spinLock.unlock();
}
}
};
Thread t1 = new Thread(task, "Thread-1");
Thread t2 = new Thread(task, "Thread-2");
t1.start();
t2.start();
}
}
실행 결과
2024-07-26 15:50:56.510 [ Thread-1] 락 획득 시도
2024-07-26 15:50:56.510 [ Thread-2] 락 획득 시도
2024-07-26 15:50:56.516 [ Thread-2] 락 획득 실패 - 스핀 대기
2024-07-26 15:50:56.516 [ Thread-1] 락 획득 완료
2024-07-26 15:50:56.516 [ Thread-1] 비즈니스 로직 실행
2024-07-26 15:50:56.516 [ Thread-1] 락 반납 완료
2024-07-26 15:50:56.516 [ Thread-2] 락 획득 완료
2024-07-26 15:50:56.517 [ Thread-2] 비즈니스 로직 실행
2024-07-26 15:50:56.517 [ Thread-2] 락 반납 완료
실행 결과를 보면 락이 잘 적용된 것을 알 수 있다.
이렇게 CAS는 연산뿐 아니라 락을 구현해낼 수도 있다. 그렇지만 단점도 있다. 만약, 다음 코드로 변경하면 어떻게 될까?
실제로 실행되는 비즈니스 로직, 그러니까 스레드가 실행하는 로직이 1 MS만 늘어나도 이 CAS를 사용한 락은 CPU를 많이 갉아먹을 것이다.
실행 결과
2024-07-26 15:55:23.806 [ Thread-1] 락 획득 시도
2024-07-26 15:55:23.806 [ Thread-2] 락 획득 시도
2024-07-26 15:55:23.812 [ Thread-1] 락 획득 완료
2024-07-26 15:55:23.812 [ Thread-2] 락 획득 실패 - 스핀 대기
2024-07-26 15:55:23.813 [ Thread-2] 락 획득 실패 - 스핀 대기
2024-07-26 15:55:23.813 [ Thread-2] 락 획득 실패 - 스핀 대기
2024-07-26 15:55:23.813 [ Thread-1] 비즈니스 로직 실행
2024-07-26 15:55:23.813 [ Thread-2] 락 획득 실패 - 스핀 대기
2024-07-26 15:55:23.814 [ Thread-2] 락 획득 실패 - 스핀 대기
2024-07-26 15:55:23.814 [ Thread-2] 락 획득 실패 - 스핀 대기
2024-07-26 15:55:23.814 [ Thread-2] 락 획득 실패 - 스핀 대기
2024-07-26 15:55:23.815 [ Thread-2] 락 획득 실패 - 스핀 대기
2024-07-26 15:55:23.815 [ Thread-2] 락 획득 실패 - 스핀 대기
2024-07-26 15:55:23.815 [ Thread-2] 락 획득 실패 - 스핀 대기
2024-07-26 15:55:23.815 [ Thread-1] 락 반납 완료
2024-07-26 15:55:23.815 [ Thread-2] 락 획득 완료
2024-07-26 15:55:23.816 [ Thread-2] 비즈니스 로직 실행
2024-07-26 15:55:23.817 [ Thread-2] 락 반납 완료
무슨 말이냐면, 비즈니스 로직의 실행 처리 속도나 복잡도가 올라가면 올라갈수록 이 락을 획득하기 위해 계속해서 스핀락은 시도를 할거고 그 시도엔 CPU 자원을 사용한다는 점이다.
그래서, 계속 말하지만 결론은, 연산이 길지 않고 매우매우매우 짧게 끝날 때는 CAS가 더 효율적일 수 있고, 연산이 복잡하고 길다면 무조건 Lock(ReentrantLock)을 사용하면 된다. 그러니까 더 짧은 결론은 기본적으론 Lock(ReentrantLock)을 사용하는데 아주 특별한 경우에 한정해서 CAS를 사용하면 최적화할 수 있다.
정리
동기화 락을 사용하는 방식과 CAS를 활용하는 락 프리 방식의 장단점을 비교해보자!
CAS
장점
낙관적 동기화: 락을 걸지 않고도 값을 안전하게 업데이트 할 수 있다. CAS는 충돌이 자주 발생하지 않을 것이라고 가정한다. 이는 충돌이 적은 환경에서 높은 성능을 발휘한다.
락 프리: CAS는 락을 사용하지 않기 때문에, 락을 획득하기 위해 대기하는 시간이 없다. 따라서 스레드가 블로킹되지 않으며, 병렬 처리가 더 효율적일 수 있다.
단점
충돌이 빈번한 경우: 여러 스레드가 동시에 동일한 변수에 접근하여 업데이트를 시도할 때 충돌이 발생할 수 있다. 충돌이 발생하면 CAS는 루프를 돌며 재시도해야 하며, 이에 따라 CPU 자원을 계속 소모할 수 있다. 반복적인 재시도로 인해 오버헤드가 발생할 수 있다.
스핀락과 유사한 오버헤드: CAS는 충돌 시 반복적인 재시도를 하므로, 이 과정이 계속 반복되면 스핀락과 유사한 성능 저하가 발생할 수 있다. 특히 충돌 빈도가 높을수록 이런 현상이 두드러진다.
동기화 락
장점
충돌 관리: 락을 사용하면 하나의 스레드만 리소스에 접근할 수 있으므로 충돌이 발생하지 않는다. 여러 스레드가 경쟁할 경우에도 안정적으로 동작한다.
안정성: 복잡한 상황에서도 락은 일관성 있는 동작을 보장한다.
스레드 대기: 락을 대기하는 스레드는 CPU를 거의 사용하지 않는다.
단점
락 획득 대기 시간: 스레드가 락을 획득하기 위해 대기해야 하므로, 대기 시간이 길어질 수 있다.
컨텍스트 스위칭 오버헤드: 락을 사용하면, 락 획득을 대기하는 시점과 또 락을 획득하는 시점에 스레드의 상태가 변경된다. 이에 컨텍스트 스위칭이 발생할 수 있으며 이로 인해 오버헤드가 증가할 수 있다.
결론
일반적으로 동기화 락을 사용하고, 아주 특별한 경우에 한정해서 CAS를 활용해서 최적화해야 한다. CAS를 통한 최적화가 더 나은 경우는 스레드가 RUNNABLE → BLOCKED, WAITING 상태에서 다시 RUNNABLE 상태로 가는 것 보다는, 스레드를 RUNNABLE로 살려둔 상태에서 계속 락 획득을 반복 체크하는 것이 더 효율적인 경우에 사용해야 한다. 하지만 이 경우 대기하는 스레드가 CPU 자원을 계속 소모하기 때문에 대기 시간이 아주아주아주 짧아야 한다. 따라서 임계 영역이 필요는 하지만, 연산이 길지 않고 매우매우매우 짧게 끝날 때 사용해야 한다. 예를 들어 숫자 값의 증가, 자료 구조의 데이터 추가, 삭제와 같이 CPU 사이클이 금방 끝나지만 안전한 임계 영역, 또는 원자적인 연산이 필요한 경우에 사용해야 한다.
반면에 데이터베이스를 기다린다거나, 다른 서버의 요청을 기다리는 것처럼 오래 기다리는 작업에 CAS를 사용하면 CPU를 계속 사용하며 기다리는 최악의 결과가 나올 수도 있다. 이런 경우에는 동기화 락을 사용해야 한다.
또한 CAS는 충돌 가능성이 낮은 환경에서 매우 효율적이지만, 충돌 가능성이 높은 환경에서는 성능 저하가 발생할 수 있다. 이런 경우에는 상황에 맞는 적절한 동기화 전략을 사용하는 것이 중요하다. 때로는 락이 더 나은 성능을 발휘할 수 있으며 CAS가 항상 더 빠르다고 단정할 수는 없다. 따라서 각 접근 방식의 특성을 이해하고 애플리케이션의 특정 요구사항과 환경에 맞는 방식을 선택하는 것이 중요하다.
실무 관점
실무 관점에서 보면 대부분의 애플리케이션들은 공유 자원을 사용할 때 충돌할 가능성보다 충돌하지 않을 가능성이 훨씬 높다. 예를 들어, 여러 스레드에서 발생하는 주문 수를 실시간으로 증가하면서 카운트 한다고 가정해보자. 그리고 특정 피크시간에 주문이 100만건 들어오는 서비스라고 가정해보자 (이 정도면 국내 업계 탑이다).
1,000,000 / 60분 = 1분에 16,666건, 1초에 277건
CPU가 1초에 얼마나 많은 연산을 처리하는지 생각해보면, 백만 건 중에 충돌이 나는 경우는 아주 넉넉하게 잡아도 몇 십 건 이하일 것이다. 따라서 실무에서는 주문 수 증가와 같은 단순한 연산의 경우, 락을 걸고 시작하는 것 보다는, CAS처럼 낙관적인 방식이 더 나은 성능을 보인다.
그런데 여기서 중요한 핵심은 주문 수 증가와 같은 단순한 연산이라는 점이다. 이런 경우에는 AtomicInteger와 같은 CAS 연산을 사용하는 방식이 더 효과적이다. 이런 연산은 나노 초 단위로 발생하는 연산이다. 반면에 데이터베이스를 기다린다거나, 다른 서버의 요청을 기다리는 것 처럼 수 밀리초 이상의 시간이 걸리는 작업이라면 CAS를 사용하는 것보단 동기화 락을 사용하거나 스레드가 대기하는 방식이 더 효과적이다.
저번 포스팅에서 다뤘던 생산자 소비자 문제의 두번째 포스팅이다. 이 포스팅에선 어떻게 저번 포스팅에 말했던 문제를 해결하는지를 알아보자. 우선 저번 포스팅에서 말했던 문제는 생산자가 생산자를 깨워버릴 수 있고, 소비자가 소비자를 깨워버릴 수 있다는 문제였다. 그렇게 되면 결국 깨어난 스레드는 아무것도 하지 못하고 다시 기다리는 상태로 돌아가야 한다.
Lock Condition
이제, synchronized를 사용하지 않겠다. 이전에 synchronized의 단점을 극복하기 위해 사용했던 ReentrantLock을 사용하면 이 문제에서도 역시나 도움을 준다.
어떻게 도움을 주냐? 기존의 문제는 생산자가 생산자를 깨울 가능성이 있고 반대로 소비자가 소비자를 깨울 가능성이 있기 때문에 비효율이 발생하는 것이다. 그리고 그 근본 원인은? 스레드 대기 집합이 딱 하나이기 때문이다. 이 스레드 대기 집합은 모든 객체가 기본으로 가지고 있다. 모니터 락과 synchronized와 같이 사용되는 것이다.
그럼 스레드 대기 집합을 둘로 나누면 된다. 생산자용 대기 집합과 소비자용 대기 집합으로. 그리고 깨울땐 소비자라면 생산자용 스레드 대기 집합에 알리면 되고, 생산자라면 소비자용 스레드 대기 집합에 알리면 된다!
우선, 대기 집합을 분리하기 전에 앞에서 사용했던 synchronized, wait(), notify()를 통해 작성한 코드를 Lock 인터페이스와 ReentrantLock 구현체를 이용해서 다시 구현해보자!
BoundedQueueV4
package thread.bounded;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static util.MyLogger.log;
public class BoundedQueueV4 implements BoundedQueue {
private final Queue<String> queue = new ArrayDeque<>();
private final int max;
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
public BoundedQueueV4(int max) {
this.max = max;
}
@Override
public void put(String data) {
lock.lock();
try {
while (queue.size() == max) {
log("[put] 큐가 가득 찼습니다. 생산자는 대기합니다.");
try {
condition.await();
log("[put] 생산자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
queue.offer(data);
log("[put] 생산자 데이터 저장, notify() 호출");
condition.signal();
} finally {
lock.unlock();
}
}
@Override
public String take() {
lock.lock();
try {
while (queue.isEmpty()) {
log("[take] 큐에 데이터가 없습니다. 소비자는 대기합니다");
try {
condition.await();
log("[take] 소비자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
String data = queue.poll();
log("[take] 소비자 데이터 획득, notify() 호출");
condition.signal();
return data;
} finally {
lock.unlock();
}
}
@Override
public String toString() {
return queue.toString();
}
}
synchronized 대신 Lock lock = new ReentrantLock을 사용한다.
Condition이 등장한다. 이 Condition은 ReentrantLock을 사용하는 스레드가 대기하는 스레드 대기 공간이다.
lock.newCondition() 메서드를 호출하면 스레드 대기 공간이 만들어진다. Lock(ReentrantLock)의 스레드 대기 공간은 이렇게 만들 수가 있다.
참고로 synchronized, 모니터 락, wait(), notify()에서 사용한 스레드 대기 공간은 모든 객체 인스턴스가 내부에 기본으로 가지고 있다. 반면에 Lock(ReentrantLock)을 사용하는 경우 이렇게 스레드 대기 공간을 직접 만들어서 사용해야 한다.
condition.await()
Object.wait()과 유사한 기능이다. 지정한 condition에 현재 스레드를 대기(WAITING)상태로 보관한다. 이때 ReentrantLock에서 획득한 락을 반납하고 대기 상태로 condition에 보관된다.
condition.signal()
Object.notify()와 유사한 기능이다. 지정한 condition에서 대기중인 스레드를 하나 깨운다. 깨어난 스레드는 condition에서 빠져나온다.
Lock lock = new ReentrantLock()
이 그림에서 lock은 synchronized에서 사용하는 객체 내부에 모니터 락이 아니라, ReentrantLock 락을 뜻한다. ReentrantLock은 내부에 락과, 락 획득을 대기하는 스레드를 관리하는 대기 큐가 있다.
이 그림에서 스레드 대기 공간은 synchronized에서 사용하는 스레드 대기 공간이 아니라, 다음 코드를 뜻한다.
Condition condition = lock.newCondition()
ReentrantLock을 사용하면, condition이 스레드 대기 공간이다.
지금까지는 synchronized, wait(), notify()를 사용한 이전 코드와 거의 비슷하다. 아직 생산자용, 소비자용 스레드 대기 공간을 따로 분리하지 않았기 때문에 기존 방식과 같다고 보면 된다. 다만 구현을 synchronized로 했는가 아니면 ReentrantLock을 사용해서 했는가에 차이가 있을 뿐이다.
이대로 실행해보면 실행결과는 기존과 똑같을 것이다. 이제 스레드 대기 공간을 생산자용과 소비자용으로 분리해보자!
생산자, 소비자 대기 공간 분리
이런 그림을 만들어보자! 생각보다 엄청 간단하다.
BoundedQueueV5
package thread.bounded;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static util.MyLogger.log;
public class BoundedQueueV5 implements BoundedQueue {
private final Queue<String> queue = new ArrayDeque<>();
private final int max;
private final Lock lock = new ReentrantLock();
private final Condition producerCond = lock.newCondition();
private final Condition consumerCond = lock.newCondition();
public BoundedQueueV5(int max) {
this.max = max;
}
@Override
public void put(String data) {
lock.lock();
try {
while (queue.size() == max) {
log("[put] 큐가 가득 찼습니다. 생산자는 대기합니다.");
try {
producerCond.await();
log("[put] 생산자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
queue.offer(data);
log("[put] 생산자 데이터 저장, notify() 호출");
consumerCond.signal();
} finally {
lock.unlock();
}
}
@Override
public String take() {
lock.lock();
try {
while (queue.isEmpty()) {
log("[take] 큐에 데이터가 없습니다. 소비자는 대기합니다");
try {
consumerCond.await();
log("[take] 소비자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
String data = queue.poll();
log("[take] 소비자 데이터 획득, notify() 호출");
producerCond.signal();
return data;
} finally {
lock.unlock();
}
}
@Override
public String toString() {
return queue.toString();
}
}
여기서 핵심은 lock.newCondition()을 두 번 호출해서 ReentrantLock을 사용하는 스레드 대기 공간을 2개 만드는 것이다.
private final Lock lock = new ReentrantLock();
private final Condition producerCond = lock.newCondition();
private final Condition consumerCond = lock.newCondition();
Condition 분리
producerCond: 생산자를 위한 스레드 대기 공간
consumerCond: 소비자를 위한 스레드 대기 공간
이렇게 하면 생산자 스레드, 소비자 스레드를 정확하게 나누어 관리하고 깨울 수 있다!
@Override
public void put(String data) {
lock.lock();
try {
while (queue.size() == max) {
log("[put] 큐가 가득 찼습니다. 생산자는 대기합니다.");
try {
producerCond.await();
log("[put] 생산자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
queue.offer(data);
log("[put] 생산자 데이터 저장, notify() 호출");
consumerCond.signal();
} finally {
lock.unlock();
}
}
put(data) 메서드는 결국 생산자가 큐에 데이터를 생산해내는 것이다. 그렇다면 이 메서드에서 만약 큐가 가득찼다면 어디로 들어가면 될까? producerCond 안으로 들어가면 된다. 그래서 producerCond.await()을 호출한다.
그리고 큐가 가득차지 않아서 데이터를 잘 넣었다면 누굴 깨우면 될까? 소비자 스레드를 깨우면 된다. 그래서 consumerCond.signal()을 호출한다.
@Override
public String take() {
lock.lock();
try {
while (queue.isEmpty()) {
log("[take] 큐에 데이터가 없습니다. 소비자는 대기합니다");
try {
consumerCond.await();
log("[take] 소비자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
String data = queue.poll();
log("[take] 소비자 데이터 획득, notify() 호출");
producerCond.signal();
return data;
} finally {
lock.unlock();
}
}
take() 메서드는 소비자 스레드가 사용하는 메서드이다. 만약 큐에 데이터가 비었다면 어디로 들어가면 될까? 소비자용 스레드 대기 공간으로 들어가면 된다. 그래서 consumerCond.await()을 호출한다.
큐에 데이터가 있어서 데이터를 잘 소비했다면 누굴 깨우면 될까? 생산자 스레드를 깨우면 된다. 그래서 producerCond.signal()을 호출한다.
이렇게 굉장히 간단하게 딱 필요한 스레드 대기 공간에 알리고, 자기가 들어갈 곳을 잘 들어갈 수 있게 됐다. 그리고 실제로 이 코드로 실행해보면 비효율은 하나도 발생하지 않는다! 왜냐!? 생산자 스레드는 소비자 스레드만을 깨우고, 소비자 스레드는 생산자 스레드만을 깨우기 때문에.
실행 결과 로그는 따로 작성하지 않겠다. 직접 실행해보면 될 것 같다.
한번 이 큰 그림을 그림으로 하나씩 분석해보자.
참고로, 지금 그림을 가지고 하는 설명은 위 예제 실행 결과와 살짝 다르다. 이해하기 쉽게 조금 변형했다고 보면 된다
생산자 먼저 실행
c1, c2, c3는 소비자 스레드 전용 대기 공간(consumerCond)에 대기중이다.
p1, p2, p3는 생산자 스레드 전용 대기 공간(producerCond)에 대기중이다.
큐에 데이터가 비어있다.
생산자인 p0 스레드가 실행 예정이다.
p0 스레드는 ReentrantLock의 락을 획득하고 큐에 데이터를 보관한다.
생산자 스레드가 큐에 데이터를 보관했기 때문에, 소비자 스레드가 가져갈 데이터가 추가되었다. 따라서 소비자 대기 공간(consumerCond)에 signal()을 통해 알려준다.
소비자 스레드 중에 하나가 깨어난다. c1이 깨어났다고 가정하자.
c1은 락 획득까지 잠시 대기하다가 이후에 p0가 반납한 ReentrantLock의 락을 획득한다. 그리고 큐의 데이터를 획득한 다음에 완료된다.
소비자 먼저 실행
이 설명 역시 예제 결과와 살짝 다르다.
c1, c2, c3는 소비자 스레드 전용 대기 공간(consumerCond)에 대기중이다.
p1, p2, p3는 생산자 스레드 전용 대기 공간(producerCond)에 대기중이다.
큐에 데이터가 가득 차 있다.
소비자인 c0 스레드가 실행 예정이다.
c0 스레드는 ReentrantLock의 락을 획득하고 큐에 있는 데이터를 획득한다.
큐에 데이터를 획득했기 때문에, 큐에 데이터를 생산할 수 있는 빈 공간이 생겼다. 생산자 대기 공간(producerCond)에 signal()을 통해 알려준다.
생산자 스레드 중에 하나가 깨어난다. p3가 깨어났다고 가정하자.
p3는 이후에 c0가 반납한 ReentrantLock의 락을 획득하고, 큐의 데이터를 저장한 다음에 완료된다.
Object.notify() vs Condition.signal()
Object.notify()
대기 중인 스레드 중 임의의 하나를 선택해 깨운다. 스레드가 깨어나는 순서는 보장되어 있지 않으며, JVM 구현에 따라 다르다. 보통은 먼저 들어온 스레드가 먼저 수행되지만 구현에 따라 다를 수 있다.
synchronized 블록 내에서 모니터 락을 가지고 있는 스레드가 호출해야 한다.
Condition.signal()
대기중인 스레드 중 하나를 깨우며, 일반적으로는 FIFO 순서로 깨운다. 이 부분은 자바 버전과 구현에 따라 달라질 수 있지만, 보통 Condition의 구현은 Queue 구조를 사용하기 때문에 FIFO 순서로 깨운다.
ReentrantLock의 락을 가지고 있는 스레드가 호출해야 한다.
스레드의 대기
사실 지금까지 얘기하지 않고 있던 부분이 하나 있다.
이제는 synchronized, ReentrantLock의 대기 상태에 대해 이야기할 차례가 됐다.
먼저 synchronized의 대기 상태부터 얘기해보자면, 잘 생각해보면 2가지 단계의 대기 상태가 존재한다.
synchronized대기
대기1: 락 획득 대기
BLOCKED 상태로 모니터 락 획득 대기
synchronized를 시작할 때 락이 없으면 대기
다른 스레드가 synchronized를 빠져나갈 때 모니터 락 획득 시도
대기2: wait()대기
WAITING 상태로 대기
wait()을 호출했을 때 스레드 대기 집합에서 대기
다른 스레드가 notify()를 호출했을 때 빠져나감
소비자 스레드: c1, c2, c3
생산자 스레드: p1, p2, p3
소비자 스레드 c1, c2, c3가 먼저 동시에 실행된다고 가정하자.
소비자 스레드 c1이 가장 먼저 락을 획득한다.
c2, c3는 락 획득을 대기하며 BLOCKED 상태가 된다.
c2, c3는 락 획득을 시도하지만, 모니터 락이 없기 때문에 락을 대기하며 BLOCKED 상태가 된다. c1은 나중에 락을 반납할 것이다. 그러면 c2, c3 중에 하나가 락을 획득해야 한다. 그런데 잘 생각해보면 락을 기다리는 c2, c3도 어딘가에서 관리가 되어야 한다. 그래야 락이 반환되었을 때 자바가 c2, c3중에 하나를 선택해서 락을 제공할 수 있다. 예를 들어서 List, Set, Queue와 같은 자료구조에 관리가 되어야 한다. 그림에서는 c2, c3가 단순히 BLOCKED 상태로 변경만 되었다. 그래서 관리되는 것처럼 보이지는 않는다.
사실은 BLOCKED 상태의 스레드도 자바 내부에서 따로 관리된다. 다음 그림을 보자.
이 그림은 이전 그림과 같은 상태를 좀 더 자세히 그린 그림이다.
그림을 보면 락 대기 집합이라는 곳이 있다. 이곳은 락을 기다리는 BLOCKED 상태의 스레드들을 관리한다.
락 대기 집합은 자바 내부에 구현되어 있기 때문에 모니터 락과 같이 개발자가 확인하기는 어렵다.
여기서는 BLOCKED 상태의 스레드 c2, c3가 관리된다.
언젠가 c1이 락을 반납하면 락 대기 집합에서 관리되는 스레드 중 하나가 락을 획득한다.
락 대기 집합이 지금에서야 나온 이유는,
단순하게 설명하기 위해 BLOCKED 상태에서 사용하는 락 대기 집합을 일부러 얘기하지 않았다. 지금쯤이면 이 내용을 말해도 이해하는데 어려움이 없을 것이다.
c1이 큐에 데이터가 없으므로 wait()을 호출하고 스레드 대기 집합에 들어간다.
c1은 락을 반납 후 스레드 대기 집합에 들어가고 WAITING 상태가 된다.
이후에 락이 반납됐으니 락을 기다리고있는 BLOCKED 상태의 스레드 중 하나가 락을 획득한다. 어차피 c2, c3 둘 다 소비자 스레드이기 때문에 락을 획득하고 임계 영역에 들어와도 아무것도 하지 못한채 wait()을 호출하고 둘 다 스레드 대기 집합으로 들아간 모습이다.
p1이 락을 획득하고 데이터를 저장한 다음 notify()를 호출하여 스레드 대기 집합에 이 사실을 알린다.
스레드 대기 집합에 있는 c1이 스레드 대기 집합을 빠져나간다.
하지만 아직 끝난 것이 아니다. 락을 얻지 못한 상태이니까 BLOCKED 상태가 된다. 그리고 락을 기다리는 BLOCKED 상태의 스레드들은 락 대기 집합에서 대기한다.
그러니까 사실 2번의 대기 과정이 있고 대기 장소가 있는것이다. 그래서 이 그림이 된다.
c1은 BLOCKED 상태에서 락을 얻을 때까지 락 대기 집합에서 대기한다.
드디어 p1이 락을 반납한다.
락이 반납되면 락 대기 집합에 있는 스레드 중 하나가 락을 획득한다. 여기서는 c1이 획득한다.
c1은 드디어 락 대기 집합까지 탈출하고 임계 영역을 수행할 수 있다.
정리를 하자면,
자바의 모든 객체 인스턴스는 멀티 스레드와 임계 영역을 다루기 위해 내부에 3가지 기본 요소를 가진다.
모니터 락
락 대기 집합(모니터 락 대기 집합)
스레드 대기 집합
여기서 락 대기 집합이 1차 대기소이고, 스레드 대기 집합이 2차 대기소라 생각하면 된다. 2차 대기소에 들어간 스레드는 2차, 1차 대기소를 모두 빠져나와야 임계 영역을 수행할 수 있다. 이 3가지 요소는 서로 맞물려 들어간다.
synchronized를 사용한 임계 영역에 들어가려면 모니터 락이 필요하다.
모니터 락이 없으면 락 대기 집합에 들어가서 BLOCKED 상태로 락을 기다린다.
모니터 락을 반납하면 락 대기 집합에 있는 스레드 중 하나가 락을 획득하고 BLOCKED → RUNNABLE 상태가 된다.
wait()을 호출해서 스레드 대기 집합에 들어가기 위해선 모니터 락이 필요하다.
스레드 대기 집합에 들어가면 모니터 락을 반납한다.
스레드가 notify()를 호출하면 스레드 대기 집합에 있는 스레드 중 하나가 스레드 대기 집합을 빠져나온다. 그리고 모니터 락 획득을 시도한다.
모니터 락을 획득하면 임계 영역을 수행한다.
모니터 락을 획득하지 못하면 락 대기 집합에 들어가서 BLOCKED 상태로 락을 기다린다.
여기까지가 synchronized를 사용한 스레드 대기의 전체 과정이다. 그럼 ReentrantLock은 크게 다를까? 똑같다. 살짝살짝만 다른 부분이 있고 이 메커니즘은 똑같다고 보면 된다.
ReentrantLock대기
대기1: ReentrantLock 락 획득 대기
ReentrantLock의 대기 큐에서 관리
WAITING 상태로 락 획득 대기
lock.lock()을 호출 했을 때 락이 없으면 대기
다른 스레드가 lock.unlock()을 호출 했을 때 락 획득을 시도, 락을 획득하면 대기 큐를 빠져나감
대기2: await() 대기
condition.await()을 호출 했을 때, condition 객체의 스레드 대기 공간에서 관리
WAITING 상태로 대기
다른 스레드가 condition.signal()을 호출 했을 때 condition 객체의 스레드 대기 공간에서 빠져나감
이 ReentrantLock도 synchronized와 마찬가지로 대기소가 2단계로 되어 있다. 2단계 대기소인 condition 객체의 스레드 대기 공간을 빠져나간다고 바로 실행되는 것이 아니다. 임계 영역 안에서는 항상 락이 있는 하나의 스레드만 실행될 수 있다. 여기서는 ReentrantLock의 락을 획득해야 RUNNABLE 상태가 되면서 그 다음 코드를 실행할 수 있다. 락을 획득하지 못하면 WAITING 상태로 락을 획득할 때 까지 ReentrantLock의 대기 큐에서 대기한다.
차이점이라고 하면, synchronized는 락 대기 집합에서 스레드들은 BLOCKED 상태라는 거고 ReentrantLock의 대기 큐에서 스레드들은 WAITING 상태라는 것이다. 그래서, 이런 사소한 차이가 있고 결국 대기소는 둘 다 2개가 있다라는 점이다. 물론 ReentrantLock을 사용하면 더 유연하고 많은 장점이 있다. 무한 대기를 하지 않아도 되는 점과 공정성을 해결할 수 있다는 것과 스레드 대기 집합을 여러개로 분리할 수 있다는 것이다.
중간 정리
여기까지 잘 익혔다면, 진짜 어지간한 멀티 스레드 환경과 흐름을 이해했다고 봐도 무방하다. 실무에서도 사용할 수 있는 정도의 수준이라고 말할 수 있다. 사실 이 예제는 이미 자바가 만들어 둔 java.util.concurrent.BlockingQueue를 사용했으면 되는 문제였다. 정확히 이 한정된 버퍼에 더 채울 수 있는 공간이 없으면 데이터 추가를 공간이 생길 때까지 차단해주고, 한정된 버퍼에 데이터가 없으면 데이터가 생길 때까지 데이터를 소비하는 것을 차단해주는 큐이다.
근데, 하나씩 풀어가면서 왜 이런 상황이 발생하고,왜 생산자 스레드 입장에서 데이터가 비워질때까지(한정된 버퍼에 데이터를 넣을 공간이 생길때까지) 단순히 기다리기만 한다면 다른 스레드들이 BLOCKED 상태에서 빠져나오지 못하는지, 왜 synchronized, wait(), notify()를 사용해야 하고이것들이 가진 한계가 무엇이길래 ReentrantLock, Condition을 사용하는지 이해해야 BlockingQueue라는 이미 잘 만들어진 큐를 사용할 자격이 생기는 것이다.
이제 위 내용을 다 이해했다. 그럼 이미 만들어놓은 기능인 BlockingQueue를 한번 사용해보자!
BlockingQueue
자바는 생산자 소비자 문제를 해결하기 위해 java.util.concurrent.BlockingQueue라는 특별한 멀티스레드 자료 구조를 제공한다.
이것은 이름 그대로 스레드를 차단(Blocking)할 수 있는 큐다.
데이터 추가 차단: 큐가 가득 차면 데이터 추가 작업을 시도하는 스레드는 공간이 생길때까지 차단된다.
데이터 획득 차단: 큐가 비어 있으면 획득 작업을 시도하는 스레드는 큐에 데이터가 들어올 때까지 차단된다.
BlockingQueue는 인터페이스이고, 다음과 같은 다양한 기능을 제공한다.
package java.util.concurrent;
public interface BlockingQueue<E> extends Queue<E> {
boolean add(E e);
boolean offer(E e);
void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit) throws InterruptedException;
boolean remove(Object o);
...
}
주요 메서드만 정리했다.
데이터 추가 메서드: add(), offer(), put(), offer(타임아웃)
데이터 획득 메서드: take(), poll(타임아웃), remove()
Queue를 상속 받는다. 큐를 상속 받았기 때문에 추가로 큐의 기능들도 사용할 수 있다.
보면 데이터 추가와 획득에서 메서드가 굉장히 많다는 것을 알 수 있다. 이게 왜 이렇게 여러개가 있는지도 이후에 설명한다.
BlockingQueue 인터페이스의 대표적인 구현체
ArrayBlockingQueue: 배열 기반으로 구현되어 있고, 버퍼의 크기가 고정되어 있다.
LinkedBlockingQueue: 링크 기반으로 구현되어 있고, 버퍼의 크기를 고정할 수도, 무한하게 사용할 수도 있다.
이제 BlockingQueue를 사용하도록 기존 코드를 변경해보자.
BoundedQueueV6_1
package thread.bounded;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BoundedQueueV6_1 implements BoundedQueue {
private final BlockingQueue<String> queue;
public BoundedQueueV6_1(int max) {
this.queue = new ArrayBlockingQueue<String>(max);
}
@Override
public void put(String data) {
try {
queue.put(data);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public String take() {
try {
return queue.take();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public String toString() {
return queue.toString();
}
}
ArrayBlockingQueue를 사용한다.
BlockingQueue.put(data): 데이터를 추가한다.
BlockingQueue.take(): 데이터를 뽑아온다.
여기서 BlockingQueue.put(data) 메서드를 한번 어떻게 구현했는지 봐보자.
다음 코드가 java.util.concurrent.BlockingQueue를 구현한 ArrayBlockingQueue의 put() 메서드이다.
public void put(E e) throws InterruptedException {
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
앞에서 만들어본 BoundedQueueV5와 굉장히 유사하게 생긴것을 알 수 있다. 다른건 lock.lock()이 아니라 lock.lockInterruptibly()를 사용했다 정도의 차이가 있다.
그러니까 결론은 자바에서 제공하는 멀티 스레드 용 자료구조가 이제 눈에 들어온다는 것이다.
이제 이 BoundedQueueV6_1를 사용해서 실행해보면 같은 결과를 얻을것이다.
BlockingQueue 기능 설명
아까 보니까 데이터를 추가하는 것도 많은 메서드가 있던것을 알 수 있었다. 왜 그럴까? 실무에서 멀티스레드를 사용할땐 응답성이 중요하다. 예를 들어, 대기 상태에 있어도 고객이 중지 요청을 하거나, 또는 너무 오래 대기한 경우 포기하고 빠져나갈 수 있는 방법이 필요하다.
생산자가 무언가 데이터를 생산하는데, 버퍼가 빠지지 않아서 너무 오래 대기해야 한다면 무한정 기다리는 것보다는 작업을 포기하고 사용자에게는 "죄송합니다. 현재 시스템에 문제가 있습니다. 나중에 다시 시도해주세요."라고 하는 것이 더 나은 선택일 것이다.
즉, 멀티스레드 세상에서는 정말 여러 상황과 결론이 만들어질 수 있다는 얘기고 그렇기에 이 BlockingQueue는 각 상황에 맞는 다양한 메서드를 제공하고 있다.
BlockingQueue의 다양한 기능
Operation
Throws Exception
Special Value
Blocks
Times Out
Insert
add(e)
offer(e)
put(e)
offer(e, time, unit)
Remove
remove()
poll()
take()
poll(time, unit)
Throws Exception (대기 시 예외를 터트림)
add(e): 지정된 요소를 큐에 추가하며, 큐가 가득 차면 IllegalStateException 예외를 던진다.
remove(): 큐에서 요소를 제거하며 반환한다. 큐가 비어 있으면 NoSuchElementException 예외를 던진다.
Special Value (대기 시 즉시 반환)
offer(e): 지정된 요소를 큐에 추가하려고 시도하며, 큐가 가득 차면 false를 반환한다.
poll(): 큐에서 요소를 제거하고 반환한다. 큐가 비어 있으면 null을 반환한다.
Blocks (대기)
put(e): 지정된 요소를 큐에 추가할 때까지 대기한다. 큐가 가득 차면 공간이 생길 때까지 대기한다.
take(): 큐에서 요소를 제거하고 반환한다. 큐가 비어 있으면 요소가 준비될 때까지 대기한다.
Times Out (시간만큼만 대기)
offer(e, time, unit): 지정된 요소를 큐에 추가하려고 시도하며, 지정된 시간 동안 큐가 비워지기를 기다리다가 시간이 초과되면 false를 반환한다.
poll(time, unit): 큐에서 요소를 제거하고 반환한다. 큐에 요소가 없다면 지정된 시간 동안 요소가 준비되기를 기다렸다가 시간이 초과되면 null을 반환한다.
참고로, BlockingQueue의 모든 대기, 시간 대기 메서드는 인터럽트를 제공한다. 대기하는 put(e), take()는 바로 위에서 예제로 직접 만들어보았다. 나머지도 하나씩 코드로 확인해보자.
BlockingQueue 즉시 반환
BoundedQueueV6_2
package thread.bounded;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import static util.MyLogger.log;
public class BoundedQueueV6_2 implements BoundedQueue {
private final BlockingQueue<String> queue;
public BoundedQueueV6_2(int max) {
this.queue = new ArrayBlockingQueue<String>(max);
}
@Override
public void put(String data) {
boolean result = queue.offer(data);
log("저장 시도 결과 = " + result);
}
@Override
public String take() {
return queue.poll();
}
@Override
public String toString() {
return queue.toString();
}
}
저장 시도 시 기다리는 시간을 1 나노초로 했기 때문에 그냥 뭐 안 기다리는 수준으로 기다리게 설정했다. 그러니까 실행 결과를 보면 저장 시도 결과는 false가 출력되고 저장을 마지막에 하지 못했으니 소비도 마지막 스레드는 할 수 없어 null이 반환됐다. 만약 기다리는 시간을 넉넉하게 잡고 awaitNanos(timeout)으로 잠시 대기하게 한 후 소비자 스레드가 들어와서 큐에 데이터를 소비해서 공간이 생긴후에도 지정한 시간을 지나지 않았다면 아마 데이터가 추가도, 데이터 소비도 정상적으로 될 것이다.
아, 참고로 여기서 awaitNanos(timeout)은 그냥 내가 이렇게 사용하자가 아니라 실제로 BlockingQueue가 offer(e, time, unit) 메서드에서 사용하고 있는 것이다. 아래 실제 구현 코드 참고!
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
Objects.requireNonNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0L)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
BlockingQueue예외 터트리기
BoundedQueueV6_4
package thread.bounded;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import static util.MyLogger.log;
public class BoundedQueueV6_4 implements BoundedQueue {
private final BlockingQueue<String> queue;
public BoundedQueueV6_4(int max) {
this.queue = new ArrayBlockingQueue<String>(max);
}
@Override
public void put(String data) {
queue.add(data);
}
@Override
public String take() {
return queue.remove();
}
@Override
public String toString() {
return queue.toString();
}
}
실행 결과를 보면 데이터를 추가할 때 공간이 없으면 IllegalStateException을, 데이터 소비할 때 소비할 데이터가 없으면 NoSuchElementException을 발생시키고 있음을 알 수 있다.
참고로, 지금 V6_1, V6_2, V6_3, V6_4 모두 BoundedQueue 인터페이스를 구현해서 만들고 있는데 이래야만 가능한게 아니라 기존에 작성한 코드들(producerFirst, consumerFirst, ...)이 전부 BoundedQueue를 의존하고 있기 때문에 이 인터페이스를 구현한 구현체를 만들어서 그 안에서 BlockingQueue를 사용하는 식으로 만든거고 그게 아니라면 그냥 바로 BlockingQueue를 사용해도 상관없다!
정리
이렇듯, 기존에 아주 아주 잘 만들어진 BlockingQueue를 사용하면 훨씬 더 다양한 상황을 더 유연하게 대처할 수 있음을 알게됐다.
무작정 기다릴수도, 정해진 시간만큼만 기다릴수도, 아예 바로 결과를 반환할수도, 예외를 터트릴수도 있다. 그리고 내부가 어떻게 구현됐는지도 이제 이해할 수 있는 레벨이 됐다!
생산자 소비자 문제는 멀티스레드 프로그래밍에서 자주 등장하는 동시성 문제 중 하나로, 여러 스레드가 동시에 데이터를 생산하고 소비하는 상황을 다룬다.
멀티스레드의 핵심을 제대로 이해하려면 반드시 생산자 소비자 문제를 이해하고, 올바른 해결 방안도 함께 알아두어야 한다. 생산자 소비자 문제를 제대로 이해하면 멀티스레드를 제대로 이해했다고 볼 수 있다. 그만큼 중요한 내용이다.
생산자(Producer): 데이터를 생성하는 역할을 한다. 예를 들어, 파일에서 데이터를 읽어오거나 네트워크에서 데이터를 받아오는 스레드가 생산자 역할을 할 수 있다.
위 프린터 예제에서 사용자의 입력을 프린터 큐에 전달하는 스레드가 생산자의 역할이다.
소비자(Consumer): 생성된 데이터를 사용하는 역할을 한다. 예를 들어, 데이터를 처리하거나 저장하는 스레드가 소비자 역할을 할 수 있다.
위 프린터 예제에서 프린터 큐에 전달된 데이터를 받아서 출력하는 스레드가 소비자 역할이다.
버퍼(Buffer): 생산자가 생성한 데이터를 일시적으로 저장하는 공간이다. 이 버퍼는 한정된 크기를 가지며, 생산자와 소비자가 이 버퍼를 통해 데이터를 주고 받는다.
위 프린터 예제에서 프린터 큐가 버퍼 역할이다.
그럼 이게 왜 문제가 된다는 것일까?
문제 상황
생산자가 너무 빠를 때: 버퍼가 가득 차서 더 이상 데이터를 넣을 수 없을 때까지 생산자가 데이터를 생성한다. 버퍼가 가득 찬 경우 생산자는 버퍼에 빈 공간이 생길 때까지 기다려야 한다.
소비자가 너무 빠를 때: 버퍼가 비어서 더 이상 소비할 데이터가 없을 때까지 소비자가 데이터를 처리한다. 버퍼가 비어있을 때 소비자는 버퍼에 새로운 데이터가 들어올 때까지 기다려야 한다.
예를 들면, 초밥집에 가서 주방장님이 초밥을 하나씩 만들어서 서빙 카운터에 초밥을 하나씩 올려 놓는데 더 이상 올려 놓을 곳이 없어 초밥을 만들지 못하는 경우가 생산자가 너무 빠른 경우이고, 반대로 서빙 카운터에 초밥을 내려놓자마자 손님이 초밥을 다 먹어버려서 음식이 나올때까지 기다려야 하는 경우가 소비자가 너무 빠른 경우이다. 이때 서빙 카운터는 버퍼라고 볼 수 있다.
이 문제는 다음 두 용어로도 불린다.
생산자 소비자 문제(producer-consumer problem): 생산자 소비자 문제는, 생산자 스레드와 소비자 스레드가 특정 자원을 함께 생산하고 소비하면서 발생하는 문제이다.
한정된 버퍼 문제(bounded-buffer problem): 이 문제는 결국 중간에 있는 버퍼의 크기가 한정되어 있기 때문에 발생한다. 따라서 한정된 버퍼 문제라고도 한다.
put(data): 버퍼에 데이터를 보관한다. (생산자 스레드가 호출하고, 데이터를 생산한다.)
take(): 버퍼에 보관된 값을 가져간다. (소비자 스레드가 호출하고, 데이터를 소비한다.)
BoundedQueueV1
package thread.bounded;
import java.util.ArrayDeque;
import java.util.Queue;
import static util.MyLogger.log;
public class BoundedQueueV1 implements BoundedQueue {
private final Queue<String> queue = new ArrayDeque<>();
private final int max;
public BoundedQueueV1(int max) {
this.max = max;
}
@Override
public synchronized void put(String data) {
if (queue.size() == max) {
log("[put] 큐가 가득 찼습니다. 버립니다." + data);
return;
}
queue.offer(data);
}
@Override
public synchronized String take() {
if (queue.isEmpty()) {
return null;
}
return queue.poll();
}
@Override
public String toString() {
return queue.toString();
}
}
BoundedQueueV1: 한정된 버퍼 역할을 하는 가장 단순한 구현체이다. 이후에 버전이 점점 올라가면서 코드를 개선한다.
Queue, ArrayDeque: 데이터를 중간에 보관하는 버퍼로 큐(Queue)를 사용한다. 구현체로는 ArrayDeque를 사용한다.
int max: 한정된(Bounded) 버퍼이므로, 버퍼에 저장할 수 있는 최대 크기를 지정한다.
put(): 큐에 데이터를 저장한다. 큐가 가득 찬 경우, 더는 데이터를 보관할 수 없으므로 데이터를 버린다.
take(): 큐의 데이터를 가져간다. 큐에 데이터가 없는 경우 null을 반환한다.
toString(): 버퍼 역할을 하는 queue 정보를 출력한다.
주의! 원칙적으로는 toString()에도 synchronized를 적용해야 맞다. 그래야 toString()을 통한 조회 시점에도 정확한 데이터를 조회할 수 있다. 예를 들어, toString()을 호출하는 시점에 모니터 락을 사용하지 않으면 다른 스레드가 put을 하는 동시에 이 toString()을 호출해서 실제 데이터와 다른 데이터가 출력될 수 있으니까. 그러나, 예제 코드를 단순하게 유지하고 목적에 부합한 결과를 출력하기 위해 의도적으로 synchronized를 넣지 않았다.
임계 영역
여기서 핵심 공유 자원은 바로 queue이다. 여러 스레드가 접근할 예정이므로 synchronized를 사용해서 한번에 하나의 스레드만 put() 또는 take()를 실행할 수 있도록 안전한 임계 영역을 만든다.
위 producerFirst 메서드와 완전히 동일하고 먼저 실행되는 메서드가 startProducer가 아니라 startConsumer라는 차이만 있다.
private static void startProducer(BoundedQueue queue, List<Thread> threads) {
System.out.println();
log("생산자 시작");
for (int i = 1; i <= 3; i++) {
Thread producer = new Thread(new ProducerTask(queue, "data" + i), "producer" + i);
threads.add(producer);
producer.start();
sleep(100);
}
}
3개의 생산자 스레드를 만들고 각 스레드들을 실행한다. 로그 출력을 조금 알아보기 쉽게 하기 위해 sleep(100)을 추가했다. 의도적으로 추가한 것이고 실제 업무라면 없는게 맞다. 이렇게 잠시 0.1초 동안 대기하게 만들어 두면 로그로 producer1 → producer2 → producer3 순으로 이쁘게 출력될 것이다.
만들어지는 각각의 생산자 스레드 모두 파라미터로 넘어온 threads에 추가된다.
private static void startConsumer(BoundedQueue queue, List<Thread> threads) {
System.out.println();
log("소비자 시작");
for (int i = 1; i <= 3; i++) {
Thread consumer = new Thread(new ConsumerTask(queue), "consumer" + i);
threads.add(consumer);
consumer.start();
sleep(100);
}
}
위 startProducer와 완전히 동일한 코드이지만 생산자 스레드를 만드는 게 아니라 소비자 스레드를 만들어 실행한다.
public static void main(String[] args) {
//1. BoundedQueue 선택
BoundedQueue queue = new BoundedQueueV1(2);
//2. 생산자, 소비자 실행 순서 선택, 반드시 하나만 선택!
producerFirst(queue);
//consumerFirst(queue);
}
public static void main(String[] args) {
//1. BoundedQueue 선택
BoundedQueue queue = new BoundedQueueV1(2);
//2. 생산자, 소비자 실행 순서 선택, 반드시 하나만 선택!
//producerFirst(queue);
consumerFirst(queue);
}
consumer1, consumer2 스레드가 모두 한번씩 실행되면서 큐에 있는 데이터를 소비했다. 이제 consumer3이 실행될 차례다.
consumer3이 데이터를 소비하려고 했지만 데이터가 없다. 아무것도 가져올 수 없다. 아무런 데이터도 얻지 못한 채 쓸쓸히 돌아가게 된다.
이 시점이 마지막 시점이다. 결과적으로 버퍼가 가득차서 p3 스레드가 생성한 데이터는 버려졌고, c3 스레드는 버퍼에 아무런 데이터도 없기 때문에 어떠한 데이터도 얻지 못한 채 돌아갔다. 이 부분에서 문제점이 보인다. 만약 이걸 해결하려면 가장 간단한 대안으로는 기다리는 대안이 있을 것 같다. p3 입장에서는 버퍼에 남는 공간이 생길 때 까지 기다렸다가 데이터를 넣고, c3 입장에서는 버퍼에 데이터가 생길 때 까지 기다렸다가 생기면 데이터를 가져오는 식으로 말이다.
생산자 소비자 문제 - 소비자 우선 결과 분석
이번엔 반대로 소비자 먼저 실행한 결과를 분석해보자.
c1, c2, c3 순으로 스레드가 큐에서 데이터를 가져오려고 시도하지만, 데이터가 아무것도 없다. 결국 모든 소비자 스레드는 빈털털이로 돌아오게 된다.
이제 생산자 스레드가 하나씩 생성되면서 큐에 데이터를 넣는다.
마지막 p3 스레드는 역시나 버퍼에 데이터가 꽉 차있기 때문에 데이터를 넣지 못하고 버리게 된다.
이게 마지막 시점의 모습이다. 이부분에서도 역시나 문제점이 많다. 소비자 스레드는 모든 스레드가 다 데이터를 가지지 못했고, 생산자 스레드는 결국 p3 스레드는 여전히 데이터를 추가하지 못했다.
총평을 내려보자면,
생산자 스레드 먼저 실행의 경우, p3이 보관하는 데이터는 버려지고 c3은 데이터를 받지 못한다.
소비자 스레드 먼저 실행의 경우, c1, c2, c3 모두 데이터를 받지 못하고 p3이 보관하는 데이터는 버려진다.
결국 생산자 소비자 문제는 이런 결과를 발생시킨다. 어디 한쪽이 너무 빠르면 뭐가 됐건 문제가 생기고 그 근본 원인은 사실 버퍼의 사이즈다. 버퍼가 가득 찬 경우는 생산자 입장에서 버퍼에 여유가 생길 때 까지 조금만 기다리면 되는데 기다리지 못하고 데이터를 버리는 것이 아쉽고, 버퍼가 빈 경우는 소비자 입장에서 버퍼에 데이터가 채워질 때 까지 조금만 기다리면 되는데 기다리지 못하고 데이터를 못 얻는게 아쉽다.
예제 변경 - 대기하는 코드로 바꿔보기
가장 간단한 해결 방법은 기다리는 것이다! 한번 기다리게 해서 문제를 해결해보자!
BoundedQueueV2
package thread.bounded;
import java.util.ArrayDeque;
import java.util.Queue;
import static util.MyLogger.log;
import static util.ThreadUtils.sleep;
public class BoundedQueueV2 implements BoundedQueue {
private final Queue<String> queue = new ArrayDeque<>();
private final int max;
public BoundedQueueV2(int max) {
this.max = max;
}
@Override
public synchronized void put(String data) {
while (queue.size() == max) {
log("[put] 큐가 가득 찼습니다. 생산자는 대기합니다.");
sleep(1000);
}
queue.offer(data);
}
@Override
public synchronized String take() {
while (queue.isEmpty()) {
log("[take] 큐에 데이터가 없습니다. 소비자는 대기합니다");
sleep(1000);
}
return queue.poll();
}
@Override
public String toString() {
return queue.toString();
}
}
BoundedQueueV2를 만들고 여기서 put(data), take() 메서드에서 데이터가 꽉차거나 없다면 빠져나가는게 아니라 기다리는 것이다.
그래서, put(String data)를 보면 큐의 사이즈가 꽉 찼다면 1초 정도 대기한 후 다시 확인해보는 것이다. 데이터가 빠져나갈 때까지.
또한, take()도 큐에 아무런 데이터가 없다면 1초 정도 대기한 후 다시 확인해보는 것이다. 데이터가 들어올 때까지.
c1 스레드가 버퍼에 데이터가 아무것도 없기 때문에 데이터가 들어올때까지 기다리고 있어서 락을 놔주고 있지 않다. 그 결과 어떤 스레드도 접근이 불가능하다. 그래서 c1 스레드를 제외한 모든 스레드가 다 BLOCKED 상태가 돼버렸다.
"어? 그럼 sleep()말고 yield()를 사용해서 다른 스레드에게 양보하면 되지 않아요?"
진짜 좋은 생각이다. 한번 그렇게 해볼까? 다음 코드를 보자.
@Override
public synchronized void put(String data) {
while (queue.size() == max) {
log("[put] 큐가 가득 찼습니다. 생산자는 대기합니다.");
// sleep(1000);
Thread.yield();
}
queue.offer(data);
}
이번엔 yield()를 사용해서 욕심 부리지 말고 다른 스레드에게 양보하자! 과연 잘 될까?
생산자 스레드 먼저 실행 코드로 실행해보면 다음과 같은 결과를 얻는다.
...
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
2024-07-24 14:18:59.595 [producer3] [put] 큐가 가득 찼습니다. 생산자는 대기합니다.
...
결국 같은 현상이다. 대신 1초의 대기 시간이 사라지니까 너무 빠르게 출력이 된 것 뿐이다. 왜 같은 현상이 일어날까?
Thread.yield()는 synchronized의 락을 반납하는게 아니다.
그저 다른 스레드에게 CPU 사용을 양보하는 것 뿐이지 synchronized의 락은 여전히 본인이 가지고 있다. 언제까지? synchronized 블록이 끝날 때까지. 그래서 결국 다른 스레드들은 같은 모니터 락이 필요한 synchronized 메서드나 블록에 접근하지 못하게 된다.
그럼 이 생산자 소비자 문제는 도대체 어떻게 해결할까?
양보하는게 맞다! 근데 양보를 할 때 락도 양보하면 된다.
자바의 Object.wait(), Object.notify()를 사용하면, 락을 가지고 대기하는 스레드가 대기하는 동안 다른 스레드에게 락을 양보할 수 있다. 바로 알아보자!
Object.wait(), Object.notify()
저번에도 말했지만, 자바는 처음부터 멀티스레드를 고려하고 탄생한 언어이다. 앞서 설명한 synchronized를 사용한 임계 영역 안에서 락을 가지고 무한 대기하는 문제는 흥미롭게도 Object 클래스에 해결 방안이 있다. Object 클래스는 이런 문제를 해결할 수 있는 wait(), notify()라는 메서드를 제공한다. Object는 모든 자바 객체의 부모이기 때문에, 여기 있는 기능들은 모두 자바 언어의 기본 기능이라 생각하면 된다.
Object.wait()
현재 스레드가 가진 락을 반납하고 대기(WAITING)한다.
현재 스레드를 대기 상태로 전환한다. 이 메서드는 현재 스레드가 synchronized 블록이나 메서드에서 락을 소유하고 있을 때만 호출할 수 있다. 호출한 스레드는 락을 반납하고, 다른 스레드가 해당 락을 획득할 수 있도록 한다. 이렇게 대기 상태로 전환된 스레드는 다른 스레드가 notify() 또는 notifyAll()을 호출할 때까지 대기 상태를 유지한다.
Object.notify()
대기 중인 스레드 중 하나를 깨운다.
이 메서드는 synchronized 블록이나 메서드에서 호출되어야 한다. 깨운 스레드는 락을 다시 획득할 기회를 얻게 된다. 만약, 대기 중인 스레드가 여러개라면 그 중 하나만이 깨워지게 된다.
Object.notifyAll()
대기 중인 모든 스레드를 깨운다.
이 메서드 역시 synchronized 블록이나 메서드에서 호출되어야 한다. 모든 대기중인 스레드가 락을 획득할 수 있는 기회를 얻게 된다. 이 방법은 모든 스레드를 깨워야 할 필요가 있는 경우에 유용하다.
wait(), notify() 메서드를 적절하게 사용하면, 멀티스레드 환경에서 발생할 수 있는 문제를 효율적으로 해결할 수 있다. 이 기능을 활용해서 스레드가 락을 가지고 임계 영역안에서 무한 대기하는 문제를 해결해보자!
BoundedQueueV3
package thread.bounded;
import java.util.ArrayDeque;
import java.util.Queue;
import static util.MyLogger.log;
import static util.ThreadUtils.sleep;
public class BoundedQueueV3 implements BoundedQueue {
private final Queue<String> queue = new ArrayDeque<>();
private final int max;
public BoundedQueueV3(int max) {
this.max = max;
}
@Override
public synchronized void put(String data) {
while (queue.size() == max) {
log("[put] 큐가 가득 찼습니다. 생산자는 대기합니다.");
try {
wait(); // RUNNABLE -> WAITING, 락 반납
log("[put] 생산자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
queue.offer(data);
log("[put] 생산자 데이터 저장, notify() 호출");
notify(); // 대기 스레드에게 WAIT -> BLOCKED
}
@Override
public synchronized String take() {
while (queue.isEmpty()) {
log("[take] 큐에 데이터가 없습니다. 소비자는 대기합니다");
try {
wait();
log("[take] 소비자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
String data = queue.poll();
log("[take] 소비자 데이터 획득, notify() 호출");
notify(); // 대기 스레드에게 WAIT -> BLOCKED
return data;
}
@Override
public String toString() {
return queue.toString();
}
}
이제 V3이다. 앞서 사용한 sleep() 코드는 제거하고, Object.wait()을 사용하자. Object는 모든 클래스의 부모이므로 자바의 모든 객체는 해당 기능을 사용할 수 있다.
@Override
public synchronized void put(String data) {
while (queue.size() == max) {
log("[put] 큐가 가득 찼습니다. 생산자는 대기합니다.");
try {
wait(); // RUNNABLE -> WAITING, 락 반납
log("[put] 생산자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
queue.offer(data);
log("[put] 생산자 데이터 저장, notify() 호출");
notify(); // 대기 스레드에게 WAIT -> BLOCKED
}
synchronized를 통해 임계 영역을 설정한다. 생산자 스레드는 락 획득을 시도한다.
락을 획득한 생산자 스레드는 반복문을 통해서 큐에 빈 공간이 생기는지 주기적으로 체크한다. 만약, 빈 공간이 없다면 Object.wait()을 사용해서 대기한다. 참고로 대기할 때 락을 반납하고 대기한다. 그리고 대기 상태에서 깨어나면 다시 반복문에서 큐의 빈 공간을 체크한다.
wait()을 호출해서 대기하는 경우 RUNNABLE → WAITING 상태가 된다.
생산자가 데이터를 큐에 저장하고 나면 notify()를 통해 대기하는 스레드에게 저장된 데이터가 있다고 알려주어야 한다. 예를 들어서 큐에 데이터가 없어서 대기하는 소비자 스레드가 있다고 가정하자. 이때 notify()를 호출하면 소비자 스레드는 깨어나서 저장된 데이터를 가져갈 수 있다.
@Override
public synchronized String take() {
while (queue.isEmpty()) {
log("[take] 큐에 데이터가 없습니다. 소비자는 대기합니다");
try {
wait();
log("[take] 소비자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
String data = queue.poll();
log("[take] 소비자 데이터 획득, notify() 호출");
notify(); // 대기 스레드에게 WAIT -> BLOCKED
return data;
}
synchronized를 통해 임계 영역을 설정한다. 소비자 스레드는 락 획득을 시도한다.
락을 획득한 소비자 스레드는 반복문을 사용해서 큐에 데이터가 있는지 주기적으로 체크한다. 만약, 데이터가 없다면 Object.wait()을 사용해서 대기한다. 참고로 대기할 때 락을 반납하고 대기한다. 그리고 대기 상태에서 깨어나면, 다시 반복문에서 큐에 데이터가 있는지 체크한다.
대기하는 경우 RUNNABLE → WAITING 상태가 된다.
소비자가 데이터를 획득하고 나면 notify()를 통해 대기하는 생산자 스레드에게 큐에 저장할 여유 공간이 생겼다고알려주어야 한다. 예를 들어, 큐에 데이터가 꽉차서 데이터를 넣지 못해 대기하는 생산자 스레드가 있다고 가정하자. 이때 notify()를 호출하면 생산자 스레드는 깨어나서 저장된 데이터를 획득할 수 있다.
wait()으로 대기 상태에 빠진 스레드는 notify()를 사용해야 깨울 수 있다. 생산자는 생산을 완료하면 notify()로 대기하는 스레드를 깨워서 생산된 데이터를 가져가게 하고, 소비자는 소비를 완료하면 notify()로 대기하는 스레드를 깨워서 데이터를 생산하라고 하면 된다. 여기서 중요한 핵심은 wait()을 호출해서 대기 상태에 빠질 땐 락을 반납하고 대기 상태에 빠진다는 것이다. 대기 상태에 빠지면 어차피 아무일도 하지 않으므로 락도 필요하지 않다.
V3로 변경하고 생산자 먼저 실행 코드로 변경 하기
public static void main(String[] args) {
//1. BoundedQueue 선택
BoundedQueue queue = new BoundedQueueV3(2);
//2. 생산자, 소비자 실행 순서 선택, 반드시 하나만 선택!
producerFirst(queue);
//consumerFirst(queue);
}
public static void main(String[] args) {
//1. BoundedQueue 선택
BoundedQueue queue = new BoundedQueueV3(2);
//2. 생산자, 소비자 실행 순서 선택, 반드시 하나만 선택!
//producerFirst(queue);
consumerFirst(queue);
}
로그만 보면 결국 잘 저장하고 잘 사용한것 같아 보인다. 근데 로그만으로는 이해하기 쉽지 않다. 그림으로 하나씩 분석해보자!
wait(), notify() 생산자 우선 분석
우선 못보던게 하나 생겼다. 스레드 대기 집합
스레드 대기 집합 (wait set)
synchronized 임계 영역 안에서 Object.wait()을 호출하면 스레드는 대기 상태에 들어간다. 이렇게 대기 상태에 들어간 스레드를 관리하는 것을 대기 집합이라 한다. 참고로 모든 객체는 각자 자기의 대기 집합과 모니터 락을 가지고 있다. 그리고 이 둘은 한 쌍으로 사용된다. 따라서 락을 획득한 객체의 대기 집합을 사용해야 한다. 여기서는 BoundedQueue(x001) 구현 인스턴스의 락과 대기 집합을 사용한다.
synchronized를 메서드에 적용하면 해당 인스턴스의 락을 사용한다. 여기서는 BoundedQueue(x001)의 구현체이다.
wait() 호출은 앞에 this를 생략할 수 있다. this는 해당 인스턴스를 뜻한다. 여기서는 BoundedQueue(x001)의 구현체이다.
이제 순서대로 흐름을 분석해보자!
p1이 락을 획득하고 큐에 데이터를 저장한다.
큐에 데이터가 추가 되었기 때문에 스레드 대기 집합에 이 사실을 알려야 한다. (코드 흐름이 그렇다)
notify()를 호출하면 스레드 대기 집합에서 대기하는 스레드 중 하나를 깨운다.
현재 대기 집합에 스레드가 없으므로 아무일도 발생하지 않는다. 만약 소비자 스레드가 대기 집합에 있었다면 깨어나서 큐에 들어있는 데이터를 소비했을 것이다.
p1은 할일을 다 끝내고 락을 반납한다.
p2도 큐에 데이터를 저장하고 생산을 완료했다.
p3가 데이터를 생산하려고 하는데, 큐가 가득 찼다. wait()을 호출한다.
wait()을 호출하면 락을 반납한다.
wait()을 호출하면 스레드의 상태가 RUNNABLE → WAITING으로 변경된다.
wait()을 호출하면 스레드 대기 집합에서 관리된다.
스레드 대기 집합에서 관리되는 스레드는 이후에 다른 스레드가 notify()를 통해 스레드 대기 집합에 신호를 주면 깨어날 수 있다.
이제 소비자 스레드들이 움직일 차례가 됐다.
c1이 데이터를 획득했다. 그래서 큐에 데이터를 보관할 빈자리 생겼다.
c1은 notify()를 호출한다 (코드 흐름이 그렇다)
스레드 대기 집합에 있는 p3를 깨운다.
스레드 대기 집합은 notify() 신호를 받으면 대기 집합에 있는 스레드 중 하나를 깨운다.
그런데 대기 집합에 있는 스레드가 깨어난다고 바로 작동하는 것이 아니다. 왜냐하면 이 스레드는 여전히 임계 영역 안에 있기 때문이다.
임계 영역에 있는 코드를 실행하려면 가장 먼저 락이 필요하다. p3는 대기 집합에서는 나가지만 여전히 임계 영역에 있으므로 락을 획득하기 위해 BLOCKED 상태로 대기한다. p3: WAITING → BLOCKED
참고로 이때 임계 영역의 코드를 처음으로 돌아가서 실행하는 게 아니다. 대기 집합에 들어오게 된 wait()을 호출한 부분부터 다시 실행된다. 락을 획득하면 wait() 이후의 코드를 실행한다.
c1은 데이터 소비를 완료하고 락을 반납하고 임계 영역을 빠져나간다.
p3가 락을 획득한다.
BLOCKED → RUNNABLE
wait() 코드에서 대기했기 때문에 이후의 코드를 실행한다.
data3을 큐에 저장한다.
notify()를 호출한다. 데이터를 저장했기 때문에 혹시 스레드 대기 집합에 소비자가 대기하고 있다면 소비자를 하나 깨워줘야 한다. 물론 지금은 대기 집합에 스레드가 없기 때문에 아무일도 일어나지 않는다.
p3는 락을 반납하고 임계 영역을 빠져나간다.
이제 c2, c3가 하나씩 데이터를 원래대로 가져갈 것이다.
c2, c3가 실행됐고 데이터가 있으므로 둘 다 데이터를 소비하고 완료한다.
둘 다 notify()를 호출했지만 대기 집합에 스레드가 없으므로 아무일도 일어나지 않는다.
wait(), notify() 덕분에 스레드가 락을 놓고 대기하고, 또 대기하는 스레드를 필요한 시점에 깨울 수 있었다. 생산자 스레드가 큐가 가득차서 대기해도 소비자 스레드가 큐의 데이터를 소비하고 나면 알려주기 때문에 최적의 타이밍에 깨어나서 데이터를 생산할 수 있었다.
덕분에 최종 결과를 보면 p1, p2, p3 모두 데이터를 정상 생산하고 c1, c2, c3 모두 데이터를 정상 소비할 수 있었다.
wait(), notify()소비자 우선 분석
이제 소비자 우선 코드로 시작해보자. 최초의 상태이다.
c1이 락을 얻고 임계 영역에 들어왔지만, 데이터가 없다. wait()을 호출하고 대기 집합에 대기하게 된다.
큐에 데이터가 없기 때문에 c1, c2, c3 모두 스레드 대기 집합에서 대기하게 된다.
이후에 생산자가 큐에 데이터를 생산하면 notify()를 통해 이 스레드들을 하나씩 깨워서 데이터를 소비할 수 있을것이다.
p1은 락을 획득하고 큐에 데이터를 생산한다. 큐에 데이터가 있기 때문에 소비자를 하나 깨울 수 있다. notify()를 통해 스레드 대기 집합에 이 사실을 알려준다.
notify()를 받은 스레드 대기 집합은 스레드 중에 하나를 깨운다.
여기서 c1, c2, c3 중에 어떤 스레드가 깨어날지는 알 수 없다.
어떤 스레드가 깨워질지는 JVM 스펙에 명시되어 있지 않다. 따라서 JVM 버전 및 환경에 따라 달라진다.
그런데 대기 집합에 있는 스레드가 깨어난다고 바로 작동하는 것은 아니다. 깨어난 스레드는 여전히 임계 영역 안에 있다.
임계 영역 안에 있는 코드를 실행하려면 먼저 락이 필요하다. 대기 집합에서는 나가지만 여전히 임계 영역 안에 있으므로 락을 획득하기 위해 BLOCKED 상태로 대기한다. WAITING → BLOCKED
p1이 락을 반납하고 임계 영역에서 나간다.
c1은 락을 획득한다.
c1은 락을 획득하고 임계 영역 안에서 실행되며 데이터를 획득한다.
c1이 데이터를 획득했으므로 큐에 데이터를 넣을 공간이 있다는 것을 대기 집합에 알려준다. 만약 대기 집합에 생산자 스레드가 대기하고 있다면 큐에 데이터를 넣을 수 있을 것이다.
c1이 notify()로 스레드 대기 집합에 알렸지만, 생산자 스레드가 아니라 소비자 스레드만 있다. 따라서 의도와는 다르게 소비자 스레드인 c2가 대기상태에서 깨어난다. (물론 대기 집합에 있는 어떤 스레드가 깨어날지는 알 수 없다. 여기서는 c2가 깨어난다고 가정한다. 심지어 생산자와 소비자 스레드가 함께 대기 집합에 있어도 어떤 스레드가 깨어날지는 알 수 없다.)
c1은 작업을 완료한다.
c1이 c2를 깨웠지만, 문제가 있다. 바로 큐에 데이터가 없다는 것이다.
c2는 락을 획득하고 큐에 데이터를 소비하려고 시도한다. 그런데 큐에 데이터가 없으므로 c2는 결국 wait()을 호출해서 다시 대기 상태로 변하며 대기 집합에 들어간다.
이처럼 소비자인 c1이 같은 소비자인 c2를 깨우는것은 상당히 비효율적이다.
c1 입장에서 c2를 깨우게 되면 아무 일도 하지 않고 그냥 다시 스레드 대기 집합에 들어갈 수 있다. 결과적으로 CPU만 사용하고, 아무 일도 하지 않은 상태로 다시 대기 상태가 되어버린다.
그렇다고 c1이 스레드 대기 집합에 있는 어떤 스레드를 깨울지 선택할 수는 없다. notify()는 스레드 대기 집합에 있는 스레드 중 임의의 하나를 깨울뿐이다.
물론 이게 비효율적이라는 것이지 문제가 되는 것은 아니다. 결과에는 아무런 문제가 없다. 살짝 돌아갈 뿐이다.
p2가 락을 획득하고, 데이터를 저장한 다음에 notify()를 호출한다. 데이터가 있으므로 소비자 스레드가 깨어난다면 데이터를 소비할 수 있다.
스레드 대기 집합에 있는 c3가 깨어난다. 참고로 어떤 스레드가 깨어날지는 알 수 없다.
c3는 임계 영역 안에 있으므로 락을 획득하기 위해 대기(BLOCKED) 한다.
p2가 작업을 끝마치고 락을 반납하고 나간다.
c3는 락을 획득하고 BLOCKED → RUNNABLE 상태가 된다.
c3는 데이터를 획득한 다음에 notify()를 통해 스레드 대기 집합에 알린다. 큐에 여유 공간이 생겼기 때문에 생산자 스레드가 대기 중이라면 데이터를 생산할 수 있다.
notify()를 호출했지만, 스레드 대기 집합에는 소비자인 c2만 존재한다.
c2가 깨어나지만 임계 영역 안에 있으므로 락을 기다리는 BLOCKED 상태가 된다.
c3는 락을 반납하고 임계 영역을 나간다.
c2가 락을 획득하고, 큐에서 데이터를 획득하려 하지만 데이터가 없다.
c2는 다시 wait()을 호출해서 대기(WAITING)상태에 들어가고, 다시 대기 집합에서 관리된다.
물론 c2의 지금 이 사이클은 CPU 자원만 소모하고 다시 대기 집합에 들어갔기 때문에 비효율적이다.
만약 소비자인 c3 입장에서 생산자, 소비자 스레드를 선택해서 깨울 수 있다면, 소비자인 c2를 깨우지는 않았을 것이다. 하지만 notify()는 이런 선택을 할 수 없다.
p3가 락을 얻고 데이터를 저장한다. notify()를 통해 스레드 대기 집합에 알린다.
스레드 대기 집합에는 소비자 c2가 있으므로 생산한 데이터를 잘 소비할 수 있다.
c2가 notify()를 통해 깨어나고 BLOCKED 상태로 대기하고 있다가 락을 획득하면 큐에 데이터를 잘 소비해서 임계영역을 빠져나오고 종료된다.
정리를 하자면
최종 결과를 보면 p1, p2, p3 모두 데이터를 잘 생산하고 c1, c2, c3 모두 데이터를 잘 소비했다. 하지만 소비자인 c1이 같은 소비자인 c2, c3를 깨울 수 있었다. 이 경우 큐에 데이터가 없을 가능성이 있다. 이땐 깨어난 소비자 스레드가 CPU 자원만 소모하고 다시 대기 집합에 들어갔기 때문에 비효율적이다.
만약, 소비자인 c1 입장에서 생산자, 소비자 스레드를 선택해서 깨울 수 있다면, 소비자인 c2를 깨우지는 않았을 것이다. 예를 들어 소비자는 생산자만 깨우고, 생산자는 소비자만 깨울 수 있다면 더 효율적으로 작동할 수 있을 것 같다. 하지만 notify()는 이런 선택을 할 수 없다.
비효율적이지만 문제는 없다. 좀 돌아갈 뿐이다.
Object - wait(), notify() 한계
지금까지 살펴본 Object.wait(), Object.notify() 방식은 스레드 대기 집합 하나에 생산자, 소비자 스레드를 모두 관리한다. 그리고 notify()를 호출할 때 임의의 스레드가 선택된다. 따라서 앞서 살펴본 것 처럼 큐에 데이터가 없는 상황에 소비자가 같은 소비자를 깨우는 비효율이 발생할 수 있다. 또는 큐에 데이터가 가득 차있는데 생산자가 같은 생산자를 깨우는 비효율도 발생할 수 있다.
다음과 같은 상황을 연출해보자.
큐에 dataX가 보관되어 있다.
스레드 대기 집합에는 다음 스레드가 대기하고 있다.
소비자: c1, c2, c3
생산자: p1, p2, p3
p0 스레드가 data0 생산을 시도한다.
p0 스레드가 실행되면서 data0을 큐에 저장한다. 이때 큐에 데이터가 가득찬다.
notify()를 통해 대기 집합의 스레드를 하나 깨운다.
만약, notify()의 결과로 소비자 스레드가 깨어나게 되면 소비자 스레드는 큐의 데이터를 획득하고 완료된다.
그러나 notify()의 결과로 생산자 스레드를 깨우게 되면, 이미 큐에 데이터는 가득 차 있다. 따라서 데이터를 생산하지 못하고 다시 대기 집합으로 이동하는 비효율이 발생한다.
이번엔 반대의 경우로 소비자에 대해서도 이야기 해보자. 아래와 같은 상황이 있다.
c0 스레드가 실행되고 data0을 획득한다.
이제 큐에 데이터는 비어있게 된다.
c0 스레드는 notify()를 호출한다.
스레드 대기 집합에서 소비자 스레드가 깨어나면 큐에 데이터가 없기 때문에 다시 대기 집합으로 이동하는 비효율이 발생한다.
결국, 같은 종류의 스레드를 깨울 때 비효율이 발생한다.
이 내용을 통해서 알 수 있는 사실은 생산자가 생산자를 깨우거나, 소비자가 소비자를 깨울 때 비효율이 발생한다. 생산자가 소비자를 깨우고 반대로 소비자가 생산자를 깨운다면 이런 비효율은 발생하지 않을 것이다.
또 하나의 문제가 있다. 바로 스레드 기아(thread starvation) 문제점이다.
notify()의 또 다른 문제점으로는 어떤 스레드가 깨어날 지 알 수 없기 때문에 발생할 수 있는 스레드 기아 문제가 있다.
notify()가 어떤 스레드를 깨울지는 알 수 없다. 최악의 경우 c1 - c5 스레드가 반복해서 깨어날 수 있다.
c1 - c5 스레드가 깨어나도 큐에 소비할 데이터가 없다. 따라서 다시 스레드 대기 집합에 들어간다.
notify()로 다시 깨우는데 어떤 스레드를 깨울지 알 수 없다. 따라서 c1 - c5 스레드가 반복해서 깨어날 수 있다.
이렇게 대기 상태의 스레드가 실행 순서를 계속 얻지 못해서 실행되지 않는 상황을 스레드 기아 상태라 한다.
여기서 깨어나야 할 이상적인 스레드는 바로 생산자 스레드인 p1이다.
이 스레드 기아를 해결하는 방법 중 하나인 notifyAll()이 있다.
notifyAll()
이 메서드는 스레드 대기 집합에 있는 모든 스레드를 한번에 다 깨울 수 있다.
데이터를 획득한 c0 스레드가 notifyAll()을 호출한다.
대기 집합에 있는 모든 스레드가 깨어난다.
모든 스레드는 다 임계 영역 안에 있다. 따라서 먼저 락을 획득해야 한다.
락을 획득하지 못하면 BLOCKED 상태에서 머무르게 된다.
만약, c1이 먼저 락을 획득한다면 큐에 데이터가 없으므로 다시 스레드 대기 집합에 들어간다.
c2 - c5도 마찬가지다.
따라서, p1이 가장 늦게 락 획득을 시도해도 c1 - c5 모두 스레드 대기 집합에 들어가있으므로 결과적으로 p1만 남게되고 결국 락을 획득하게 된다.
그러나, 이 경우에 스레드 기아 문제를 해결한다 하더라도 비효율은 해결하지 못한다. 결국 가장 좋은 방법은 소비자는 생산자를, 생산자는 소비자를 깨우는 방법이다.
정리
생산자 - 소비자 문제란?
생산자가 너무 빠를 경우 버퍼에 데이터가 꽉 차서 더 이상 데이터를 생산해낼 수 없다.
소비자가 너무 빠를 경우 버퍼에 남은 데이터가 없어 데이터를 소비할 수 없다.
이 문제를 해결하는 방법은 생산자의 경우 버퍼가 빈 공간이 생길때까지 기다리는 것이고, 소비자의 경우 버퍼에 데이터가 생길때까지 기다리는 것이다. 결국 기다리는 것이다.
그러나, 단순히 기다릴 순 없다. 왜냐하면 임계 영역은 딱 하나의 스레드만 작업할 수 있게 설계되었다. 아무리 특정 스레드가 하루종일 기다린다해도 본인이 락을 들고 있는 상태에서 놔주지 않으면 다른 스레드는 진입할 수 없다.
그래서 단순히 기다린 게 아니라 락을 반납하고 기다린다.
이때 사용할 수 있는 것이 Object.wait()이다. 이 wait(), notify(), notifyAll()은 synchronized와 같이 사용할 수 있다.
wait()으로 락을 반납하고 해당 스레드는 스레드 대기 집합에 들어간다. 그리고 락을 반납했으니 다른 스레드가 진입할 수 있게 된다. 다른 스레드가 작업을 다 마치고 notify()로 스레드 대기 집합에 알린다. 그럼 스레드 대기 집합에 있는 임의의 스레드 하나가 튀어나온다.
그땐 스레드는 BLOCKED 상태이다. 아직 락을 얻지 못한 상태이니까. 그리고 최종적으로 락을 반납하고 스레드가 나가면 튀어나온 스레드가 이 락을 얻어 작업을 진행할 수 있게 됐다.
근본적인 생산자 소비자 문제를 해결했지만, 여기서 파생되는 비효율이 발생했다.
생산자가 생산자를 계속 깨우거나, 소비자가 소비자를 계속 깨우면 의미없이 CPU 자원만 소모하고 아무것도 할 수 없다.
즉, 가장 좋은 방법은 깨울때 생산자는 소비자를, 소비자는 생산자를 깨우는 게 가장 좋은 방법이다. 이 방법을 다음 포스팅에서 알아보자!
이번에는 이벤트 리스너를 등록해보자. 이벤트 리스너란, 이벤트가 발생했을 때 원하는 후처리 작업을 할 수 있는 방법이다.
JavaScript의 이벤트 리스너랑 완전 똑같은 것이라고 보면 된다.
참고로, 이 포스팅은 공식 문서에서 제공하는 방식과 살짝 다르다. 스프링에서 InitializingBean, DisposableBean 인터페이스를 구현하여 빈으로 등록해서, 스프링 컨텍스트(컨테이너)가 최초로 띄워질때와 마지막에 종료될 때 호출될 메서드와 사용할 이벤트 리스너를 등록해 보았다. 왜 그러냐면, 이 플러그인 관련 포스팅을 Part.1에서 쭉 보다보면 스프링의 기술이 들어가 있는것을 알 수가 있는데 스프링의 기술을 사용중이니까 스프링과 잘 호환되는 기술을 사용해보고자 이런 방식을 구현했다
그리고 스프링 기술을 이용했기 때문에 Add-on Descriptor(atlassian-plugin.xml)에 어떠한 작업도 필요 없고 그래서 더 간결하다는 것을 캐치해서 유심히 봐보자!
IssueCreatedResolvedListener
package kr.osci.kapproval.com.jira.eventlistener;
import com.atlassian.event.api.EventListener;
import com.atlassian.event.api.EventPublisher;
import com.atlassian.jira.event.issue.IssueEvent;
import com.atlassian.jira.event.type.EventType;
import com.atlassian.jira.issue.Issue;
import com.atlassian.plugin.spring.scanner.annotation.imports.JiraImport;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RequiredArgsConstructor
public class IssueCreatedResolvedListener implements InitializingBean, DisposableBean {
@JiraImport
private final EventPublisher eventPublisher;
/**
* Called when the plugin has been enabled.
*/
@Override
public void afterPropertiesSet() {
log.info("Enabling plugin");
eventPublisher.register(this);
}
/**
* Called when the plugin is being disabled or removed.
*/
@Override
public void destroy() {
log.info("Disabling plugin");
eventPublisher.unregister(this);
}
@EventListener
public void onIssueEvent(IssueEvent issueEvent) {
Long eventTypeId = issueEvent.getEventTypeId();
Issue issue = issueEvent.getIssue();
if (eventTypeId.equals(EventType.ISSUE_CREATED_ID)) {
log.info("Issue {} has been created at {}.", issue.getKey(), issue.getCreated());
// 이슈 Created 이벤트가 발생했을 때 실행되는 부분
} else if (eventTypeId.equals(EventType.ISSUE_RESOLVED_ID)) {
log.info("Issue {} has been resolved at {}.", issue.getKey(), issue.getResolutionDate());
// 이슈 Resolved 이벤트가 발생했을 때 실행되는 부분
} else if (eventTypeId.equals(EventType.ISSUE_CLOSED_ID)) {
log.info("Issue {} has been closed at {}.", issue.getKey(), issue.getUpdated());
// 이슈 Closed 이벤트가 발생했을 때 실행되는 부분
}
}
}
우선, InitializingBean을 구현하려면 재정의 할 메서드가 있다.
afterPropertiesSet()
이 메서드는 스프링 컨텍스트가 완전히 띄워졌을 때, 호출되는 메서드이다. 그러니까 스프링이 진짜 이제 실행될 준비가 됐을 때 자동으로 호출되는 메서드이다. 여기서 무엇을 해야 하냐면 내가 이벤트 퍼블리셔를 등록하겠다고 선언해줘야 한다. 그래야 어떤 이벤트가 발생했을 때 이벤트를 캐치할 수 있게 된다.
그래서 이 메서드안에 다음 코드 한 줄이 있다.
eventPublisher.register(this);
그 다음, DisposableBean을 구현하려면 또 한가지 재정의 할 메서드가 있다.
destroy()
이 메서드는 스프링 컨텍스트가 내려가기 바로 전에 호출되는 메서드이다. 그러니까, 스프링이 내려가기 전 마지막으로 정리할 자원들을 정리하는 메서드라고 생각하면 된다. 그래서 등록한 이벤트 퍼블리셔를 다시 등록 해제하면 된다.
eventPublisher.unregister(this);
그리고, 실제 이벤트가 발생했을 때마다 호출될 메서드가 있다. 바로 다음 메서드.
@EventListener
public void onIssueEvent(IssueEvent issueEvent) {
Long eventTypeId = issueEvent.getEventTypeId();
Issue issue = issueEvent.getIssue();
if (eventTypeId.equals(EventType.ISSUE_CREATED_ID)) {
log.info("Issue {} has been created at {}.", issue.getKey(), issue.getCreated());
// 이슈 Created 이벤트가 발생했을 때 실행되는 부분
} else if (eventTypeId.equals(EventType.ISSUE_RESOLVED_ID)) {
log.info("Issue {} has been resolved at {}.", issue.getKey(), issue.getResolutionDate());
// 이슈 Resolved 이벤트가 발생했을 때 실행되는 부분
} else if (eventTypeId.equals(EventType.ISSUE_CLOSED_ID)) {
log.info("Issue {} has been closed at {}.", issue.getKey(), issue.getUpdated());
// 이슈 Closed 이벤트가 발생했을 때 실행되는 부분
}
}
주의 깊게 볼 건 @EventListener 애노테이션이다. 이 애노테이션은 어떠한 public 메서드라도 상관없이 달 수 있는데 이 애노테이션이 달린 메서드의 파라미터 이벤트가 발생할 때마다 이 메서드가 호출된다. 여기서는, IssueEvent라는 이슈 관련 이벤트를 파라미터로 받는다. 생성, 수정, 삭제 등등의 이벤트가 다 잡히게 될 것이다.
그래서 실제로 원하는 이벤트의 후처리 코드는 이 @EventListener 애노테이션이 달린 메서드에서 작업하면 된다.
이렇게 스프링과 JIRA가 제공하는 @EventListener 애노테이션을 사용해서 스프링의 라이프 사이클을 이용해 스프링 컨테이너가 완전히 올라왔을 때(플러그인이 띄워질 때)와 스프링 컨테이너가 완전히 내려가기 바로 직전에(플러그인이 내려가기 직전에) 딱 한 번씩만 이벤트 퍼블리셔를 등록할 수 있고, 이벤트 리스너 메서드를 만들 수 있다.
package kr.osci.aijql.eventlistener;
import com.atlassian.event.api.EventListener;
import com.atlassian.event.api.EventPublisher;
import com.atlassian.jira.event.issue.link.RemoteIssueLinkCreateEvent;
import com.atlassian.jira.event.issue.link.RemoteIssueLinkUICreateEvent;
import com.atlassian.jira.issue.link.RemoteIssueLinkManager;
import com.atlassian.plugin.spring.scanner.annotation.imports.JiraImport;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RequiredArgsConstructor
public class RemoteIssueLinkListener implements InitializingBean, DisposableBean {
@JiraImport
private final EventPublisher eventPublisher;
@JiraImport
private final RemoteIssueLinkManager remoteIssueLinkManager;
/**
* Called when the plugin has been enabled.
*/
@Override
public void afterPropertiesSet() {
log.debug("[afterPropertiesSet]: RemoteIssueLinkListener initialized.");
eventPublisher.register(this);
}
/**
* REST API 또는 애플리케이션에서 직접 Remote Issue Link 추가하는 경우 호출
* @param remoteIssueLinkCreateEvent remoteIssueLinkCreateEvent
*/
@EventListener
public void onCreateRemoteIssueLinkEvent(RemoteIssueLinkCreateEvent remoteIssueLinkCreateEvent) {
log.info("[onCreateRemoteIssueLinkEvent] called");
log.info("[onCreateRemoteIssueLinkEvent] remote issue link id = {}", remoteIssueLinkCreateEvent.getRemoteIssueLinkId());
log.info("[onCreateRemoteIssueLinkEvent] global id = {}", remoteIssueLinkCreateEvent.getGlobalId());
}
/**
* 오직 애플리케이션에서 사용자가 Remote Issue Link 추가하는 경우 호출
* @param remoteIssueLinkUiCreateEvent remoteIssueLinkUiCreateEvent
*/
@EventListener
public void onCreateUiRemoteIssueLinkEvent(RemoteIssueLinkUICreateEvent remoteIssueLinkUiCreateEvent) {
log.info("[onCreateUiRemoteIssueLinkEvent] called");
log.info("[onCreateUiRemoteIssueLinkEvent] remote issue link id = {}", remoteIssueLinkUiCreateEvent.getRemoteIssueLinkId());
log.info("[onCreateUiRemoteIssueLinkEvent] global id = {}", remoteIssueLinkUiCreateEvent.getGlobalId());
}
/**
* Called when the plugin is being disabled or removed.
*/
@Override
public void destroy() {
log.info("[destroy]: RemoteIssueLinkListener destroyed.");
eventPublisher.unregister(this);
}
}