Здесь говорится о реактивном стиле программирования на Java 8.
В обычном случае (как писали всегда раньше) участок кода, который обращается к базе или другому внешнему сервису, требующему долгого ожидания, блокировал текущий поток, и для решения проблемы использовалась многопоточность.
В реактивном программировании мы не создаем много потоков, достаточно одного или нескольких. Просто мы возвращаем не данные, а «обещание» данных (Publisher), и эта строка-обещание выполняется мгновенно, мы на ней не зависаем в ожидании данных, а идем дальше. Следовательно, новый поток не нужен, так как текущий поток не блокируется.
Данные же будут получены тогда, когда они будут готовы — произойти это может в этом же самом либо другом потоке. Publisher их сам отдаст, когда они придут. Это так называемая push-модель. Отдача готового элемента — это event. То есть реактивная модель основана на событиях (event-driven).
Статья написана по докладу Brian Clozel, Sébastien Deleuze:
Где использовать Reactive Streams
Рассмотрим ситуации, в которых реактивный код нам очень пригодится.
- Допустим, надо обратиться к удаленному REST-сервису, и ответ от него приходится ждать 2-5 секунд. Чтобы не блокировать свой поток, используем реактивный подход. При этом нам будет достаточно одного единственного потока, все равно он не будет заблокирован этим обращением к сервису.

- Допустим у нас сервис, к которому обращается много клиентов по плохой сети. К примеру, это мобильные клиенты.

- Допустим, есть брокер (вроде RabbitMQ), который испускает сообщения. Нам надо протолкнуть сообщения от него в браузер с помощью Websocket (или Server-Sent Events)
Есть также и другие примеры:
- Live database queries
- Mobile (Android UI)
- Big Data
- Real-time analitycs
- HTTP/2
Альтернативы реактивного подхода
Давайте подумаем, можно ли обойтись без реактивного программирования?
Можно использовать callback-функции, например, библиотека Java Swing так реализована (вызывается callback-функция на каждый клик, нажатие кнопки в графическом интерфейсе и т.п. действия пользователя). С Java 8 эти callback-функции можно совместить с lambda-выражениями. Но все, кто имел дело с callback-функциями, знают, что начиная с некоторого уровня вложенности все это трудно читать и поддерживать. Вот пример подобного кода:
То есть это асинхронный код, но читать его трудно.
Вот еще варианты, которые, как показалось бы, могли бы подойти, но нет:
Reactive streams
За неимением ничего годного понадобились новые инструменты.
Итак, что такое реактивное программирование?
Reactive Streams — это интерфейсы, которые были разработан совместно несколькими компаниями (включая Pivotal, Netflix..).
Реализуют эти интерфейсы библиотеки RxJava 1/2, Reactor Core, Akka Streams. Мы будем рассматривать библиотеку Reactor Core.
Что такое Back-pressure в Reactive Streams
Как уже говорилось, мы возвращаем не объект, а «обещание» объекта — Publisher, который будет отдавать объекты, как только они появятся. Но кому? Subscriber-у — тому, кто подписывается на Publisher; подписчик может быть как один, так или много. Subscriber и получает объекты. Причем подписчик может регулировать скорость потока, это и называется Back-pressure (обратное давление).
Эта концепция была придумана для того, чтобы система имела возможность отказаться от получения того, что не способна обработать прямо сейчас, ведь часто речь идет о Big Data, и если скорость потока слишком велика, возникнет ошибка out-of-memory (иначе говоря, памяти не хватит). Система запрашивает столько элементов, сколько может обработать (при этом push-принцип остается, request просто обозначает возможный в данный момент максимум).

Интерфейсы в Reactive Streams
Reactive Streams представляет собой четыре интерфейса:
public interface Publisher<T> { public void subscribe(Subscriber<? super T> s); } public interface Subscriber<T> { public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete(); } public interface Subscription { public void request(long n); public void cancel(); } public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { }
Где они
Эти интерфейсы есть в пакете org.reactivestreams, а еще они вошли в Java 9 в пакет java.util.concurrent:
Какую реализацию использовать
Несколько библиотек реализуют эти интерфейсы, ниже они перечислены в таблице по поколениям.
Авторы советуют использовать Core Reactor 3, если вы уже на Java 8, и RxJava 2, если еще вдруг на Java 6.
Что такое Reactor Core 3
- Это jar-библиотека весом 1 Мб
- В ней два типа, которые реализуются интерфейс Publisher: Flux и Mono
- На Reactor Core 3 построен Spring WebFlux
Maven-зависимость
<dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> <version>3.1.8.RELEASE</version> </dependency>
Тип Flux

Оба типа, Flux и Mono, реализуются интерфейс Publisher. Они испускают элементы (события). Выше приведены marble-диаграммы.
Шкала времени на них простирается слева направо, а цветные элементы — события. Вертикальной чертой обозначено окончание потока — событие onComplete(), а красным крестом — ошибка onError().
Обычно к событиям применяются операторы, то есть делаются преобразования, они бывают очень сложными.
Для объяснения оператора как раз и используются диаграммы. В середине подписывается оператор, а на нижней шкале отображается, что происходит с исходными событиями.
StepVerifier — для тестирования Publisher
Этот класс создан для тестирования Publisher, но он не входит в пакет reactor core 3, его надо добавить отдельно:
<dependency> <groupId>io.projectreactor.addons</groupId> <artifactId>reactor-test</artifactId> <version>3.0.7.RELEASE</version> <scope>test</scope> </dependency>
Далее мы будем создавать различные Flux и Mono и тестировать с использованием StepVerifier. Все это рекомендуется проделать самостоятельно, здесь находится проект-заготовка с различными заданиями — нужно отредактировать методы с целью получения определенного результата.
Продемонстрируем некоторые из них.
Создание Flux и Mono
Во-первых, создадим Flux, который не испускает элементы, а сразу выдает сигнал завершения:
Flux<String> emptyFlux() { return Flux.empty(); }
Протестируем его:
@Test public void empty() { Flux<String> flux = workshop.emptyFlux(); StepVerifier.create(flux).verifyComplete(); }
Теперь создадим Flux, который испускает элементы «foo» и «bar»:
Flux<String> fooBarFluxFromValues() { return Flux.just("foo", "bar"); }
Далее, создадим Flux, который испускает эти же самые элементы из списка:
Flux<String> fooBarFluxFromList() { return Flux.fromIterable(Arrays.asList("foo", "bar")); }
Создадим Flux, который испускает ошибку. Для этого есть специальный сигнал, так как мы не можем просто выбросить исключение — оно может оказаться в другом треде:
Flux<String> errorFlux() { return Flux.error(new IllegalStateException()); }
Наконец, создадим Flux, который испускает числа от 1 до 10 каждые 100 мс:
Flux<Long> counter() { return Flux.interval(Duration.ofMillis(100)).take(10); }
Теперь аналогичные испускания напишем для Mono.
Mono, испускающий сигнал завершения:
Mono<String> emptyMono() { return Mono.empty(); }
Mono, не испускающий никакого сигнала (в том числе сигнала завершения):
Mono<String> monoWithNoSignal() { return Mono.never(); }
Тестирование в данном случае будет выглядеть так:
@Test public void noSignal() { Mono<String> mono = workshop.monoWithNoSignal(); StepVerifier .create(mono) .expectSubscription() .expectNoEvent(Duration.ofSeconds(1)) .thenCancel() .verify(); }
Mono, испускающий значение «foo»:
Mono<String> fooMono() { return Mono.just("foo"); }
Mono, испускающий ошибку:
Mono<String> errorMono() { return Mono.error(new IllegalStateException()); }
Примеры тестирования с помощью StepVerifier
Теперь приведем некоторые примеры тестирования Flux и Mono с использованием класса StepVerifier.
Допустим, надо протестировать тот самый Flux, который испускает элементы «foo» и «bar». В статический метод StepVerifier.create() передаем этот самый flux, далее ожидаем два элемента и сигнал завершения:
StepVerifier.create(flux).expectNext("foo", "bar").expectComplete().verify();
Завершающий метод в цепочке verify() подписывается на Publisher и проверяет все ожидания.
Следующая проверка тоже проверяет эти два элемента и далее ошибку с RuntimeException:
StepVerifier.create(flux).expectNext("foo", "bar").expectError(RuntimeException.class).verify();
Далее, допустим, наш Flux испускает пользователей (исходный код вы найдете там же по вышеприведенной ссылке):
Flux.just(new User("swhite", null, null), new User("jpinkman", null, null)
И нам надо проверить имена пользователей:
StepVerifier.create(flux) .expectNextMatches(user->user.getUsername().equals("swhite")) .consumeNextWith(user-> assertEquals(user.getUsername(), "jpinkman")) .expectComplete().verify();
Обратите внимание, что имена первого и второго пользователя мы проверяем разными способами — для второго используется внешняя библиотека (JUnit, но могли использовать Assertj или Hamcrest).
Теперь протестируем Flux, которые требуют длительного времени для выполнения. Мы будем ускорять время в тестах.
Итак, допустим наш Flux испускает элементы каждую секунду, всего 10 элементов:
Flux.interval(Duration.ofSeconds(1)).take(10)
Вот такой тест займет 10 секунд:
StepVerifier.create(flux).expectNextCount(10).expectComplete().verify();
А что, если элементы испускаются час, например:
Flux.interval(Duration.ofSeconds(1)).take(3600)
Нам надо ускорить время, для этого в StepVerifier.withVirtualTime() принимаем не Flux, а Supplier:
StepVerifier.withVirtualTime(()->flux) .thenAwait(Duration.ofHours(1)) .expectNextCount(3600) .expectComplete() .verify();
Последний тест проходит очень быстро.
Трансформации
Мы рассмотрели создание Publisher, но в реальной жизни мы обычно не создаем, а преобразовываем потоки. То есть делаем с ними что-либо, синхронно или асинхронно.
map()
Допустим, есть Mono, испускающий User-а (полный код смотрите на GitHub), и надо привести его к верхнему регистру. Напишем метод, в нем используется преобразование map():
Mono<User> capitalizeOne(Mono<User> mono) { return mono.map(user -> new User(user.getUsername().toUpperCase(), user.getFirstname().toUpperCase(), user.getLastname().toUpperCase())); }
Для тестирования там используется реактивный репозиторий (приближенный к реальной жизни, правда без базы; просто в коде репозитория создаются пользователи и хранятся там в списке). И все реализованные методы репозитория испускают пользователей из этого списка, но с намеренной задержкой. Так вот User берется из репозитория:
@Test public void transformMono() { Mono<User> mono = repository.findFirst(); StepVerifier.create(workshop.capitalizeOne(mono)) .expectNext(new User("SWHITE", "SKYLER", "WHITE")) .verifyComplete(); }
И вот сразу интерфейс репозитория, чтобы не лезть в код (подобный же используется в Spring в реактивных репозиториях):
public interface ReactiveRepository<T> { Mono<Void> save(Publisher<T> publisher); Mono<T> findFirst(); Flux<T> findAll(); Mono<T> findById(String id); }
Для Flux преобразование выглядит аналогично, просто тут испускается несколько элементов, а не один (но это вы можете увидеть в коде теста):
Flux<User> capitalizeMany(Flux<User> flux) { return flux.map(user -> new User(user.getUsername().toUpperCase(), user.getFirstname().toUpperCase(), user.getLastname().toUpperCase())); }
flatMap()
Теперь представим, что приведение к верхнему регистру требует много времени (например, надо обратиться к внешним сервисам). В таком случае используем асинхронное преобразование flatMap(). В нем каждому пользователю ставится в соответствие не пользователь, а поток, Publisher пользователей, пусть и состоящий из одного элемента (у нас Mono). То есть возвращаем «обещание» пользователя, а не самого пользователя, чтобы не задерживаться на операции. И итоговый поток потоков приводим снова к потоку Flux пользователей:
Flux<User> asyncCapitalizeMany(Flux<User> flux) { return flux.flatMap(user -> asyncCapitalizeUser(user)); } Mono<User> asyncCapitalizeUser(User u) { return Mono.just( new User(u.getUsername().toUpperCase(), u.getFirstname().toUpperCase(), u.getLastname().toUpperCase())); }
Обратите внимание, здесь метод asyncCapitalizeUser() — это та самая «долгая» операция, возвращающая не пользователя, а поток (Publisher).
Если вы загляните в документацию, то увидите диаграмму оператора flatMap:

Здесь каждый User (круг) превращается поток юзеров (квадраты), и они сливаются на нижней итоговой шкале времени в единый поток квадратов, причем их порядок может меняться (синие и розовые квадраты перемежаются). Но в нашем случае поток юзеров состоит из одного элемента, Mono, так что каждый цветной квадрат представлен только в единственном экземпляре.
Слияние
merge()
Допустим есть два потока: один возвращает двух пользователей из одного репозитория сразу без задержки, а второй — четырех пользователей из другого репозитория с задержкой. Нам надо слить потоки:
Flux<User> mergeFluxWithInterleave(Flux<User> flux1, Flux<User> flux2) { return Flux.merge(flux1, flux2); }
Вот два репозитория (весь код по выше уже приведенной ссылке):
// здесь два пользователя, отдаются сразу ReactiveRepository<User> repository = new ReactiveUserRepository(MARIE, MIKE); // а зедсь четыре, отдаются с задержкой 500 мс ReactiveRepository<User> repositoryWithDelay = new ReactiveUserRepository(500);
merge() сольет потоки в итоговый поток, в котором пользователи идут в порядке следования во времени:
@Test public void mergeWithInterleave() { Flux<User> flux = workshop.mergeFluxWithInterleave(repositoryWithDelay.findAll(), repository.findAll()); StepVerifier.create(flux) .expectNext(MARIE, MIKE, User.SKYLER, User.JESSE, User.WALTER, User.SAUL) .verifyComplete(); }
concat()
Если нужно слить потоки так, чтобы сначала шли пользователи из первого потока, а потом из второго, независимо от того, как они приходят в реальном времени, то используйте concat():
Flux<User> mergeFluxWithNoInterleave(Flux<User> flux1, Flux<User> flux2) { return Flux.concat(flux1, flux2); }
Проверим:
@Test public void mergeWithNoInterleave() { Flux<User> flux = workshop.mergeFluxWithNoInterleave(repositoryWithDelay.findAll(), repository.findAll()); StepVerifier.create(flux) .expectNext(User.SKYLER, User.JESSE, User.WALTER, User.SAUL, MARIE, MIKE) .verifyComplete(); }
С помощью concat() можно слить и потоки Mono:
Flux<User> createFluxFromMultipleMono(Mono<User> mono1, Mono<User> mono2) { return Flux.concat(mono1, mono2); }
Request
Теперь рассмотрим как запрашивать не все элементы сразу.
Вот так запрашиваются все четыре сразу:
StepVerifier requestAllExpectFour(Flux<User> flux) { return StepVerifier.create(flux).expectNextCount(4).expectComplete(); }
Теперь запрашиваем и проверяем по одному элементу, и отменяем последние два:
StepVerifier requestOneExpectSkylerThenRequestOneExpectJesse(Flux<User> flux) { return StepVerifier .create(flux, 1) .expectNext(User.SKYLER) .thenRequest(1) .expectNext(User.JESSE) .thenCancel(); }
Теперь залогируем испускание элементов из репозитория (метод log()) и посмотрим что происходит:
Flux<User> fluxWithLog() { return repository.findAll().log(); }
И теперь смотрим:
@Test public void experimentWithLog() { Flux<User> flux = workshop.fluxWithLog(); StepVerifier.create(flux, 0) .thenRequest(1) .expectNextMatches(u -> true) .thenRequest(1) .expectNextMatches(u -> true) .thenRequest(2) .expectNextMatches(u -> true) .expectNextMatches(u -> true) .verifyComplete(); }
Лог:
21:38:26.141 [main] INFO reactor.Flux.Zip.1 - onSubscribe(FluxZip.ZipCoordinator) 21:38:26.157 [main] INFO reactor.Flux.Zip.1 - request(1) 21:38:26.288 [parallel-1] INFO reactor.Flux.Zip.1 - onNext(Person{username='swhite', firstname='Skyler', lastname='White'}) 21:38:26.289 [parallel-1] INFO reactor.Flux.Zip.1 - request(1) 21:38:26.386 [parallel-1] INFO reactor.Flux.Zip.1 - onNext(Person{username='jpinkman', firstname='Jesse', lastname='Pinkman'}) 21:38:26.386 [parallel-1] INFO reactor.Flux.Zip.1 - request(2) 21:38:26.486 [parallel-1] INFO reactor.Flux.Zip.1 - onNext(Person{username='wwhite', firstname='Walter', lastname='White'}) 21:38:26.586 [parallel-1] INFO reactor.Flux.Zip.1 - onNext(Person{username='sgoodman', firstname='Saul', lastname='Goodman'}) 21:38:26.589 [parallel-1] INFO reactor.Flux.Zip.1 - onComplete()
Как видно, после запроса одного элемента callback-функция onNext() вызывается один раз, а после запроса двух элементов — два раза.
doOnSubcribe(), doOnNext(), doOnComplete()
Давайте напечатаем текст на каждое действие — подписку, получение нового элемента и завершающего сигнала:
Flux<User> fluxWithDoOnPrintln() { return repository.findAll() .doOnSubscribe(s->System.out.println("start")) .doOnNext(user->System.out.println(user.getUsername())) .doOnComplete(()->System.out.println("end!")); }
Результат:
start swhite jpinkman wwhite sgoodman end!
Ошибки
Допустим есть поток Mono<User>, испускающий пользователя. Надо вернуть пользователя User.SAUL, если поток испустит ошибку. Иначе просто вернуть исходный поток:
Mono<User> betterCallSaulForBogusMono(Mono<User> mono) { return mono.onErrorResume(e -> Mono.just(User.SAUL)); }
Вот как выглядит диаграмма onErrorResume (на картинке otherwise, но его переименовали в onErrorResume):
Протестируем метод, испуская пользователя с ошибкой и без:
@Test public void monoWithValueInsteadOfError() { Mono<User> mono = workshop.betterCallSaulForBogusMono(Mono.error(new IllegalStateException())); StepVerifier.create(mono) .expectNext(User.SAUL) .verifyComplete(); mono = workshop.betterCallSaulForBogusMono(Mono.just(User.SKYLER)); StepVerifier.create(mono) .expectNext(User.SKYLER) .verifyComplete(); }
Та же самая задача для Flux:
Flux<User> betterCallSaulAndJesseForBogusFlux(Flux<User> flux) { return flux.onErrorResume(e -> Flux.just(User.SAUL, User.JESSE)); }
Теперь рассмотрим случай, когда наш поток выбрасывает исключение. Например, операция приведения в верхнему регистру такова, что выбрасывает исключение:
User capitalizeUser(User user) throws GetOutOfHereException { if (user.equals(User.SAUL)) { throw new GetOutOfHereException(); } return new User(user.getUsername(), user.getFirstname(), user.getLastname()); } protected final class GetOutOfHereException extends Exception { }
А мы ее используем в map():
Flux<User> capitalizeMany(Flux<User> flux) { return flux.map(user -> { try { return capitalizeUser(user); } catch (GetOutOfHereException e) { throw Exceptions.propagate(e); } }); }
как видите, операцию пришлось окружить блоком try-catch, но не простым. Вместо обычного проброса исключения мы сделали:
throw Exceptions.propagate(e);
Здесь класс Exceptions принадлежит пакету reactor.core. Он специально написан для того, чтобы поток испускал законный сигнал об ошибке, который можно проверить с помощью StepVerifier (а не просто выбрасывал исключение):
@Test public void handleCheckedExceptions() { Flux<User> flux = workshop.capitalizeMany(Flux.just(User.SAUL, User.JESSE)); StepVerifier.create(flux) .verifyError(Part07Errors.GetOutOfHereException.class); }
Другие операторы
zip()
Допустим, имеется три потока Flux<String> usernameFlux, Flux<String> firstnameFlux, Flux<String> lastnameFlux. Нам надо из этого составить поток Flux<User>:
Flux<User> userFluxFromStringFlux(Flux<String> usernameFlux, Flux<String> firstnameFlux, Flux<String> lastnameFlux) { return Flux.zip(usernameFlux, firstnameFlux, lastnameFlux) .map(t->new User(t.getT1(),t.getT2(),t.getT3())); }
Вот диаграмма оператора:
first()
Надо вернуть Mono, который испускает значение быстрее:
Mono<User> useFastestMono(Mono<User> mono1, Mono<User> mono2) { return Mono.first(mono1, mono2); }
Протестируем. Как уже говорилось, у нас репозиторий, для которого можно задать задержку отдачи элемента:
@Test public void fastestMono() { ReactiveRepository<User> repository = new ReactiveUserRepository(MARIE); ReactiveRepository<User> repositoryWithDelay = new ReactiveUserRepository(250, MIKE); Mono<User> mono = workshop.useFastestMono(repository.findFirst(), repositoryWithDelay.findFirst()); StepVerifier.create(mono) .expectNext(MARIE) .verifyComplete(); repository = new ReactiveUserRepository(250, MARIE); repositoryWithDelay = new ReactiveUserRepository(MIKE); mono = workshop.useFastestMono(repository.findFirst(), repositoryWithDelay.findFirst()); StepVerifier.create(mono) .expectNext(MIKE) .verifyComplete(); }
then()
Преобразовать Flux в Mono, который испускает сигнал завершения тогда, когда приходит сигнал завершения в Flux:
Mono<Void> fluxCompletion(Flux<User> flux) { return flux.then(); }
Тест:
@Test public void complete() { ReactiveRepository<User> repository = new ReactiveUserRepository(); PublisherProbe<User> probe = PublisherProbe.of(repository.findAll()); Mono<Void> completion = workshop.fluxCompletion(probe.flux()); StepVerifier.create(completion) .verifyComplete(); probe.assertWasRequested(); }
justOrEmpty()
Выдает Mono<User>, если пользователь не null, иначе сигнал завершения:
Mono<User> nullAwareUserToMono(User user) { return Mono.justOrEmpty(user); }
switchIfEmpty()
Выдает элемент User.SKYLER, если входной Mono пустой:
Mono<User> emptyToSkyler(Mono<User> mono) { return mono.switchIfEmpty(Mono.just(User.SKYLER)); }
Протестируем:
@Test public void emptyHandling() { Mono<User> mono = workshop.emptyToSkyler(Mono.just(User.WALTER)); StepVerifier.create(mono) .expectNext(User.WALTER) .verifyComplete(); mono = workshop.emptyToSkyler(Mono.empty()); StepVerifier.create(mono) .expectNext(User.SKYLER) .verifyComplete(); }
Совместимость с другими библиотеками
Здесь рассматривается, как преобразовать к Reactor типы RxJava и обратно и т.п. совместимость. Проще сразу показать примеры:
// TODO Adapt Flux to RxJava Flowable Flowable<User> fromFluxToFlowable(Flux<User> flux) { return Flowable.fromPublisher(flux); } // TODO Adapt RxJava Flowable to Flux Flux<User> fromFlowableToFlux(Flowable<User> flowable) { return Flux.from(flowable); } // ======================================================================================== // TODO Adapt Flux to RxJava Observable Observable<User> fromFluxToObservable(Flux<User> flux) { return Observable.fromPublisher(flux); } // TODO Adapt RxJava Observable to Flux Flux<User> fromObservableToFlux(Observable<User> observable) { return Flux.from(observable.toFlowable(BackpressureStrategy.BUFFER)); } // ======================================================================================== // TODO Adapt Mono to RxJava Single Single<User> fromMonoToSingle(Mono<User> mono) { return Observable.fromPublisher(mono).firstOrError(); } // TODO Adapt RxJava Single to Mono Mono<User> fromSingleToMono(Single<User> single) { return Mono.from(single.toFlowable()); } // ======================================================================================== // TODO Adapt Mono to Java 8+ CompletableFuture CompletableFuture<User> fromMonoToCompletableFuture(Mono<User> mono) { return mono.toFuture(); } // TODO Adapt Java 8+ CompletableFuture to Mono Mono<User> fromCompletableFutureToMono(CompletableFuture<User> future) { return Mono.fromFuture(future); }
Получение обычных значений из потоков
Рассмотрим, как заблокировать текущий поток и вытащить значение из него:
User monoToValue(Mono<User> mono) { return mono.block(); }
Теперь то же самое для Flux:
Iterable<User> fluxToValues(Flux<User> flux) { return flux.toIterable(); }
=========================================================
Вторая часть тут.
Примеры приложений на реактивном стеке можно посмотреть еще в этой статье.
А где 2-ая часть?
Еще нет, увы.
с нетерпением 🙂
можете почитать пока эту https://sysout.ru/servlet-vs-reactive-stack-5-primerov/ — пример приложения(й), но посложнее и годом позже.
часть 2 https://sysout.ru/razrabotka-reaktivnyh-prilozhenij-s-reactive-streams-i-java-8-chast-2/
Не хотите чат в телеге создать?