[프로젝트] 레디스 Pub/Sub, RabbitMQ 도입으로 구조 개선

2024. 6. 18. 19:33프로젝트

노닥 노닥 프로젝트 개발 중에,

  • 게시글 작성 시, 작성한 사람의 팔로워들에게 실시간으로 알림 전송
  • 누가 자신을 팔로우하면, 알림 전송
  • 일주일 치 알림 정보를 저장하고 있어야 함

위 알림 기능을 SSE(Sever Sent Events), Redis 로 구현했었다.

 

관련 포스팅 : https://matt1235.tistory.com/80

 

클라이언트가 서버에게 subscribe 요청을 보내 SSE 연결을 맺은 후에, 이벤트가 발생 시 서버 -> 클라이언트로

메시지를 실시간으로 보내주었다.

 

 

SSE 연결, 알림 시퀀스 다이어그램

 

위 그림을 보면 Redis sorted_set 에서 1주일 치 게시글 가져오기 부분이 있다.

해당 기능은 timestamp 로 score 를 부여했기에 rangeByScore 쿼리로 현재 ~  과거 1주일치의 알림 정보를 가져온다.

 

해당 Notification 집합에서 내가 팔로우하고 있는 사람들만이 작성한 것을 골라서, 반환한다.

 

알림 엔티티

@Getter
@Setter
@NoArgsConstructor
public class Notification implements Serializable {

    private Long postId;
    private String message;
    private Long timestamp;
    private Long writerId;

    public Notification(Long postId, String message, Long writerId) {
        this.postId = postId;
        this.message = message;
        this.timestamp = System.currentTimeMillis();
        this.writerId = writerId;
    }
}

 

 

Reids에 저장할 때는 객체를 직렬화 해서

문자열이나 바이트 배열로 변환해야 저장할 수 있으므로, Serializable 인터페이스를 구현하였다.

 

 

알림 서비스 

- getUndeliveredNotifications() 

public List<Notification> getUndeliveredNotifications(Long userId) {
    List<Long> followingIds = followService.getFollowees(userId).stream()
            .map(UserInfoResponse::getUserId)
            .collect(Collectors.toList());

    List<Notification> notifications = new ArrayList<>();
    long oneWeekAgo = System.currentTimeMillis() - 7 * 24 * 60 * 60 * 1000L;

    // Sorted Set에서 1주일 이내의 값을 가져오기
    Set<Object> notificationJsons = redisTemplate.opsForZSet().rangeByScore("notifications", oneWeekAgo, Double.MAX_VALUE);

    ObjectMapper objectMapper = new ObjectMapper();
    for (Object notificationJson : notificationJsons) {
        try {
            Notification notification = objectMapper.readValue(notificationJson.toString(), Notification.class);
            if (notification != null && followingIds.contains(notification.getWriterId())) {
                notifications.add(notification);
            }
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }

    return notifications;
}

 

1주일 치의 알림 정보를 가져오는 메서드이다.

 

LIst<Long> followingIds

  • 현재 로그인 해서 sse 연결을 맺은 유저가(userId), 팔로우 하고 있는 사람들의 id 목록이다.

Set<NotificationJsons>

  • redisTemplate 에서 sorted_set에서 1주일 치의 전체 알림을 가져온다. timestamp 로 점수를 부여했기 때문에 rangeByScore 로 쉽게 일주일 치를 가져올 수 있음.
  • for 문을 돌며 각 Notification 객체를 쓴 사람(writerId)이 내가 팔로우 하는 사람들 목록에 있는지 확인한다.

위 조건들을 만족하는 Notification(알림) 리스트를 반환한다.

 

해당 메서드는 처음 SSE 연결한 후, 클라이언트에게 1주일 치의 알림 전송을 제공하기 위한 것이다.

이후 팔로우 하는 사람이 게시글을 작성하면, 실시간으로 알림이 가게 된다.

 


 

현재 일주일 치  알림을 가져오는 getUndeliveredNotifications() 메서드를 보자.

// Sorted Set에서 1주일 이내의 값을 가져오기
Set<Object> notificationJsons = redisTemplate.opsForZSet().rangeByScore("notifications", oneWeekAgo, Double.MAX_VALUE);

 

  • rangeByScore() 의 시간 복잡도 : O(logN + M)

sorted_set 에서 사용 가능한 범위 쿼리인 rangeByScore 연산은 N 의 시간복잡도를 가지고 있다.

N은 집합의 크기, M은 반한되는 요소의 수

 

"notifications"는 전체 알림이므로 개수가 많아지면 속도가 느려질 것이다.

 

 

또 다음 부분을 살펴보면,

for (Object notificationJson : notificationJsons) {
    try {
        Notification notification = objectMapper.readValue(notificationJson.toString(), Notification.class);
        if (notification != null && followingIds.contains(notification.getWriterId())) {
            notifications.add(notification);
        }
    } catch (JsonProcessingException e) {
        e.printStackTrace();
    }
}

 

followingIds.contains(notification.getWriterId()) 

  • followingIds 는 List 이고 contains 는 모든 요소를 순회하므로 이 역시 O(N) 이다.

즉 O(N) 이 2번 수행되는 것인데 O(N^2) 의 시간복잡도를 가진다. 따라서 반드시 구조 개선이 필요하다.

 


 

수정 전 - 모놀리식 구조

 

수정 후 - 비동기로 구성

 

위와 같이 구성한 이유

  • 원래는 BE 서버가 모든 것을 처리하는 구조였다. 하지만 코드를 보면 알 수 있듯 반복문도 많고 작업이 헤비해, 비동기로 구성하는 게 낫다고 판단하였음
  • 누군가 자신을 팔로우하거나, 게시글을 작성했다는 알림이 조금 늦게 왔다고 해도 큰일이 발생하지는 않기 때문이기도 하다.

이제 백엔드 서버는 메시지 브로커(Rabbit MQ)에 요청을 던지기만 하면 된다! (성공 여부를 신경 쓸 필요가 없음)

 

 

아래 시퀀스 다이어그램을 보면 더 쉽게 이해 가능한데

백엔드 입장에서는 요청을 받아 DB에 Post 를 저장하고, 브로커에 던지면 끝!

 

 

그 후부터는 컨슈머에서 해당 메시지를 읽은 후에 처리해주면 된다. 컨슈머 서버에서는 메시지를 읽어 팔로워 목록을 가져오고, 

레디스에 알림 데이터를 저장하고 post 채널에 메시지를 Publish 한다.

 

백엔드 서버에서는 해당 채널을 구독하고 있는데, 데이터가 들어오면 SSE 로 뿌려주면 끝이다!

BE 서버 입장에서는 부하가 훨씬 줄었다.


 

코드 구현 : 백엔드 서버

  • 코드가 길어, 핵심 부분만 살펴보자.

 

PostService

@Transactional
@IncreaseUserHistory(incrementValue = 2)
public void savePost(Long userId, PostRequest request) {
    User user = findUserById(userId);
    Category category = findCategoryByTitle(request.getChannel());

    Post post = createPost(user, category, request);
    Vote vote = createVote(post, request.getVoteTitle());

    request.getVoteOptionContent().entrySet().stream()
            .map(e -> createVoteOption(e.getKey(), e.getValue(), vote))
            .toList();

    postRepository.save(post);
    rabbitTemplate.convertAndSend("post-exchange", "post.created", new PostEvent(post.getId(), user));
}

 

  • rabbitTemplage.convertAndSend() : 간편하게 템플릿으로 브로커에 메시지 던짐. 끝

 

 

RedisConfig (일부분)

@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                        @Qualifier("postListenerAdapter") MessageListenerAdapter postListenerAdapter,
                                        @Qualifier("followListenerAdapter") MessageListenerAdapter followListenerAdapter) {
    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.addMessageListener(postListenerAdapter, new PatternTopic("postEvent-channel"));
    container.addMessageListener(followListenerAdapter, new PatternTopic("followEvent-channel"));
    return container;
}

@Bean(name = "postListenerAdapter")
MessageListenerAdapter postListenerAdapter(RedisMessageSubscriber subscriber) {
    return new MessageListenerAdapter(subscriber, "onMessage");
}

@Bean(name = "followListenerAdapter")
MessageListenerAdapter followListenerAdapter(RedisMessageSubscriber subscriber) {
    return new MessageListenerAdapter(subscriber, "onMessage");
}

 

  • 아래와 같이 postEvent-channel, followEvent-channel 을 구독하고 있으며 각 채널에 메시지가 들어왔을 때, 로직을 수행하는 ListenerAdapter를 지정해줄 수 있다.
  • 현재는 follow, post 발생 시 모두 onMessage()가 호출되도록 지정함

 

 

RedisMessageSubScriber

@Component
@RequiredArgsConstructor
public class RedisMessageSubscriber implements MessageListener {

    @Override
    public void onMessage(Message message, byte[] pattern) {
        // 메시지 채널을 확인하여 적절한 메서드 호출
        String channel = new String(pattern);
        String messageBody = new String(message.getBody());
        System.out.println("Received messageBody: " + messageBody);

        if ("postEvent-channel".equals(channel)) {
            onPostMessage(messageBody);
        } else if ("followEvent-channel".equals(channel)) {
            onFollowMessage(messageBody);
        }
    }
 }
  • postEvent 나 followEvent 에 따라 다른 로직을 수행하도록 설정
  • 레디스 Pub/Sub 에서 Subscriber 역할을 하는 클래스이다.

 

    public void onPostMessage(String messageBody) {
        try {
            if (messageBody.startsWith("\"") && messageBody.endsWith("\"")) {
                messageBody = messageBody.substring(1, messageBody.length() - 1).replace("\\\"", "\"");
            }
            PostEvent postEvent = objectMapper.readValue(messageBody, PostEvent.class);
            notifyClients(postEvent);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    @Async
    public void notifyClients(PostEvent postEvent) {
        clients.forEach((userId, emitter) -> {
            if (isFollowing(userId, postEvent.getWriterId())) {
                try {
                    Notification notification = new Notification(postEvent);
                    emitter.send(SseEmitter.event().name("notification").data(notification, MediaType.APPLICATION_JSON));
                } catch (IOException e) {
                    clients.remove(userId);
                }
            }
        });
    }
  • message 를 읽은 후, objectMapper로 String 인 메시지를, PostEvent로 역직렬화한다.
  • 그 다음에는 현재 SSE 연결되어 있는 유저 중, 팔로워들에게 실시간으로 알림을 전송한다.

 

@Async 사용 이유

nofifyClients 메서드는 여러 클라이언트에게 알림을 전송한다. 만약 동기적으로 처리하면 각 클라이언트에게 알림을 보내는 시간이 길어질 수 있다. 그럼 전체 시스템 응답 시간이 느려지기 때문에, 비동기적으로 처리하였다.

메서드를 호출한 스레드는 대기하지 않고, 즉시 다음 작업을 수행한다. 또 스레드 풀에서 스레드를 할당받아 작업을 처리하기 때문에, 동시에 작업을 처리하게 되어 응답 속도도 더 빠를 수 있다.


 

코드 구현 : 컨슈머 서버

컨슈머 서버에도 RedisConfig, RabbitConfig 로 각각 레디스, 래빗엠큐 설정을 해주어야 한다. (생략)

Post가 일어났을 때랑 Follow가 일어났을 때, 2가지의 경우에 대한 Consumer 가 있다.

 

RedisConfig 클래스의 일부

@Configuration
@EnableRabbit
public class RabbitMQConfig {

    static final String postExchangeName = "post-exchange";
    static final String followExchangeName = "follow-exchange";

    public static final String postQueueName = "post-queue";
    public static final String followQueueName = "follow-queue";
    
    @Bean
    Binding postBinding(Queue postQueue, TopicExchange postExchange) {
        return BindingBuilder.bind(postQueue).to(postExchange).with("post.#");
    }

    @Bean
    Binding followBinding(Queue followQueue, TopicExchange followExchange) {
        return BindingBuilder.bind(followQueue).to(followExchange).with("follow.#");
    }

	/**
		중략
	*/
    
    @Bean
    SimpleMessageListenerContainer postContainer(ConnectionFactory connectionFactory, MessageListenerAdapter postListenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(postQueueName);
        container.setMessageListener(postListenerAdapter);
        return container;
    }
    
    @Bean
    MessageListenerAdapter postListenerAdapter(PostConsumer consumer, Jackson2JsonMessageConverter converter) {
        MessageListenerAdapter adapter = new MessageListenerAdapter(consumer, "receivePostMessage");
        adapter.setMessageConverter(converter);
        return adapter;
    }
}

 

  • routingKey 라는게 있는데, 백엔드 서버에서 이 key를 통해 메시지 큐를 라우팅해준다. 즉 백엔드 서버에서 데이터를 브로커에 전달할 때, Key값을 넣어 전달하면 키에 따라 exchange -> queue 로 매핑되는 것이다.
  • postContainer 빈을 보면 setQueueNames(postQueueNames) 를 통해 post 큐에 대한 설정을 할 수 있다. 이 경우 postListenerAdapter를 지정해주었고, 어댑터는 'receivePostMessage' 메서드가 호출되도록 지정해주었다.

 

 

PostConumer 클래스

@Component
@RequiredArgsConstructor
public class PostConsumer {

    private final RedisTemplate<String, Object> redisTemplate;
    private final FollowRepository followRepository;
    private final ObjectMapper objectMapper;

    @RabbitListener(queues = RabbitMQConfig.postQueueName)
    public void receivePostMessage(PostEvent postEvent) {
        Set<Long> followerIds = followRepository.getFollowersByUserId(postEvent.getWriterId());
        sendPostEventToFollowers(postEvent, followerIds);
    }

    @Async
    public void sendPostEventToFollowers(PostEvent postEvent, Set<Long> followerIds) {
        for (Long followerId : followerIds) {
            String key = "notification:" + followerId;
            try {
                redisTemplate.opsForZSet().add(key, objectMapper.writeValueAsString(new Notification(postEvent)), postEvent.getTimestamp());
                redisTemplate.convertAndSend("postEvent-channel", new ObjectMapper().writeValueAsString(postEvent));
            } catch (JsonProcessingException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

 

  • @RabbitListener를 통해 post 큐에 대한 처리를 수행하는 메서드를 정의했다.
  • post 큐에서 메시지를 읽어온 후 해당 게시글을 작성한 사용자의 팔로워들의 Id 를 가져온다.
  • 팔로워마다 notification:{userId} 키로 데이터를 Redis에 저장, postEvent-channel 에 데이터를 발행(Publish) 한다.

발행된 메시지는 앞서 설명했듯, 해당 채널을 구독한 백엔드 서버에서 로직이 실행된다.

(Follow 로직도 거의 동일하다.)


 

RabbitMQ 를 선택한 이유

앞서 말했듯 백엔드 서버 하나로는 알림을 저장하고, 팔로워들을 추출하여 지정된 사람들에게 SSE를 통해 알림을 뿌리는 것은 부하를 많이 유발한다. 따라서 중간에 메시지 브로커를 두어 비동기적으로 위 로직을 구성했다.

 

Kafka 처럼 높은 처리량을 요구 하지는 않고 컨슈머 서버에 메시지 전달이 주가 되었기에, RabbitMQ 를 선택했다.

 

 

결론

처음으로 메시지 브로커와 Redis Pub/Sub 을 사용해 보았다. 처음에는 SSE 로만 알림 기능을 구현하려고 했었으나 부족한 부분이 거슬렸고, 메시지 브로커를 도입해 msa 방식으로 알림 처리 로직을 구현했다.

설명만 들었을 때보다 직접 손으로 부족한 부분을 보완하다 보니, 메시지 브로커를 왜 써야 하는지, 그리고 비동기 방식은 필요한지에 대한 깊은 이해가 가능했다.