스프링 비동기(2) - 응답 출력기

2024. 3. 5. 01:06spring/비동기 처리

과제

서비스에서 응답을 여러 청크로 나눠 전송하라

 

해결책

ResponseBodyEmitter(또는 SseEmitter)로 응답을 청크로 나눠 보낸다.

 

풀이

스프링에서는 HttpMessageConverter 인프라를 이용해서 어떤 객체를 평범한 일반 객체로 출력할 수 있다. 클라이언트는 청크된(또는 스트리밍된)리스트를 받게 된다. 결과를 객체 대신 이벤트 형태로 보내는 방법도 있다. 이를 서버 전송 이벤트라고 한다.

 

 

여러 결과를 하나의 응답에 실어 보내기

 

스프링 MVC의 ResponseBodyEmitter 클래스는 (뷰 이름 또는 ModelAndView 등) 하나의 결과 대신 여러 객체를 클라이언트에 반환할 때 유용하다. 반환할 객체는 HttpMessageConverter를 이용해 결과로 변환한 다음 전송하며 핸들러 메서드는 ResponseBodyEmitter 를 반드시 반환해야 한다. 

 

ReservationQueryController의 find() 메서드에서 조회 결과를 하나씩 클라이언트에 보내고 마지막에 ResponseBodyEmitter를 반환하도록 고친다.

 

@GetMapping(params = "courtName")
public ResponseBodyEmitter find(
        @RequestParam("courtName") String courtName,
        Model model
) {
    final ResponseBodyEmitter emitter = new ResponseBodyEmitter();
    taskExecutor.execute(() -> {
        Collection<Reservation> reservations = reservationService.query(courtName);
        try {
            for (Reservation reservation : reservations) {
                    emitter.send(reservation);
            }
            emitter.complete();
        } catch (IOException e) {
            emitter.completeWithError(e);
        }
    });

    return emitter;
}

 

1. find() 메서드는 처음에 생성한 ResponseBodyEmitter 객체를 마지막에 반환한다.

 

2. ReservationService.query() 메서드로 예약 리스트를 조회하고 그 결과 레코드를 하나씩 ResponseBodyEmitter.send() 메서드로 반환한다.

 

3. 전부 다 반환되면 결과 전송을 담당한 스레드가 처리를 마친 다음 응답을 처리할 수 있게끔 complete() 메서드를 호출해서 메모리에서 해제한다.

 

이 과정에서 발생한  예외를 유저에 알리고 싶을 경우, completeWithError() 메서드를 호출하면 스프링 MVC 예외 처리 장치를 통과한다. 응답은 그 이후에 처리된다.

 

포스트맨을 이용해 localhost:8080/find?courtName=Tennis 에 접속하면 다음과 같은 결과가 출력된다.

 

 

상태 코드를 바꾸거나 커스텀 헤더를 추가하고 싶을 때엔 ResponseEntity 안에 ResponseBodyEmitter를 감싼다.

 

@GetMapping("/find")
public ResponseEntity<ResponseBodyEmitter> find(
        @RequestParam("courtName") String courtName
) {
    final ResponseBodyEmitter emitter = new ResponseBodyEmitter();
    taskExecutor.execute(() -> {
        Collection<Reservation> reservations = reservationService.query(courtName);
        try {
            for (Reservation reservation : reservations) {
                emitter.send(reservation);
            }
            emitter.complete();
        } catch (IOException e) {
            emitter.completeWithError(e);
        }
    });
    return ResponseEntity.status(HttpStatus.ACCEPTED)
            .header("Accept", "TEST Value")
            .body(emitter);
}

 

 

 

여러 결과를 이벤트 형태로 보내기

 

SseEmitter는 서버 전송 이벤트를 이용해 서버 => 클라이언트 방향으로 이벤트를 전송한다. 서버 전송 이벤트는 서버가 클라이언트에 보내는 메시지로 text/event-stream이라는 콘텐트 타입 헤더가 들어있다.

정의할 필드는 4개뿐이다.

필드 설명
id 이벤트 ID
event 이벤트 타입
data 이벤트 데이터
retry 이벤트 스트림에 재접속하는 시간

 

핸들러 메서드에서 이벤트를 전송하려면 SseEmitter 인스턴스를 만들어 마지막 줄에서 반환한다. 데이터 각 항목은 send() 메서드로 클라이언트에 보낸다.

 

@GetMapping("/sse/find")
public SseEmitter sseFind(
        @RequestParam("courtName") String courtName
) {
    final SseEmitter emitter = new SseEmitter();
    taskExecutor.execute(() -> {
        Collection<Reservation> reservations = reservationService.query(courtName);
        try {
            for (Reservation reservation : reservations) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {}
                emitter.send(reservation);
            }
            emitter.complete();
        } catch (IOException e) {
            emitter.completeWithError(e);
        }
    });
    return emitter;
}

 

 

Content-Type 헤더값이 text/envent-stream인 걸로 보아 이벤트 스트림을 받았음을 알 수 있다. 스트림을 열어둔 상태로 계속 이벶트 알림을 수신했다. 출력된 객체는 제각기 JSON으로 변환되었는데, 평범한 ResponseBodyEmitter처럼 HttpMessageConverter로 변환시킨 것이다. 각 객체는 data 태그 내부에 이벤트 데이터로 씌어졌다.

 

이벤트에 필드를 더 추가하고 싶다면 SseEventBuilder를 사용해서 추가하자

@GetMapping("/sse/find")
public SseEmitter sseFind(
        @RequestParam("courtName") String courtName
) {
    final SseEmitter emitter = new SseEmitter();
    taskExecutor.execute(() -> {
        Collection<Reservation> reservations = reservationService.query(courtName);
        try {
            for (Reservation reservation : reservations) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {}
                emitter.send(
                        SseEmitter.event()
                                .id(String.valueOf(reservation.hashCode()))
                                .data(reservation)
                );
            }
            emitter.complete();
        } catch (IOException e) {
            emitter.completeWithError(e);
        }
    });
    return emitter;
}