일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | ||||||
2 | 3 | 4 | 5 | 6 | 7 | 8 |
9 | 10 | 11 | 12 | 13 | 14 | 15 |
16 | 17 | 18 | 19 | 20 | 21 | 22 |
23 | 24 | 25 | 26 | 27 | 28 |
- 스프링데이터흐름
- 엔티티직접사용
- jQuery값전송
- calendar
- jQuery값전달
- fetchjoin
- Hibernate
- 자바서블릿
- namedQuery
- jscalendar
- javascriptcalendar
- joinfetch
- 페이징
- 제네릭
- JPQL
- JQuery
- jQueryUI
- 페치조인
- LIST
- values()
- Generic
- springflow
- fullcalendar
- 벌크연산
- javaservlet
- 프로젝트생성
- paging
- 대량쿼리
- JPA
- 제너릭
- Today
- Total
가자공부하러!
모던자바인액션(CH7) - 스트림(4) 본문
목차
요약 및 결론
책 내용
청크?
요약 및 결론
스트림은 쉽게 병렬처리를 제공해주는게 맞지만 원리를 모르고 막 쓰다보면 쉽게 망할 수 있다. 테스트 필수.
책 내용
세팅
병렬 스트림
- 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림
- 모든 멀티코어 프로세서가 각각의 청크를 처리하게 됨
- 내부적으로 ForkJoinPool을 사용한다.
- Runtime.getRuntime().availableProcessors()가 반환하는 값에 상응하는 스레드를 갖는다.
- 순차스트림에 parallel() 메소드를 호출하면 병렬 스트림으로 변환된다.
- https://docs.oracle.com/javase/8/docs/api/java/util/stream/BaseStream.html#parallel--
- 반대로 병렬 스트림에 sequential() 메소드를 호출하면 순차 스트림으로 변환된다.
- 마지막으로 호출된 메소드가 전체 파이프라인에 영향을 미친다.
- 부적합한 자료구조를 병렬처리 했다가는 안하느니만 못한 결과를 얻는다.
- 이전 연산의 출력이 다음 연산의 입력에 영향을 미친다면 병렬처리를 위한 청크로 분할할 수 없다.
- 멀티코어 간 데이터 이동은 꽤 비싸기 때문에 데이터 이동 시간보다 훨씬 오래 시간이 걸리는 작업만 병렬처리 하는게 바람직 하다.
- 공유된 가변 상태는 올바른 결과값을 얻을 수 없고 성능이 안좋아지므로 꼭 피해야한다.
- 병렬 스트림을 효과적으로 사용하기 위해서는?
- 언제나 병렬 스트림의 성능이 순차 스트림의 성능보다 뛰어난 것이 아니기 때문에 확신이 서지 않으면 직접 테스트해보아야 한다. 확신이 서도 테스트 하는게 좋겠다.
- 순차 스트림이 병렬 스트림보다 성능이 좋은 경우가 있다.
- 자동 박싱/언박싱은 성능을 크게 저하시킬 수 있는 요소이므로 기본형 특화 스트림을 사용할 수 있으면 사용하는게 좋다.
- 소량의 데이터에서는 병렬 스트림이 도움 되지 않는다.
- 스트림을 구성하는 자료구조가 병렬 처리에 적절한지 확인해야 한다. 요소를 청크로 분할하는 비용이 비싸면 안된다.
- 최종 연산의 병합 과정의 비용을 살펴야 한다.
- 스트림 소스의 병렬화 친밀도
|소스|분해성|
|ArrayList|훌륭|
|LinkedList|나쁨|
|IntStream.range|훌륭|
|Stream.iterate|나쁨|
|HashSet|좋음|
|TreeSet|좋음|
- 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림
포크/조인 프레임워크
병렬 스트림의 내부 동작 원리를 이해한다 라는 생각으로 읽습니다.
병렬화할 수 있는 작업을 재귀적으로 작은 작업으로 분할한 다음 각각의 서브태스크 결과를 합쳐서 전체 결과를 만들도록 설계되어 있음
서브태스크를 스레드풀(ForkJoinPool)의 작업자 스레드에 분산 할당하는 ExecutorService 인터페이스를 구현한다.
RecursiveTask 활용
resursive : 반복되는
스레드 풀을 이용하려면 RecursiveTask의 서브클래스를 만들어야 한다.
- R은 [병렬화된 태스크가 생성하는 결과 타입] 또는 [RecursiveAction 타입(결과가 없을 때)]
RecursiveTask를 정의하려면 추상 메소드 compute를 구현해야 한다.
compute 메소드 : 태스크를 서브태스크로 분할하는 로직과 더이상 분할할 수 없을 때 개별 서브태스크의 결과를 생산할 알고리즘 정의
// 대부분의 compute 메소드 구현 형식 if (태스크가 충분히 작거나 더 이상 분할할 수 없다) { 순차적으로 태스크 계산 } else { 태스크를 두 서브태스크로 분할 태스크가 다시 서브태스크로 분할되도록 이 메소드를 재귀적으로 호출 모든 서브태스크의 연산이 완료될 때 까지 기다림 각 서브태스크의 결과를 합침 }
태스크 분할여부 결정 기준은 분할 후 정복(divide-and-conquer) 알고리즘의 병렬화 버전이 좋다
예제 코드
//RecursiveTask 정의와 추상메소드 compute() 구현 public class ForkJoinSumCalculator extends java.util.concurrent.RecursiveTask<Long>{ private final long[] numbers; private final int start; private final int end; public static final long THRESHHOLD = 10000L; //이 값 이하의 서브태스크는 더이상 분해 불가 //메인태스크를 생성할 때 사용할 공개 생성자 public ForkJoinSumCalculator(long[] numbers) { this(numbers, 0, numbers.length); } //메인태스크의 서브태스크를 재귀적으로 만들 때 사용할 비공개 생성자 private ForkJoinSumCalculator(long[] numbers, int start, int end) { this.numbers = numbers; this.start = start; this.end = end; } @Override protected Long compute() { //태스크에서 더할 배열의 길이 int length = end - start; //기준값과 같거나 작으면 순차적으로 결과를 계산 if (length <= THRESHHOLD) { return computeSequentially(); } //배열의 첫 번째 절반을(왼쪽) 더하는 서브태스크 생성 ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length/2); //ForkJoinPool의 다른 스레드를 통해 새로 생성한 태스크를 비동기 실행 leftTask.fork(); //배열의 나머지 반을 다루는 서브태스크 ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start, start + length/2); //두 번째 서브태스크 동기 실행. 추가로 분할이 발생할 수 있음 Long rightResult = rightTask.compute(); //첫 번째 서브태스크의 결과를 읽거나 아직 결과가 없으면 대기 Long leftResult = leftTask.join(); //두 서브태스크의 결과를 조합한 값을 결과로 리턴 return rightResult + leftResult; } //더 이상 분할할 수 없을 때 서브태스크의 결과를 계산 private long computeSequentially() { long sum = 0; for (int i = start; i < end; i++) { sum += numbers[i]; } return sum; } }
@Test public void forkJoinSumTest() { //ForkJoinSumCalculator에서 정의한 태스크를 수행하고 결과를 출력하는 테스트 int n = 1000; long[] numbers = LongStream.rangeClosed(1, n).toArray(); ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers); Long invoke = (Long) (new ForkJoinPool().invoke(task)); log.debug("invoke : {}", invoke); }
포크/조인 프레임워크를 제대로 활용하는 방법
- join 메소드를 태스크에 호출하면 태스크가 생산하는 결과가 준비될 때 까지 호출자를 블록시킨다.
- 그러므로 두 서브태스크가 모두 시작된 다음에 join을 호출해야 한다.
- 그렇지 않으면 각각의 서브태스크가 다른 태스크의 종료를 기다리게 됨
- RecursiveTask 내에서는 ForkJoinPool의 invoke() 메소드는 사용하면 안됨.
- compute()나 fork()는 가능
- invoke()는 순차코드에서 병렬계산을 시작할 때에만
- 서브태스크에 fork() 메소드를 호출해서 ForkJoinPool의 일정을 조절할 수 있다.
- 한쪽작업은 fork(), 다른쪽은 comput()를 호출하면 두 서브태스크의 한 태스크에는 같은 스레드를 재사용할 수 있으므로 풀에서 불필요한 태스크를 할당하는 오버헤드를 피할 수 있다
- 포크/조인 프레임워크를 이용하는 병렬 계산은 디버깅이 어렵다.
- fork라 불리는 다른 스레드에서 compute를 호출하므로 스택 트레이스가 도움이 되지 않는다.
- 작업 훔치기
- ForkJoinPool의 모든 스레드를 거의 공정하게 분할하는 기법
- 흐름
- 각각의 스레드는 자신에게 할당된 태스크를 포함하는 이중 연결 리스트를 참조하면서 작업이 끝날때 마다 큐의 헤드에서 다른 태스크를 가져와서 작업을 처리한다.
- 한 스레드가 다른 스레드 보다 자신에게 할당된 태스크를 더 빨리 처리한 상황
- 할일이 없어진 스레드는 다른 스레드 큐의 꼬리에서 작업을 훔쳐온다.
- 모든 태스크가 작업을 끝낼 때(모든 큐가 빌 때)까지 이 과정을 반복
- join 메소드를 태스크에 호출하면 태스크가 생산하는 결과가 준비될 때 까지 호출자를 블록시킨다.
Spliterator 인터페이스
Spliterator : 분할할 수 있는 반복자
Iterator와 비교
|Iterator|Spliterator|
|boolean next()|boolean tryAdvance(Consumer<? super T> action)|
|-|Spliteratort trySplit()|- trySplit() : Spliterator의 일부 요소를 분할해서 두 번째 Spliterator를 생성하는 메소드
분할 과정
- 스트림을 여러 스트림으로 분할하는 과정은 재귀적으로 일어난다.
Spliterator 특성
//get 특성 list.spliterator().characteristics(); public static final int ORDERED = 0x00000010; public static final int DISTINCT = 0x00000001; public static final int SORTED = 0x00000004; public static final int SIZED = 0x00000040; public static final int NONNULL = 0x00000100; public static final int IMMUTABLE = 0x00000400; public static final int CONCURRENT = 0x00001000; public static final int SUBSIZED = 0x00004000;
커스텀 Spliterator 구현
//반복형으로 단어 수를 세는 메소드 //얘가 Spliterator랑 무슨상관? private int countWordsIteratively(String s) { int counter = 0; boolean lastSpace = true; for (char c : s.toCharArray()) { if (Character.isWhitespace(c)) { lastSpace = true; } else { if (lastSpace) { counter++; } lastSpace = false; } } return counter; }
//문자열 스트림을 탐색하면서 단어 수를 세는 클래스 public class WordCounter { private final int counter; private final boolean lastSpace; public WordCounter(int counter, boolean lastSpace) { this.counter = counter; this.lastSpace = lastSpace; } public WordCounter accumulate(Character c) { if (Character.isWhitespace(c)) { return lastSpace ? this : new WordCounter(counter, true); } else { return lastSpace ? new WordCounter(counter + 1, false) : this; } } public WordCounter combine(WordCounter wordCounter) { return new WordCounter(counter + wordCounter.counter, wordCounter.lastSpace); } public int getCounter() { return counter; } } //문자 스트림의 리듀싱 연산 구현 private int countWords(Stream<Character> stream) { WordCounter wordCounter = stream.reduce(new WordCounter(0, true), WordCounter::accumulate, WordCounter::combine); return wordCounter.getCounter(); }
//Spliterator를 구현한 다음 병렬 스트림으로 전달 class WordCounterSpliterator implements Spliterator<Character> { private final String string; private int currentChar = 0; public WordCounterSpliterator(String string) { this.string = string; } @Override public boolean tryAdvance(Consumer<? super Character> action) { action.accept(string.charAt(currentChar++)); //현재 문자 소비 return currentChar < string.length(); //소비할 문자가 남았으면 true } @Override public Spliterator<Character> trySplit() { int currentSize = string.length(); if (currentSize < 10) { return null; } //파싱할 문자열의 중간을 분할위치로 설정 for (int splitPos = currentSize/2 + currentChar; splitPos < string.length(); splitPos++) { if (Character.isWhitespace(string.charAt(splitPos))) { //처음부터 분할 위치까지 문자열을 파싱할 새로운 Spliterator 생성 Spliterator<Character> spliterator = new WordCounterSpliterator(string.substring(currentChar, splitPos)); currentChar = splitPos; //생성한 spliterator의 시작위치를 분할 위치로 설정 return spliterator; //공백을 찾았고 문자열을 분리했으므로 루프 종료 } } return null; } @Override public long estimateSize() { return string.length() - currentChar; } @Override public int characteristics() { return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE; } }
//WordCounterSpliterator 활용 @Test public void wordCounterSpliteratorTest() { Spliterator<Character> spliterator = new WordCounterSpliterator("Hello World"); Stream<Character> stream = StreamSupport.stream(spliterator, true);//true는 병렬 스트림 생성 여부를 의미 //병렬스트림을 countWords()메소드로 전달 log.debug(String.format("Found %d words", countWords(stream))); //결과 : Found 19 words }
마치며
- 내부 반복을 이용하면 명시적으로 다른 스레드를 사용하지 않고도 스트림을 병렬로 처리할 수 있다.
- 하지만 항상 병렬처리가 좋은 것은 아니기 때문에 테스트 필수
청크?
처음듣는 단어가 나와서 궁금하다.
청크가 뭔가요?
- 작업 덩어리라고 알고 있으면 될 듯
영어사전 : 몇 개의 단어가 자연스럽게 연결되어 구체적인 뜻을 나타내는 표현과 의미 덩어리
**명사** 1. 명사 (두툼한) 덩어리 2. 명사 비격식 상당히 많은 양 3. 명사 언어 말모듬, 청크(언어 학습자가 한꺼번에 하나의 단위처럼 배울 수 있는 어구. (예를 들어 Can I have the bill, please?나 Pleased to meet you. 같은 표현들) **동사** 1. 타동사 [미·구어] <물건을> 내던지다; <불에> 장작을 지피다 ((up)) 2. 자동사 <기계 등이> 덜커덩[탕, 딱, 쾅] 소리를 내다
SpringBatch에서의 Chunk 클래스
- add, skip등의 메소드가 있는것으로 보아 뭔가 작업단위인듯 한 느낌이 있음
- https://docs.spring.io/spring-batch/docs/4.2.1.RELEASE/api/org/springframework/batch/core/step/item/Chunk.html
x5software의 Templatating System
- Java와 HTML을 자연스럽게 섞어주기 위한 목적의 템플릿
- http://www.x5software.com/chunk/wiki/index.php?title=Chunk_Documentation
'공부 > Java' 카테고리의 다른 글
모던자바인액션(CH9) - 스트림과 람다 활용(2) (0) | 2020.06.13 |
---|---|
모던자바인액션(CH8) - 스트림과 람다 활용(1) (0) | 2020.06.07 |
모던자바인액션(CH6) - 스트림(3) (0) | 2020.05.18 |
모던자바인액션(CH5) - 스트림(2) (0) | 2020.05.12 |
모던자바인액션(CH4) - 스트림(1) (0) | 2020.05.05 |