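To report the progress of the property registry document (등본) analysis in real time, I first planned to use WebSocket.
Instead of WebSocket, I am going with SSE (Server-Sent Events).
(If the user closes the app partway through, I plan to fall back to notifications.)
Why SSE (Server-Sent Events) is the better fit
- Only one-way communication is needed: progress flows from the server to the frontend only
- Simpler to implement: less setup than WebSocket
- Automatic reconnection: the browser retries on its own when the connection drops
- HTTP-based: no separate protocol or complex configuration required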
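To make the "HTTP-based, one-way" point concrete, here is a minimal sketch of what a single SSE message looks like on the wire. The `formatSseEvent` helper and the sample JSON are hypothetical and exist only for illustration; in the Spring code later in this post, `SseEmitter` takes care of this framing.

```kotlin
// Hypothetical helper: builds one Server-Sent Events frame by hand.
// A frame is plain text: an optional "event:" line, one "data:" line per
// payload line, and a blank line that terminates the event.
fun formatSseEvent(data: String, event: String? = null): String {
    val sb = StringBuilder()
    if (event != null) sb.append("event: ").append(event).append('\n')
    data.lines().forEach { line -> sb.append("data: ").append(line).append('\n') }
    sb.append('\n') // blank line = end of event
    return sb.toString()
}
```

Calling `formatSseEvent("""{"step":"PDF_PARSING"}""", event = "progress")` yields an `event: progress` line, a `data:` line with the JSON, and a trailing blank line. Nothing beyond a long-lived HTTP response is involved.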
For example:
---
1. PDF parsing
--> "Analyzing the attached file"
2. LLM analysis
--> "AI is analyzing the registry document"
3. Post-processing
--> "Organizing the analysis results"
4. Done
--> "Done!"
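The step-to-message mapping above could live in one place so that the processing code does not repeat string literals. A minimal sketch, assuming a hypothetical `ProgressStage` enum (the project's real enum is `AnalysisStep`, whose definition is not shown in this post):

```kotlin
// Hypothetical enum for illustration; not the project's AnalysisStep.
enum class ProgressStage(val message: String) {
    PDF_PARSING("Analyzing the attached file"),
    LLM_ANALYSIS("AI is analyzing the registry document"),
    POST_PROCESSING("Organizing the analysis results"),
    DONE("Done!");

    // Next stage in declaration order, or null once the job is done.
    fun next(): ProgressStage? = values().getOrNull(ordinal + 1)
}
```

Whether the message belongs on the enum or in a separate lookup is a design choice; keeping it on the enum makes it hard to add a stage without a message.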
1. Create the job DDL
Asynchronous processing needs a key that identifies each job, which is why the table has a job_id column.

-- Analysis job table
CREATE TABLE IF NOT EXISTS analysis_jobs (
    id BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT 'ID',
    job_id VARCHAR(255) NOT NULL COMMENT 'Job ID (UUID)',
    file_name VARCHAR(255) NOT NULL COMMENT 'File name',
    file_size BIGINT NOT NULL COMMENT 'File size',
    status VARCHAR(50) NOT NULL COMMENT 'Status (e.g. PENDING, IN_PROGRESS, COMPLETED, FAILED)',
    step VARCHAR(100) NULL COMMENT 'Current step (e.g. PDF_PARSING, LLM_ANALYSIS, POST_PROCESSING)',
    result TEXT NULL COMMENT 'Analysis result JSON',
    description TEXT NULL COMMENT 'Description (error message, etc.)',
    is_deleted BOOLEAN NOT NULL DEFAULT FALSE COMMENT 'Deleted flag',
    created_id BIGINT NOT NULL COMMENT 'Creator ID',
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Created at',
    updated_id BIGINT NULL COMMENT 'Updater ID',
    updated_at TIMESTAMP NULL DEFAULT NULL COMMENT 'Updated at'
);

CREATE INDEX IF NOT EXISTS idx_analysis_jobs_job_id ON analysis_jobs(job_id);
CREATE INDEX IF NOT EXISTS idx_analysis_jobs_status ON analysis_jobs(status);
CREATE INDEX IF NOT EXISTS idx_analysis_jobs_step ON analysis_jobs(step);
2. Create the entity
To keep the stages clear, I separated JobStatus, which represents the progress state, from AnalysisStep, which represents the unit of work.
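The two enums themselves are not shown in this post. A minimal sketch, assuming their values match the examples listed in the `status` and `step` column comments of the DDL; the `isTerminal` property is a hypothetical addition and not part of the original project:

```kotlin
// Values assumed from the `status` column comment in the DDL.
enum class JobStatus {
    PENDING, IN_PROGRESS, COMPLETED, FAILED;

    // Hypothetical helper: true when no further progress events are expected.
    val isTerminal: Boolean
        get() = this == COMPLETED || this == FAILED
}

// Values assumed from the `step` column comment in the DDL.
enum class AnalysisStep {
    PDF_PARSING, LLM_ANALYSIS, POST_PROCESSING
}
```

Because the entity maps both fields with `EnumType.STRING`, the enum constant names are what end up in the `status` and `step` columns, so renaming a constant later also requires migrating stored rows.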
package com.woopi.safehome.domain.deed.adapter.outbound.persistence.jpa

import com.woopi.safehome.global.enums.AnalysisStep
import com.woopi.safehome.global.enums.JobStatus
import com.woopi.safehome.global.`object`.BaseEntity
import jakarta.persistence.*

@Entity
@Table(name = "analysis_jobs")
class AnalysisJobEntity(
    @Column(name = "job_id")
    var jobId: String,

    @Column(name = "file_name")
    var fileName: String,

    @Column(name = "file_size")
    var fileSize: Long,

    @Column(name = "status")
    @Enumerated(EnumType.STRING)
    var status: JobStatus,

    @Column(name = "step")
    @Enumerated(EnumType.STRING)
    var step: AnalysisStep? = null,

    @Column(name = "result")
    var result: String? = null,

    @Column(name = "description")
    var description: String? = null,
) : BaseEntity()
3. Create the model

package com.woopi.safehome.domain.deed.model

import com.woopi.safehome.global.enums.AnalysisStep
import com.woopi.safehome.global.enums.JobStatus

object AnalysisJob {
    data class Query(
        val id: Long,
        val status: JobStatus,
        val step: AnalysisStep? = null,
    )

    data class Create(
        val jobId: String,
        val fileName: String,
        val fileSize: Long,
        val status: JobStatus,
        val step: AnalysisStep? = null,
        val result: String? = null,
        val description: String? = null
    )

    data class Update(
        val id: Long,
        val status: JobStatus,
        val step: AnalysisStep? = null,
        val result: String? = null,
        val description: String? = null
    )

    data class Data(
        val id: Long,
        val jobId: String,
        val fileName: String,
        val fileSize: Long,
        val status: JobStatus,
        val step: AnalysisStep? = null,
        val result: String? = null,
        val description: String? = null
    )
}
4. (Temporary) Create AnalysisNotifierPort and the SSE notifier

package com.woopi.safehome.domain.analysis.application.port.outbound

import com.woopi.safehome.global.enums.AnalysisStep
import com.woopi.safehome.global.enums.JobStatus

interface AnalysisNotifierPort {
    fun notifyStep(
        jobId: String,
        status: JobStatus,
        step: AnalysisStep?,
        message: String
    )
}

package com.woopi.safehome.domain.analysis.adapter.outbound.sse

import com.woopi.safehome.domain.analysis.application.port.outbound.AnalysisNotifierPort
import com.woopi.safehome.global.enums.AnalysisStep
import com.woopi.safehome.global.enums.JobStatus
import org.springframework.stereotype.Component
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter
import java.time.LocalDateTime
import java.util.concurrent.ConcurrentHashMap

@Component
class AnalysisSseNotifier : AnalysisNotifierPort {

    // One emitter per job, keyed by jobId
    private val emitters = ConcurrentHashMap<String, SseEmitter>()

    fun createEmitter(jobId: String): SseEmitter {
        val emitter = SseEmitter(300_000L) // 5-minute timeout
        emitter.onCompletion { emitters.remove(jobId) }
        emitter.onTimeout { emitters.remove(jobId) }
        emitter.onError { emitters.remove(jobId) }
        emitters[jobId] = emitter
        return emitter
    }

    override fun notifyStep(
        jobId: String,
        status: JobStatus,
        step: AnalysisStep?,
        message: String
    ) {
        val payload = mapOf(
            "jobId" to jobId,
            "status" to status.name,
            "step" to step?.name,
            "message" to message,
            "timestamp" to LocalDateTime.now().toString()
        )
        emitters[jobId]?.let { emitter ->
            try {
                emitter.send(SseEmitter.event().data(payload))
                // Close the stream explicitly once the job is complete
                if (status == JobStatus.COMPLETED) {
                    emitter.complete()
                    emitters.remove(jobId)
                }
            } catch (e: Exception) {
                // The client is gone (e.g. the app was closed); drop the emitter
                // so the analysis itself keeps running without a listener
                emitters.remove(jobId)
            }
        }
    }
}
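The notifier's bookkeeping (register one emitter per job, look it up on each notification, drop it once the job finishes or a send fails) can be exercised without Spring. A minimal sketch, where `EventSink`, `SinkRegistry`, and `RecordingSink` are hypothetical stand-ins for `SseEmitter` and the notifier:

```kotlin
import java.io.IOException
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for SseEmitter so the sketch runs without Spring.
interface EventSink {
    @Throws(IOException::class)
    fun send(payload: Map<String, String?>)
    fun complete()
}

class SinkRegistry {
    private val sinks = ConcurrentHashMap<String, EventSink>()

    fun register(jobId: String, sink: EventSink) {
        sinks[jobId] = sink
    }

    fun isRegistered(jobId: String): Boolean = sinks.containsKey(jobId)

    // Sends to the sink registered for jobId, if any. Returns true when delivered.
    fun notify(jobId: String, payload: Map<String, String?>, terminal: Boolean): Boolean {
        val sink = sinks[jobId] ?: return false
        return try {
            sink.send(payload)
            if (terminal) {
                sinks.remove(jobId)
                sink.complete()
            }
            true
        } catch (e: IOException) {
            // The client is gone; forget the sink so later calls become no-ops.
            sinks.remove(jobId)
            false
        }
    }
}

// In-memory sink that records what it receives.
class RecordingSink : EventSink {
    val received = mutableListOf<Map<String, String?>>()
    var completed = false

    override fun send(payload: Map<String, String?>) {
        received.add(payload)
    }

    override fun complete() {
        completed = true
    }
}
```

Registering a `RecordingSink` under a job ID and calling `notify` with `terminal = true` completes the sink and removes it from the registry. Any later `notify` for the same job returns `false`.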
5. (Temporary) Create the AsyncProcessor that runs the job and drives the notifier, plus its config

package com.woopi.safehome.global.config

import org.springframework.context.annotation.Configuration
import org.springframework.scheduling.annotation.EnableAsync
import org.springframework.web.servlet.config.annotation.AsyncSupportConfigurer
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer

@Configuration
@EnableAsync
class AsyncConfig : WebMvcConfigurer {
    override fun configureAsyncSupport(configurer: AsyncSupportConfigurer) {
        configurer.setDefaultTimeout(300_000) // 5-minute timeout
    }
}

package com.woopi.safehome.domain.analysis.application.service

import com.woopi.safehome.domain.analysis.application.port.outbound.AnalysisNotifierPort
import com.woopi.safehome.global.enums.AnalysisStep
import com.woopi.safehome.global.enums.JobStatus
import org.springframework.scheduling.annotation.Async
import org.springframework.stereotype.Service

@Service
class AnalysisAsyncProcessor(
    private val analysisProgressPort: AnalysisNotifierPort
) {
    @Async
    fun process(jobId: String) {
        analysisProgressPort.notifyStep(
            jobId,
            JobStatus.IN_PROGRESS,
            AnalysisStep.PDF_PARSING,
            "Analyzing the attached file"
        )
        Thread.sleep(1000) // placeholder for the real work in this sample

        analysisProgressPort.notifyStep(
            jobId,
            JobStatus.IN_PROGRESS,
            AnalysisStep.LLM_ANALYSIS,
            "AI is analyzing the registry document"
        )
        Thread.sleep(1000) // placeholder for the real work in this sample

        analysisProgressPort.notifyStep(
            jobId,
            JobStatus.IN_PROGRESS,
            AnalysisStep.POST_PROCESSING,
            "Organizing the analysis results"
        )

        analysisProgressPort.notifyStep(
            jobId,
            JobStatus.COMPLETED,
            AnalysisStep.POST_PROCESSING,
            "Done!"
        )
    }
}
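The sample processor only reports the happy path: if a step throws, the client never hears about it. Below is a minimal sketch of one way to report a failed step. `Step` and `runSteps` are hypothetical names, and a plain callback stands in for `AnalysisNotifierPort`:

```kotlin
// Hypothetical step description: a name, a user-facing message, and the work to run.
class Step(val name: String, val message: String, val work: () -> Unit)

// Runs the steps in order, notifying before each one. Stops at the first
// failure and reports FAILED with the exception message; otherwise reports
// COMPLETED. The callback receives (status, stepName, message).
fun runSteps(steps: List<Step>, notify: (String, String?, String) -> Unit): Boolean {
    var current: Step? = null
    return try {
        for (step in steps) {
            current = step
            notify("IN_PROGRESS", step.name, step.message)
            step.work()
        }
        notify("COMPLETED", current?.name, "Done!")
        true
    } catch (e: Exception) {
        notify("FAILED", current?.name, e.message ?: "Unknown error")
        false
    }
}
```

In the real processor the same shape would presumably also write the status, step, and error text to the `analysis_jobs` row, since the `description` column is meant to hold error messages.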
6. (Temporary) Add jobId lookup and the async processor call to the sample use case

package com.woopi.safehome.domain._sample.application.usecase

import com.woopi.safehome.domain._sample.adapter.inbound.web.SampleDtoMapper
import com.woopi.safehome.domain._sample.adapter.inbound.web.dto.SampleRequest
import com.woopi.safehome.domain._sample.adapter.inbound.web.dto.SampleResponse
import com.woopi.safehome.domain._sample.application.port.inbound.SampleUseCase
import com.woopi.safehome.domain._sample.application.port.outbound.SamplePersistencePort
import com.woopi.safehome.domain.analysis.application.service.AnalysisAsyncProcessor
import com.woopi.safehome.global.exception.BusinessException
import com.woopi.safehome.global.exception.ErrorCode
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
import java.util.*

@Transactional(readOnly = true)
@Service
class SampleUseCaseImpl(
    private val samplePersistencePort: SamplePersistencePort,
    private val analysisAsyncProcessor: AnalysisAsyncProcessor
) : SampleUseCase {

    // Issues a new job key; no work is started here
    override fun getJobId(): String {
        return UUID.randomUUID().toString()
    }

    // Starts the asynchronous processing for the given job
    override fun sseSample(jobId: String) {
        analysisAsyncProcessor.process(jobId)
    }
}
7. (Temporary) Add the jobId and process APIs to the adapter

package com.woopi.safehome.domain._sample.adapter.inbound.web

import com.woopi.safehome.domain._sample.adapter.inbound.web.dto.SampleRequest
import com.woopi.safehome.domain._sample.adapter.inbound.web.dto.SampleResponse
import com.woopi.safehome.domain._sample.application.port.inbound.SampleUseCase
import com.woopi.safehome.domain.analysis.adapter.outbound.sse.AnalysisSseNotifier
import com.woopi.safehome.global.response.ApiResponse
import io.swagger.v3.oas.annotations.Operation
import io.swagger.v3.oas.annotations.tags.Tag
import org.springframework.http.MediaType
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.PathVariable
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter

@Tag(name = "Sample API", description = "Sample API")
@RestController
@RequestMapping("/api/sample")
class SampleInboundWebAdapter(
    private val sampleUseCase: SampleUseCase,
    private val analysisSseNotifier: AnalysisSseNotifier
) {
    @Operation(summary = "Get jobId", description = "SSE sample")
    @PostMapping("/sample/analysis/start")
    fun websocketSample(): ApiResponse<String> {
        return ApiResponse.success(sampleUseCase.getJobId())
    }

    @GetMapping("/analysis/{jobId}/stream", produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
    fun streamProgress(@PathVariable jobId: String): SseEmitter {
        val emitter = analysisSseNotifier.createEmitter(jobId)
        // Start the job once the SSE emitter is registered
        sampleUseCase.sseSample(jobId)
        return emitter
    }
}
8. Test
The overall flow of the sample is as follows.
Overall flow
1. Client → POST /api/sample/sample/analysis/start
   - Generates a jobId
   - Returns the jobId (the job is not started yet)
2. Client → GET /api/sample/analysis/{jobId}/stream
   - Calls createEmitter(jobId)
     → Creates an SseEmitter
     → Stores it in the emitters map under the jobId
     → Registers callbacks (onCompletion, onTimeout, onError)
   - Calls sseSample(jobId)
     → Runs analysisAsyncProcessor.process(jobId)
     → @Async starts it on a separate thread
   - Returns the emitter (the SSE connection stays open)
3. Background (async thread)
   - While process(jobId) is running
     → Calls notifyStep()
     → Looks up emitters[jobId]
     → Sends the data through that emitter
     → The client receives it in real time
   - Repeats (PDF_PARSING → LLM_ANALYSIS → POST_PROCESSING)
   - Sends the COMPLETED status
     → Calls emitter.complete()
     → The SSE connection closes
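On the receiving side, a client reads the stream line by line and treats a blank line as the end of one event. Below is a minimal sketch of that parsing step. `parseSseData` is a hypothetical helper that handles only `data:` lines; real clients such as the browser's `EventSource` also handle the `event:`, `id:`, and `retry:` fields.

```kotlin
// Hypothetical, simplified SSE parser: collects the "data:" lines of each
// event and joins multi-line data with '\n'. Other fields and comment lines
// (those starting with ':') are ignored. An event that is not terminated by
// a blank line before the stream ends is discarded.
fun parseSseData(lines: Sequence<String>): List<String> {
    val events = mutableListOf<String>()
    val buffer = mutableListOf<String>()
    for (line in lines) {
        if (line.isEmpty()) {
            // A blank line dispatches the buffered event, if any.
            if (buffer.isNotEmpty()) {
                events.add(buffer.joinToString("\n"))
                buffer.clear()
            }
        } else if (line.startsWith("data:")) {
            // Strip the field name and at most one leading space.
            buffer.add(line.removePrefix("data:").removePrefix(" "))
        }
    }
    return events
}
```

Each returned string would then be decoded as the JSON payload built in `notifyStep` (`jobId`, `status`, `step`, `message`, `timestamp`), and the client can stop listening once `status` is `COMPLETED`.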
