Redis PUB/SUB, WebSocket 실시간 채팅 개발기 2 : 채팅 기능 구현
🔗 Redis PUB/SUB, WebSocket 실시간 채팅 개발기 1 : 설계하기
🔗 Redis PUB/SUB, WebSocket 실시간 채팅 개발기 3 : 채팅방 리스트는 어떻게 최신화할까?
🔗 Redis PUB/SUB, WebSocket 실시간 채팅 개발기 4 : 채팅방 리스트 최신화 성능 개선기
안녕하세요!
전 글에서는 [캐치룸]의 채팅 기능이 어떠한 기술을 써서 설계했는지 작성하였습니다.
이번 글에서는 실제 코드와 함께 작성하겠습니다.
💻 준비물
Redis - 저는 로컬에서는 Docker에 깔아서 사용했고, EC2에서는 엘라스틱캐시에 연결하여 사용했습니다.
Spring boot
특별한 건 없습니다.
✏️ 의존성 주입
//websocket
implementation 'org.springframework.boot:spring-boot-starter-websocket'
//redis
implementation 'org.springframework.boot:spring-boot-starter-data-redis'
//mongo
implementation 'org.springframework.boot:spring-boot-starter-data-mongodb'
✏️ Config
1. WebSocketConfig
@Configuration
@EnableWebSocketMessageBroker
@RequiredArgsConstructor
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
private final StompHandler stompHandler;
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/sub");
config.setApplicationDestinationPrefixes("/pub");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws-stomp")
.setAllowedOriginPatterns("*")
.withSockJS();
}
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
/* stompHandler가 websocket 앞단에서 token 을 체크할 수 있도록 인터셉터 설정 */
registration.interceptors(stompHandler);
}
}
2. RedisConfig
@RequiredArgsConstructor
@Configuration
public class RedisConfig {
// yml 파일 redis 설정 불러오기
private final RedisProperties redisProperties;
/**
* 단일 Topic 사용을 위한 Bean 설정
*/
@Bean
public ChannelTopic channelTopic() {
return new ChannelTopic("chatroom");
}
@Bean
public RedisConnectionFactory redisConnectionFactory() {
LettuceConnectionFactory lettuceConnectionFactory = new LettuceConnectionFactory();
lettuceConnectionFactory.setHostName(redisProperties.getHost());
lettuceConnectionFactory.setPort(redisProperties.getPort());
return lettuceConnectionFactory;
}
/**
* redis 에 발행(publish)된 메시지 처리를 위한 리스너 설정
*/
@Bean
public RedisMessageListenerContainer redisMessageListener (
MessageListenerAdapter listenerAdapterChatMessage,
ChannelTopic channelTopic
) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory());
container.addMessageListener(listenerAdapterChatMessage, channelTopic);
return container;
}
/** 실제 메시지를 처리하는 subscriber 설정 추가*/
@Bean
public MessageListenerAdapter listenerAdapterChatMessage(RedisSubscriber subscriber) {
return new MessageListenerAdapter(subscriber, "sendMessage");
}
}
3. MongoConfig
@Configuration
@RequiredArgsConstructor
@EnableMongoRepositories(basePackages = "com.catchroom.chat.message.repository")
public class MongoConfig {
private final MongoProperties mongoProperties;
@Bean
public MongoClient mongoClient() {
return MongoClients.create(mongoProperties.getUri());
}
@Bean
public MongoTemplate mongoTemplate() {
return new MongoTemplate(mongoClient(), mongoProperties.getDatabase());
}
}
✏️ StompHandler
원래는 인증/인가 부분을 StompHandler의 header에서 꺼내서 해야 하지만, 저희는 메인 서버에서 인증 인가를 처리하도록 로직을 처리하였기 때문에 클래스만 만들어놨습니다.
추후 리팩토링 과정에서 인원 수 체킹 하는 코드를 넣을 예정입니다.
@RequiredArgsConstructor
@Component
public class StompHandler implements ChannelInterceptor {
/** websocket을 통해 들어온 요청이 처리 되기전 실행된다.*/
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
return message;
}
}
✏️ RedisPublisher
아래 publish 메소드를 호출하면, 해당 Topic을 구독하는 모든 구독자에게 message가 발행(pub)됩니다.
@RequiredArgsConstructor
@Service
public class RedisPublisher {
private final ChannelTopic channelTopic;
private final RedisTemplate redisTemplate;
public void publish(MessageSubDto message) {
redisTemplate.convertAndSend(channelTopic.getTopic(), message);
}
}
✏️ RedisSubscriber
위에서 publish된 메시지는 RedisConfig 클래스에서 설정된 것에 의하여 밑에 sendMessage(publishMessage)가 호출됩니다.
@RequiredArgsConstructor
@Service
public class RedisSubscriber {
private final ObjectMapper objectMapper;
private final SimpMessageSendingOperations messagingTemplate;
/**
* Redis에서 메시지가 발행(publish)되면
* 대기하고 있던 Redis Subscriber가 해당 메시지를 받아 처리한다.
*/
public void sendMessage(String publishMessage) {
try {
ChatMessageDto chatMessage =
objectMapper.readValue(publishMessage, MessageSubDto.class).getChatMessageDto();
log.info("Redis Subcriber chatMSG : {}", chatMessage.getMessage());
// 채팅방을 구독한 클라이언트에게 메시지 발송
messagingTemplate.convertAndSend(
"/sub/chat/room/" + chatMessage.getRoomId(), chatMessage
);
} catch (Exception e) {
log.error("Exception {}", e);
}
}
}
✏️ ChatController
@MessageMapping("chat/message")은 스프링 프레임워크에서 WebSocket 또는 STOMP 기반의 메시징을 처리하기 위한 주석(annotation)입니다. @MessageMapping을 통해 WebSocket으로 들어오는 메시지 발행을 처리합니다.
프론트엔드는 prefix를 붙여서 /pub/chat/message로 발행 요청하면 해당 컨트롤러가 반응합니다.
메시지가 발행되면, MongoDB에 save를 해준 다음에, /sub/chat/room/{chatRoomId} 로 메시지 send 하고,
이는 클라이언트에서 해당 주소(/sub/chat/room/{chatRoomId})를 구독(sub) 하고 있다가, 발행된 메시지를 받게됩니다.
여기서 (/sub/chat/room/{chatRoomId}) 는 채팅룸을 구분하는 값. 즉 pub/sub의 Topic 역할이 됩니다.
@RequiredArgsConstructor
@Controller
public class ChatController {
private final ChatMongoService chatMongoService;
private final ChatService chatService;
/**
* websocket "/pub/chat/message"로 들어오는 메시징을 처리한다.
*/
@MessageMapping("/chat/message")
public void message(ChatMessageDto message,
@Header("Authorization") String accessToken
) {
ChatMessageDto chatMessageDto = chatMongoService.save(message);
chatService.sendChatMessage(chatMessageDto, accessToken); //RedisPublisher 호출
}
✏️ ChatMessageDto
@Builder
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class ChatMessageDto {
private MessageType type; // 메시지 타입 (ENTER, TALK, QUIT, NEGO_REQ, NEGO_ALLOW, NEGO_DENIED, DELETE)
private String roomId; // 방 번호
private Long userId; // 사용자 id
private String message; // 메시지
private String time; // 전송 시간
private long userCount; // 채팅방 인원 수
private int negoPrice; // 네고 가격
}
🌟 채팅 순서
채팅을 하기 위해서는 웹 소켓 연결이 먼저 필요합니다.
1) https://백엔드서버주소/ws-stomp : 웹소켓 연결
2) wss://백엔드서버주소/pub/chat/message : 채팅 메시지 서버로 전송
3) wss://백엔드서버주소/sub/chat/room/{roomId} : 채팅방 메세지 받는 장소
다음 글에서는 채팅방 리스트를 어떻게 최신화할지에 대해서 작성하겠습니다.