Kafka Streams : Kafka Streams와 Spring 함께 사용하기

    들어가기 전

    Kafka Streams 자체는 데이터를 필터링 해주는 녀석이다. 만약 이 녀석을 웹서버와 함께 사용하고자 한다면, Spring 사용을 고려해 볼 수 있다. 이번 포스팅에서는 Spring과 Kafka Streams를 어떻게 함께 사용할 수 있는지에 대해 살펴보고자 한다. 

     

    Kafka Streams를 Spring에서 사용하는 방법

    • @EnableKafkaStreams를 사용하는 방법 
    • Kafka Streams를 스프링 빈으로 직접 등록

    위 두 가지 방법을 이용해서 Spring에서 KafkaStreams를 사용할 수 있다. Spring 웹 서버는 HTTP 요청이 들어오면 Request Scope의 빈을 생성한다. 즉, 요청마다 빈이 생성되는 것이기 때문에 KafkaStreams를 HTTP에서 사용하면 문제가 될 수 있다. 따라서 KafkaStreams를 싱글톤 빈으로 등록해서 Spring 웹 서버에서 KafkaStreams 싱글톤 빈을 주입받아서 사용해야한다. 아래에서는 각각 어떻게 사용하는지를 살펴보고자 한다. 

     

    @EnableKafkaStreams

    @EnableKafkaStreams 어노테이션을 이용해서 Kafka Streams를 스프링에서 사용하는 방법이다. 이 어노테이션을 사용하기 위해서는 아래 코드가 build.gradle에 추가되어야 한다. 

    implementation 'org.springframework.kafka:spring-kafka'

    @EnableKafkaStreams 코드가 활성화 되면 아래 동작이 이루어진다.

    • streamsBuilder 빈 등록
    • StreamsBuilderFactoryBean 빈 등록
    • StreamsBuilder 빈은 KafkaStreamsDefaultConfiguration 빈을 읽어서 자동으로 KafkaStreams를 생성해서 StreamsBuilderFactoryBean에 필드로 넣는다.

    streamsBuilder를 스프링 빈으로 주입 받아서 이 녀석에게 Topology를 구성해서 넘겨줄 수 있다. topology를 구성하면 구성된 결과는 streamsBuilder에 등록된다. 그리고 등록된 streamsBuilder는 이후 StreamsBuilderFactoryBean.start() 메서드에 의해서 kafkaStreams가 생성되고 자동으로 시작된다.

    @Override
    public synchronized void start() {
       if (!this.running) {
          try {
             Assert.state(this.properties != null,
                   "streams configuration properties must not be null");
             Topology topology = getObject().build(this.properties); // NOSONAR: getObject() cannot return null
             this.infrastructureCustomizer.configureTopology(topology);
             this.topology = topology;
             LOGGER.debug(() -> topology.describe().toString());
             this.kafkaStreams = new KafkaStreams(topology, this.properties, this.clientSupplier);
             this.kafkaStreams.setStateListener(this.stateListener);
             this.kafkaStreams.setGlobalStateRestoreListener(this.stateRestoreListener);
             
             ...
             
             this.kafkaStreams.start();
    		...
       }
    }

    만약 컨트롤러에서 KafkaStreams를 이용해서 대화식 쿼리등을 구현하려고 한다면 컨트롤러는 KafkaStreams 객체가 필요하다. 그렇지만 위에서 볼 수 있듯이 KafkaStreams 빈이 등록되지는 않는다. 따라서 이 때는 StreamsBuilderFactoryBean을 주입받아서 KafkaStreams를 가져가야한다. 혹은 아래와 같이 스프링 컨텍스트가 모두 시작된 후 이벤트 리스너를 이용해서 KafkaSTreams 객체를 지정해줄 수 있다.

    @Slf4j
    @RequiredArgsConstructor
    @Component
    public class KafkaInjectListener {
    
        private final ObjectProvider<MyController> myControllerObjectProvider;
        private final StreamsBuilderFactoryBean streamsBuilderFactoryBean;
    
        @EventListener
        public void OnApplication(ContextRefreshedEvent contextRefreshedEvent) {
            log.info("HERE");
            KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
            MyController myController = myControllerObjectProvider.getObject();
            myController.configKafkaStreams(kafkaStreams);
        }
    }

    전체적인 코드는 다음과 같이 구성할 수 있다. 

    Config 코드

    KafkaStreams Property 파일을 스프링 빈으로 등록해준다. 그러면 StreamBuilderFactory는 이 값을 읽어서 KafkaStreams를 생성할 때 사용할 것이다. 

    @EnableKafkaStreams
    @Configuration
    @RequiredArgsConstructor
    @Slf4j
    public class MyKafkaConfig {
    
        /*
        public static final String DEFAULT_STREAMS_CONFIG_BEAN_NAME = "defaultKafkaStreamsConfig";
        public static final String DEFAULT_STREAMS_BUILDER_BEAN_NAME = "defaultKafkaStreamsBuilder";
         */
    
        @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
        public KafkaStreamsConfiguration kafkaStreamsConfiguration() {
            HashMap<String, Object> props = new HashMap<>();
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "hello");
            return new KafkaStreamsConfiguration(props);
        }
    }

     

    Topology 코드

    StreamBuilder는 스프링 빈으로 자동 등록된다. 이 빈을 주입 받아서 Topology를 구성하면 된다. 어차피 스프링 빈에 구성된 Topology가 저장되기 때문에 따로 return 값을 지정할 필요는 없다. 

    @Component
    public class BuildPipeline {
    
        @Autowired
        public void buildPipeline(StreamsBuilder streamsBuilder, MyController myController) {
            streamsBuilder.stream("clients",
                            Consumed.with(Serdes.String(), Serdes.String()).withOffsetResetPolicy(EARLIEST))
                    .print(Printed.<String, String>toSysOut().withLabel("HELLO"));
        }
    }
    

    이벤트 리스너

    스프링 컨텍스트가 시작되면, 이벤트 리스너는 이 이벤트를 듣는다. 이 이벤트가 발생하면 이 메서드가 실행된다. 이 때 ObjectFactory를 가져와서 MyController 빈만 하나 받는다. 그리고 MyController 빈에 KafkaStreams를 셋팅해줄 수 있다. 

    @Slf4j
    @RequiredArgsConstructor
    @Component
    public class KafkaInjectListener {
    
        private final ObjectProvider<MyController> myControllerObjectProvider;
        private final StreamsBuilderFactoryBean streamsBuilderFactoryBean;
    
        @EventListener
        public void OnApplication(ContextRefreshedEvent contextRefreshedEvent) {
            log.info("HERE");
            KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
            MyController myController = myControllerObjectProvider.getObject();
            myController.configKafkaStreams(kafkaStreams);
        }
    }

    컨트롤러 코드

    컨트롤러는 다음과 같이 작성할 수 있다. 앞서 이야기한 것처럼 KafkaStreams 자체는 스프링 빈으로 생성이 되지 않는다. 따라서 생성자 주입 같은 것을 받을 수 없다. 그래서 이벤트 리스너를 이용해서 셋팅받는다. 

    @RestController
    @NoArgsConstructor
    public class MyController {
    
        private KafkaStreams kafkaStreams;
    
        public void configKafkaStreams(KafkaStreams kafkaStreams) {
            this.kafkaStreams = kafkaStreams;
        }
    
        @Autowired(required = false)
        public MyController(KafkaStreams kafkaStreams) {
            this.kafkaStreams = kafkaStreams;
        }
    
        @GetMapping("/hello")
        public String hello() {
            System.out.println("kafkaStreams = " + kafkaStreams);
            return kafkaStreams.toString();
        }
    }
    

    @EnableKafkaStreams의 장/단점

    전반적으로 어노테이션에 의해서 처리가 되기 때문에 구조가 깔끔하게 나누어질 수 있다는 게 장점인 것 같다. 또한 SmartLifeCycle을 구현했기 때문에 Spring의 Application Context가 생기거나 닫기 전에 stop / start가 좀 더 스마트하게 호출될 수 있다고 한다. 한 가지 단점으로 생각되는 것은 KafkaStreams에 필요한 stateListener 등을 셋팅하는 방법을 나는 찾을 수 없다. 이 부분이 해결되면 좀 더 깔끔하게 사용할 수 있을 것 같지만 이 부분을 사용하는 방법을 알 수 없기 때문에 나는 후자의 방법이 좋은 것 같다. 

     

     

    @Component를 이용한 등록

    KafkaStreams를 스프링 빈으로 직접 생성하고, Spring에서 KafkaStreams 빈을 가져다 쓸 수 있는 방법이다. 기본적으로 Spring을 잘 모르는 사람이고 KafkaStreams에 익숙한 사람이라면 이 방법이 가장 편리할 것 같다. 그리고 앞서 @EnableKafkaStreams를 이용할 때는 KafkaStreams가 Start 되는 시점이 블랙박스화 되어있기 때문에 stateListener를 등록할 수 있는 방법이 없었다. 이것을 고려한다면 처음에 찍먹을 해본다면 KafkaStreams를 스프링 빈으로 생성해서 등록해보는 것이 좋을 것 같다. 

    KafkaStreams 빈생성 코드

    아래와 같이 단순히 빈을 생성해서 스프링 컨텍스트에 등록할 수 있다. KafkaStreams 빈을 생성할 때, 이곳에서 stateListener()등을 등록해줄 수 있다. 또한 KafkaStreams 빈을 생성하고 돌려주기 전에 KafkaStreams를 시작해 줄 수도 있다. 필요하다면 이벤트 리스너 등을 이용해서 나중에 시작할 수도 있다. 

    @Component
    public class MyKafkaStreams {
    
        @Bean
        public KafkaStreams kafkaStreams() {
            Properties properties = new Properties();
            properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "HELLO");
            properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    
            StreamsBuilder streamsBuilder = new StreamsBuilder();
            streamsBuilder.stream("clients",
                    Consumed.with(Serdes.String(), Serdes.String()).withOffsetResetPolicy(EARLIEST))
                    .print(Printed.<String, String>toSysOut().withLabel("HELLO"));
    
            KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);
            kafkaStreams.start();
            return kafkaStreams;
        }
    }

    Controller 코드

    컨트롤러는 생성자 주입으로 kafkaStreams 빈을 주입 받아서 사용할 수 있다. 

    @RestController
    @RequiredArgsConstructor
    public class MyController {
    
        private final KafkaStreams kafkaStreams;
    
        /*
        public void configKafkaStreams(KafkaStreams kafkaStreams) {
            this.kafkaStreams = kafkaStreams;
        }
         
        @Autowired(required = false)
        public MyController(KafkaStreams kafkaStreams) {
            this.kafkaStreams = kafkaStreams;
        }
        
         */
    
        @GetMapping("/hello")
        public String hello() {
            System.out.println("kafkaStreams = " + kafkaStreams);
            return kafkaStreams.toString();
        }
    }

     

    장단점 정리

    @EnableKafkaStreams는 어노테이션을 기반으로 동작하고, 내부적으로 SmartLifeCycle을 구현한 StreamsBuilderFactory를 이용해서 실행이 결정된다. 아직 내가 모르는 것일 수도 있지만 StateListener등을 등록하기가 어렵다. 다만 어노테이션을 기반으로 동작하기 때문에 클래스를 분리해서 좀 더 가독성 좋은 코드를 생성할 수도 있다.

    KafkaStreams 빈을 직접 생성해서 등록하는 것은 스프링에 익숙하지 않은 사람들이 바로 접근해보기에 좋은 방법인 것 같다. 처음에는 Kafka Streams 싱글톤 빈을 직접 생성하는 방법으로 접근해보고, 이후에는 필요하다면 SmartLifeCycle등을 직접 공부해서 처리해보는 것도 좋은 방법이 될 것 같다. 

     

    참고

    https://www.baeldung.com/spring-boot-kafka-streams

     

    전체 코드

    https://github.com/chickenchickenlove/springkafkastreams

    댓글

    Designed by JB FACTORY