본문 바로가기

Backend

Stomp Message broker - Kafka로 변환하기 (1) Configuration

기존에 Stomp에서 Message broker를 Spring에서 제공하는 SimpleBroker를 사용하고 있었다.

 

변경 전

WebSocketConfig.java

package com.domain.community.configuration;

import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@Configuration
@RequiredArgsConstructor
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setApplicationDestinationPrefixes("/send");
        registry.enableSimpleBroker("/chat-room");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws")
                .setAllowedOriginPatterns("*")
//                .setAllowedOrigins("*")
                .withSockJS();
    }
}

 

ChatRestController.java

@MessageMapping("/{roomId}")
@SendTo("/chat-room/{roomId}")
public ResponseEntity<ChatMessageResponseDto> chat(@DestinationVariable Long roomId, ChatMessageRequestDto chatMessageRequestDto) {
    Chat chat = chatService.saveChatMessage(chatMessageRequestDto, roomId);
    return ResponseEntity.ok(ChatMessageResponseDto.fromEntity( chat ));
}

 

 

하지만 구현하고 있는 서비스는 중고거래가 목적이므로, 여러명의 사용자가 채팅방을 이용할 수 있고 많은 채팅 데이터들이 옮겨갈 수 있다. 추가적으로 SimpleBroker는 인메모리에 메세지를 저장하고 있으므로, 서버가 다운될 경우 처리되지 못한 메세지 요청들이 모두 유실된다. 그래서 Kafka를 도입해 백엔드 서버와 별개로 띄운 후, 메세지 유실을 방지하고 대용량 데이터 처리에도 강점이 있게 구현하려고 한다.

 

 

변경 후

WebSocketConfig.java

package com.domain.community.configuration;

import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@Configuration
@RequiredArgsConstructor
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws")
                .setAllowedOriginPatterns("*")
                .withSockJS();
    }
}

Cors 설정은 추후에 변경하기 위해 일단 열어두었다.

 

ChatRestController.java

    @MessageMapping("/{roomId}")
    public ResponseEntity<Void> chat(@DestinationVariable Long roomId, ChatMessageRequestDto chatMessageRequestDto) {
        kafkaProducerService.sendMessageToKafkaTopic(roomId, chatMessageRequestDto);
        return ResponseEntity.ok().build();
    }