개발일지

[개발일지_safeHome] 13. SSE 샘플 추가

woopii 2026. 1. 15. 00:13

등본 분석 상황을 실시간으로 알려주기 위해서
웹소켓을 사용할려고한다,

웹소켓이 아닌 SSE(Server Sent Event)를 활용할 예정 

(중간에 앱을 종료했을때는, 알림을 사용하려고 한다)

 

SSE(Server-Sent Events)가 더 적합한 이유

- 단방향 통신만 필요 - 서버에서 프론트로만 진행 상황을 전송 
- 더 간단한 구현 - WebSocket보다 설정이 단순함
- 자동 재연결 - 연결 끊김 시 브라우저가 자동으로 재연결 시도
- HTTP 기반 - 별도의 프로토콜이나 복잡한 설정 불필요

 

 

예를 들어

---

1. PDF 파싱 

--> "첨부된 파일을 분석중이에요"

2. LLM 분석

--> AI가 등본을 분석중이에요"

3. 이후 처리

--> "분석한 내용을 정리중이에요"

4. 완료 

--> "완료 됐습니다!"

 

1. 작업 관련 DDL 생성

job_id는 비동기로 처리할떄 작업에 대한 키가 필요하기때문에 job_id를 두는 것이다.

-- 분석 job 테이블
CREATE TABLE IF NOT EXISTS analysis_jobs (
    id BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT 'ID',
    job_id VARCHAR(255) NOT NULL COMMENT '작업 ID(UUID)',
    file_name VARCHAR(255) NOT NULL COMMENT '파일명',
    file_size BIGINT NOT NULL COMMENT '파일 크기',
    status VARCHAR(50) NOT NULL COMMENT '상태(예: PENDING, IN_PROGRESS, COMPLETED, FAILED)',
    step VARCHAR(100) NULL COMMENT '현재 단계(예 : PDF_PARSING, LLM_ANALYSIS, POST_PROCESSING)',
    result TEXT NULL COMMENT '분석 결과 JSON',
    description TEXT NULL COMMENT '설명 (에러메시지 등)',
    is_deleted BOOLEAN NOT NULL DEFAULT FALSE COMMENT '삭제 여부',
    created_id BIGINT NOT NULL COMMENT '생성자 ID',
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '생성 시간',
    updated_id BIGINT NULL COMMENT '수정자 ID',
    updated_at TIMESTAMP NULL DEFAULT NULL COMMENT '수정 시간'
);

CREATE INDEX IF NOT EXISTS idx_analysis_jobs_job_id ON analysis_jobs(job_id);
CREATE INDEX IF NOT EXISTS idx_analysis_jobs_status ON analysis_jobs(status);
CREATE INDEX IF NOT EXISTS idx_analysis_jobs_step ON analysis_jobs(step);

 

2. entity 생성

단계를 명확하게 하기 위해, 진행상태를 나타내는 JobStatus 와 작업 단위를 나타내는 AnalysisStep 을 따로 두었다 

package com.woopi.safehome.domain.deed.adapter.outbound.persistence.jpa

import com.woopi.safehome.global.enums.AnalysisStep
import com.woopi.safehome.global.enums.JobStatus
import com.woopi.safehome.global.`object`.BaseEntity
import jakarta.persistence.*

@Entity
@Table(name = "analysis_jobs")
class AnalysisJobEntity(

    @Column(name = "job_id")
    var jobId: String,

    @Column(name = "file_name")
    var fileName: String,

    @Column(name = "file_size")
    var fileSize: Long,

    @Column(name = "status")
    @Enumerated(EnumType.STRING)
    var status: JobStatus,

    @Column(name = "step")
    @Enumerated(EnumType.STRING)
    var step: AnalysisStep? = null,

    @Column(name = "result")
    var result: String? = null,

    @Column(name = "description")
    var description: String? = null,

    ) : BaseEntity() {

}

 

 

3. 모델 생성

package com.woopi.safehome.domain.deed.model

import com.woopi.safehome.global.enums.AnalysisStep
import com.woopi.safehome.global.enums.JobStatus

object AnalysisJob {

    data class Query(
        val id: Long,
        val status: JobStatus,
        val step: AnalysisStep? = null,
    )

    data class Create(
        val jobId: String,
        val fileName: String,
        val fileSize: Long,
        val status: JobStatus,
        val step: AnalysisStep? = null,
        val result: String? = null,
        val description: String? = null
    )

    data class Update(
        val id: Long,
        val status: JobStatus,
        val step: AnalysisStep? = null,
        val result: String? = null,
        val description: String? = null
    )

    data class Data(
        val id: Long,
        val jobId: String,
        val fileName: String,
        val fileSize: Long,
        val status: JobStatus,
        val step: AnalysisStep? = null,
        val result: String? = null,
        val description: String? = null
    )

}

 

 

4. (임시) SSE 처리를 위한 AnalysisNotifierPort, sseNotifier 생성

package com.woopi.safehome.domain.analysis.application.port.outbound

import com.woopi.safehome.global.enums.AnalysisStep
import com.woopi.safehome.global.enums.JobStatus

interface AnalysisNotifierPort {
    fun notifyStep(
        jobId: String,
        status: JobStatus,
        step: AnalysisStep?,
        message: String
    )
}
package com.woopi.safehome.domain.analysis.adapter.outbound.sse

import com.woopi.safehome.domain.analysis.application.port.outbound.AnalysisNotifierPort
import com.woopi.safehome.global.enums.AnalysisStep
import com.woopi.safehome.global.enums.JobStatus
import org.springframework.stereotype.Component
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter
import java.time.LocalDateTime
import java.util.concurrent.ConcurrentHashMap

@Component
class AnalysisSseNotifier: AnalysisNotifierPort {

    private val emitters = ConcurrentHashMap<String, SseEmitter>()

    fun createEmitter(jobId: String): SseEmitter {
        val emitter = SseEmitter(300_000L) // 5분 타임아웃

        emitter.onCompletion { emitters.remove(jobId) }
        emitter.onTimeout { emitters.remove(jobId) }
        emitter.onError { emitters.remove(jobId) }

        emitters[jobId] = emitter
        return emitter
    }

    override fun notifyStep(
        jobId: String,
        status: JobStatus,
        step: AnalysisStep?,
        message: String
    ) {
        val payload = mapOf(
            "jobId" to jobId,
            "status" to status.name,
            "step" to step?.name,
            "message" to message,
            "timestamp" to LocalDateTime.now().toString()
        )

        emitters[jobId]?.let { emitter ->
            emitter.send(SseEmitter.event().data(payload))

            // 완료되면 명시적으로 종료
            if (status == JobStatus.COMPLETED) {
                emitter.complete()
                emitters.remove(jobId)
            }
        }
    }

}

 

5. (임시) 작업 및 notifier 처리를 하기 위한 AsyncProcessor 생성 + config

package com.woopi.safehome.global.config

import org.springframework.context.annotation.Configuration
import org.springframework.scheduling.annotation.EnableAsync
import org.springframework.web.servlet.config.annotation.AsyncSupportConfigurer
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer

@Configuration
@EnableAsync
class AsyncConfig : WebMvcConfigurer {

    override fun configureAsyncSupport(configurer: AsyncSupportConfigurer) {
        configurer.setDefaultTimeout(300_000) // 5분 타임아웃
    }

}

 

package com.woopi.safehome.domain.analysis.application.service

import com.woopi.safehome.domain.analysis.application.port.outbound.AnalysisNotifierPort
import com.woopi.safehome.global.enums.AnalysisStep
import com.woopi.safehome.global.enums.JobStatus
import org.springframework.scheduling.annotation.Async
import org.springframework.stereotype.Service

@Service
class AnalysisAsyncProcessor(
    private val analysisProgressPort: AnalysisNotifierPort
) {

    @Async
    fun process(jobId: String) {

        analysisProgressPort.notifyStep(
            jobId,
            JobStatus.IN_PROGRESS,
            AnalysisStep.PDF_PARSING,
            "첨부된 파일을 분석중이에요"
        )

        Thread.sleep(1000)

        analysisProgressPort.notifyStep(
            jobId,
            JobStatus.IN_PROGRESS,
            AnalysisStep.LLM_ANALYSIS,
            "AI가 등본을 분석중이에요"
        )

        Thread.sleep(1000)

        analysisProgressPort.notifyStep(
            jobId,
            JobStatus.IN_PROGRESS,
            AnalysisStep.POST_PROCESSING,
            "분석한 내용을 정리중이에요"
        )

        analysisProgressPort.notifyStep(
            jobId,
            JobStatus.COMPLETED,
            AnalysisStep.POST_PROCESSING,
            "완료 됐습니다!"
        )
    }
}

 

6. (임시) sample usecase에서 비동기 처리를 위한 jobid 조회 및 async processor 처리 추가

package com.woopi.safehome.domain._sample.application.usecase

import com.woopi.safehome.domain._sample.adapter.inbound.web.SampleDtoMapper
import com.woopi.safehome.domain._sample.adapter.inbound.web.dto.SampleRequest
import com.woopi.safehome.domain._sample.adapter.inbound.web.dto.SampleResponse
import com.woopi.safehome.domain._sample.application.port.inbound.SampleUseCase
import com.woopi.safehome.domain._sample.application.port.outbound.SamplePersistencePort
import com.woopi.safehome.domain.analysis.application.service.AnalysisAsyncProcessor
import com.woopi.safehome.global.exception.BusinessException
import com.woopi.safehome.global.exception.ErrorCode
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
import java.util.*

@Transactional(readOnly = true)
@Service
class SampleUseCaseImpl(
    private val samplePersistencePort: SamplePersistencePort,
    private val analysisAsyncProcessor: AnalysisAsyncProcessor
) : SampleUseCase {

    override fun getJobId(): String {
        return UUID.randomUUID().toString()
    }

    override fun sseSample(jobId: String) {
        analysisAsyncProcessor.process(jobId)
    }


}

 

7. (임시) adapter에 jobid 추가, process api 추가

package com.woopi.safehome.domain._sample.adapter.inbound.web

import com.woopi.safehome.domain._sample.adapter.inbound.web.dto.SampleRequest
import com.woopi.safehome.domain._sample.adapter.inbound.web.dto.SampleResponse
import com.woopi.safehome.domain._sample.application.port.inbound.SampleUseCase
import com.woopi.safehome.domain.analysis.adapter.outbound.sse.AnalysisSseNotifier
import com.woopi.safehome.global.response.ApiResponse
import io.swagger.v3.oas.annotations.Operation
import io.swagger.v3.oas.annotations.tags.Tag
import org.springframework.http.MediaType
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.PathVariable
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter

@Tag(name = "샘플 API", description = "샘플 API")
@RestController
@RequestMapping("/api/sample")
class SampleInboundWebAdapter (
    private val sampleUseCase: SampleUseCase,
    private val analysisSseNotifier: AnalysisSseNotifier
) {


    @Operation(summary = "jobid 조회", description = "sse 샘플")
    @PostMapping("/sample/analysis/start")
    fun websocketSample (): ApiResponse<String> {
        return ApiResponse.success(sampleUseCase.getJobId())
    }

    @GetMapping("/analysis/{jobId}/stream", produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
    fun streamProgress(@PathVariable jobId: String): SseEmitter {
        val emitter = analysisSseNotifier.createEmitter(jobId)

        // SSE 연결 후 작업 시작
        sampleUseCase.sseSample(jobId)

        return emitter
    }
}

 

8. 테스트

작성한 샘플의 전체 흐름은 다음과 같음

전체 흐름

1. 클라이언트 → POST /sample/analysis/start

- jobId 생성
- jobId 반환 (작업 시작 안함)

2. 클라이언트 → GET /analysis/{jobId}/stream

- createEmitter(jobId) 호출
  → SseEmitter 생성
  → emitters 맵에 jobId로 저장
  → 콜백 등록 (onCompletion, onTimeout, onError)
  
- sseSample(jobId) 호출
  → analysisAsyncProcessor.process(jobId) 실행
  → @Async로 별도 스레드에서 비동기 시작
  
- emitter 반환 (SSE 연결 유지)

3. 백그라운드 (비동기 스레드)

- process(jobId) 실행 중
  → notifyStep() 호출
  → emitters[jobId] 찾기
  → 찾은 emitter로 데이터 전송
  → 클라이언트가 실시간 수신
  
- 반복 (PDF_PARSING → LLM_ANALYSIS → POST_PROCESSING)
  
- COMPLETED 상태 전송
  → emitter.complete() 호출
  → SSE 연결 종료