개발/Spring

간단한 푸시 서비스 각 부분별 스레드 설정에 따른 결과

jis1218 2024. 12. 12. 23:04

푸시 서비스가 아래와 같이 이루어져 있다고 하자.

  1. SQS로부터 메시지를 받는 SqsListener,
  2. 비즈니스 로직을 처리하는 FirebaseService,
  3. FCM과 직접적으로 통신하는 Firebase SDK,

Sqs의 기본 TaskExecutor

우선 Sqs의 기본 TaskExecutor는 다음과 같이 정의된다.

// AbstractPipelineMessageListenerContainer.class

protected TaskExecutor createTaskExecutor() {
	ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
	int poolSize = getContainerOptions().getMaxConcurrentMessages() * this.messageSources.size();
	executor.setMaxPoolSize(poolSize);
	executor.setCorePoolSize(poolSize);
	// Necessary due to a small racing condition between releasing the permit and releasing the thread.
	executor.setQueueCapacity(poolSize);
	executor.setAllowCoreThreadTimeOut(true);
	executor.setThreadFactory(createThreadFactory());
	executor.afterPropertiesSet();
	return executor;
}

int poolSize = getContainerOptions().getMaxConcurrentMessages() * this.messageSources.size();에서 알 수 있듯이 messageSource인 SqsListener의 개수와 MaxConcurrentMessages의 곱에 따라 poolSize가 정해진다. 즉 SqsListener에 maxConcurrentMessages를 10으로 하였다면 poolSize는 10개가 되는 것이다.

Firebase SDK의 기본 ThreadPoolExecutor

// FirebaseThreadManagers.class

private static class DefaultThreadManager extends GlobalThreadManager {

	@Override
	protected ExecutorService doInit() {
	  ThreadFactory threadFactory = FirebaseScheduledExecutor.getThreadFactoryWithName(
	      getThreadFactory(), "firebase-default-%d");
	
	  ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(100, 100, 60L,
	      TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);
	  threadPoolExecutor.allowCoreThreadTimeOut(true);
	  return threadPoolExecutor;
	}

Firebase는 DefaultThreadManager에 Default ThreadPoolExecutor를 가지고 있다. poolSize는 100개이다.

대량의 푸시를 한번에 보낸다고 할 때 각 부분별 스레드에 따른 결과는 다음과 같다.

SqsListener FirebaseService FirebaseSDK 결과

SqsListener FirebaseService FirebaseSDK 결과
Sqs 기본 스레드
(core pool size : 10개)
CustomThreadPool-1
(core pool size : 100개)
SDK 기본 스레드
(core pool size : 100개)
OOM 발생
Sqs 기본 스레드
(core pool size : 10개)
가상 스레드 SDK 기본 스레드
(core pool size : 100개)
OOM 발생
Sqs 기본 스레드
(core pool size : 10개)
Sqs 기본 스레드
(core pool size : 10개)
SDK 기본 스레드
(core pool size : 100개)
정상 발송
CustomThreadPool-1
(core pool size : 100개)
CustomThreadPool-1
(core pool size : 100개)
SDK 기본 스레드
(core pool size : 100개)
정상 발송

 

결과에 있듯이 대량의 메시지를 보낼 경우 SqsListener와 FirebaseService를 비동기처리 하면 OOM이 발생하는 것을 알 수 있다. 그 이유는 SqsListener의 스레드가 Blocking 되는 부분이 없어 계속해서 message를 SQS로부터 polling 하기 때문이며 이로 인해 CustomThreadPool을 사용하였을 경우 ThreadPoolExecutor의 Queue에 메시지가 계속 쌓이게 되어 OOM이 발생한다. 가상 스레드를 사용하였다면 스레드가 poll 받은 message수 만큼 늘어나므로 역시나 OOM이 발생한다.

따라서 SqsListener와 FirebaseService를 동기화하고 FirebaseService에서 FirebaseSDK의 동작이 끝날 때까지 기다리게 되면 SqsListener의 스레드가 Blocking 되므로 처리한 만큼만 메시지를 polling할 수 있게 된다.

SqsListener의 스레드를 기본으로 사용하지 않고 다음과 같이 Custom한 ThreadPool을 사용할 수도 있는데

@Bean
public MessageListenerContainerFactory messageListenerContainerFactory() {
    return SqsMessageListenerContainerFactory.builder()
        .sqsAsyncClient(sqsAsyncClient())
        .configure(optionsBuilder -> optionsBuilder
            .componentsTaskExecutor(Executors.newFixedThreadPool(100))
            .build()
        )
        .build();
}

@SqsListener(value = "mySqs, maxConcurrentMessages = "20", maxMessagesPerPoll = "5",

, factory = "messageListenerContainerFactory") 에서 maxConcurrentMessages가 20일 경우 20개의 스레드만 사용하게 되므로 나머지 80개는 다음 cycle에 쓰이게 된다. maxConcurrentMessage만큼의 스레드를 생성하여 사용하는 것보다는 더 빠른데 그 이유는 반납하고 다시 사용하는데 시간이 더 소요되어서 그런 것으로 보인다.