В Spring 5 есть два стека для написания веб-приложения — традиционный Spring MVC и реактивный Webflux.
При этом реактивное приложение не обязательно должно быть написано на Webflux, то есть включать в себя Maven-зависимость spring-boot-starter-webflux, для многих случаев (первых трех из рассмотренных ниже пяти) достаточно и традиционной зависимости spring-boot-starter-web. Ведь Servlet API тоже реализует асинхронный подход.
Статья написана по докладу Rossen Stoyanchev
В чем суть реактивного подхода? Это просто другая реализация параллельной обработки, основанная на событийной модели. Не многопоточность.
Многопоточность — это когда на каждый запрос клиента сервер создает отдельный тред. Потоков много, но многие из них в состоянии wait, то есть заблокированы: например, ждут получения данных из базы, либо еще чего-то.
В реактивном же подходе потоков мало — их несколько. Основной поток событий вообще один. Но он не блокируется, то есть никого не ждет. Вместо данных приходят «обещания данных» (тип Flux или Mono), короче говоря, колбэк, который сам вызовется, когда придут данные. Так что поток не блокируется.
В этой статье рассматривается многомодульное (хотя и простое) приложение — надо смотреть код, он тут. Хотя основные моменты вынесены в статью.
Reactive Data Repository
Вот так выглядит реактивный репозиторий:
public interface CarRepository extends ReactiveMongoRepository<Car, Long> { @Tailable Flux<Car> findCarsBy(); }
Обратите внимание на возвращаемый тип — Flux. @Tailable означает бесконечный поток, но конкретно этот метод мы будем использовать позднее, а пока достаточно широко известного findAll(), но тоже возвращающего Flux. ReactiveMongoRepository уже содержит findAll().
Чтобы вывести список машин, используется метод контроллера:
@RestController public class CarLocationController { ... @GetMapping("/cars") public Flux<Car> getCars() { return this.repository.findAll().log(); } ... }
Модель Car такая:
public class Car { private final Long id; private final Location location; // constructor/getters/setters }
Location — координаты машины (Зачем нужны координаты: наше приложение — упрощенное приложение для бронирования машины типа Uberа или Яндекс.Такси):
public class Location { private final BigDecimal longitude; private final BigDecimal latitude; //... }
При запуске приложения мы заполняем базу автоматически сгенерированными машинами и координатами:
@SpringBootApplication public class LocationServiceApp { public static void main(String[] args) { SpringApplication.run(LocationServiceApp.class, args); } @Bean public CommandLineRunner initData(MongoOperations mongo) { return (String... args) -> { mongo.dropCollection(Car.class); mongo.createCollection(Car.class, CollectionOptions.empty().size(1000000).capped()); LocationGenerator gen = new LocationGenerator(40.740900, -73.988000); Flux.range(1, 100).map(i -> new Car(i.longValue(), gen.location())).doOnNext(mongo::save) .blockLast(Duration.ofSeconds(5)); }; } }
В данный момент у нас подключена зависимость Spring Boot Web Starter:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
Запустим приложение и увидим в консоли, что запускается встроенный Tomcat:
... [restartedMain] 15:04 TomcatWebServer: Tomcat started on port(s): 8081 (http) with context path '' ...
Запросим список машин:
$ curl -v http:/localhost:8081/cars/
Получим массив JSON:
[ { "id":1, "location":{ "longitude":40.740992, "latitude":-73.987984 } }, { "id":2, "location":{ "longitude":40.740900, "latitude":-73.987928 } }, { "id":3, "location":{ "longitude":40.740912, "latitude":-73.987924 } } ... ]
Вернемся в консоль и пронаблюдаем за логом тредов:
[http-nio-8081-exec-1] 18:00 1: | onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber) [http-nio-8081-exec-1] 18:00 1: | request(unbounded) ... [Thread-15] 18:00 1: | onNext(Car{id=1, location={40.740992, -73.987984}}) [Thread-15] 18:00 1: | onNext(Car{id=2, location={40.740900, -73.987928}}) [Thread-15] 18:00 1: | onNext(Car{id=3, location={40.740912, -73.987924}}) [Thread-15] 18:00 1: | onNext(Car{id=4, location={40.740904, -73.987920}}) [Thread-15] 18:00 1: | onComplete() [Thread-15] 18:00 WebAsyncManager: Concurrent result value [[Car{id=1 ... [http-nio-8081-exec-2] 18:00 DispatcherServlet: DispatcherServlet with name ... [http-nio-8081-exec-2] 18:00 RequestMappingHandlerMapping: Looking up handler method for path /cars/ ...
http-nio-8081-exec-1 — это Tomcat-тред, в нем подписываются на данные и запрашивают все данные — request(unbounded), unbounded означает, что данные запрашиваются без back-pressure (это когда регулируется количество входных данных, которые мы способны переварить, и запрос идет не на все элементы сразу).
Дальше идет Mongo-тред Thread-15, в котором выдаются данные. И дальше снова Tomcat-тред.
Теперь заменим Maven-зависимость Spring Boot Web Starter на Spring Boot WebFlux Starter, есть допишем всего четыре буквы spring-boot-starter-webflux:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> </dependency>
И перезапустим приложение. Теперь в консоли видим, что запускается сервер Netty:
[restartedMain] 40:34 NettyWebServer: Netty started on port(s): 8081 [Thread-15] 47:31 EmbeddedMongo: 2018-08-30T16:47:31.065+0500 I NETWORK [initandlisten] connection accepted from 127.0.0.1:53265 #3 (3 connections now open) [Thread-15] 47:31 EmbeddedMongo: 2018-08-30T16:47:31.097+0500 I COMMAND [conn3] CMD: drop test.car [Thread-15] 47:31 EmbeddedMongo: 2018-08-30T16:47:31.176+0500 W COMMAND [conn3] the autoIndexId option is deprecated and will be removed in a future release [reactor-http-nio-2] 47:51 DispatcherHandler: Processing GET request for [http://localhost:8081/cars/] [reactor-http-nio-2] 47:51 RequestMappingHandlerMapping: Looking up handler method for path /cars/ [reactor-http-nio-2] 47:51 RequestMappingHandlerMapping: Returning handler method [public reactor.core.publisher.Flux<car.Car> car.location.CarLocationController.getCars()] [reactor-http-nio-2] 47:52 1: | onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber) [reactor-http-nio-2] 47:52 1: | request(unbounded) [Thread-15] 47:52 EmbeddedMongo: 2018-08-30T16:47:52.250+0500 I NETWORK [initandlisten] connection accepted from 127.0.0.1:53267 #4 (4 connections now open) [nioEventLoopGroup-2-2] 45:14 1: | onNext(Car{id=1, location={40.740980, -73.988028}}) [nioEventLoopGroup-2-2] 45:14 1: | onNext(Car{id=2, location={40.740976, -73.988048}}) [nioEventLoopGroup-2-2] 45:14 1: | onNext(Car{id=100, location={40.740884, -73.987932}}) ... [nioEventLoopGroup-2-2] 45:14 1: | onComplete()
Потоки теперь называются по-другому, в остальном все работает так же.
Вернемся к нашему контроллеру. Наш метод возвращает Flux<Car>, то есть поток, который может быть потенциально бесконечным. Мы его передаем в Spring Framework. Вопрос в том, как фреймфорк узнает, что с ним делать: хотим ли мы писать бесконечный поток, либо конечный набор, который мы будем возвращать клиенту в формате JSON-массив?
В втором случае данные надо записывать в буфер, а потом отправить, а в первом — надо отправлять сразу каждый элемент данных (по одной Car — ведь мы не знаем, когда поток машин закончится и закончится ли он вообще, а также могут быть перерывы между поступлениями следующего элемента). Надо как-то сообщить фреймворку, чего мы хотим.
Вопрос решается заданием MIME-типа. По умолчанию отправляется «application/json» (как в вышеприведенном примере — там подразумевается MIME-тип «application/json«). В этом случае все данные отправляются сразу, нет никакого back-pressure.
Отправка потока (с backpressure)
Чтобы отправить поток, надо задать либо значение:
- либо «text/event-stream» (это официально утвержденный формат для отправки в браузер Server Sent Events (SSE))
- либо «application/stream+json» (для отправки потока от сервера к серверу или http-клиенту, в общем всему, кроме браузера).
Формат «application/stream+json» могут пересмотреть (информация на сентябрь 2018, см. SPR-16742), но пока так.
В этом случае будет использоваться backpressure, причем оно уже реализовано внутри сервера.
Вот так выглядит метод, отправляющий SSE:
@GetMapping(path = "/cars", produces "text/event-stream") public Flux<Car> getCarStream() { return this.repository.findCarsBy().log(); }
Выполним теперь команду:
curl -v -H "Accept:text/event-stream" http://localhost:8081/cars
Получим поток. Префикс data означает, что это SSE:
data:{"id":1,"location":{"longitude":40.740888,"latitude":-73.987964}} data:{"id":2,"location":{"longitude":40.740988,"latitude":-73.988060}} data:{"id":3,"location":{"longitude":40.740908,"latitude":-73.987944}} ...
Давайте заглянем в консоль:
[reactor-http-nio-4] 36:24 3: | onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber) [reactor-http-nio-4] 36:24 3: | request(1) [nioEventLoopGroup-2-3] 36:24 3: | onNext(Car{id=1, location={40.740920, -73.988016}}) [reactor-http-nio-4] 36:24 3: | request(31) [reactor-http-nio-4] 36:24 3: | onNext(Car{id=2, location={40.740984, -73.987996}}) [nioEventLoopGroup-2-3] 36:24 3: | onNext(Car{id=3, location={40.740848, -73.988056}}) [nioEventLoopGroup-2-3] 36:24 3: | onNext(Car{id=4, location={40.740892, -73.987992}}) ... [reactor-http-nio-4] 36:24 3: | request(24) [nioEventLoopGroup-2-3] 36:24 3: | onNext(Car{id=57, location={40.740988, -73.988000}}) [nioEventLoopGroup-2-3] 36:24 3: | onNext(Car{id=58, location={40.740904, -73.987996}}) ...
Видно, что после подписки идут запросы — по 31 элементу, затем по 24, это происходит автоматически.
Это был Spring Boot WebFlux Starter, теперь переключимся на Spring Boot Web Starter:
... [http-nio-8081-exec-1] 53:40 1: | onSubscribe([Fuseable] ... [http-nio-8081-exec-1] 53:40 1: | request(1)... ... [Thread-11] 53:40 1: | onNext(Car{id=1, location={40.740972, -73.988040}}) [MvcAsync1] 53:40 1: | request(1) [MvcAsync1] 53:40 1: | onNext(Car{id=2, location={40.740952, -73.988036}}) [MvcAsync2] 53:40 1: | request(1) [Thread-11] 53:40 1: | onNext(Car{id=3, location={40.740904, -73.987940}}) [MvcAsync3] 53:40 1: | request(1) [MvcAsync3] 53:40 1: | onNext(Car{id=4, location={40.740916, -73.988020}}) [MvcAsync4] 53:40 1: | request(1) ...
Здесь запрашивается по одному элементу, и запись происходит на новых тредах MvcAsync1, MvcAsync2…, чтобы не заблокировать Mongo-тред.
Совместная работа удаленных сервисов
Рассмотрим другой модуль — приложение RequestServiceApp, тут обработчик CarRequestHandler симулирует водителя, который реагирует на запрос бронирования в течение 3-5 секунд (число генерируется случайно).
@Component public class CarRequestHandler { private static final Random random = new Random(); public Mono<ServerResponse> createBooking(ServerRequest request) { return Mono.delay(randomThinkTime()) .then(ServerResponse.created(bookingUrl(request)).build()); } /** * Simulate driver accepting the request after "think time" of 2-5 secs. */ private static Duration randomThinkTime() { return Duration.ofSeconds(random.nextInt(5 - 2) + 2); } private static URI bookingUrl(ServerRequest request) { Long id = Long.valueOf(request.pathVariable("id")); return URI.create("/car/" + id + "/booking/" + Math.abs(random.nextInt())); } }
Вышеприведенный метод createBooking() аналогичен методу внутри контроллера, аннотированному с помощью @PostMapping(«/cars/{id}/booking»), просто маппинг тут сделан новым способом с помощью RouterFunction:
@SpringBootApplication public class RequestServiceApp { public static void main(String[] args) { SpringApplication.run(RequestServiceApp.class, args); } @Bean public RouterFunction<?> routes(CarRequestHandler handler) { return RouterFunctions.route(POST("/cars/{id}/booking"), handler::createBooking); } }
Это приложение запускается на порту 8082 (а предыдущее на порту 8081).
Теперь запустим еще одно приложение на порту 8080, которое координирует работу только что рассмотренных двух приложений. В нем такой контроллер, реагирующий на запрос «/booking»:
@RestController public class CarController { private static final Logger logger = LoggerFactory.getLogger(CarController.class); private final WebClient carsClient = WebClient.create("http://localhost:8081"); private final WebClient bookClient = WebClient.create("http://localhost:8082"); @PostMapping("/booking") public Mono<ResponseEntity<Void>> book() { logger.debug("Processing booking request.."); return carsClient.get().uri("/cars") .retrieve() .bodyToFlux(Car.class) .doOnNext(car -> logger.debug("Trying to book " + car)) .take(5) .flatMap(this::requestCar) .next() .doOnNext(car -> logger.debug("Booked car " + car)); } private Mono<ResponseEntity<Void>> requestCar(Car car) { return bookClient.post() .uri("/cars/{id}/booking", car.getId()) .exchange() .flatMap(response -> response.toEntity(Void.class)); } }
Когда мы делаем запрос «/booking», вызывается метод book(), который задействует двух клиентов: первый клиент запрашивает список машин (первые пять штук), а второй делает запрос на бронирование каждой из машин («/cars/{id}/booking»). Бронируется та машина, которая откликнулась первой (помните, время генерируется случайно от 3 до 5 секунд).
Итак, у нас запущены три приложения, сделаем запрос бронирования:
curl -v -X POST http://localhost:8080/booking
В результате одна из пяти машин будет забронирована, в нашем случае случайная, для которой сгенерировалось наименьшее время «думанья» водителя:
[reactor-http-nio-6] 19:46 CarController: Booked car <201 Created,{Location=[/car/1/booking/1766464542], content-length=[0]}>
Reactive Request Input
Это тот случай, который доступен только c Spring Boot WebFlux Starter (на обычный веб-стек переключиться нелья).
Рассмотрим этот метод класса CarLocationController:
@PostMapping(path="/cars", consumes = "application/stream+json") @ResponseStatus(HttpStatus.CREATED) public Mono<Void> loadCars(@RequestBody Flux<Car> cars) { return this.repository.insert(cars).then(); }
Суть здесь в том, что когда метод loadCars() вызывается, тело запроса еще не прочитано.
Data Ingestion with Back Pressure
В вышеприведенном методе мы вставляем в базу сгенерированные машины. Причем это происходит с обратным давлением (back-pressure), то есть мы можем попросить приостановить поток машин, если не успеваем с ним справляться (добавлять в базу). Все это происходит автоматически.
Для вывода списка добавленный машин будем использовать LocationServiceApp. Перед запуском надо убедиться, что LocationServiceApp работает на Maven-зависимости Spring Boot WebFlux Starter.
Но теперь в CarLocationController для вывода будет использоваться другой метод, подразумевающий бесконечный поток (т.к. база будет пополнятся новыми данными бесконечно):
@GetMapping(path = "/cars", produces = "text/event-stream") public Flux<Car> getCarStream() { return this.repository.findCarsBy().log(); }
То есть вместо метода findAll() мы теперь используем @Tailable метод репозитория findCarsBy().
Запустим снова LocationServiceApp , сделаем запрос
curl -v -H "Accept:text/event-stream" http://localhost:8081/cars
Данные пойдут и встанут в режиме ожидания:
Запустим теперь приложение, которое добавляет машины в репозиторий:
public class ClientUploadApp { public static void main(String[] args) { WebClient client = WebClient.create("http://localhost:8081"); LocationGenerator gen = new LocationGenerator(40.740900, -73.988000); Flux<Car> cars = Flux.interval(Duration.ofSeconds(2)).map(i -> new Car(i + 200, gen.location())); client.post() .uri("/cars") .contentType(MediaType.APPLICATION_STREAM_JSON) .body(cars, Car.class) .retrieve() .bodyToMono(Void.class) .block(); } }
Вернемся в консоль LocationServiceApp и увидим, вывод в консоль машин возобновился — начали выводится машины, добавляемые приложением ClientUploadApp:
... [nioEventLoopGroup-2-2] 41:04 1: | onNext(Car{id=200, location={40.7409, -73.987992}}) [nioEventLoopGroup-2-2] 41:06 1: | onNext(Car{id=201, location={40.7409, -73.988044}}) [nioEventLoopGroup-2-3] 41:08 1: | onNext(Car{id=202, location={40.740916, -73.988004}}) [nioEventLoopGroup-2-3] 41:10 1: | onNext(Car{id=203, location={40.74096, -73.988012}}) [nioEventLoopGroup-2-3] 41:12 1: | onNext(Car{id=204, location={40.740832, -73.987996}}) [nioEventLoopGroup-2-3] 41:14 1: | onNext(Car{id=205, location={40.740904, -73.988016}}) [nioEventLoopGroup-2-3] 41:16 1: | onNext(Car{id=206, location={40.740884, -73.988072}}) ...
То есть одно приложение ClientUploadApp генерирует поток добавлемых в базу машин, а второе приложение LocationServiceApp в это же самое время выводит в консоль новые машины из базы.