들어가기 전
Kafka Streams 자체는 데이터를 필터링 해주는 녀석이다. 만약 이 녀석을 웹서버와 함께 사용하고자 한다면, Spring 사용을 고려해 볼 수 있다. 이번 포스팅에서는 Spring과 Kafka Streams를 어떻게 함께 사용할 수 있는지에 대해 살펴보고자 한다.
Kafka Streams를 Spring에서 사용하는 방법
- @EnableKafkaStreams를 사용하는 방법
- Kafka Streams를 스프링 빈으로 직접 등록
위 두 가지 방법을 이용해서 Spring에서 KafkaStreams를 사용할 수 있다. Spring 웹 서버는 HTTP 요청이 들어오면 Request Scope의 빈을 생성한다. 즉, 요청마다 빈이 생성되는 것이기 때문에 KafkaStreams를 HTTP에서 사용하면 문제가 될 수 있다. 따라서 KafkaStreams를 싱글톤 빈으로 등록해서 Spring 웹 서버에서 KafkaStreams 싱글톤 빈을 주입받아서 사용해야한다. 아래에서는 각각 어떻게 사용하는지를 살펴보고자 한다.
@EnableKafkaStreams
@EnableKafkaStreams 어노테이션을 이용해서 Kafka Streams를 스프링에서 사용하는 방법이다. 이 어노테이션을 사용하기 위해서는 아래 코드가 build.gradle에 추가되어야 한다.
implementation 'org.springframework.kafka:spring-kafka'
@EnableKafkaStreams 코드가 활성화 되면 아래 동작이 이루어진다.
- streamsBuilder 빈 등록
- StreamsBuilderFactoryBean 빈 등록
- StreamsBuilder 빈은 KafkaStreamsDefaultConfiguration 빈을 읽어서 자동으로 KafkaStreams를 생성해서 StreamsBuilderFactoryBean에 필드로 넣는다.
streamsBuilder를 스프링 빈으로 주입 받아서 이 녀석에게 Topology를 구성해서 넘겨줄 수 있다. topology를 구성하면 구성된 결과는 streamsBuilder에 등록된다. 그리고 등록된 streamsBuilder는 이후 StreamsBuilderFactoryBean.start() 메서드에 의해서 kafkaStreams가 생성되고 자동으로 시작된다.
@Override
public synchronized void start() {
if (!this.running) {
try {
Assert.state(this.properties != null,
"streams configuration properties must not be null");
Topology topology = getObject().build(this.properties); // NOSONAR: getObject() cannot return null
this.infrastructureCustomizer.configureTopology(topology);
this.topology = topology;
LOGGER.debug(() -> topology.describe().toString());
this.kafkaStreams = new KafkaStreams(topology, this.properties, this.clientSupplier);
this.kafkaStreams.setStateListener(this.stateListener);
this.kafkaStreams.setGlobalStateRestoreListener(this.stateRestoreListener);
...
this.kafkaStreams.start();
...
}
}
만약 컨트롤러에서 KafkaStreams를 이용해서 대화식 쿼리등을 구현하려고 한다면 컨트롤러는 KafkaStreams 객체가 필요하다. 그렇지만 위에서 볼 수 있듯이 KafkaStreams 빈이 등록되지는 않는다. 따라서 이 때는 StreamsBuilderFactoryBean을 주입받아서 KafkaStreams를 가져가야한다. 혹은 아래와 같이 스프링 컨텍스트가 모두 시작된 후 이벤트 리스너를 이용해서 KafkaSTreams 객체를 지정해줄 수 있다.
@Slf4j
@RequiredArgsConstructor
@Component
public class KafkaInjectListener {
private final ObjectProvider<MyController> myControllerObjectProvider;
private final StreamsBuilderFactoryBean streamsBuilderFactoryBean;
@EventListener
public void OnApplication(ContextRefreshedEvent contextRefreshedEvent) {
log.info("HERE");
KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
MyController myController = myControllerObjectProvider.getObject();
myController.configKafkaStreams(kafkaStreams);
}
}
전체적인 코드는 다음과 같이 구성할 수 있다.
Config 코드
KafkaStreams Property 파일을 스프링 빈으로 등록해준다. 그러면 StreamBuilderFactory는 이 값을 읽어서 KafkaStreams를 생성할 때 사용할 것이다.
@EnableKafkaStreams
@Configuration
@RequiredArgsConstructor
@Slf4j
public class MyKafkaConfig {
/*
public static final String DEFAULT_STREAMS_CONFIG_BEAN_NAME = "defaultKafkaStreamsConfig";
public static final String DEFAULT_STREAMS_BUILDER_BEAN_NAME = "defaultKafkaStreamsBuilder";
*/
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kafkaStreamsConfiguration() {
HashMap<String, Object> props = new HashMap<>();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "hello");
return new KafkaStreamsConfiguration(props);
}
}
Topology 코드
StreamBuilder는 스프링 빈으로 자동 등록된다. 이 빈을 주입 받아서 Topology를 구성하면 된다. 어차피 스프링 빈에 구성된 Topology가 저장되기 때문에 따로 return 값을 지정할 필요는 없다.
@Component
public class BuildPipeline {
@Autowired
public void buildPipeline(StreamsBuilder streamsBuilder, MyController myController) {
streamsBuilder.stream("clients",
Consumed.with(Serdes.String(), Serdes.String()).withOffsetResetPolicy(EARLIEST))
.print(Printed.<String, String>toSysOut().withLabel("HELLO"));
}
}
이벤트 리스너
스프링 컨텍스트가 시작되면, 이벤트 리스너는 이 이벤트를 듣는다. 이 이벤트가 발생하면 이 메서드가 실행된다. 이 때 ObjectFactory를 가져와서 MyController 빈만 하나 받는다. 그리고 MyController 빈에 KafkaStreams를 셋팅해줄 수 있다.
@Slf4j
@RequiredArgsConstructor
@Component
public class KafkaInjectListener {
private final ObjectProvider<MyController> myControllerObjectProvider;
private final StreamsBuilderFactoryBean streamsBuilderFactoryBean;
@EventListener
public void OnApplication(ContextRefreshedEvent contextRefreshedEvent) {
log.info("HERE");
KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
MyController myController = myControllerObjectProvider.getObject();
myController.configKafkaStreams(kafkaStreams);
}
}
컨트롤러 코드
컨트롤러는 다음과 같이 작성할 수 있다. 앞서 이야기한 것처럼 KafkaStreams 자체는 스프링 빈으로 생성이 되지 않는다. 따라서 생성자 주입 같은 것을 받을 수 없다. 그래서 이벤트 리스너를 이용해서 셋팅받는다.
@RestController
@NoArgsConstructor
public class MyController {
private KafkaStreams kafkaStreams;
public void configKafkaStreams(KafkaStreams kafkaStreams) {
this.kafkaStreams = kafkaStreams;
}
@Autowired(required = false)
public MyController(KafkaStreams kafkaStreams) {
this.kafkaStreams = kafkaStreams;
}
@GetMapping("/hello")
public String hello() {
System.out.println("kafkaStreams = " + kafkaStreams);
return kafkaStreams.toString();
}
}
@EnableKafkaStreams의 장/단점
전반적으로 어노테이션에 의해서 처리가 되기 때문에 구조가 깔끔하게 나누어질 수 있다는 게 장점인 것 같다. 또한 SmartLifeCycle을 구현했기 때문에 Spring의 Application Context가 생기거나 닫기 전에 stop / start가 좀 더 스마트하게 호출될 수 있다고 한다. 한 가지 단점으로 생각되는 것은 KafkaStreams에 필요한 stateListener 등을 셋팅하는 방법을 나는 찾을 수 없다. 이 부분이 해결되면 좀 더 깔끔하게 사용할 수 있을 것 같지만 이 부분을 사용하는 방법을 알 수 없기 때문에 나는 후자의 방법이 좋은 것 같다.
@Component를 이용한 등록
KafkaStreams를 스프링 빈으로 직접 생성하고, Spring에서 KafkaStreams 빈을 가져다 쓸 수 있는 방법이다. 기본적으로 Spring을 잘 모르는 사람이고 KafkaStreams에 익숙한 사람이라면 이 방법이 가장 편리할 것 같다. 그리고 앞서 @EnableKafkaStreams를 이용할 때는 KafkaStreams가 Start 되는 시점이 블랙박스화 되어있기 때문에 stateListener를 등록할 수 있는 방법이 없었다. 이것을 고려한다면 처음에 찍먹을 해본다면 KafkaStreams를 스프링 빈으로 생성해서 등록해보는 것이 좋을 것 같다.
KafkaStreams 빈생성 코드
아래와 같이 단순히 빈을 생성해서 스프링 컨텍스트에 등록할 수 있다. KafkaStreams 빈을 생성할 때, 이곳에서 stateListener()등을 등록해줄 수 있다. 또한 KafkaStreams 빈을 생성하고 돌려주기 전에 KafkaStreams를 시작해 줄 수도 있다. 필요하다면 이벤트 리스너 등을 이용해서 나중에 시작할 수도 있다.
@Component
public class MyKafkaStreams {
@Bean
public KafkaStreams kafkaStreams() {
Properties properties = new Properties();
properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "HELLO");
properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
StreamsBuilder streamsBuilder = new StreamsBuilder();
streamsBuilder.stream("clients",
Consumed.with(Serdes.String(), Serdes.String()).withOffsetResetPolicy(EARLIEST))
.print(Printed.<String, String>toSysOut().withLabel("HELLO"));
KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);
kafkaStreams.start();
return kafkaStreams;
}
}
Controller 코드
컨트롤러는 생성자 주입으로 kafkaStreams 빈을 주입 받아서 사용할 수 있다.
@RestController
@RequiredArgsConstructor
public class MyController {
private final KafkaStreams kafkaStreams;
/*
public void configKafkaStreams(KafkaStreams kafkaStreams) {
this.kafkaStreams = kafkaStreams;
}
@Autowired(required = false)
public MyController(KafkaStreams kafkaStreams) {
this.kafkaStreams = kafkaStreams;
}
*/
@GetMapping("/hello")
public String hello() {
System.out.println("kafkaStreams = " + kafkaStreams);
return kafkaStreams.toString();
}
}
장단점 정리
@EnableKafkaStreams는 어노테이션을 기반으로 동작하고, 내부적으로 SmartLifeCycle을 구현한 StreamsBuilderFactory를 이용해서 실행이 결정된다. 아직 내가 모르는 것일 수도 있지만 StateListener등을 등록하기가 어렵다. 다만 어노테이션을 기반으로 동작하기 때문에 클래스를 분리해서 좀 더 가독성 좋은 코드를 생성할 수도 있다.
KafkaStreams 빈을 직접 생성해서 등록하는 것은 스프링에 익숙하지 않은 사람들이 바로 접근해보기에 좋은 방법인 것 같다. 처음에는 Kafka Streams 싱글톤 빈을 직접 생성하는 방법으로 접근해보고, 이후에는 필요하다면 SmartLifeCycle등을 직접 공부해서 처리해보는 것도 좋은 방법이 될 것 같다.
참고
https://www.baeldung.com/spring-boot-kafka-streams
전체 코드
'Kafka eco-system > KafkaStreams' 카테고리의 다른 글
Kafka Streams : 부록 (0) | 2022.11.22 |
---|---|
Kafka Streams : 카프카 스트림즈 어플리케이션 테스트 (0) | 2022.11.21 |
Kafka Streams : 카프카 스트림즈 고급 어플리케이션 (0) | 2022.11.20 |
Kafka Streams : 모니터링과 성능 (0) | 2022.11.15 |
Kafka Streams : 프로세서 API (0) | 2022.11.14 |