[프로젝트] SSE 로 실시간 알림 기능 구현하기

2024. 6. 13. 23:39프로젝트

 

노닥 노닥 프로젝트 개발 중에,

  • 팔로우 한 사용자가 새 게시글을 작성한 경우 팔로워에게 실시간으로 알림 전송
  • 알림은 최소 1주일 치의 정보를 가지고 있어야 함 (접속하지 않은 경우에도 알림은 쌓여야 한다.)

위 상황에서 알림을 보내는 기능이 필요했다.

 

대표적으로 Polling, 웹 소켓,  SSE 방식을 생각해 볼 수 있는데 폴링은 리소스 낭비가 심할 것 같았고,

웹 소켓처럼 양 방향 통신이 필요한 것도 아니었기에, SSE 를 선택했다.

 

(SSE 관련 자세한 내용 설명은 생략합니다.)

관련 포스팅 : https://matt1235.tistory.com/79

 

웹 소켓 & SSE(Server Sent Events)

웹 소켓과 SSE를 알아보기 전, 우선 클라이언트와 서버 간의 HTTP 통신에 대해 알아봅시다. HTTP 통신HTTP 1.1 이하에서는 클라이언트가 서버에게 요청을 보내고, 응답을 받는 단방향 통신만 가능했

matt1235.tistory.com

 

 

SSE 를 사용하면 서버 -> 클라이언트로 이벤트가 발생함에 따라 실시간으로 메시지를 전송이 가능했고,

한 번 연결이 되면 헤더를 포함하지 않기에 웹 소켓보다 경제적이기에 SSE 를 선택했다.

 

 

SSE의 특징

  • 실시간 업데이트 : 서버에서 클라이언트로 실시간으로 데이터 전송 가능 (단방향 통신)
  • TCP 연결을 맺고 HTTP 프로토콜 사용
  • 접속에 문제가 있으면 자동으로 재접속을 시도한다.

단점

  • SSE는 지속적인 연결을 유지해야 하므로 클라이언트 네트워크와 서버 리소스 비용이 발생한다. 특히 많은 클라이언트가 연결을 맺을 경우 서버의 부하가 증가할 수 있음
  • 네트워크 연결이 불안정한 경우에는 종료하고 재연결이 필요하므로 추가적인 네트워크 오버헤드가 발생할 수 있음

 

SseEmitter 를 통한 구현

Spring Framework 4.2 부터 SSE 프로토콜을 지원하는 SseEmitter 클래스를 사용할 수 있게 되었다.

이를 통해 실시간으로 클라이언트에게 정보 전송 가능

 

비동기 통신

  • SseEmitter 는 비동기적으로 이벤트 전송이 가능하므로, 실시간성이 중요한 알림 기능에 적합하다.
  • 즉 서버에서 이벤트가 발생하면 바로 이벤트를 전송해주어, 지연 없는 업데이트를 제공해준다.

클라이언트의 재시도 및 연결 관리

  • SseEmitter 는 클라이언트의 연결 상태를 관리하고 연결이 끊어지는 경우 재연결 요청을 지원한다.
  • 클라이언트가 알림을 구독한 후 연결이 끊어지면, 클라이언트는 다시 연결을 시도하고 이전에 미수신한 이벤트를 잃지 않도록 처리 가능하다.

확장성

  • 여러 클라이언트와 동시에 통신이 가능해, 동일한 알림을 여러 클라이언트에게 전송 가능.
  • 다수의 클라이언트에게 알림을 전달해야 하는 경우 적합하다.

구현 코드

알림 Entity

@Getter
@Setter
@NoArgsConstructor
public class Notification implements Serializable {

    private Long postId;
    private String message;
    private Long timestamp;
    private Long writerId;

    public Notification(Long postId, String message, Long writerId) {
        this.postId = postId;
        this.message = message;
        this.timestamp = System.currentTimeMillis();
        this.writerId = writerId;
    }
}

 

 

사용자가 게시글을 생성하면 Redis에 저장되는 엔티티 클래스이다.

 

 

알림 Controller

@RestController
@RequiredArgsConstructor
public class NotificationController {
    private final NotificationService notificationService;

    @GetMapping("/subscribe/{userId}")
    @AuthorizationRequired(UserRole.GENERAL)
    public SseEmitter subscribe(@PathVariable("userId") Long userId, Principal principal) {
        validateUser(userId, principal);
        return notificationService.getSseEmitter(userId);
    }

    private void validateUser(Long userId, Principal principal) {
        long currentUserId = Long.parseLong(principal.getName());
        if (currentUserId != userId) {
            throw new AuthorizationException();
        }
    }
}

 

클라이언트와 서버가 SSE 연결을 맺도록 한다.

SseEmitter 를 통해 연결을 맺고 실시간으로 알림을 전송할 준비를 한다.

 

 

 

알림 Service

getSseEmitter() : Sse 연결 수립

@Service
@RequiredArgsConstructor
public class NotificationService {

    private final Map<Long, SseEmitter> clients = new ConcurrentHashMap<>();
    private final RedisTemplate<String, Object> redisTemplate;
    private final FollowService followService;

    /**
     * SSE 연결 시, 동작 */
    public SseEmitter getSseEmitter(Long userId) {
        SseEmitter emitter = new SseEmitter(15 * 60 * 1000L);
        clients.put(userId, emitter);

        emitter.onCompletion(() -> clients.remove(userId));
        emitter.onTimeout(() -> clients.remove(userId));

        try {
            emitter.send(SseEmitter.event().name("init").data("Connected"));
            List<Notification> notifications = getUndeliveredNotifications(userId);

            for (Notification notification : notifications) {
                Map<String, Object> data = new HashMap<>();
                data.put("postId", notification.getPostId());
                data.put("message", notification.getMessage());
                emitter.send(SseEmitter.event().name("newPost").data(data, MediaType.APPLICATION_JSON));
            }
        } catch (IOException e) {
            clients.remove(userId);
        }

        return emitter;
    }

 

clients

  • Long 타입의 키와 SseEmitter 타입의 값으로 이루어져 있음
  • 사용자 별로 생성된 SseEmitter 객체를 저장하는 역할
  • 클라이언트가 구독(subscribe) 요청을 보내면 해당 사용자의 id 를 키로 사용해 맵에 저장하고, 이후 알림 전송 시 해당 사용자의 SseEmitter 를 조회하기 위해 사용한다.

 

ConcurrentHashMap

  • thread-safe 한 Map
  • 이 맵을 사용해 동시성 문제를 해결하고 맵에 데이터 저장/조회를 가능하게 하였음

 

Redis 에 알림 엔티티를 저장하는 함수

public void saveNotificationToRedis(Long postId, String message, Long writerId) {
    Notification notification = new Notification(postId, message, writerId);

    ObjectMapper objectMapper = new ObjectMapper();
    try {
        String notificationJson = objectMapper.writeValueAsString(notification);
        redisTemplate.opsForZSet().add("notifications", notificationJson, notification.getTimestamp());
    } catch (JsonProcessingException e) {
        e.printStackTrace();
    }
}

 

Redis sorted_set 에 "notifications"를 키로, timestamp 를 스코어로 지정해 저장.

 

처음에는 user 별로 notification 엔티티를 저장하였으나, 이는 불가능하다는 것을 알게 되었다.

-> A 를 팔로우하는 유저가 10만 명이라면,

A 가 게시글을 작성할 때마다 10만 명의 팔로워들에게 1개씩 알림 엔티티를 만들어, followerId 를 키로 레디스에 저장 

 

즉 팔로워 개수만큼 데이터가 저장되므로, 부하가 너무 크다. 따라서 게시글을 기준으로 알림 엔티티를 만들어 Redis 에 저장하였다.

 

 

전체 알림 가져오는 함수

public List<Notification> getUndeliveredNotifications(Long userId) {
    List<Long> followingIds = followService.getFollowees(userId).stream()
            .map(UserInfoResponse::getUserId)
            .collect(Collectors.toList());

    List<Notification> notifications = new ArrayList<>();
    long oneWeekAgo = System.currentTimeMillis() - 7 * 24 * 60 * 60 * 1000L;

    // Sorted Set에서 1주일 이내의 값을 가져오기
    Set<Object> notificationJsons = redisTemplate.opsForZSet().rangeByScore("notifications", oneWeekAgo, Double.MAX_VALUE);

    ObjectMapper objectMapper = new ObjectMapper();
    for (Object notificationJson : notificationJsons) {
        try {
            Notification notification = objectMapper.readValue(notificationJson.toString(), Notification.class);
            if (notification != null && followingIds.contains(notification.getWriterId())) {
                notifications.add(notification);
            }
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }

    return notifications;
}

 

사용자가 Sse 연결을 수립하면, 접속하지 않은 동안 쌓인 알림을 가져와야 한다.

sorted_set을 통해 손쉽게 timestamp 를 기준으로 1주일 ~ 현재 까지의 알림을 가져온다.

 

단, 로그인 한 사용자가 팔로우 한 사람들 것만을 가져와야 하므로 followingIds 에 writerId 가 포함되는 값들만 가져와야 한다.

 

TODO : 전체를 불러올 때 SCAN 을 사용해 성능을 개선해야 한다.

-> 레디스의 rangeByScore() 는 O(N) 의 시간복잡도를 가지는데, O(N) 연산은 쓰지 않는게 좋다. 데이터가 많아지면 서버가 터져버릴 수도 있어서...

 

 

현재 Sse 연결을 맺고 있는 유저들에게, 알림 전송하는 메서드

/**
 * user 의 팔로워들에게, Post 알림 전송 */
public void notifyFollowersBySse(User user, Post post) {
    followService.getFollowers(user.getId()).forEach(follower -> {
        SseEmitter emitter = clients.get(follower.getUserId());
        if (emitter != null) {
            try {
                Map<String, Object> data = new HashMap<>();
                data.put("postId", post.getId());
                data.put("message", user.getNickname() + "님이 새로운 게시글을 작성했습니다.");
                emitter.send(SseEmitter.event().name("newPost").data(data, MediaType.APPLICATION_JSON));
            } catch (IOException e) {
                clients.remove(follower.getUserId());
            }
        }
    });
}

 

현재 Sse 연결을 맺고 있는 유저들에게 알림을 전송한다.

user 가 게시글을 작성하면 user.getFollowers() 로 팔로워 리스트를 가져오게 된다.

해당 리스트를 돌며 Sse 연결을 맺고 있는 사용자에게 알림을 보낸다.

 

 

추가 사항

유저가 게시글을 작성하면, 해당 게시글 (post_id) 알림 엔티티가 Redis 에 저장된다.

 

PostService - savePost() 의 일부

postRepository.save(post);

notificationService.saveNotificationToRedis(post.getId(), user.getNickname() + "님이 새 게시글을 작성했습니다.", user.getId());
notificationService.notifyFollowersBySse(user, post);
  • Post 엔티티를 저장하고, Notification 엔티티를 Redis 에 저장한다.
  • 게시글 작성 이벤트가 발생했으므로, Sse 연결중인 팔로워들에게 알린다

 

시퀀스 다이어그램

 

로직을 처음 짤 때 복잡해서 헤맸었는데,

시퀀스 다이어그램으로 간략하게 전체 흐름을 정리한 후 흐름을 이해하고 코드를 더 쉽게 짤 수 있었다.

 


 

 

결론

Sse 를 사용해 알림 기능을 만들어보았다.

Sse 구현 자체는 어렵지 않았고, 1주일 치의 알림을 어떻게 저장하고 유저들에게 뿌려줄까에 대한 부분이 난이도가 있었던 것 같다.

 

현재는 최적화가 제대로 이뤄지지 않은 것 같아서 Redis 에서 알림을 가져올 때 성능을 개선하고, 서버에서 유저의 Follower 리스트를 가져오고, for 문을 돌면서 데이터를 전송하는 부분도 성능 개선이 필요할 것 같다.