Класс CompletableFuture входит в пакет java.util.concurrent и дополняет интерфейс Future. Упрощает работу с потоками.
Это свободный перевод статьи.
Создание CompletableFuture
Примитивный пример
Самый простой способ создать класс:
CompletableFuture<String> completableFuture = new CompletableFuture<String>();
Чтобы получить результат вычислений, можно использовать блокирующий метод get(). Как и в Future, он блокирует текущий тред до тех пор, пока результат не будет готов. А поскольку он у нас и не готовится, в нашем примере данная строка просто заблокирует поток:
String result = completableFuture.get()
Но можно и вручную завершить выполнение:
completableFuture.complete("Future's Result")
Все клиенты, ожидающие результата вычислений, получат результат. Дальнейшие вызовы completableFuture.complete() будут проигнорированы.
Запуск параллельного потока без получения результата — метод runAsync()
С помощью runAsync() можно запустить в отдельном треде задачу, для которой не требуется возвращать результат. Будет возвращен CompletableFuture<Void>.
В примере мы просто ждем секунду и печатаем значение, но в отдельном треде. А в основном треде блокируем код до получения «результата», но поскольку тип результата Void, то просто ждем завершения:
// Run a task specified by a Runnable Object asynchronously. CompletableFuture<Void> future = CompletableFuture.runAsync(new Runnable() { @Override public void run() { // Simulate a long-running Job try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); } System.out.println("I'll run in a separate thread than the main thread."); } }); // Block and wait for the future to complete future.get()
Результат:
I'll run in a separate thread than the main thread.
Если убрать последнюю строчку, то ничего не напечатается, поскольку программа завершится, не дожидаясь завершения параллельного потока (он же не демон).
То же самое с использованием лямбда-выражения:
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { // Simulate a long-running Job try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); } System.out.println("I'll run in a separate thread than the main thread."); });
Запуск параллельного потока с получением результата — метод supplyAsync()
А теперь вернем результат из параллельного треда. Для этого надо использовать метод supplyAsync():
// Using Lambda Expression CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); } return "Result of the asynchronous computation"; }); // Block and get the result of the Future String result = future.get(); System.out.println(result);
Результат:
Result of the asynchronous computation
Callback-функции в CompletableFuture
Метод get() блокирует текущий поток. Обычно нам требуется другое. Надо не блокировать текущий поток для получения значения, а задать функцию, которая сделает что-то со значением сразу после того, как оно будет вычислено, в том же параллельном потоке. Так называемую callback-функцию.
thenApply()
Допустим, нам надо закончить вычисления в supplyAsync() и далее сделать что-то с результатом, не блокируя текущие поток. Вот supplyAsync():
// Create a CompletableFuture CompletableFuture<String> whatsYourNameFuture = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); } System.out.println(Thread.currentThread().getName()); return "Rajeev"; });
Возвращает она строку. Нам надо по окончании наших фиктивных вычислений длительностью 1 секунда присоединить к результату «Rajeev» приветствие «hello».
После thenApply() мы печатаем строку «go further», чтобы убедиться, что thenApply() не блокирует код. Только в конце мы блокируем код, чтобы получить конечный результат методом get() и вывести его:
// Attach a callback to the Future using thenApply() CompletableFuture<String> greetingFuture = whatsYourNameFuture.thenApply(name -> { System.out.println(Thread.currentThread().getName()); return "Hello " + name; }); System.out.println(Thread.currentThread().getName()+" "+go further"); // Block and wait for the future to complete System.out.println(Thread.currentThread().getName()+" "+greetingFuture.get());
Обратите внимание, что везде мы выводим имя потока. В консоли у нас такой результат:
main go further ForkJoinPool.commonPool-worker-1 ForkJoinPool.commonPool-worker-1 main Hello Rajeev
Обратите внимание, что функция внутри supplyAsync() и заданный для нее коллбэк внутри thenApply() не блокируют текущий поток.
Можно сделать целую цепочку методов thenApply():
CompletableFuture<String> welcomeText = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); } System.out.println(Thread.currentThread().getName()); return "Rajeev"; }).thenApply(name -> { System.out.println(Thread.currentThread().getName()); return "Hello " + name; }).thenApply(greeting -> { System.out.println(Thread.currentThread().getName()); return greeting + ", Welcome to the CalliCoder Blog"; }); System.out.println(Thread.currentThread().getName()+" go further"); // Block and wait for the future to complete System.out.println(Thread.currentThread().getName()+" "+welcomeText .get());
Результат:
main go further ForkJoinPool.commonPool-worker-1 ForkJoinPool.commonPool-worker-1 ForkJoinPool.commonPool-worker-1 main Hello Rajeev, Welcome to the CalliCoder Blog
thenRun() и thenAccept()
Если из callback-функции значение возвращать не надо, а надо просто что-то в ней сделать, то подойдут методы thenRun() и thenAccept(). Они возвращают CompletableFuture<Void>.
Пример:
// thenAccept() example CompletableFuture.supplyAsync(() -> { return ProductService.getProductDetail(productId); }).thenAccept(product -> { System.out.println("Got product detail from remote service " + product.getName()) });
Тут сервис выдает продукт (ProductDetail) и когда ProductDetail получен, мы печатаем имя продукта.
thenAccept() имеет доступ к тому, что получено в supplyAsync(), а вот thenAccept() уже результат не возвращает.
thenRun() отличается от thenAccept() тем, что даже не имеет доступа к тому, что вычислил supplyAsync():
// thenRun() example CompletableFuture.supplyAsync(() -> { // Run some computation }).thenRun(() -> { // Computation Finished. });
Комбинация двух CompletableFutures
thenCompose() для двух зависимых друг от друга CompletableFutures
Допустим, мы хотим получить данные пользователя с помощью удаленного сервиса API, а потом получить данные его кредитной карты с помощью другого сервиса.
У нас есть соответствующие сервисы:
CompletableFuture<User> getUsersDetail(String userId) { return CompletableFuture.supplyAsync(() -> { UserService.getUserDetails(userId); }); } CompletableFuture<Double> getCreditRating(User user) { return CompletableFuture.supplyAsync(() -> { CreditRatingService.getCreditRating(user); }); }
Логично применить thenApply(), как и раньше. Но. В предыдущих примерах callback-функция, которая запускалась в thenApply(), возвращала обычное значение, а не CompletableFuture. А теперь у нас эта callback-функция возвращает CompletableFuture (сервис getCreditRating() — нам же его надо вызвать):
CompletableFuture<CompletableFuture<Double>> result = getUserDetail(userId) .thenApply(user -> getCreditRating(user));
Таким образом конечный результат — это вложенный CompletableFuture.
Если же мы хотим избавиться от вложенности, то надо использовать метод thenCompose():
CompletableFuture<Double> result = getUserDetail(userId) .thenCompose(user -> getCreditRating(user));
thenCombine() для двух независимых друг от друга CompletableFutures
С помощью thenCompose() мы комбинировали два Future, в которых один зависел от другого.
thenCombine() используется, когда надо запустить два независимых Future и сделать нечто после того, как оба они завершатся.
Вот пример. Первый CompletableFuture возвращает вес, второй — рост. Когда оба значения вычислятся, мы рассчитываем индекс bmi. Он вычисляется в callback-функции, переданной в метод thenCombine():
System.out.println("Retrieving weight."); CompletableFuture<Double> weightInKgFuture = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); } return 65.0; }); System.out.println("Retrieving height."); CompletableFuture<Double> heightInCmFuture = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); } return 177.8; }); System.out.println("Calculating BMI."); CompletableFuture<Double> combinedFuture = weightInKgFuture .thenCombine(heightInCmFuture, (weightInKg, heightInCm) -> { Double heightInMeter = heightInCm/100; return weightInKg/(heightInMeter*heightInMeter); }); System.out.println("Your BMI is - " + combinedFuture.get());
Комбинация нескольких CompletableFutures
Пока что мы комбинировали два CompletableFuture. А что если их много? Для этого есть методы:
static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
CompletableFuture.allOf()
Допустим, есть список независимых CompletableFuture. Мы хотим их запустить параллельно, а по окончании выполнения их всех сделать кое-что еще. Вот тут то и пригодится CompletableFuture.allOf().
Допустим, мы хотим загрузить 100 веб-страниц. Загружать мы их хотим параллельно, то есть каждая страница грузится асинхронно:
CompletableFuture<String> downloadWebPage(String pageLink) { return CompletableFuture.supplyAsync(() -> { // Code to download and return the web page's content }); }
А когда все страницы загрузятся, мы хотим посчитать количество страниц, текст которых содержит слово «CompletableFuture»
List<String> webPageLinks = Arrays.asList(...) // A list of 100 web page links // Download contents of all the web pages asynchronously List<CompletableFuture<String>> pageContentFutures = webPageLinks.stream() .map(webPageLink -> downloadWebPage(webPageLink)) .collect(Collectors.toList()); // Create a combined Future using allOf() CompletableFuture<Void> allFutures = CompletableFuture.allOf( pageContentFutures.toArray(new CompletableFuture[pageContentFutures.size()]) );
Единственная проблема состоит в том, что CompletableFuture.allOf() возвращает тип CompletableFuture<Void>. Но эту проблему можно решить, написав несколько дополнительных строк кода:
// When all the Futures are completed, call `future.join()` to get their results and collect the results in a list - CompletableFuture<List<String>> allPageContentsFuture = allFutures.thenApply(v -> { return pageContentFutures.stream() .map(pageContentFuture -> pageContentFuture.join()) .collect(Collectors.toList()); });
Посмотрите на код выше. Поскольку мы вызываем future.join() тогда, когда все страницы уже будут закачаны, ничего не блокируется.
Метод join() такой же, как get(): единственное отличие в том, что он выбрасывает unchecked exception (если его выбросил CompletableFuture).
Теперь подсчитаем число страниц, содержащих наше слово:
// Count the number of web pages having the "CompletableFuture" keyword. CompletableFuture<Long> countFuture = allPageContentsFuture.thenApply(pageContents -> { return pageContents.stream() .filter(pageContent -> pageContent.contains("CompletableFuture")) .count(); }); System.out.println("Number of Web Pages having CompletableFuture keyword - " + countFuture.get());
CompletableFuture.anyOf()
Метод CompletableFuture.anyOf() возвращает CompletableFuture, который завершается сразу же, как только завершается первый из переданных ему в качестве аргументов CompletableFuture, и возвращает этот же самый результат, что и первый завершившийся CompletableFuture.
В примере первым завершается future2:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { throw new IllegalStateException(e); } return "Result of Future 1"; }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); } return "Result of Future 2"; }); CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { throw new IllegalStateException(e); } return "Result of Future 3"; }); CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3); System.out.println(anyOfFuture.get()); // Result of Future 2
Так что конечный результат:
Result of Future 2
Метод CompletableFuture.anyOf() принимает varargs, а возвращает CompletableFuture<Object>. Проблема в том, что все аргументы —CompletableFuture могут возвращать разный тип, и конечный тип неизвестен.
Обработка исключений CompletableFuture
Рассмотрим, что будет, если в какой-то из функций возникнет исключение:
CompletableFuture.supplyAsync(() -> { // Code which might throw an exception return "Some result"; }).thenApply(result -> { return "processed result"; }).thenApply(result -> { return "result after further processing"; }).thenAccept(result -> { // do something with the final result });
Если оно возникло в supplyAsync(), то никакой из колбэков вызван не будет.
Если оно возникло в первом thenApply(), то второй (и дальнейшие thenApply()) вызваны не будут. И так далее.
Обработка исключений с помощью колбэка exceptionally()
Integer age = -1; CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> { if(age < 0) { throw new IllegalArgumentException("Age can not be negative"); } if(age > 18) { return "Adult"; } else { return "Child"; } }).exceptionally(ex -> { System.out.println("Oops! We have an exception - " + ex.getMessage()); return "Unknown!"; }); System.out.println("Maturity : " + maturityFuture.get());
Здесь в supplyAsync() возникает исключение и сразу выполняется колбэк из exceptionally(), в котором можно вернуть значение по умолчанию.
Обработка исключений с помощью handle()
Коллбэк в handle() вызывается независимо от того, возникло ли исключение:
Integer age = -1; CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> { if(age < 0) { throw new IllegalArgumentException("Age can not be negative"); } if(age > 18) { return "Adult"; } else { return "Child"; } }).handle((res, ex) -> { if(ex != null) { System.out.println("Oops! We have an exception - " + ex.getMessage()); return "Unknown!"; } return res; }); System.out.println("Maturity : " + maturityFuture.get());
Если исключение возникает, то аргумент res равен null, если же нет — то ex равен null.
Итог
Код примеров частично доступен на GitHub (код к первой части статьи, сервисов обращения к репозиторию там нет).
main go further
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-1
main Hello Rajeev, Welcome to the CalliCoder Blog
Откуда берётся » Welcome to the CalliCoder Blog» в самом первом примере thenApply()? Откуда берётся в цепочке thenApply() понятно)
БОЛЬШОЕ спасибо за ваш труд!
Конечно его там быть не должно (и нет). Спасибо за внимательность. Исправлено.