개발일지

[개발일지_safeHome] 17. 등기부등본 비동기 분석 프로세스_상태변경처리

woopii 2026. 2. 6. 01:32

 

비동기 분석 프로세스를 붙이면서 처리해야할 수많은 내용중에 하나는

SSE로 진행 상황을 알리면서 해당 상태를 DB에도 남기는 것이다

 

그래서 오늘은

  • AnalysisJob Persistence 구조 보강
  • Async Processor에서 DB 상태 업데이트까지 책임지기

이 두 가지를 했다.


1. AnalysisJobPersistencePort 역할 정리

기존에는 save() 위주였는데,
비동기 프로세스에서 쓰기엔 부족해서 의도를 드러내는 메서드들을 추가했다.

Persistence Port

package com.woopi.safehome.domain.deed.application.port.outbound

import com.woopi.safehome.domain.deed.model.AnalysisJob
import com.woopi.safehome.global.enums.AnalysisStep
import com.woopi.safehome.global.enums.JobStatus

interface AnalysisJobPersistencePort {

    /**
     * 분석 작업 저장
     */
    fun create(analysisJobCreate: AnalysisJob.Create): AnalysisJob.Data

    /**
     * Job 상태 / 단계 / 설명 업데이트
     * (IN_PROGRESS, FAILED 등)
     */
    fun update(command: AnalysisJob.Update): AnalysisJob.Data

    /**
     * JobId 기준 조회 (SSE 재연결, 결과 조회용)
     */
    fun findByJobId(jobId: String): AnalysisJob.Data?

    /**
     * JobId 기준 상태 + 단계 변경 (편의 메서드)
     */
    fun updateStatus(
        jobId: String,
        status: JobStatus,
        step: AnalysisStep? = null,
        description: String? = null
    ): AnalysisJob.Data

    /**
     * Job 완료 처리 (결과 저장 포함)
     */
    fun complete(
        jobId: String,
        result: String
    ): AnalysisJob.Data

}
 
  • 상태 변경을 Command처럼 표현
  • Async 쪽에서 JPA / Entity를 전혀 모르게 함

2. Persistence Adapter 수정

Adapter에서는 단순히

  • jobId로 엔티티 조회
  • 상태/단계/설명 갱신
  • 저장

만 책임진다

package com.woopi.safehome.domain.deed.adapter.outbound.persistence

import com.woopi.safehome.domain.deed.adapter.outbound.persistence.jpa.AnalysisJobEntityMapper
import com.woopi.safehome.domain.deed.adapter.outbound.persistence.jpa.AnalysisJobRepository
import com.woopi.safehome.domain.deed.application.port.outbound.AnalysisJobPersistencePort
import com.woopi.safehome.domain.deed.model.AnalysisJob
import com.woopi.safehome.global.enums.AnalysisStep
import com.woopi.safehome.global.enums.JobStatus
import com.woopi.safehome.global.exception.BusinessException
import com.woopi.safehome.global.exception.ErrorCode
import org.springframework.stereotype.Component

@Component
class AnalysisJobPersistenceAdapter(
    private val analysisJobRepository: AnalysisJobRepository
) : AnalysisJobPersistencePort {

    override fun create(analysisJobCreate: AnalysisJob.Create): AnalysisJob.Data {
        return analysisJobRepository.save(
            AnalysisJobEntityMapper.toEntity(analysisJobCreate)
        ).let { AnalysisJobEntityMapper.toModel(it) }
    }

    override fun update(command: AnalysisJob.Update): AnalysisJob.Data {
        val entity = analysisJobRepository.findById(command.id)
            .orElseThrow { BusinessException(ErrorCode.NOT_FOUND, "AnalysisJob not found: id=${command.id}") }

        entity.status = command.status
        entity.step = command.step
        entity.result = command.result
        entity.description = command.description

        return analysisJobRepository.save(entity)
            .let { AnalysisJobEntityMapper.toModel(it) }
    }

    override fun findByJobId(jobId: String): AnalysisJob.Data? {
        return analysisJobRepository.findByJobId(jobId)
            ?.let { AnalysisJobEntityMapper.toModel(it) }
    }

    override fun updateStatus(
        jobId: String,
        status: JobStatus,
        step: AnalysisStep?,
        description: String?
    ): AnalysisJob.Data {
        val entity = analysisJobRepository.findByJobId(jobId)
            ?: throw BusinessException(ErrorCode.NOT_FOUND)

        entity.status = status
        entity.step = step
        entity.description = description

        return analysisJobRepository.save(entity)
            .let { AnalysisJobEntityMapper.toModel(it) }
    }

    override fun complete(
        jobId: String,
        result: String
    ): AnalysisJob.Data {
        val entity = analysisJobRepository.findByJobId(jobId)
            ?: throw BusinessException(ErrorCode.NOT_FOUND, "AnalysisJob not found: jobId=$jobId")

        entity.status = JobStatus.COMPLETED
        entity.step = AnalysisStep.POST_PROCESSING
        entity.result = result

        return analysisJobRepository.save(entity)
            .let { AnalysisJobEntityMapper.toModel(it) }
    }

}

3. Async Processor에 DB 반영 추가

기존 Async Processor는
SSE 알림만 쏘고 있었는데,
이제는 DB 상태 업데이트 → SSE 알림 순서로 바꿨다.

수정된 Async Processor

 
package com.woopi.safehome.domain.analysisjob.application.service

import com.woopi.safehome.domain.analysisjob.application.port.outbound.AnalysisSseNotifierPort
import com.woopi.safehome.domain.deed.application.port.outbound.AnalysisJobPersistencePort
import com.woopi.safehome.domain.deed.domain.service.PdfValidationService
import com.woopi.safehome.domain.deed.domain.service.exception.InvalidPdfException
import com.woopi.safehome.global.enums.AnalysisStep
import com.woopi.safehome.global.enums.JobStatus
import org.springframework.scheduling.annotation.Async
import org.springframework.stereotype.Service
import org.springframework.web.multipart.MultipartFile

@Service
class AnalysisAsyncProcessor(
    private val analysisProgressPort: AnalysisSseNotifierPort,
    private val pdfValidationService: PdfValidationService,
    private val analysisJobPersistencePort: AnalysisJobPersistencePort
) {

    @Async
    fun process(jobId: String, file: MultipartFile) {

        fun updateAndNotify(
            status: JobStatus,
            step: AnalysisStep,
            message: String
        ) {
            analysisJobPersistencePort.updateStatus(
                jobId = jobId,
                status = status,
                step = step,
                description = message
            )
            analysisProgressPort.notifyStep(
                jobId,
                status,
                step,
                message
            )
        }

        updateAndNotify(
            JobStatus.IN_PROGRESS,
            AnalysisStep.PDF_PARSING,
            "첨부된 파일을 분석중이에요"
        )

        try {
            pdfValidationService.validate(file)
        } catch (e: InvalidPdfException) {
            updateAndNotify(
                JobStatus.FAILED,
                AnalysisStep.PDF_PARSING,
                e.message ?: "PDF 검증 실패"
            )
            return
        }

        Thread.sleep(1000)

        updateAndNotify(
            JobStatus.IN_PROGRESS,
            AnalysisStep.LLM_ANALYSIS,
            "AI가 등본을 분석중이에요"
        )

        Thread.sleep(1000)

        updateAndNotify(
            JobStatus.IN_PROGRESS,
            AnalysisStep.POST_PROCESSING,
            "분석한 내용을 정리중이에요"
        )

        analysisJobPersistencePort.complete(
            jobId = jobId,
            result = "분석 성공"
        )

        analysisProgressPort.notifyStep(
            jobId,
            JobStatus.COMPLETED,
            AnalysisStep.POST_PROCESSING,
            "완료 됐습니다!"
        )
    }
}

4. 이번 작업 정리

  • Async 작업 중 모든 상태 변화는 DB에 남김
  • SSE는 상태 변경을 전달하는 보조 수단
  • PDF 검증 실패도 Job FAILED 상태로 명확히 기록
  • 분석 프로세스가 중단돼도 상태 추적 가능

마무리

Async Processor가 “분석 실행자 + 상태 전이 관리자” 역할을 하도록 정리했고,
Persistence Port는 그걸 받쳐주는 구조로 다듬었다.