CompletableFuture
자바 5부터는 미래의 어느 시점에 결과를 얻는 모델에 활용할 수 있도록 Future 인터페이스를 제공한다.
비동기 계산을 모델링하는데 이용하거나, 계산이 끝났을 때 결과에 접근할 수 있는 참조를 제공한다.
시간이 걸릴 수 있는 작업을 Future 내부로 설정하면 호출자 스레드가 결과를 기다리는 동안 다른 유용한 작업을 수행할 수 있다. 이는 옷을 드라이클리닝 맡기고 언제 끝날지 적힌 영수증(Future) 를 받는 동작에 비유할 수 있다. 끝날 때까지 우리는 각자 볼 일을 본다.
Future 는 저수준 스레드에 비해 직관적으로 이해가 쉽다.
Future 를 이용하려면 작업을 Callable 객체 내부로 감싸 ExecutorService 에 제출한다.
get으로 결과를 가져올 때 결과가 준비되어 있지 않으면 호출 스레드가 블록된다. get의 인자로 대기 시간을 줄 수 있다.
여러 Future 의 결과가 있을 때 이들의 의존성은 표현하기 어렵다.
A라는 계산이 끝나면 B로 전달. B의 결과가 나오면 다른 질의 결과과 B의 결과를 조합해라.
CompletableFuture은 아래와 같은 선언형 기능을 제공하며 위 요구사항을 좀 더 쉽게 수현할 수 있도록 돕는다.
- 두 개의 비동기 계산 결과를 하나로 합친다. 두 가지 계산 결과는 서로 독립적일 수 있으며 또는 두 번째 결과가 첫번 째 결과에 의존하는 상황일 수 있다.
- Future 집합이 실행하는 모든 태스크의 완료를 기다린다.
- Future 집합에서 가장 빨리 완료되는 태스크를 기다렸다가 결과를 얻는다.
- 프로그램적으로 Future 를 완료시킨다.(즉, 비동기 동작에 수동으로 결과 제공)
- Future 완료 동작에 반응한다.(즉, 결과기다리면서 블록되지 않고, 결과 알림을 받으면 Future 의 결과로 원하는 동작 수행)
Future 와 CompletableFuture는 Collection 과 Stream 의 관계에 비유할 수 있다.
/**
* 최저가격 검색 애플리케이션
* */
public class Shop {
Random random = new Random();
String name;
public Shop(String name) {
this.name = name;
}
/**
* 외부 서비스 접근
* 네트워크로 모든 상점 가격 검색하는데 동기API 부적절함!!
* */
public double getPrice(String product){
return calculatePrice(product);
}
/**
* 동기 API 를 비동기적으로 소비
* getPriceAsync 는 즉시 반환되어 호출 스레드가 다른 작업 수행 가능함.
* Future 반환값으로 결과값 핸들링
* */
public Future<Double> getPriceAsync(String product){
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
//다른 스레드 생성해 계산 작업 할당
new Thread(()->{
double price = calculatePrice(product);
futurePrice.complete(price);
}).start();
return futurePrice;
}
private double calculatePrice(String product){
delay();
return random.nextDouble() * product.charAt(0) + product.charAt(1);
}
//외부 서비스 접근
public static void delay(){
try{
Thread.sleep(1000L);
} catch (InterruptedException e){
throw new RuntimeException(e);
}
}
}
public class Main {
public static void main(String[] args) {
Shop shop = new Shop("BestS");
long start = System.nanoTime();
Future<Double> futurePrice = shop.getPriceAsync("my favorite");
long invocationTime = (System.nanoTime() - start) / 1_000_000;
System.out.println("invocation return Time = "+ invocationTime + "msecs");//즉시 리턴 기대
doSomethingElse();
Double price = null;
try {
price = futurePrice.get();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("price is= " + price);
long retrievalTime = (System.nanoTime() - start) / 1_000_000;
System.out.println("retrievalTime = "+ retrievalTime + "mesc");
}
private static void doSomethingElse() {
try {
Thread.sleep(1000);
System.out.println("doSthElse....");
} catch (InterruptedException e) {
}
}
}
여기서 만약 가격을 계산하는 스레드에서 에러가 발생하면 어떻게 될까? 예외가 발생하면 해당 스레드에만 영향을 미친다.
즉, 에러가 발생해도 가격 계산은 계속 진행되며 일의 순서가 꼬인다. 결과적으로 클라이언트는 get메서드가 반환될 때까지 영원히 기다려야한다.
클라이언트는 타임아웃값을 받는 get메서드의 오버로드 버전을 만들어 이 문제를 해결할 수 있다.
이처럼 블록 문제가 발생할 수 있는 상황에서는 타임아웃을 활용하는 것이 좋다. 이 때 왜 에러가 발생했는지 알 수 있는 방법이 없다. 따라서 completeExceptionally 메서드를 사용해 CompletableFuture 내부에서 발생한 예외를 클라이언트로 전달해야 한다.
Double price = null;
try {
price = futurePrice.get(0, TimeUnit.MILLISECONDS);
} catch (Exception e) {
e.printStackTrace();
}
public Future<Double> getPriceAsync(String product){
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
//다른 스레드 생성해 계산 작업 할당
new Thread(()->{
try {
double price = calculatePrice(product);
futurePrice.complete(price);
} catch (Exception ex){
futurePrice.completeExceptionally(ex); //시간 초과 시 예외발생
}
}).start();
return futurePrice;
}
위처럼 CompletableFuture 를 만든 걸 아래처럼 간단하게 만들 수도 있다.
public Future<Double> getPriceAsync2(String product){
return CompletableFuture.supplyAsync(()->calculatePrice(product));
}
supplyAsync 를 따라가면 내부적으로 예외처리가 위에 직접 작성한 것과 동일하게 관리되고 있다.
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(ASYNC_POOL, supplier);
}
static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
Supplier<U> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<U> d = new CompletableFuture<U>();
e.execute(new AsyncSupply<U>(d, f));
return d;
}
static final class AsyncSupply<T> extends ForkJoinTask<Void>
implements Runnable, AsynchronousCompletionTask {
CompletableFuture<T> dep; Supplier<? extends T> fn;
AsyncSupply(CompletableFuture<T> dep, Supplier<? extends T> fn) {
this.dep = dep; this.fn = fn;
}
public final Void getRawResult() { return null; }
public final void setRawResult(Void v) {}
public final boolean exec() { run(); return false; }
public void run() {
CompletableFuture<T> d; Supplier<? extends T> f;
if ((d = dep) != null && (f = fn) != null) {
dep = null; fn = null;
if (d.result == null) {
try {
d.completeValue(f.get());
} catch (Throwable ex) {
d.completeThrowable(ex);
}
}
d.postComplete();
}
}
}
이번에는 SHOP 클래스에서 구현한 API 제어 권한이 우리에게 없으며, 모든 API 는 동기 방식의 블록 메서드라고 가정한다. 블록 메서드를 사용할 수밖에 없는 상황에서 비동기적으로 여러 상점에 질의하는 방법, 즉 한 요청의 응답을 기다리며 블록하는 상황을 피해 애플리케이션 성능을 높일 수 있는 방법을 배운다.
비블록 코드 만들기
제품명을 입력하면 상점 이름과 제품가격 문자열 정보를 포함하는 List 반환 메서드를 구현해야한다.
public List<String> findPrices(String product);
public static List<String> findPrices(String product){
//[bestPrice price is 103.12, SaveBig price is 117.23, Favorita price is 124.61, BuyItAll price is 178.06]
return shops.stream().map(s-> String.format("%s price is %.2f", s.getName() , s.getPrice(product)))
.collect(toList());
}
long start = System.nanoTime();
List<String> saveBig = findPrices("SaveBig");
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.println("done in " + duration + " msecs");//4099
System.out.println(saveBig);
public static List<String> findPricesParallel(String product){
//[bestPrice price is 103.12, SaveBig price is 117.23, Favorita price is 124.61, BuyItAll price is 178.06]
return shops.parallelStream().map(s-> String.format("%s price is %.2f", s.getName() , s.getPrice(product)))
.collect(toList());
}
long start = System.nanoTime();
List<String> saveBig = findPricesParallel("SaveBig");
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.println("done in " + duration + " msecs");//1070
System.out.println(saveBig);
CompletableFuture 로 비동기 호출 구현
리스트의 CompletableFuture 는 각각 계산 결과가 끝난 상점의 이름 문자열을 포함한다.
하지만 findPrices 메서드 반환 형식은 List<String>이므로 모든 CompletableFuture의 동작이 완료되고 결과를 추출한 다음에 리스트를 반환해야 한다.
두 번째 map 연산을 List<CompletableFuture<String>> 에 적용할 수 있다.
즉, 리스트의 모든 CompletableFuture 에 join 을 호출해서 모든 동작이 끝나기를 기다린다..
CompletableFuture 클래스의 join 메서드는 Future 인터페이스의 get메서드와 같은 의미를 갖는다.
다만 join 은 아무 예외도 발생시키지 않는다는 차이가 있어 try-catch 로 감쌀 필요가 없다.
public static List<CompletableFuture<String>> findPricesWithCF(String product){
return shops.stream().map(s-> CompletableFuture.supplyAsync(()->String.format("%s price is %.2f", s.getName() , s.getPrice(product))))
.collect(toList());
}
public static List<String> findPricesWithCF(String product){
List<CompletableFuture<String>> cfList = shops.stream()
.map(s -> CompletableFuture.supplyAsync(() -> String.format("%s price is %.2f", s.getName(), s.getPrice(product))))
.collect(toList());
return cfList.stream().map(CompletableFuture::join)
.collect(toList());
}
두 map 연산을 하나의 스트림 처리 파이프라인으로 처리하지 않고 두 개의 스트림 파이프라인으로 처리했다.
스트림 연산은 게으른 특성이 있어 하나의 파이프라인으로 처리했다면 모든 가격 정보 요청 동작이 동기적, 순차적으로 이뤄지는 결과가 된다.
//동기적 코드 예시
public static List<String> findPricesWithCF2(String product){
return shops.stream()
.map(s -> CompletableFuture.supplyAsync(() -> String.format("%s price is %.2f", s.getName(), s.getPrice(product))))
.map(CompletableFuture::join)
.collect(toList());
}
아래쪽이 CompletableFuture 를 리스트로 모은다음 다른 작업과 독립적으로 각자의 작업을 수행하는 모습이다.
동기적으로 실행하면 4046 , 비동기는 2100 msecs 의 시간차이를 보인다.
비동기가 동기 실행보다는 빠르지만 parallelStream 1070 보다는 여전히 느리다.
네 개의 스레드를 병렬로 실행할 수 있는 기기라고 할 때, 순차 실행일 때는 상점 수가 1개 늘어나면 실행 시간도 1초 늘어난다. 병렬 스트림에서는 상점 수가 4개에서 5개가 됐을 때 1초가 늘어난다.
CompletableFuture 에서는 여전히 2006 으로 병렬보다 조금 빠르다.
(내 컴퓨터에서 순차 5053 병렬 2030 cf 2013 이 나옴)
병렬과 CF는 둘 다 내부적으로 Runtime.getRuntime().availablePeocessors() 가 반환하는 스레드 수를 사용하면서 비슷한 결과가 된다.
결과적으로 비슷하지만 CF는 병렬 스트림 버전에 비해 작업에 이용할 수 있는다양한 Executor 를 지정할 수 있다는 장점이 있다. 따라서 Executor 스레드풀 크기 조절로 애플리케이션에 맞는 최적화된 설정을 만들 수 있다.
커스텀 Executor 사용하기
*스레드 풀 크기 조절
스레드 풀이 너무 크면 CPU 와 메모리 자원을 서로 경쟁하느라 시간을 낭비할 수 있다. 반면 스레드 풀이 너무 작으면 CPU의 일부 코어는 활용되지 않을 수 있다. 게츠(자바 병렬 프로그래밍 저자)는 다음 공식으로 대략적인 CPU 활용 비율을 계산할 수 있다고 제안한다.
N(thread) = N(cpu) * U(cpu) * (1+W/C)
N(cpu)= Runtime.getRuntime().availablePeocessors()
U(cpu) = 0과 1사이의 값을 갖는 CPU 활용 비율
W/C = 대기시간과 계산시간의 비율
위 애플리케이션은 상점의 응답을 대략 99퍼센의 시간만큼 기다리므로 W/C 비율이 100으로 간주될 수 있다.
즉, 대상 CPU 활용률이 100퍼센트라면 400스레드를 갖는 풀을 만들어야한다.
4*1*100 = 400
하지만 상점 수보다 많은 스레드를 갖고 있어 봐야 사용할 가능성이 전혀 없으므로 낭비이다.
따라서 한 상점에 한 스레드가 할당될 수 있도록, 즉 가격 정보를 검색하려는 상점 수만큼 스레드를 갖도록 Executor 를 설정한다. 스레드 수가 너무 많으면 오히려 서버가 크래시될 수 있으므로 하나의 Executor 에서 사용할 스레드의 최대 개수는 100이하로 설정하는 것이 바람직하다.
ExecutorService executorService = Executors.newFixedThreadPool(Math.min(shops.size(), 100), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
});
여기서 만든 풀은 데몬 스레드를 포함한다.
자바에서 일반 스레드가 실행 중이면 자바 프로그램은 종료되지 않는다.
따라서 어떤 이벤트를 한없이 디가리면서 종료되지 않는 일반 스레드가 있으면 문제가 될 수 있다.
반면 데몬스레드는 자바 프로그램이 종료될 때 강제 실행 종료가 될 수 있다.
두 스레드의 성능은 같다.
public static List<String> findPricesWithCustomedEx(String product,ExecutorService executor){
List<CompletableFuture<String>> cfList =
shops.stream()
.map(s -> CompletableFuture.supplyAsync(() -> String.format("%s price is %.2f", s.getName(), s.getPrice(product),executor)))
.collect(toList());
return cfList.stream().map(CompletableFuture::join)
.collect(toList());
}
책에서는 위 코드가 안정적인 성능을 보장해준다고 했는데 내가 실행한 바로는
커스텀 Executor 가 CF 구현보다 아주 조금 빠른데 병렬보다는 느리다. --이해안감
실행방법/상점수 | 5 | 9 | 16 |
순차실행 | 5094 msec | 9099 msec | 26309 msec |
병렬실행 | 2029 msec | 3023 msec | 7030 msec |
CF | 2022 msec | 3028 msec | 9124 msec |
커스텀 executor | 2025 msec | 3025 msec | 9037 msec |
*스트림 병렬화와 CompletableFuture 병렬화
컬렉션 계산을 병렬화하는 두 가지 방법.
1. 병렬 스트림 생성
2. 컬렉션 반복으로 CompletableFuture 내부의 연산으로 만든다.
- I/O 가 포함되지 않은 계산 중심의 동작을 실행할 때는 스트림 인터페이스가 가장 구현하기 간단하며 효율적일 수 있다. (모든 스레드가 계산 작업을 수행하는 상황에서는 프로세서 코어 수 이상의 스레드를 가질 필요가 없다)
- 작업이 I/O 를 기다리는 작업을 병렬로 실행할 때는 CompletableFuture 가 더 많은 유연성을 제공하며 대기/계산(W/C)의 비율에 적합한 스레드 수를 설정할 수 있다. 특히 스트림의 게으른 특성 때문에 스트림에서 I/O 를 실제로 언제 처리할지 예측하기 어려운 문제가 있다.
*병렬스트림에서는 스레드 풀의의 크기가 고정되어 있어 검색 대상이 확장되었을 때 유연하게 대응할 수 없다.
여기까지 동기 서비스를 이용하는 클라이언트에 CompletableFuture 를 활용하는 방법이다.
다음은 여러 비동기 연산을 CompletableFuture 로 파이프라인화 하는 방법을 배운다.
여러 상점에서 가격 정보를 얻어오고 Shop.getPrice
결과 문자열을 파싱하고 Quote.parse
할인 서버에 질의를 보낼 Discount.applyDiscount
애플리케이션을 만들었다.
//Shop
public String getPrice(String product){
double price = calculatePrice(product);
Discount.Code code = Discount.Code.values()[random.nextInt(Discount.Code.values().length)];
return String.format("%s:%.2f:%s",name,price,code);
}
//Quote
/**
* @param s "BestPrice:123.5:GOLD"
* */
public static Quote parse(String s){
String[] split = s.split(":");
String shopName = split[0];
double price = Double.parseDouble(split[1]);
Discount.Code discountCode = Discount.Code.valueOf(split[2]);
return new Quote(shopName,price,discountCode);
}
//Discount
public static String applyDiscount(Quote quote){
return quote.getShopName()+ " price is " + Discount.apply(quote.getPrice(),quote.getDiscountCode());
}
public static List<String> findPrices(String product){
return shops.stream()
.map(shop->shop.getPrice(product))//할인 전 가격 얻기
.map(Quote::parse)//상점에서 반환한 문자열을 Quote 객체로 변환한다.
.map(Discount::applyDiscount) //Discount 객체로 각 Quote 에 할인을 적용한다.
.collect(toList());
}
위 작업을 성능 최적화를 해본다.
public static List<String> findPricesWithCF(String product){
List<CompletableFuture<String>> futures = shops.stream()
//첫번째 map 은 비동기적으로 상점에서 정보조회
// return Stream<CompletableFuture<String>>
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
//Qutoe 파싱작업은 I/O 가 없다.지연없이 동작 수행.
// thenApply 는 CF가 끝날 때까지 블록하지 않는다 = CF동작 완전 종료 후 thenApply 적용됨
//return Stream<CompletableFuture<Quote>>
.map(future -> future.thenApply(Quote::parse))
//원격실행이 적용돼 동기적으로 작업해야한다.
// 두 비동기 연산을 동기적으로 작업할 수 있게 파이프라인으로 만드는 메서드 thenCompose
//future 가 여러 상점에서 quote 을 얻는 동안 메인 스레드는 다른 작업을 수행할 수 있다.
.map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(
() -> Discount.applyDiscount(quote), executor)))
.collect(toList());
return futures.stream().map(CompletableFuture::join)
.collect(toList());
}
CF 의 다른 메서드들처럼 thenCompose 메서드도 Async 로 끝나는 버전이 있다.
thenCompose : 이전 작업을 수행한 스레드와 같은 스레드로 작업
thenComposeAsync : 다음 작업이 다른 스레드에서 실행하도록 스레드 풀로 작업 제출
두번째 CF 의 결과는 첫번쩨 CF의 에 의존하므로
두 CF를 하나로 조합하든, Async 버전의 메서드를 사용하든
최종 결과나 실행 시간에는 영향을 미치치 않는다.
따라서 스레드 전환 오버헤드가 적게 발생하며 효율성이 좀 더 좋은 thenCompose 를 사용했다.
위 내용은 첫번째 CF에 thenCompose 로 얻은 결과를 두번째 CF로 전달한다.
하지만 상황에 따라 독립적으로 실행된 두 개의 CF 를 합쳐야하는 경우도 있다.
첫번째 CF의 동작 완료와 무관하게 두번째 CF를 실행할 수 있어야한다.
이럴 때는 thenCombine 메서드를 사용한다.
thenCombine 은 BiFunction 을 두번째 인자로 받는다. BiFunction 에서는 두 결과를 어떻게 합칠지 정의한다.
thenCompose 처럼 thenCombine 에도 Async 버전이 존재한다.
thenCombineAsync 에서는 BiFunction 이 정의하는 조합 동작이 스레드 풀로 제출되면서 별도의 태스크에서 비동기적으로 실행된다.
한 온라인상점이 유로 가격 정보를 제공하는데, 고객에게는 항상 달러 가격을 보여줘야 한다.
1. 우리는 주어진 상품의 가격을 상점에 요청하는 한편
2. 원격 환율 교환 서비스를 이용해서 유로와 달러의 현재 환율을 비동기적으로 요청해야 한다.
3. 두 가지 데이터를 얻었으면 가격에 환율을 곱해서 결과를 합칠 수 있다.
여기서 합치는 작업은 단순한 곱셉이라 별도의 태스크에서 수행하여 자원을 낭비할 필요가 없다.
-> 그럼 별도의 태스크에서 실행할만한 상황은 무엇?
shops.stream()
.map(shop-> CompletableFuture.supplyAsync(()-> shop.getPrice(product))
.thenCombine( CompletableFuture.supplyAsync(()->exchangeService.getRate(EUR, USD)) ,(price, rate)-> price * rate)
.orTimeout(3, TimeUnit.SECONDS);
마지막에 orTimeout 으로 시간이 초과하면 exception 을 발생시킬 수도 있다.
TimeoutException 으로 CF를 완료하면서 또 다른 CF를 반환할 수 있도록 내부적으로 ScheduledTreadExecutor 을 활용한다. 혹은 CompleteOnTimeout 메서드로 타임아웃 발생시 기본값으로 처리할 수도 있다.
shops.stream()
.map(shop-> CompletableFuture.supplyAsync(()-> Quote.parse(shop.getPrice(product)).getPrice())
.thenCombine( CompletableFuture.supplyAsync(()->exchangeService.getRate(EUR, USD))
.completeOnTimeout(Default_Rate,1,TimeUnit.SECONDS),(price, rate)-> price * rate)
.orTimeout(3, TimeUnit.SECONDS));
public CompletableFuture<T> orTimeout(long timeout, TimeUnit unit) {
if (unit == null)
throw new NullPointerException();
if (result == null)
whenComplete(new Canceller(Delayer.delay(new Timeout(this),
timeout, unit)));
return this;
}
static final ScheduledThreadPoolExecutor delayer;
static final class Delayer {
static ScheduledFuture<?> delay(Runnable command, long delay,
TimeUnit unit) {
return delayer.schedule(command, delay, unit);
}
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<Void> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit),
sequencer.getAndIncrement()));
delayedExecute(t);
return t;
}
CF 가 아닌 Future 로 구현하면 아래와 같이 복잡한 코드를 구현해야한다.
public static void futureEx(String product){
ExecutorService executorService = Executors.newCachedThreadPool();
Future<Double> futureRate = executor.submit(new Callable<Double>() {
@Override
public Double call() throws Exception {
return exchangeService.getRate(EUR, USD);
}
});
Shop shop = new Shop("x");
Future<Double> futurePriceInUS= executor.submit(new Callable<Double>() {
@Override
public Double call() throws Exception {
String priceInEUR = shop.getPrice(product);
return priceInEUR * futureRate.get();
}
});
}
모든 상점 정보를 기다리지 않는 애플리케이션 만들기
findPricesStream 은 이전 내용과 동일한데, 여기서 List 로 값을 반환하는 것이 아니라
CF 스트림을 그대로 반환하고, 반환된 스트림에 4번째 map 을 적용한다.
4번째 map 은 단순히 CF에 동작을 등록한다. CF에 등록된 동작은 CF 의 계산이 끝나면 값을 소비한다.
자바 8의 CF API 는 thenAccept 메서드로 이 기능을 제공한다.
thenAccept 메서드는 연산 결과를 소비하는 Consumer 를 인수로 받는다.
public static void main(String[] args) {
findPricesStream("SaveBig").map(future-> future.thenAccept(System.out::println));
}
public static Stream<CompletableFuture<String>> findPricesStream(String product){
return shops.stream()
.map(shop-> CompletableFuture.supplyAsync(() -> shop.getPrice(product),executor))
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenCompose(quote-> CompletableFuture.supplyAsync(()->Discount.applyDiscount(quote),executor)));
}
thenAccept 에도 thenAcceptAsync 가 있다.
thenAcceptAsync 는 CF가 완료된 스레드가 아니라 새로운 스레드를 이용해 Consumer 를 실행한다.
불필요한 컨텍스트 변경은 피하는 동시에 CF가 완료되는 즉시 응답하는 것이 좋으므로
thenAcceptAsync 를 사용하지 않는다.
(오히려 thenAcceptAsync로 새로운 스레드가 이용가능할 때까지 대기하는 상황이 생길 수 있다)
thenAccept 에서 결과를 어떻게 소비할지 정의했으므로 CF<Void> 를 반환한다.
가장 느린 상점에서도 응답을 제공하려면 스트림의 모든 CF<Void> 를 배열로 추가하고 실행결과를 기다린다.
allOf 메서드가 반환하는 CF에 join 을 호출하면 원래 스트림의 모든 CF의 실행완료를 기다린다.
CompletableFuture[] futures = findPricesStream("SaveBig").map(future -> future.thenAccept(System.out::println))
.toArray(size -> new CompletableFuture[size]);
CompletableFuture.allOf(futures).join();
만약 가장 먼저 끝나는 하나의 응답만 필요하면 anyOf 를 사용할 수 있다.
특정 상점에만 delay 를 길게 잡아 TimeoutException 을 발생시켰다.
Exception 을 잡지 않으면 그 뒤에 다른 애들도 안나와서
catch 로 처리해주었다..
public static void main(String[] args) {
// findPricesStream("SaveBig").map(future -> future.thenAccept(System.out::println));
try {
CompletableFuture[] futures = findPricesStream("SaveBig").map(future -> future.thenAccept(System.out::println))
.toArray(size -> new CompletableFuture[size]);
CompletableFuture.allOf(futures).join();
} catch (Exception t){
System.out.println("시간초과");
}
}