2024. 6. 13. 23:39ㆍ프로젝트
노닥 노닥 프로젝트 개발 중에,
- 팔로우 한 사용자가 새 게시글을 작성한 경우 팔로워에게 실시간으로 알림 전송
- 알림은 최소 1주일 치의 정보를 가지고 있어야 함 (접속하지 않은 경우에도 알림은 쌓여야 한다.)
위 상황에서 알림을 보내는 기능이 필요했다.
대표적으로 Polling, 웹 소켓, SSE 방식을 생각해 볼 수 있는데 폴링은 리소스 낭비가 심할 것 같았고,
웹 소켓처럼 양 방향 통신이 필요한 것도 아니었기에, SSE 를 선택했다.
(SSE 관련 자세한 내용 설명은 생략합니다.)
관련 포스팅 : https://matt1235.tistory.com/79
SSE 를 사용하면 서버 -> 클라이언트로 이벤트가 발생함에 따라 실시간으로 메시지를 전송이 가능했고,
한 번 연결이 되면 헤더를 포함하지 않기에 웹 소켓보다 경제적이기에 SSE 를 선택했다.
SSE의 특징
- 실시간 업데이트 : 서버에서 클라이언트로 실시간으로 데이터 전송 가능 (단방향 통신)
- TCP 연결을 맺고 HTTP 프로토콜 사용
- 접속에 문제가 있으면 자동으로 재접속을 시도한다.
단점
- SSE는 지속적인 연결을 유지해야 하므로 클라이언트 네트워크와 서버 리소스 비용이 발생한다. 특히 많은 클라이언트가 연결을 맺을 경우 서버의 부하가 증가할 수 있음
- 네트워크 연결이 불안정한 경우에는 종료하고 재연결이 필요하므로 추가적인 네트워크 오버헤드가 발생할 수 있음
SseEmitter 를 통한 구현
Spring Framework 4.2 부터 SSE 프로토콜을 지원하는 SseEmitter 클래스를 사용할 수 있게 되었다.
이를 통해 실시간으로 클라이언트에게 정보 전송 가능
비동기 통신
- SseEmitter 는 비동기적으로 이벤트 전송이 가능하므로, 실시간성이 중요한 알림 기능에 적합하다.
- 즉 서버에서 이벤트가 발생하면 바로 이벤트를 전송해주어, 지연 없는 업데이트를 제공해준다.
클라이언트의 재시도 및 연결 관리
- SseEmitter 는 클라이언트의 연결 상태를 관리하고 연결이 끊어지는 경우 재연결 요청을 지원한다.
- 클라이언트가 알림을 구독한 후 연결이 끊어지면, 클라이언트는 다시 연결을 시도하고 이전에 미수신한 이벤트를 잃지 않도록 처리 가능하다.
확장성
- 여러 클라이언트와 동시에 통신이 가능해, 동일한 알림을 여러 클라이언트에게 전송 가능.
- 다수의 클라이언트에게 알림을 전달해야 하는 경우 적합하다.
구현 코드
알림 Entity
@Getter
@Setter
@NoArgsConstructor
public class Notification implements Serializable {
private Long postId;
private String message;
private Long timestamp;
private Long writerId;
public Notification(Long postId, String message, Long writerId) {
this.postId = postId;
this.message = message;
this.timestamp = System.currentTimeMillis();
this.writerId = writerId;
}
}
사용자가 게시글을 생성하면 Redis에 저장되는 엔티티 클래스이다.
알림 Controller
@RestController
@RequiredArgsConstructor
public class NotificationController {
private final NotificationService notificationService;
@GetMapping("/subscribe/{userId}")
@AuthorizationRequired(UserRole.GENERAL)
public SseEmitter subscribe(@PathVariable("userId") Long userId, Principal principal) {
validateUser(userId, principal);
return notificationService.getSseEmitter(userId);
}
private void validateUser(Long userId, Principal principal) {
long currentUserId = Long.parseLong(principal.getName());
if (currentUserId != userId) {
throw new AuthorizationException();
}
}
}
클라이언트와 서버가 SSE 연결을 맺도록 한다.
SseEmitter 를 통해 연결을 맺고 실시간으로 알림을 전송할 준비를 한다.
알림 Service
getSseEmitter() : Sse 연결 수립
@Service
@RequiredArgsConstructor
public class NotificationService {
private final Map<Long, SseEmitter> clients = new ConcurrentHashMap<>();
private final RedisTemplate<String, Object> redisTemplate;
private final FollowService followService;
/**
* SSE 연결 시, 동작 */
public SseEmitter getSseEmitter(Long userId) {
SseEmitter emitter = new SseEmitter(15 * 60 * 1000L);
clients.put(userId, emitter);
emitter.onCompletion(() -> clients.remove(userId));
emitter.onTimeout(() -> clients.remove(userId));
try {
emitter.send(SseEmitter.event().name("init").data("Connected"));
List<Notification> notifications = getUndeliveredNotifications(userId);
for (Notification notification : notifications) {
Map<String, Object> data = new HashMap<>();
data.put("postId", notification.getPostId());
data.put("message", notification.getMessage());
emitter.send(SseEmitter.event().name("newPost").data(data, MediaType.APPLICATION_JSON));
}
} catch (IOException e) {
clients.remove(userId);
}
return emitter;
}
clients
- Long 타입의 키와 SseEmitter 타입의 값으로 이루어져 있음
- 사용자 별로 생성된 SseEmitter 객체를 저장하는 역할
- 클라이언트가 구독(subscribe) 요청을 보내면 해당 사용자의 id 를 키로 사용해 맵에 저장하고, 이후 알림 전송 시 해당 사용자의 SseEmitter 를 조회하기 위해 사용한다.
ConcurrentHashMap
- thread-safe 한 Map
- 이 맵을 사용해 동시성 문제를 해결하고 맵에 데이터 저장/조회를 가능하게 하였음
Redis 에 알림 엔티티를 저장하는 함수
public void saveNotificationToRedis(Long postId, String message, Long writerId) {
Notification notification = new Notification(postId, message, writerId);
ObjectMapper objectMapper = new ObjectMapper();
try {
String notificationJson = objectMapper.writeValueAsString(notification);
redisTemplate.opsForZSet().add("notifications", notificationJson, notification.getTimestamp());
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
Redis sorted_set 에 "notifications"를 키로, timestamp 를 스코어로 지정해 저장.
처음에는 user 별로 notification 엔티티를 저장하였으나, 이는 불가능하다는 것을 알게 되었다.
-> A 를 팔로우하는 유저가 10만 명이라면,
A 가 게시글을 작성할 때마다 10만 명의 팔로워들에게 1개씩 알림 엔티티를 만들어, followerId 를 키로 레디스에 저장
즉 팔로워 개수만큼 데이터가 저장되므로, 부하가 너무 크다. 따라서 게시글을 기준으로 알림 엔티티를 만들어 Redis 에 저장하였다.
전체 알림 가져오는 함수
public List<Notification> getUndeliveredNotifications(Long userId) {
List<Long> followingIds = followService.getFollowees(userId).stream()
.map(UserInfoResponse::getUserId)
.collect(Collectors.toList());
List<Notification> notifications = new ArrayList<>();
long oneWeekAgo = System.currentTimeMillis() - 7 * 24 * 60 * 60 * 1000L;
// Sorted Set에서 1주일 이내의 값을 가져오기
Set<Object> notificationJsons = redisTemplate.opsForZSet().rangeByScore("notifications", oneWeekAgo, Double.MAX_VALUE);
ObjectMapper objectMapper = new ObjectMapper();
for (Object notificationJson : notificationJsons) {
try {
Notification notification = objectMapper.readValue(notificationJson.toString(), Notification.class);
if (notification != null && followingIds.contains(notification.getWriterId())) {
notifications.add(notification);
}
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
return notifications;
}
사용자가 Sse 연결을 수립하면, 접속하지 않은 동안 쌓인 알림을 가져와야 한다.
sorted_set을 통해 손쉽게 timestamp 를 기준으로 1주일 ~ 현재 까지의 알림을 가져온다.
단, 로그인 한 사용자가 팔로우 한 사람들 것만을 가져와야 하므로 followingIds 에 writerId 가 포함되는 값들만 가져와야 한다.
TODO : 전체를 불러올 때 SCAN 을 사용해 성능을 개선해야 한다.
-> 레디스의 rangeByScore() 는 O(N) 의 시간복잡도를 가지는데, O(N) 연산은 쓰지 않는게 좋다. 데이터가 많아지면 서버가 터져버릴 수도 있어서...
현재 Sse 연결을 맺고 있는 유저들에게, 알림 전송하는 메서드
/**
* user 의 팔로워들에게, Post 알림 전송 */
public void notifyFollowersBySse(User user, Post post) {
followService.getFollowers(user.getId()).forEach(follower -> {
SseEmitter emitter = clients.get(follower.getUserId());
if (emitter != null) {
try {
Map<String, Object> data = new HashMap<>();
data.put("postId", post.getId());
data.put("message", user.getNickname() + "님이 새로운 게시글을 작성했습니다.");
emitter.send(SseEmitter.event().name("newPost").data(data, MediaType.APPLICATION_JSON));
} catch (IOException e) {
clients.remove(follower.getUserId());
}
}
});
}
현재 Sse 연결을 맺고 있는 유저들에게 알림을 전송한다.
user 가 게시글을 작성하면 user.getFollowers() 로 팔로워 리스트를 가져오게 된다.
해당 리스트를 돌며 Sse 연결을 맺고 있는 사용자에게 알림을 보낸다.
추가 사항
유저가 게시글을 작성하면, 해당 게시글 (post_id) 알림 엔티티가 Redis 에 저장된다.
PostService - savePost() 의 일부
postRepository.save(post);
notificationService.saveNotificationToRedis(post.getId(), user.getNickname() + "님이 새 게시글을 작성했습니다.", user.getId());
notificationService.notifyFollowersBySse(user, post);
- Post 엔티티를 저장하고, Notification 엔티티를 Redis 에 저장한다.
- 게시글 작성 이벤트가 발생했으므로, Sse 연결중인 팔로워들에게 알린다
시퀀스 다이어그램
로직을 처음 짤 때 복잡해서 헤맸었는데,
시퀀스 다이어그램으로 간략하게 전체 흐름을 정리한 후 흐름을 이해하고 코드를 더 쉽게 짤 수 있었다.
결론
Sse 를 사용해 알림 기능을 만들어보았다.
Sse 구현 자체는 어렵지 않았고, 1주일 치의 알림을 어떻게 저장하고 유저들에게 뿌려줄까에 대한 부분이 난이도가 있었던 것 같다.
현재는 최적화가 제대로 이뤄지지 않은 것 같아서 Redis 에서 알림을 가져올 때 성능을 개선하고, 서버에서 유저의 Follower 리스트를 가져오고, for 문을 돌면서 데이터를 전송하는 부분도 성능 개선이 필요할 것 같다.
'프로젝트' 카테고리의 다른 글
[프로젝트] Pagination 으로 피드 랜덤조회 개선 (0) | 2024.08.08 |
---|---|
[프로젝트] 레디스 Pub/Sub, RabbitMQ 도입으로 구조 개선 (0) | 2024.06.18 |
[쿠폰] 서버 모니터링 및 부하테스트 (0) | 2024.05.12 |
레디스 쿼리로 서버 RPS 개선 (0) | 2024.04.29 |
[쿠폰] Redis 기반 비동기 시스템 구현 (0) | 2024.04.21 |