비동기 분석 프로세스를 붙이면서 처리해야할 수많은 내용중에 하나는
SSE로 진행 상황을 알리면서 해당 상태를 DB에도 남기는 것이다
그래서 오늘은
- AnalysisJob Persistence 구조 보강
- Async Processor에서 DB 상태 업데이트까지 책임지기
이 두 가지를 했다.
1. AnalysisJobPersistencePort 역할 정리
기존에는 save() 위주였는데,
비동기 프로세스에서 쓰기엔 부족해서 의도를 드러내는 메서드들을 추가했다.
Persistence Port
package com.woopi.safehome.domain.deed.application.port.outbound
import com.woopi.safehome.domain.deed.model.AnalysisJob
import com.woopi.safehome.global.enums.AnalysisStep
import com.woopi.safehome.global.enums.JobStatus
interface AnalysisJobPersistencePort {
/**
* 분석 작업 저장
*/
fun create(analysisJobCreate: AnalysisJob.Create): AnalysisJob.Data
/**
* Job 상태 / 단계 / 설명 업데이트
* (IN_PROGRESS, FAILED 등)
*/
fun update(command: AnalysisJob.Update): AnalysisJob.Data
/**
* JobId 기준 조회 (SSE 재연결, 결과 조회용)
*/
fun findByJobId(jobId: String): AnalysisJob.Data?
/**
* JobId 기준 상태 + 단계 변경 (편의 메서드)
*/
fun updateStatus(
jobId: String,
status: JobStatus,
step: AnalysisStep? = null,
description: String? = null
): AnalysisJob.Data
/**
* Job 완료 처리 (결과 저장 포함)
*/
fun complete(
jobId: String,
result: String
): AnalysisJob.Data
}
- 상태 변경을 Command처럼 표현
- Async 쪽에서 JPA / Entity를 전혀 모르게 함
2. Persistence Adapter 수정
Adapter에서는 단순히
- jobId로 엔티티 조회
- 상태/단계/설명 갱신
- 저장
만 책임진다
package com.woopi.safehome.domain.deed.adapter.outbound.persistence
import com.woopi.safehome.domain.deed.adapter.outbound.persistence.jpa.AnalysisJobEntityMapper
import com.woopi.safehome.domain.deed.adapter.outbound.persistence.jpa.AnalysisJobRepository
import com.woopi.safehome.domain.deed.application.port.outbound.AnalysisJobPersistencePort
import com.woopi.safehome.domain.deed.model.AnalysisJob
import com.woopi.safehome.global.enums.AnalysisStep
import com.woopi.safehome.global.enums.JobStatus
import com.woopi.safehome.global.exception.BusinessException
import com.woopi.safehome.global.exception.ErrorCode
import org.springframework.stereotype.Component
@Component
class AnalysisJobPersistenceAdapter(
private val analysisJobRepository: AnalysisJobRepository
) : AnalysisJobPersistencePort {
override fun create(analysisJobCreate: AnalysisJob.Create): AnalysisJob.Data {
return analysisJobRepository.save(
AnalysisJobEntityMapper.toEntity(analysisJobCreate)
).let { AnalysisJobEntityMapper.toModel(it) }
}
override fun update(command: AnalysisJob.Update): AnalysisJob.Data {
val entity = analysisJobRepository.findById(command.id)
.orElseThrow { BusinessException(ErrorCode.NOT_FOUND, "AnalysisJob not found: id=${command.id}") }
entity.status = command.status
entity.step = command.step
entity.result = command.result
entity.description = command.description
return analysisJobRepository.save(entity)
.let { AnalysisJobEntityMapper.toModel(it) }
}
override fun findByJobId(jobId: String): AnalysisJob.Data? {
return analysisJobRepository.findByJobId(jobId)
?.let { AnalysisJobEntityMapper.toModel(it) }
}
override fun updateStatus(
jobId: String,
status: JobStatus,
step: AnalysisStep?,
description: String?
): AnalysisJob.Data {
val entity = analysisJobRepository.findByJobId(jobId)
?: throw BusinessException(ErrorCode.NOT_FOUND)
entity.status = status
entity.step = step
entity.description = description
return analysisJobRepository.save(entity)
.let { AnalysisJobEntityMapper.toModel(it) }
}
override fun complete(
jobId: String,
result: String
): AnalysisJob.Data {
val entity = analysisJobRepository.findByJobId(jobId)
?: throw BusinessException(ErrorCode.NOT_FOUND, "AnalysisJob not found: jobId=$jobId")
entity.status = JobStatus.COMPLETED
entity.step = AnalysisStep.POST_PROCESSING
entity.result = result
return analysisJobRepository.save(entity)
.let { AnalysisJobEntityMapper.toModel(it) }
}
}
3. Async Processor에 DB 반영 추가
기존 Async Processor는
SSE 알림만 쏘고 있었는데,
이제는 DB 상태 업데이트 → SSE 알림 순서로 바꿨다.
수정된 Async Processor
package com.woopi.safehome.domain.analysisjob.application.service
import com.woopi.safehome.domain.analysisjob.application.port.outbound.AnalysisSseNotifierPort
import com.woopi.safehome.domain.deed.application.port.outbound.AnalysisJobPersistencePort
import com.woopi.safehome.domain.deed.domain.service.PdfValidationService
import com.woopi.safehome.domain.deed.domain.service.exception.InvalidPdfException
import com.woopi.safehome.global.enums.AnalysisStep
import com.woopi.safehome.global.enums.JobStatus
import org.springframework.scheduling.annotation.Async
import org.springframework.stereotype.Service
import org.springframework.web.multipart.MultipartFile
@Service
class AnalysisAsyncProcessor(
private val analysisProgressPort: AnalysisSseNotifierPort,
private val pdfValidationService: PdfValidationService,
private val analysisJobPersistencePort: AnalysisJobPersistencePort
) {
@Async
fun process(jobId: String, file: MultipartFile) {
fun updateAndNotify(
status: JobStatus,
step: AnalysisStep,
message: String
) {
analysisJobPersistencePort.updateStatus(
jobId = jobId,
status = status,
step = step,
description = message
)
analysisProgressPort.notifyStep(
jobId,
status,
step,
message
)
}
updateAndNotify(
JobStatus.IN_PROGRESS,
AnalysisStep.PDF_PARSING,
"첨부된 파일을 분석중이에요"
)
try {
pdfValidationService.validate(file)
} catch (e: InvalidPdfException) {
updateAndNotify(
JobStatus.FAILED,
AnalysisStep.PDF_PARSING,
e.message ?: "PDF 검증 실패"
)
return
}
Thread.sleep(1000)
updateAndNotify(
JobStatus.IN_PROGRESS,
AnalysisStep.LLM_ANALYSIS,
"AI가 등본을 분석중이에요"
)
Thread.sleep(1000)
updateAndNotify(
JobStatus.IN_PROGRESS,
AnalysisStep.POST_PROCESSING,
"분석한 내용을 정리중이에요"
)
analysisJobPersistencePort.complete(
jobId = jobId,
result = "분석 성공"
)
analysisProgressPort.notifyStep(
jobId,
JobStatus.COMPLETED,
AnalysisStep.POST_PROCESSING,
"완료 됐습니다!"
)
}
}
4. 이번 작업 정리
- Async 작업 중 모든 상태 변화는 DB에 남김
- SSE는 상태 변경을 전달하는 보조 수단
- PDF 검증 실패도 Job FAILED 상태로 명확히 기록
- 분석 프로세스가 중단돼도 상태 추적 가능
마무리
Async Processor가 “분석 실행자 + 상태 전이 관리자” 역할을 하도록 정리했고,
Persistence Port는 그걸 받쳐주는 구조로 다듬었다.
'개발일지' 카테고리의 다른 글
| [개발일지_safeHome] 16. 등기부등본 비동기 분석 프로세스_PDF 검증_1 (0) | 2026.02.05 |
|---|---|
| [개발일지_safeHome] 15. 등본 분석 프로세스 재정리 (0) | 2026.02.05 |
| [개발일지_safeHome] 14. react native로 app 시작하기 (0) | 2026.01.18 |
| [개발일지_safeHome] 13. SSE 샘플 추가 (0) | 2026.01.15 |
| [개발일지_safeHome] 11. redis 세팅 추가 (0) | 2025.12.29 |