카테고리 없음
ThreadPoolExecutor
jis1218
2024. 10. 13. 23:27
ThreadPoolExecutor executor = new ThreadPoolExecutor(
10, // corePoolSize
10, // maximumPoolSize
10L, // keepAliveTime
TimeUnit.SECONDS, // timeUnit
new LinkedBlockingQueue<>(10) // queue
);
- corePoolSize - 실행에 참여시킬 스레드 크기
- 작업이 시작되어야 스레드도 생성된다. 즉 처음부터 corePoolSize만큼 만들어지지 않는다.
- ThreadPoolExecutor.prestartAllCoreThreads 을 이용하면 처음부터 corePoolSize만큼 만들 수 있다.
- maximumPoolSize - workQueue가 꽉 찼을 때 추가적으로 (maximumPoolSize - corePoolSize)만큼의 스레드가 생성될 준비를 한다. 따라서 maximumPoolSize는 corePoolSize보다 작을 수 없다. 만약 작으면 IllegalArgumentException 이 발생한다.
- 역시나 필요할 때마다 생성된다. 처음부터 maximumPoolSize만큼 만들어지지 않는다.
- workQueue - corePoolSize가 꽉 차고 그 보다 더 많은 요청이 올 경우 workQueue에서 초과 작업을 가지고 있다.
- BlockingQueue를 사용한다. ThreadPoolExecutor의 경우 생성자로 구현체를 받는다.
- ThreadPoolTaskExecutor의 경우 queueCapacity에 따라 구현체가 다르다.
protected BlockingQueue<Runnable> createQueue(int queueCapacity) { return (BlockingQueue)(queueCapacity > 0 ? new LinkedBlockingQueue(queueCapacity) : new SynchronousQueue()); }
- keepAliveTime - maximumPoolSize에 의해 추가적으로 생성된 스레드가 작업이 없을 때 keep alive time만큼 존재하다가 사라지게 된다. 만약 allowCoreThreadTimeOut 이 true라면 아래 설명과 같이 core thread도 keep alive time 후에 사라지게 된다.
*/** * If false (default), core threads stay alive even when idle. * If true, core threads use keepAliveTime to time out waiting * for work. */*private volatile boolean allowCoreThreadTimeOut;
Executor 전략 중 newCachedThreadPool()이 있다.
기본 스레드를 사용하지 않고 60초 생존주기를 가진 초과 스레드만 사용한다.
즉 요청이 있으면 스레드가 계속 생성되고 60초 후에 사라지게 된다.
ThreadPoolExecutor executor = new ThreadPoolExecutor(
0, // corePoolSize
Integer.MAX_VALUE, // maximumPoolSize
60L, // keepAliveTime
TimeUnit.SECONDS, // timeUnit
new SynchronousQueue<Runnable>() // queue
);
여기서 SynchornousQueue는 무엇일까?
코드에 있는 주석을 보면
/**
* A {@linkplain BlockingQueue blocking queue} in which each insert
* operation must wait for a corresponding remove operation by another
* thread, and vice versa. A synchronous queue does not have any
* internal capacity, not even a capacity of one. You cannot
* {@code peek} at a synchronous queue because an element is only
* present when you try to remove it; you cannot insert an element
* (using any method) unless another thread is trying to remove it;
SynchronousQueue는 저장 capacity가 없다. 생산자의 작업을 소비자에게 직접 전달하는 구조이다.
따라서 다음과 같은 코드를 실행시키면 에러가 발생한다.
@Test
void name() {
SynchronousQueue<String> queue = new SynchronousQueue<>();
queue.add("hello");
System.out.println("queue = " + queue);
}
//Queue full
//java.lang.IllegalStateException: Queue full
// at java.base/java.util.AbstractQueue.add(AbstractQueue.java:98)
// at com.insup.notification.service.SendServiceTest.name(SendServiceTest.java:118)
// at java.base/java.lang.reflect.Method.invoke(Method.java:580)
// at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
// at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
반면에 offer()를 사용하면 에러는 발생하지 않는다.
@Test
void name() {
SynchronousQueue<String> queue = new SynchronousQueue<>();
queue.offer("hello");
System.out.println("queue = " + queue);
}
// queue = []
대신 queue가 비어있는 것을 알게 된다.