StatCounter - Free Web Tracker and Counter

Kafka Streams : Kafka Streams와 Spring 함께 사용하기

반응형

들어가기 전

Kafka Streams 자체는 데이터를 필터링 해주는 녀석이다. 만약 이 녀석을 웹서버와 함께 사용하고자 한다면, Spring 사용을 고려해 볼 수 있다. 이번 포스팅에서는 Spring과 Kafka Streams를 어떻게 함께 사용할 수 있는지에 대해 살펴보고자 한다. 

 

Kafka Streams를 Spring에서 사용하는 방법

  • @EnableKafkaStreams를 사용하는 방법 
  • Kafka Streams를 스프링 빈으로 직접 등록

위 두 가지 방법을 이용해서 Spring에서 KafkaStreams를 사용할 수 있다. Spring 웹 서버는 HTTP 요청이 들어오면 Request Scope의 빈을 생성한다. 즉, 요청마다 빈이 생성되는 것이기 때문에 KafkaStreams를 HTTP에서 사용하면 문제가 될 수 있다. 따라서 KafkaStreams를 싱글톤 빈으로 등록해서 Spring 웹 서버에서 KafkaStreams 싱글톤 빈을 주입받아서 사용해야한다. 아래에서는 각각 어떻게 사용하는지를 살펴보고자 한다. 

 

@EnableKafkaStreams

@EnableKafkaStreams 어노테이션을 이용해서 Kafka Streams를 스프링에서 사용하는 방법이다. 이 어노테이션을 사용하기 위해서는 아래 코드가 build.gradle에 추가되어야 한다. 

implementation 'org.springframework.kafka:spring-kafka'

@EnableKafkaStreams 코드가 활성화 되면 아래 동작이 이루어진다.

  • streamsBuilder 빈 등록
  • StreamsBuilderFactoryBean 빈 등록
  • StreamsBuilder 빈은 KafkaStreamsDefaultConfiguration 빈을 읽어서 자동으로 KafkaStreams를 생성해서 StreamsBuilderFactoryBean에 필드로 넣는다.

streamsBuilder를 스프링 빈으로 주입 받아서 이 녀석에게 Topology를 구성해서 넘겨줄 수 있다. topology를 구성하면 구성된 결과는 streamsBuilder에 등록된다. 그리고 등록된 streamsBuilder는 이후 StreamsBuilderFactoryBean.start() 메서드에 의해서 kafkaStreams가 생성되고 자동으로 시작된다.

@Override
public synchronized void start() {
   if (!this.running) {
      try {
         Assert.state(this.properties != null,
               "streams configuration properties must not be null");
         Topology topology = getObject().build(this.properties); // NOSONAR: getObject() cannot return null
         this.infrastructureCustomizer.configureTopology(topology);
         this.topology = topology;
         LOGGER.debug(() -> topology.describe().toString());
         this.kafkaStreams = new KafkaStreams(topology, this.properties, this.clientSupplier);
         this.kafkaStreams.setStateListener(this.stateListener);
         this.kafkaStreams.setGlobalStateRestoreListener(this.stateRestoreListener);
         
         ...
         
         this.kafkaStreams.start();
		...
   }
}

만약 컨트롤러에서 KafkaStreams를 이용해서 대화식 쿼리등을 구현하려고 한다면 컨트롤러는 KafkaStreams 객체가 필요하다. 그렇지만 위에서 볼 수 있듯이 KafkaStreams 빈이 등록되지는 않는다. 따라서 이 때는 StreamsBuilderFactoryBean을 주입받아서 KafkaStreams를 가져가야한다. 혹은 아래와 같이 스프링 컨텍스트가 모두 시작된 후 이벤트 리스너를 이용해서 KafkaSTreams 객체를 지정해줄 수 있다.

@Slf4j
@RequiredArgsConstructor
@Component
public class KafkaInjectListener {

    private final ObjectProvider<MyController> myControllerObjectProvider;
    private final StreamsBuilderFactoryBean streamsBuilderFactoryBean;

    @EventListener
    public void OnApplication(ContextRefreshedEvent contextRefreshedEvent) {
        log.info("HERE");
        KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
        MyController myController = myControllerObjectProvider.getObject();
        myController.configKafkaStreams(kafkaStreams);
    }
}

전체적인 코드는 다음과 같이 구성할 수 있다. 

Config 코드

KafkaStreams Property 파일을 스프링 빈으로 등록해준다. 그러면 StreamBuilderFactory는 이 값을 읽어서 KafkaStreams를 생성할 때 사용할 것이다. 

@EnableKafkaStreams
@Configuration
@RequiredArgsConstructor
@Slf4j
public class MyKafkaConfig {

    /*
    public static final String DEFAULT_STREAMS_CONFIG_BEAN_NAME = "defaultKafkaStreamsConfig";
    public static final String DEFAULT_STREAMS_BUILDER_BEAN_NAME = "defaultKafkaStreamsBuilder";
     */

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kafkaStreamsConfiguration() {
        HashMap<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "hello");
        return new KafkaStreamsConfiguration(props);
    }
}

 

Topology 코드

StreamBuilder는 스프링 빈으로 자동 등록된다. 이 빈을 주입 받아서 Topology를 구성하면 된다. 어차피 스프링 빈에 구성된 Topology가 저장되기 때문에 따로 return 값을 지정할 필요는 없다. 

@Component
public class BuildPipeline {

    @Autowired
    public void buildPipeline(StreamsBuilder streamsBuilder, MyController myController) {
        streamsBuilder.stream("clients",
                        Consumed.with(Serdes.String(), Serdes.String()).withOffsetResetPolicy(EARLIEST))
                .print(Printed.<String, String>toSysOut().withLabel("HELLO"));
    }
}

이벤트 리스너

스프링 컨텍스트가 시작되면, 이벤트 리스너는 이 이벤트를 듣는다. 이 이벤트가 발생하면 이 메서드가 실행된다. 이 때 ObjectFactory를 가져와서 MyController 빈만 하나 받는다. 그리고 MyController 빈에 KafkaStreams를 셋팅해줄 수 있다. 

@Slf4j
@RequiredArgsConstructor
@Component
public class KafkaInjectListener {

    private final ObjectProvider<MyController> myControllerObjectProvider;
    private final StreamsBuilderFactoryBean streamsBuilderFactoryBean;

    @EventListener
    public void OnApplication(ContextRefreshedEvent contextRefreshedEvent) {
        log.info("HERE");
        KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
        MyController myController = myControllerObjectProvider.getObject();
        myController.configKafkaStreams(kafkaStreams);
    }
}

컨트롤러 코드

컨트롤러는 다음과 같이 작성할 수 있다. 앞서 이야기한 것처럼 KafkaStreams 자체는 스프링 빈으로 생성이 되지 않는다. 따라서 생성자 주입 같은 것을 받을 수 없다. 그래서 이벤트 리스너를 이용해서 셋팅받는다. 

@RestController
@NoArgsConstructor
public class MyController {

    private KafkaStreams kafkaStreams;

    public void configKafkaStreams(KafkaStreams kafkaStreams) {
        this.kafkaStreams = kafkaStreams;
    }

    @Autowired(required = false)
    public MyController(KafkaStreams kafkaStreams) {
        this.kafkaStreams = kafkaStreams;
    }

    @GetMapping("/hello")
    public String hello() {
        System.out.println("kafkaStreams = " + kafkaStreams);
        return kafkaStreams.toString();
    }
}

@EnableKafkaStreams의 장/단점

전반적으로 어노테이션에 의해서 처리가 되기 때문에 구조가 깔끔하게 나누어질 수 있다는 게 장점인 것 같다. 또한 SmartLifeCycle을 구현했기 때문에 Spring의 Application Context가 생기거나 닫기 전에 stop / start가 좀 더 스마트하게 호출될 수 있다고 한다. 한 가지 단점으로 생각되는 것은 KafkaStreams에 필요한 stateListener 등을 셋팅하는 방법을 나는 찾을 수 없다. 이 부분이 해결되면 좀 더 깔끔하게 사용할 수 있을 것 같지만 이 부분을 사용하는 방법을 알 수 없기 때문에 나는 후자의 방법이 좋은 것 같다. 

 

 

@Component를 이용한 등록

KafkaStreams를 스프링 빈으로 직접 생성하고, Spring에서 KafkaStreams 빈을 가져다 쓸 수 있는 방법이다. 기본적으로 Spring을 잘 모르는 사람이고 KafkaStreams에 익숙한 사람이라면 이 방법이 가장 편리할 것 같다. 그리고 앞서 @EnableKafkaStreams를 이용할 때는 KafkaStreams가 Start 되는 시점이 블랙박스화 되어있기 때문에 stateListener를 등록할 수 있는 방법이 없었다. 이것을 고려한다면 처음에 찍먹을 해본다면 KafkaStreams를 스프링 빈으로 생성해서 등록해보는 것이 좋을 것 같다. 

KafkaStreams 빈생성 코드

아래와 같이 단순히 빈을 생성해서 스프링 컨텍스트에 등록할 수 있다. KafkaStreams 빈을 생성할 때, 이곳에서 stateListener()등을 등록해줄 수 있다. 또한 KafkaStreams 빈을 생성하고 돌려주기 전에 KafkaStreams를 시작해 줄 수도 있다. 필요하다면 이벤트 리스너 등을 이용해서 나중에 시작할 수도 있다. 

@Component
public class MyKafkaStreams {

    @Bean
    public KafkaStreams kafkaStreams() {
        Properties properties = new Properties();
        properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "HELLO");
        properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("clients",
                Consumed.with(Serdes.String(), Serdes.String()).withOffsetResetPolicy(EARLIEST))
                .print(Printed.<String, String>toSysOut().withLabel("HELLO"));

        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);
        kafkaStreams.start();
        return kafkaStreams;
    }
}

Controller 코드

컨트롤러는 생성자 주입으로 kafkaStreams 빈을 주입 받아서 사용할 수 있다. 

@RestController
@RequiredArgsConstructor
public class MyController {

    private final KafkaStreams kafkaStreams;

    /*
    public void configKafkaStreams(KafkaStreams kafkaStreams) {
        this.kafkaStreams = kafkaStreams;
    }
     
    @Autowired(required = false)
    public MyController(KafkaStreams kafkaStreams) {
        this.kafkaStreams = kafkaStreams;
    }
    
     */

    @GetMapping("/hello")
    public String hello() {
        System.out.println("kafkaStreams = " + kafkaStreams);
        return kafkaStreams.toString();
    }
}

 

장단점 정리

@EnableKafkaStreams는 어노테이션을 기반으로 동작하고, 내부적으로 SmartLifeCycle을 구현한 StreamsBuilderFactory를 이용해서 실행이 결정된다. 아직 내가 모르는 것일 수도 있지만 StateListener등을 등록하기가 어렵다. 다만 어노테이션을 기반으로 동작하기 때문에 클래스를 분리해서 좀 더 가독성 좋은 코드를 생성할 수도 있다.

KafkaStreams 빈을 직접 생성해서 등록하는 것은 스프링에 익숙하지 않은 사람들이 바로 접근해보기에 좋은 방법인 것 같다. 처음에는 Kafka Streams 싱글톤 빈을 직접 생성하는 방법으로 접근해보고, 이후에는 필요하다면 SmartLifeCycle등을 직접 공부해서 처리해보는 것도 좋은 방법이 될 것 같다. 

 

참고

https://www.baeldung.com/spring-boot-kafka-streams

 

전체 코드

https://github.com/chickenchickenlove/springkafkastreams

댓글

Designed by JB FACTORY