ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 비동기 Reactive Streams, CompletableFuture
    카테고리 없음 2025. 4. 23. 16:09

    Reactive Streams is a standard for asynchronous data processing in a streaming fashion with non-blocking back pressure.

     

    핵심 개념

    #Publisher: 데이터 생성과 발행자

    #Subscriber: 데이터 구독과 처리자

    #Subscription: 데이터 흐름 제어자

    #Processor: Publisher 와 Subscriber 결합

     

    #비동기란?  하나의 스레드에서 하나의 작업에 대한 요청을 보낸 후 응답이 오기 전까지 다른 작업을 동시에 수행할 수 있는 것. 현재 스레드가 블로킹되지 않아 처리 속도가 빠르고 더 적은 리소스로 더 많은 양을 처리할 수 있다.

     

    #백프레셔란? 옵저버패턴에서는 구독자의 상태와 상관없이 발행자가 메세지를 보내면서 out of memory 같은 이슈에 부딪힐 수 있는 반면, 백프레셔는 구독자가 얼만큼의 데이터를 수용할 수 있을지 발행자에 알리고 발행자는 요청 수 만큼의 데이터만 구독자에 전달한다.

     

    주요 인터페이스

    흐름

    Publisher            →           Subscriber
       |                                    ↑
       └── subscribe(subscriber)            |
            └─> subscriber.onSubscribe(subscription)
                                    └─> subscription.request(n)
                                          └──> publisher.emit(onNext)

     

    Publisher

    Publisher.subscribe(Subscriber)는 구독을 신청하는 단계입니다.

    Publisher구현체는 subscribe메서드 안에서 Subscriber에게 Subscription을 생성해서 onSubscribe() 호출합니다.

    public interface Publisher<T> {
        public void subscribe(Subscriber<? super T> s);
    }
    

     

    Publisher 객체로는 Mono와 Flux 가 있습니다.

    Mono 0~1개의 데이터 처리 Mono.just(”data”)
    Flux 0~N개의 데이터 처리 Flux.just(1,2,3)
    Flux.just(1, 2, 3) //가장 업스트림
        .map(i -> i + 1) //중간 연산자
        .filter(i -> i % 2 == 0) //중간 연산자
        .subscribe(System.out::println); //다운스트림 (최종subscriber)

     

    이렇게 메서드를 체이닝할 수 있는데, just 는 가장 상위에 있는 flux 로 업스트림,나머지는 just 보다 다운스트림이라고 할 수 있습니다.

    이 개념은 상대적이라 map 은 just 의 다운스트림이지만 filter 의 업스트림이 됩니다.

    내부적으로 각 스트림마다 Publisher와 Subscriber 가 생성됩니다.

     

    Method Publisher Subscriber
    Flux.just(1, 2, 3) FluxJust 없음 (Source → 최초 onSubscribe 주체)
    .map(i -> i + 1) FluxMap MapSubscriber
    .filter(...) FluxFilter FilterSubscriber
    .subscribe(...) 없음 (subscribe 호출만) LambdaSubscriber

     

    가장 업스트림인 FluxJust 에서는 Subscriber 를 등록하고 Subscriber 의 onSubscribe 인터페이스를 호출합니다.

     

    FluxJust.class

     public void subscribe(CoreSubscriber<? super T> actual) {
            actual.onSubscribe(Operators.scalarSubscription(actual, this.value, "just"));
        }
    

     

    Subscriber

    Subscriber.onSubscribe(Subscription)는 Publisher가 구독을 수락하며 제어권을 넘기는 단계입니다.

    • 구독을 수락하고 request할 수 있는 권한을 받는 단계로
    • Subscriber는 이 Subscription을 가지고 얼마나 데이터를 받을지 request(n) 합니다.
    • 이후 결과에 따라 onNextonErroronComplete 를 사용하여 상황을 전달합니다.
    public interface Subscriber<T> {
        public void onSubscribe(Subscription s);
        public void onNext(T t);
        public void onError(Throwable t);
        public void onComplete();
    }

     

    subscribe() 를 호출하지 않으면, 데이터 스트림은 시작되지 않습니다. 그 이유는 subscriber 가 subscription 을 통해 데이터 요청 (request) 를 보내야 publisher 가 request 만큼 데이터를 전달하기 때문입니다.

     

    LambdaMonoSubscriber.class 의 onSubscribe 에서 Subscription 에 request 를 호출하는 것을 확인할 수 있습니다.

     

    public final void onSubscribe(Subscription s) {
            if (Operators.validate(this.subscription, s)) {
                this.subscription = s;
                if (this.subscriptionConsumer != null) {
                    try {
                        this.subscriptionConsumer.accept(s);
                    } catch (Throwable t) {
                        Exceptions.throwIfFatal(t);
                        s.cancel();
                        this.onError(t);
                    }
                } else {
                    s.request(Long.MAX_VALUE);
                }
            }
    
        }

     

     

    JPA, Mybatis 는 리액티브 스트림을 지원하지 않아 한 번에 데이터를 로드하기 때문에 List<User> 같은 형태로 반환됩니다.

    List 는 가변 길이 버퍼로 조회한 데이터 양이 많으면 out of memory 가 발생할 수 있습니다.

    R2DBC 라이브러리를 사용해 리액티브를 지원할 수 있습니다.

    MongoDB 는 리액티브 전용 모듈을 따로 제공해 데이터를 비동기로 스트리밍하는 Publisher 로 동작할 수 있습니다. Flux<User> 를 반환합니다.

     

    스레드(스케줄러)를 조절하기 위해 subscribeOn, publishOn 를 사용할 수도 있습니다. 

      subscribeOn(Scheduler) publishOn(Scheduler)
    적용 시점 Publisher 실행 시점 전체 지정 지점부터 이후 연산
    영향 범위 upstream 전체 (source + 연산 모두) downstream (지정 위치 이후부터)
    용도 데이터를 어디서 만들지 정할 때 데이터를 어디서 처리할지 정할 때
    보통 위치 .subscribeOn()은 체인의 맨 위 .publishOn()은 중간 또는 이후에 위치
    예시 DB, API 요청, 파일 읽기 등 blocking I/O 호출 위치 변경 .map(), .flatMap() 같은 데이터 처리 연산 위치 변경
    중첩 사용 여러 번 써도 처음 한 번만 적용됨 여러 번 쓰면 적용된 위치마다 변경됨

     

    subscribeOn 은 Publisher 단계(최상위 작업)에서 스케줄러의 스레드를 선택한 후 이후 작업에서 동일 스레드를 사용합니다.

    • 스레드 개수 제한이 있는 Elastic 스케줄러 (최대 CPU 코어 수 * 10 개의 스레드 사용)
    • 기존 elastic()처럼 필요할 때 새로운 스레드를 생성하지만, 일정 개수 이상은 추가 생성되지 않습니다.
    • I/O 작업 (Blocking 작업) 에 적합
    • 유휴(Idle) 스레드는 60초 후 종료됩니다.
    Mono.fromSupplier(() -> {
        // DB/API 호출 (blocking)
        return getData();
    })
    .subscribeOn(Schedulers.boundedElastic())
    .map(data -> transform(data))   // 여기도 같은 스레드에서 실행됨
    .subscribe(System.out::println);

     

    publishOn은 publishOn을 사용한 지점부터 다른 스레드가 사용됩니다.

    Mono.just("data")
    .map(s -> {
        log("before publishOn");   // 메인 스레드
        return s;
    })
    .publishOn(Schedulers.parallel())  //  여기부터 스레드 변경
    .map(s -> {
        log("after publishOn");    // parallel 스레드
        return s.toUpperCase();
    }).
    filter(s-> s.contains("code"))
    .subscribe(System.out::println);

     

     

    https://engineering.linecorp.com/ko/blog/reactive-streams-with-armeria-1

     

    Armeria로 Reactive Streams와 놀자! - 1

    Reactive Streams란? LINE+에서 오픈소스 Armeria와 Central Dogma를 개발하고 있는 엄익훈입니다. 저는 Reactive Streams의 개념과, Reactive Streams를 오픈 소스 비동기 HTTP/2, RPC,...

    engineering.linecorp.com

     


    Reactive Stream이 다수의 이벤트를 다루는데 중점을 둔다면, CompletableFuture은 단일 값의 비동기 처리에 적합합니다.

    thread를 생성해 작업을 위임하는 비동기 방식으로 일회성의 무거운 작업을 여러개의 CPU로 나누어 처리합니다.

     

    하나의 요청에 두 개의 작업을 진행해야하는 경우, 

    private Integer work_1() {
        TimeUnit.SECONDS.sleep(1);
        return 1;
    }
    
    private Integer work_2() {
        TimeUnit.SECONDS.sleep(2);
        return 2;
    }

     

    아래와 같이 작성하면 main thread 는 2초, value_1 thread 는 1초를 기다리며 자원을 낭비하게 됩니다.

    @Test
    public void async_2() throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
    
        CompletableFuture<Integer> value_1 = new CompletableFuture<>();
        CompletableFuture<Integer> value_2 = new CompletableFuture<>();
    
        executorService.submit(() -> value_1.complete(work_1()));
        executorService.submit(() -> value_2.complete(work_2()));
    
        System.out.println("sum = " + (value_1.get() + value_2.get()));
    }

     

    main thread 는 즉시 반환하고, 두 연산의 결과를 받아 작업할 수 있는 별도 스레드를 두는 것이 더 효율적입니다.

    @Test
    public void async_3() throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
    
        CompletableFuture<Integer> value_1 = new CompletableFuture<>();
        CompletableFuture<Integer> value_2 = new CompletableFuture<>();
        CompletableFuture<Integer> value_3 = value_1.thenCombine(value_2, (a, b) -> a + b);
    
        executorService.submit(() -> value_1.complete(work_1()));
        executorService.submit(() -> value_2.complete(work_2()));
        executorService.submit(() -> {
            try {
                System.out.println("sum = " + (value_3.get()));
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        });
    }

     

    비동기 작업을 체이닝하는 경우는 데드락에 유의해야합니다.

    import java.util.concurrent.*;
    
    public class CompletableFutureDeadlockDemo {
    
        private static final ExecutorService threadPool = Executors.newFixedThreadPool(2); 
    
        public static void main(String[] args) {
            CompletableFuture<?> future = test("A");
    
            future.join(); // main thread blocking 대기
            threadPool.shutdown();
        }
    
        private static CompletableFuture<?> test(String id) {
            return taskA(id)
                    .handleAsync((response, exception) -> {
                        log("handleAsync: after taskA");
                        return taskB(id); //  내부에서 또 async + join → 위험
                    }, threadPool);
        }
    
        private static CompletableFuture<?> taskA(String id) {
            return CompletableFuture.supplyAsync(() -> {
                log("taskA: callApi1");
                sleep(1000);
                return "OK";
            }, threadPool).exceptionally(e -> {
                log("taskA exception");
                return null;
            });
        }
    
        private static Object taskB(String id) {
            CompletableFuture<Object> future = CompletableFuture.supplyAsync(() -> {
                log("taskB: callApi2");
                sleep(1000);
                return new Object();
            }, threadPool).handleAsync((result, ex) -> {
                log("taskB: handleAsync after callApi2");
                return new User(); // Dummy object
            }, threadPool);
    
            return CompletableFuture.allOf(future)
                    .thenApply(Void -> future.join()) //  join으로 blocking
                    .join(); //  outer join
        }
    
        private static void sleep(long millis) {
            try { Thread.sleep(millis); } catch (InterruptedException ignored) {}
        }
    
        private static void log(String msg) {
            System.out.println(Thread.currentThread().getName() + " - " + msg);
        }
    
        static class User {}
    }

     

    순서

    1. taskA() → threadPool에서 스레드 1 사용

    2. handleAsync(...) → threadPool에서 스레드 2 요청 → 사용

    3. handleAsync 내부에서 taskB() 실행

    4. 그 안에서 또 비동기 작업 (callApi2 + handleAsync) 요청

    5. 하지만 이미 스레드 2개가 모두 사용 중이라 스레드 없음

    6. 내부 작업이 스케줄되지 못하고 대기 → .join()은 그것을 기다림

    7. Deadlock 발생

     

    외부 API 호출같은 작업과 내부 작업을 별도의 스레드풀을 사용하도록 분리하거나, 

    내부 작업은 thenCompose 로 체이닝하는 것으로 해결할 수 있습니다.

     

     

     

    https://techblog.woowahan.com/2722/

    반응형
Designed by Tistory.