상세 컨텐츠

본문 제목

Kotlin과 Spring 환경에서 CountDownLatch를 활용한 동시성 제어

프로그래밍/Kotlin

by 웰치스짱 2025. 5. 15. 18:36

본문

반응형

Kotlin과 Spring 환경에서 CountDownLatch를 활용한 동시성 제어

개발에서 여러 작업을 동시에 처리해야 하는 경우는 흔합니다. 백그라운드에서 대량의 데이터를 처리하거나, 외부 서비스에 병렬적으로 요청을 보내거나, 애플리케이션 시작 시 여러 컴포넌트의 초기화가 완료되기를 기다리는 등 다양한 시나리오에서 동시성(Concurrency) 문제가 발생합니다.

자바 플랫폼 (그리고 Kotlin)에서는 이러한 동시성 문제를 해결하기 위한 다양한 도구들을 제공하며, 그 중 java.util.concurrent 패키지에 포함된 CountDownLatch는 특정 작업들이 모두 완료될 때까지 다른 스레드를 기다리게 하는 강력한 메커니즘입니다.

이번 블로그 게시물에서는 Kotlin과 Spring 환경에서 CountDownLatch가 무엇인지 이론적으로 살펴보고, 실제 애플리케이션 코드에서 어떻게 활용할 수 있는지 상세한 예제를 통해 알아보겠습니다.

1. CountDownLatch 이론 살펴보기

CountDownLatch는 이름에서도 알 수 있듯이 "카운트다운 래치(잠금 장치)"입니다. 특정 숫자로 초기화되며, 하나 이상의 스레드가 이 숫자가 0이 될 때까지 기다릴 수 있도록 합니다.

주요 특징 및 동작 방식은 다음과 같습니다.

  1. 초기화: CountDownLatch(int count) 생성자를 사용하여 특정 양수 값으로 초기화합니다. 이 count는 완료되기를 기다리는 이벤트 또는 작업의 총 개수를 나타냅니다.
  2. countDown(): 래치를 초기화한 스레드 외의 다른 스레드들은 자신이 맡은 작업을 완료했을 때 countDown() 메서드를 호출하여 내부 카운트 값을 1 감소시킵니다.
  3. await(): 래치가 0이 될 때까지 기다리는 스레드는 await() 메서드를 호출합니다. await() 메서드는 내부 카운트가 0이 될 때까지 해당 스레드를 블록(Blocked) 상태로 만듭니다. 카운트가 0이 되는 순간, await()를 호출했던 모든 스레드는 블록 상태에서 벗어나 다음 코드를 실행하게 됩니다.
  4. 일회성: CountDownLatch는 일회성입니다. 일단 카운트가 0에 도달하면 더 이상 리셋할 수 없습니다. 이후에 await()를 호출하는 스레드는 즉시 통과하며, countDown()를 호출해도 카운트는 변하지 않습니다.

언제 CountDownLatch를 사용할까요?

  • 메인 스레드가 여러 워커 스레드의 초기화나 특정 단계의 작업 완료를 기다려야 할 때
  • 여러 서비스나 컴포넌트가 준비될 때까지 애플리케이션의 시작을 지연시켜야 할 때
  • 성능 테스트에서 여러 클라이언트 스레드를 동시에 시작시키고 결과를 기다릴 때

CountDownLatch는 여러 스레드가 "준비되었다"는 신호를 보내고, 단일 스레드가 이 신호들을 모두 받은 후 진행하는 시나리오에 적합합니다. (만약 여러 스레드가 서로를 기다려 동기화하는 시나리오라면 CyclicBarrier를 고려할 수도 있습니다.)

2. Kotlin + Spring 환경에서 CountDownLatch 예제

이제 Spring Boot와 Kotlin 환경에서 CountDownLatch를 사용하여 여러 비동기 작업의 완료를 기다리는 예제를 만들어 보겠습니다.

시나리오:

메인 서비스가 여러 개의 비동기 작업을 시작시키고, 이 모든 비동기 작업이 완료될 때까지 기다린 후 최종 결과를 처리하는 시나리오를 구현합니다. Spring의 TaskExecutor를 사용하여 비동기 작업을 실행하겠습니다.

프로젝트 설정:

  • Spring Boot 프로젝트 (Kotlin 사용)
  • spring-boot-starter-web (간단한 HTTP 요청 처리를 위해)

코드 구현:

먼저 비동기 작업을 실행할 TaskExecutor를 설정합니다. Spring Boot는 기본 TaskExecutor를 제공하지만, 여기서는 명시적으로 설정하는 예시를 보여줍니다.

Kotlin
 
// src/main/kotlin/com/example/latchdemo/config/TaskExecutorConfig.kt
package com.example.latchdemo.config

import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.scheduling.annotation.EnableAsync
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
import java.util.concurrent.Executor

@Configuration
@EnableAsync // @Async 어노테이션 사용을 활성화합니다.
class TaskExecutorConfig {

    @Bean
    fun taskExecutor(): Executor {
        val executor = ThreadPoolTaskExecutor()
        executor.corePoolSize = 5 // 기본 스레드 수
        executor.maxPoolSize = 10 // 최대 스레드 수
        executor.setQueueCapacity(25) // 큐 용량
        executor.setThreadNamePrefix("AsyncTask-") // 스레드 이름 접두사
        executor.initialize()
        return executor
    }
}

다음은 비동기 작업을 수행할 서비스 클래스입니다. @Async 어노테이션을 사용하여 이 메서드가 별도의 스레드에서 실행되도록 합니다.

Kotlin
 
// src/main/kotlin/com/example/latchdemo/service/WorkerService.kt
package com.example.latchdemo.service

import org.slf4j.LoggerFactory
import org.springframework.scheduling.annotation.Async
import org.springframework.stereotype.Service
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

@Service
class WorkerService {

    private val logger = LoggerFactory.getLogger(WorkerService::class.java)

    @Async // 이 메서드는 별도의 스레드에서 비동기적으로 실행됩니다.
    fun performTask(taskId: Int, latch: CountDownLatch) {
        try {
            logger.info("태스크 $taskId 시작. 남은 래치 카운트: ${latch.count}")
            // 실제 작업 수행 (여기서는 딜레이를 줍니다)
            val workDuration = (1000..3000).random().toLong() // 1~3초 랜덤 작업 시간
            TimeUnit.MILLISECONDS.sleep(workDuration)
            logger.info("태스크 $taskId 완료. 작업 시간: ${workDuration}ms")
        } catch (e: InterruptedException) {
            logger.error("태스크 $taskId 중단됨", e)
            // 인터럽트 발생 시 처리 (필요에 따라)
        } finally {
            //!!! 중요: 작업 완료 후 CountDown 호출
            latch.countDown()
            logger.info("태스크 $taskId countDown 호출. 남은 래치 카운트: ${latch.count}")
        }
    }
}

WorkerService의 performTask 메서드는 CountDownLatch 인자를 받습니다. 작업이 완료되면 finally 블록에서 latch.countDown()을 호출하여 래치의 카운트를 감소시킵니다. finally를 사용하는 이유는 작업 중 예외가 발생하더라도 countDown()이 호출되도록 보장하여 메인 스레드가 무한히 대기하는 상황을 방지하기 위함입니다.

이제 메인 서비스입니다. 이 서비스는 여러 개의 performTask를 시작시키고, 모든 태스크가 완료될 때까지 await()를 호출하여 기다립니다.

Kotlin
 
// src/main/kotlin/com/example/latchdemo/service/MainOrchestratorService.kt
package com.example.latchdemo.service

import org.slf4j.LoggerFactory
import org.springframework.stereotype.Service
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

@Service
class MainOrchestratorService(
    private val workerService: WorkerService
) {

    private val logger = LoggerFactory.getLogger(MainOrchestratorService::class.java)

    fun startTasksAndAwaitCompletion(numberOfTasks: Int) {
        logger.info("============= 태스크 오케스트레이션 시작 =============")

        // 래치 초기화: 기다릴 작업의 개수만큼 설정
        val latch = CountDownLatch(numberOfTasks)
        logger.info("CountDownLatch 초기화됨 (카운트: $numberOfTasks)")

        // 여러 워커 태스크 실행
        for (i in 1..numberOfTasks) {
            workerService.performTask(i, latch) // 비동기 메서드 호출
        }

        logger.info("모든 태스크 실행 요청 완료. 래치 카운트가 0이 될 때까지 대기 중...")

        try {
            // 래치 카운트가 0이 될 때까지 대기
            val awaitSuccess = latch.await(10, TimeUnit.SECONDS) // 최대 10초 대기

            if (awaitSuccess) {
                logger.info("============= 모든 태스크 완료 감지. 대기 해제 =============")
            } else {
                logger.warn("============= 래치 대기 시간 초과! 일부 태스크가 완료되지 않았을 수 있습니다. =============")
                // 시간 초과 시 추가적인 에러 처리 로직 구현 가능
            }

        } catch (e: InterruptedException) {
            logger.error("대기 중 인터럽트 발생!", e)
            // 인터럽트 발생 시 처리 (필요에 따라)
        }

        logger.info("============= 오케스트레이션 종료 =============")
    }
}

MainOrchestratorService의 startTasksAndAwaitCompletion 메서드는 다음과 같이 작동합니다.

  1. CountDownLatch(numberOfTasks)를 생성하여 기다릴 태스크의 개수로 초기화합니다.
  2. for 루프를 돌며 numberOfTasks 만큼 workerService.performTask를 호출합니다. @Async 덕분에 이 메서드 호출은 즉시 반환되고 실제 작업은 TaskExecutor의 스레드 풀에서 비동기적으로 실행됩니다.
  3. 루프가 끝난 후 latch.await(10, TimeUnit.SECONDS)를 호출합니다. 메인 스레드는 여기서 블록되어 latch의 카운트가 0이 되거나, 최대 10초가 경과할 때까지 기다립니다.
  4. 모든 워커 태스크가 countDown()을 호출하여 래치 카운트가 0이 되면 await()는 성공적으로 반환되고 메인 스레드가 다시 실행됩니다. 만약 10초 안에 카운트가 0이 되지 않으면 await()는 false를 반환합니다.

이 서비스를 트리거하는 간단한 REST 컨트롤러를 추가해 보겠습니다.

Kotlin
 
// src/main/kotlin/com/example/latchdemo/controller/TaskController.kt
package com.example.latchdemo.controller

import com.example.latchdemo.service.MainOrchestratorService
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RequestParam
import org.springframework.web.bind.annotation.RestController

@RestController
class TaskController(
    private val mainOrchestratorService: MainOrchestratorService
) {

    @GetMapping("/start-tasks")
    fun startTasks(@RequestParam(defaultValue = "5") numberOfTasks: Int): String {
        mainOrchestratorService.startTasksAndAwaitCompletion(numberOfTasks)
        return "$numberOfTasks개의 태스크 실행 요청 및 완료 대기 완료. 로그를 확인하세요."
    }
}

실행 결과 확인:

애플리케이션을 실행하고 http://localhost:8080/start-tasks?numberOfTasks=5 와 같이 요청을 보내면, 콘솔 로그에서 여러 AsyncTask- 스레드가 시작되고, 각 태스크가 완료될 때마다 래치 카운트가 줄어들며, 최종적으로 모든 태스크 완료 후 메인 스레드의 대기가 해제되는 과정을 확인할 수 있습니다.

... (로그 시작) ...
c.e.l.service.MainOrchestratorService    : ============= 태스크 오케스트레이션 시작 =============
c.e.l.service.MainOrchestratorService    : CountDownLatch 초기화됨 (카운트: 5)
c.e.l.service.MainOrchestratorService    : 모든 태스크 실행 요청 완료. 래치 카운트가 0이 될 때까지 대기 중...
c.e.l.service.WorkerService              : 태스크 1 시작. 남은 래치 카운트: 5 [AsyncTask-1]
c.e.l.service.WorkerService              : 태스크 2 시작. 남은 래치 카운트: 5 [AsyncTask-2]
c.e.l.service.WorkerService              : 태스크 3 시작. 남은 래치 카운트: 5 [AsyncTask-3]
c.e.l.service.WorkerService              : 태스크 4 시작. 남은 래치 카운트: 5 [AsyncTask-4]
c.e.l.service.WorkerService              : 태스크 5 시작. 남은 래치 카운트: 5 [AsyncTask-5]
c.e.l.service.WorkerService              : 태스크 3 완료. 작업 시간: XXXXms [AsyncTask-3]
c.e.l.service.WorkerService              : 태스크 3 countDown 호출. 남은 래치 카운트: 4 [AsyncTask-3]
c.e.l.service.WorkerService              : 태스크 1 완료. 작업 시간: YYYYms [AsyncTask-1]
c.e.l.service.WorkerService              : 태스크 1 countDown 호출. 남은 래치 카운트: 3 [AsyncTask-1]
c.e.l.service.WorkerService              : 태스크 5 완료. 작업 시간: ZZZZms [AsyncTask-5]
c.e.l.service.WorkerService              : 태스크 5 countDown 호출. 남은 래치 카운트: 2 [AsyncTask-5]
c.e.l.service.WorkerService              : 태스크 2 완료. 작업 시간: AAAAms [AsyncTask-2]
c.e.l.service.WorkerService              : 태스크 2 countDown 호출. 남은 래치 카운트: 1 [AsyncTask-2]
c.e.l.service.WorkerService              : 태스크 4 완료. 작업 시간: BBBBms [AsyncTask-4]
c.e.l.service.WorkerService              : 태스크 4 countDown 호출. 남은 래치 카운트: 0 [AsyncTask-4]
c.e.l.service.MainOrchestratorService    : ============= 모든 태스크 완료 감지. 대기 해제 =============
c.e.l.service.MainOrchestratorService    : ============= 오케스트레이션 종료 =============
... (로그 종료) ...

(로그의 작업 시간 및 순서는 실행 시마다 달라질 수 있습니다.)

로그를 보면 메인 스레드는 CountDownLatch 초기화 후 await에서 대기하고, 비동기 태스크 스레드들이 각자 작업을 수행하고 countDown을 호출하여 래치 카운트를 줄입니다. 마지막 태스크가 countDown을 호출하여 카운트가 0이 되면, 메인 스레드의 대기가 풀리고 다음 코드를 실행하는 것을 명확히 볼 수 있습니다.

결론

CountDownLatch는 여러 개의 동시 작업이 완료될 때까지 특정 스레드를 대기시켜야 하는 상황에서 매우 유용하게 사용될 수 있습니다. Spring 환경에서 TaskExecutor나 @Async와 함께 사용하면 비동기 작업의 완료 시점을 효과적으로 제어할 수 있습니다.

Kotlin에서도 자바의 java.util.concurrent 라이브러리를 그대로 활용할 수 있으며, Kotlin의 간결한 문법과 함께 사용하면 더욱 효율적인 동시성 코드를 작성할 수 있습니다.

물론 Kotlin Coroutines와 같은 최신 비동기/동시성 메커니즘은 CountDownLatch와는 다른 방식(구조화된 동시성 등)으로 작업을 조율하지만, 기존 자바 라이브러리와의 상호운용성이나 특정 패턴에서는 CountDownLatch가 여전히 유용한 도구임을 기억하시기 바랍니다.

이 예제를 통해 CountDownLatch의 기본적인 사용법과 Spring/Kotlin 환경에서의 적용 방법을 이해하셨기를 바랍니다. 동시성 프로그래밍에서 작업을 조율해야 할 때 CountDownLatch를 효과적으로 활용해 보세요!

반응형

관련글 더보기

댓글 영역