동시성 - 모던 자바 인 액션
인터넷으로 여러 웹 서비스에 접근하는 애플리케이션을 구현할 때, 이들 서비스의 응답을 기다리는 동안 연산이 블록 되거나 귀중한 CPU 클록 싸이클 자원을 낭비하고 싶지 않다. 예를 들어 페이스북 응답을 기다리는 동안 트위터 응답을 차단하지 말란 법은 없다.
포크/조인 프레임워크와 병렬스트림은 병렬성의 귀중한 도구다.
이들은 한 태스크를 여러 하위 태스크로 나눠 CPU의 다른 코어 또는 다른 머신에서 이들 하위 태스크를 병렬로 실행한다.
반면 병렬성이 아니라 동시성을 필요로 하는 상황,
즉 조금씩 연관된 작업을 같은 CPU에서 동작하는 것 또는
애플리케이션을 생산성을 극대화할 수 있도록 코어를 바쁘게 유지하는 것이 목표라면,
원격 서비스나 데이터베이스 결과를 기다리는 스레드를 블록함으로 연산 자원을 낭비하는 일은 피해야 한다.
* 병렬이 멀티코어+멀티 스레드 작업이지만, 동시성으로 접근하는 것이 더 효율적일 때도 있다.
네트워크 통신, 파일 저장 및 로드 I/O 작업은 CPU 가 거의 일을 하지 않고 요청 후 응답이 올 때까지 대기 상태에 있게 된다. 이때 한개의 CPU가 I/O 요청 후 기다리는 동안 다른 작업을 처리하도록 하면 효율적이다.
물론 병렬작업으로 여러 CPU 가 동시에 다수 I/O 작업을 실행하게 할 수 있으나, 물리적인 CPU 의 경우 개수 제한이 있어 동시성으로 접근하는 것이 좋다.
동시성은 작업이 바뀔 때마다 문맥교환(Context switching)이 발생하고, 동시 작업이 너무 많다면 문맥교환 오버로드로 인해 싱글 코어에서 싱글 스레드로 작업하는 것이 더 빠를 수 있다.
코어가 N 배로 늘어나더라도 sequential 하게 동작하는 부분도 있기때문에 반드시 성능이 N 배로 늘어나는 것이 아니다.
- 출처 :https://mentha2.tistory.com/245
자바는 이런 환경에서 사용할 수 있는 두 가지 주요 도구를 제공한다.
1. Future 인터페이스로 자바 8의 CompletableFuture 구현은 간단하고 효율적인 해결사다.
2. 최근 자바 9에 추가된 구독 프로토콜에 기반한 리액티브 프로그래밍 개념을 따르는 플로 API 는 조금 더 정교한 프로그래밍 접근 방법을 제공한다.
처음 자바는 Runnable, Thread 를 동기화된 클래스와 메서드를 이용해 잠갔다.
자바5에서는 ExecutorService 인터페이스, Runnable, Thread 의 변형을 반환하는 Callable, Future, 제네릭 등을 지원했다.
자바7에서는 포크/조인 구현을 지원하는 RecursiveTask 가 추가되었고,
자바8에서는 스트림과 람다 지원에 기반한 병렬 프로세싱이 추가되었다.
자바는 Future 를 조합하는 기능을 추가하면서 동시성을 강화했고, (자바8 CompletableFuture)
자바9에서 분산 비동기 프로그래밍을 명시적으로 지원한다.
즉, 매쉬업 애플리케이션, 즉 다양한 웹 서비스를 이용하고 이들 정보를 실시간으로 조합해 사용자에세 제공하거나 추가 웹 서비스를 통해 제공하는 종류의 애플리케이션을 개발하는데 필수적인 기초 모델과 툴킷을 제공한다.
이 과정을 리액티브 프로그래밍이라 부르며, 자바 9에서는 발행-구독 프로토콜로 이를 지원한다.(Flow 인터페이스 추가)
CompletableFuture 와 Flow 의 궁극적 목표는 가능한 한 동시에 실행할 수 있는 독립적인 태스크를 가능하게 만들면서 멀티코어 또는 여러 기기를 통해 제공되는 병렬성을 쉽게 이용하는 것이다.
단일 CPU 컴퓨터도 여러 사용자를 지원할 수 있는데 이는 운영체제가 각 사용자에 프로세스 하나를 할당하기 때문이다.
운영체제는 두 사용자가 각각 자신만의 공간에 있다고 생각할 수 있도록 가상 주소 공간을 각각의 프로세스에 제공한다.
운영체제는 주기적으로 번갈아가며 각 프로세스에 CPU 를 할당해 실제로 마술같은 일이 일어난다.
프로세스는 다시 운영체제에 한 개 이상의 스레드, 즉 본인이 가진 프로세스와 같은 주소공간을 공유하는 프로세스를 요청함으로 태스크를 동시에 또는 협력적으로 실행할 수 있다.
멀티코어 설정(한 사용자 프로세스만 실행하는 한명의 사용자 노트북)에서는 스레드의 도움없이 프로그램이 노트북의 컴퓨팅 파워를 모두 활용할 수 없다. 각 코어는 한 개 이상의 프로세스나 스레드에 할당될 수 있지만 프로그램이 스레드를 사용하지 않는다면 효율성을 고려해 여러 프로세서 코어 중 한개만을 사용할 것이다.
실제로 네 개의 코어를 가진 CPU 에서 이론적으로는 프로그램을 네 개의 코어에서 병렬로 실행함으로 실행 속도를 네 배까지 향상시킬 수 있다.(물론 오버헤드로 실제 4배 되기는 어려움)
for 문을 0부터 100,000 까지 돌며 sum 하는 작업이 있을 때 코어 4개에서 숫자를 쪼개 네 개의 스레드에 할당해
start()로 실행 join() 으로 완료될 때까지 기다렸다 sum = sum1+..+sum4 를 계산한다.
병렬 스트림의 내부 반복은 스트림을 이용해 스레드 사용 패턴을 추상화한 것이다.
RecursiveTask 지원으로 포크/조인 스레드 추상화로 분할/정복 알고리즘을 병렬화하며 멀티코어 머신에서 배열의 합을 효율적으로 계산하는 높은 수준의 방식을 제공할 수 있다.
자바5는 Executor 프레임워크와 스레드 풀을 통해 태스크 제출과 실행을 분리할 수 있는 기능을 제공했다.
스레드의 문제
자바 스레드는 직접 운영체제 스레드에 접근한다. 운영체제 스레드를 만들고 종료하려면 비싼 비용(페이지 테이블과 관련 상호작용)을 치러야 하며, 더욱이 운영체제 스레드의 숫자는 제한되어 있는 것이 문제이다.
운영체제가 지원하는 스레드 수를 초과해 사용하면 자바 애플리케이션이 예상치 못한 방식으로 크래시될 수 있으므로 기존 스레드가 실행되는 상태에서 계속 새로운 스레드를 만드는 상황이 일어나지 않도록 주의해야 한다.
보통 운영체제와 자바 스레드 개수가 하드웨어 스레드 개수보다 많으므로 일부 운영 체제 스레드가 블록되거나 자고 있는 상황에서 모든 하드웨어 스레드가 코드를 실행하도록 할당된 상황에 놓을 수 있다.
2016인텔 코어 i7-6900k 서버 프로세서는 8개의 코어를 가지며 각 코어는 2개의 대칭 멀티프로세싱 하드웨어 스레드를 포함한다.
그럼 하드웨어 스레드를 총 16개 초함하는데 서버에는 프로세서를 여러개 포함할 수 있어 하드웨어 스레드 64개를 보통 보유할 수 있다.
자바 ExecutorService 는 태스크를 제출하고 나중에 결과를 수집할 수 있는 인터페이스를 제공한다.
프로그램은 newFixedThreadPool 같은 팩토리 메서드 중 하나를 이용해 스레드 풀을 만들어 사용할 수 있다.
이 메서드는 워커 스레드라 불리는 nThreads 를 포함하는 ExecutorService 를 만들고 이들을 스레드 풀에 저장한다.
스레드 풀에서 사용하지 않은 스레드로 제출된 태스크를 먼저 온 순서대로 실행한다. 실행이 종료되는 풀로 반환한다.
이 방식의 장점은 하드웨어에 맞는 수의 태스크를 유지함과 동시에 수 천개의 태스크를 스레드 풀에 아무 오버해드없이 제출할 수 있다는 점이다.
프로그래머가 태스크(Runnalble/Callable) 을 제공하면 스레드가 이를 실행한다.
스레드 풀 그리고 스레드 풀이 나쁜 이유
두 가지 사항을 주의해야한다.
- k 스레드를 가진 스레드풀은 오직 k 만큼의 스레드를 동시에 실행할 수 있다. 초과로 제출된 태스크는 큐에 저장되며 이전에 태스크 중 하나가 종료되지 전까지는 스레드에 할당하지 않는다. 불필요하게 많은 스레드를 만드는 일을 피할 수 있으므로 보통 문제가 되지 않지만 잠을 자거나 I/O 를 기다리거나 네트워크 연결을 기다리는 태스크가 있다면 주의해야한다. I/O 를 기다리는 블록 상황에서 이들 태스크가 워커 스레드에 할당된 상태를 유지하지만 아무 작업도 하지 않게 된다.
처음 제출한, 기존 실행 중인 태스크가 나중의 태스크 제출을 기다리는 상황(Future의 일반적인 패턴)이라면 데드락에 걸릴 수도 있다.
핵심은 블록(자거나 이벤트를 기다리는)할 수 있는 태스크는 스레드 풀에 제출하지 말아야한다는 것이지만 이를 항상 지킬 수 있는 것은 아니다.
✅ 상황 가정
- 스레드 수가 2개인 고정 스레드 풀 (Executors.newFixedThreadPool(2))
- 첫 번째 작업 A: 내부에서 다른 작업 B를 Future로 제출하고, 그 결과를 기다림 (get() 호출)
🧨 문제 발생
1. 작업 A가 스레드 1에서 실행됨.
2. 작업 A 안에서 작업 B를 스레드풀에 제출함.
3. 작업 B는 스레드 2에서 실행되길 기대하지만...
4. 스레드 2는 이미 다른 작업 C가 점유하고 있다면, 작업 B는 대기 중.
5. 그런데 작업 A는 작업 B가 끝나길 get()으로 기다리고 있어, 스레드 1도 놀고 있음.
🎯 결과:
- 작업 A는 작업 B를 기다리고,
- 작업 B는 스레드가 나기를 기다리며,
- 모든 스레드가 대기 상태에 빠져 데드락 발생!
- 중요한 코드를 실행하는 스레드가 죽는 일이 발생하지 않도록 보통 자바 프로그램은 main 이 반환하기 전에 모든 스레드의 작업이 끝나길 기다린다. 따라서 프로그램을 종료하기 전에 모든 스레드 풀을 종료하는 습관을 갖는 것은 중요하다.
(풀의 워커 스레드가 만들어진 다음 다른 태스크 제출을 기다리면서 종료되지 않은 상태일 수 있으므로)
보통 장기간 실행하는 인터넷 서비스를 관리하도록 오래 실행되는 ExecutorService 를 갖는 것은 흔한 일이다.
자바는 이런 상황을 다룰 수 있도록 Thread.setDaemon 메서드를 제공한다.
스레디의 다른 추상화 : 중첩되지 않은 메서드 호출
포크앤 조인 프레임워크에서 사용된 동시성에는 한 개의 특별한 속성, 태스크나 스레드가 메서드 호출 안에서 시작되면 그 메서드 호출은 반환하지 않고 작업이 끝나기를 기다렸다. 다시 말해 스레드 생성과 join 이 한쌍처럼 중첩된 메서드 호출 내에 추가되었다.
이를 엄격한 포크/조인이라 부른다.
시작된 태스크를 내부 호출이 아닌 외부 호출에서 종료하도록 기다리는 좀 더 여유로운 방식의 포크/조인을 사용해도 비교적 안전하다.
그러면 제공된 인터페이스를 사용자는 일반 호출로 간주할 수 있다.
사용자의 메서드 호출에 의해 스레드가 생성되고 메서드를 벗어나 계속 실행되는 메서드를 비동기 메서드라 한다.
- 스레드 실행은 메서드를 호출한 다음의 코드와 동시에 실행되므로 데이터 경쟁 문제를 일으키지 않도록 주의해야 한다.
- 기존 실행 중이던 스레드가 종료되지 않은 상황에서 자바의 main 메서드가 반환하면 어떻게 될까? 아래 두 가지 모두 안전하지 않다.,
- 애플리케이션을 종료하지 못하고 모든 스레드가 실행을 끝낼 때까지 대기
- 애플리케이션 종료를 방해하는 스레드를 강제종료시키고 애플리케이션을 종료
첫 번째 방법에서는 잊고서 종료를 못한 스레드에 의해 애플리케이션이 크래시될 수 있다. 또 다른 문제로 디스크에 쓰기 I/O 작업을 시도하는 일련의 작업을 중단했을 때 이로 인해 외부 데이터의 일관성이 파괴될 수 있다.
이들 문제를 피하려면 애플리케이션에서 만든 모든 스레드를 추적하고 애플리케이션을 종료하기 전에 스레드 풀을 포함한 모든 스레드를 종료하는 것이 좋다.
자바 스레드는 setDaemon 메서드를 통해 데몬 또는 비데몬으로 구분시킬 수 있다.
데몬 스레드는 애플리케이션이 종료될 때 강제 종료되므로 디스크의 데이터 일관성을 파괴하지 않는 동작을 수행할 때
유용하게 활용할 수 있는 반면, main 메서드는 모든 비데몬 스레드가 종료될 때까지 프로그램을 종료하지 않고 기다린다.
1. 명시적 스레드 방식
- 직접 스레드를 만들고(join까지 직접 처리) 병렬 수행을 하는 방식.
- 매우 명시적이고, 저수준입니다.
- 코드도 길고, 에러 처리, 예외, 스레드 수 관리 등에서 번거롭습니다.
public class ThreadExample {
public static void main(String[] args) throws InterruptedException {
int x = 1337;
Result result = new Result();
Thread thread1 = new Thread(() -> { result.left = f(x); });
Thread thread2 = new Thread(() -> { result.right = g(x); });
thread1.start();
thread2.start();
thread1.join(); //main thread 가 thread1 이 종료되기를 기다림
thread2.join();
System.out.println(result.left + result.right);
}
private static class Result{
private int left;
private int right;
}
public static int f(int x){
return 1;
}
public static int g(int x){
return 2;
}
}
Runnable 대신 Future API 인터페이스를 이용해 코드를 더 단순화할 수 있다.
이미 ExecutorService 로 스레드풀을 설정했다고 가정하면 다음과 같이 구현할 수 있다.
하지만 여전히 submit 메서드 호출같은 불필요한 코드가 있다.
명시적 반복으로 병렬화를 수행하던 코드를 스트림을 이용해 내부 반복으로 바꾼 것처럼 비슷한 방법으로 이를 해결해야 한다.
2. ExecutorService + Future
- Future를 이용하면 스레드를 직접 만들 필요 없이 스레드풀을 통해 비동기 작업을 수행할 수 있어요.
- 하지만 submit() → get() 흐름이 반복되고, 여전히 명시적으로 get() 호출해야 하므로 약간의 번거로움은 남아 있습니다.
- 또한 get()은 블로킹입니다. (결과가 나올 때까지 기다림)
public class ExecutorServiceExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
int x = 1337;
ExecutorService executorService = Executors.newFixedThreadPool(2);
Future<Integer> y = executorService.submit(() -> f(x));
Future<Integer> z = executorService.submit(() -> g(x));
System.out.println(y.get() + z.get());
executorService.shutdown();
}
private static class Result{
private int left;
private int right;
}
public static int f(int x){
return 1;
}
public static int g(int x){
return 2;
}
}
문제의 해결을 비동기 API 라는 기능으로 API 를 바꿔 해결할 수 있다.
첫번째 방법인 자바 Future 를 이용하면 이 문제를 조금 개선할 수 있다.
자바5 Future 는 자바8의 CompletableFuture 로 이들을 조합할 수 있게 되 더 기능이 풍부해졌다.
두번째는 concurrent.Flow 인터페이스를 이용하는 방법이다.
3. CompletableFuture (자바 8+)
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
int x = 1337;
// 비동기 작업 시작
CompletableFuture<Integer> futureF = CompletableFuture.supplyAsync(() -> f(x));
CompletableFuture<Integer> futureG = CompletableFuture.supplyAsync(() -> g(x));
// 두 작업 결과를 더해서 출력
CompletableFuture<Integer> result = futureF.thenCombine(futureG, Integer::sum);
// 결과 출력 (여기서는 간단하게 get 사용)
System.out.println(result.get());
}
public static int f(int x) {
return 1;
}
public static int g(int x) {
return 2;
}
}
4. 리액티브 스타일 (Callback, Flow, RxJava, Project Reactor 등)
println 이 두 번 호출되고, + 피연산자들이 println 호출 전에 업데이트 될 수도 있어
적절한 락은 이용해서 두 콜백이 모두 호출된 후에 println 을 하는 방법으로 해결할 수 있다.
리액티브형식은 보통 한 결과가 아니라 일련의 이벤트에 반응하도록 설계되었으므로 Future 를 이용하는 것이 더 바람직하다.
- 완전히 비동기 콜백 기반입니다.
- f/g는 작업이 끝났을 때 콜백을 호출합니다.
- 콜백은 호출 시점이 보장되지 않기 때문에 결과가 둘 다 도착하기 전엔 System.out.println()을 실행하면 안 됨
- 그래서 적절한 동기화(락, 카운트다운래치, atomic 등) 이 필요합니다.
public class CallbackStyleExample {
public static void main(String[] args) {
int x = 1337;
Result result = new Result();
f(x,(int y)->{
result.left = y;
System.out.println("f = "+ (result.left + result.right));
});
g(x,(int y)->{
result.right = y;
System.out.println("g = "+ (result.left + result.right));
});
}
private static class Result{
private int left;
private int right;
}
public static void f(int x, IntConsumer dealWithResult){
dealWithResult.accept(x);
}
public static void g(int x, IntConsumer dealWithResult){
dealWithResult.accept(x);
}
}
리액티브 형식의 프로그래밍으로 f,g 는 콜백을 여러 번 호출할 수 있는데
원래 f,g 는 한 번만 return 하도록 사용되었다.
Future 도 한 번만 완료되고 결과는 get 으로 얻는다.
리액티브 형식 비동기 API 는 일련의 값을 (나중에 스트림으로 연결), Future 형식 비동기 API 는 일회성의 값을 처리하는데 적합하다.
→ f(x)가 여러 번 콜백을 호출할 수도 있는 경우 (ex. 이벤트 스트림) → 리액티브 스타일이 맞음
→ f(x)가 한 번의 결과만 반환하는 경우 → Future나 CompletableFuture가 맞음
코드는 복잡해보이지만 명시적 스레드 처리보다 사용 코드를 더 단순하게 만들어 주고 높은 수준의 구조를 유지할 수 있게 도와준다.
그리고 계산이 오래 걸리거나 동기 메서드에 이 API 활용으로 애플리케이션 효율을 향상시킬 수 있다.
잠자기는 해로운 것으로 간주
스레드 풀에서 잠을 자는 태스크는 다른 태스크가 시작되지 못하게 막으므로 자원을 소비한다는 사실을 기억하자.
(운영체제가 이들 태스크를 관리하므로 일단 스레드로 할당된 태스크는 중지시키지 못한다.)
스레드풀에서 잠자는 스레드 뿐만 아니라 모든 블록 동작도 마찬가지로 실행을 막는다.
블록 동작은 다른 태스크가 어떤 동작을 완료하기를 기다리는 동작(Future.get()) 과 외부상호작용(DB,네트워크 등) 기다리는 동작으로 구분할 수 있다.
이상적으로는 태스크에서 기다리는 일을 만들지 말거나, 코드에서 예외를 일으키는 방법으로 처리할 수 있다.
태스크를 앞과 뒤 두 부분으로 나누고 블록되지 않을 때만 뒷부분을 자바가 스케줄링하도록 요청할 수 있다.
work1();
Thread.sleep(10000);
work2();
위 코드는 스레드 풀 큐에 추가되고 차례가 되면 실행되는데, work1 코드 실행 후 워커 스레드를 점유한 상태에서 10초를 잔다. 그리고 work2 를 실행 후 작업 종료하고 워커 스레드를 해제한다.
반면 아래코드는 work1 을 실행 후 종료하는데 work 2를 10초 뒤에 실행할 수 있도록 큐에 추가해둔다.
그러면 위 코드처럼 스레드 자원 점유가 아니라 다른 작업이 실행될 수 있도록 허용한다는 의미다.
태스크를 만들 때는 이런 특징을 잘 활용해야 한다.
가능하면 I/O 작업에도 이 원칙을 적용하는 것이 좋다. 읽기 메서드를 호출하고 읽기 작업이 끝나면 이를 처리할 태스크를 런타임 라이브러리에 스케줄하도록 요청하고 종료한다.
public class ScheduledExecutorServiceExample {
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
work1();
scheduledExecutorService.schedule(ScheduledExecutorServiceExample::work2,10, TimeUnit.SECONDS);
scheduledExecutorService.shutdown();
}
public static void work1(){
System.out.println("work1");
}
public static void work2(){
System.out.println("work2");
}
}
코드의 복잡한 부분은 Completable Future 를 통해 추상화할 수 있다
ComposableFuture 가 아니라 CompletableFuture 라고 부르는 이유는 뭘까?
일반적으로 Future 는 실행에서 get으로 결과를 얻을 수 있는 Callable 로 만들어진다.
하지만 CompletableFuture 는 실행할 코드 없이 Future 를 만들 수 있도록 허용하며
complete 메서드를 이용해 나중에 어떤 값을 이용해 다른 스레드가 이를 완료할 수 있고
get으로 값을 얻을 수 있도록 허용한다.(그래서 CompletableFuture 라 부른다)
public class CFComplete {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
int x = 1337;
CompletableFuture<Integer> completableFuture = new CompletableFuture<>();
executorService.submit( () -> completableFuture.complete(f(x)));
int b= g(x);
System.out.println(completableFuture.get()+b);
executorService.shutdown();
}
public static int f(int x){
return x;
}
public static int g(int x){
return x;
}
}
위 코드는 f(x)의 실행이 끝나지 않는 상황에서 get을 기다려야 하므로 프로세싱 자원을 낭비할 수 있다.
* 스레드를 완벽하게 사용하는 방법
f(x), g(x) 두 스레드가 있는데 한 스레드는 다른 스레드가 종료될 때까지 대기한다.
정답은 f,g 각각의 태스크, 합계를 계산하는 태스크 세 개를 이용하는 것이다.
처음 두 태스크가 실행되기 전까지 세 번째 태스크는 실행할 수 없다. 자바로 이 문제를 어떻게 해결할 수 있을까? > Future 를 조합해 이 문제를 해결할 수 있다.
Function 에 compose 와 andThen 같은 메서드를 시용해 다른 Function 을 얻을 수 있는데,
CompletableFuture<T> 에 thenCombine 메서드를 사용함으로 두 연산 결과를 더 효과적으로 더할 수 있다.
public <U,V> CompletableFuture<V> thenCombine(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(null, other, fn);
}
아래에서 thenCombine 이 핵심이다. a와 b의 결과를 알지 못한 상태에서 thenCombine 은 두 연산이 끝났을 때 스레드 풀에서 실행될 연산을 만든다. 결과를 추가하는 세 번째 연산 c는 다른 두 작업이 끝날 때까지는 스레드에서 실행되지 않는다.(먼저 시작해서 블록되지 않는다는 특징). 따라서 기존의 두 가지 버전의 코드에서 발생했던 블록 문제가 일어나지 않는다. Future 연산이 두 번째로 종료되는 상황에서 실제 필요한 스레드는 한 개지만 스레드 풀의 두 스레드가 여전히 활성상태이다.
public class CFCombine {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
int x = 1337;
CompletableFuture<Integer> a = new CompletableFuture<>();
CompletableFuture<Integer> b = new CompletableFuture<>();
CompletableFuture<Integer> c = a.thenCombine(b, (y, z) -> (y + z)); //**
executorService.submit(()->a.complete(f(x)));
executorService.submit(()->b.complete(g(x)));
System.out.println(c.get());
executorService.shutdown();
}
public static int f(int x){
return x;
}
public static int g(int x){
return x;
}
}
상황에 따라서는 get을 기다리는 스레드가 큰 문제가 되지 않으므로 기존 자바8 Future 사용도 해결 방법이 될 수 있다.
하지만 어떤 상황에서는 많은 수의 Future 를 사용해야한다. (서비스에 여러 질의 처리) 이런 상황에서 CompletableFuture 와 콤비네이터를 이용해 get에서 블록하지 않을 수 있고 그렇게 함으로 병렬 실행의 효율성은 높으고 데드락은 피하는 최상의 해결책을 구현할 수 있다.
발행-구독 그리고 리액티브 프로그래밍
Future 와 CompletableFuture 은 독립적 실행과 병렬성이라는 정신적 모델에 기반한다.
연산이 끝나면 get으로 Future 의 결과를 얻을 수 있다. 따라서 Future 는 한 번만 실행해 결과를 제공한다.
반면 리액티브 프로그밍은 시간이 흐르면서 여러 Future 같은 객체를 통해 여러 결과를 제공한다.
예를 들어 웹서버 컴포넌트 응답을 기다리는 리스너 객체에서 네트워크에 HTTP 요청을 발생하길 기다렸다 나중에 결과 데이터를 생산할 수 있다. 그리고 다음 결과를 처리할 수 있도록 다른 네트워크 요청을 기다린다.
스트림으로 해결할 수 있는 상황이면 좋겠지만, 한 번의 단말 동작으로 소비될 수 있다.
스트림 패러다임은 두 개의 파이프라인으로 값을 분리(포크)하기 어려우며 두 개로 분리된 스크림에서 다시 결과를 합치기도 어렵다.
스트림은 선형적인 파이프라인 처리 기법에 알맞다.
자바9에서는 concurrent.Flow 인터페이스에 발행-구독 모델(pub-sub 프로토콜) 을 적용해 리액티브 프로그래밍을 제공한다.
- 구독자가 구독할 수 있는 발행자
- 이 연결을 구독이라 한다.
- 이 연결을 통해 메세지(이벤트 알려짐)를 전송한다.
* 여러 컴포넌트가 한 구독자를 구독할 수 있다.
* 한 컴포넌트가 여러 개별 스트림을 발행하고 여러 구독자에 가입할 수 있다.
두 정보 소스로부터 발생하는 이벤트를 합쳐서 다른 구독자가 볼 수 있도록 발행하는 예로 엑셀 시트의 =c1+c2 함수이다.
c1 이나 c2의 값이 갱신되면 c3도 변한다.
public class Main {
public static void main(String[] args) {
SimpleCell c1 = new SimpleCell("C1");
SimpleCell c2 = new SimpleCell("C2");
SimpleCell c3 = new SimpleCell("C3");
c1.subscribe(c3);
c2.onNext(20);
c1.onNext(15);
}
}
public interface Publisher <T>{
void subscribe(Subscriber<? super T> subscriber);
}
public interface Subscriber<T> {
void onNext(T t);
}
셀은 publisher 와 subscriber 모두 상속받아 둘 다를 의미하게 된다.
public class SimpleCell implements Publisher<Integer>, Subscriber<Integer>{
private int value=0;
private String name;
private List<Subscriber> subscribers = new ArrayList<>();
public SimpleCell(String name){
this.name= name;
}
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
subscribers.add(subscriber);
}
@Override
public void onNext(Integer value) { //업스트림 발행->구독
this.value = value; //구독한 셀에 값 변경됐을 때 값을 갱신해 반응함
System.out.println(this.name + ":" + this.value);
notifyAllSubscribers(); //구독자에게 알림 > 다운스트림
}
private void notifyAllSubscribers(){ //onNext 다운스트림
subscribers.forEach(s->s.onNext(this.value));
}
}
c3은 직접 c1을 구독하므로 c1이 갱신되면서 c3도 10으로 함께 갱신된다.
c3 = c1+c2 을 구현하기 위해 아래 클래스를 추가한다.
public class ArithmeticCell extends SimpleCell{
private int left;
private int right;
public ArithmeticCell(String name) {
super(name);
}
public void setLeft(int left){
this.left = left;
onNext(left + this.right);
}
public void setRight(int right){
this.right = right;
onNext(left + this.right);
}
}
public class Main {
public static void main(String[] args) {
SimpleCell c2 = new SimpleCell("C2");
SimpleCell c1 = new SimpleCell("C1");
ArithmeticCell c3 = new ArithmeticCell("C3");
c1.subscribe(c3::setLeft);
c2.subscribe(c3::setRight);
c1.onNext(10); //업스트림 생산자->소비자 / 발행자->구독자
c2.onNext(20);
c1.onNext(15);
//publsher 발행자 c1,c2
//구독자 c3
}
}
이렇게하면 c1이 15로 갱신될 때 c3도 자신을 갱신한다.
발행자-구독자 상호작용의 멋진 점은 발행자 구독자의 그래프를 설정할 수 있다는 점이다.
예를 들어 c5=c3+c4 같이 c3,c4에 의존하는 새로운 셀을 만들 수 있다.
데이터가 발행자(생산자)에서 구독자(소비자)로 흐름을 업스트림 혹은 다운스트림이라 부른다.
위에서 main 에서 호출된 onNext 는 업스트림 , (발행자)
onnext 내부에서 notifyAllsubscribers 호출되는 onNext 는 다운스트림이다. (구독자->발행자)
옵저버 패턴과 비슷한데 옵저버보다 이 새로운 API 프로토콜이 더 강력한 이유는
자바 플로 API Subscriber 가 갖고 있는 onError, onComplete 같은 메서드가 있기 때문이다.
플로 인터페이스를 복잡하게 만든 개념은 압력과 역압력이다.
압력은 처음에 호출이 별로 없을 땐 정상이지만 서비스가 커지며 수천 수만개의 호출이 생기는 상황이다.
역압력은 무한 속도로 아이템을 방출하는 대신 요청 시에만 다음 아이템을 내보내는 request 메서드로
내보내는 메세지를 제한하는 기법이다.
Publisher에서 Subscriber로 정보를 전달한다.
Subscriber에서 Publisher로 정보를 요청해야 할 필요가 있을 수 있다. - 역압력
Publisher는 여러 Subscriber를 갖고 있으므로 역압력 요청이 한 연결에만 영향을 미쳐야 한다는 것이 문제가 될 수 있다. List<Subscriber> 중 하나의 Subscriber 에만 응답
Publisher 와 Subscriber 연결을 위해 Subscription 객체를 사이에 두고 통신한다.
public interface Subscription {
void cancel();
void request(long n);
}
리액티브 시스템
런타임 환경이 변화에 대응하도록 전체 아키텍처가 설계된 프로그램을 가리킨다.
특징
- 반응성: 큰 작업을 하느라 간단한 질의의 응답을 지연하지 않고 실시간으로 입력에 반응하는 것을 의미
- 회복성: 한 컴포넌트의 실패로 전체 시스템이 실패하지 않음을 의미
- 탄력성: 시스템이 자신의 작업 부하에 맞게 적응하며 작업을 효율적으로 처리함을 의미
-메세지 주도?
여러 가지 방법으로 이런 속성을 구현할 수 있지만 Flow 인터페이스에서 제공하는 리액티브 프로그래밍 형식을 이용하는 것도 주요 방법 중 하나이다.
이들 인터페이스 설계는 메세지 주도 속성을 반영한다.
메세지 주고 시스템은 박스와 채널 모델에 기반한 내부 API 를 갖고 있는데 여기서 컴포넌트는 처리할 입력을 기다리고 결과를 다른 컴포넌트로 보내면서 시스템이 반응한다.