멀티스레드와 ThreadPoolExecutor 본문

백앤드 개발일지

멀티스레드와 ThreadPoolExecutor

giron 2024. 12. 30. 21:57
728x90

ThreadPoolExecutor

트래픽이 증가했을땐 처리량을 높이기 위해 스레드를 늘리고, 감소했을땐 줄이려면 어떻게 해야할까? 병렬처리를 위해선 어떻게 스레드를 설정해야할까? 

 

ThreadPoolExecutor 메서드

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    2, // corePoolSize: 기본 스레드 수. 항상 최소 2개의 스레드는 유지됩니다.
    4, // maximumPoolSize: 최대 스레드 수. 필요할 경우 스레드는 최대 4개까지 증가할 수 있습니다.
    30, // keepAliveTime: corePoolSize를 초과하는 스레드가 대기할 수 있는 최대 시간(30초).
    TimeUnit.SECONDS, // keepAliveTime의 시간 단위. 여기서는 초(seconds) 단위로 설정됩니다.
    new LinkedBlockingQueue<>(10), // 작업 큐: 최대 10개의 작업을 담을 수 있는 LinkedBlockingQueue 사용.
    new ThreadPoolExecutor.DiscardPolicy() // 거부 정책: 큐가 가득 차면 새 작업을 그냥 버립니다.
);

일반적으로 작업당 호출 오버헤드가 감소하여 많은 수의 비동기 작업을 실행할 때 성능이 향상되고, 작업 모음을 실행할 때 소비되는 스레드를 포함한 리소스를 제한하고 관리하는 수단을 제공합니다.

프로그래머는 일반적인 사용 시나리오에 대한 설정을 미리 구성한 아래의 Executors팩토리 메서드를 사용하는 것이 좋다. (물론 내부적으로 ThreadPoolExecutor()를 사용해 구현되어있기에 커스텀이 필요하다면 ThreadPoolExecutor()를 직접 구현하면 된다.)

Core and maximum pool sizes

ThreadPoolExecutor는 corePoolSize와 maximumPoolSize가 설정된 경계에 따라 자동으로 pool size를 조정합니다.

만약 corePoolSize보다 적게 스레드가 운영중이라면, 요청을 처리하기 위한 새로운 스레드가 만들어집니다. 비록 다른 워커 스레드들이 idel(유휴)상태 일지라도 만들어집니다.

만약 maximumPoolSize보다 적게 스레드가 운영중이라면, 오직 큐가 가득 찼을때만 스레드가 만들어진다.

  • corePoolSize = maximumPoolSize, 같은 수로 세팅하면 고정된 poolSize를 생성할 수 있다.
  • maximumPoolSize를 Integer.MAX_VALUE처럼 세팅하면 concurrent tasks 수 만큼 pool이 조정된다.

setCorePoolSize(int) and setMaximumPoolSize(int) 를 사용하면 동적으로 바꿀수도 있다. (더 작은 값으로 설정하면, 초과된 기존 스레드들은 다음에 유휴 상태가 될 때 종료된다.)

On-demand construction

prestartCoreThread() or prestartAllCoreThreads()를 사용해서 동적으로 overried할수 있다. 이를 통해 요청이 들어오기전에 미리 생성할 수 있다. 만약 이미 생성되어있다면 false를 반환한다.

Keep-alive times

만약 pool이 corePoolSize threads 보다 많다면, keepAliveTime이후의 유휴 상태의 초과한 스레드들은 삭제될것이다. setKeepAliveTime(long, TimeUnit).를 통해 동적으로 조정할 수 있다. 일반적으로 corePoolSize보다 많은 스레드를 대상으로 적용되지만 allowCoreThreadTimeOut(boolean)를 사용해서 core threads또한 대상에 적용되게 할 수 있다.

Queuing

  • corePoolSize보다 적은 수의 스레드가 실행 중이면 Executor는 큐에 넣는 것보다 항상 새로운 스레드를 추가하는 것을 선호합니다.
  • corePoolSize 이상의 스레드가 실행 중이면 Executor는 항상 새로운 스레드를 추가하는 것보다 요청을 큐에 넣는 것을 선호합니다.
  • 요청을 큐에 넣을 수 없는 경우 새 스레드가 생성되지만, 이것이 maximumPoolSize를 초과하는 경우 작업은 거부됩니다.

3가지 큐잉 전략

  1. Direct handoffs
    1. 좋은 기본 선택은 SynchronousQueue 사용이다. 큐에 작업을 보류하지 않고 스레드에 작업을 핸드오프하는 것이다.
    2. 일반적으로 새로 제출된 작업을 거부하지 않도록 무제한 maximumPoolSizes가 필요합니다.
    3. 버퍼링 할 공간이 없기 때문에 Queue에 삽입하려는 동작과 Queue에서 가져가려 한다.
  2. Unbounded queues
    1.  LinkedBlockingQueue같은 제한 없는 큐를 사용한다. corePoolSize threads 바쁘면 큐에 대기한다.
    2. 따라서 corePoolSize 스레드보다 더 많은 스레드가 생성되지 않습니다 (maximumPoolSize가 무의미해진다.)
    3. 각 작업이 다른 작업과 완전히 독립적이어서 작업이 서로의 실행에 영향을 미칠 수 없는 경우(예: 웹 페이지 서버에서) 이 방식이 적합할 수 있습니다. 이러한 스타일의 대기열은 일시적인 요청 버스트를 매끄럽게 처리하는 데 유용할 수 있지만 명령이 처리할 수 있는 것보다 평균적으로 더 빨리 도착하면 무제한 작업 대기열이 증가할 가능성이 있습니다.
  3. Bounded queues
    1. ArrayBlockingQueue를 사용하여 유한한 maximumPoolSizes의 자원이 고갈되는 것을 방지하도록 돕지만 조정 및 제어가 더 어려울 수 있다.( maximum pool sizes와 Queue Size 설정이 trade-off가 있기 때문이다.)
    2. 큰 queue size와 작은 pool size는 CPU 사용, OS 리소스 및 컨텍스트 전환 오버헤드가 최소화되지만 인위적으로 처리량이 낮아질 수 있습니다
    3. 만약 IO bound처럼 자주 blocking된다면, 작은 queue size와 큰 pool size는 cpu를 더 바쁘게할수 있지만 오버헤드에 주의해야한다.

Rejected tasks

새로운 작업이 execute(Runnable) 메서드를 통해 제출되었지만 거부되는 상황은 다음 두 가지입니다

  1. Executor가 종료(shut down)된 경우.
  2. Executor가 최대 스레드 수와 작업 큐의 크기 모두 유한한 제한(finite bounds)을 갖고 있고, 이미 포화 상태(saturated)인 경우

4가지 방식으로 처리된다.

  1. ThreadPoolExecutor.AbortPolicy: 기본 정책으로 핸들러는 거부시 런타임에 RejectedExecutionException를 throw한다.
  2. ThreadPoolExecutor.CallerRunsPolicy
    1. 작업을 제출한 호출 스레드가 직접 거부된 작업을 실행하도록 합니다. (응답이 느려질 수 있으나 받을 수 있음)
    2. 이 정책은 피드백 제어 메커니즘의 역할을 하며, 작업 제출 속도를 느리게 만드는 효과가 있습니다.
      즉, 제출한 스레드가 실행에 시간을 소비하게 되므로, 새로운 작업 제출이 줄어듭니다.
  3. ThreadPoolExecutor.DiscardPolicy
    1. 실행할 수 없는 작업을 그냥 버립니다.
    2. 작업 결과가 반드시 필요하지 않은 경우에만 사용해야 하며, 드물게 사용하는 정책입니다.
  4. ThreadPoolExecutor.DiscardOldestPolicy
    1. Executor가 종료되지 않은 경우, 작업 큐의 가장 오래된 작업을 삭제하고, 새 작업 실행을 재시도합니다.
    2. 단, 이 정책은 작업이 계속 거부될 수 있어, 대부분의 경우 적합하지 않습니다.
      작업을 삭제할 경우, 삭제된 작업이 대기 중인 컴포넌트에서 예외를 발생시키거나 작업 실패를 기록해야 합니다.
500개의 동시 요청 테스트 

1. CallerRunsPolicy: 8s
2. DiscardPolicy: 5s
3. DiscardOldestPolicy: 5s

 

 

사용자 정의 RejectedExecutionHandler

  • 기본 정책 외에도, 사용자는 직접 RejectedExecutionHandler 클래스를 정의해 특정 요구 사항에 맞는 거부 정책을 구현할 수 있습니다.
  • 사용자 정의 정책을 설계할 때는 특히 주의가 필요합니다. 특정 작업 큐 크기용량 정책에 맞게 설계해야 안정적인 작동이 보장됩니다.
class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("Task rejected: " + r.toString());
        // 필요시 로깅 또는 알림 처리
    }
}

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    2, 4, 30, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(10),
    new CustomRejectedExecutionHandler()
);

shutDown

shutDown(): 기존에 들어있는 작업들은 모두 끝마치고 종료된다.
shutDownNow(): 즉 종료된다

 

cf. ExecutorService vs ForkJoinPool

이해하기 쉬운 자료가 있어서 남겨둔다. : https://medium.com/@javatechie/understanding-the-basics-of-executorservice-vs-forkjoinpool-0fb22f117480

ExecutorService는 독립적으로 실행한다.

ExecutorService kitchen = Executors.newFixedThreadPool(3);

Future<String> soup = kitchen.submit(() -> "Soup is ready!");
Future<String> steak = kitchen.submit(() -> "Steak is ready!");
Future<String> salad = kitchen.submit(() -> "Salad is ready!");

System.out.println(soup.get());  // Waits for soup
System.out.println(steak.get()); // Waits for steak
System.out.println(salad.get()); // Waits for salad

kitchen.shutdown();

 

ForkJoinPool은 각각의 결과를 합치는 과정이 들어간다.

ForkJoinPool kitchen = new ForkJoinPool();

RecursiveTask<String> lasagnaTask = new RecursiveTask<>() {
    @Override
    protected String compute() {
        // Break down tasks like cooking noodles, making sauce, etc.
        ForkJoinTask<String> noodlesTask = new RecursiveTask<>() {
            protected String compute() {
                return "Noodles cooked!";
            }
        }.fork();

        ForkJoinTask<String> sauceTask = new RecursiveTask<>() {
            protected String compute() {
                return "Sauce prepared!";
            }
        }.fork();

        return noodlesTask.join() + " " + sauceTask.join() + " Lasagna is ready!";
    }
};

String result = kitchen.invoke(lasagnaTask);
System.out.println(result);

ForkJoinPool은 하나의 스레드가 하나의 큐를 관리한다. 이때 한쪽의 스레드만 busy한 문제를 해결하기 위해 work-stealing 알고리즘을 사용한다. 

 

Reference

https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/ThreadPoolExecutor.html

728x90
Comments