-
비동기 Reactive Streams, CompletableFuture카테고리 없음 2025. 4. 23. 16:09
Reactive Streams is a standard for asynchronous data processing in a streaming fashion with non-blocking back pressure.
핵심 개념
#Publisher: 데이터 생성과 발행자
#Subscriber: 데이터 구독과 처리자
#Subscription: 데이터 흐름 제어자
#Processor: Publisher 와 Subscriber 결합
#비동기란? 하나의 스레드에서 하나의 작업에 대한 요청을 보낸 후 응답이 오기 전까지 다른 작업을 동시에 수행할 수 있는 것. 현재 스레드가 블로킹되지 않아 처리 속도가 빠르고 더 적은 리소스로 더 많은 양을 처리할 수 있다.
#백프레셔란? 옵저버패턴에서는 구독자의 상태와 상관없이 발행자가 메세지를 보내면서 out of memory 같은 이슈에 부딪힐 수 있는 반면, 백프레셔는 구독자가 얼만큼의 데이터를 수용할 수 있을지 발행자에 알리고 발행자는 요청 수 만큼의 데이터만 구독자에 전달한다.
주요 인터페이스
흐름
Publisher → Subscriber | ↑ └── subscribe(subscriber) | └─> subscriber.onSubscribe(subscription) └─> subscription.request(n) └──> publisher.emit(onNext)
Publisher
Publisher.subscribe(Subscriber)는 구독을 신청하는 단계입니다.
Publisher구현체는 subscribe메서드 안에서 Subscriber에게 Subscription을 생성해서 onSubscribe() 호출합니다.
public interface Publisher<T> { public void subscribe(Subscriber<? super T> s); }
Publisher 객체로는 Mono와 Flux 가 있습니다.
Mono 0~1개의 데이터 처리 Mono.just(”data”) Flux 0~N개의 데이터 처리 Flux.just(1,2,3) Flux.just(1, 2, 3) //가장 업스트림 .map(i -> i + 1) //중간 연산자 .filter(i -> i % 2 == 0) //중간 연산자 .subscribe(System.out::println); //다운스트림 (최종subscriber)
이렇게 메서드를 체이닝할 수 있는데, just 는 가장 상위에 있는 flux 로 업스트림,나머지는 just 보다 다운스트림이라고 할 수 있습니다.
이 개념은 상대적이라 map 은 just 의 다운스트림이지만 filter 의 업스트림이 됩니다.
내부적으로 각 스트림마다 Publisher와 Subscriber 가 생성됩니다.
Method Publisher Subscriber Flux.just(1, 2, 3) FluxJust 없음 (Source → 최초 onSubscribe 주체) .map(i -> i + 1) FluxMap MapSubscriber .filter(...) FluxFilter FilterSubscriber .subscribe(...) 없음 (subscribe 호출만) LambdaSubscriber 가장 업스트림인 FluxJust 에서는 Subscriber 를 등록하고 Subscriber 의 onSubscribe 인터페이스를 호출합니다.
FluxJust.class
public void subscribe(CoreSubscriber<? super T> actual) { actual.onSubscribe(Operators.scalarSubscription(actual, this.value, "just")); }
Subscriber
Subscriber.onSubscribe(Subscription)는 Publisher가 구독을 수락하며 제어권을 넘기는 단계입니다.
- 구독을 수락하고 request할 수 있는 권한을 받는 단계로
- Subscriber는 이 Subscription을 가지고 얼마나 데이터를 받을지 request(n) 합니다.
- 이후 결과에 따라 onNext, onError, onComplete 를 사용하여 상황을 전달합니다.
public interface Subscriber<T> { public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete(); }
subscribe() 를 호출하지 않으면, 데이터 스트림은 시작되지 않습니다. 그 이유는 subscriber 가 subscription 을 통해 데이터 요청 (request) 를 보내야 publisher 가 request 만큼 데이터를 전달하기 때문입니다.
LambdaMonoSubscriber.class 의 onSubscribe 에서 Subscription 에 request 를 호출하는 것을 확인할 수 있습니다.
public final void onSubscribe(Subscription s) { if (Operators.validate(this.subscription, s)) { this.subscription = s; if (this.subscriptionConsumer != null) { try { this.subscriptionConsumer.accept(s); } catch (Throwable t) { Exceptions.throwIfFatal(t); s.cancel(); this.onError(t); } } else { s.request(Long.MAX_VALUE); } } }
JPA, Mybatis 는 리액티브 스트림을 지원하지 않아 한 번에 데이터를 로드하기 때문에 List<User> 같은 형태로 반환됩니다.
List 는 가변 길이 버퍼로 조회한 데이터 양이 많으면 out of memory 가 발생할 수 있습니다.
R2DBC 라이브러리를 사용해 리액티브를 지원할 수 있습니다.
MongoDB 는 리액티브 전용 모듈을 따로 제공해 데이터를 비동기로 스트리밍하는 Publisher 로 동작할 수 있습니다. Flux<User> 를 반환합니다.
스레드(스케줄러)를 조절하기 위해 subscribeOn, publishOn 를 사용할 수도 있습니다.
subscribeOn(Scheduler) publishOn(Scheduler) 적용 시점 Publisher 실행 시점 전체 지정 지점부터 이후 연산 영향 범위 upstream 전체 (source + 연산 모두) downstream (지정 위치 이후부터) 용도 데이터를 어디서 만들지 정할 때 데이터를 어디서 처리할지 정할 때 보통 위치 .subscribeOn()은 체인의 맨 위 .publishOn()은 중간 또는 이후에 위치 예시 DB, API 요청, 파일 읽기 등 blocking I/O 호출 위치 변경 .map(), .flatMap() 같은 데이터 처리 연산 위치 변경 중첩 사용 여러 번 써도 처음 한 번만 적용됨 여러 번 쓰면 적용된 위치마다 변경됨 subscribeOn 은 Publisher 단계(최상위 작업)에서 스케줄러의 스레드를 선택한 후 이후 작업에서 동일 스레드를 사용합니다.
- 스레드 개수 제한이 있는 Elastic 스케줄러 (최대 CPU 코어 수 * 10 개의 스레드 사용)
- 기존 elastic()처럼 필요할 때 새로운 스레드를 생성하지만, 일정 개수 이상은 추가 생성되지 않습니다.
- I/O 작업 (Blocking 작업) 에 적합
- 유휴(Idle) 스레드는 60초 후 종료됩니다.
Mono.fromSupplier(() -> { // DB/API 호출 (blocking) return getData(); }) .subscribeOn(Schedulers.boundedElastic()) .map(data -> transform(data)) // 여기도 같은 스레드에서 실행됨 .subscribe(System.out::println);
publishOn은 publishOn을 사용한 지점부터 다른 스레드가 사용됩니다.
Mono.just("data") .map(s -> { log("before publishOn"); // 메인 스레드 return s; }) .publishOn(Schedulers.parallel()) // 여기부터 스레드 변경 .map(s -> { log("after publishOn"); // parallel 스레드 return s.toUpperCase(); }). filter(s-> s.contains("code")) .subscribe(System.out::println);
https://engineering.linecorp.com/ko/blog/reactive-streams-with-armeria-1
Armeria로 Reactive Streams와 놀자! - 1
Reactive Streams란? LINE+에서 오픈소스 Armeria와 Central Dogma를 개발하고 있는 엄익훈입니다. 저는 Reactive Streams의 개념과, Reactive Streams를 오픈 소스 비동기 HTTP/2, RPC,...
engineering.linecorp.com
Reactive Stream이 다수의 이벤트를 다루는데 중점을 둔다면, CompletableFuture은 단일 값의 비동기 처리에 적합합니다.
thread를 생성해 작업을 위임하는 비동기 방식으로 일회성의 무거운 작업을 여러개의 CPU로 나누어 처리합니다.
하나의 요청에 두 개의 작업을 진행해야하는 경우,
private Integer work_1() { TimeUnit.SECONDS.sleep(1); return 1; } private Integer work_2() { TimeUnit.SECONDS.sleep(2); return 2; }
아래와 같이 작성하면 main thread 는 2초, value_1 thread 는 1초를 기다리며 자원을 낭비하게 됩니다.
@Test public void async_2() throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(2); CompletableFuture<Integer> value_1 = new CompletableFuture<>(); CompletableFuture<Integer> value_2 = new CompletableFuture<>(); executorService.submit(() -> value_1.complete(work_1())); executorService.submit(() -> value_2.complete(work_2())); System.out.println("sum = " + (value_1.get() + value_2.get())); }
main thread 는 즉시 반환하고, 두 연산의 결과를 받아 작업할 수 있는 별도 스레드를 두는 것이 더 효율적입니다.
@Test public void async_3() throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(3); CompletableFuture<Integer> value_1 = new CompletableFuture<>(); CompletableFuture<Integer> value_2 = new CompletableFuture<>(); CompletableFuture<Integer> value_3 = value_1.thenCombine(value_2, (a, b) -> a + b); executorService.submit(() -> value_1.complete(work_1())); executorService.submit(() -> value_2.complete(work_2())); executorService.submit(() -> { try { System.out.println("sum = " + (value_3.get())); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }); }
비동기 작업을 체이닝하는 경우는 데드락에 유의해야합니다.
import java.util.concurrent.*; public class CompletableFutureDeadlockDemo { private static final ExecutorService threadPool = Executors.newFixedThreadPool(2); public static void main(String[] args) { CompletableFuture<?> future = test("A"); future.join(); // main thread blocking 대기 threadPool.shutdown(); } private static CompletableFuture<?> test(String id) { return taskA(id) .handleAsync((response, exception) -> { log("handleAsync: after taskA"); return taskB(id); // 내부에서 또 async + join → 위험 }, threadPool); } private static CompletableFuture<?> taskA(String id) { return CompletableFuture.supplyAsync(() -> { log("taskA: callApi1"); sleep(1000); return "OK"; }, threadPool).exceptionally(e -> { log("taskA exception"); return null; }); } private static Object taskB(String id) { CompletableFuture<Object> future = CompletableFuture.supplyAsync(() -> { log("taskB: callApi2"); sleep(1000); return new Object(); }, threadPool).handleAsync((result, ex) -> { log("taskB: handleAsync after callApi2"); return new User(); // Dummy object }, threadPool); return CompletableFuture.allOf(future) .thenApply(Void -> future.join()) // join으로 blocking .join(); // outer join } private static void sleep(long millis) { try { Thread.sleep(millis); } catch (InterruptedException ignored) {} } private static void log(String msg) { System.out.println(Thread.currentThread().getName() + " - " + msg); } static class User {} }
순서
1. taskA() → threadPool에서 스레드 1 사용
2. handleAsync(...) → threadPool에서 스레드 2 요청 → 사용
3. handleAsync 내부에서 taskB() 실행
4. 그 안에서 또 비동기 작업 (callApi2 + handleAsync) 요청
5. 하지만 이미 스레드 2개가 모두 사용 중이라 스레드 없음
6. 내부 작업이 스케줄되지 못하고 대기 → .join()은 그것을 기다림
7. Deadlock 발생
외부 API 호출같은 작업과 내부 작업을 별도의 스레드풀을 사용하도록 분리하거나,
내부 작업은 thenCompose 로 체이닝하는 것으로 해결할 수 있습니다.
반응형