Spring : EventPublishing을 통한 객체 간 결합 약화

    코드

    사용된 코드는 이곳에서 확인할 수 있습니다.


    ApplicationEventPublisher를 이용하기 

    스프링부트는 ApplicationEventPublisher 객체를 자동으로 스프링빈으로 등록해준다. 이 객체를 이용하면 객체 간의 의존 관계를 줄이는 방향으로 코드를 작성할 수 있다. 의존성이 약해지면서 상대적으로 문맥 파악이 어려워진다는 단점이 있지만 불필요하게 강하게 결합한 객체들을 분리해 줄 수 있어서 좋을 때도 있다.

    @RestController
    @Slf4j
    @RequiredArgsConstructor
    public class MyController {
        private final ApplicationEventPublisher eventPublisher;
    
        @GetMapping("/event")
        public String callEvent() {
            LogEvent logEvent = new LogEvent();
            eventPublisher.publishEvent(logEvent);
            return "ok";
        }
    }

    사용하기 위해서는 등록된 스프링 빈을 주입 받아서 사용하면 된다. 여기서는 LogEvent()를 하나 만들어서 publishEvent() 메서드의 매개변수로 넘겨주는 방법으로 사용했다. 예전에는 ApplicationEvent 타입의 객체들을 전달해야 했지만 최근에는 꼭 그렇게 사용하지 않아도 된다. 

    예를 들어 내가 사용한 LogEvent 클래스는 아래에서 볼 수 있듯이 어떤 클래스도 상속받지 않은 클래스인 것을 확인할 수 있다.

    public class LogEvent {
    }

    이렇게 사용할 수 있는 이유는 eventPublisher의 publishEvent()가 Object 타입을 받도록 구현되어 있기 때문이다. eventPublisher 함수형 인터페이스에서 제공해주는 default 메서드도 (Object)로 타입을 변경해서 publishEvent()를 호출하도록 구현되어있다. 

    @FunctionalInterface
    public interface ApplicationEventPublisher {
       default void publishEvent(ApplicationEvent event) {
          publishEvent((Object) event);
       }
    
       void publishEvent(Object event);
    }

     


    EventListener

    eventPublisher를 이용해서 이벤트를 발행했다면, 발행된 이벤트를 듣고 처리해 줄 Listener 역시 필요하다. 이런 Listener는 어떻게 처리할 수 있을까?

    • ApplicationListner 인터페이스 구현
    • @EventListner를 이용해 프록시 객체 생성

    객체를 직접 구현해서 스프링 빈으로 등록하거나 @EventListner를 이용해서 AOP로 프록시 객체를 생성하는 방법이 있다. ApplicationListener 인터페이스는 ApplicationEvent 타입으로 생성되어야 하기 때문에 반드시 ApplicationEvent를 상속받은 객체를 Publisher를 통해서 발행해야한다. 

    그런데 최근에는 Object 타입으로 EventPublish를 발행하도록 바뀌었기 때문에 ApplicatnListener를 구현하는 것보다는 @EventListener 메서드를 이용하는 것이 더 적절할 것이다.

    @FunctionalInterface
    public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {
       void onApplicationEvent(E event);
       static <T> ApplicationListener<PayloadApplicationEvent<T>> forPayload(Consumer<T> consumer) {
          return event -> consumer.accept(event.getPayload());
       }
    
    }

    @EventListener를 이용해서 이벤트를 처리하는 방법은 아래와 같다.

    1. 해당 클래스를 스프링 빈으로 등록하고, 메서드에 @EventListener 어노테이션을 붙인다.
    2. 매개변수로 처리할 이벤트 타입을 선언한다. 이 때, 반드시 하나의 매개변수만 선언해야 함. 
    @Slf4j
    @Component
    public class EventHandler {
    
        @EventListener
        public void eventListener(LogEvent logEvent) {
            log.info("logEvent = {}", logEvent);
        }
    }

    결론적으로는 위처럼 작성해주면 된다. 그러면 스프링부트가 시작할 때, EventListenerMethodProcessor가 @EventListener 어노테이션이 붙은 것을 보고 MultiCaseter 스프링 빈에 이 EventHandler를 등록시켜준다. 


    eventPublisher.publish()

    eventPublish()를 이용해서 이벤트 객체를 publish() 했을 때의 흐름을 살펴보고자 한다. 

    // AbstractApplicationContext.java
    protected void publishEvent(Object event, @Nullable ResolvableType typeHint) {
       
       ...
       
       // Object 형태의 Event가 주어지는 경우 타입을 추론함.
       if (eventType == null) {
          eventType = ResolvableType.forInstance(applicationEvent);
          if (typeHint == null) {
             typeHint = eventType;
          }
       }
    
       if (this.earlyApplicationEvents != null) {
          this.earlyApplicationEvents.add(applicationEvent);
       }
       else {
       	  // Multicaster를 통해서 이벤트를 처리해달라고 함. 
          getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
       }
      ...
    }

    위 클래스의 publishEvent()가 호출된다. Object 타입으로 이벤트를 받기 때문에 어떤 종류의 이벤트인지 추론이 필요한데, 이 메서드 내에서 eventType을 확인하는 작업을 거친다. 

    이벤트 작업이 무슨 타입인지 확인되면 eventMultiCaster 인스턴스를 얻어와서 현재 이벤트를 멀티 캐스팅한다. 즉, 등록된 이벤트 리스너들에게 '이 이벤트를 처리'해달라고 요청을 보낸다. 

     

    // SimpleApplicationEventMulticastor.java
    
    @Override
    public void multicastEvent(ApplicationEvent event, @Nullable ResolvableType eventType) {
       ResolvableType type = (eventType != null ? eventType : ResolvableType.forInstance(event));
       Executor executor = getTaskExecutor();
       for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
          if (executor != null) {
             executor.execute(() -> invokeListener(listener, event));
          }
          else {
             invokeListener(listener, event);
          }
       }
    }

    멀티 캐스터 인스턴스의 multicastEvent() 메서드를 호출하면 현재 타입에 맞는 EventListener를 찾아와서 순차적으로 처리하는 작업을 한다. 여기서 눈여겨 볼 부분은 크게 두 부분이다.

    1. 이 이벤트를 처리할 수 있는 모든 EventListener를 불러와서 한번씩 Event를 처리하도록 한다.
    2. 만약 TaskExecutor가 있는 경우, taskExecutor가 Event를 처리하도록 한다. 

    TaskExecutor를 처리하는 것은 이벤트의 처리가 현재 쓰레드가 아닌 다른 TaskExecutor를 통해 비동기적으로 처리되는 작업을 의미한다.

    또 다른 사실은 하나의 이벤트 객체가 여러 리스너에서 번갈아가면서 사용될 수 있다는 것이다. 만약 이벤트 리스너가 이벤트를 처리할 때, 이벤트의 상태를 변화하는 작업을 한번이라도 한다면 다른 이벤트 리스너가 이벤트를 처리하는데 영향을 미칠 것이다. TaskExecutor를 이용해서 처리할 경우 동시에 여러 쓰레드에서 같은 이벤트 인스턴스에 접근할 수 있음도 내포한다.

    따라서 이벤트 객체를 처리할 때, 반드시 다음 두 가지를 고려해야하는 것을 의미한다.

    1. 이벤트 객체를 불변 객체로 만들어야 함. 
    2. 이벤트 리스너에서 방어적 복사를 한 후에 이벤트를 처리해야 함. 

     


    @EventListener를 이용해 비동기적으로 처리하기

    위의 설정만으로 Event를 발행하고 처리한다면, Event를 발행하는 쓰레드가 Event를 동기적으로 처리하도록 동작한다. 이렇게 하면 객체 간의 의존성을 줄일 수는 있지만, 비동기적으로 처리를 하지는 못한다. 

    만약 이렇게 발행하는 이벤트가 단순히 로그를 남기거나, 카운트를 측정하는 정도라면 굳이 동기적으로 실행될 필요가 없을 것이다. 오히려 비동기적으로 실행하는 것이 좋은 선택지가 될 수도 있을 것이다. 여기서는 @EventListner를 이용해 비동기적으로 이벤트를 처리하는 방법을 알아보고자 한다. 

    @Override
    public void multicastEvent(ApplicationEvent event, @Nullable ResolvableType eventType) {
       ResolvableType type = (eventType != null ? eventType : ResolvableType.forInstance(event));
       Executor executor = getTaskExecutor();
       for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
          if (executor != null) {
             executor.execute(() -> invokeListener(listener, event));
          }
          else {
             invokeListener(listener, event);
          }
       }
    }

    @EventHandler 어노테이션만 붙였을 때는 AOP로 처리되지는 않는다. 어노테이션 기반으로 처리되서 AOP를 이용한 것처럼 보이지만 실제로는 Multicaster를 통해서 처리된다. 만약 @EventHandler에 @Async 어노테이션까지 붙는 경우 MultiCaster는 method를 invoke()할 때 AsyncExecutionInterceptor라는 AOP 클래스에게 메서드를 invoke()할 것을 요청해준다.

     


    @EnableAsync, @Async만 붙였을 때

    단순히 @EnableAsync, @Async 어노테이션만 붙여 비동기 처리를 하려고 하면 @EventHandler가 AOP로 처리될 때 AsyncExecutionInterceptor 클래스에 의해서 처리된다.

    // AsyncExecutionInterceptor.Java
    
    @Override
    @Nullable
    public Object invoke(final MethodInvocation invocation) throws Throwable {
       ...
       // Executor 가져옴
       AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
       ...
    
       // 처리해야 할 함수 제출
       Callable<Object> task = () -> {
          try {
             // AOP 처리
             Object result = invocation.proceed(); // AOP
             if (result instanceof Future<?> future) {
                // 쓰레드 작업 완료되면 값 반환
                return future.get();
             }
          }
          ...
       };
    
       // Executor에게 Callable 객체를 전달해서 결과를 기다림.
       return doSubmit(task, executor, invocation.getMethod().getReturnType());
    }

    AsyncExecutionInterceptor는 다음 작업을 처리해준다. 

    1. 자신이 가지고 있는 AsyncExecutor를 가져옴
    2. AOP로 처리해야 할 작업을 Callable에 정의해 둠. 
    3. TaskExeuctor에 Callable 객체를 전달한 후, 결과를 완료하면 Callable 객체에 의해서 호출받음.

    즉, 비동기적으로 AOP 작업이 처리되는 것을 의미한다. 그리고 이 때, Default로 제공되는 AsyncExecutor는 corePoolSize가 8이고, Maximum Thread 갯수가 거의 무한대인 ThreadPoolTaskExecutor가 기본적으로 제공된다. 자바에서 생성되는 Thread는 커널 레벨 쓰레드와 1:1 매칭이 되기 때문에 ThreadPoolTaskExeuctor는 관리될 수 있어야만 한다.

    따라서 현재 상태로는 비동기로 이벤트를 처리할 수는 있지만, 쓰레드 풀을 관리할 수 없어서 어플리케이션이 떠있는 서버 전체에 큰 장애를 가져올 수 있는 상태가 된다. 

     


    @EventListener + @Async + TaskExeuctor 관리하면서 처리하기

    앞서 @EventListener + @Async만 사용했을 때는 최대 쓰레드 갯수를 관리할 수 없어서 위험한 프로그램이 된다는 것을 이해했다. 스프링에서는 이 부분을 해결하기 위해서 Async AOP Interceptor를 생성할 때, TaskExecutor 빈이 등록되어있다면 이 TaskExeuctor를 Async AOP Interceptor에서 사용해준다.

    ThreadTaskExecutor를 스프링빈으로 등록해서 사용하는 코드는 아래와 같다.

    @Configuration
    public class BeanConfig {
        @Bean
        public TaskExecutor taskExecutor() {
            ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
            threadPoolTaskExecutor.setCorePoolSize(50);
            threadPoolTaskExecutor.setMaxPoolSize(1000);
            threadPoolTaskExecutor.setPrestartAllCoreThreads(true);
            return threadPoolTaskExecutor;
        }
    }

    이렇게 등록을 한 다음 AsyncAOPInterceptor가 사용하고 있는 TaskExecutor를 런타임에 확인해보면 정상적으로 등록된 것을 확인할 수 있다. 

     


    TaskExeuctor의 Queue 사이즈 관련 문제

    @Async 어노테이션을 이용하면 AOP를 통해서 ThreadPoolTaskExecutor에게 작업을 제출해주고, Executor는 각 작업을 미리 생성해둔 쓰레드를 이용해서 처리한다. 그런데 여기서 한 가지 더 알아둬야 할 부분이 있다. 

    만약 Exeuctor에게 동시에 쓰레드 갯수보다 많은 작업이 제출되면 어떻게 될까?

    Execuctor는 내부적으로 BlockingQueue<Runnable>를 가지고 있는데, 현재 처리할 수 있는 양보다 많은 Task가 제출되면 Queue에 Task를 넣어둔 다음에 순차적으로 처리하도록 해준다. 

    더보기
    // ThreadPoolTaskExecutor.java
    @Override
    protected ExecutorService initializeExecutor(
          ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
    
       BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
    
       ...
          executor = new ThreadPoolExecutor(
                this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
                queue, threadFactory, rejectedExecutionHandler) {
    	...
        return executor;
    }

    그리고 처리해야 할 Task를 보관할 수 있는 QueueSize는 ThreadPoolTaskExeuctor를 생성할 때 손쉽게 지정할 수 있다. 아래처럼 단순히 setQueueCapacity() 메서드를 호출해서 설정하면 된다. 

    threadPoolTaskExecutor.setQueueCapacity(1000);

     


    TaskExeuctor의 Queue가 꽉찬 경우라면? 

    큐 사이즈를 무한으로 크게 놔둘 수는 없다. 메모리 문제가 걱정되기도 하며, 작업이 처리되는데 긴 Latency가 발생할 수 있기 때문이다. 그렇다면 어느 정도의 합리적인 수준의 큐 사이즈를 설정해야하는 것으로 귀결된다. 

    너무 많은 요청이 온다면 큐가 가득 차는 경우도 발생하기 시작할텐데, 이 경우에는 어떻게 동작할까? TaskExeuctor는 큐 사이즈가 가득 찼을 때 새롭게 들어온 작업을 어떻게 처리할지를 정책으로 관리해준다. 현재 4가지의 정책이 제공된다.

    • AbortPolicy : 작업이 제출되었을 때 큐가 가득 찼으면 RejectedExecutionException을 던짐. 
    • CallerRunsPolicy : 큐가 가득차면, 작업을 제출한 쓰레드가 직접 작업을 처리함. 즉, 동기적으로 처리함.
    • DiscardPolicy : 큐가 가득차면, 새로 제출된 작업을 버림.
    • DiscardOldestPolicy : 큐가 가득차면, 새로 제출된 작업을 추가하고 가장 오래전에 제출된 작업을 버림. 

    어떤 비동기 처리를 하느냐에 따라 적절한 정책을 선택해서 처리해야한다. 


    TaskExeuctor의 QueueSize가 적절한지는 어떻게 확인할 수 있을까? 실패한 요청을 알고 싶다면?

    TaskExeuctor의 QueueSize가 실시간으로 어떻게 변하는지를 확인하는 것도 중요할 것을 짐작할 수 있다. QueSize를 모니터링 하고 싶다면 아래와 같이 메트릭으로 등록한 후, 프로메테우스를 이용해 실제 값이 어떻게 변하는지를 살펴보는 것이 많은 도움이 될 것이다. 

    @Bean
    public MeterBinder queSize() {
        final ThreadPoolTaskExecutor taskExecutor = (ThreadPoolTaskExecutor) taskExecutor();
        return registry -> Gauge.builder(
                "async-que-size",
                taskExecutor,
                ThreadPoolTaskExecutor::getQueueSize).register(registry);
    }
    
    @Bean
    public CountedAspect countedAspect(MeterRegistry meterRegistry) {
        return new CountedAspect(meterRegistry);
    }

    혹은 QueueSize를 넘었을 때 작업을 버리고 싶은데, 얼마나 많은 작업이 버렸는지를 확인하고 싶을 때도 있을 것이다. 이 때는 Policy 클래스를 하나 구현하고 메트릭을 찍도록 한다. 그리고 이 Policy 인스턴스를 하나 생성해서 TaskExecutor의 RejectExecutionHandler로 등록해주기만 하면 된다. 

    public class LogDiscardPolicy extends ThreadPoolExecutor.DiscardPolicy {
    
        @Counted("dropped-request")
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            super.rejectedExecution(r, e);
        }
    }
    
    //
    
    @Bean
        public TaskExecutor taskExecutor() {
            ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
            threadPoolTaskExecutor.setCorePoolSize(50);
            threadPoolTaskExecutor.setMaxPoolSize(1000);
            threadPoolTaskExecutor.setQueueCapacity(1000);
            threadPoolTaskExecutor.setPrestartAllCoreThreads(true);
            
            // Policy 설정
            threadPoolTaskExecutor.setRejectedExecutionHandler(new LogDiscardPolicy());
            return threadPoolTaskExecutor;
        }

     

     


    요약

    • EventPublisher를 이용하면 객체 간의 결합도를 낮춰줌. Object 타입으로 이벤트를 전달해주기 때문임.
    • EventPublisher가 발생한 event는 Multicaster를 통해서 EventHandler에게 전달됨.
    • 하나의 이벤트는 여러 EventHandler에서 처리될 수 있음. 따라서 Event는 반드시 불변객체로 만들거나, EventHandler는 방어적 복사를 이용해 처리해야 함. 
    • EventPublisher - EventHandler는 이벤트를 발행한 쓰레드가 동기적으로 이벤트를 처리함.
    • 비동기적으로 처리하기 위해서 @Async 어노테이션을 붙여서 처리할 수 있음.
    • @Async 어노테이션만 붙이면 기본으로 ThreadPoolTaskExeuctor가 AOP Interceptor에 전달되는데, 최대 쓰레드 갯수를 조절할 수 없어서 위험함. 
    • 스프링 빈으로 TaskExecutor를 등록해서 사용할 수 있음. 
    • 쓰레드 갯수보다 더 많은 Task가 제출되면 TaskQueue에 보관된 후 차례대로 처리됨. 
    • 최대 TaskQueue 사이즈를 달성했는데 새로운 Task가 제출되면 ThreadPoolTaskExecutor는 RejectExeuctionHandler Policy에 따라서 다르게 동작함. 
      • Discard
      • DiscardOldest
      • CallerRuns
      • Abort
    • 현재 TaskQue 사이즈와 거절되거나 버려진 Task들은 메트릭 수집을 통해서 모니터링 할 수 있음. 

    댓글

    Designed by JB FACTORY