CompletableFuture – руководство с примерами

Класс CompletableFuture  входит в пакет java.util.concurrent и дополняет интерфейс Future. Упрощает работу с потоками.
Это свободный перевод статьи.

Создание CompletableFuture

Примитивный пример

Самый простой способ создать класс:

CompletableFuture<String> completableFuture = new CompletableFuture<String>();

Чтобы получить результат вычислений, можно использовать блокирующий метод get(). Как и в Future, он блокирует текущий тред до тех пор, пока результат не будет готов. А поскольку он у нас и не готовится, в нашем примере данная строка просто заблокирует поток:

String result = completableFuture.get()

Но можно и вручную завершить выполнение:

completableFuture.complete("Future's Result")

Все клиенты, ожидающие результата вычислений, получат результат. Дальнейшие вызовы completableFuture.complete() будут проигнорированы.

Запуск параллельного потока без получения результата – метод runAsync()

С помощью runAsync() можно запустить в отдельном треде задачу, для которой не требуется возвращать результат. Будет возвращен CompletableFuture<Void>.

В примере мы просто ждем секунду и печатаем значение, но в отдельном треде. А в основном треде блокируем код до получения “результата”, но поскольку тип результата Void, то просто ждем завершения:

// Run a task specified by a Runnable Object asynchronously.
CompletableFuture<Void> future = CompletableFuture.runAsync(new Runnable() {
    @Override
    public void run() {
        // Simulate a long-running Job
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        System.out.println("I'll run in a separate thread than the main thread.");
    }
});

// Block and wait for the future to complete
future.get()

Результат:

I'll run in a separate thread than the main thread.

Если убрать последнюю строчку, то ничего не напечатается, поскольку программа завершится, не дожидаясь завершения параллельного потока (он же не демон).

То же самое с использованием лямбда-выражения:

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    // Simulate a long-running Job   
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    System.out.println("I'll run in a separate thread than the main thread.");
});

Запуск параллельного потока с получением результата – метод  supplyAsync()

А теперь вернем результат из параллельного треда. Для этого надо использовать метод supplyAsync():

// Using Lambda Expression
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    return "Result of the asynchronous computation";
});

// Block and get the result of the Future
String result = future.get();
System.out.println(result);

Результат:

Result of the asynchronous computation

Callback-функции в CompletableFuture

Метод get() блокирует текущий поток. Обычно нам требуется другое. Надо не блокировать текущий поток для получения значения, а задать функцию, которая сделает что-то со значением сразу после того, как оно будет вычислено, в том же параллельном потоке. Так называемую callback-функцию.

thenApply()

Допустим, нам надо закончить вычисления в supplyAsync() и далее сделать что-то с результатом, не блокируя текущие поток. Вот supplyAsync():

// Create a CompletableFuture
CompletableFuture<String> whatsYourNameFuture = CompletableFuture.supplyAsync(() -> {
   try {
       TimeUnit.SECONDS.sleep(1);
   } catch (InterruptedException e) {
       throw new IllegalStateException(e);
   }
   System.out.println(Thread.currentThread().getName());
   return "Rajeev";
});

Возвращает она строку. Нам надо по окончании наших фиктивных вычислений длительностью 1 секунда присоединить к результату “Rajeev” приветствие “hello”.

После thenApply() мы печатаем строку “go further”, чтобы убедиться, что thenApply() не блокирует код. Только в конце мы блокируем код, чтобы получить конечный результат методом get() и вывести его:

// Attach a callback to the Future using thenApply()
CompletableFuture<String> greetingFuture = whatsYourNameFuture.thenApply(name -> {
    System.out.println(Thread.currentThread().getName());    
    return "Hello " + name;
});
System.out.println(Thread.currentThread().getName()+" "+go further");

// Block and wait for the future to complete
System.out.println(Thread.currentThread().getName()+" "+greetingFuture.get());

Обратите внимание, что везде мы выводим имя потока. В консоли у нас такой результат:

main go further
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-1
main Hello Rajeev

Обратите внимание, что функция внутри supplyAsync() и заданный для нее коллбэк внутри thenApply() не блокируют текущий поток.

Можно сделать целую цепочку методов thenApply():

CompletableFuture<String> welcomeText  = CompletableFuture.supplyAsync(() -> {
	try {
		TimeUnit.SECONDS.sleep(1);
	} catch (InterruptedException e) {
		throw new IllegalStateException(e);
	}
	System.out.println(Thread.currentThread().getName());
	return "Rajeev";
}).thenApply(name -> {
	System.out.println(Thread.currentThread().getName());
	return "Hello " + name;
		
}).thenApply(greeting -> {
	System.out.println(Thread.currentThread().getName());
	return greeting + ", Welcome to the CalliCoder Blog";
});
System.out.println(Thread.currentThread().getName()+" go further");
		
// Block and wait for the future to complete
System.out.println(Thread.currentThread().getName()+" "+welcomeText .get());

Результат:

main go further
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-1
main Hello Rajeev, Welcome to the CalliCoder Blog

thenRun() и thenAccept()

Если из callback-функции значение возвращать не надо, а надо просто что-то в ней сделать, то подойдут методы thenRun() и thenAccept(). Они возвращают CompletableFuture<Void>.

Пример:

// thenAccept() example
CompletableFuture.supplyAsync(() -> {
	return ProductService.getProductDetail(productId);
}).thenAccept(product -> {
	System.out.println("Got product detail from remote service " + product.getName())
});

Тут сервис выдает продукт (ProductDetail) и когда ProductDetail получен, мы печатаем имя продукта.

thenAccept() имеет доступ к тому, что получено в supplyAsync(), а вот thenAccept() уже результат не возвращает.

thenRun() отличается от thenAccept() тем, что даже не имеет доступа к тому, что вычислил supplyAsync():

// thenRun() example
CompletableFuture.supplyAsync(() -> {
    // Run some computation  
}).thenRun(() -> {
    // Computation Finished.
});

Комбинация двух CompletableFutures 

 thenCompose() для двух зависимых друг от друга CompletableFutures 

Допустим, мы хотим получить данные пользователя с помощью удаленного сервиса API, а потом получить данные его кредитной карты с помощью другого сервиса.

У нас есть соответствующие сервисы:

CompletableFuture<User> getUsersDetail(String userId) {
	return CompletableFuture.supplyAsync(() -> {
		UserService.getUserDetails(userId);
	});	
}

CompletableFuture<Double> getCreditRating(User user) {
	return CompletableFuture.supplyAsync(() -> {
		CreditRatingService.getCreditRating(user);
	});
}

Логично применить thenApply(), как и раньше. Но. В предыдущих примерах callback-функция, которая запускалась в  thenApply(), возвращала обычное значение, а не CompletableFuture. А теперь у нас эта callback-функция возвращает CompletableFuture (сервис getCreditRating() – нам же его надо вызвать):

CompletableFuture<CompletableFuture<Double>> result = getUserDetail(userId)
.thenApply(user -> getCreditRating(user));

Таким образом конечный результат – это вложенный CompletableFuture.

Если же мы хотим избавиться от вложенности, то надо использовать метод thenCompose():

CompletableFuture<Double> result = getUserDetail(userId)
.thenCompose(user -> getCreditRating(user));
Короче говоря, метод thenCompose() надо использовать, когда callback-функция возвращает не простое значение, а CompletableFuture, а надо избавиться от вложенности.

thenCombine() для двух независимых друг от друга CompletableFutures 

С помощью thenCompose() мы комбинировали два Future, в которых один зависел от другого.

thenCombine() используется, когда надо запустить два независимых Future и сделать нечто после того, как оба они завершатся.

Вот пример. Первый CompletableFuture возвращает вес, второй – рост. Когда оба значения вычислятся, мы рассчитываем индекс bmi. Он вычисляется в callback-функции, переданной в метод thenCombine():

System.out.println("Retrieving weight.");
CompletableFuture<Double> weightInKgFuture = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return 65.0;
});

System.out.println("Retrieving height.");
CompletableFuture<Double> heightInCmFuture = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return 177.8;
});

System.out.println("Calculating BMI.");
CompletableFuture<Double> combinedFuture = weightInKgFuture
        .thenCombine(heightInCmFuture, (weightInKg, heightInCm) -> {
    Double heightInMeter = heightInCm/100;
    return weightInKg/(heightInMeter*heightInMeter);
});

System.out.println("Your BMI is - " + combinedFuture.get());

Комбинация нескольких CompletableFutures

Пока что мы комбинировали два CompletableFuture. А что если их много? Для этого есть методы:

static CompletableFuture<Void>	 allOf(CompletableFuture<?>... cfs)
static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

CompletableFuture.allOf()

Допустим, есть список независимых CompletableFuture. Мы хотим их запустить параллельно, а по окончании выполнения их всех сделать кое-что еще. Вот тут то и пригодится CompletableFuture.allOf().

Допустим, мы хотим загрузить 100 веб-страниц. Загружать мы их хотим параллельно, то есть каждая страница грузится асинхронно:

CompletableFuture<String> downloadWebPage(String pageLink) {
	return CompletableFuture.supplyAsync(() -> {
		// Code to download and return the web page's content
	});
}

А когда все страницы загрузятся, мы хотим посчитать количество страниц, текст которых содержит слово “CompletableFuture”

List<String> webPageLinks = Arrays.asList(...)	// A list of 100 web page links

// Download contents of all the web pages asynchronously
List<CompletableFuture<String>> pageContentFutures = webPageLinks.stream()
        .map(webPageLink -> downloadWebPage(webPageLink))
        .collect(Collectors.toList());


// Create a combined Future using allOf()
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
        pageContentFutures.toArray(new CompletableFuture[pageContentFutures.size()])
);

Единственная проблема состоит в том, что CompletableFuture.allOf() возвращает тип CompletableFuture<Void>. Но эту проблему можно решить, написав несколько дополнительных строк кода:

// When all the Futures are completed, call `future.join()` to get their results and collect the results in a list -
CompletableFuture<List<String>> allPageContentsFuture = allFutures.thenApply(v -> {
   return pageContentFutures.stream()
           .map(pageContentFuture -> pageContentFuture.join())
           .collect(Collectors.toList());
});

Посмотрите на код выше. Поскольку мы вызываем future.join() тогда, когда все страницы уже будут закачаны, ничего не блокируется.

Метод join() такой же, как get(): единственное отличие в том, что он выбрасывает  unchecked exception (если его выбросил CompletableFuture).

Теперь подсчитаем число страниц, содержащих наше слово:

// Count the number of web pages having the "CompletableFuture" keyword.
CompletableFuture<Long> countFuture = allPageContentsFuture.thenApply(pageContents -> {
    return pageContents.stream()
            .filter(pageContent -> pageContent.contains("CompletableFuture"))
            .count();
});

System.out.println("Number of Web Pages having CompletableFuture keyword - " + 
        countFuture.get());

CompletableFuture.anyOf()

Метод CompletableFuture.anyOf() возвращает CompletableFuture, который завершается сразу же, как только завершается первый из переданных ему в качестве аргументов  CompletableFuture, и возвращает этот же самый результат, что и первый завершившийся CompletableFuture.

В примере первым завершается future2:

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 1";
});

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 2";
});

CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 3";
});

CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3);

System.out.println(anyOfFuture.get()); // Result of Future 2

Так что конечный результат:

 Result of Future 2

Метод CompletableFuture.anyOf() принимает varargs, а возвращает CompletableFuture<Object>. Проблема в том, что все аргументы –CompletableFuture могут возвращать разный тип, и конечный тип неизвестен.

Обработка исключений CompletableFuture

Рассмотрим, что будет, если в какой-то из функций возникнет исключение:

CompletableFuture.supplyAsync(() -> {
	// Code which might throw an exception
	return "Some result";
}).thenApply(result -> {
	return "processed result";
}).thenApply(result -> {
	return "result after further processing";
}).thenAccept(result -> {
	// do something with the final result
});

Если оно возникло в supplyAsync(), то никакой из колбэков вызван не будет.

Если оно возникло в первом thenApply(), то второй (и дальнейшие thenApply()) вызваны не будут. И так далее.

Обработка исключений с помощью колбэка exceptionally() 

Integer age = -1;

CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
    if(age < 0) {
        throw new IllegalArgumentException("Age can not be negative");
    }
    if(age > 18) {
        return "Adult";
    } else {
        return "Child";
    }
}).exceptionally(ex -> {
    System.out.println("Oops! We have an exception - " + ex.getMessage());
    return "Unknown!";
});

System.out.println("Maturity : " + maturityFuture.get());

Здесь в supplyAsync() возникает исключение и сразу выполняется колбэк из exceptionally(), в котором можно вернуть значение по умолчанию.

Обработка исключений с помощью handle() 

Коллбэк в handle() вызывается независимо от того, возникло ли исключение:

Integer age = -1;

CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
    if(age < 0) {
        throw new IllegalArgumentException("Age can not be negative");
    }
    if(age > 18) {
        return "Adult";
    } else {
        return "Child";
    }
}).handle((res, ex) -> {
    if(ex != null) {
        System.out.println("Oops! We have an exception - " + ex.getMessage());
        return "Unknown!";
    }
    return res;
});

System.out.println("Maturity : " + maturityFuture.get());

Если исключение возникает, то аргумент res равен null, если же нет – то ex равен null.

Итог

Код примеров частично доступен на GitHub (код к первой части статьи, сервисов обращения к репозиторию там нет).

CompletableFuture – руководство с примерами: 2 комментария

  1. main go further
    ForkJoinPool.commonPool-worker-1
    ForkJoinPool.commonPool-worker-1
    main Hello Rajeev, Welcome to the CalliCoder Blog

    Откуда берётся ” Welcome to the CalliCoder Blog” в самом первом примере thenApply()? Откуда берётся в цепочке thenApply() понятно)

    БОЛЬШОЕ спасибо за ваш труд!

    1. Конечно его там быть не должно (и нет). Спасибо за внимательность. Исправлено.

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *