카테고리 없음

ThreadPoolExecutor

jis1218 2024. 10. 13. 23:27
ThreadPoolExecutor executor = new ThreadPoolExecutor(
        10, // corePoolSize
        10, // maximumPoolSize
        10L, // keepAliveTime
        TimeUnit.SECONDS, // timeUnit
        new LinkedBlockingQueue<>(10) // queue
);
  • corePoolSize - 실행에 참여시킬 스레드 크기
    • 작업이 시작되어야 스레드도 생성된다. 즉 처음부터 corePoolSize만큼 만들어지지 않는다.
    • ThreadPoolExecutor.prestartAllCoreThreads 을 이용하면 처음부터 corePoolSize만큼 만들 수 있다.
  • maximumPoolSize - workQueue가 꽉 찼을 때 추가적으로 (maximumPoolSize - corePoolSize)만큼의 스레드가 생성될 준비를 한다. 따라서 maximumPoolSize는 corePoolSize보다 작을 수 없다. 만약 작으면 IllegalArgumentException 이 발생한다.
    • 역시나 필요할 때마다 생성된다. 처음부터 maximumPoolSize만큼 만들어지지 않는다.
  • workQueue - corePoolSize가 꽉 차고 그 보다 더 많은 요청이 올 경우 workQueue에서 초과 작업을 가지고 있다.
    • BlockingQueue를 사용한다. ThreadPoolExecutor의 경우 생성자로 구현체를 받는다.
    • ThreadPoolTaskExecutor의 경우 queueCapacity에 따라 구현체가 다르다.
    protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
        return (BlockingQueue)(queueCapacity > 0 ? new LinkedBlockingQueue(queueCapacity) : new SynchronousQueue());
    }
    
  • keepAliveTime - maximumPoolSize에 의해 추가적으로 생성된 스레드가 작업이 없을 때 keep alive time만큼 존재하다가 사라지게 된다. 만약 allowCoreThreadTimeOut 이 true라면 아래 설명과 같이 core thread도 keep alive time 후에 사라지게 된다.

*/** * If false (default), core threads stay alive even when idle. * If true, core threads use keepAliveTime to time out waiting * for work. */*private volatile boolean allowCoreThreadTimeOut;

Executor 전략 중 newCachedThreadPool()이 있다.

기본 스레드를 사용하지 않고 60초 생존주기를 가진 초과 스레드만 사용한다.

즉 요청이 있으면 스레드가 계속 생성되고 60초 후에 사라지게 된다.

ThreadPoolExecutor executor = new ThreadPoolExecutor(
        0, // corePoolSize
        Integer.MAX_VALUE, // maximumPoolSize
        60L, // keepAliveTime
        TimeUnit.SECONDS, // timeUnit
        new SynchronousQueue<Runnable>() // queue
);

여기서 SynchornousQueue는 무엇일까?

코드에 있는 주석을 보면

/**
 * A {@linkplain BlockingQueue blocking queue} in which each insert
 * operation must wait for a corresponding remove operation by another
 * thread, and vice versa.  A synchronous queue does not have any
 * internal capacity, not even a capacity of one.  You cannot
 * {@code peek} at a synchronous queue because an element is only
 * present when you try to remove it; you cannot insert an element
 * (using any method) unless another thread is trying to remove it;

SynchronousQueue는 저장 capacity가 없다. 생산자의 작업을 소비자에게 직접 전달하는 구조이다.

따라서 다음과 같은 코드를 실행시키면 에러가 발생한다.

@Test
void name() {
    SynchronousQueue<String> queue = new SynchronousQueue<>();
    queue.add("hello");

    System.out.println("queue = " + queue);
}

//Queue full
//java.lang.IllegalStateException: Queue full
//	at java.base/java.util.AbstractQueue.add(AbstractQueue.java:98)
//	at com.insup.notification.service.SendServiceTest.name(SendServiceTest.java:118)
//	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
//	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
//	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)

반면에 offer()를 사용하면 에러는 발생하지 않는다.

@Test
void name() {
    SynchronousQueue<String> queue = new SynchronousQueue<>();
    queue.offer("hello");

    System.out.println("queue = " + queue);
}

// queue = []

대신 queue가 비어있는 것을 알게 된다.