Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | ||
6 | 7 | 8 | 9 | 10 | 11 | 12 |
13 | 14 | 15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 | 25 | 26 |
27 | 28 | 29 | 30 |
Tags
- calendar
- LIST
- springflow
- 제너릭
- 제네릭
- JPA
- 프로젝트생성
- jQuery값전송
- jscalendar
- jQueryUI
- namedQuery
- JPQL
- jQuery값전달
- 자바서블릿
- fullcalendar
- 페이징
- fetchjoin
- JQuery
- paging
- 스프링데이터흐름
- 페치조인
- Hibernate
- javaservlet
- Generic
- values()
- 벌크연산
- javascriptcalendar
- 대량쿼리
- 엔티티직접사용
- joinfetch
Archives
- Today
- Total
가자공부하러!
모던자바인액션(CH17) - Reactive Programming 본문
목차
요약 및 결론
책 내용
요약 및 결론
- Flow API는 Akka, RxJava등의 리액티브 라이브러리를 이용해 개발된 리액티브 애플리케이션 간 공용어를 제시하는 역할을 한다.
- Flow API를 만들 당시 이미 Akka, RxJava 등의 라이브러리가 이미 존재했기 때문에 Flow를 구현한 클래스를 제공하지 않는다.
- 얼마나 효과가 좋은 프로그래밍 방법인지 알 수 없다.
- 리액티브 프로그래밍이 실제로 사용된 곳 : MS의 엑셀, WebFlux
책 내용
리액티브 프로그래밍 패러다임의 중요성이 증가하는 이유
- 빅테이터 : 페타 바이트 단위의 큰 데이터를 처리할 필요성이 생김
- 다양한 환경 : 모바일 디바이스 부터 수천 개의 프로세서로 실행되는 클라우드 기반 클러스터 까지 다양한 환경에 애플리케이션이 배포된다
- 사용 패턴 : 사용자는 1년 내내 항상 지속되는 서비스와 밀리초 단위의 응답 시간을 기대한다.
리액티브 프로그래밍에서는
다양한 시스템과 소스에서 들어오는 데이터 항목 스트림을
비동기적으로 처리하고 합쳐서 문제를 해결한다.
1. 리액티브 매니패스토(https://www.reactivemanifesto.org)
리액티브 애플리케이션과 시스템 개발의 핵심 원칙
1. 반응성(responsive)
- 빠를 뿐 만 아니라 일정하고 예상할 수 있는 반응 시간을 제공
- 일관된 서비스 품질을 제공하는 데 중점
- 회복성 때문에 지속되는 서비스를 제공할 수 있고, 탄력성 때문에 반응시간을 일정하게 유지할 수 있다.
2. 회복성(resilient)
- 장애가 발생해도 시스템은 반응하고 장애를 복구한다.
- 컴포넌트 실행 복제, 여러 컴포넌트의 시간과 공간 분리, 각 컴포넌트에 위임하는 등 다양한 기법 존재
3. 탄력성(elastic)
- 무거운 작업 부하가 발생하면 자동으로 관련 컴포넌트에 할당된 리소스를 늘려서 대응한다.
4. 메시지 주도(message-driven)
- 회복성과 탄력성을 지원하려면 컴포넌트 간 경계를 명확하게 정의해야 한다.
- 비동기 메시지를 전달해 컴포넌트 끼리의 통신이 이루어지도록 한다.
- 장애를 메시지로 처리해서 회복성을 얻을 수 있다.
- 주고 받은 메시지 수를 감시하고 메시지 양에 따라 적절하게 리소스를 할당할 수 있다.
- 애플리케이션 수준의 리액티브
- 주요 기능은 비동기로 작업을 수행할 수 있다는 점
- CPU 사용률을 극대화 하기 위해 스레드를 퓨처, 액터, 이벤트 루프 등과 공유하고 처리할 이벤트를 변환하고 관리한다.
- 시스템 수준의 리액티브
- 리액티브 시스템은 여러 애플리케이션이 한 개의 일관적이고 회복할 수 있는 플랫폼을 구성하게 해준다. 뿐 만 아니라 이들 애플리케이션 중 하나가 실패해도 전체 시스템은 계속 운영될 수 있도록 도와주는 소프트웨어 아키텍쳐다.
리액티브 원칙 적용은 각 애플리케이션이 될 수도 시스템이 될 수도 있다는 이야기 인가 보다.
- 리액티브 시스템의 주요 속성으로 메시지 주도를 꼽을 수 있다.
- 각 컴포넌트에서 발생한 장애를 고립시켜서 회복성을 제공한다. 회복성의 핵심은 고립과 비결합이다.
- 탄력성의 핵심은 위치 투명성이다
리액티브 시스템의 모든 컴포넌트가 수신자의 위치에 상관없이 다른 모든 서비스와 통신할 수 있음을 의미
덕분에 시스템을 복제하며 작업 부하에 따라 애플리케이션을 확장할 수 있다.
- 리액티브 시스템은 여러 애플리케이션이 한 개의 일관적이고 회복할 수 있는 플랫폼을 구성하게 해준다. 뿐 만 아니라 이들 애플리케이션 중 하나가 실패해도 전체 시스템은 계속 운영될 수 있도록 도와주는 소프트웨어 아키텍쳐다.
2. 리액티브 스트림과 플로 API
리액티브 프로그래밍은 리액티브 스트림을 사용하는 프로그래밍이다.
1. 리액티브 스트림은 잠재적으로 무한의 비동기 데이터를 순서대로, 그리고 블록하지 않는 역압력을 전제해 처리하는 표준 기술이다.
- 역압력? : 발행-구독 프로토콜에서 발행자가 이벤트를 제공하는 속도보다 구독자의 이벤트 소비 속도가 느릴 때 문제가 발생하지 않도록 보장하는 장치.
- 역압력 예시 : 알림의 대상은 업스트림 발행자
1. 부하가 발생한 컴포넌트는 이벤트 발생 속도를 늦추라고 알림
2. 얼마나 많은 이벤트를 수신할 수 있는지 알림
3. 다른 데이터를 받기 전에 기존 처리 속도가 얼마나 걸리는지 알림
2. 리액티브 스트림 구현이 제공해야 하는 최소 기능 집합을 정의한 API
@FunctionalInterface
public static interface Publisher<T> {
public void subscribe(Subscriber<?superT> subscriber);
}
public static interface Subscriber<T> {
public void onSubscribe(Subscription subscription);
public void onNext(Titem);
public void onError(Throwable throwable);
public void onComplete();
}
public static interface Subscription {
public void request(long n);
public void cancel();
}
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> { }
- Flow 클래스 소개
- Java9에 추가된 리액티브 프로그래밍을 제공하는 클래스
- 네 개의 중첩된(nested) 인터페이스 포함(이너클래스 처럼)
- Publisher : 항목을 발행
- Subscriber : 항목을 소비, 콜백 메서드 네 개를 정의
// onSubscribe는 항상 처음 호출되어야 한다. // onNext는 onSubscribe 다음에 호출되어야 하며 여러 번 호출될 수 있다. // onNext 다음에 onError 또는 onComplete가 호출되어야 한다. onSubscribe onNext* (onError | onComplete)
- Subscription : 발행과 소비 과정을 관리(역압력 포함)
- Processor : 이벤트의 단계를 나타내고 Subscriber이며 동시에 Publisher이다.
- 인터페이스 간 규칙 요약
1. Publisher는 반드시 Subscription의 request 메서드에 정의된 개수 이하의 요소만 Subscriber에 전달해야 한다. - 지정된 개수보다 적은 수의 요소를 onNext로 전달할 수 있다. - onComlete, onError를 호출해 Subscription을 종료할 수 있다. 2. Subscriber는 요소를 받아 처리할 수 있음을 Publisher에게 알려야 한다. - Publisher에게 역압력을 행사하고 관리한계를 넘지 않도록 제어할 수 있다. - onComplete, onError신호를 처리하는 상황에서는 Subscription이 취소됐다고 가정한다. > Publisher나 Subscription의 어떤 메서드도 호출할 수 없다. - Subscription의 시그널을 받을 준비가 되어있어야 한다. > Subscribtion.request() 메서드 호출이 없어도 언제든 종료 시그널을 받을 준비가 돼있어야 한다. > Subscription.cancel()이 호출된 이후라도 한 개 이상의 onNext를 받을 준비가 돼있어야 한다. 3. Publisher와 Subscriber는 정확하게 Subscription을 공유해야 하며 각각이 고유한 역할을 수행해야 한다. - Subscriber는 onSubscribe와 onNext메서드에서 request메서드를 동기적으로 호출할 수 있어야 한다. - Subscription.cancel() 메서드는 몇 번을 호출해도 한 번 호출한 것과 같은 효과를 가져야 한다.
- 첫 번째 리액티브 애플리케이션 만들기
- 리액티브 원칙이 적용된 온도 보고 프로그램
- TempInfo : 원격 온도계를 흉내낸다(0에서 99 사이의 화씨 온도를 랜덤으로 만들어 연속적으로 보고)
- TempSubscription : 온도를 전송한다.
- TempSubscriber : 각 도시에 설치된 센서에서 보고한 온도 스트림을 출력
- 예제 코드
- Publisher가 Subscription을 만들어서 Subscriber에게 전달
// 현재 보고된 온도를 전달하는 자바 빈 @Getter @AllArgsConstructor @ToString public class TempInfo { public static final Random random = new Random(); private final String town; private final int temp; public static TempInfo fetch(String town) { if (random.nextInt(10) == 0) { throw new RuntimeException("Temperature Fetch Error!"); //10% 확률로 작업 실패 } return new TempInfo(town, random.nextInt(100)); } }
// Subscriber에게 TempInfo 스트림을 전송하는 Subscription @AllArgsConstructor public class TempSubscription implements Subscription { // request메서드가 TempSubscription자신에게 또 다른 요소를 보내는 재귀호출을 방지하기 위해 스레드 컨트롤 // 예제에서는 Subscriber의 onError가 예외를 발생시키기 때문에 없어도 됨 private static final ExecutorService executor = Executors.newSingleThreadExecutor(); private final Subscriber<? super TempInfo> subscriber; private final String town; @Override public void request(long n) { executor.submit(() -> { // 다른 스레드에서 다음 요소를 구독자에게 보냄 for (long i = 0L; i < n; i++) { //Subscriber가 만든 요청을 한 개씩 반복 try { subscriber.onNext(TempInfo.fetch(town)); } catch (Exception e) { subscriber.onError(e); //온도 가져오기 실패하면 Subscriber로 에러 전달 break; } } }); } @Override public void cancel() { subscriber.onComplete(); //구독 취소되면 완료신호를 Subscriber로 전달 } }
// 레포트를 요청하고 받은 온도를 출력하는 Subscriber @Slf4j public class TempSubscriber implements Subscriber<TempInfo> { private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { // 구독을 저장하고 첫 번째 요청을 전달 this.subscription = subscription; subscription.request(1L); } @Override public void onNext(TempInfo tempInfo) { // 수신한 온도를 출력하고 다음 정보를 요청 log.info("temperature info : {}", tempInfo); subscription.request(1L); } @Override public void onError(Throwable throwable) { log.error("temperature fetch error : {}", throwable.getMessage()); } @Override public void onComplete() { log.info("done!"); } }
public class Ch17ReactiveProgramming { @Test void subscribe() { // TempSubscriber를 이용해 Publisher에 구독하도록 구현 getTemperatures("New York").subscribe(new TempSubscriber()); // temperature info : TempInfo(town=New York, temp=81) // temperature info : TempInfo(town=New York, temp=24) // temperature info : TempInfo(town=New York, temp=19) // temperature fetch error : Temperature Fetch Error! // // getTemperatures의 람다식을 익명클래스로 풀어쓰면 아래처럼 // String town = "London"; // Publisher pub = new Publisher() { // public void subscribe(Subscriber subscriber) { // subscriber.onSubscribe(new TempSubscription(subscriber, town)); // } // }; // pub.subscribe(new TempSubscriber()); } // 리액티브 애플리케이션이 동작할 수 있도록 만든 Publisher private static Publisher<TempInfo> getTemperatures(String town) { return subscriber -> subscriber.onSubscribe( new TempSubscription(subscriber, town) ); } }
- Publisher가 Subscription을 만들어서 Subscriber에게 전달
- Processor로 데이터 변환하기
- Publisher를 구독한 다음 수신한 데이터를 가공해 다시 제공하는 것이 목적
- 화씨로 제공된 데이터를 섭씨로 변환해 다시 방출하는 예제
public class TempProcessor implements Processor<TempInfo, TempInfo> { private Subscriber<? super TempInfo> subscriber; @Override public void subscribe(Subscriber<? super TempInfo> subscriber) { this.subscriber = subscriber; } @Override public void onNext(TempInfo tempInfo) { // 온도를 섭씨로 변환한 값으로 TempInfo를 만들어 다시 전송 subscriber.onNext(new TempInfo(tempInfo.getTown(), (tempInfo.getTemp() - 32) * 5 / 9)); } // 다른 모든 신호는 업스트림 구독자에게 전달 @Override public void onSubscribe(Subscription subscription) { subscriber.onSubscribe(subscription); } @Override public void onError(Throwable throwable) { subscriber.onError(throwable); } @Override public void onComplete() { subscriber.onComplete(); } }
public class Ch17ReactiveProgramming { @Test void subscribeWithProcessor() { getCelsiusTemperature("Hanoi").subscribe(new TempSubscriber()); // temperature info : TempInfo(town=Hanoi, temp=26) // temperature info : TempInfo(town=Hanoi, temp=-7) // temperature info : TempInfo(town=Hanoi, temp=13) // temperature fetch error : Temperature Fetch Error! } private static Publisher<TempInfo> getCelsiusTemperature(String town) { return subscriber -> { // Processor를 만들어서 Subscribe와 Publisher 사이에 연결 TempProcessor processor = new TempProcessor(); processor.subscribe(subscriber); processor.onSubscribe(new TempSubscription(processor, town)); }; } }
- 자바는 왜 플로 API 구현을 제공하지 않는가?
- Flow API를 만들 당시 이미 Akka, RxJava 등의 라이브러리가 이미 존재했기 때문에 Flow를 구현한 클래스를 제공하지 않는다.
- Java 9 표준화 과정 덕분에 다양한 라이브러리가 쉽게 협력할 수 있게 됐다.
3. 리액티브 라이브러리 RxJava 사용하기
- 리액티브 원칙이 적용된 온도 보고 프로그램
- io.reactivex
- RxJava는 일부 이벤트에는 역압력을 적용하지 말 것을 권장한다.
- 천 개 이하의 요소를 가진 스트림, 마우스의 움직임, 터치 이벤트 등 역압력을 적용하기 힘든 GUI 이벤트
- 자주 발생하지 않는 종류의 이벤트
- 구성
- Publisher와 동일한 ObservableSource를 구현한 Observable, Flowable
- Subscriber와 동일한 Observer, 유사한 Emitter
- Subscription은 Disposable로 대체
- RxJava는 일부 이벤트에는 역압력을 적용하지 말 것을 권장한다.
Observable 만들고 사용하기
- Observer 인터페이스
- Java9 Subscriber보다 유연하다.
- onNext메서드만 사용해서 Observable을 구독할 수 있다.
- 예제 : 사용자에게 팩토리 메서드를 제공해 매 초마다 온도를 방출하는 Observable 반환
@Slf4j public class TempObserver implements Observer<TempInfo> { @Override public void onSubscribe(@NonNull Disposable disposable) { } @Override public void onNext(@NonNull TempInfo tempInfo) { log.info("tempinfo : {}", tempInfo); } @Override public void onError(@NonNull Throwable throwable) { log.error("got problem : {}", throwable.getMessage()); } @Override public void onComplete() { log.info("done!"); } }
@Slf4j public class Ch17ReactiveProgramming { @Test void observableCustom() { getTemperature("HongKong").blockingSubscribe(new TempObserver()); } private static Observable<TempInfo> getTemperature(String town) { return Observable.create(emitter -> // Observer를 소비하는 함수로부터 Observable 만들기 Observable.interval(1, TimeUnit.SECONDS) // 매 초마다 무한으로 증가하는 long값을 방출하는 Observable .subscribe(i -> { if(!emitter.isDisposed()) { // 소비된 Observer가 아직 폐기되지 않았으면 작업 수행 if (i >= 5) { // 5번 온도보고했으면 emitter.onComplete(); // 옵저버를 완료하고 스트림 종료 } else { try { emitter.onNext(TempInfo.fetch(town)); // 아니면 온도를 보고 } catch (Exception e) { emitter.onError(e); // 에러 발생하면 Observer에 알림 } } } }) ); } }
- Observer 인터페이스
Observable을 변환하고 합치기.
Java9 Flow API에 비해 RxJava 등의 라이브러리는 스트림을 조작하기 쉬운 것이 장점이다.
- 스트림을 합치고, 만들고, 거르는 등
마블 다이어그램 : 리액티브 스트림을 시각적으로 쉽게 표현한 다이어그램
RxJava map, filter를 이용한 예제
Java Stream에서 제공하는 기능들이 있어서 사용하기 편리하겠다.
@Slf4j public class Ch17ReactiveProgramming { ...중복 생략... @Test void observableMap() { getCelsiusTemperatureInRxJava("Berlin").blockingSubscribe(new TempObserver()); // tempinfo : TempInfo(town=Berlin, temp=4) // tempinfo : TempInfo(town=Berlin, temp=16) // tempinfo : TempInfo(town=Berlin, temp=22) // got problem : Temperature Fetch Error! getCelsiusTemperaturesInRxJava("Seoul", "Busan", "Incheon").blockingSubscribe(new TempObserver()); // tempinfo : TempInfo(town=Incheon, temp=1) // tempinfo : TempInfo(town=Seoul, temp=-3) // tempinfo : TempInfo(town=Busan, temp=19) // tempinfo : TempInfo(town=Busan, temp=-5) // tempinfo : TempInfo(town=Seoul, temp=14) // got problem : Temperature Fetch Error! } private Observable<TempInfo> getCelsiusTemperatureInRxJava(String town) { return getTemperature(town) .filter(tempInfo -> tempInfo.getTemp() > 0) .map(temp -> new TempInfo(temp.getTown(), (temp.getTemp() - 32) * 5 / 9)); } private Observable<TempInfo> getCelsiusTemperaturesInRxJava(String... towns) { return Observable.merge(Arrays.stream(towns) .map(TempObservable::getCelsiusTemperature) .collect(toList())); } }
4. 마치며
- 데이터 처리량이 늘고 사용자 기대치가 달라지면서 인기를 얻고 있다.
- 반응성, 회복성, 탄력성, 메시지 주도
- 리액티브 스트림은 비동기적으로 처리되므로 역압력 기법이 기본적으로 탑재되어 있다.
- 역압력은 발행자가 구독자보다 빠른 속도로 아이템을 발행함으로써 발생하는 문제를 방지한다.
'공부 > Java' 카테고리의 다른 글
모던자바인액션(CH19) - Functional Programming (adv) (0) | 2020.10.08 |
---|---|
모던자바인액션(CH18) - Functional Programming (0) | 2020.09.20 |
모던자바인액션(CH16) - CompletableFuture : 안정적 비동기 프로그래밍 (0) | 2020.08.29 |
모던자바인액션(CH15) - CompletableFuture와 리액티브 프로그래밍 컨셉의 기초 (0) | 2020.08.22 |
모던자바인액션(CH13) - 디폴트 메서드 (0) | 2020.08.12 |
Comments