CompletableFuture – руководство с примерами

Класс CompletableFuture  входит в пакет java.util.concurrent и дополняет интерфейс Future. Упрощает работу с потоками.
Это свободный перевод статьи.

Создание CompletableFuture

Примитивный пример

Самый простой способ создать класс:

CompletableFuture<String> completableFuture = new CompletableFuture<String>();

Чтобы получить результат вычислений, можно использовать блокирующий метод get(). Как и в Future, он блокирует текущий тред до тех пор, пока результат не будет готов. А поскольку он у нас и не готовится, в нашем примере данная строка просто заблокирует поток:

String result = completableFuture.get()

Но можно и вручную завершить выполнение:

completableFuture.complete("Future's Result")

Все клиенты, ожидающие результата вычислений, получат результат. Дальнейшие вызовы completableFuture.complete() будут проигнорированы.

Запуск параллельного потока без получения результата – метод runAsync()

С помощью runAsync() можно запустить в отдельном треде задачу, для которой не требуется возвращать результат. Будет возвращен CompletableFuture<Void>.

В примере мы просто ждем секунду и печатаем значение, но в отдельном треде. А в основном треде блокируем код до получения “результата”, но поскольку тип результата Void, то просто ждем завершения:

// Run a task specified by a Runnable Object asynchronously.
CompletableFuture<Void> future = CompletableFuture.runAsync(new Runnable() {
    @Override
    public void run() {
        // Simulate a long-running Job
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        System.out.println("I'll run in a separate thread than the main thread.");
    }
});

// Block and wait for the future to complete
future.get()

Результат:

I'll run in a separate thread than the main thread.

Если убрать последнюю строчку, то ничего не напечатается, поскольку программа завершится, не дожидаясь завершения параллельного потока (он же не демон).

То же самое с использованием лямбда-выражения:

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    // Simulate a long-running Job   
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    System.out.println("I'll run in a separate thread than the main thread.");
});

Запуск параллельного потока с получением результата – метод  supplyAsync()

А теперь вернем результат из параллельного треда. Для этого надо использовать метод supplyAsync():

// Using Lambda Expression
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    return "Result of the asynchronous computation";
});

// Block and get the result of the Future
String result = future.get();
System.out.println(result);

Результат:

Result of the asynchronous computation

Callback-функции в CompletableFuture

Метод get() блокирует текущий поток. Обычно нам требуется другое. Надо не блокировать текущий поток для получения значения, а задать функцию, которая сделает что-то со значением сразу после того, как оно будет вычислено, в том же параллельном потоке. Так называемую callback-функцию.

thenApply()

Допустим, нам надо закончить вычисления в supplyAsync() и далее сделать что-то с результатом, не блокируя текущие поток. Вот supplyAsync():

// Create a CompletableFuture
CompletableFuture<String> whatsYourNameFuture = CompletableFuture.supplyAsync(() -> {
   try {
       TimeUnit.SECONDS.sleep(1);
   } catch (InterruptedException e) {
       throw new IllegalStateException(e);
   }
   System.out.println(Thread.currentThread().getName());
   return "Rajeev";
});

Возвращает она строку. Нам надо по окончании наших фиктивных вычислений длительностью 1 секунда присоединить к результату “Rajeev” приветствие “hello”.

После thenApply() мы печатаем строку “go further”, чтобы убедиться, что thenApply() не блокирует код. Только в конце мы блокируем код, чтобы получить конечный результат методом get() и вывести его:

// Attach a callback to the Future using thenApply()
CompletableFuture<String> greetingFuture = whatsYourNameFuture.thenApply(name -> {
    System.out.println(Thread.currentThread().getName());    
    return "Hello " + name;
});
System.out.println(Thread.currentThread().getName()+" "+go further");

// Block and wait for the future to complete
System.out.println(Thread.currentThread().getName()+" "+greetingFuture.get());

Обратите внимание, что везде мы выводим имя потока. В консоли у нас такой результат:

main go further
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-1
main Hello Rajeev, Welcome to the CalliCoder Blog

Обратите внимание, что функция внутри supplyAsync() и заданный для нее коллбэк внутри thenApply() не блокируют текущий поток.

Можно сделать целую цепочку методов thenApply():

CompletableFuture<String> welcomeText  = CompletableFuture.supplyAsync(() -> {
	try {
		TimeUnit.SECONDS.sleep(1);
	} catch (InterruptedException e) {
		throw new IllegalStateException(e);
	}
	System.out.println(Thread.currentThread().getName());
	return "Rajeev";
}).thenApply(name -> {
	System.out.println(Thread.currentThread().getName());
	return "Hello " + name;
		
}).thenApply(greeting -> {
	System.out.println(Thread.currentThread().getName());
	return greeting + ", Welcome to the CalliCoder Blog";
});
System.out.println(Thread.currentThread().getName()+" go further");
		
// Block and wait for the future to complete
System.out.println(Thread.currentThread().getName()+" "+welcomeText .get());

Результат:

main go further
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-1
main Hello Rajeev, Welcome to the CalliCoder Blog

thenRun() и thenAccept()

Если из callback-функции значение возвращать не надо, а надо просто что-то в ней сделать, то подойдут методы thenRun() и thenAccept(). Они возвращают CompletableFuture<Void>.

Пример:

// thenAccept() example
CompletableFuture.supplyAsync(() -> {
	return ProductService.getProductDetail(productId);
}).thenAccept(product -> {
	System.out.println("Got product detail from remote service " + product.getName())
});

Тут сервис выдает продукт (ProductDetail) и когда ProductDetail получен, мы печатаем имя продукта.

thenAccept() имеет доступ к тому, что получено в supplyAsync(), а вот thenAccept() уже результат не возвращает.

thenRun() отличается от thenAccept() тем, что даже не имеет доступа к тому, что вычислил supplyAsync():

// thenRun() example
CompletableFuture.supplyAsync(() -> {
    // Run some computation  
}).thenRun(() -> {
    // Computation Finished.
});

Комбинация двух CompletableFutures 

 thenCompose() для двух зависимых друг от друга CompletableFutures 

Допустим, мы хотим получить данные пользователя с помощью удаленного сервиса API, а потом получить данные его кредитной карты с помощью другого сервиса.

У нас есть соответствующие сервисы:

CompletableFuture<User> getUsersDetail(String userId) {
	return CompletableFuture.supplyAsync(() -> {
		UserService.getUserDetails(userId);
	});	
}

CompletableFuture<Double> getCreditRating(User user) {
	return CompletableFuture.supplyAsync(() -> {
		CreditRatingService.getCreditRating(user);
	});
}

Логично применить thenApply(), как и раньше. Но. В предыдущих примерах callback-функция, которая запускалась в  thenApply(), возвращала обычное значение, а не CompletableFuture. А теперь у нас эта callback-функция возвращает CompletableFuture (сервис getCreditRating() – нам же его надо вызвать):

CompletableFuture<CompletableFuture<Double>> result = getUserDetail(userId)
.thenApply(user -> getCreditRating(user));

Таким образом конечный результат – это вложенный CompletableFuture.

Если же мы хотим избавиться от вложенности, то надо использовать метод thenCompose():

CompletableFuture<Double> result = getUserDetail(userId)
.thenCompose(user -> getCreditRating(user));
Короче говоря, метод thenCompose() надо использовать, когда callback-функция возвращает не простое значение, а CompletableFuture, а надо избавиться от вложенности.

thenCombine() для двух независимых друг от друга CompletableFutures 

С помощью thenCompose() мы комбинировали два Future, в которых один зависел от другого.

thenCombine() используется, когда надо запустить два независимых Future и сделать нечто после того, как оба они завершатся.

Вот пример. Первый CompletableFuture возвращает вес, второй – рост. Когда оба значения вычислятся, мы рассчитываем индекс bmi. Он вычисляется в callback-функции, переданной в метод thenCombine():

System.out.println("Retrieving weight.");
CompletableFuture<Double> weightInKgFuture = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return 65.0;
});

System.out.println("Retrieving height.");
CompletableFuture<Double> heightInCmFuture = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return 177.8;
});

System.out.println("Calculating BMI.");
CompletableFuture<Double> combinedFuture = weightInKgFuture
        .thenCombine(heightInCmFuture, (weightInKg, heightInCm) -> {
    Double heightInMeter = heightInCm/100;
    return weightInKg/(heightInMeter*heightInMeter);
});

System.out.println("Your BMI is - " + combinedFuture.get());

Комбинация нескольких CompletableFutures

Пока что мы комбинировали два CompletableFuture. А что если их много? Для этого есть методы:

static CompletableFuture<Void>	 allOf(CompletableFuture<?>... cfs)
static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

CompletableFuture.allOf()

Допустим, есть список независимых CompletableFuture. Мы хотим их запустить параллельно, а по окончании выполнения их всех сделать кое-что еще. Вот тут то и пригодится CompletableFuture.allOf().

Допустим, мы хотим загрузить 100 веб-страниц. Загружать мы их хотим параллельно, то есть каждая страница грузится асинхронно:

CompletableFuture<String> downloadWebPage(String pageLink) {
	return CompletableFuture.supplyAsync(() -> {
		// Code to download and return the web page's content
	});
}

А когда все страницы загрузятся, мы хотим посчитать количество страниц, текст которых содержит слово “CompletableFuture”

List<String> webPageLinks = Arrays.asList(...)	// A list of 100 web page links

// Download contents of all the web pages asynchronously
List<CompletableFuture<String>> pageContentFutures = webPageLinks.stream()
        .map(webPageLink -> downloadWebPage(webPageLink))
        .collect(Collectors.toList());


// Create a combined Future using allOf()
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
        pageContentFutures.toArray(new CompletableFuture[pageContentFutures.size()])
);

Единственная проблема состоит в том, что CompletableFuture.allOf() возвращает тип CompletableFuture<Void>. Но эту проблему можно решить, написав несколько дополнительных строк кода:

// When all the Futures are completed, call `future.join()` to get their results and collect the results in a list -
CompletableFuture<List<String>> allPageContentsFuture = allFutures.thenApply(v -> {
   return pageContentFutures.stream()
           .map(pageContentFuture -> pageContentFuture.join())
           .collect(Collectors.toList());
});

Посмотрите на код выше. Поскольку мы вызываем future.join() тогда, когда все страницы уже будут закачаны, ничего не блокируется.

Метод join() такой же, как get(): единственное отличие в том, что он выбрасывает  unchecked exception (если его выбросил CompletableFuture).

Теперь подсчитаем число страниц, содержащих наше слово:

// Count the number of web pages having the "CompletableFuture" keyword.
CompletableFuture<Long> countFuture = allPageContentsFuture.thenApply(pageContents -> {
    return pageContents.stream()
            .filter(pageContent -> pageContent.contains("CompletableFuture"))
            .count();
});

System.out.println("Number of Web Pages having CompletableFuture keyword - " + 
        countFuture.get());

CompletableFuture.anyOf()

Метод CompletableFuture.anyOf() возвращает CompletableFuture, который завершается сразу же, как только завершается первый из переданных ему в качестве аргументов  CompletableFuture, и возвращает этот же самый результат, что и первый завершившийся CompletableFuture.

В примере первым завершается future2:

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 1";
});

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 2";
});

CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 3";
});

CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3);

System.out.println(anyOfFuture.get()); // Result of Future 2

Так что конечный результат:

 Result of Future 2

Метод CompletableFuture.anyOf() принимает varargs, а возвращает CompletableFuture<Object>. Проблема в том, что все аргументы –CompletableFuture могут возвращать разный тип, и конечный тип неизвестен.

Обработка исключений CompletableFuture

Рассмотрим, что будет, если в какой-то из функций возникнет исключение:

CompletableFuture.supplyAsync(() -> {
	// Code which might throw an exception
	return "Some result";
}).thenApply(result -> {
	return "processed result";
}).thenApply(result -> {
	return "result after further processing";
}).thenAccept(result -> {
	// do something with the final result
});

Если оно возникло в supplyAsync(), то никакой из колбэков вызван не будет.

Если оно возникло в первом thenApply(), то второй (и дальнейшие thenApply()) вызваны не будут. И так далее.

Обработка исключений с помощью колбэка exceptionally() 

Integer age = -1;

CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
    if(age < 0) {
        throw new IllegalArgumentException("Age can not be negative");
    }
    if(age > 18) {
        return "Adult";
    } else {
        return "Child";
    }
}).exceptionally(ex -> {
    System.out.println("Oops! We have an exception - " + ex.getMessage());
    return "Unknown!";
});

System.out.println("Maturity : " + maturityFuture.get());

Здесь в supplyAsync() возникает исключение и сразу выполняется колбэк из exceptionally(), в котором можно вернуть значение по умолчанию.

Обработка исключений с помощью handle() 

Коллбэк в handle() вызывается независимо от того, возникло ли исключение:

Integer age = -1;

CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
    if(age < 0) {
        throw new IllegalArgumentException("Age can not be negative");
    }
    if(age > 18) {
        return "Adult";
    } else {
        return "Child";
    }
}).handle((res, ex) -> {
    if(ex != null) {
        System.out.println("Oops! We have an exception - " + ex.getMessage());
        return "Unknown!";
    }
    return res;
});

System.out.println("Maturity : " + maturityFuture.get());

Если исключение возникает, то аргумент res равен null, если же нет – то ex равен null.

Итог

Код примеров частично доступен на GitHub (код к первой части статьи, сервисов обращения к репозиторию там нет).

Добавить комментарий

Ваш e-mail не будет опубликован. Обязательные поля помечены *