Stream에서 CompletableFuture 사용하기

Posted by Joonki Chung on Tuesday, November 7, 2017 Tags: java   3 minute read

Problem

스트림 데이터를 각각 비동기로 별로의 쓰레드에서 병렬 처리하고, 결과를 취합하여 메인 쓰레드에 표시하는 작업을 구현해야했다. 그래서 다음과 같이 구현했었다.

// 동물 단어를 받아서 각 단어들의 글자수의 합을 구한다고 가정해보자
Long sum = Stream.of("cat", "dog", "tiger", "lion")
                // calculate()은 단어의 글자수를 구하는 함수.
                // executor는 사전에 정의해둔 Executor. 쓰레드가 두개 이상인 Executor라고 가정하자.
                .mapToLong(word -> CompletableFuture.supplyAsync(() -> calculate(word), executor).join())
                .sum();
System.out.println(sum);

하지만 막상 실행해보니 비동기적으로 처리되지 않았다. 곰곰이 생각해보니 저렇게 구현하면 스트림 데이터가 순차적으로 처리될테니 CompletableFuture를 쓴 의미가 없어진다. 그래서 다시 수정해보았다.

// 동물 단어를 받아서 각 단어들의 글자수의 합을 구한다고 가정해보자
Long sum = Stream.of("cat", "dog", "tiger", "lion")
                // calculate()은 단어의 글자수를 구하는 함수.
                // executor는 사전에 정의해둔 Executor. 쓰레드가 두개 이상인 Executor라고 가정하자.
                .map(word -> CompletableFuture.supplyAsync(() -> calculate(word), executor))
                .mapToLong(CompletableFuture::join)
                .sum();
System.out.println(sum);

이번에는 두 단계로 나눠서, 1단계에서는 supplyAsync()를 하고 그 후에 join()를 수행했다. 하지만 역시 안됐다. map()mapToLong() 모두 intermediate operation이기 때문이다. Terminal operation인 sum()에서 최종 처리될 것이므로 수정 전과 다를게 없다.

Solution

예제의 3, 4번을 참고하여 다시 수정해보았다.

// 동물 단어를 받아서 각 단어들의 글자수의 합을 구한다고 가정해보자
List<CompletableFuture> futures = Stream.of("cat", "dog", "tiger", "lion")
                // calculate()은 단어의 글자수를 구하는 함수.
                // executor는 사전에 정의해둔 Executor. 쓰레드가 두개 이상인 Executor라고 가정하자.
                .map(word -> CompletableFuture.supplyAsync(() -> calculate(word), executor))
                .collect(Collectors.toList()); // *중요*

Long sum = futures.stream()
                .mapToLong(CompletableFuture::join)
                .sum();

System.out.println(sum);

이렇게 하니 정상적으로 병렬 처리가 되었다. collect()로 스트림 자체를 terminate했기 때문에 supplyAsync()가 비동기 작업을 수행할 수 있었다. 그 후에 join()으로 취합하여 의도대로 동작하게 되었다.

Thoughts

Stream의 intermediate/terminal operation에 대해서는 알고 있던 개념이었는데 CompletableFuture와 같이 쓰니 헷갈렸던 것 같다. CompletableFuture에 대한 이해가 부족하기도 하다. 그런데 생각보다 쓰기가 훨씬 복잡해서 좀 당황스럽다. 자바스크립트의 Promise와는 다르게 멀티쓰레드 방식이라 그런 것 같다. 이래서 Spring 5 WebFlux가 나왔나 싶기도..

References

http://fahdshariff.blogspot.kr/2016/06/java-8-completablefuture-vs-parallel.html