Spring boot

Redis PUB/SUB, WebSocket 실시간 채팅 개발기 2 : 채팅 기능 구현

개발중인 감자 2024. 2. 15. 01:13

🔗 Redis PUB/SUB, WebSocket 실시간 채팅 개발기 1 : 설계하기

🔗 Redis PUB/SUB, WebSocket 실시간 채팅 개발기 3 : 채팅방 리스트는 어떻게 최신화할까?

🔗 Redis PUB/SUB, WebSocket 실시간 채팅 개발기 4 : 채팅방 리스트 최신화 성능 개선기

 

 

안녕하세요!

전 글에서는 [캐치룸]의 채팅 기능이 어떠한 기술을 써서 설계했는지 작성하였습니다. 

이번 글에서는 실제 코드와 함께 작성하겠습니다. 

 

💻 준비물

Redis - 저는 로컬에서는 Docker에 깔아서 사용했고, EC2에서는 엘라스틱캐시에 연결하여 사용했습니다. 

Spring boot

특별한 건 없습니다. 

 

 

✏️ 의존성 주입 

//websocket
implementation 'org.springframework.boot:spring-boot-starter-websocket'

//redis
implementation 'org.springframework.boot:spring-boot-starter-data-redis'

//mongo
implementation 'org.springframework.boot:spring-boot-starter-data-mongodb'

 

 

✏️ Config 

1. WebSocketConfig

@Configuration
@EnableWebSocketMessageBroker
@RequiredArgsConstructor
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    private final StompHandler stompHandler;

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker("/sub");
        config.setApplicationDestinationPrefixes("/pub");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws-stomp")
                .setAllowedOriginPatterns("*")
                .withSockJS();
    }

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        /* stompHandler가 websocket 앞단에서 token 을 체크할 수 있도록 인터셉터 설정 */
        registration.interceptors(stompHandler);
    }
}

 

 

2. RedisConfig

@RequiredArgsConstructor
@Configuration
public class RedisConfig {

    // yml 파일 redis 설정 불러오기
    private final RedisProperties redisProperties;
    /**
     * 단일 Topic 사용을 위한 Bean 설정
     */
    @Bean
    public ChannelTopic channelTopic() {
        return new ChannelTopic("chatroom");
    }

    @Bean
    public RedisConnectionFactory redisConnectionFactory() {
        LettuceConnectionFactory lettuceConnectionFactory = new LettuceConnectionFactory();
        lettuceConnectionFactory.setHostName(redisProperties.getHost());
        lettuceConnectionFactory.setPort(redisProperties.getPort());
        return lettuceConnectionFactory;
    }

    /**
     * redis 에 발행(publish)된 메시지 처리를 위한 리스너 설정
     */
    @Bean
    public RedisMessageListenerContainer redisMessageListener (
            MessageListenerAdapter listenerAdapterChatMessage,
            ChannelTopic channelTopic
    ) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory());
        container.addMessageListener(listenerAdapterChatMessage, channelTopic);
        return container;
    }

    /** 실제 메시지를 처리하는 subscriber 설정 추가*/
    @Bean
    public MessageListenerAdapter listenerAdapterChatMessage(RedisSubscriber subscriber) {
        return new MessageListenerAdapter(subscriber, "sendMessage");
    }
}

 

3. MongoConfig

@Configuration
@RequiredArgsConstructor
@EnableMongoRepositories(basePackages = "com.catchroom.chat.message.repository")
public class MongoConfig {
    private final MongoProperties mongoProperties;

    @Bean
    public MongoClient mongoClient() {
        return MongoClients.create(mongoProperties.getUri());
    }

    @Bean
    public MongoTemplate mongoTemplate() {
        return new MongoTemplate(mongoClient(), mongoProperties.getDatabase());
    }
}

 

 

✏️ StompHandler

원래는 인증/인가 부분을 StompHandler의 header에서 꺼내서 해야 하지만, 저희는 메인 서버에서 인증 인가를 처리하도록 로직을 처리하였기 때문에 클래스만 만들어놨습니다. 

추후 리팩토링 과정에서 인원 수 체킹 하는 코드를 넣을 예정입니다.

@RequiredArgsConstructor
@Component
public class StompHandler implements ChannelInterceptor {

    /** websocket을 통해 들어온 요청이 처리 되기전 실행된다.*/
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
        return message;
    }
}

 

 

✏️ RedisPublisher

아래 publish 메소드를 호출하면, 해당 Topic을 구독하는 모든 구독자에게 message가 발행(pub)됩니다. 

@RequiredArgsConstructor
@Service
public class RedisPublisher {
    private final ChannelTopic channelTopic;
    private final RedisTemplate redisTemplate;

    public void publish(MessageSubDto message) {
        redisTemplate.convertAndSend(channelTopic.getTopic(), message);
    }
}

 

 

✏️ RedisSubscriber

위에서 publish된 메시지는 RedisConfig 클래스에서 설정된 것에 의하여 밑에 sendMessage(publishMessage)가 호출됩니다.

@RequiredArgsConstructor
@Service
public class RedisSubscriber {

    private final ObjectMapper objectMapper;
    private final SimpMessageSendingOperations messagingTemplate;

    /**
     * Redis에서 메시지가 발행(publish)되면
     * 대기하고 있던 Redis Subscriber가 해당 메시지를 받아 처리한다.
     */
    public void sendMessage(String publishMessage) {
        try {

            ChatMessageDto chatMessage =
                    objectMapper.readValue(publishMessage, MessageSubDto.class).getChatMessageDto();

            log.info("Redis Subcriber chatMSG : {}", chatMessage.getMessage());

            // 채팅방을 구독한 클라이언트에게 메시지 발송
            messagingTemplate.convertAndSend(
                    "/sub/chat/room/" + chatMessage.getRoomId(), chatMessage
            );

        } catch (Exception e) {
            log.error("Exception {}", e);
        }
    }
}

 

 

✏️ ChatController

@MessageMapping("chat/message")은 스프링 프레임워크에서 WebSocket 또는 STOMP 기반의 메시징을 처리하기 위한 주석(annotation)입니다. @MessageMapping을 통해 WebSocket으로 들어오는 메시지 발행을 처리합니다. 

 

프론트엔드는 prefix를 붙여서 /pub/chat/message로 발행 요청하면 해당 컨트롤러가 반응합니다. 

 

메시지가 발행되면, MongoDB에 save를 해준 다음에, /sub/chat/room/{chatRoomId} 메시지 send 하고,

이는 클라이언트에서 해당 주소(/sub/chat/room/{chatRoomId}) 구독(sub) 하고 있다가, 발행된 메시지를 받게됩니다.

여기서 (/sub/chat/room/{chatRoomId}) 채팅룸을 구분하는 . pub/sub Topic 역할이 됩니다.

@RequiredArgsConstructor
@Controller
public class ChatController {

    private final ChatMongoService chatMongoService;
    private final ChatService chatService;

    /**
     * websocket "/pub/chat/message"로 들어오는 메시징을 처리한다.
     */
    @MessageMapping("/chat/message")
    public void message(ChatMessageDto message,
                        @Header("Authorization") String accessToken
    ) {
        ChatMessageDto chatMessageDto = chatMongoService.save(message);
        chatService.sendChatMessage(chatMessageDto, accessToken); //RedisPublisher 호출 
    }

 

 

✏️ ChatMessageDto

@Builder
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class ChatMessageDto {

    private MessageType type;   // 메시지 타입 (ENTER, TALK, QUIT, NEGO_REQ, NEGO_ALLOW, NEGO_DENIED, DELETE)
    private String roomId;      // 방 번호
    private Long userId;        // 사용자 id
    private String message;     // 메시지
    private String time;        // 전송 시간
    private long userCount;     // 채팅방 인원 수
    private int negoPrice;      // 네고 가격
}

 

 

🌟 채팅 순서 

채팅을 하기 위해서는 웹 소켓 연결이 먼저 필요합니다. 

 

1) https://백엔드서버주소/ws-stomp : 웹소켓 연결

2) wss://백엔드서버주소/pub/chat/message : 채팅 메시지 서버로 전송

3) wss://백엔드서버주소/sub/chat/room/{roomId} : 채팅방 메세지 받는 장소 

 

 

다음 글에서는 채팅방 리스트를 어떻게 최신화할지에 대해서 작성하겠습니다.