본문 바로가기
기획&구현

[BE] Server-sent events(SSE)를 이용하여 실시간 알림 구현하기

by 당근플래너 팀 2023. 2. 5.

안녕하세요. 당근플래너 팀 백엔드를 맡은 성원입니다.😊

 

저희 서비스는 실시간 알림 구현을 위해 Server-sent events를 채택하였습니다.

SSE가 뭔지 알아보고 Spring과 Nginx 환경에서 어떻게 구현되는지 알아보겠습니다. 

 

SSE란?

서버에서 클라이언트로 text message를 보내는 브라우저 기반 웹 애플리케이션 기술이며 HTTP의 persistent connections을 기반으로 하는 HTML5 표준 기술

SSE를 사용해서 통신 할 때 서버는 초기 요청을 하지 않고도 필요할 때마다 데이터를 앱으로 푸시 할 수 있습니다. 
즉, 서버에서 클라이언트로 업데이트를 스트리밍 할 수 있다는 말입니다. (Server → Client 단방향)

 

Client: SSE Subscribe 요청 

GET /connect HTTP/1.1
Accept: text/event-stream
Cache-Control: no-cache

 

Server : Subscribe 요청에 대한 응답

HTTP/1.1 200
Content-Type: text/event-stream;charset=UTF-8
Transfer-Encoding: chunked

이 때 이벤트 데이터의 미디어 타입text/event-stream 이 표준으로 정해져 있습니다. (이벤트는 캐싱하지 않고 지속적 연결을 사용) 그리고 Transfer-Encoding 헤더의 값을 chunked 로 설정해야 합니다.

서버는 동적으로 생성된 컨텐츠를 스트리밍하기 때문에 본문의 크기를 미리 알 수 없기 때문입니다.

 

Server : 이벤트 전달

  • 구독(subscribe)을 하고 나면 서버는 해당 클라이언트에게 비동기적으로 데이터를 전송할 수 있음.
  • SSE의 이벤트 데이터는 UTF-8 인코딩된 문자열만 지원함(charset=UTF-8)

 

SSE를 이용한 실시간 알림 구현

Controller 구현

@GetMapping(value = "/api/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter subscribe(@AuthenticationPrincipal UserDetailsImpl userDetails,
                                @RequestHeader(value = "Last-Event-ID", required = false, defaultValue = "") String lastEventId,
                                HttpServletResponse response){

        response.setHeader("Connection", "keep-alive");
        response.setHeader("Cache-Control", "no-cache");
        response.setHeader("X-Accel-Buffering", "no");

        return notificationService.subscribe(userDetails.getMember(), lastEventId);
    }

 

  • 서버에서는 EventSource를 통해 날아오는 요청을 처리할 컨트롤러가 필요
  • sse 통신을 하기 위해서는 MIME 타입을 text/event-stream 으로 해줘야 함.
  • 파라미터에 있는 "Last-Event-ID"는 서버와의 연결 오류로 인해 유실된 데이터를 다시 전송하기 위해 클라이언트에게 마지막으로 전송한 event id값을 보내주는 것
  • required = false : Last-Event-ID는 항상 담겨있는 것이 아니기때문에 설정해줌
  • defaultValue = "" : 값이 담겨져 있지 않은 겨웅에 isEmpty로 구분하기 위해 빈 데이터 설정
  • @AuthenticationPrincipal을 활용해 누구로부터 온 알림 구독인지 알 수 있음(Spring Security를 이용 중이기 때문에 이에 대한 정보를 받아 올 수 있음.)

 

Service 구현

private final NotificationRepository notificationRepository;
private final EmitterRepository emitterRepository = new EmitterRepositoryImpl();
private final Long timeout = 14L * 24L * 60L * 1000L * 60L;

public SseEmitter subscribe(Member member, String lastEventId) {

    /**
     #1 코드 밑에 설명 참조
     */ 
		String emitterId = makeTimeIncludeId(member);
		
    /**
     #2 코드 밑에 설명 참조
     */ 
    SseEmitter emitter = emitterRepository.save(emitterId, new SseEmitter(timeout));

    emitter.onCompletion(() -> emitterRepository.deleteById(emitterId));
    emitter.onTimeout(() -> emitterRepository.deleteById(emitterId));

    /**
    #3 코드 밑에 설명 참조
    */ 
    String eventId = makeTimeIncludeId(member);
    sendDummyData(emitterId, emitter, eventId, "EventStream Created. [memberId="+member.getId()+"]");

    /**
     #4 코드 밑에 설명 참조
     */ 
    if (hasLostData(lastEventId)){
        Map<String, Object> events = emitterRepository.findAllEventCacheStartWithByMemberId(String.valueOf(member.getId()));
        events.entrySet().stream()
                .filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
                .forEach(entry -> sendDummyData(emitterId, emitter, entry.getKey(), entry.getValue()));
    }
    return emitter;
}

private void sendDummyData(String emitterId, SseEmitter emitter, String eventId, Object data) {
    try {
        emitter.send(SseEmitter.event()
                .id(eventId)
                .data(data));
    } catch (IOException exception) {
        emitterRepository.deleteById(emitterId);
    }
}
private String makeTimeIncludeId(Member member) {
    return member.getId() + "_" + System.currentTimeMillis();
}
private boolean hasLostData(String lastEventId) {
    return !lastEventId.isEmpty();
}

 

 

#1 에 대한 설명 

  • id 값을 ${user_id}_${System.currentTimeMillis()} 형태로 사용 → Last_Event_ID 헤더 때문
  • Last_Event_ID 헤더는 클라이언트가 마지막으로 수신한 데이터의 id값을 의미 → id값을 그대로 사용하면 데이터가 유실된 시점을 파악할 수 없음.
  • ${user_id}_${System.currentTimeMillis()} 형태로 저장된 key값 비교를 통해 유실된 데이터만 재전송할 수 있게 함.
Last-Event-Id = 3_1631593143664

{3_1631593143664, data1}
{3_1831593143664, data3}
{3_1731593143664, data2}

=> data1 까지 제대로 전송되었고, data2, data3을 다시 보내야한다.

 

#2 에 대한 설명 

  • 클라이언트의 SSE연결 요청에 응답하기 위해서는 SseEmitter 객체를 만들어 반환해줘야 함 → 이때 유효 시간도 같이 넣어 줌 (유효 시간만큼 SSE연결이 유지되고, 시간이 지나면 자동으로 클라이언트에서 재연결 요청을 보냄)
  • onCompletion : 비동기 요청이 완료되면 콜백할 코드를 등록(여기서는 저장해둔 SseEmitter를 삭제)
  • onTimeout : 비동기 요청 시간 초과시 호출할 코드를 등록(SseEmitter를 삭제)

 

#3 에 대한 설명 

  • SSE 연결이 이뤄진 후, 하나의 데이터도 전송되지 않는다면 SseEmitter의 유효 시간이 끝나면 503응답이 발생하는 문제 → 따라서 연결시 바로 더미 데이터를 한번 보내줌.

 

#4 에 대한 설명 

  • Last-Event-ID 값이 헤더에 있는 경우, 저장된 데이터 캐시에서 id값과 Last-Event-ID값을 통해 유실된 데이터들만 다시 보내줌.

 

Service 추가 코드 (데이터 전송)

public void send(NotificationRequest request){
        Notification notification = saveNotification(request);
        sendNotification(request, notification);
    }

    @Async
    public void sendNotification(NotificationRequest request, Notification notification) {
        String receiverId = String.valueOf(request.getMember().getId());
        String eventId = receiverId + "_" + System.currentTimeMillis();
        Map<String, SseEmitter> emitters = emitterRepository.findAllEmitterStartWithByMemberId(receiverId);
        emitters.forEach(
                (key, emitter) -> {
                    emitterRepository.saveEventCache(key, notification);
                    sendDummyData(key, emitter, eventId, new NotificationResponse(notification));
                }
        );
    }

    @Transactional
    public Notification saveNotification(NotificationRequest request) {
        Notification notification = Notification.builder()
                .content(request.getContent())
                .groupId(request.getGroupId())
                .isRead(false)
                .notificationType(request.getNotificationType())
                .member(request.getMember())
                .build();
        notificationRepository.save(notification);
        return notification;
    }

 

  • 클라이언트에 보낼 데이터 형식인 Notification 객체를 만들고, 현재 로그인 한 유저의 id값을 통해 SseEmitter를 모두 가져옴.
  • 그 후, 데이터 캐시에도 저장해주고, 실제로 데이터를 전송
  • 실제로 알림을 보내고 싶은 로직에서 send 메서드를 호출

 

주의점

SSE 구독중 연결이 끊기는 문제가 발생할 수 있습니다.  INCOMPLETE_CHUNKED_ENCODING 오류 발생.

 

Nginx 설정 추가

저희는 로컬에선 잘 작동하다 배포 시 문제가 발생하였는데요, 웹 서버로 Nginx를 이용하여 생긴 문제점이였습니다.

  • Nginx는 서버의 응답을 버퍼에 저장해두었다가 버퍼가 가득 차거나 서버가 응답 데이터를 모두 보내면 클라이언트로 전송
  • 여기서 문제는 버퍼링 기능을 활성화하면 SSE 통신 시 원하는대로 동작하지 않거나 실시간 성이 떨어지는 문제도 동시에발생
  • 또한 버퍼는 데이터 전송을 기다리고 chunk는 즉시 전송이기 때문에 chunk의 설정에 대한 문제가 발생

해결하려면 Nginx 설정 파일에 다음 코드를 추가해야합니다.

proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection '';
proxy_buffering off;
proxy_cache off;
chunked_transfer_encoding off;

 

/etc/nginx/conf.d/??.conf 전체 코드

 

Controller에서 HTTP 헤더 값 조작

Nginx 서버 설정을 해도 구독 후 연결 유지가 안된다면, SSE를 사용하여 구독하는 Controller에 아래 코드를 추가하여

HTTP 헤더의 값을 조작해야 합니다. 위에 저희가 구현한 Controller 코드에는 구현이 되어있습니다.

 

아래 코드에서 response HttpServletResponse를 매개변수로 받습니다.

  • response.setHeader("Connection", "keep-alive");
  • response.setHeader("Cache-Control", "no-cache");
  • response.setHeader("X-Accel-Buffering", "no");

NotificationController 전체코드

 

마치며

이때까지 SSE를 사용하여 실시간 알림 서비스를 구현하는 법을 알아봤습니다. 웹 서버에 대한 이해와

코드를 잘 읽어보신다면 쉽게 구현할 수 있는 것 같습니다. 긴 글 읽어주셔서 감사합니다 :)

그럼 안녕 ~!