Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | ||||||
2 | 3 | 4 | 5 | 6 | 7 | 8 |
9 | 10 | 11 | 12 | 13 | 14 | 15 |
16 | 17 | 18 | 19 | 20 | 21 | 22 |
23 | 24 | 25 | 26 | 27 | 28 |
Tags
- JPA
- 페이징
- Hibernate
- LIST
- fullcalendar
- paging
- JQuery
- 벌크연산
- jQuery값전달
- 엔티티직접사용
- 프로젝트생성
- 제네릭
- fetchjoin
- 자바서블릿
- jQuery값전송
- joinfetch
- jQueryUI
- 페치조인
- javaservlet
- namedQuery
- 대량쿼리
- 제너릭
- JPQL
- jscalendar
- javascriptcalendar
- 스프링데이터흐름
- calendar
- springflow
- values()
- Generic
Archives
- Today
- Total
가자공부하러!
모던자바인액션(CH15) - CompletableFuture와 리액티브 프로그래밍 컨셉의 기초 본문
목차
요약 및 결론
책 내용
요약 및 결론
- 자바의 동시성 지원은 계속 진화해 가고 있는데 나는 잘 모르겠다
- Future는 CompletableFuture 없이는 쓰면 안될거라고 생각한다. get()호출했다가 영영 안끝나면 어떡해
- pub-sub 예제가 아주 좋다. 값셀 - 연산결과셀
책 내용
CompletableFuture와 리액티브 프로그래밍 컨셉의 기초
소프트웨어 개발 방법을 획기적으로 뒤집는 두 가지 추세
- 하드웨어 관련
- 애플리케이션 구성 관련
- MSA가 유행하면서 서비스가 작아진 대신 네트워크 통신 증가(매시업 애플리케이션)
- 매시업 애플리케이션에서 하나의 서비스의 응답을 기다리는 동안 다른 서비스를 처리하지 않을 이유가 없다.
- Java는이런 환경에서 사용할 수 있는 두 가지 주요 도구를 제공한다.
- Future 인터페이스
- 플로 API
- 동시성을 구현하는 자바 지원의 진화
- 최초 : Runnable과 Thread를 동기화된 클래스와 메서드를 이용해 잠금
- Java 5 :
- ExecuterService 인터페이스 : Executor인터페이스를 상속받으며 Callable을 실행하는 submit 메서드를 포함
- Runnable과 Thread의 변형을 반환하는 Callable
, Future , 제네릭 지원
- Java 7 : 분할정복 알고리즘의 포크/조인 구현을 지원하는 RecursiveTask 추가
- Java 8 : 스트림과 새로 추가된 람다 지원에 기반한 병렬 프로세싱 추가
- Java 9 : 분산 비동기 프로그래밍을 명시적으로 지원
- 리액티브 프로그래밍
- 발행-구독 프로토콜로 이를 지원
- 스레드와 높은 수준의 추상화
- 숫자 1,000,000개 배열을 처리하는 예제
int[] stats = {1, 2, ... , 1_000_000}; //네 개의 스레드를 만들어 계산 long sum0 = 0; for(int i = 0 ; i < 250_000; i++) { sum0 += stats[i]; } //내부반복을 통해 병렬성을 달성(스레드 사용 패턴 추상화) Arrays.stream(stats).parallel().sum();
- 숫자 1,000,000개 배열을 처리하는 예제
- Executor와 스레드 풀
- Java 5는 프레임워크와 스레드 풀을 통해 프로그래머가 태스크 제출과 실행을 분리할 수 있는 기능을 제공했다.
- 스레드의 문제
- 자바 스레드는 직접 운영체제 스레드에 접근한다.
- 운영체제가 지원하는 스레드 수를 초과해 사용하면 예상치 못한 방식으로 크래시 될 수 있다.
- 스레드 풀 그리고 스레드 풀이 더 좋은 이유
- 자바 ExecutorService는 태스크를 제출하고 나중에 결과를 수집할 수 있는 인터페이스를 제공한다.
- 프로그램은 newFixedThreadPool같은 팩토리 메서드 중 하나를 이용해 스레드 풀을 만들어 사용할 수 있다.
@Test public void makeThreadPool() { // 워커 스레드라 불리는 nThreads를 포함하는 ExecutorService를 만들고 이들을 스레드 풀에 저장한다. // 스레드 풀에서 사용하지 않은 스레드를 가지고, 제출된 태스크를 먼저 온 순서대로 실행한다. // 이들 태스크 실행이 종료되면 사용했던 스레드를 스레드풀로 반환한다. // 장점 : 하드웨어에 맞는 수의 태스크를 유지함과 동시에 수 천개의 태스크를 스레드 풀에 아무 오버헤드 없이 제출할 수 있다. // 큐의 크기 조정, 거부 정책, 태스크 종류에 따른 우선순위 등 다양한 설정을 할 수 있다. ExecutorService executorService = Executors.newFixedThreadPool(1); log.info("executorService : {}", executorService.toString()); //executorService : java.util.concurrent.ThreadPoolExecutor@51dcb805[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] }
- 스레드 풀 그리고 스레드 풀이 나쁜 이유
- 거의 모든 관점에서 스레드를 직접 사용하는 것보다 스레드 풀을 이용하는 것이 바람직하다.
- 그러나 두 가지 사항을 주의해야 한다.
- 블록(자거나 이벤트를 기다리는)할 수 있는 태스크는 스레드풀에 제출하지 말아야 한다.
- 프로그램을 종료하기 전에 모든 스레드 풀을 종료하는 습관을 갖는 것이 중요하다.
- 별로 안나빠 보이는뎅
- 스레드의 다른 추상화 : 중첩되지 않은 메서드 호출
- 엄격한 포크/조인
- 태스크나 스레드가 메서드 호출 안에서 시작되면 그 메서드 호출은 반환하지 않고 작업이 끝나기를 기다린다.
- 스레드 생성과 join()이 한 쌍처럼 중첩된 메서드 호출 내에 추가되는 방식
- 병렬 스트림과 포크/조인 프레임워크의 동시성
- 여유로운 포크/조인(비동기 메서드)
- 메서드가 반환된 후에도 만들어진 태스크 실행이 계속되는 메서드
- 그래서 뭐?
- 엄격한 포크/조인
- 스레드에 무엇을 바라는가?
- 일반적으로 프로그램을 작은 태스크 단위로 구조화하는 것이 목표이다.
- 스레드를 조작하는 복잡한 코드를 구현하지 않고 메서드를 호출하는 방법을 살펴보자.
- 동기 API와 비동기 API
- Java 8 스트림을 이용해 명시적으로 병렬 하드웨어를 이용하는 방법을 이미 배웠다.
- 외부 반복을 내부 반복으로 바꾸고, 스트림에 parallel()메서드를 이용한다.
- 서로 상호작용하지 않는 두 작업 f(x)와 g(x)의 합을 효율적으로 구하는 방법
// f(x)와 g(x)는 각각 오랜 시간이 걸리는 작업이라고 가정 private static class Result { private int left; private int right; } private static int f(int x) { log.info("execute heavy work function f(x)"); return x * 10; } private static int g(int x) { log.info("execute heavy work function g(x)"); return x + 10; }
@Test public void changeFromThreadToThreadPool() throws InterruptedException, ExecutionException { int x = 1337; Result result = new Result(); //by Thread Thread thread1 = new Thread(() -> result.left = f(x)); Thread thread2 = new Thread(() -> result.right = g(x)); thread1.start(); thread2.start(); thread1.join(); thread2.join(); log.info("left : {}, right : {}, sum : {}", result.left, result.right, result.left + result.right); //by ThreadPool with ExecutorService //Runnable 대신 Future API를 통해 코드를 더 단순화 하긴 했지만 submit 메서드 호출 같은 불필요한 코드가 있다. ExecutorService executorService = Executors.newFixedThreadPool(2); Future<Integer> y = executorService.submit(() -> f(x)); Future<Integer> z = executorService.submit(() -> g(x)); log.info("y : {}, z : {}, sum : {}", y.get(), z.get(), y.get() + z.get()); }
- Future 형식 API
- Future 형식 API 대안을 이용하면 f, g의 시그니처와 호출방법이 다음처럼 바뀐다.
// 시그니처? 리턴만 바뀌고 시그니처는 안바꼈는데;; Future<Integer> f(int x); Future<Integer> g(int x); // 호출 Future<Integer> y = f(x); Future<Integer> z = g(x); System.out.println(y.get() + z.get());
- 메서드 f, g는 호출 즉시 자신의 원래 바디를 평가하는 태스크를 포함하는 Future를 반환한다.
- 뭔지 잘 모르겠다 16장 실용예제에서 나온다니 그 때 다시봐야겠다.
- Future 형식 API 대안을 이용하면 f, g의 시그니처와 호출방법이 다음처럼 바뀐다.
- 리액티브 형식 API
- f, g의 시그니처를 바꿔서 콜백 형식의 프로그래밍을 이용하는 것이 핵심
- 이 책에서 콜백 : 메서드가 반환된 다음에 호출될 수 있는 람다나 메서드 참조를 가리키는 용어
// 책에있던 코드 void f(int x, IntConsumer dealWithResult); // 다운로드 받은 Functions 클래스에 있던 코드 private void f(int x, IntConsumer dealWithResult) { log.info("reactive programming"); dealWithResult.accept(Functions.f(x)); } private void g(int x, IntConsumer dealWithResult) { log.info("reactive programming"); dealWithResult.accept(Functions.g(x)); }
- 이 책에서 콜백 : 메서드가 반환된 다음에 호출될 수 있는 람다나 메서드 참조를 가리키는 용어
- 메서드 f에 추가 인수로 콜백(람다)을 전달해서 f의 바디에서는 return문으로 결과를 반환하는 것이 아니라
- 결과가 준비되면 이를 람다로 호출하는 태스크를 만드는 것이 비결이다.
- f는 바디를 실행하면서 태스크를 만든 다음 즉시 반환하므로 코드 형식이 아래처럼 바뀐다.
@Test public void reactiveProgramming() { int x = 1337; Result result = new Result(); f(x, (int y) -> { result.setLeft(y); log.info("sum : {}", result.getLeft() + result.getRight()); }); g(x, (int z) -> { result.setRight(z); log.info("sum : {}", result.getLeft() + result.getRight()); }); }
// 출력 결과 reactive programming sum : 2674 reactive programming sum : 4012
- 문제점 : f와 g의 호출 합계를 정확하게 출력하지 않고 상황에 따라 먼저 계산된 결과를 출력
- 해결방법
- if-then-else를 이용해 적절한 락을 걸어 두 콜백이 모두 호출됐는지 확인한 다음 println호출
- Future를 이용하는 것이 더 적절 : 리액티브 형식의 API는 보통 한 결과가 아니라 일련의 이벤트에 반응하도록 설계되었기 때문
- 해결방법
- 리액티브 형식의 프로그래밍으로 메서드 f와 g는 dealWithResult콜백을 여러 번 호출할 수 있다.
- 리액티브 형식의 비동기 API는 자연스럽게 일련의 값(나중에 스트림으로 연결)을 처리하는데 적합
- Future 형식의 API는 일회성의 값을 처리하는 데 적합하다.
- f, g의 시그니처를 바꿔서 콜백 형식의 프로그래밍을 이용하는 것이 핵심
- Future 형식, 리액티브 형식 모두 코드를 복잡하게 만드는 것 처럼 보일 수 있다.
- 하지만 장점이 있기 때문에 잘 쓰면 좋다
- 명시적으로 스레드를 처리하는 코드에 비해 사용 코드를 더 단순하게 만들어 준다.
- 높은 수준의 구조를 유지할 수 있게 도와준다.
- 애플리케이션 효율성을 크게 향상 시킬 수 있다.
- 하지만 장점이 있기 때문에 잘 쓰면 좋다
- 잠자기(그리고 기타 블로킹 동작)는 해로운 것으로 간주.
- 스레드는 잠들어도 여전히 시스템 자원을 점유한다.
- 이상적으로는 절대 기다리는 일을 만들지 말거나 코드에서 예외를 일으키는 방법으로 처리할 수 있다.
//예제 A work1(); Thread.sleep(10000); work2();
//예제 B public class ScheduledExecutorServiceExample { public static void main(String[] args) { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); work1(); scheduledExecutorService.schedule(ScheduledExecutorServiceExample::work2, 10, TimeUnit.SECONDS); scheduledExecutorService.shutdown(); } public static void work1() { System.out.println("Hello from Work1!"); } public static void work2() { System.out.println("Hello from Work2!"); } }
- 위 두 예제 모드 스레드 풀에서 실행된다고 가정
- 예제 A의 실행 과정
- 코드는 스레드 풀 큐에 추가되며 나중에 차례가 되면 실행된다.
- 하지만 코드가 실행되면 워커 스레드를 점유한 상태에서 아무것도 하지 않고 10초를 잔다.
- 그리고 10초가 지나 깨어나서 work2()를 실행한 다음 작업을 종료하고 워커 스레드를 해제한다.
- 예제 B의 실행 과정
- work1()을 실행하고 종료한다.
- work2()가 10초 뒤에 실행될 수 있도록 큐에 추가한다.
- 10초 뒤에 work2()가 실행된다.
```
//결과
Hello from Work1!
``` - 가능하다면 I/O작업에도 이 원칙을 적용하는 것이 좋다.
- 현실성 확인
- 병렬 하드웨어를 최대한 활용할 수 있는 방법
- 시스템을 수많은 소규모 동시 실행 태스크로 설계
- 블록할 수 있는 모든 동작을 비동기 호출로 구현
- 그러나 현식적으로는 안됨
- 병렬 하드웨어를 최대한 활용할 수 있는 방법
- 비동기 API에서 예외는 어떻게 처리하는가?
- 그러게요? 궁금하네
- 비동기 API에서 호출된 메서드의 실제 바디는 별도의 스레드에서 호출된다.
- 이 때 발생하는 어떤 에러는 이미 호출자의 실행 범위와는 관계 없는 상황이 된다.
- 예상치 못한 일이 일어나면 예외를 발생시켜 다른 동작이 실행되어야 한다.
- 어떻게??
- Future를 구현한 CompletableFuture에서는 런타임 get() 메서드에 예외를 처리할 수 있는 기능을 제공하며, 예외에서 회복할 수 있도록 exceptionally()같은 메서드도 제공한다.
- 리액티브 API에 여러 콜백을 포함
void f(int x, Consumer<Integer> dealWithResult, Consumer<Throwable> dealWithException);
- 콜백이 여러 개면 이를 따로 제공하는 것보다는 한 객체로 이 메서드를 감싸는 것이 좋다.
void onCompelete() //값을 다 소진했더나 에러가 발생해서 더 이상 처리할 데이터가 없을 때 void onError(Throwable throwable) //도중에 에러가 발생했을 떄 void onNext(T item) //값이 있을 때
- Java 8 스트림을 이용해 명시적으로 병렬 하드웨어를 이용하는 방법을 이미 배웠다.
- 박스와 채널 모델
- 동시성 모델을 가장 잘 설계하고 개념화하기 위해 그림을 그리는 기법
- 병렬성을 극대화하기 위해서는 모든 함수를 Future로 감싸는게 좋다.
- 많은 태스크가 Future의 get()메서드를 호출하면 데드락에 걸릴 수 있다.
- CompletableFuture와 Combinators를 이용해 문제를 해결할 수 있다.
- CompletableFuture와 콤비네이터를 이용한 동시성
- Future인터페이스의 문제 : 동시 코딩 작업을 Future인터페이스로 생각하도록 유도하는 점
- CompletableFuture는 Future들을 조합하는 기능이다.
- 일반적으로 Future는 실행해서 get()으로 결과를 얻을 수 있는 Callable로 만들어진다.
- 그러나 CompletableFuture는 실행할 코드 없이 Future를 만들 수 있도록 허용한다.
@Test public void cfComplete() throws ExecutionException, InterruptedException { //f(x)의 실행이 끝나지 않으면 get()을 기다려야 하므로 프로세싱 자원을 낭비할 수 있다. ExecutorService executorService = Executors.newFixedThreadPool(10); int x = 1337; CompletableFuture<Integer> completableFuture = new CompletableFuture<>(); executorService.submit(() -> completableFuture.complete(f(x))); int b = g(x); log.info("result : {}", completableFuture.get() + b); //result : 14717 executorService.shutdown(); }
- CompletableFuture의 thenCombine활용
- cfF와 cfG의 결과를 알지 못한 상태에서 thenCombine은 두 연산이 끝났을 때 스레드 풀에서 실행된 연산을 만든다.
- 결과를 추가하는 세 번째 연산 cfCombine은 다른 두 작업이 끝날 때까지는 스레드에서 실행되지 않는다.
@Test public void cfCompleteWithCombine() throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(10); int x = 1337; CompletableFuture<Integer> cfF = new CompletableFuture<>(); CompletableFuture<Integer> cfG = new CompletableFuture<>(); CompletableFuture<Integer> cfCombine = cfF.thenCombine(cfG, (y, z) -> y + z); executorService.submit(() -> cfF.complete(f(x))); executorService.submit(() -> cfG.complete(g(x))); log.info("result : {}", cfCombine.get()); //result : 14717 executorService.shutdown(); }
- 발행-구독 그리고 리액티브 프로그래밍
- 리액티브 프로그래밍?
- Future와 CompletableFuture는 독립적 실행과 병렬성이라는 정식적 모델 기반
- 따라서 Future는 한 번만 실행해 결과를 제공한다.
- 반면 리액티브 프로그래밍은 시간이 흐르면서 여러 Future같은 객체를 통해 여러 결과를 제공한다.
- Java9에 추가된 발행-구독 모델을 적용한 Flow API
- 구독자가 있고 구독자가 구독할 수 있는 발행자가 있다.
- 이 연결을 구독(subscription)이라 한다.
- 이 연결을 이용해 메시지(또는 이벤트로 알려짐)를 전송한다.
- 두 Flow를 합치는 예제
- 두 값 셀의 합을 보여주는 결과 셀을 정해두고 각 값 셀이 변화할 때 마다 결과 셀이 변화하는 기능
@Test public void sumFlows() { SimpleCell c1 = new SimpleCell("C1"); SimpleCell c2 = new SimpleCell("C2"); ArithmeticCell c3 = new ArithmeticCell("C3"); c1.subscribe(c3::setLeft); c2.subscribe(c3::setRight); c1.onNext(10); c2.onNext(20); c1.onNext(15); // name : C1, value : 10 // name : C3, value : 10 // name : C2, value : 20 // name : C3, value : 30 // name : C1, value : 15 // name : C3, value : 35 }
- 두 값 셀의 합을 보여주는 결과 셀을 정해두고 각 값 셀이 변화할 때 마다 결과 셀이 변화하는 기능
- 역압력
- Publisher에서 Subcriber로 정보를 전달한다.
- Java9에서 Flow API의 Subscriber 인터페이스는 void onSubscribe(Subcription subscription) 메서드를 포함한다.
- Pub - Sub 간 채널이 연결되면 첫 번째로 호출되는 메서드
- Subscription 객체는 Sub - Pub 간 통신할 수 있는 메서드를 포함한다.
interface Subscription { void cancel(); void request(long n); }
- Publisher가 Subscription 객체를 만들어 Subscriber로 전달하면 Subscriber는 이를 이용해 Publisher로 정보를 보낼 수 있다.
- 실제 역압력의 간단한 형태
- 한 번에 한 개의 이벤트를 처리하도록 발행-구독 연결을 구성하려면 아래와 같은 작업이 필요하다.
- Subscriber가 OnSubscribe로 전달된 Subscription 객체를 subscription같은 필드에 저장
- Subscriber가 수 많은 이벤트를 받지 않도록 onSubscribe, onNext, onError의 마지막 동작에 chnnel.request(1)을 추가해 오직 한 이벤트만 요청한다.
- 요청을 보낸 채널에만 onNext, onError이벤트를 보내도록 Publisher의 notifyAllSubscribers 코드를 바꾼다
- 역압력을 구현할 때 고려해야 할 점들
- 여러 Subscriber가 있을 때 이벤트를 가장 느린 속도로 보낼 것인가 아니면 각 Subscriber에게 보내지 않은 데이터를 저장할 별도의 큐를 가질 것인가?
- 큐가 너무 커지면 어떻게 할까?
- Subscriber가 준비가 안되었다면 큐의 데이터는 폐기할 것인가?
- 데이터의 성격에 따라 다름. 온도 데이터 vs 은행 계좌 크레딧 데이터
- 한 번에 한 개의 이벤트를 처리하도록 발행-구독 연결을 구성하려면 아래와 같은 작업이 필요하다.
- 리액티브 프로그래밍?
- 리액티브 시스템 vs 리액티브 프로그래밍
- 리액티브 시스템
- 런타임 환경이 변화에 대응하도록 전체 아키텍쳐가 설계된 프로그램을 지칭
- 요악하자면 반응성, 회복성, 탄력성 세 가지 속성을 갖추도록 설계된 프로그램
- 반응성 : 큰 작업을 처리하느라 간단한 질의의 응답을 지연하는 일 없이 실시간으로 입력에 반응하는 것
- 회복성 : 한 컴포넌트의 실패가 전체 시스템의 실패로 연결되지 않음을 의미
- 탄력성 : 자신의 작업 부하에 맞게 적응하며 작업을 효율적으로 처리함을 의미
- 리액티브 프로그래밍
- Java9 Flow 관련 자바 인터페이스에서 제공하는 형식
- 리액티브 시스템을 구성하는 방법 중 하나
- 메시지 주도(message-driven) 속성을 반영
- 리액티브 시스템
'공부 > Java' 카테고리의 다른 글
모던자바인액션(CH17) - Reactive Programming (0) | 2020.09.11 |
---|---|
모던자바인액션(CH16) - CompletableFuture : 안정적 비동기 프로그래밍 (0) | 2020.08.29 |
모던자바인액션(CH13) - 디폴트 메서드 (0) | 2020.08.12 |
모던자바인액션(CH12) - 새로운 날짜 시간 API (0) | 2020.08.05 |
모던자바인액션(CH11) - Optional (0) | 2020.07.22 |
Comments