가자공부하러!

모던자바인액션(CH7) - 스트림(4) 본문

공부/Java

모던자바인액션(CH7) - 스트림(4)

오피스엑소더스 2020. 6. 3. 20:32

목차

요약 및 결론
책 내용
청크?


요약 및 결론

스트림은 쉽게 병렬처리를 제공해주는게 맞지만 원리를 모르고 막 쓰다보면 쉽게 망할 수 있다. 테스트 필수.


책 내용

  1. 세팅

  2. 병렬 스트림

    • 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림
      • 모든 멀티코어 프로세서가 각각의 청크를 처리하게 됨
      • 내부적으로 ForkJoinPool을 사용한다.
      • Runtime.getRuntime().availableProcessors()가 반환하는 값에 상응하는 스레드를 갖는다.
    • 순차스트림에 parallel() 메소드를 호출하면 병렬 스트림으로 변환된다.
    • 부적합한 자료구조를 병렬처리 했다가는 안하느니만 못한 결과를 얻는다.
      • 이전 연산의 출력이 다음 연산의 입력에 영향을 미친다면 병렬처리를 위한 청크로 분할할 수 없다.
      • 멀티코어 간 데이터 이동은 꽤 비싸기 때문에 데이터 이동 시간보다 훨씬 오래 시간이 걸리는 작업만 병렬처리 하는게 바람직 하다.
      • 공유된 가변 상태는 올바른 결과값을 얻을 수 없고 성능이 안좋아지므로 꼭 피해야한다.
    • 병렬 스트림을 효과적으로 사용하기 위해서는?
      • 언제나 병렬 스트림의 성능이 순차 스트림의 성능보다 뛰어난 것이 아니기 때문에 확신이 서지 않으면 직접 테스트해보아야 한다. 확신이 서도 테스트 하는게 좋겠다.
      • 순차 스트림이 병렬 스트림보다 성능이 좋은 경우가 있다.
      • 자동 박싱/언박싱은 성능을 크게 저하시킬 수 있는 요소이므로 기본형 특화 스트림을 사용할 수 있으면 사용하는게 좋다.
      • 소량의 데이터에서는 병렬 스트림이 도움 되지 않는다.
      • 스트림을 구성하는 자료구조가 병렬 처리에 적절한지 확인해야 한다. 요소를 청크로 분할하는 비용이 비싸면 안된다.
      • 최종 연산의 병합 과정의 비용을 살펴야 한다.
      • 스트림 소스의 병렬화 친밀도
        |소스|분해성|
        |ArrayList|훌륭|
        |LinkedList|나쁨|
        |IntStream.range|훌륭|
        |Stream.iterate|나쁨|
        |HashSet|좋음|
        |TreeSet|좋음|
  3. 포크/조인 프레임워크

    • 병렬 스트림의 내부 동작 원리를 이해한다 라는 생각으로 읽습니다.

    • 병렬화할 수 있는 작업을 재귀적으로 작은 작업으로 분할한 다음 각각의 서브태스크 결과를 합쳐서 전체 결과를 만들도록 설계되어 있음

    • 서브태스크를 스레드풀(ForkJoinPool)의 작업자 스레드에 분산 할당하는 ExecutorService 인터페이스를 구현한다.

    • RecursiveTask 활용

      • resursive : 반복되는

      • 스레드 풀을 이용하려면 RecursiveTask의 서브클래스를 만들어야 한다.

        • R은 [병렬화된 태스크가 생성하는 결과 타입] 또는 [RecursiveAction 타입(결과가 없을 때)]
      • RecursiveTask를 정의하려면 추상 메소드 compute를 구현해야 한다.

        • compute 메소드 : 태스크를 서브태스크로 분할하는 로직과 더이상 분할할 수 없을 때 개별 서브태스크의 결과를 생산할 알고리즘 정의

        • // 대부분의 compute 메소드 구현 형식
          if (태스크가 충분히 작거나 더 이상 분할할 수 없다) {
            순차적으로 태스크 계산
          } else {
            태스크를 두 서브태스크로 분할
            태스크가 다시 서브태스크로 분할되도록 이 메소드를 재귀적으로 호출
            모든 서브태스크의 연산이 완료될 때 까지 기다림
            각 서브태스크의 결과를 합침
          }
          
        • 태스크 분할여부 결정 기준은 분할 후 정복(divide-and-conquer) 알고리즘의 병렬화 버전이 좋다

        • 예제 코드

          //RecursiveTask 정의와 추상메소드 compute() 구현
          public class ForkJoinSumCalculator extends java.util.concurrent.RecursiveTask<Long>{
          private final long[] numbers;
          private final int start;
          private final int end;
          public static final long THRESHHOLD = 10000L; //이 값 이하의 서브태스크는 더이상 분해 불가
          //메인태스크를 생성할 때 사용할 공개 생성자
          public ForkJoinSumCalculator(long[] numbers) {
              this(numbers, 0, numbers.length);
          }
          //메인태스크의 서브태스크를 재귀적으로 만들 때 사용할 비공개 생성자
          private ForkJoinSumCalculator(long[] numbers, int start, int end) {
            this.numbers = numbers;
            this.start = start;
            this.end = end;
          }
          @Override
          protected Long compute() {
            //태스크에서 더할 배열의 길이
            int length = end - start;
            //기준값과 같거나 작으면 순차적으로 결과를 계산
            if (length <= THRESHHOLD) {
              return computeSequentially();
            }
            //배열의 첫 번째 절반을(왼쪽) 더하는 서브태스크 생성
            ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length/2);
            //ForkJoinPool의 다른 스레드를 통해 새로 생성한 태스크를 비동기 실행
            leftTask.fork();
            //배열의 나머지 반을 다루는 서브태스크
            ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start, start + length/2);
            //두 번째 서브태스크 동기 실행. 추가로 분할이 발생할 수 있음
            Long rightResult = rightTask.compute();
            //첫 번째 서브태스크의 결과를 읽거나 아직 결과가 없으면 대기
            Long leftResult = leftTask.join();
            //두 서브태스크의 결과를 조합한 값을 결과로 리턴
            return rightResult + leftResult;
          }
          //더 이상 분할할 수 없을 때 서브태스크의 결과를 계산
          private long computeSequentially() {
            long sum = 0;
            for (int i = start; i < end; i++) {
              sum += numbers[i];
            }
            return sum;
          }
          }
          @Test
          public void forkJoinSumTest() {
          //ForkJoinSumCalculator에서 정의한 태스크를 수행하고 결과를 출력하는 테스트
          int n = 1000;
          long[] numbers = LongStream.rangeClosed(1, n).toArray();
          ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
          Long invoke = (Long) (new ForkJoinPool().invoke(task));
          log.debug("invoke : {}", invoke);
          }
    • 포크/조인 프레임워크를 제대로 활용하는 방법

      • join 메소드를 태스크에 호출하면 태스크가 생산하는 결과가 준비될 때 까지 호출자를 블록시킨다.
        • 그러므로 두 서브태스크가 모두 시작된 다음에 join을 호출해야 한다.
        • 그렇지 않으면 각각의 서브태스크가 다른 태스크의 종료를 기다리게 됨
      • RecursiveTask 내에서는 ForkJoinPool의 invoke() 메소드는 사용하면 안됨.
        • compute()나 fork()는 가능
        • invoke()는 순차코드에서 병렬계산을 시작할 때에만
      • 서브태스크에 fork() 메소드를 호출해서 ForkJoinPool의 일정을 조절할 수 있다.
        • 한쪽작업은 fork(), 다른쪽은 comput()를 호출하면 두 서브태스크의 한 태스크에는 같은 스레드를 재사용할 수 있으므로 풀에서 불필요한 태스크를 할당하는 오버헤드를 피할 수 있다
      • 포크/조인 프레임워크를 이용하는 병렬 계산은 디버깅이 어렵다.
        • fork라 불리는 다른 스레드에서 compute를 호출하므로 스택 트레이스가 도움이 되지 않는다.
        • 작업 훔치기
      • ForkJoinPool의 모든 스레드를 거의 공정하게 분할하는 기법
      • 흐름
        • 각각의 스레드는 자신에게 할당된 태스크를 포함하는 이중 연결 리스트를 참조하면서 작업이 끝날때 마다 큐의 헤드에서 다른 태스크를 가져와서 작업을 처리한다.
        • 한 스레드가 다른 스레드 보다 자신에게 할당된 태스크를 더 빨리 처리한 상황
        • 할일이 없어진 스레드는 다른 스레드 큐의 꼬리에서 작업을 훔쳐온다.
        • 모든 태스크가 작업을 끝낼 때(모든 큐가 빌 때)까지 이 과정을 반복
  4. Spliterator 인터페이스

    • Spliterator : 분할할 수 있는 반복자

    • Iterator와 비교
      |Iterator|Spliterator|
      |boolean next()|boolean tryAdvance(Consumer<? super T> action)|
      |-|Spliteratort trySplit()|

      • trySplit() : Spliterator의 일부 요소를 분할해서 두 번째 Spliterator를 생성하는 메소드
    • 분할 과정

      • 스트림을 여러 스트림으로 분할하는 과정은 재귀적으로 일어난다.
    • Spliterator 특성

      • //get 특성
        list.spliterator().characteristics();
        public static final int ORDERED    = 0x00000010;
        public static final int DISTINCT   = 0x00000001;
        public static final int SORTED     = 0x00000004;
        public static final int SIZED      = 0x00000040;
        public static final int NONNULL    = 0x00000100;
        public static final int IMMUTABLE  = 0x00000400;
        public static final int CONCURRENT = 0x00001000;
        public static final int SUBSIZED = 0x00004000;
    • 커스텀 Spliterator 구현

      //반복형으로 단어 수를 세는 메소드
      //얘가 Spliterator랑 무슨상관?
      private int countWordsIteratively(String s) {
       int counter = 0;
       boolean lastSpace = true;
       for (char c : s.toCharArray()) {
         if (Character.isWhitespace(c)) {
           lastSpace = true;
         } else {
           if (lastSpace) {
             counter++;
           }
           lastSpace = false;
         }
       }
       return counter;
      }
      //문자열 스트림을 탐색하면서 단어 수를 세는 클래스
      public class WordCounter {
       private final int counter;
       private final boolean lastSpace;
       public WordCounter(int counter, boolean lastSpace) {
         this.counter = counter;
         this.lastSpace = lastSpace;
       }
       public WordCounter accumulate(Character c) {
         if (Character.isWhitespace(c)) {
           return lastSpace ? this : new WordCounter(counter, true);
         } else {
           return lastSpace ? new WordCounter(counter + 1, false) : this;
         }
       }
       public WordCounter combine(WordCounter wordCounter) {
         return new WordCounter(counter + wordCounter.counter, wordCounter.lastSpace);
       }
       public int getCounter() {
         return counter;
       }
      }
      //문자 스트림의 리듀싱 연산 구현
      private int countWords(Stream<Character> stream) {
       WordCounter wordCounter = stream.reduce(new WordCounter(0, true), WordCounter::accumulate, WordCounter::combine);
       return wordCounter.getCounter();
      }
      //Spliterator를 구현한 다음 병렬 스트림으로 전달
      class  WordCounterSpliterator implements Spliterator<Character> {
        private final String string;
        private int currentChar = 0;
        public WordCounterSpliterator(String string) {
          this.string = string;
        }
        @Override
        public boolean tryAdvance(Consumer<? super Character> action) {
          action.accept(string.charAt(currentChar++));  //현재 문자 소비
          return currentChar < string.length(); //소비할 문자가 남았으면 true
        }
        @Override
        public Spliterator<Character> trySplit() {
          int currentSize = string.length();
          if (currentSize < 10) {
            return null;
          }
          //파싱할 문자열의 중간을 분할위치로 설정
          for (int splitPos = currentSize/2 + currentChar; splitPos < string.length(); splitPos++) {
            if (Character.isWhitespace(string.charAt(splitPos))) {
              //처음부터 분할 위치까지 문자열을 파싱할 새로운 Spliterator 생성
              Spliterator<Character> spliterator = new WordCounterSpliterator(string.substring(currentChar, splitPos));
              currentChar = splitPos; //생성한 spliterator의 시작위치를 분할 위치로 설정
              return spliterator; //공백을 찾았고 문자열을 분리했으므로 루프 종료
            }
          }
          return null;
        }
        @Override
        public long estimateSize() {
          return string.length() - currentChar;
        }
        @Override
        public int characteristics() {
          return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE;
        }
      }
      //WordCounterSpliterator 활용
        @Test
        public void wordCounterSpliteratorTest() {
          Spliterator<Character> spliterator = new WordCounterSpliterator("Hello World");
          Stream<Character> stream = StreamSupport.stream(spliterator, true);//true는 병렬 스트림 생성 여부를 의미
          //병렬스트림을 countWords()메소드로 전달
          log.debug(String.format("Found %d words", countWords(stream)));
          //결과 : Found 19 words
        }
  5. 마치며

    • 내부 반복을 이용하면 명시적으로 다른 스레드를 사용하지 않고도 스트림을 병렬로 처리할 수 있다.
    • 하지만 항상 병렬처리가 좋은 것은 아니기 때문에 테스트 필수

청크?

  • 처음듣는 단어가 나와서 궁금하다.

  • 청크가 뭔가요?

    • 작업 덩어리라고 알고 있으면 될 듯
  • 영어사전 : 몇 개의 단어가 자연스럽게 연결되어 구체적인 뜻을 나타내는 표현과 의미 덩어리

    **명사**
    1. 명사 (두툼한) 덩어리
    2. 명사 비격식 상당히 많은 양
    3. 명사 언어 말모듬, 청크(언어 학습자가 한꺼번에 하나의 단위처럼 배울 수 있는 어구. (예를 들어 Can I have the bill, please?나 Pleased to meet you. 같은 표현들)
    **동사**
    1. 타동사 [미·구어] <물건을> 내던지다; <불에> 장작을 지피다 ((up))
    2. 자동사 <기계 등이> 덜커덩[탕, 딱, 쾅] 소리를 내다
  • SpringBatch에서의 Chunk 클래스

  • x5software의 Templatating System


Comments