GitHub - lsh2613/spring-rabbitmq: Spring + RabbitMQ를 활용한 1:1 채팅방 구현
Spring + RabbitMQ를 활용한 1:1 채팅방 구현. Contribute to lsh2613/spring-rabbitmq development by creating an account on GitHub.
github.com
1. 읽음/안 읽음
개발자마다 부르는 말이 다른 것 같다. 하지만 둘 다 구현하고자 하는 바는 메시지에 대해 읽었는지, 안 읽었는지를 알 수 있게 해주는 지표를 구현하고자 하는 것이다.
크게 다음과 같은 기능을 의미한다
1. 나의 채팅방에서 읽지 않은 메시지의 개수 표시
2. 채팅방 내에서 메시지가 다른 참가자들에 의해 얼마나 읽혔는지(혹은 안 읽혔는지)를 표시
2. 설계
위 기능을 구현하기 위해 생각한 설계 방법이다
2.1. 기능 구현 설계 후보
2.1.1. Read-Message 테이블 관리
누가 어떤 메시지를 읽었는지에 대해 테이블을 관리한다
- 1번 기능 - 내가 채팅방을 불러올 때 내가 읽지 않은 메시지를 카운팅 하여 구현할 수 있다.
- 2번 기능 - 채팅방 참가자 - 읽은 횟수를 통해 구현할 수 있다.
2.1.2. LastEntryTime 관리
마지막으로 채팅방에 접속한 시간 관리한다
- 1번 기능
- 채팅방을 불러올 때 내가 해당 채팅방에 접속한 마지막 시간 이후에 생성된 메시지 중 내가 작성하지 않은 메시지를 카운팅 하여 구현할 수 있다.
- 즉, 누가 메시지를 작성했는지도 알아야 한다.
- 2번 기능
- 채팅방 전체 참가자 - 실시간으로 채팅방에 접속해 있는 인원 - (각 메시지 생성 시간보다 이후에 접속한 참가자의 수)
2.1.3. 선택
2.1 설계 방법에 경우 1:1 채팅방에서는 오버헤드가 크지 않을 수 있지만 단체 톡방을 생각해 보자
N명의 채팅방 참가자가 존재한다고 했을 때 모든 참가자가 단 하나의 메시지를 읽어도 최대 N개의 Read-Message 레코드가 생성된다
즉, 하나의 메시지마다 읽은 유저의 정보를 담기 때문에, 메시지가 쌓일수록 데이터의 양은 더욱 커지게 되고 이로 인해 메모리 사용량은 증가하고 메시지 처리 속도가 느려질 것이다
또한, 사용자가 채팅방에 들어가지 않은 동안 M개의 메시지가 쌓여있다고 가정해 보자. 이때 사용자가 채팅방에 접속하면 읽지 않은 메시지 M개에 대해 Read-Message 테이블로 Insert 쿼리를 요청할 것이다.
간단하게 극적으로 생각했을 때, N명이 M개의 메시지에 대해 읽기 요청이 들어오면 N*M개의 Insert 쿼리 요청을 처리해야 할 것이다.
물론 1:1 채팅방을 구현하고 있지만 단체 톡방으로 개선될 상황을 고려하여 더 효율적인 방법인 2.2 LastEntryTime을 관리하는 방법을 채택하였다
2.2 읽음/안 읽음 싱크 맞추기
위에서 언급한 설계를 통해 unreadCnt는 구현할 수 있겠지만 싱크가 맞지 않는 문제가 발생할 수 있다.
A-online, B-offline인 경우 A가 message_01, message_02 메시지를 발행했다고 가정하자. 이때는 A만 읽었으니까 unreadCnt = 1이다.
B가 접속하게 되면 message_01, message_02는 unreadCnt가 0이지만 A는 1인 값을 출력하고 있다.
즉, B가 접속했을 때 B가 읽지 않은 메시지가 존재하고, B와 같은 채팅방에 존재하는 참가자 중 온라인인 유저가 존재한다면 해당 채팅방으로 싱크 요청 메시지를 보내야 한다.
따라서 해당 프로젝트에서는 사용자가 전송하는 사용자 메시지 타입과, 메시지 싱크를 요청하는 싱크 요청 메시지 타입이 존재한다
2.3 Redis를 통한 채팅방 접속 인원 관리
현재 채팅방에 접속해있는 인원은 영구 db가 아닌 메모리 기반의 redis를 사용한다.
채팅방에 접속 상태를 실시간으로 관리해야 업데이트가 빈번히 일어나며 짧은 데이터 수명을 가지고 있다. 이러한 이유로 메모리 기반 db인 redis를 활용하면 속도와 효율성의 장점을 가져갈 수 있다.
redis-set 자료구조를 활용하여 chatRoomId -> Set:{memberId_01, memberId02, ...}로 현재 접속 인원을 관리할 예정이다.
즉, online/offline 을 관리하기 위해선 CONNECT:접속, DISCONNECT:퇴장으로 STOMP의 COMMAND를 구분하여 사용자가 접속해 있는 chat-room-id에서 현재 사용자를 제거해야 한다.
하지만 STOMP의 CONNECT는 header를 추가할 수 있지만, DISCONNECT는 header를 추가할 수 없다. 따라서 CONNECT 시 header에 chat-room-id, member-id를 받아와서 stomp-session에 저장해두고 DISCONNECT 시 세션에서 꺼내서 사용하기로 하였다
3. 구현
redis 설정을 위해 docker-compose를 사용하였고 이전 게시글에서 설정한 mongodb도 추가하였다
docker-compose.yml
version: '3.8'
services:
mongodb:
image: mongo:latest
container_name: mongodb
environment:
MONGO_INITDB_ROOT_USERNAME: root
MONGO_INITDB_ROOT_PASSWORD: 1234
MONGO_INITDB_DATABASE: message_history
ports:
- "27017:27017"
networks:
- my_network
redis:
image: redis:alpine
container_name: redis
hostname: redis
ports:
- "6379:6379"
networks:
- my_network
networks:
my_network:
RedisConfig
@Configuration
@EnableRedisRepositories
public class RedisConfig {
@Value("${spring.data.redis.host}")
private String redisHost;
@Value("${spring.data.redis.port}")
private int redisPort;
@Bean
public RedisConnectionFactory redisConnectionFactory() {
return new LettuceConnectionFactory(redisHost, redisPort);
}
// jwt -> memberId
@Bean
public RedisTemplate<String, Long> StringLongRedisTemplate() {
RedisTemplate<String, Long> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory());
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(Long.class));
return redisTemplate;
}
// chatRoomId -> SET {memberId_01, memberId_02, ...}
@Bean
public RedisTemplate<Long, Long> LongLongRedisTemplate() {
RedisTemplate<Long, Long> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory());
redisTemplate.setKeySerializer(new Jackson2JsonRedisSerializer<>(Long.class));
redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(Long.class));
return redisTemplate;
}
}
먼저 LastEntryTime은 각 채티방 참가자마다 존재하기 때문에 ChatRoomMember 엔티티에 컬럼으로 추가하여 관리하였다
ChatRoomMember
public class ChatRoomMember {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "chatRoomId", nullable = false)
private ChatRoom chatRoom;
@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "memberId", nullable = false)
private Member member;
private LocalDateTime lastEntryTime;
public ChatRoomMember(ChatRoom chatRoom, Member member) {
this.chatRoom = chatRoom;
this.member = member;
this.lastEntryTime = LocalDateTime.now();
}
public void updateLastEntryTime() {
this.lastEntryTime = LocalDateTime.now();
}
}
이제 현재 채팅방 접속 인원을 관리하기 위한 기능을 구현해 보자
RedisUtil
@RequiredArgsConstructor
@Component
public class RedisChatUtil {
private final RedisTemplate<Long, Long> chatRoom2Members;
public void addChatRoom2Member(Long chatRoomId, Long memberId) {
SetOperations<Long, Long> ops = chatRoom2Members.opsForSet();
ops.add(chatRoomId, memberId);
}
public Set<Long> getOnlineMembers(Long chatRoomId) {
SetOperations<Long, Long> ops = chatRoom2Members.opsForSet();
return ops.members(chatRoomId);
}
public int getOnlineMemberCntInChatRoom(Long chatRoomId) {
SetOperations<Long, Long> ops = chatRoom2Members.opsForSet();
return ops.members(chatRoomId).size();
}
public void removeChatRoom2Member(Long chatRoomId, Long memberId) {
SetOperations<Long, Long> ops = chatRoom2Members.opsForSet();
ops.remove(chatRoomId, memberId);
}
}
STOMP-CONNECT, DISCONNECT는 ChannelInterceptor를 통해서 잡을 수도 있지만 EventListner를 통해 간단하게 구현할 수 있다
ChatMessageController
@EventListener
public void handleWebSocketConnectListener(SessionConnectEvent event) {
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(event.getMessage());
chatMessageService.handleConnectMessage(accessor);
}
@EventListener
public void handleWebSocketDisconnectListener(SessionDisconnectEvent event) {
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(event.getMessage());
chatMessageService.handleDisconnectMessage(accessor);
}
STOMP의 header, session을 다루는 코드들이 중복되기에 util 클래스를 만들어 활용하였다
StompHeaderAccessorUtil
@Component
public class StompHeaderAccessorUtil {
static final String CHAT_ROOM_ID = "chat-room-id";
static final String MEMBER_ID = "member-id";
public void setMemberIdInSession(StompHeaderAccessor accessor, Long memberId) {
accessor.getSessionAttributes().put(MEMBER_ID, memberId);
}
public Long getMemberIdInSession(StompHeaderAccessor accessor) {
return Optional.ofNullable((Long) accessor.getSessionAttributes().get(MEMBER_ID))
.orElseThrow(() -> new RuntimeException("Stomp header session에 memberId가 존재하지 않습니다"));
}
public Long removeMemberIdInSession(StompHeaderAccessor accessor) {
return Optional.ofNullable((Long) accessor.getSessionAttributes().remove(MEMBER_ID))
.orElseThrow(() -> new RuntimeException("Stomp header session에 memberId가 존재하지 않습니다"));
}
public Long getChatRoomIdInHeader(StompHeaderAccessor accessor) {
return Optional.ofNullable(accessor.getFirstNativeHeader(CHAT_ROOM_ID))
.map(Long::valueOf)
.orElseThrow(() -> new RuntimeException("Stomp header에 chat-room-id 존재하지 않습니다"));
}
public void setChatRoomIdInSession(StompHeaderAccessor accessor, Long chatRoomId) {
accessor.getSessionAttributes().put(CHAT_ROOM_ID, chatRoomId);
}
public Long getChatRoomIdInSession(StompHeaderAccessor accessor) {
return Optional.ofNullable((Long) accessor.getSessionAttributes().get(CHAT_ROOM_ID))
.orElseThrow(() -> new RuntimeException("Stomp header session에 chat-room-id가 존재하지 않습니다"));
}
public Long removeChatRoomIdInSession(StompHeaderAccessor accessor) {
return Optional.ofNullable((Long) accessor.getSessionAttributes().remove(CHAT_ROOM_ID))
.orElseThrow(() -> new RuntimeException("Stomp header session에 chat-room-id가 존재하지 않습니다"));
}
}
ChatMessageService-handleConnectMessage()
public void handleConnectMessage(StompHeaderAccessor accessor) {
Long memberId = stompHeaderAccessorUtil.getMemberIdInSession(accessor);
Member member = entityFacade.getMember(memberId);
Long chatRoomId = stompHeaderAccessorUtil.getChatRoomIdInSession(accessor);
ChatRoom chatRoom = entityFacade.getChatRoom(chatRoomId);
enterChatRoom(chatRoom.getId(), member.getId());
readUnreadMessages(chatRoom, member.getId());
}
private void enterChatRoom(Long chatRoomId, Long memberId) {
redisChatUtil.addChatRoom2Member(chatRoomId, memberId);
}
private void readUnreadMessages(ChatRoom chatRoom, Long memberId) {
ChatRoomMember chatRoomMember = chatRoom.getChatRoomMember(memberId);
LocalDateTime lastEntryTime = chatRoomMember.getLastEntryTime();
boolean existsUnreadMessage = chatMessageRepository.existsByChatRoomIdAndCreatedAtAfter(chatRoom.getId(), lastEntryTime);
boolean existsOnlineChatRoomMember = redisChatUtil.getOnlineMemberCntInChatRoom(chatRoom.getId()) > 1; // 1은 본인
if (existsUnreadMessage && existsOnlineChatRoomMember)
sendChatSyncRequestMessage(chatRoom.getId());
}
private void sendChatSyncRequestMessage(Long chatRoomId) {
MessageRes messageRes = ChatSyncRequestRes.createRes();
rabbitTemplate.convertAndSend(ROUTING_KEY_PREFIX + chatRoomId, messageRes);
}
동작 과정
1. 헤더에서 memberId, chatRoomId를 꺼내 Entity 존재 여부 확인
2. 채팅방 입장
2.1. 세션에 memberId, chatRoomId를 저장
2.3. redis에 해당 chatRoomId에 memberId가 온라인이라는 것을 저장
3. 나의 마지막 접속 시간 이후에 작성된 메시지가 존재하고, 나 이외에 다른 채팅방 참가자가 존재하는지 체크
4. 3번이 해당되면 해당 채팅 방으로 메시지 싱크 요청을 보냄
DISCONNECT는 비교적 간단하다. 위에서 설정한 값들을 모두 이전 상태로 되돌려주고 마지막 접속 시간을 업데이트 해주면 된다.
ChatMessageService-handleDisconnectMessage()
public void handleDisconnectMessage(StompHeaderAccessor accessor) {
Long memberId = stompHeaderAccessorUtil.removeMemberIdInSession(accessor);
Member member = entityFacade.getMember(memberId);
Long chatRoomId = stompHeaderAccessorUtil.removeChatRoomIdInSession(accessor);
ChatRoom chatRoom = entityFacade.getChatRoom(chatRoomId);
ChatRoomMember chatRoomMember = chatRoom.getChatRoomMember(member.getId());
chatRoomMember.updateLastEntryTime();
exitChatRoom(chatRoom, member);
}
private void exitChatRoom(ChatRoom chatRoom, Member member) {
redisChatUtil.removeChatRoom2Member(chatRoom.getId(), member.getId());
}
이제 메시지를 보낼 때도 unreadCnt를 계산해서 보내줘야 한다.
ChatMessageService-sendMessage()
public void sendMessage(StompHeaderAccessor accessor, ChatMessageReq req) {
Long memberId = stompHeaderAccessorUtil.getMemberIdInSession(accessor);
Member member = entityFacade.getMember(memberId);
Long chatRoomId = stompHeaderAccessorUtil.getChatRoomIdInSession(accessor);
ChatRoom chatRoom = entityFacade.getChatRoom(chatRoomId);
ChatMessage chatMessage = req.createChatMessage(chatRoom.getId(), member.getId());
chatMessageRepository.save(chatMessage);
int onlineMemberCnt = redisChatUtil.getOnlineMemberCntInChatRoom(chatRoom.getId());
int unreadCnt = chatRoom.getChatRoomMemberCnt() - onlineMemberCnt;
MessageRes messageRes = ChatMessageRes.createRes(chatMessage, unreadCnt);
rabbitTemplate.convertAndSend(ROUTING_KEY_PREFIX + chatRoom.getId(), messageRes);
}
메시지 저장 및 메시지 발행
unreadCnt = 채팅방 전체 참가자 -현재 채팅방에 접속 중인 인원
여기까지가 읽음/안 읽음을 적용한 메시지를 주고 받는 기능에 대한 구현 코드이다.
이제는 이 글의 서두에서 다뤘던 메시지 조회에서 다루는 기능들에 대한 설명이다.
1. 나의 채팅방에서 읽지 않은 메시지의 개수 표시
2. 채팅방 내에서 메시지가 다른 참가자들에 의해 얼마나 읽혔는지(혹은 안 읽혔는지)를 표시
먼저 1번 기능을 구현하기 위해 우리는 내가 읽지 않은 메시지의 개수(unreadMessageCnt)와, 마지막 메시지를 조회해야 한다.
내가 읽지 않은 메시지는 나의 마지막 접속 이후에 생성된 메시지를 카운팅하여 구하고 마지막 메시지는 해당 채팅방에 생성날짜가 가장 최신인 메시지 하나만 조회하여 구할 수 있다.
// ChatRoomController
@GetMapping("/chat-rooms")
public ResponseEntity getChatRooms(@RequestParam Long loginId) {
return ResponseEntity.ok(chatRoomService.getChatRooms(loginId));
}
// ChatRoomService
@Override
public List<ChatRoomRes> getChatRooms(Long memberId) {
Member member = entityFacade.getMember(memberId);
List<ChatRoomMember> chatRoomMembers = chatRoomMemberRepository.findAllByMemberId(member.getId());
List<ChatRoomRes> chatRoomResList = chatRoomMembers.stream().map(chatroomMember -> {
ChatRoom chatRoom = chatroomMember.getChatRoom();
Optional<ChatMessage> lastMessage = chatMessageRepository.findTopByChatRoomIdOrderByCreatedAtDesc(chatRoom.getId());
int unreadMessageCnt = chatMessageRepository.countByChatRoomIdAndCreatedAtAfter(chatRoom.getId(), chatroomMember.getLastEntryTime());
return ChatRoomRes.createRes(chatRoom.getId(), member.getUsername(), unreadMessageCnt, lastMessage);
})
.toList();
return chatRoomResList;
}
2번 기능은 채팅방 전체 참가자 - 실시간으로 채팅방에 접속해 있는 인원 - (각 메시지 생성 시간보다 이후에 접속한 참가자의 수)를 통해 unreadCnt를 구할 수 있다.
여기서 중요한 점은 '채팅방에 접속 중인 인원'은 '각 메시지 생성 시간보다 이후에 접속한 참가자의 수'를 구할 때 제외되어야 한다.
제외되지 않는다면 마이너스 연산이 두 번 적용될 것이다
// ChatMessageController
@GetMapping("/chat-messages/chat-rooms/{chatRoomId}")
public ResponseEntity getChatMessages(@PathVariable Long chatRoomId) {
List<MessageRes> chatMessageResList = chatMessageService.getChatMessages(chatRoomId);
return ResponseEntity.ok(chatMessageResList);
}
// ChatMessageService
public List<MessageRes> getChatMessages(Long chatRoomId) {
ChatRoom chatRoom = entityFacade.getChatRoom(chatRoomId);
Set<Long> onlineMembersInChatRoom = redisChatUtil.getOnlineMembers(chatRoomId);
List<ChatMessage> chatMessages = chatMessageRepository.findByChatRoomIdOrderByCreatedAtAsc(chatRoom.getId());
List<MessageRes> messageResList = chatMessages.stream()
.map(chatMessage -> {
int unreadCnt = chatRoom.getUnreadCnt(onlineMembersInChatRoom, chatMessage.getCreatedAt());
return ChatMessageRes.createRes(chatMessage, unreadCnt);
})
.toList();
return messageResList;
}
// ChatRoom
public int getUnreadCnt(Set<Long> onlineMembers, LocalDateTime messageCreatedAt) {
List<LocalDateTime> lastEntryTimes = getLastEntryTimesExcludingOnlineMembers(onlineMembers);
int unreadMemberCount = (int) lastEntryTimes.stream()
.filter(time -> time.isAfter(messageCreatedAt))
.count();
return getChatRoomMemberCnt() - onlineMembers.size() - unreadMemberCount;
}
private List<LocalDateTime> getLastEntryTimesExcludingOnlineMembers(Set<Long> onlineMemberIds) {
return chatRoomMembers.stream()
.filter(chatRoomMember -> !onlineMemberIds.contains(chatRoomMember.getMember().getId()))
.map(ChatRoomMember::getLastEntryTime)
.toList();
}
4. 테스트
테스트를 진행하기 위해 chatRoom, Member, ChatRoomMeber를 생성하였고 테스트에선 2명의 채팅 참가자를 설정했다
chatRoomId=1에 대해 memberId=1로 stomp-connect를 시도하면 채팅방 입장 로직으로 인해 chatRoomId=1에 memberId=1이 접속 중임을 확인할 수 있다
이때 메시지를 발행하면 memberId=1만 접속해있기 때문에 unreadCnt=1로 출력되는 것을 확인해볼 수 있다
member=2가 채팅방에 접속한다면 접속 중인 참가자 member=1,2로 총 두 명이 존재하게 되고, member=1에 대해 메시지 싱크 요청을 보낸다
이제 다시 메시지를 보내면 unreadCnt=0으로 전송된다
이제 가장 중요한 1, 2번 기능 api를 호출해보자
먼저 1번 기능 채팅방을 조회해보면 member=1, 2의 마지막 접속 시간, 현재 접속 유무, 채팅 메시지의 생성날짜를 비교하여 unreadCnt를 구하게 된다. 아직 두 유저가 채팅방을 나가지 않았으므로 채팅방 참가자(2) - 현재 접속 중인 참가자(2) = 0으로 계산되어 나온다
이제 member=1,2 모두 채팅방을 나가보자
chatRoomId에 해당되는 접속 유저가 존재하지 않는 것을 확인할 수 있다
다음으로, 2번 기능 api를 호출해보자.
member=1, 2 모두 마지막 채팅을 읽고 나왔기 때문에 채팅방을 조회했을 때 읽지 않은 메시지가 0개로 잘 출력된다
다시 member=1로 들어가서 채팅을 전송하고 member=2로 채팅방을 조회하게 되면 다음과 같이 읽지 않은 메시지가 하나 존재하는 것을 확인해볼 수 있다
'Project > RabbitMQ(STOMP)를 적용한 1:1 채팅' 카테고리의 다른 글
Spring + RabbitMQ를 통한 1:1 채팅방 구현 - 인증 적용 (4) (0) | 2024.11.16 |
---|---|
Spring + RabbitMQ를 통한 1:1 채팅방 구현 - 채팅내역을 MongoDB로 (2) (0) | 2024.11.01 |
Spring + RabbitMQ를 통한 1:1 채팅방 구현 (1) (0) | 2024.10.31 |