728x90
반응형
SMALL

이번 포스팅에서는 자바8에서 새로 도입된 CompletableFuture에 대해 알아보자. 이름에 Future가 들어가니까 Executors 프레임워크를 사용할 때 배워봤던 그 Future와 연관이 있나? 생각이 든다. 맞다.

 

CompletableFuture의 탄생 배경

자바에서 비동기 프로그래밍을 하려고 하면, Future를 사용해서도 어느정도는 가능했다만, 불편한 점들이 있다.

뭐가 불편했지?를 고민하기 전에 자바에서 비동기 프로그래밍이라는 건 어떤건지 먼저 감을 잡기 위해 아래 코드를 보자.

package executors;

import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        Future<String> future = executorService.submit(new Task());

        // 이곳에서 future 를 기다리지 않고 계속 작업이 가능

        String futureResult = future.get();// 블로킹 메서드
        
        // 이곳에서 future 의 결과를 가지고 작업을 할 수 있음
        System.out.println(futureResult);

        executorService.shutdown();
        executorService.close();
    }

    static class Task implements Callable<String> {

        @Override
        public String call() throws Exception {
            System.out.println("Hello Callable");
            return "Im Callable";
        }
    }
}
  • 지금 코드를 보면, Executors 프레임워크를 사용해서 쓰레드 하나짜리 풀을 만들고 Callable을 수행한다.
  • 비동기 프로그래밍이란 건 Taskcall() 메서드를 실행하는 것을 기다리지 않고 내가 하고자하는 작업을 진행하는 걸 말한다.
  • 근데 여기서 불편한 점이 있다. future.get()을 호출하기 전에는 기다리지 않고 계속 무언가 작업이 가능하지만 get()을 호출하고 나서는 결국 저 Future의 작업이 끝날때까지 기다려야 하는 블로킹이 걸린다.
  • 그리고 나서 그 결과를 받아오고 나서 그 결과를 가지고 작업을 할 수 있다.

그러니까, Future를 이용해서 비동기 프로그래밍을 하려면 결국 get()을 호출하기 전에 최대한 열심히 무언가 작업을 해야하고 get()을 호출하고 나서 그 Future의 결과를 통해 어떤 작업을 수행할 수 있는 것이다. 

 

그런데 이제 이런 불편함이 생긴것이다.

→ 블로킹 메서드(get())을 호출하기 전에는 작업이 끝났을 때 콜백함수를 실행할 수 없다. 콜백함수를 미리 지정해 놓을 수 있다면 굉장히 편리할 것 같다.

 

CompletableFuture 사용

비동기 작업의 리턴값이 없는 경우: runAsync()

위와 같은 불편함을 해소하고자 이 CompletableFuture가 등장했다. 이 코드는 어떻게 사용하는지 한번 보자.

package executors;

import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) {
        CompletableFuture.runAsync(() -> System.out.println("Run" + Thread.currentThread().getName()));

        System.out.println("Hi " + Thread.currentThread().getName());
    }
}
  • 어떤 작업을 비동기적으로 실행만 하면 될때, CompletableFuturerunAsync()를 사용할 수 있다.
  • 이렇게 작성하면, CompletableFuture의 작업과는 아무런 영향없이 그 이후의 코드를 실행할 수 있다.

실행 결과

 

비동기 작업의 리턴값이 있는 경우: supplyAsync()

이번에는 위와 다르게 비동기 작업의 리턴값이 있는 경우에는 어떻게 하면 될까?

package executors;

import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("Run " + Thread.currentThread().getName());
            return "Hello";
        });

        System.out.println("Hi " + Thread.currentThread().getName());
        System.out.println("Future return: " + future.get());
    }
}
  • 어떤 작업을 비동기적으로 수행하고 그 수행의 리턴값이 있는 경우에는 이렇게 supplyAsync()를 사용하면 된다.

실행 결과

 

 

그런데 지금까지는, Future를 사용하는 것과 별반 다른게 없다. 그래서 이제 Future와 어떤것이 확연히 다른지를 살펴보자.

CompletableFuturecallback

자, 만약 내가 어떤 작업을 비동기적으로 수행하고 그 결과를 통해 무언가를 또 하고 싶을때 어떻게 하면 될까? Future를 사용했을 땐, get()을 호출하고 결과를 받을때까지 블로킹 상태로 대기하다가 결과가 나오면 그때 무언갈 할 수 있었다.

package executors;

import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("Run " + Thread.currentThread().getName());
            return "Hello";
        }).thenApply(s -> {
            System.out.println(s + " " + Thread.currentThread().getName());
            return s.toUpperCase();
        });

        System.out.println("Hi " + Thread.currentThread().getName());
        System.out.println("Future return: " + future.get());
    }
}
  • 코드를 보면, thenApply()를 호출한다. 이게 바로 어떤 비동기 작업의 결과를 통해 무언가를 실행하는 콜백 함수이다.
  • 그래서, 결과로 받은 문자열을 대문자로 전부 변경하고 그 값을 반환한다.
  • 실제로 future.get()을 호출해보면 결과는 다음과 같다.

 

물론, 콜백 함수가 어떤 반환값이 필요하지 않은 경우가 있을수도 있다.

package executors;

import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("Run " + Thread.currentThread().getName());
            return "Hello";
        }).thenAccept(System.out::println);

        System.out.println("Hi " + Thread.currentThread().getName());
        System.out.println("Future return: " + future.get());
    }
}
  • 그럴때는 이렇게 thenAccept()를 호출하면 된다.
  • 그리고 이게 다 이전 포스팅을 배운 이유인게 thenAccept()는 인자로 무엇을 받냐면 Consumer를 받는다. Consumer는? 무언갈 받아서 소비만 하고 따로 리턴하는 게 없는 함수형 인터페이스다.

  • 반대로, 아까 콜백 함수를 사용하는데 반환값도 있었던 thenApply는 무엇을 받을까?

  • 이렇듯 Function을 받는다. 함수형 인터페이스 Function은 어떤값(T)를 받아 어떤값(U)로 반환한다.

 

그래서 이렇게 CompletableFuture를 사용하면, 비동기 프로그래밍을 훨씬 쉽고 편리하게 할 수가 있다.

그런데, 궁금한게 있다. Future를 사용했을 때는 Executors 프레임워크로 스레드 풀을 만들고 그 쓰레드 풀에서 스레드를 꺼내와서 사용했는데 여기서는 어떻게 된게 쓰레드 풀도 따로 안 만들고 어떻게 쓰레드가 생기고 하는 걸까?

 

실행해서 현재 쓰레드의 이름을 찍어보면 이런식으로 나온다.

ForkJoinPool.commonPool-worker-1

ForkJoinPool? 이 녀석은 자바7에서 추가된 병렬 작업을 처리하기 위한 효율적인 스레드 풀이다. 그리고 CompletableFuture는 기본적으로 ForkJoinPool.commonPool()을 사용한다. 그런데 이 풀은 기본적으로 CPU 코어 수에 비례하여 스레드 수를 제한한다. 근데 사실 그렇게 되면 CPU 코어 수가 아무리 많아봐야 20개가 안되는데 보통의 멀티쓰레드를 사용하는 애플리케이션은 쓰레드 1000개도 만들고 한다. 따라서 생각보다 이 ForkJoinPool이 비효율적 일수가 있는데 이럴때를 대비해 명시적으로 Executors에서 사용하는 스레드풀을 지정할 수가 있다. 아래 코드를 보자.

package executors;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            System.out.println("Run " + Thread.currentThread().getName());
            return "Hello";
        }, executorService);

        System.out.println(hello.get());
    }
}
  • supplyAsync()에는 두번째 파라미터로 Executor를 넘길 수가 있다. 그래서 내가 만든 ExecutorService를 넘겨주면 이 스레드 풀을 사용한다. 아 물론, supplyAsync()뿐 아니라 runAsync()도 이렇게 사용할 수 있다. 

 

CompletableFuture의 사용 2 - 조합

thenCompose()

이번에는 조금 더 깊게 들어가서, Future만을 사용했을 때 또 어떤 점이 불편했냐면, A라는 Future 하나와, B라는 Future 하나가 있을 때 이 두개를 이어서 하려면 A를 get()하고, B를 get()해서 이 두개의 결과를 가지고 이후 코드를 작성해야 했다.

 

근데, 이제 CompletableFuture를 사용하면, 어떻게 편리하게 사용할 수 있을까?

package executors;

import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            System.out.println("Run " + Thread.currentThread().getName());
            return "Hello";
        });

        CompletableFuture<String> future = hello.thenCompose(Main::getWorld);
        System.out.println("future.get() = " + future.get());
    }

    private static CompletableFuture<String> getWorld(String word) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("Run " + Thread.currentThread().getName());
            return word + " World";
        });
    }
}
  • CompleteableFuture에는 thenCompose()라는 게 있다. 이 녀석을 사용하면 어떤 Future의 결과를 받아서 새로운 Future를 이어갈 수 있다.
  • thenCompose()는 파라미터로 Function 함수형 인터페이스를 받는다. 즉, 어떤 값을 받아 어떤 값으로 변환해준다는 의미가 된다. 그리고 반환타입은 CompletableFuture<T>이다. 새로운 Future를 반환한다는 의미이다.
  • 그래서, Future의 결과값을 전달해주고 새로운 Future를 만들어낸다.

  • 그래서 thenCompose()를 사용하면 CompletableFuture를 이어서 실행할 수 있다.

thenCombine()

그런데 위의 케이스의 경우, 두 Future가 서로 연관관계가 있어서 어떤게 먼저 실행하고 그 다음걸 실행할 때 유용하게 사용할 수 있다. 코드도 보면 "Hello World"를 찍기 위해 "Hello"를 반환하는 Future를 먼저 실행하고 그 다음 "World"를 반환하는 Future를 실행한 것처럼 Future끼리 연관관계가 있을 때 Future끼리 이어 실행할 수 있게 하는게 thenCompose()였다면, 아무런 연관관계가 없지만 동시에 실행시키고 싶을 때도 있을 것이다. 

package executors;

import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            System.out.println("Run " + Thread.currentThread().getName());
            return "Hello";
        });

        CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
            System.out.println("Run " + Thread.currentThread().getName());
            return "World";
        });

        CompletableFuture<String> future = hello.thenCombine(world, (h, w) -> h + " " + w);
        System.out.println("future.get() = " + future.get());
    }
}
  • 이럴때 사용하는게 thenCombine()이다. 아무 연관관계는 없지만 동시에 실행시키고 그 두 Future의 결과값으로 새로운 것을 만들어낼 때 유용하다. 그래서 지금 hello, world 라는 두 CompletableFuture가 있을 때 얘네가 누가 먼저 실행되어야 하고 그런건 아니지만 이 두개의 결과를 가지고 무언가를 만들어낼때 사용하기 딱 좋은게 thenCombine()이다.
  • thenCombine()은 첫번째 인자로, CompletionStage를 받는다. 이건 CompletableFuture가 구현한 인터페이스라서 CompletableFuture가 들어갈 수 있다. 그리고 두번째 인자로 BiFunction을 받는다. 정말 딱 들어맞는 함수형 인터페이스 아닌가? 

실행 결과

 

 

allOf()

이번에는 CompletableFuture가 2개 이상일때, 그 모든 CompletableFuture를 한번에 다 처리하고 어떤 작업을 진행할 수 있는 방법을 소개한다. 

package executors;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            System.out.println("Run " + Thread.currentThread().getName());
            return "Hello";
        });

        CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
            System.out.println("Run " + Thread.currentThread().getName());
            return "World";
        });

        CompletableFuture[] futures = new CompletableFuture[]{hello, world};
        CompletableFuture<List<Object>> results = CompletableFuture.allOf(futures)
                .thenApply(v -> Arrays.stream(futures).map(CompletableFuture::join).toList());

        results.get().forEach(System.out::println);
    }
}
  • allOf()CompletableFuture의 배열을 받는다. 그래서 넘겨받은 모든 CompletableFuture의 모든 작업이 끝나면, thenApply()가 호출되는데 여기서 각 Futureget()을 호출하면 되지만 get()은 체크 예외를 던지기 때문에 처리하기가 좀 난감해진다. 그렇기에 join()을 호출하면 동일하게 작업의 결과를 받아오지만 체크예외가 아닌 언체크예외를 던지기 때문에 예외처리를 위한 별도의 동작이 필요없어진다.
  • 물론, join() 대신 get()을 호출하고 예외처리를 직접 해주어도 상관은 없다. 어떤 것을 사용하든 각각의 Future의 결과값을 리스트로 변환하고 해당 리스트를 순회하면서 출력하는 코드이고 결과는 다음과 같다.

실행 결과

 

anyOf()

이번에는 모든 Future가 끝난 후 실행되는 것 말고 어떤 Future라도 제일 빨리 끝난게 생기면 무언가를 처리할 수 있는 anyOf()에 대해 알아보자. 

package executors;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            System.out.println("Run " + Thread.currentThread().getName());
            return "Hello";
        });

        CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> {
            System.out.println("Run " + Thread.currentThread().getName());
            return "World";
        });

        CompletableFuture.anyOf(hello, world).thenAccept(System.out::println);
    }
}
  • 이렇게 anyOf()를 사용해서 여러 Future를 받고, thenAccept()를 호출해서 둘 중 먼저 실행이 끝난것을 받아 시스템 콘솔에 출력한다. 이때는 어떤게 먼저 실행될지에대한 보장은 없다. 그래서 실행할때마다 실행 결과가 달라진다.

실행 결과

 

예외 상황

이번에는 CompletableFuture를 실행하다가 예외가 발생한 경우 어떻게 다뤄야 하는지 알아보자.

package executors;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        boolean isError = true;

        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            if (isError) {
                throw new IllegalStateException("Oh shit Error!");
            }
            System.out.println("Run " + Thread.currentThread().getName());
            return "Hello";
        }).exceptionally(ex -> {
            System.out.println("Exception " + ex.getMessage());
            return "Error!";
        });

        System.out.println(hello.get());
    }
}
  • CompletableFuture로 어떤 작업을 처리하다보면 당연하게도 예외는 발생할 수 있다. 그럴때 예외가 터지면 exceptionally()를 사용해서 해당 예외를 받고 어떤 처리를 할 수 있다.
  • 그래서 이 코드를 실행해보면 다음과 같다.

실행 결과

 

그런데 이제, 이렇게 exceptionally()를 사용해서도 처리할 수 있지만, 이건 완전 예외를 위한거라면 handle()이라는 녀석도 있다.

package executors;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        boolean isError = true;

        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            if (isError) {
                throw new IllegalStateException("Oh shit Error!");
            }
            System.out.println("Run " + Thread.currentThread().getName());
            return "Hello";
        }).handle((result, error) -> {
            if (error != null) {
                System.out.println(error.getMessage());
                return "Error!";
            }
            return "Hello!";
        });

        System.out.println(hello.get());
    }
}
  • handle()은 정상케이스와 에러케이스 두 개를 모두 다룰 수 있다. 그리고 handle()의 파라미터를 보면 BiFunction이다. 두개를 받아 하나로 반환하는 함수형 인터페이스. 확실히 배우면 배울수록 함수형 인터페이스를 잘 배워놨다는 생각이 든다.

실행 결과

 

 

정리를 하자면

자바에서 멀티쓰레드를 다루는 방법의 아주 대표적인 프레임워크는 Executors 프레임워크다. 이 녀석을 사용해서 쓰레드 풀도 만들고 Callable을 실행하고 Future를 받아 처리한다. 아주 좋지만, Future의 단점 중 하나는 비동기 프로그래밍을 하기가 꽤나 까다롭다는 것이다. 그래서 자바8부터 CompletableFuture가 등장하고 이 녀석을 사용하면 작업이 다 끝난 후 콜백함수를 정의하여 매우 편하게 비동기 프로그래밍을 할 수 있게 됐다. 그에 대한 내용을 살펴보았다.

728x90
반응형
LIST

'JAVA의 가장 기본이 되는 내용' 카테고리의 다른 글

Lombok은 어떻게 동작하는걸까?  (0) 2024.12.01
[Java 8] Optional  (0) 2024.11.27
[Java 8] Stream API  (0) 2024.11.27
[Java 8] 함수형 인터페이스와 람다 표현식  (0) 2024.11.25
애노테이션  (6) 2024.10.21

+ Recent posts