본문 바로가기

Backend

Stomp Message broker - Kafka로 변환하기 (2) Topic, Listener 동적 생성

채팅방 별로 메세지를 관리하기 위해, Kafka에서도 Topic을 기준으로 채팅방별로 Listener를 따로 두어 관리하려고 한다.

그렇게 하기 위해서는 채팅 내용을 DB에 저장한 이후에, 해당 채팅방의 PK를 기준으로 Topic을 만들고 해당 Topic을 Listen해줄 Kafka Listener가 필요하게 된다. 이것을 동적으로 생성하기 위해, KafkaUtils 클래스를 작성했다.

 

package com.domain.community.utils;

import com.domain.community.service.KafkaConsumerService;
import java.util.Collections;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpoint;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class KafkaUtils {
    private final KafkaAdmin kafkaAdmin;
    private final KafkaConsumerService kafkaConsumerService;
    private final KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
    private final KafkaListenerContainerFactory<?> kafkaListenerContainerFactory;
    
    public void createTopic(String topicName, int numPartitions, short replicationFactor) {
        NewTopic newChatTopic = new NewTopic(topicName, numPartitions, replicationFactor);
        AdminClient adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties());
        adminClient.createTopics(Collections.singleton(newChatTopic));
        adminClient.close();
    }

    public void createAndRegisterListener(String topicName, String groupId) {
        // KafkaListener 인스턴스 생성
        MethodKafkaListenerEndpoint<String, String> endpoint = new MethodKafkaListenerEndpoint<>(); 
        endpoint.setId(groupId + "-" + topicName);
        endpoint.setGroupId(groupId);
        endpoint.setTopics(topicName); // 리스너가 구독할 토픽 설정
        endpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());
        endpoint.setBean(kafkaConsumerService); // 메시지를 처리할 빈 설정
        try {
            endpoint.setMethod(kafkaConsumerService.getClass().getMethod("consume", String.class));
            /*
            	처리 메소드 설정. KafkaConsumerService에 consume 메소드가 메세지를 처리함.
            */
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException("Method not found", e);
        }
        kafkaListenerEndpointRegistry.registerListenerContainer(endpoint, kafkaListenerContainerFactory, true); // 생성한 리스너를 동적으로 추가해줌.
    }
}

 

제일 어려운 부분이 동적 생성이었다. 웹에 자료도 찾기 힘들었고, 버전별로 다르게 동작하다 보니 올바르게 작동하게 만드는 것이 조금 어려웠음. 그리고 Intellij를 사용하게 되면 KafkaListenerEndpointRegistry 타입 Bean을 찾을 수 없다고 하는데, Bean 주입이 정상적으로 잘 되어 잘 작동한다. 

 

Producer 같은 경우에는 일반적인 topicName과 ChatMessageRequestDto로 쉽게 만들 수 있지만, 위에서 볼 수 있듯이 kafkaConsumerService에서 consume 메소드를 가져와 리스너를 동적으로 추가하는 부분이 꽤 복잡했다.

 

KafkaConsumerService.java

package com.domain.community.service;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.domain.community.dto.ChatMessageRequestDto;
import com.domain.community.entity.Chat;
import com.domain.community.repository.ChatRepository;
import com.domain.community.repository.ChatRoomRepository;
import com.domain.member.repository.MemberRepository;
import lombok.RequiredArgsConstructor;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class KafkaConsumerService {
    private final ObjectMapper objectMapper;
    private final MemberRepository memberRepository;
    private final ChatRoomRepository chatRoomRepository;
    private final ChatRepository chatRepository;

    public void consume(String message) {
        try {
            ChatMessageRequestDto chatMessage = objectMapper.readValue(message, ChatMessageRequestDto.class);
            Chat chat = Chat.builder()
                    .member(memberRepository.findById(chatMessage.getMemberId())
                            .orElseThrow(() -> new IllegalArgumentException("해당 사용자를 찾을 수 없습니다.")))
                    .chatRoom(chatRoomRepository.findById(chatMessage.getRoomId())
                            .orElseThrow(() -> new IllegalArgumentException("해당 채팅방을 찾을 수 없습니다.")))
                    .content(chatMessage.getContent())
                    .type(chatMessage.getType())
                    .fileId(chatMessage.getFileId())
                    .build();
            chatRepository.save(chat);
        } catch (Exception e){
            e.printStackTrace();
            throw new RuntimeException("Object Mapping에 실패했습니다.");
        }
    }
}

 

Listener를 사용하기위해 endpoint에 Method를 추가해 사용했다. @KafkaListener 어노테이션은 붙이면 안됨.

 

이렇게 리스너만 동적으로 생성해준다면, 추후에 서비스에서는 호출해서 사용하기만 하면 된다.

 

더 편하게 사용하려면 사실 kafka 설정에서 토픽이 존재하지 않을 경우 생성하는 옵션을 킬 수 있게 되는데, 이렇게 구현할 경우 악의적인 사용자가 무작위 토픽으로 생성 Request를 보낼 경우 계속해서 생성하게 될 수 있기 때문에 동적으로 토픽 생성 및 리스너 생성이 더 좋아 보인다.