728x90
반응형
SMALL

참고자료:

 

김영한의 실전 자바 - 고급 1편, 멀티스레드와 동시성 강의 | 김영한 - 인프런

김영한 | 멀티스레드와 동시성을 기초부터 실무 레벨까지 깊이있게 학습합니다., 국내 개발 분야 누적 수강생 1위, 제대로 만든 김영한의 실전 자바[사진][임베딩 영상]단순히 자바 문법을 안다?

www.inflearn.com

 

이제 실무적 관점에서 좀 더 깊이있게 알아보자.

 

ExecutorService 우아한 종료 

고객의 주문을 처리하는 서버를 운영중이라고 생각해보자.

만약 서버 기능을 업데이트 해야해서 서버를 재시작해야 한다고 가정해보자.

이때 서버 애플리케이션이 고객의 주문을 처리하고 있는 도중에 갑자기 재시작 된다면, 해당 고객의 주문이 제대로 진행되지 못할 것이다. 가장 이상적인 방향은 새로운 주문 요청은 막고, 이미 진행중인 주문은 모두 완료한 다음에 서버를 재시작 하는 것이 가장 좋을 것이다. 이처럼 서비스를 안정적으로 종료하는 것도 매우 중요하다. 이렇게 문제 없이 우아하게 종료하는 방식을 graceful shutdown이라 한다.

 

이런 관점에서 ExecutorService의 종료에 대해서 알아보자.

 

ExecutorService의 종료 메서드 관련

void shutdown()
  • 새로운 작업을 받지 않고, 이미 제출된 작업을 모두 완료한 후에 종료한다.
  • 논 블로킹 메서드(이 메서드를 호출한 스레드는 대기하지 않고 즉시 다음 코드를 호출한다)

 

List<Runnable> shutdownNow()
  • 실행 중인 작업을 중단하고, 대기 중인 작업(블로킹 큐에 있는 대기 작업들을 의미)을 반환하며 즉시 종료한다.
  • 실행 중인 작업을 중단하기 위해 인터럽트를 발생시킨다.
  • 논 블로킹 메서드(이 메서드를 호출한 스레드는 대기하지 않고 즉시 다음 코드를 호출한다)

 

boolean isShutdown()
  • ExecutorService가 종료되었는지 확인한다.

 

boolean isTerminated()
  • shutdown(), shutdownNow() 호출 후, 모든 작업이 완료되었는지 확인한다.

 

boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptException
  • 서비스 종료 시 모든 작업이 완료될 때까지 대기한다. 이때 지정된 시간까지만 대기한다.
  • 블로킹 메서드(이 메서드를 호출한 스레드는 이 메서드가 종료될 때까지 대기 상태가 된다.)

 

void close()
  • 자바 19부터 지원하는 서비스 종료 메서드이다. 이 메서드는 shutdown()과 같다고 생각하면 된다. 더 정확히는 shutdown()을 호출하고, 하루를 기다려도 작업이 완료되지 않으면 shutdownNow()를 호출한다.
  • 호출한 스레드에 인터럽트가 발생해도 shutdownNow()를 호출한다. 

 

  • ExecutorService에 아무런 작업이 없고, 스레드만 2개 대기하고 있다.

  • shutdown()을 호출한다.
  • ExecutorService는 새로운 요청을 거절한다.
    • 거절 시 기본적으로 java.util.concurrent.RejectedExecutionException 예외가 발생한다.
  • 스레드 풀의 자원을 정리한다.

 

  • shutdown()을 호출한다.
  • ExecutorService는 새로운 요청을 거절한다.
  • 스레드 풀의 스레드는 처리중인 작업을 완료한다.
  • 스레드 풀의 스레드는 큐에 남아있는 작업도 모두 꺼내서 완료한다.

  • 모든 작업을 완료하면 자원을 정리한다.
  • 결과적으로 처리중이던 taskA, taskB는 물론이고, 큐에 대기중이던 taskC, taskD도 완료된다.

 

  • shutdownNow()를 호출한다.
  • ExecutorService는 새로운 요청을 거절한다.
  • 큐를 비우면서, 큐에 있는 작업을 모두 꺼내서 컬렉션으로 반환한다.
    • List<Runnable> runnables = es.shutdownNow()
  • 작업 중인 스레드에 인터럽트가 발생한다.
    • 작업 중인 taskA, taskB는 인터럽트가 걸린다.
    • 큐에 대기중인 taskC, taskD는 수행되지 않는다.
  • 작업을 완료하면 자원을 정리한다.

 

ExecutorService 우아한 종료 - 구현

shutdown()을 호출해서 이미 들어온 모든 작업을 다 처리하고 서비스를 우아하게 종료(graceful shutdown)하는 것이 가장 이상적이지만, 갑자기 요청이 너무 많이 들어와서 큐에 대기중인 작업이 너무 많아 작업 완료가 어렵거나, 작업이 너무 오래 걸리거나, 또는 버그가 발생해서 특정 작업이 끝나지 않을 수 있다. 이렇게 되면 서비스가 너무 늦게 종료되거나 종료되지 않는 문제가 발생할 수 있다.

 

이럴 때는 보통 우아하게 종료하는 시간을 정한다. 예를 들어서 60초까지는 작업을 다 처리할 수 있게 기다리는 것이다. 그리고 60초가 지나면, 무언가 문제가 있다고 가정하고 shutdownNow()을 호출해서 작업들을 강제로 종료한다.

 

close()

close()의 경우 이렇게 구현되어 있다. shutdown()을 호출하고, 하루를 기다려도 작업이 완료되지 않으면 shutdownNow()를 호출한다. 그런데 대부분 하루를 기다릴 수는 없을 것이다.

 

방금 설명한대로 우선은 shutdown()을 통해 우아한 종료를 시도하고, 10초간 종료되지 않으면 shutdownNow()를 통해 강제 종료하는 방식을 구현해보자. (예제에서 60초는 너무 길다..) 참고로 구현할 shutdownAndAwaitTermination()ExecutorService 공식 API 문서에서 제안하는 방식이다. 

 

ExecutorShutdownMain

package thread.executor;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static util.MyLogger.log;

public class ExecutorShutdownMain {

    public static void main(String[] args) {
        ExecutorService es = Executors.newFixedThreadPool(2);

        es.execute(new RunnableTask("taskA"));
        es.execute(new RunnableTask("taskB"));
        es.execute(new RunnableTask("taskC"));
        es.execute(new RunnableTask("longTask", 100_000));

        ExecutorUtils.printState(es);

        log("== shutdown 시작==");

        shutdownAndAwaitTermination(es);

        log("== shutdown 완료==");

        ExecutorUtils.printState(es);

    }

    private static void shutdownAndAwaitTermination(ExecutorService es) {
        es.shutdown();

        try {
            if (!es.awaitTermination(10, TimeUnit.SECONDS)) {
                log("서비스 정상 종료 실패 -> 강제 종료 시도");
                es.shutdownNow();

                if (!es.awaitTermination(10, TimeUnit.SECONDS)) {
                    log("서비스가 종료되지 않았습니다.");
                }
            }
        } catch (InterruptedException e) {
            es.shutdownNow();
        }
    }
}

실행 결과

2024-07-30 13:01:53.014 [pool-1-thread-2] taskB 시작
2024-07-30 13:01:53.014 [     main] [pool= 2, active=2, queuedTasks=2, completedTask=0]
2024-07-30 13:01:53.014 [pool-1-thread-1] taskA 시작
2024-07-30 13:01:53.018 [     main] == shutdown 시작==
2024-07-30 13:01:54.020 [pool-1-thread-2] taskB 완료
2024-07-30 13:01:54.020 [pool-1-thread-1] taskA 완료
2024-07-30 13:01:54.022 [pool-1-thread-2] taskC 시작
2024-07-30 13:01:54.022 [pool-1-thread-1] longTask 시작
2024-07-30 13:01:55.023 [pool-1-thread-2] taskC 완료
2024-07-30 13:02:03.024 [     main] 서비스 정상 종료 실패 -> 강제 종료 시도
2024-07-30 13:02:03.026 [pool-1-thread-1] 인터럽트 발생, null
2024-07-30 13:02:03.029 [     main] == shutdown 완료==
2024-07-30 13:02:03.030 [     main] [pool= 0, active=0, queuedTasks=0, completedTask=4]
Exception in thread "pool-1-thread-1" java.lang.RuntimeException: java.lang.InterruptedException
	at util.ThreadUtils.sleep(ThreadUtils.java:12)
	at thread.executor.RunnableTask.run(RunnableTask.java:23)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1595)
Caused by: java.lang.InterruptedException
	at java.base/java.lang.Thread.sleepImpl(Native Method)
	at java.base/java.lang.Thread.sleep(Thread.java:516)
	at util.ThreadUtils.sleep(ThreadUtils.java:9)
	... 4 more

 

작업 처리에 필요한 시간

  • taskA, taskB, taskC: 1초
  • longTask: 100초

서비스 종료

es.shutdown();
  • 새로운 작업을 받지 않는다. 처리 중이거나, 큐에 이미 대기중인 작업은 처리한다. 이후에 풀의 스레드를 종료한다.
  • shutdown()은 블로킹 메서드가 아니다. ExecutorService가 종료될 때까지 main 스레드가 대기하지 않는다. main 스레드는 바로 다음 코드를 호출한다.
if (!es.awaitTermination(10, TimeUnit.SECONDS) { ... }
  • 블로킹 메서드이다.
  • main 스레드는 대기하며 서비스 종료를 10초간 기다린다.
    • 만약 10초안에 모든 작업이 끝나면 true를 반환한다.
  • 여기서 taskA, taskB, taskC의 수행이 완료된다. 그런데 longTask는 10초가 지나도 완료되지 않았다. 따라서 false를 반환한다.

 

서비스 정상 종료 실패 → 강제 종료 시도

if (!es.awaitTermination(10, TimeUnit.SECONDS)) {
    log("서비스 정상 종료 실패 -> 강제 종료 시도");
    es.shutdownNow();

    if (!es.awaitTermination(10, TimeUnit.SECONDS)) {
        log("서비스가 종료되지 않았습니다.");
    }
}
  • 정상 종료가 10초 이상 너무 오래 걸렸다.
  • shutdownNow()를 통해 강제 종료에 들어간다. shutdown()과 마찬가지로 블로킹 메서드가 아니다.
  • 강제 종료를 하면 작업 중인 스레드에 인터럽트가 발생한다. 다음 로그를 통해 인터럽트를 확인할 수 있다.
  • 인터럽트가 발생하면서 스레드도 작업을 종료하고, shutdownNow()를 통한 강제 shutdown도 완료된다.
2024-07-30 13:02:03.024 [     main] 서비스 정상 종료 실패 -> 강제 종료 시도
2024-07-30 13:02:03.026 [pool-1-thread-1] 인터럽트 발생, null
2024-07-30 13:02:03.029 [     main] == shutdown 완료==

 

서비스 종료 실패

그런데 마지막에 강제 종료인 es.shutdownNow()를 호출한 다음에 왜 10초간 또 기다릴까?

shutdownNow()가 작업 중인 스레드에 인터럽트를 호출하는 것은 맞다. 인터럽트를 호출하더라도 여러가지 이유로 작업에 시간이 걸릴 수 있다. 인터럽트 이후에 자원을 정리하는 어떤 간단한 작업을 수행할 수도 있다. 이런 시간을 기다려주는 것이다. 극단적으로 최악의 경우 스레드가 다음과 같이 인터럽트를 받을 수 없는 코드를 수행중일 수 있다. 이 경우 인터럽트 예외가 발생하지 않고, 스레드도 계속 수행된다.

 

인터럽트를 받을 수 없는 코드

while(true) {}

이런 스레드는 자바를 강제 종료해야 제거할 수 있다.

 

이런 경우를 대비해서 강제 종료 후 10초간 대기해도 작업이 완료되지 않으면 "서비스가 종료되지 않았습니다"라고 개발자가 인지할 수 있는 로그를 남겨두어야 한다. 그래야 개발자가 나중에 문제를 찾아서 코드를 수정도 할 수 있고 이 종료되지 않는 프로그램을 자바를 강제 종료 시켜서라도 종료할 수 있다.

 

try {
    ...
} catch (InterruptedException e) {
    es.shutdownNow();
}

이 부분은 왜 있을까? InterruptedException을 왜 catch로 잡았을까? 잡은 후 왜 또 shutdownNow()를 호출할까?

그 이유는 아래 코드를 호출할 때, 다른 스레드에서 이 awaitTermination()을 호출한 스레드에 인터럽트를 걸 수도 있기 때문이다.

그 경우에도 무사히 종료할 수 있도록 catch로 잡아서 shutdownNow()를 호출한다.

es.awaitTermination(10, TimeUnit.SECONDS)

 

정리

서비스를 종료할 때 생각보다 고려해야 할 점이 많다는 것을 이해했을 것이다. 기본적으로 우아한 종료를 선택하고, 우아한 종료가 되지 않으면 무한정 기다릴 수는 없으니, 그 다음으로 강제 종료를 하는 방식으로 접근하는 것이 좋다.

 

Executor 스레드 풀 관리 - 코드

이번 시간에는 Executor 프레임워크가 어떤식으로 스레드를 관리하는지 깊이있게 알아보자. 이 부분을 알아두면 실무에서 대량의 요청을 별도의 스레드에서 어떤식으로 처리해야 하는지에 대한 기본기를 쌓을 수 있을 것이다. 

 

ExecutorService의 기본 구현체인 ThreadPollExecutor의 생성자는 다음 속성을 지원한다.

  • corePoolSize: 스레드 풀에서 관리되는 기본 스레드의 수
  • maximumPoolSize: 스레드 풀에서 관리되는 최대 스레드 수
  • keepAliveTime, TimeUnit unit: 기본 스레드 수를 초과해서 만들어진 초과 스레드가 생존할 수 있는 대기 시간, 이 시간 동안 처리할 작업이 없다면 초과 스레드는 제거된다.
  • BlockingQueue workQueue: 작업을 보관할 블로킹 큐

corePoolSizemaximumPoolSize의 차이를 알아보기 위해 간단한 예제를 만들어보자.

먼저 예제를 좀 더 쉽게 확인하기 위해 ExecutorUtils에 메서드를 하나 추가하자.

 

ExecutorUtils

package thread.executor;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

import static util.MyLogger.log;

public abstract class ExecutorUtils {

    public static void printState(ExecutorService executorService) {

        if (executorService instanceof ThreadPoolExecutor poolExecutor) {
            int pool = poolExecutor.getPoolSize();
            int active = poolExecutor.getActiveCount();
            int queuedTasks = poolExecutor.getQueue().size();
            long completedTask = poolExecutor.getCompletedTaskCount();

            log("[pool= " + pool + ", active=" + active + ", queuedTasks=" + queuedTasks + ", completedTask=" + completedTask + "]");
        } else {
            log(executorService);
        }
    }

    public static void printState(ExecutorService executorService, String taskName) {

        if (executorService instanceof ThreadPoolExecutor poolExecutor) {
            int pool = poolExecutor.getPoolSize();
            int active = poolExecutor.getActiveCount();
            int queuedTasks = poolExecutor.getQueue().size();
            long completedTask = poolExecutor.getCompletedTaskCount();

            log(taskName + " -> [pool= " + pool + ", active=" + active + ", queuedTasks=" + queuedTasks + ", completedTask=" + completedTask + "]");
        } else {
            log(executorService);
        }
    }
}
  • printState() 메서드를 하나 오버로딩했다. 단순히 taskName을 출력하는 부분이 추가되었다.
  • 중복된 부분을 제거할 수 있지만, 기본 코드를 유지하기 위해 그대로 복사해서 약간만 수정했다.

추가로 이전에 만든 RunnableTask를 사용한다. 다음 코드는 앞서 만든 코드이니 참고만 하자. RuunableTask는 기본 1초 정도 작업을 수행한다고 가정한다.

RunnableTask

package thread.executor;

import static util.MyLogger.log;
import static util.ThreadUtils.sleep;

public class RunnableTask implements Runnable {

    private final String name;
    private int sleepMs = 1000;

    public RunnableTask(String name) {
        this.name = name;
    }

    public RunnableTask(String name, int sleepMs) {
        this.name = name;
        this.sleepMs = sleepMs;
    }

    @Override
    public void run() {
        log(name + " 시작");
        sleep(sleepMs);
        log(name + " 완료");
    }
}

 

 

이제 이것들을 활용해서 한번 maximumPoolSize의 비밀을 파헤쳐보자.

PoolSizeMainV1

package thread.executor.poolsize;

import thread.executor.ExecutorUtils;
import thread.executor.RunnableTask;

import java.util.concurrent.*;

import static util.MyLogger.log;
import static util.ThreadUtils.sleep;

public class PoolSizeMainV1 {

    public static void main(String[] args) {
        ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
        ExecutorService es = new ThreadPoolExecutor(
                2,
                4,
                3000,
                TimeUnit.MILLISECONDS,
                workQueue);

        ExecutorUtils.printState(es);

        es.execute(new RunnableTask("task1"));
        ExecutorUtils.printState(es, "task1");

        es.execute(new RunnableTask("task2"));
        ExecutorUtils.printState(es, "task2");

        es.execute(new RunnableTask("task3"));
        ExecutorUtils.printState(es, "task3");

        es.execute(new RunnableTask("task4"));
        ExecutorUtils.printState(es, "task4");

        es.execute(new RunnableTask("task5"));
        ExecutorUtils.printState(es, "task5");

        es.execute(new RunnableTask("task6"));
        ExecutorUtils.printState(es, "task6");

        try {
            es.execute(new RunnableTask("task7"));
        } catch (RejectedExecutionException e) {
            log("task7 실행 거절 예외 발생: " + e);
        }

        sleep(3000);
        log("== 작업 수행 완료 ==");
        ExecutorUtils.printState(es);

        sleep(3000);
        log("== maximumPoolSize 대기 시간 초과 ==");
        ExecutorUtils.printState(es);

        es.close();
        log(" == shutdown 완료 ==");
        ExecutorUtils.printState(es);
    }
}

 

ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
ExecutorService es = new ThreadPoolExecutor(
        2,
        4,
        3000,
        TimeUnit.MILLISECONDS,
        workQueue);
  • 작업을 보관할 블로킹 큐의 구현체로 ArrayBlockingQueue(2)를 사용했다. 사이즈를 2로 설정했으므로 최대 2개까지 작업을 큐에 보관할 수 있다.
  • corePoolSize = 2, maximumPoolSize = 4 를 사용해서 기본 스레드 2개, 최대 스레드는 4개로 설정했다.
    • 스레드 풀에 기본 2개의 스레드를 운영한다. 요청이 너무 많거나 급한 경우 스레드 풀은 최대 4개까지 스레드를 증가시켜서 사용할 수 있다. 이렇게 기본 스레드 수를 초과해서 만들어진 스레드를 초과 스레드라고 하겠다.
  • 3000, TimeUnit.MILLISECONDS
    • 초과 스레드가 생존할 수 있는 대기 시간을 뜻한다. 이 시간 동안 초과 스레드가 처리할 작업이 없다면 초과 스레드는 제거한다.
    • 여기서는 3000 밀리초(3초)를 설정했으므로, 초과 스레드가 3초간 작업을 하지 않고 대기한다면 초과 스레드는 스레드 풀에서 제거된다.

 

근데, 이 maximumPoolSize가 그냥 단순하게 기본 스레드가 다 없으면 바로 이 최대 스레드 수까지 스레드 수를 늘려서 작업을 처리하는 게 아니다.

 

우선 아래 코드를 실행해보자.

package thread.executor.poolsize;

import thread.executor.ExecutorUtils;
import thread.executor.RunnableTask;

import java.util.concurrent.*;

import static util.MyLogger.log;
import static util.ThreadUtils.sleep;

public class PoolSizeMainV1 {

    public static void main(String[] args) {
        ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
        ExecutorService es = new ThreadPoolExecutor(
                2,
                4,
                3000,
                TimeUnit.MILLISECONDS,
                workQueue);

        ExecutorUtils.printState(es);

        es.execute(new RunnableTask("task1"));
        ExecutorUtils.printState(es, "task1");

        es.execute(new RunnableTask("task2"));
        ExecutorUtils.printState(es, "task2");

        es.close();
        log(" == shutdown 완료 ==");
        ExecutorUtils.printState(es);
    }
}

실행 결과

2024-07-30 14:21:29.778 [     main] [pool= 0, active=0, queuedTasks=0, completedTask=0]
2024-07-30 14:21:29.783 [pool-1-thread-1] task1 시작
2024-07-30 14:21:29.793 [     main] task1 -> [pool= 1, active=1, queuedTasks=0, completedTask=0]
2024-07-30 14:21:29.793 [     main] task2 -> [pool= 2, active=2, queuedTasks=0, completedTask=0]
2024-07-30 14:21:29.793 [pool-1-thread-2] task2 시작
2024-07-30 14:21:30.785 [pool-1-thread-1] task1 완료
2024-07-30 14:21:30.795 [pool-1-thread-2] task2 완료
2024-07-30 14:21:30.799 [     main]  == shutdown 완료 ==
2024-07-30 14:21:30.799 [     main] [pool= 0, active=0, queuedTasks=0, completedTask=2]

 

task가 총 2개이므로 기본 스레드 수와 일치한다. 이 경우 아무런 문제 없이 기본 스레드 수만으로 작업을 처리하게 된다.

그럼 단순하게 생각했을 땐, 이 경우에 task가 하나 더 늘어나면? "최대 스레드 수가 4개니까 스레드 한 개가 더 추가되겠다!" 라고 생각할 수 있다. 그래서 task 하나를 더 추가해서 실행해보자.

package thread.executor.poolsize;

import thread.executor.ExecutorUtils;
import thread.executor.RunnableTask;

import java.util.concurrent.*;

import static util.MyLogger.log;
import static util.ThreadUtils.sleep;

public class PoolSizeMainV1 {

    public static void main(String[] args) {
        ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
        ExecutorService es = new ThreadPoolExecutor(
                2,
                4,
                3000,
                TimeUnit.MILLISECONDS,
                workQueue);

        ExecutorUtils.printState(es);

        es.execute(new RunnableTask("task1"));
        ExecutorUtils.printState(es, "task1");

        es.execute(new RunnableTask("task2"));
        ExecutorUtils.printState(es, "task2");
        
        es.execute(new RunnableTask("task3"));
        ExecutorUtils.printState(es, "task3");

        es.close();
        log(" == shutdown 완료 ==");
        ExecutorUtils.printState(es);
    }
}

실행 결과

2024-07-30 14:24:09.864 [     main] [pool= 0, active=0, queuedTasks=0, completedTask=0]
2024-07-30 14:24:09.869 [pool-1-thread-1] task1 시작
2024-07-30 14:24:09.882 [     main] task1 -> [pool= 1, active=1, queuedTasks=0, completedTask=0]
2024-07-30 14:24:09.882 [     main] task2 -> [pool= 2, active=2, queuedTasks=0, completedTask=0]
2024-07-30 14:24:09.882 [pool-1-thread-2] task2 시작
2024-07-30 14:24:09.882 [     main] task3 -> [pool= 2, active=2, queuedTasks=1, completedTask=0]
2024-07-30 14:24:10.872 [pool-1-thread-1] task1 완료
2024-07-30 14:24:10.873 [pool-1-thread-1] task3 시작
2024-07-30 14:24:10.884 [pool-1-thread-2] task2 완료
2024-07-30 14:24:11.875 [pool-1-thread-1] task3 완료
2024-07-30 14:24:11.876 [     main]  == shutdown 완료 ==
2024-07-30 14:24:11.877 [     main] [pool= 0, active=0, queuedTasks=0, completedTask=3]

결과를 보니, queuedTasks에 들어갔다. 생각해보니, 2개가 작업중이고 블로킹 큐 사이즈가 2라면 이 상태에서 더 들어온 작업은 큐에 대기 상태로 남는게 더 합리적이다. 그럼 큐 사이즈가 2이니까 최대 사이즈까지 RunnableTask를 추가해보자.

package thread.executor.poolsize;

import thread.executor.ExecutorUtils;
import thread.executor.RunnableTask;

import java.util.concurrent.*;

import static util.MyLogger.log;
import static util.ThreadUtils.sleep;

public class PoolSizeMainV1 {

    public static void main(String[] args) {
        ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
        ExecutorService es = new ThreadPoolExecutor(
                2,
                4,
                3000,
                TimeUnit.MILLISECONDS,
                workQueue);

        ExecutorUtils.printState(es);

        es.execute(new RunnableTask("task1"));
        ExecutorUtils.printState(es, "task1");

        es.execute(new RunnableTask("task2"));
        ExecutorUtils.printState(es, "task2");

        es.execute(new RunnableTask("task3"));
        ExecutorUtils.printState(es, "task3");

        es.execute(new RunnableTask("task4"));
        ExecutorUtils.printState(es, "task4");

        es.close();
        log(" == shutdown 완료 ==");
        ExecutorUtils.printState(es);
    }
}

실행 결과

2024-07-30 14:26:44.081 [     main] [pool= 0, active=0, queuedTasks=0, completedTask=0]
2024-07-30 14:26:44.084 [pool-1-thread-1] task1 시작
2024-07-30 14:26:44.095 [     main] task1 -> [pool= 1, active=1, queuedTasks=0, completedTask=0]
2024-07-30 14:26:44.095 [     main] task2 -> [pool= 2, active=2, queuedTasks=0, completedTask=0]
2024-07-30 14:26:44.095 [pool-1-thread-2] task2 시작
2024-07-30 14:26:44.096 [     main] task3 -> [pool= 2, active=2, queuedTasks=1, completedTask=0]
2024-07-30 14:26:44.096 [     main] task4 -> [pool= 2, active=2, queuedTasks=2, completedTask=0]
2024-07-30 14:26:45.086 [pool-1-thread-1] task1 완료
2024-07-30 14:26:45.087 [pool-1-thread-1] task3 시작
2024-07-30 14:26:45.097 [pool-1-thread-2] task2 완료
2024-07-30 14:26:45.097 [pool-1-thread-2] task4 시작
2024-07-30 14:26:46.088 [pool-1-thread-1] task3 완료
2024-07-30 14:26:46.099 [pool-1-thread-2] task4 완료
2024-07-30 14:26:46.103 [     main]  == shutdown 완료 ==
2024-07-30 14:26:46.103 [     main] [pool= 0, active=0, queuedTasks=0, completedTask=4]

 

 

결과를 보니, 이제 작업중인 스레드가 2개고 큐에 들어간 작업들도 꽉 찬 상태인 2개이다. 이렇듯 maximumPoolSize는 기본 스레드 수보다 작업이 더 많이 들어온다고 바로 스레드를 생성하는 게 아니다. 큐에 대기할 수 있는 공간이 있을때까지 더 들어온 작업들은 큐에 대기상태로 보관된다. 

 

그럼? 큐에 대기 상태로 보관될 공간까지도 부족하면? 이때 바로 최대 스레드 수까지 스레드가 생성된다.

package thread.executor.poolsize;

import thread.executor.ExecutorUtils;
import thread.executor.RunnableTask;

import java.util.concurrent.*;

import static util.MyLogger.log;
import static util.ThreadUtils.sleep;

public class PoolSizeMainV1 {

    public static void main(String[] args) {
        ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
        ExecutorService es = new ThreadPoolExecutor(
                2,
                4,
                3000,
                TimeUnit.MILLISECONDS,
                workQueue);

        ExecutorUtils.printState(es);

        es.execute(new RunnableTask("task1"));
        ExecutorUtils.printState(es, "task1");

        es.execute(new RunnableTask("task2"));
        ExecutorUtils.printState(es, "task2");

        es.execute(new RunnableTask("task3"));
        ExecutorUtils.printState(es, "task3");

        es.execute(new RunnableTask("task4"));
        ExecutorUtils.printState(es, "task4");

        es.execute(new RunnableTask("task5"));
        ExecutorUtils.printState(es, "task5");

        es.execute(new RunnableTask("task6"));
        ExecutorUtils.printState(es, "task6");

        es.close();
        log(" == shutdown 완료 ==");
        ExecutorUtils.printState(es);
    }
}

실행 결과

2024-07-30 14:29:07.291 [     main] [pool= 0, active=0, queuedTasks=0, completedTask=0]
2024-07-30 14:29:07.295 [pool-1-thread-1] task1 시작
2024-07-30 14:29:07.305 [     main] task1 -> [pool= 1, active=1, queuedTasks=0, completedTask=0]
2024-07-30 14:29:07.305 [     main] task2 -> [pool= 2, active=2, queuedTasks=0, completedTask=0]
2024-07-30 14:29:07.305 [pool-1-thread-2] task2 시작
2024-07-30 14:29:07.305 [     main] task3 -> [pool= 2, active=2, queuedTasks=1, completedTask=0]
2024-07-30 14:29:07.306 [     main] task4 -> [pool= 2, active=2, queuedTasks=2, completedTask=0]
2024-07-30 14:29:07.306 [     main] task5 -> [pool= 3, active=3, queuedTasks=2, completedTask=0]
2024-07-30 14:29:07.306 [pool-1-thread-3] task5 시작
2024-07-30 14:29:07.307 [     main] task6 -> [pool= 4, active=4, queuedTasks=2, completedTask=0]
2024-07-30 14:29:07.307 [pool-1-thread-4] task6 시작
2024-07-30 14:29:08.297 [pool-1-thread-1] task1 완료
2024-07-30 14:29:08.298 [pool-1-thread-1] task3 시작
2024-07-30 14:29:08.307 [pool-1-thread-2] task2 완료
2024-07-30 14:29:08.307 [pool-1-thread-2] task4 시작
2024-07-30 14:29:08.308 [pool-1-thread-3] task5 완료
2024-07-30 14:29:08.308 [pool-1-thread-4] task6 완료
2024-07-30 14:29:09.299 [pool-1-thread-1] task3 완료
2024-07-30 14:29:09.308 [pool-1-thread-2] task4 완료
2024-07-30 14:29:09.310 [     main]  == shutdown 완료 ==
2024-07-30 14:29:09.311 [     main] [pool= 0, active=0, queuedTasks=0, completedTask=6]

 

결과를 보면 작업 6개를 돌리니 최대 스레드 수까지 스레드가 늘어나고 그 수보다 더 남은 작업들은 큐에 보관된 상태로 관리된다.

이게 corePoolSize, maximumPoolSize의 메커니즘이다.

 

그럼 만약, 최대 스레드 수도 꽉 찼고 큐에 보관 가능한 공간도 꽉 찬 상태에서 또 작업이 들어오면?

package thread.executor.poolsize;

import thread.executor.ExecutorUtils;
import thread.executor.RunnableTask;

import java.util.concurrent.*;

import static util.MyLogger.log;
import static util.ThreadUtils.sleep;

public class PoolSizeMainV1 {

    public static void main(String[] args) {
        ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
        ExecutorService es = new ThreadPoolExecutor(
                2,
                4,
                3000,
                TimeUnit.MILLISECONDS,
                workQueue);

        ExecutorUtils.printState(es);

        es.execute(new RunnableTask("task1"));
        ExecutorUtils.printState(es, "task1");

        es.execute(new RunnableTask("task2"));
        ExecutorUtils.printState(es, "task2");

        es.execute(new RunnableTask("task3"));
        ExecutorUtils.printState(es, "task3");

        es.execute(new RunnableTask("task4"));
        ExecutorUtils.printState(es, "task4");

        es.execute(new RunnableTask("task5"));
        ExecutorUtils.printState(es, "task5");

        es.execute(new RunnableTask("task6"));
        ExecutorUtils.printState(es, "task6");

        es.execute(new RunnableTask("task7"));
        ExecutorUtils.printState(es, "task7");

        es.close();
        log(" == shutdown 완료 ==");
        ExecutorUtils.printState(es);
    }
}

실행 결과

2024-07-30 14:32:01.841 [     main] [pool= 0, active=0, queuedTasks=0, completedTask=0]
2024-07-30 14:32:01.846 [pool-1-thread-1] task1 시작
2024-07-30 14:32:01.858 [     main] task1 -> [pool= 1, active=1, queuedTasks=0, completedTask=0]
2024-07-30 14:32:01.858 [     main] task2 -> [pool= 2, active=2, queuedTasks=0, completedTask=0]
2024-07-30 14:32:01.858 [pool-1-thread-2] task2 시작
2024-07-30 14:32:01.858 [     main] task3 -> [pool= 2, active=2, queuedTasks=1, completedTask=0]
2024-07-30 14:32:01.859 [     main] task4 -> [pool= 2, active=2, queuedTasks=2, completedTask=0]
2024-07-30 14:32:01.859 [     main] task5 -> [pool= 3, active=3, queuedTasks=2, completedTask=0]
2024-07-30 14:32:01.859 [pool-1-thread-3] task5 시작
2024-07-30 14:32:01.860 [pool-1-thread-4] task6 시작
2024-07-30 14:32:01.860 [     main] task6 -> [pool= 4, active=4, queuedTasks=2, completedTask=0]
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task thread.executor.RunnableTask@5240ca61 rejected from java.util.concurrent.ThreadPoolExecutor@aa6715fa[Running, pool size = 4, active threads = 4, queued tasks = 2, completed tasks = 0]
	at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2081)
	at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:841)
	at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1376)
	at thread.executor.poolsize.PoolSizeMainV1.main(PoolSizeMainV1.java:42)
2024-07-30 14:32:02.847 [pool-1-thread-1] task1 완료
2024-07-30 14:32:02.848 [pool-1-thread-1] task3 시작
2024-07-30 14:32:02.860 [pool-1-thread-2] task2 완료
2024-07-30 14:32:02.860 [pool-1-thread-2] task4 시작
2024-07-30 14:32:02.861 [pool-1-thread-4] task6 완료
2024-07-30 14:32:02.861 [pool-1-thread-3] task5 완료
2024-07-30 14:32:03.850 [pool-1-thread-1] task3 완료
2024-07-30 14:32:03.862 [pool-1-thread-2] task4 완료

이렇듯 RejectedExecutionExcpetion 예외가 터진다. 그래서 적절하게 corePoolSize, maximumPoolSize, 블로킹 큐 사이즈를 설정해야 한다. 

 

그래서 우선 이 예외를 try - catch로 잡아보자.

package thread.executor.poolsize;

import thread.executor.ExecutorUtils;
import thread.executor.RunnableTask;

import java.util.concurrent.*;

import static util.MyLogger.log;
import static util.ThreadUtils.sleep;

public class PoolSizeMainV1 {

    public static void main(String[] args) {
        ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
        ExecutorService es = new ThreadPoolExecutor(
                2,
                4,
                3000,
                TimeUnit.MILLISECONDS,
                workQueue);

        ExecutorUtils.printState(es);

        es.execute(new RunnableTask("task1"));
        ExecutorUtils.printState(es, "task1");

        es.execute(new RunnableTask("task2"));
        ExecutorUtils.printState(es, "task2");

        es.execute(new RunnableTask("task3"));
        ExecutorUtils.printState(es, "task3");

        es.execute(new RunnableTask("task4"));
        ExecutorUtils.printState(es, "task4");

        es.execute(new RunnableTask("task5"));
        ExecutorUtils.printState(es, "task5");

        es.execute(new RunnableTask("task6"));
        ExecutorUtils.printState(es, "task6");

        try {
            es.execute(new RunnableTask("task7"));
        } catch (RejectedExecutionException e) {
            log("task7 실행 거절 예외 발생: " + e);
        }

        sleep(3000);
        log("== 작업 수행 완료 ==");
        ExecutorUtils.printState(es);

        sleep(3000);
        log("== maximumPoolSize 대기 시간 초과 ==");
        ExecutorUtils.printState(es);

        es.close();
        log(" == shutdown 완료 ==");
        ExecutorUtils.printState(es);
    }
}

 

그런 다음 3초 정도 대기한 후 현재 상태를 찍어보자. 이 시기에 딱 작업이 모든 작업이 끝날 것이다. 그리고 나서 또 3초를 대기해보자. 그러면 keepAliveTime3초로 설정했기 때문에 기본 스레드 수보다 더 많이 생성된 스레드들이 정해진 시간 이상으로 놀고 있기 때문에 스레드를 지울것이다. 그리고 바로 아래가 그 결과다.

 

실행 결과

2024-07-30 14:39:33.945 [     main] [pool= 0, active=0, queuedTasks=0, completedTask=0]
2024-07-30 14:39:33.950 [pool-1-thread-1] task1 시작
2024-07-30 14:39:33.963 [     main] task1 -> [pool= 1, active=1, queuedTasks=0, completedTask=0]
2024-07-30 14:39:33.964 [     main] task2 -> [pool= 2, active=2, queuedTasks=0, completedTask=0]
2024-07-30 14:39:33.964 [pool-1-thread-2] task2 시작
2024-07-30 14:39:33.964 [     main] task3 -> [pool= 2, active=2, queuedTasks=1, completedTask=0]
2024-07-30 14:39:33.964 [     main] task4 -> [pool= 2, active=2, queuedTasks=2, completedTask=0]
2024-07-30 14:39:33.965 [     main] task5 -> [pool= 3, active=3, queuedTasks=2, completedTask=0]
2024-07-30 14:39:33.965 [pool-1-thread-3] task5 시작
2024-07-30 14:39:33.965 [     main] task6 -> [pool= 4, active=4, queuedTasks=2, completedTask=0]
2024-07-30 14:39:33.965 [pool-1-thread-4] task6 시작
2024-07-30 14:39:33.966 [     main] task7 실행 거절 예외 발생: java.util.concurrent.RejectedExecutionException: Task thread.executor.RunnableTask@e51fe68d rejected from java.util.concurrent.ThreadPoolExecutor@4df7cb20[Running, pool size = 4, active threads = 4, queued tasks = 2, completed tasks = 0]
2024-07-30 14:39:34.953 [pool-1-thread-1] task1 완료
2024-07-30 14:39:34.954 [pool-1-thread-1] task3 시작
2024-07-30 14:39:34.965 [pool-1-thread-2] task2 완료
2024-07-30 14:39:34.966 [pool-1-thread-2] task4 시작
2024-07-30 14:39:34.966 [pool-1-thread-3] task5 완료
2024-07-30 14:39:34.967 [pool-1-thread-4] task6 완료
2024-07-30 14:39:35.955 [pool-1-thread-1] task3 완료
2024-07-30 14:39:35.967 [pool-1-thread-2] task4 완료
2024-07-30 14:39:36.971 [     main] == 작업 수행 완료 ==
2024-07-30 14:39:36.972 [     main] [pool= 4, active=0, queuedTasks=0, completedTask=6]
2024-07-30 14:39:39.975 [     main] == maximumPoolSize 대기 시간 초과 ==
2024-07-30 14:39:39.976 [     main] [pool= 2, active=0, queuedTasks=0, completedTask=6]
2024-07-30 14:39:39.978 [     main]  == shutdown 완료 ==
2024-07-30 14:39:39.978 [     main] [pool= 0, active=0, queuedTasks=0, completedTask=6]

 

그럼 이 내용을 그림과 같이 분석해보자.

Executor 스레드 풀 관리 - 분석

  • task1 작업을 요청한다.
  • Executor는 스레드 풀에 스레드가 core 사이즈 만큼 있는지 확인한다.
    • core 사이즈 만큼 없다면 스레드를 하나 생성한다.
    • 작업을 처리하기 위해 스레드를 하나 생성했기 때문에 작업을 큐에 넣을 필요 없이, 해당 스레드가 바로 작업을 처리한다.

2024-07-30 15:36:01.299 [pool-1-thread-1] task1 시작
2024-07-30 15:36:01.307 [     main] task1 -> [pool= 1, active=1, queuedTasks=0, completedTask=0]
  • 새로 만들어진 스레드1이 task1을 수행한다.

2024-07-30 15:36:01.308 [     main] task2 -> [pool= 2, active=2, queuedTasks=0, completedTask=0]
2024-07-30 15:36:01.308 [pool-1-thread-2] task2 시작
  • task2를 요청한다.
  • Executor는 스레드 풀에 스레드가 core 사이즈 만큼 있는지 확인한다.
    • 아직 core 사이즈만큼 없으므로 스레드를 하나 생성한다.
  • 새로 만들어진 스레드2가 task2를 처리한다. 마찬가지로 작업을 처리하기 위해 스레드를 만들었으니 작업을 큐에 넣을 필요 없이, 바로 작업을 수행한다.

2024-07-30 15:36:01.308 [     main] task3 -> [pool= 2, active=2, queuedTasks=1, completedTask=0]
  • task3 작업을 요청한다.
  • Executor는 스레드 풀에 스레드가 core 사이즈 만큼 있는지 확인한다.
  • core 사이즈 만큼 스레드가 이미 만들어져 있고, 스레드 풀에 사용할 수 있는 스레드가 없으므로 이 경우 큐에 작업을 보관한다. 

2024-07-30 15:36:01.308 [     main] task4 -> [pool= 2, active=2, queuedTasks=2, completedTask=0]
  • task4 작업을 요청한다.
  • Executor는 스레드 풀에 스레드가 core 사이즈 만큼 있는지 확인한다.
  • core 사이즈 만큼 스레드가 이미 만들어져 있고, 스레드 풀에 사용할 수 있는 스레드가 없으므로 이 경우 큐에 작업을 보관한다.

2024-07-30 15:36:01.309 [     main] task5 -> [pool= 3, active=3, queuedTasks=2, completedTask=0]
2024-07-30 15:36:01.309 [pool-1-thread-3] task5 시작
  • task5 작업을 요청한다.
  • Executor는 스레드 풀에 스레드가 core 사이즈 만큼 있는지 확인한다 → core 사이즈 만큼 있다.
  • Executor는 큐에 보관을 시도한다 → 큐가 가득 찼다. 
    • 큐까지 가득찬 상황이라면 긴급 상황이다. 대기하는 작업이 꽉 찰 정도로 요청이 많다는 뜻이다. 이 경우, Executormax(maximumPoolSize) 사이즈까지 초과 스레드를 만들어서 작업을 수행한다.
    • core=2 → 기본 스레드는 최대 2개
    • max=4 → 기본 스레드 2개에 초과 스레드 2개 합계 총 4개 가능 (초과 스레드 = max - core)
  • Executor는 초과 스레드인 스레드3을 만든다.
  • 작업을 처리하기 위해 스레드를 하나 생성했기 때문에 작업을 큐에 넣을 필요 없이, 해당 스레드가 바로 작업을 처리한다.
    • 참고로 이 경우는 큐가 가득찬 상태이기 때문에 큐에 넣는것도 불가능하다.
  • 스레드3이 task5를 처리한다.

2024-07-30 15:36:01.309 [     main] task6 -> [pool= 4, active=4, queuedTasks=2, completedTask=0]
2024-07-30 15:36:01.309 [pool-1-thread-4] task6 시작
  • task6 작업을 요청한다.
  • 마찬가지로 큐가 가득찼고 core 스레드도 전부 만든 상태이다.
  • Executor는 초과 스레드인 스레드4를 만들어서 task6을 처리한다. 
    • 이 경우 큐가 가득찬 상태이기 때문에 큐에 넣는것도 불가능하다.

2024-07-30 15:36:01.310 [     main] task7 실행 거절 예외 발생: java.util.concurrent.RejectedExecutionException: Task thread.executor.RunnableTask@c415607b rejected from java.util.concurrent.ThreadPoolExecutor@da251ccc[Running, pool size = 4, active threads = 4, queued tasks = 2, completed tasks = 0]
  • task7 작업을 요청한다.
  • 큐가 가득찼다.
  • 스레드 풀의 스레드도 max 사이즈만큼 가득찼다.
  • RejectedExecutionException이 발생한다.

이 경우 큐에 넣을 수도 없고, 작업을 수행할 스레드도 만들 수 없다. 따라서 작업을 거절한다.

  • 작업들이 수행된다.

  • 스레드1이 task1을 스레드3이 task5의 작업을 완료하고 스레드 풀에 대기 상태로 돌아간다.

  • 스레드 풀의 스레드는 큐의 데이터를 획득하기 위해 대기한다.
  • 스레드1: task3을 획득한다.
  • 스레드3: task4를 획득한다.

  • 계속 작업을 수행한다.

2024-07-30 15:36:04.316 [     main] == 작업 수행 완료 ==
2024-07-30 15:36:04.317 [     main] [pool= 4, active=0, queuedTasks=0, completedTask=6]
  • 모든 작업이 완료된다.

2024-07-30 15:36:07.323 [     main] == maximumPoolSize 대기 시간 초과 ==
2024-07-30 15:36:07.325 [     main] [pool= 2, active=0, queuedTasks=0, completedTask=6]
  • 스레드3, 스레드4와 같은 초과 스레드들은 지정된 시간까지 작업을 하지 않고 대기하면 제거된다. 긴급한 작업들이 끝난 것으로 이해하면 된다.
  • 여기서는 지정한 3초간 스레드3, 스레드4가 작업을 진행하지 않았기 때문에 스레드 풀에서 제거된다.
  • 참고로 초과 스레드가 작업을 처리할 때마다 시간은 계속 초기화가 된다.
    • 작업 요청이 계속 들어온다면 긴급한 상황이 끝난 것이 아니다. 따라서 긴급한 상황이 끝날 때 까지는 초과 스레드를 살려두는 것이 많은 스레드를 사용해서 작업을 더 빨리 처리할 수 있다.

  • 초과 스레드가 제거된 모습이다.
  • 이후에 shutdown()이 진행되면 풀의 스레드도 모두 제거된다.

 

정리를 하자면,

  1. 작업을 요청하면 core 사이즈 만큼 스레드를 만든다.
  2. core 사이즈를 초과하면 큐에 작업을 넣는다.
  3. 큐를 초과하면 max 사이즈 만큼 스레드를 만든다. 임시로 사용되는 초과 스레드가 생성된다.
    • 큐가 가득차서 큐에 넣을 수도 없다. 초과 스레드가 바로 수행해야 한다.
  4. max 사이즈를 초과하면 요청을 거절한다. 예외가 발생한다.
    • Executor에 큐도 가득차고, 풀 최대 생성 가능한 스레드 수도 가득 찼다. 작업을 받을 수 없다.

 

스레드 미리 생성하기 (팁)

응답 시간이 아주 중요한 서버라면, 서버가 고객의 처음 요청을 받기 전에 스레드를 스레드 풀에 미리 생성해두고 싶을 수 있다. 스레드를 미리 생성해두면, 처음 요청에서 사용되는 스레드의 생성 시간을 줄일 수 있다. ThreadPoolExecutor.prestartAllCoreThreads()를 사용하면 기본 스레드를 미리 생성할 수 있다. 참고로 ExecutorService는 이 메서드를 제공하지 않는다. 

 

PreStartPoolMain

package thread.executor;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class PreStartPoolMain {

    public static void main(String[] args) {
        ExecutorService es = Executors.newFixedThreadPool(1000);
        ExecutorUtils.printState(es);

        ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) es;
        threadPoolExecutor.prestartAllCoreThreads();

        ExecutorUtils.printState(es);
    }
}

실행 결과

2024-07-30 16:00:36.336 [     main] [pool= 0, active=0, queuedTasks=0, completedTask=0]
2024-07-30 16:00:36.381 [     main] [pool= 1000, active=0, queuedTasks=0, completedTask=0]

 

 

이제 이 스레드를 생성하는 전략에 대해 알아보고, 어떤 상황에 어떤 전략을 사용해서 최대한 효율적으로 스레드를 사용할 수 있는지 알아보자. 

Executor 전략 - 고정 풀 전략

Executor 스레드 풀 관리 - 다양한 전략

ThreadPoolExecutor를 사용하면 스레드 풀에 사용되는 숫자와 블로킹 큐 등 다양한 속성을 조절할 수 있다.

  • corePoolSize: 스레드 풀에서 관리되는 기본 스레드 수
  • maximumPoolSize: 스레드 풀에서 관리되는 최대 스레드 수
  • keepAliveTime, TimeUnit unit: 기본 스레드 수를 초과해서 만들어진 스레드가 생존할 수 있는 대기 시간, 이 시간동안 처리할 작업이 없다면 초과 스레드는 제거된다.
  • BlockingQueue workQueue: 작업을 보관할 블로킹 큐

이런 속성들을 잘 사용한다면 자신에게 맞는 스레드 풀 전략을 사용할 수 있다.

자바는 Executors 클래스를 통해 3가지 기본 전략을 제공한다.

  • newSingleThreadPool(): 단일 스레드 풀 전략
  • newFixedThreadPool(nThreads): 고정 스레드 풀 전략
  • newCachedThreadPool(): 캐시 스레드 풀 전략

newSingleThreadPool(): 단일 스레드 풀 전략

  • 스레드 풀에 기본 스레드 1개만 사용한다.
  • 큐 사이즈에 제한이 없다 (LinkedBlockingQueue)
  • 주로 간단히 사용하거나, 테스트 용도로 사용한다.
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

 

newFixedThreadPool(nThreads): 고정 풀 전략

  • 스레드 풀에 nThreads 만큼의 기본 스레드를 생성한다. 초과 스레드는 생성하지 않는다.
  • 큐 사이즈에 제한이 없다 (LinkedBlockingQueue)
  • 스레드 수가 고정되어 있기 때문에 CPU, 메모리 리소스가 어느정도 예측 가능한 안정적인 방식이다.
new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

 

PoolSizeMainV2

package thread.executor.poolsize;

import thread.executor.RunnableTask;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static thread.executor.ExecutorUtils.printState;
import static util.MyLogger.log;

public class PoolSizeMainV2 {

    public static void main(String[] args) {
        ExecutorService es = Executors.newFixedThreadPool(2);

        log("pool 생성");

        printState(es);

        for (int i = 1; i <= 6; i++) {
            String taskName = "task" + i;
            es.execute(new RunnableTask(taskName));
            printState(es, taskName);
        }
        es.close();
        log("== shutdown 완료 ==");
    }
}

실행 결과

2024-07-30 16:32:32.558 [     main] pool 생성
2024-07-30 16:32:32.575 [     main] [pool= 0, active=0, queuedTasks=0, completedTask=0]
2024-07-30 16:32:32.579 [pool-1-thread-1] task1 시작
2024-07-30 16:32:32.588 [     main] task1 -> [pool= 1, active=1, queuedTasks=0, completedTask=0]
2024-07-30 16:32:32.588 [     main] task2 -> [pool= 2, active=2, queuedTasks=0, completedTask=0]
2024-07-30 16:32:32.588 [pool-1-thread-2] task2 시작
2024-07-30 16:32:32.589 [     main] task3 -> [pool= 2, active=2, queuedTasks=1, completedTask=0]
2024-07-30 16:32:32.589 [     main] task4 -> [pool= 2, active=2, queuedTasks=2, completedTask=0]
2024-07-30 16:32:32.590 [     main] task5 -> [pool= 2, active=2, queuedTasks=3, completedTask=0]
2024-07-30 16:32:32.590 [     main] task6 -> [pool= 2, active=2, queuedTasks=4, completedTask=0]
2024-07-30 16:32:33.581 [pool-1-thread-1] task1 완료
2024-07-30 16:32:33.582 [pool-1-thread-1] task3 시작
2024-07-30 16:32:33.590 [pool-1-thread-2] task2 완료
2024-07-30 16:32:33.591 [pool-1-thread-2] task4 시작
2024-07-30 16:32:34.584 [pool-1-thread-1] task3 완료
2024-07-30 16:32:34.584 [pool-1-thread-1] task5 시작
2024-07-30 16:32:34.592 [pool-1-thread-2] task4 완료
2024-07-30 16:32:34.592 [pool-1-thread-2] task6 시작
2024-07-30 16:32:35.586 [pool-1-thread-1] task5 완료
2024-07-30 16:32:35.594 [pool-1-thread-2] task6 완료
2024-07-30 16:32:35.597 [     main] == shutdown 완료 ==

 

2개의 스레드가 안정적으로 작업을 처리하는 것을 확인할 수 있다. 이 전략은 다음과 같은 특징이 있다.

 

특징

스레드 수가 고정되어 있기 때문에 CPU, 메모리 리소스가 어느정도 예측 가능한 안정적인 방식이다. 큐 사이즈도 제한이 없어서 작업을 많이 담아두어도 문제가 없다.

 

주의

이 방식의 가장 큰 장점은 스레드 수가 고정되어서 CPU, 메모리 리소스가 어느정도 예측 가능하다는 점이다. 따라서 일반적인 상황에 가장 안정적으로 서비스를 운영할 수 있다. 하지만 상황에 따라 장점이 가장 큰 단점이 되기도 한다.

 

상황1 - 점진적인 사용자 확대

  • 개발한 서비스가 잘 되어서 사용자가 점점 늘어난다.
  • 고정 스레드 전략을 사용해서 서비스를 안정적으로 잘 운영했는데, 언젠가부터 사용자들이 서비스 응답이 점점 느려진다고 항의한다.

상황2 - 갑작스런 요청 증가

  • 마케팅 팀의 이벤트가 대성공 하면서 갑자기 사용자가 폭증했다.
  • 고객은 응답을 받지 못한다고 항의한다.

확인

  • 개발자는 급하게 CPU, 메모리 사용량을 확인해보는데 아무런 문제 없이 여유있고 안정적으로 서비스가 운영되고 있다.
  • 고정 스레드 전략은 실행되는 스레드 수가 고정되어 있다. 따라서 사용자가 늘어나도 CPU, 메모리 사용량이 확 늘어나지 않는다.
  • 큐의 사이즈를 확인해보니 요청이 수 만 건 쌓여있다. 요청이 처리되는 시간보다 쌓이는 시간이 더 빠른 것이다. 참고로 고정 풀 전략의 큐 사이즈는 무한이다.
  • 예를 들어서 큐에 10000건이 쌓여있는데, 고정 스레드 수가 10이고, 각 스레드가 작업을 하나 처리하는데 1초가 걸린다면 모든 작업을 다 처리하는데는 1000초가 걸린다. 만약 처리 속도보다 작업이 쌓이는 속도가 더 빠른 경우에는 더 문제가 된다.
  • 서비스 초기에는 사용자가 적기 때문에 이런 문제가 없지만, 사용자가 늘어나면 문제가 될 수 있다.
  • 갑작스런 요청 증가도 물론 마찬가지이다.

결국 서버 자원은 여유가 있는데, 사용자만 점점 느려지는 문제가 발생한 것이다.

 

newCachedThreadPool(): 캐시 풀 전략

  • 기본 스레드를 사용하지 않고, 60초 생존 주기를 가진 초과 스레드만 사용한다.
  • 초과 스레드의 수는 제한이 없다.
  • 큐에 작업을 저장하지 않는다 (SynchronousQueue)
    • 대신에 생산자의 요청을 스레드 풀의 소비자 스레드가 직접 받아서 바로 처리한다.
  • 모든 요청이 대기하지 않고 스레드가 바로바로 처리한다. 따라서 빠른 처리가 가능하다.
new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SychronousQueue<Runnable>());

 

SynchronousQueue는 아주 특별한 블로킹 큐이다.

  • BlockingQueue 인터페이스의 구현체 중 하나이다.
  • 이 큐는 내부에 저장 공간이 없다. 대신에 생산자의 작업을 소비자 스레드에게 직접 전달한다.
  • 쉽게 이야기해서 저장 공간의 크기가 0이고, 생산자 스레드가 큐에 작업을 전달하면 소비자 스레드가 큐에서 작업을 꺼낼 때까지 대기한다.
  • 소비자가 작업을 요청하면 기다리던 생산자가 소비자에게 직접 작업을 전달하고 반환된다. 그 반대의 경우도 같다.
  • 이름 그대로 생산자와 소비자를 동기화하는 큐이다.
  • 쉽게 이야기해서 중간에 버퍼를 두지 않는 스레드 간 직거래라고 보면 된다.

PoolSizeMainV3

package thread.executor.poolsize;

import thread.executor.RunnableTask;

import java.util.concurrent.*;

import static thread.executor.ExecutorUtils.printState;
import static util.MyLogger.log;
import static util.ThreadUtils.sleep;

public class PoolSizeMainV3 {

    public static void main(String[] args) {

        ExecutorService es = Executors.newCachedThreadPool();

        log("pool 생성");
        printState(es);

        for (int i = 1; i <= 4; i++) {
            String taskName = "task " + i;
            es.execute(new RunnableTask(taskName));
            printState(es, taskName);
        }

        sleep(4000);
        log("== 작업 수행 완료 ==");
        printState(es);

        es.close();
        log("== shutdown 완료 ==");
        printState(es);
    }
}

실행 결과

2024-07-30 17:44:29.832 [     main] pool 생성
2024-07-30 17:44:29.849 [     main] [pool= 0, active=0, queuedTasks=0, completedTask=0]
2024-07-30 17:44:29.852 [pool-1-thread-1] task 1 시작
2024-07-30 17:44:29.861 [     main] task 1 -> [pool= 1, active=1, queuedTasks=0, completedTask=0]
2024-07-30 17:44:29.862 [     main] task 2 -> [pool= 2, active=2, queuedTasks=0, completedTask=0]
2024-07-30 17:44:29.862 [pool-1-thread-2] task 2 시작
2024-07-30 17:44:29.862 [     main] task 3 -> [pool= 3, active=3, queuedTasks=0, completedTask=0]
2024-07-30 17:44:29.862 [pool-1-thread-3] task 3 시작
2024-07-30 17:44:29.863 [     main] task 4 -> [pool= 4, active=4, queuedTasks=0, completedTask=0]
2024-07-30 17:44:29.863 [pool-1-thread-4] task 4 시작
2024-07-30 17:44:29.863 [     main] == 작업 수행 완료 ==
2024-07-30 17:44:30.853 [pool-1-thread-1] task 1 완료
2024-07-30 17:44:30.863 [pool-1-thread-2] task 2 완료
2024-07-30 17:44:30.863 [pool-1-thread-4] task 4 완료
2024-07-30 17:44:30.864 [pool-1-thread-3] task 3 완료
2024-07-30 17:44:33.869 [     main] == maximumPoolSize 대기 시간 초과 ==
2024-07-30 17:44:33.870 [     main] [pool= 4, active=0, queuedTasks=0, completedTask=4]
2024-07-30 17:44:33.874 [     main] == shutdown 완료 ==
2024-07-30 17:44:33.875 [     main] [pool= 0, active=0, queuedTasks=0, completedTask=4]
  • 모든 작업이 대기하지 않고 작업의 수 만큼 스레드가 생기면서 바로 실행되는 것을 확인할 수 있다.
  • "maximumPoolSize 대기 시간 초과" 로그를 통해 초과 스레드가 대기 시간이 지나서 모두 사라진 것을 확인할 수 있다.

이 전략은 다음과 같은 특징이 있다.

 

특징

캐시 스레드 풀 전략은 매우 빠르고, 유연한 전략이다.

이 전략은 기본 스레드도 없고, 대기 큐에 작업도 쌓이지 않는다. 대신에 작업 요청이 오면 초과 스레드로 작업을 바로바로 처리한다. 따라서 빠른 처리가 가능하다. 초과 스레드의 수도 제한이 없기 때문에 CPU, 메모리 자원만 허용한다면 시스템의 자원을 최대로 사용할 수 있다.

추가로 초과 스레드는 60초간 생존한다. 그래서 작업 수에 맞추어 적절한 수의 스레드가 재사용된다. 이런 특징 때문에 요청이 갑자기 증가하면 스레드도 갑자기 증가하고, 요청이 줄어들면 스레드도 점점 줄어든다. 이 전략은 작업의 요청 수에 따라서 스레드도 증가하고 감소하므로, 매우 유연한 전략이다.

 

그런데 어떻게 기본 스레드 없이 초과 스레드만 만들 수 있을까? Executor 스레드 풀 기본 관리 정책을 다시 확인해보자.

Executor 스레드 풀 관리

  1. 작업을 요청하면 core 사이즈 만큼 스레드를 만든다.
    • core 사이즈가 없다. 바로 core 사이즈를 초과한다.
  2. core 사이즈를 초과하면 큐에 작업을 넣는다.
    • 큐에 작업을 넣을 수 없다. (SynchronousQueue는 큐의 저장 공간이 0인 특별한 큐이다)
  3. 큐를 초과하면 max 사이즈 만큼 스레드를 만든다. 임시로 사용되는 초과 스레드가 생성된다.
    • 초과 스레드가 생성된다. 물론 풀에 대기하는 초과 스레드가 있으면 재사용된다.
  4. max 사이즈를 초과하면 요청을 거절한다. 예외가 발생한다.
    • 참고로 max 사이즈가 무제한이다. 따라서 초과 스레드를 무제한으로 만들 수 있다.

결과적으로 이 전략의 모든 작업은 초과 스레드가 처리한다.

 

주의

이 방식은 작업 수에 맞추어 스레드 수가 변하기 때문에, 작업의 처리 속도가 빠르고, CPU, 메모리를 매우 유연하게 사용할 수 있다는 장점이 있다. 하지만 상황에 따라 장점이 가장 큰 단점이 되기도 한다.

 

상황1 - 점진적인 사용자 확대

  • 개발한 서비스가 잘 되어서 사용자가 점점 늘어난다.
  • 캐시 스레드 전략을 사용하면 이런 경우 크게 문제가 되지 않는다.
  • 캐시 스레드 전략은 이런 경우에는 문제를 빠르게 찾을 수 있다. 사용자가 점점 증가하면서 스레드 사용량도 함께 늘어난다. 따라서 CPU 메모리의 사용량도 자연스럽게 증가한다.
  • 물론 CPU, 메모리 자원은 한계가 있기 때문에 적절한 시점에 시스템을 증설해야 한다. 그렇지 않으면 CPU, 메모리 같은 시스템 자원을 너무 많이 사용하면서 시스템이 다운될 수 있다.

상황2 - 갑작스런 요청 증가

  • 마케팅 팀의 이벤트가 대성공 하면서 갑자기 사용자가 폭증했다.
  • 고객은 응답을 받지 못한다고 항의한다.

상황2 - 확인

  • 개발자는 급하게 CPU, 메모리 사용량을 확인해보는데, CPU 사용량이 100%이고, 메모리 사용량도 지나치게 높아져 있다. 
  • 스레드 수를 확인해보니 스레드가 수 천개 실행되고 있다. 너무 많은 스레드가 작업을 처리하면서 시스템 전체가 느려지는 현상이 발생한다.
  • 캐시 스레드 풀 전략은 스레드가 무한으로 생성될 수 있다.
  • 수 천개의 스레드가 처리하는 속도보다 더 많은 작업이 들어온다.
  • 시스템은 너무 많은 스레드에 잠식 당해서 거의 다운된다. 메모리도 거의 다 사용되어 버린다.
  • 시스템이 멈추는 장애가 발생한다.

고정 스레드 풀 전략은 서버 자원은 여유가 있는데, 사용자만 점점 느려지는 문제가 발생할 수 있다. 반면에 캐시 스레드 풀 전략은 서버의 자원을 최대한 사용하지만, 서버가 감당할 수 있는 임계점을 넘는 순간 시스템이 다운될 수 있다.

 

Executor 전략 - 사용자 정의 풀 전략

상황1 - 점진적인 사용자 확대

  • 개발한 서비스가 잘 되어서 사용자가 점점 늘어난다.

상황2 - 갑작스런 요청 증가

  • 마케팅 팀의 이벤트가 대성공 하면서 갑자기 사용자가 폭증했다.

다음과 같이 세분화된 전략을 사용하면 상황1, 상황2를 모두 어느정도 대응할 수 있다.

  • 일반: 일반적인 상황에는 CPU, 메모리 자원을 예측할 수 있도록 고정 크기의 스레드로 서비스를 안정적으로 운영한다.
  • 긴급: 사용자의 요청이 갑자기 증가하면 긴급하게 스레드를 추가로 투입해서 작업을 빠르게 처리한다.
  • 거절: 사용자의 요청이 폭증해서 긴급 대응도 어렵다면 사용자의 요청을 거절한다.

이 방법은 평소에는 안정적으로 운영하다가, 사용자의 요청이 갑자기 증가하면 긴급하게 스레드를 더 투입해서 급한 불을 끄는 방법이다. 물론 긴급 상황에는 CPU, 메모리 자원을 더 사용하기 때문에 적정 수준을 찾아야 한다. 일반적으로는 여기까지 대응이 되겠지만, 시스템이 감당할 수 없을 정도로 사용자의 요청이 폭증하면, 처리 가능한 수준의 사용자 요청만 처리하고 나머지 요청은 거절해야 한다. 어떤 경우도 시스템이 다운되는 최악의 상황은 피해야 한다. 

 

사용자 정의 풀 전략은 다음과 같이 적용할 수 있다.

ExecutorService es = new ThreadPoolExecutor(100, 200, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000));
  • 100개의 기본 스레드를 사용한다.
  • 추가로 긴급 대응 가능한 긴급 스레드 100개를 사용한다. 긴급 스레드는 60초의 생존 주기를 가진다.
  • 1000개의 작업이 큐에 대기할 수 있다.

PoolSizeMainV4

package thread.executor.poolsize;

import thread.executor.RunnableTask;

import java.util.concurrent.*;

import static thread.executor.ExecutorUtils.printState;
import static util.MyLogger.log;

public class PoolSizeMainV4 {

     public static final int TASK_SIZE = 1100;
//     public static final int TASK_SIZE = 1200; // 긴급
//     public static final int TASK_SIZE = 1201; // 거절

    public static void main(String[] args) {
        ExecutorService es = new ThreadPoolExecutor(
                100,
                200,
                60L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1000));

        printState(es);

        long startMs = System.currentTimeMillis();

        for (int i = 1; i <= TASK_SIZE; i++) {
            String taskName = "task" + i;
            try {
                es.execute(new RunnableTask(taskName));
                printState(es, taskName);
            } catch (RejectedExecutionException e) {
                log(taskName + " -> " + e);
            }
        }

        es.close();
        long endMs = System.currentTimeMillis();

        log("time : " + (endMs - startMs) + "ms");
    }
}

이 전략은 다음과 같이 작동한다. 하나의 작업을 처리하는데 약 1초가 걸린다고 가정해보자.

  • 일반: 1000개 이하의 작업이 큐에 담겨있다. → 100개의 기본 스레드가 처리한다.
  • 긴급: 큐에 담긴 작업이 이미 1000개인데 그 이상으로 계속 작업이 들어온다 → 1000개의 기본 스레드 + 100개의 초과 스레드가 처리한다.
  • 거절: 초과 스레드를 투입했지만, 큐에 담긴 작업이 1000개이고 또 초과 스레드도 넘어간 상황이다. → 이 경우 예외를 발생시킨다.

코드상에 작성해 둔 TASK_SIZE를 각각 한번씩 주석을 변경해서 실행해보자. 아래와 같은 실행 결과를 얻는다.

 

일반 - TASK_SIZE = 1100

2024-07-31 10:35:56.468 [     main] task1099 -> [pool= 100, active=100, queuedTasks=999, completedTask=0]
2024-07-31 10:35:56.468 [     main] task1100 -> [pool= 100, active=100, queuedTasks=1000, completedTask=0]
...
2024-07-31 10:36:07.426 [     main] time : 11072ms
  • 1000개 이하의 작업이 큐에 담겨있다. → 100개의 기본 스레드가 처리한다.
  • 최대 1000개의 작업이 큐에 대기하고 100개의 작업이 실행중일 수 있다. 따라서 1100개 까지는 기본 스레드로 처리할 수 있다.
  • 작업을 모두 처리하는데 11초가 걸린다. 1100 / 100 → 11초

긴급 - TASK_SIZE = 1200

2024-07-31 10:38:18.589 [     main] task1200 -> [pool= 200, active=200, queuedTasks=1000, completedTask=0]
...
2024-07-31 10:38:24.605 [     main] time : 6147ms
  • 큐에 담긴 작업이 1000개를 초과한다. → 100개의 기본 스레드 + 100개의 초과 스레드가 처리한다.
  • 최대 1000개의 작업이 대기하고 200개의 작업이 실행중일 수 있다.
  • 작업을 모두 처리하는데 6초가 걸린다. 1200 / 200 → 6초
  • 긴급 투입한 스레드 덕분에 풀의 스레드 수가 2배가 된다. 따라서 작업을 2배 빠르게 처리한다.
  • 물론 CPU, 메모리 사용을 더 하기 때문에 이런 부분은 감안해서 긴급 상황에 투입할 최대 스레드를 정해야 한다.

거절 - TASK_SIZE = 1201

2024-07-31 10:41:21.000 [     main] task1200 -> [pool= 200, active=200, queuedTasks=1000, completedTask=0]
2024-07-31 10:41:21.004 [     main] task1201 -> java.util.concurrent.RejectedExecutionException: Task thread.executor.RunnableTask@d82f3ab8 rejected from java.util.concurrent.ThreadPoolExecutor@21fb7e74[Running, pool size = 200, active threads = 200, queued tasks = 1000, completed tasks = 0]
...
2024-07-31 10:41:27.008 [     main] time : 6139ms
  • 중간에 task1201 예외 로그를 잘 확인해보자.
  • 긴급 투입한 스레드로도 작업이 빠르게 소모되지 않는다는 것은, 시스템이 감당하기 어려운 많은 요청이 들어오고 있다는 의미이다.
  • 여기서는 큐에 대기하는 작업 1000개 + 스레드가 처리 중인 작업 200개 → 총 1200개의 작업을 초과하면 예외가 발생한다.
  • 따라서 1201번에서 예외가 발생한다.
  • 이런 경우 요청을 거절한다. 고객 서비스라면 시스템에 사용자가 너무 많으니 나중에 다시 시도해달라고 해야 한다.
  • 나머지 1200개의 작업들은 긴급 상황과 같이 정상 처리된다.

 

이런 실수를 하면 안된다!

new ThreadPoolExecutor(100, 200, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue());
  • 기본 스레드 100개
  • 최대 스레드 200개
  • 큐 사이즈: 무한대

이렇게 설정하면 절대로 최대 스레드가 동작하지 않는다. 왜냐하면 큐가 가득차야 긴급 상황으로 인지 되는데, LinkedBlockingQueue를 기본 생성자를 통해 무한대의 사이즈로 사용하게 되면, 큐가 가득찰 수가 없다. 결국 기본 스레드 100개만으로 무한대의 작업을 처리해야 하는 문제가 발생한다. 실무에서 자주 하는 실수 중에 하나이다.

 

Executor 예외 정책

생산자 소비자 문제를 실무에서 사용할 때는, 결국 소비자가 처리할 수 없을 정도로 생산 요청이 가득 차면 어떻게 할지를 정해야 한다. 개발자가 인지할 수 있게 로그도 남겨야 하고, 사용자에게 현재 시스템에 문제가 있다고 알리는 것도 필요하다. 이런 것을 위해 예외 정책이 필요하다. 

 

ThreadPoolExecutor에 작업을 요청할 때, 큐도 가득차고, 초과 스레드도 더는 할당할 수 없다면 작업을 거절한다.

ThreadPoolExecutor는 작업을 거절하는 다양한 정책을 제공한다.

  • AbortPolicy: 새로운 작업을 제출할 때 RejectedExecutionException을 발생시킨다. 기본 정책이다.
  • DiscardPolicy: 새로운 작업을 조용히 버린다.
  • CallerRunsPolicy: 새로운 작업을 제출한 스레드가 대신해서 직접 작업을 실행한다.
  • 사용자 정의(RejectedExecutionHandler): 개발자가 직접 정의한 거절 정책을 사용할 수 있다. 
참고로, ThreadPoolExecutorshutdown()하면 이후에 요청하는 작업을 거절하는데, 이때도 같은 정책이 적용된다.

 

AbortPolicy

작업이 거절되면 RejectedExecutionException을 던진다. 기본적으로 설정되어 있는 정책이다.

RejectMainV1

package thread.executor.rejected;

import thread.executor.RunnableTask;

import java.util.concurrent.*;

import static util.MyLogger.log;

public class RejectMainV1 {

    public static void main(String[] args) {

        ExecutorService es = new ThreadPoolExecutor(
                1,
                1,
                0,
                TimeUnit.SECONDS,
                new SynchronousQueue<>(),
                new ThreadPoolExecutor.AbortPolicy());

        es.execute(new RunnableTask("task1"));
        try {
            es.execute(new RunnableTask("task2"));
        } catch (RejectedExecutionException e) {
            log("요청 초과");
            // 포기, 다시 시도 등 다양한 고민을 하면 됨
            log(e);
        }

        es.close();
    }
}
  • ThreadPoolExecutor 생성자 마지막에 new ThreadPoolExecutor.AbortPolicy()를 제공하면 된다.
  • 참고로 이것이 기본 정책이기 때문에 생략해도 된다.
  • 스레드는 1개만 사용한다. 예제를 단순하게 만들기 위해 큐에 작업을 넣지 않도록 SynchronousQueue를 사용한다.

실행 결과

2024-07-31 12:43:13.500 [     main] 요청 초과
2024-07-31 12:43:13.500 [pool-1-thread-1] task1 시작
2024-07-31 12:43:13.503 [     main] java.util.concurrent.RejectedExecutionException: Task thread.executor.RunnableTask@b7fa1e8f rejected from java.util.concurrent.ThreadPoolExecutor@6ac69be4[Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]
2024-07-31 12:43:14.506 [pool-1-thread-1] task1 완료
  • task1은 풀의 스레드가 수행한다.
  • task2를 요청하면 허용 작업을 초과한다. 따라서 RejectedExecutionException이 발생한다.

RejectedExecutionException 예외를 잡아서 작업을 포기하거나, 사용자에게 알리거나, 다시 시도하면 된다. 이렇게 예외를 잡아서 필요한 코드를 직접 구현해도 되고 아니면 다음에 설명한 다른 정책들을 사용해도 된다.

 

RejectedExecutionHandler

마지막에 전달한 AbortPolicyRejectedExecutionHandler의 구현체이다. ThreadPoolExecutor 생성자는 RejectedExecutionHandler의 구현체를 전달 받는다.

public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}

ThreadPoolExecutor는 거절해야 하는 상황이 발생하면 RejectedExecutionHandler가 가지고 있는 rejectExecution()을 호출한다. 

 

DiscardPolicy

거절된 작업을 무시하고 아무런 예외도 발생시키지 않는다.

RejectMainV2

package thread.executor.rejected;

import thread.executor.RunnableTask;

import java.util.concurrent.*;

import static util.MyLogger.log;

public class RejectMainV2 {

    public static void main(String[] args) {

        ExecutorService es = new ThreadPoolExecutor(
                1,
                1,
                0,
                TimeUnit.SECONDS,
                new SynchronousQueue<>(),
                new ThreadPoolExecutor.DiscardPolicy());

        es.execute(new RunnableTask("task1"));
        es.execute(new RunnableTask("task2"));
        es.execute(new RunnableTask("task3"));

        es.close();
    }
}
  • ThreadPoolExecutor 생성자 마지막에 new ThreadPoolExecutor.DiscardPolicy()를 제공하면 된다.

실행 결과

2024-07-31 12:47:28.663 [pool-1-thread-1] task1 시작
2024-07-31 12:47:29.668 [pool-1-thread-1] task1 완료
  • task2, task3은 거절된다. DiscardPolicy는 조용히 버리는 정책이다.
  • 다음 구현 코드를 보면 왜 조용히 버리는 정책인지 이해가 될 것이다. (아무것도 없다)
public static class DiscardPolicy implements RejectedExecutionHandler {

    public DiscardPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

 

CallerRunsPolicy

이제 이게 재밌는데, 호출한 스레드가 직접 작업을 수행하게 한다. 그러니까 예를 들어, 피자를 먹으러 온 손님한테 주인이 "내가 두 팔을 다 사용중이라 더 피자를 못 만드니까 당신이 알아서 해 먹으세요!" 라고 직접 시키는 것과 같다. 이로 인해 새로운 작업을 제출하는 스레드의 속도가 느려질 수 있다. 왜냐? 원래 같으면 새로운 작업을 제출하는 스레드는 execute() 또는 submit()으로 계속 작업을 요청하는데 요청했더니 나보고 이걸 하라고 시키면? 직접 작업을 처리하는 시간동안 요청을 못하게 된다. 

 

RejectMainV3

package thread.executor.rejected;

import thread.executor.RunnableTask;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectMainV3 {

    public static void main(String[] args) {

        ExecutorService es = new ThreadPoolExecutor(
                1,
                1,
                0,
                TimeUnit.SECONDS,
                new SynchronousQueue<>(),
                new ThreadPoolExecutor.CallerRunsPolicy());

        es.execute(new RunnableTask("task1"));
        es.execute(new RunnableTask("task2"));
        es.execute(new RunnableTask("task3"));

        es.close();
    }
}

실행 결과

2024-07-31 12:52:13.122 [     main] task2 시작
2024-07-31 12:52:13.122 [pool-1-thread-1] task1 시작
2024-07-31 12:52:14.127 [pool-1-thread-1] task1 완료
2024-07-31 12:52:14.132 [     main] task2 완료
2024-07-31 12:52:14.133 [pool-1-thread-1] task3 시작
2024-07-31 12:52:15.134 [pool-1-thread-1] task3 완료
  • task1은 스레드 풀에 스레드가 있어서 스레드 풀에 있는 스레드가 수행한다.
  • task2는 스레드 풀에 보관할 큐도 없고 작업할 스레드가 없다. 거절해야 한다.
  • 이때 작업을 거절하는 대신에 작업을 요청한 스레드에게 대신 일을 시킨다.
  • task2의 작업을 main 스레드가 수행하는 것을 확인할 수 있다.

이 정책의 특징은 생산자 스레드가 소비자 대신 일을 수행하는 것도 있지만, 생산자 스레드가 대신 일을 수행하는 덕분에 작업의 생산 자체가 느려진다는 점이다. 덕분에 작업의 생산 속도가 너무 빠르다면, 생산 속도를 조절할 수 있다. 원래대로 하면 main 스레드가 task1, task2, task3, task4를 연속해서 바로 생산해야 한다. CallerRunsPolicy 정책 덕분에 main 스레드는 task2를 본인이 직접 완료하고 나서야 task3을 생산할 수 있다. 결과적으로 생산 속도가 조절되었다. 

 

public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public CallerRunsPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}
  • r.run() 코드를 보면 별도의 스레드에서 수행하는 것이 아니라 main 스레드가 직접 수행하는 것을 알 수 있다. Runnable이 가진 run()을 직접 호출하면 절대 안된다고 예전 포스팅에서 말했다. 그 이유는 직접 호출하면 그 호출한 스레드가 이 작업을 수행하는 것이지 새로운 스레드가 작업을 수행하는게 아니라고 말했다. 그 이유 그대로가 여기서 사용된다. 재밌다.
  • 참고로 ThreadPoolExecutorshutdown()을 하면 이후에 요청하는 작업을 거절하는데, 이때도 생성할 때 설정한 예외 정책이 그대로 적용된다. 그래서 CallerRunsPolicy 정책은 shutdown() 이후에도 받은 작업을 수행하게 될테니 shutdown() 조건을 체크해서 이 경우에는 작업을 수행하지 않도록 해두었다.

사용자 정의

사용자가 직접 RejectedExecutionHandler 인터페이스를 구현해서 자신만의 예외 처리 전략을 정의할 수 있다. 이를 통해 특정 요구사항에 맞는 작업 거절 방식을 설정할 수 있다.

 

RejectMainV4

package thread.executor.rejected;

import thread.executor.RunnableTask;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

import static util.MyLogger.log;

public class RejectMainV4 {

    public static void main(String[] args) {

        ExecutorService es = new ThreadPoolExecutor(
                1,
                1,
                0,
                TimeUnit.SECONDS,
                new SynchronousQueue<>(),
                new MyRejectedExecutionHandler());

        es.execute(new RunnableTask("task1"));
        es.execute(new RunnableTask("task2"));
        es.execute(new RunnableTask("task3"));

        es.close();
    }

    static class MyRejectedExecutionHandler implements RejectedExecutionHandler {

        AtomicInteger count = new AtomicInteger();

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            int counted = count.incrementAndGet();
            log("[경고]: 누적된 거절 작업 수 : " + counted);
        }
    }
}

실행 결과

2024-07-31 13:01:29.707 [     main] [경고]: 누적된 거절 작업 수 : 1
2024-07-31 13:01:29.707 [pool-1-thread-1] task1 시작
2024-07-31 13:01:29.710 [     main] [경고]: 누적된 거절 작업 수 : 2
2024-07-31 13:01:30.712 [pool-1-thread-1] task1 완료

 

 

정리

길고 긴 멀티스레드 세상이 끝났다! 이 포스팅에서는 Executor 프레임워크를 조금 더 실무 관점에서 다루는 방식에 대해 알아보았다. 그래서 어떻게 우아한 종료를 할 수 있을지에 대한 고민과 스레드 풀의 corePoolSize, maximumPoolSize가 어떤 방식으로 동작하는지 이해해보았다. 그래서 풀 전략도 세가지로 나뉘어졌었다.

  • 고정 스레드 풀 전략: 트래픽이 일정하고, 시스템 안정성이 가장 중요
  • 캐시 스레드 풀 전략: 일반적인 성장하는 서비스
  • 사용자 정의 풀 전략: 다양한 상황에 대응

그리고 마지막으로, 예외 정책에 대해서도 알아봤다. 개인적으로 가장 재밌는건 CallerRunsPolicy였다. 사용하진 않을 것 같다ㅋㅋ

728x90
반응형
LIST

+ Recent posts