Разработка реактивных приложений с Reactive Streams и Java 8. Часть 1.

Здесь говорится о реактивном стиле программирования на Java 8.

В обычном случае (как писали всегда раньше) участок кода, который обращается к базе или другому внешнему сервису, требующему долгого ожидания, блокировал текущий поток, и для решения проблемы использовалась многопоточность.

В реактивном программировании мы не создаем много потоков, достаточно одного или нескольких. Просто мы возвращаем не данные, а “обещание” данных (Publisher), и эта строка-обещание выполняется мгновенно, мы на ней не зависаем в ожидании данных, а идем дальше. Следовательно, новый поток не нужен, так как текущий поток не блокируется.

Данные же будут получены тогда, когда они будут готовы – произойти это может в этом же самом либо другом потоке. Publisher их сам отдаст, когда они придут. Это так называемая push-модель. Отдача готового элемента – это event. То есть реактивная модель основана на событиях (event-driven).

Статья написана по докладу Brian Clozel, Sébastien Deleuze:

Где использовать Reactive Streams

Рассмотрим ситуации, в которых реактивный код нам очень пригодится.

  • Допустим, надо обратиться к удаленному REST-сервису, и ответ от него приходится ждать 2-5 секунд. Чтобы не блокировать свой поток, используем реактивный подход. При этом нам будет достаточно одного единственного потока, все равно он не будет заблокирован этим обращением к сервису.
Обращение к тормозному удаленному сервису
Обращение к тормозному удаленному сервису
  • Допустим у нас сервис, к которому обращается много клиентов по плохой сети. К примеру, это мобильные клиенты.
Обслужить много клиентов
  • Допустим, есть брокер (вроде RabbitMQ), который испускает сообщения. Нам надо протолкнуть сообщения от него в браузер с помощью Websocket (или Server-Sent Events)

Есть также и другие примеры:

  • Live database queries
  • Mobile (Android UI)
  • Big Data
  • Real-time analitycs
  • HTTP/2

Альтернативы реактивного подхода

Давайте подумаем, можно ли обойтись без реактивного программирования?

Можно использовать callback-функции, например, библиотека Java Swing так реализована (вызывается callback-функция на каждый клик, нажатие кнопки в графическом интерфейсе и т.п. действия пользователя). С Java 8 эти callback-функции можно совместить с lambda-выражениями. Но все, кто имел дело с callback-функциями, знают, что начиная с некоторого уровня вложенности все это трудно читать и поддерживать. Вот пример подобного кода:

То есть это асинхронный код, но читать его трудно.

Вот еще варианты, которые, как показалось бы,  могли бы подойти, но нет:

Reactive streams

За неимением ничего годного понадобились новые инструменты.

Итак, что такое реактивное программирование?

Reactive Streams – это интерфейсы, которые были разработан совместно несколькими компаниями (включая Pivotal, Netflix..).

Реализуют эти интерфейсы библиотеки RxJava 1/2, Reactor Core, Akka Streams. Мы будем рассматривать библиотеку Reactor Core.

Что такое Back-pressure в Reactive Streams

Как уже говорилось, мы возвращаем не объект, а “обещание” объекта – Publisher, который будет отдавать объекты, как только они появятся. Но кому? Subscriber-у – тому, кто подписывается на Publisher; подписчик может быть как один, так или много. Subscriber и получает объекты. Причем подписчик может регулировать скорость потока, это и называется Back-pressure (обратное давление).

Эта концепция была придумана для того, чтобы система имела возможность отказаться от получения того, что не способна обработать прямо сейчас, ведь часто речь идет о Big Data, и если скорость потока слишком велика, возникнет ошибка out-of-memory (иначе говоря, памяти не хватит). Система запрашивает столько элементов, сколько может обработать (при этом push-принцип остается, request просто обозначает возможный в данный момент максимум).

На видео 11:14 все показано в динамике, но на картинке пока запрос первого элемента

Интерфейсы в Reactive Streams

Reactive Streams представляет собой четыре интерфейса:

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

public interface Subscription { 
    public void request(long n);
    public void cancel();
}

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

Где они

Эти интерфейсы есть в пакете org.reactivestreams, а еще они вошли в Java 9 в пакет java.util.concurrent:

Какую реализацию использовать

Несколько библиотек реализуют эти интерфейсы, ниже они перечислены в таблице по поколениям.

Авторы советуют использовать Core Reactor 3, если вы уже на Java 8, и RxJava 2, если еще вдруг на Java 6.

Что такое Reactor Core 3

  • Это jar-библиотека весом 1 Мб
  • В ней два типа, которые реализуются интерфейс Publisher: Flux и Mono
  • На Reactor Core 3 построен Spring WebFlux

Maven-зависимость

<dependency>
	<groupId>io.projectreactor</groupId>
	<artifactId>reactor-core</artifactId>
	<version>3.1.8.RELEASE</version>
</dependency>

Тип Flux

Тип Mono
Тип Mono

Оба типа, Flux и Mono, реализуются интерфейс Publisher.  Они испускают элементы (события). Выше приведены marble-диаграммы.

Шкала времени на них простирается слева направо, а цветные элементы – события. Вертикальной чертой обозначено окончание потока – событие onComplete(), а красным крестом – ошибка onError().

Обычно к событиям применяются операторы, то есть делаются преобразования, они бывают очень сложными.

Для объяснения оператора как раз и используются диаграммы. В середине подписывается оператор, а на нижней шкале отображается, что происходит с исходными событиями.

StepVerifier – для тестирования Publisher

Этот класс создан для тестирования Publisher, но он не входит в пакет reactor core 3, его надо добавить отдельно:

<dependency>
	<groupId>io.projectreactor.addons</groupId>
	<artifactId>reactor-test</artifactId>
	<version>3.0.7.RELEASE</version>
	<scope>test</scope>
</dependency>

Далее мы будем создавать различные Flux и Mono и тестировать с использованием StepVerifier. Все это рекомендуется проделать самостоятельно, здесь находится проект-заготовка с различными заданиями – нужно отредактировать методы с целью получения определенного результата.

Продемонстрируем некоторые из них.

Создание Flux и Mono

Во-первых, создадим Flux, который не испускает элементы, а сразу выдает сигнал завершения:

Flux<String> emptyFlux() {
    return Flux.empty();
}

Протестируем его:

@Test
public void empty() {
    Flux<String> flux = workshop.emptyFlux();

    StepVerifier.create(flux).verifyComplete();
}

Теперь создадим Flux, который испускает элементы “foo” и “bar”:

Flux<String> fooBarFluxFromValues() {
    return Flux.just("foo", "bar");
}

Далее, создадим Flux, который испускает эти же самые элементы из списка:

Flux<String> fooBarFluxFromList() {
    return Flux.fromIterable(Arrays.asList("foo", "bar"));
}

Создадим Flux, который испускает ошибку. Для этого есть специальный сигнал, так как мы не можем просто выбросить исключение – оно может оказаться в другом треде:

Flux<String> errorFlux() {
    return  Flux.error(new IllegalStateException());
}

Наконец, создадим Flux, который испускает числа от 1 до 10 каждые 100 мс:

Flux<Long> counter() {
    return Flux.interval(Duration.ofMillis(100)).take(10);
}

Теперь аналогичные испускания напишем для Mono.

Mono, испускающий сигнал завершения:

Mono<String> emptyMono() {
    return Mono.empty();
}

Mono, не испускающий никакого сигнала (в том числе сигнала завершения):

Mono<String> monoWithNoSignal() {
    return Mono.never();
}

Тестирование в данном случае будет выглядеть так:

@Test
public void noSignal() {
    Mono<String> mono = workshop.monoWithNoSignal();
    StepVerifier
		.create(mono)
		.expectSubscription()
		.expectNoEvent(Duration.ofSeconds(1))
		.thenCancel()
		.verify();
}

Mono, испускающий значение “foo”:

Mono<String> fooMono() {
    return Mono.just("foo");
}

Mono, испускающий ошибку:

Mono<String> errorMono() {
    return Mono.error(new IllegalStateException());
}

Примеры тестирования с помощью StepVerifier

Теперь приведем некоторые примеры тестирования Flux и Mono с использованием класса StepVerifier.

Допустим, надо протестировать тот самый Flux, который испускает элементы “foo” и “bar”. В статический метод StepVerifier.create() передаем этот самый flux, далее ожидаем два элемента и сигнал завершения:

StepVerifier.create(flux).expectNext("foo", "bar").expectComplete().verify();

Завершающий метод в цепочке verify() подписывается на Publisher и проверяет все ожидания.

Следующая проверка тоже проверяет эти два элемента и далее ошибку с RuntimeException:

StepVerifier.create(flux).expectNext("foo", "bar").expectError(RuntimeException.class).verify();

Далее, допустим, наш Flux испускает пользователей (исходный код вы найдете там же по вышеприведенной ссылке):

Flux.just(new User("swhite", null, null), new User("jpinkman", null, null)

И нам надо проверить имена пользователей:

StepVerifier.create(flux)
	.expectNextMatches(user->user.getUsername().equals("swhite"))
	.consumeNextWith(user->  assertEquals(user.getUsername(), "jpinkman"))
	.expectComplete().verify();

Обратите внимание, что имена первого и второго пользователя мы проверяем разными способами – для второго используется внешняя библиотека (JUnit, но могли использовать Assertj или Hamcrest).

Теперь протестируем Flux, которые требуют длительного времени для выполнения. Мы будем ускорять время в тестах.

Итак, допустим наш Flux испускает элементы каждую секунду, всего 10 элементов:

Flux.interval(Duration.ofSeconds(1)).take(10)

Вот такой тест займет 10 секунд:

StepVerifier.create(flux).expectNextCount(10).expectComplete().verify();

А что, если элементы испускаются час, например:

Flux.interval(Duration.ofSeconds(1)).take(3600)

Нам надо ускорить время, для этого в StepVerifier.withVirtualTime() принимаем не Flux, а Supplier:

StepVerifier.withVirtualTime(()->flux)
		.thenAwait(Duration.ofHours(1))
		.expectNextCount(3600)
		.expectComplete()
		.verify();

Последний тест проходит очень быстро.

Трансформации

Мы рассмотрели создание Publisher, но в реальной жизни мы обычно не создаем, а преобразовываем потоки. То есть делаем с ними что-либо, синхронно или асинхронно.

map()

Допустим, есть Mono, испускающий User-а (полный код смотрите на GitHub), и надо привести его к верхнему регистру. Напишем метод, в нем используется преобразование map():

Mono<User> capitalizeOne(Mono<User> mono) {
    return mono.map(user -> new User(user.getUsername().toUpperCase(), 
                                     user.getFirstname().toUpperCase(),
				     user.getLastname().toUpperCase()));
}

Для тестирования там используется реактивный репозиторий (приближенный  к реальной жизни, правда без базы; просто в коде репозитория создаются  пользователи и хранятся там в списке). И все реализованные методы репозитория испускают пользователей из этого списка, но с намеренной задержкой.  Так вот User берется из репозитория:

@Test
public void transformMono() {
	Mono<User> mono = repository.findFirst();
	StepVerifier.create(workshop.capitalizeOne(mono))
		    .expectNext(new User("SWHITE", "SKYLER", "WHITE"))
		    .verifyComplete();
}

И вот сразу интерфейс репозитория, чтобы не лезть в код (подобный же используется в Spring в реактивных репозиториях):

public interface ReactiveRepository<T> {

	Mono<Void> save(Publisher<T> publisher);
	Mono<T> findFirst();
	Flux<T> findAll();
	Mono<T> findById(String id);
}

Для Flux преобразование выглядит аналогично, просто тут испускается несколько элементов, а не один (но это вы можете увидеть в коде теста):

Flux<User> capitalizeMany(Flux<User> flux) {
    return flux.map(user -> new User(user.getUsername().toUpperCase(), 
                                user.getFirstname().toUpperCase(),
				user.getLastname().toUpperCase()));
}

flatMap()

Теперь представим, что приведение к верхнему регистру требует много времени (например, надо обратиться к внешним сервисам). В таком случае используем асинхронное преобразование flatMap(). В нем каждому пользователю ставится в соответствие не пользователь, а поток, Publisher пользователей, пусть и состоящий из одного элемента (у нас Mono). То есть возвращаем “обещание” пользователя, а не самого пользователя, чтобы не задерживаться на операции. И итоговый поток потоков приводим снова к потоку Flux пользователей:

Flux<User> asyncCapitalizeMany(Flux<User> flux) {
		return flux.flatMap(user -> asyncCapitalizeUser(user));
}

Mono<User> asyncCapitalizeUser(User u) {
    return Mono.just(
		     new User(u.getUsername().toUpperCase(), u.getFirstname().toUpperCase(), u.getLastname().toUpperCase()));
}

Обратите внимание, здесь метод asyncCapitalizeUser() – это та самая “долгая” операция, возвращающая не пользователя, а поток (Publisher).

Если вы загляните в документацию, то увидите диаграмму оператора flatMap:

flatMap()

Здесь каждый User (круг) превращается поток юзеров (квадраты), и они сливаются на нижней итоговой шкале времени в единый поток квадратов, причем их порядок может меняться (синие и розовые квадраты перемежаются). Но в нашем случае поток юзеров состоит из одного элемента, Mono, так что каждый цветной квадрат представлен только в единственном экземпляре.

Слияние

merge()

Допустим есть два потока: один возвращает двух пользователей из одного репозитория сразу без задержки, а второй – четырех пользователей из другого репозитория с задержкой. Нам надо слить потоки:

Flux<User> mergeFluxWithInterleave(Flux<User> flux1, Flux<User> flux2) {
    return Flux.merge(flux1, flux2);
}

Вот два репозитория (весь код по выше уже приведенной ссылке):

// здесь два пользователя, отдаются сразу
ReactiveRepository<User> repository          = new ReactiveUserRepository(MARIE, MIKE); 
// а зедсь четыре, отдаются с задержкой 500 мс
ReactiveRepository<User> repositoryWithDelay = new ReactiveUserRepository(500);

merge() сольет потоки в итоговый поток, в котором пользователи идут в порядке следования во времени:

@Test
public void mergeWithInterleave() {
	Flux<User> flux = workshop.mergeFluxWithInterleave(repositoryWithDelay.findAll(), repository.findAll());
	StepVerifier.create(flux)
		    .expectNext(MARIE, MIKE, User.SKYLER, User.JESSE, User.WALTER, User.SAUL)
		    .verifyComplete();
}

concat()

Если нужно слить потоки так, чтобы сначала шли пользователи из первого потока, а потом из второго, независимо от того, как они приходят в реальном времени, то используйте concat():

Flux<User> mergeFluxWithNoInterleave(Flux<User> flux1, Flux<User> flux2) {
    return Flux.concat(flux1, flux2);
}

Проверим:

@Test
public void mergeWithNoInterleave() {
	Flux<User> flux = workshop.mergeFluxWithNoInterleave(repositoryWithDelay.findAll(), repository.findAll());
	StepVerifier.create(flux)
		.expectNext(User.SKYLER, User.JESSE, User.WALTER, User.SAUL, MARIE, MIKE)
		.verifyComplete();
}

С помощью concat() можно слить и потоки Mono:

Flux<User> createFluxFromMultipleMono(Mono<User> mono1, Mono<User> mono2) {
    return Flux.concat(mono1, mono2);
}

Request

Теперь рассмотрим как запрашивать не все элементы сразу.

Вот так запрашиваются все четыре сразу:

StepVerifier requestAllExpectFour(Flux<User> flux) {
    return StepVerifier.create(flux).expectNextCount(4).expectComplete();
}

Теперь запрашиваем и проверяем по одному элементу, и отменяем последние два:

StepVerifier requestOneExpectSkylerThenRequestOneExpectJesse(Flux<User> flux) {
    return StepVerifier
			.create(flux, 1)
			.expectNext(User.SKYLER)
			.thenRequest(1)
			.expectNext(User.JESSE)
			.thenCancel();
}

Теперь залогируем испускание элементов из репозитория (метод log()) и посмотрим что происходит:

Flux<User> fluxWithLog() {
   return repository.findAll().log();
}

И теперь смотрим:

@Test
public void experimentWithLog() {
    Flux<User> flux = workshop.fluxWithLog();
    StepVerifier.create(flux, 0)
		.thenRequest(1)
		.expectNextMatches(u -> true)
		.thenRequest(1)
		.expectNextMatches(u -> true)
		.thenRequest(2)
		.expectNextMatches(u -> true)
		.expectNextMatches(u -> true)
		.verifyComplete();
}

Лог:

21:38:26.141 [main] INFO  reactor.Flux.Zip.1 - onSubscribe(FluxZip.ZipCoordinator)
21:38:26.157 [main] INFO  reactor.Flux.Zip.1 - request(1)
21:38:26.288 [parallel-1] INFO  reactor.Flux.Zip.1 - onNext(Person{username='swhite', firstname='Skyler', lastname='White'})
21:38:26.289 [parallel-1] INFO  reactor.Flux.Zip.1 - request(1)
21:38:26.386 [parallel-1] INFO  reactor.Flux.Zip.1 - onNext(Person{username='jpinkman', firstname='Jesse', lastname='Pinkman'})
21:38:26.386 [parallel-1] INFO  reactor.Flux.Zip.1 - request(2)
21:38:26.486 [parallel-1] INFO  reactor.Flux.Zip.1 - onNext(Person{username='wwhite', firstname='Walter', lastname='White'})
21:38:26.586 [parallel-1] INFO  reactor.Flux.Zip.1 - onNext(Person{username='sgoodman', firstname='Saul', lastname='Goodman'})
21:38:26.589 [parallel-1] INFO  reactor.Flux.Zip.1 - onComplete()

Как видно, после запроса одного элемента callback-функция onNext() вызывается один раз, а после запроса двух элементов – два раза.

doOnSubcribe(), doOnNext(), doOnComplete()

Давайте напечатаем текст на каждое действие – подписку, получение нового элемента и завершающего сигнала:

Flux<User> fluxWithDoOnPrintln() {
    return repository.findAll()
                     .doOnSubscribe(s->System.out.println("start"))
	             .doOnNext(user->System.out.println(user.getUsername()))
		     .doOnComplete(()->System.out.println("end!"));
}

Результат:

start
swhite
jpinkman
wwhite
sgoodman
end!

Ошибки

Допустим есть поток Mono<User>, испускающий пользователя. Надо вернуть пользователя User.SAUL, если поток испустит ошибку. Иначе просто вернуть исходный поток:

Mono<User> betterCallSaulForBogusMono(Mono<User> mono) {
    return mono.onErrorResume(e -> Mono.just(User.SAUL));
}

Вот как выглядит диаграмма onErrorResume (на картинке otherwise, но его переименовали в onErrorResume):onErrrorResume

Протестируем метод, испуская пользователя с ошибкой и без:

@Test
public void monoWithValueInsteadOfError() {
    Mono<User> mono = workshop.betterCallSaulForBogusMono(Mono.error(new IllegalStateException()));
    StepVerifier.create(mono)
			    .expectNext(User.SAUL)
		            .verifyComplete();

    mono = workshop.betterCallSaulForBogusMono(Mono.just(User.SKYLER));
    StepVerifier.create(mono)
			    .expectNext(User.SKYLER)
		            .verifyComplete();
}

Та же самая задача для Flux:

Flux<User> betterCallSaulAndJesseForBogusFlux(Flux<User> flux) {
    return flux.onErrorResume(e -> Flux.just(User.SAUL, User.JESSE));
}

Теперь рассмотрим случай, когда наш поток выбрасывает исключение. Например, операция приведения в верхнему регистру такова, что выбрасывает исключение:

User capitalizeUser(User user) throws GetOutOfHereException {
    if (user.equals(User.SAUL)) {
       throw new GetOutOfHereException();
    }
    return new User(user.getUsername(), user.getFirstname(), user.getLastname());
}

protected final class GetOutOfHereException extends Exception {
}

А мы ее используем в map():

Flux<User> capitalizeMany(Flux<User> flux) {
    return flux.map(user -> {
		try {
			return capitalizeUser(user);
		} catch (GetOutOfHereException e) {
			throw Exceptions.propagate(e);
		}
	});
    }

как видите, операцию пришлось окружить блоком try-catch, но не простым. Вместо обычного проброса исключения мы сделали:

throw Exceptions.propagate(e);

Здесь класс Exceptions принадлежит пакету reactor.core. Он специально написан для того, чтобы поток испускал законный сигнал об ошибке, который можно проверить с помощью StepVerifier (а не просто выбрасывал исключение):

@Test
public void handleCheckedExceptions() {
    Flux<User> flux = workshop.capitalizeMany(Flux.just(User.SAUL, User.JESSE));

    StepVerifier.create(flux)
			    .verifyError(Part07Errors.GetOutOfHereException.class);
}

Другие операторы

zip()

Допустим, имеется три потока Flux<String> usernameFlux, Flux<String> firstnameFlux, Flux<String> lastnameFlux. Нам надо из этого составить поток Flux<User>:

Flux<User> userFluxFromStringFlux(Flux<String> usernameFlux, Flux<String> firstnameFlux, Flux<String> lastnameFlux) {
    return Flux.zip(usernameFlux, firstnameFlux, lastnameFlux)
	       .map(t->new User(t.getT1(),t.getT2(),t.getT3()));
}

Вот диаграмма оператора:

first()

Надо вернуть Mono, который испускает значение быстрее:

Mono<User> useFastestMono(Mono<User> mono1, Mono<User> mono2) {
    return Mono.first(mono1, mono2);
}

Протестируем. Как уже говорилось, у нас репозиторий, для которого можно задать задержку отдачи элемента:

@Test
public void fastestMono() {
    ReactiveRepository<User> repository = new ReactiveUserRepository(MARIE);
    ReactiveRepository<User> repositoryWithDelay = new ReactiveUserRepository(250, MIKE);
    Mono<User> mono = workshop.useFastestMono(repository.findFirst(), repositoryWithDelay.findFirst());
    StepVerifier.create(mono)
		.expectNext(MARIE)
	        .verifyComplete();

    repository = new ReactiveUserRepository(250, MARIE);
    repositoryWithDelay = new ReactiveUserRepository(MIKE);
    mono = workshop.useFastestMono(repository.findFirst(), repositoryWithDelay.findFirst());
    StepVerifier.create(mono)
		.expectNext(MIKE)
		.verifyComplete();
}

then()

Преобразовать Flux в Mono, который испускает сигнал завершения тогда, когда приходит сигнал завершения в Flux:

Mono<Void> fluxCompletion(Flux<User> flux) {
    return flux.then();
}

Тест:

@Test
public void complete() {
    ReactiveRepository<User> repository = new ReactiveUserRepository();
    PublisherProbe<User> probe = PublisherProbe.of(repository.findAll());
    Mono<Void> completion = workshop.fluxCompletion(probe.flux());
    StepVerifier.create(completion)
				.verifyComplete();
    probe.assertWasRequested();
}

justOrEmpty()

Выдает Mono<User>, если пользователь не null, иначе сигнал завершения:

Mono<User> nullAwareUserToMono(User user) {
    return Mono.justOrEmpty(user);
}

switchIfEmpty()

Выдает элемент User.SKYLER, если входной Mono пустой:

Mono<User> emptyToSkyler(Mono<User> mono) {
    return mono.switchIfEmpty(Mono.just(User.SKYLER));
}

Протестируем:

@Test
public void emptyHandling() {
    Mono<User> mono = workshop.emptyToSkyler(Mono.just(User.WALTER));
    StepVerifier.create(mono)
			     .expectNext(User.WALTER)
			     .verifyComplete();
    mono = workshop.emptyToSkyler(Mono.empty());
    StepVerifier.create(mono)
			     .expectNext(User.SKYLER)
			     .verifyComplete();
}

Совместимость с другими библиотеками

Здесь рассматривается, как преобразовать к Reactor типы RxJava и обратно и т.п. совместимость. Проще сразу показать примеры:

	// TODO Adapt Flux to RxJava Flowable
	Flowable<User> fromFluxToFlowable(Flux<User> flux) {
		return Flowable.fromPublisher(flux);
	}

	// TODO Adapt RxJava Flowable to Flux
	Flux<User> fromFlowableToFlux(Flowable<User> flowable) {
		return Flux.from(flowable);
	}

	// ========================================================================================

	// TODO Adapt Flux to RxJava Observable
	Observable<User> fromFluxToObservable(Flux<User> flux) {
		return Observable.fromPublisher(flux);
	}

	// TODO Adapt RxJava Observable to Flux
	Flux<User> fromObservableToFlux(Observable<User> observable) {
		return Flux.from(observable.toFlowable(BackpressureStrategy.BUFFER));
	}

	// ========================================================================================

	// TODO Adapt Mono to RxJava Single
	Single<User> fromMonoToSingle(Mono<User> mono) {
		return Observable.fromPublisher(mono).firstOrError();
	}

	// TODO Adapt RxJava Single to Mono
	Mono<User> fromSingleToMono(Single<User> single) {
		return Mono.from(single.toFlowable());
	}

	// ========================================================================================

	// TODO Adapt Mono to Java 8+ CompletableFuture
	CompletableFuture<User> fromMonoToCompletableFuture(Mono<User> mono) {
		return mono.toFuture();
	}

	// TODO Adapt Java 8+ CompletableFuture to Mono
	Mono<User> fromCompletableFutureToMono(CompletableFuture<User> future) {
		return Mono.fromFuture(future);
	}

Получение обычных значений из потоков

Рассмотрим, как заблокировать текущий поток и вытащить значение из него:

User monoToValue(Mono<User> mono) {
    return mono.block();
}

Теперь то же самое для Flux:

Iterable<User> fluxToValues(Flux<User> flux) {
    return flux.toIterable();
}

=========================================================

Вторая часть тут.

Примеры приложений на реактивном стеке можно посмотреть еще в этой статье.

Разработка реактивных приложений с Reactive Streams и Java 8. Часть 1.: 7 комментариев

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *