В этой части показывается, как использовать Reactive Streams в реальном приложении. Первая часть тут.
Код приложения есть на GitHub. Обратите внимание, что для запуска необходимо установить MongoDB, а также получить токены к API GitHub и API Gitter.
Итак, в примере:
- Обращение к внешним REST-сервисам (а именно GitHub и Gitter API)
- Работа с репозиторием
- Обработка ошибок
- Отображение html view и статических ресурсов
- Server Sent Events (SSE)
- и другое
И все это в реактивном стиле.
Пример реактивного контроллера
Ниже представлен обычный контроллер с обычным методом test(), написанном в традиционном стиле. Мы будем добавлять в контроллер новые методы, в которых используется реактивный подход.
Mono.just()
Первый метод hello():
@RestController @RequestMapping("/demo") public class DemoController { // You should create and share a single instance, or even preferrably // get the the factory from the underlying container private DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory(); @RequestMapping("/test") public String test() { return "This is a test"; } @RequestMapping("/hello") public Mono<String> hello(@RequestParam String name) { return Mono.just("Hello, " + name + "!"); } // другие методы }
Если ввести в браузере строку:
localhost:8080/hello?name=world
то пойдет запрос, который вызовет subscription chain (цепочку операторов).
ServerWebExchange
Создадим еще один метод (в том же контроллере DemoController). В нем мы покажем использование более низкоуровневого API.
С версии Spring 5 появилось понятие «обмена», а именно класс org.springframework.web.server.ServerWebExchange. Объект типа ServerWebExchange можно инжектировать как параметр метода контроллера.
ServerWebExchange содержит ServerHttpRequest и ServerHttpResponse — это тоже новые интерфейсы, не имеющие отношения к сервлетам.
В response мы передаем код ответа и заголовки. Для формирования body нам нужен Publisher, на который фреймворк может подписаться и использовать для записи в response. Мы его передаем в метод writeWith().
@RestController @RequestMapping("/demo") public class DemoController { private DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory(); //другие методы @RequestMapping("/exchange") public Mono<Void> exchange(ServerWebExchange exchange) { ServerHttpResponse response = exchange.getResponse(); response.setStatusCode(HttpStatus.OK); response.getHeaders().setContentType(MediaType.TEXT_PLAIN); DataBuffer buf = dataBufferFactory.wrap("Hello from exchange".getBytes(StandardCharsets.UTF_8)); return response.writeWith(Flux.just(buf)); } //другие методы }
Вместо обычной строки используется DataBuffer, поскольку в реактивном подходе мы стремимся к производительности.
Метод возвращает тип Mono<Void>. Как говорилось в 1 части, этот тип используется тогда, когда надо дать сигнал завершения. Больше он ничего не передает.
Mono.never()
В 1 части мы видели, что Publisher может более или менее быстро завершиться, а может и не завершиться вовсе. В следующем методе контроллера мы попробуем вернуть Publisher, который никогда не завершается. То есть добавим такой метод:
@RequestMapping("/waitforit") public Mono<String> waiting() { return Mono.never(); }
В браузере на запрос
localhost:8080/waitforit
мы увидим долгое ожидание, и все (1:40:50). Ответ не возвращается. Делать так в реальных приложениях не надо, это просто пример управления временем.
Mono.error()
Теперь вернем для примера ошибку:
@RequestMapping("/error") public Mono<String> error() { return Mono.error(new IllegalArgumentException("My custom error message")); }
Результат:
Приложение для вывода пользователей, списка задач и чата Gitter
Теперь перейдем к реальному приложению.
Конфигурация
В нем будут шаблоны на Freemarker, поэтому сконфигурируем FreeMarkerConfigurer и ViewResolver:
@Configuration public class WebConfig implements WebFluxConfigurer { @Bean public FreeMarkerConfigurer freeMarkerConfigurer(ReactiveWebApplicationContext applicationContext) { FreeMarkerConfigurer configurer = new FreeMarkerConfigurer(); configurer.setTemplateLoaderPath("classpath:/templates/"); configurer.setResourceLoader(applicationContext); return configurer; } @Override public void configureViewResolvers(ViewResolverRegistry registry) { registry.freeMarker(); } }
Обратите внимание, что класс конфигурации реализует интерфейс WebFluxConfigurer.
Главная страница
Главная страница использует Freemarker шаблон home.ftl. Вот наш контроллер и метод для главной страницы:
@Controller public class DashboardController { //другие методы и конструктор @GetMapping("/") public String home() { return "home"; } }
Тут пока нет ничего реактивного.
Реактивный репозиторий
Далее выведем список людей. Тут уже используется реактивный репозиторий, который мы внедряем в контроллер:
@Controller public class DashboardController { private final ReactorPersonRepository repository; @Autowired public DashboardController(DashboardService dashboardService, ReactorPersonRepository repository) { this.repository = repository; } @GetMapping("/reactor/people") @ResponseBody public Flux<ReactorPerson> findReactorPeople() { return this.repository.findAll(); } //другие методы }
ReactorPerson — сущность MongoDB:
@Document public class ReactorPerson { private String id; private String name; //getters/setters/constructor }
А ReactorPersonRepository расширяет org.springframework.data.repository.reactive.ReactiveCrudRepository:
public interface ReactorPersonRepository extends ReactiveCrudRepository<ReactorPerson, String> { }
Заполнение базы данными
Базу данных заполняем в главном классе приложения с помощью бина CommandLineRunner, который позволяет выполнить команды на старте приложения:
@SpringBootApplication @EnableConfigurationProperties({DashboardProperties.class, MongoProperties.class}) @EnableAutoConfiguration(exclude={MongoAutoConfiguration.class, MongoDataAutoConfiguration.class}) public class SpringReactiveDashboardApplication { public static void main(String[] args) { SpringApplication.run(SpringReactiveDashboardApplication.class, args); } @Bean public CommandLineRunner initDatabase(ReactorPersonRepository repository) { Flux<ReactorPerson> people = Flux.just( new ReactorPerson("smaldini", "Stephane Maldini"), new ReactorPerson("simonbasle", "Simon Basle"), new ReactorPerson("akarnokd", "David Karnok"), new ReactorPerson("rstoya05", "Rossen Stoyanchev"), new ReactorPerson("sdeleuze", "Sebastien Deleuze"), new ReactorPerson("poutsma", "Arjen Poutsma"), new ReactorPerson("bclozel", "Brian Clozel") ); return args -> { repository.deleteAll().thenMany(repository.saveAll(people)).blockLast(); }; } }
Мы сначала удаляем данные из репозитория, и затем добавляем в него пользователей. Это блокирующие команды.
При запросе:
http://localhost:8080/reactor/people
выведется список добавленных пользователей в формате JSON. Пользователи поступают и сериализуются по очереди, они не накапливаются в буфере. Потенциально тут может быть бесконечный поток (см. в примере тут)
Вывод одного пользователя
Давайте теперь напишем метод для вывода одного пользователя. Возвращаемый тип тут будет Mono<ReactorPerson>:
@GetMapping("/reactor/people/{id}") @ResponseBody public Mono<ReactorPerson> findReactorPerson(@PathVariable String id) { return this.repository.findById(id) .switchIfEmpty(Mono.error(new ReactorPersonNotFoundException(id))); }
При запросе в браузере мы увидим одного пользователя в формате JSON.
Если пользователя с заданным id не существует, то this.repository.findById(id) вернет пустую страницу со статусом 200.
Можно вернуть какого-либо пользователя по умолчанию (если пользователя с заданным id не существует), делается это командой:
this.repository.findById(id).defaultIfEmpty(...)
В нашем примере выше при отсутствии пользователя выбрасывается и обрабатывается исключение:
@ExceptionHandler public ResponseEntity handleNotFoundException(ReactorPersonNotFoundException exc) { return ResponseEntity.notFound().build(); }
В консоли браузера будет ошибка 400.
Страница списка задач — обращение к REST-сервисам
Теперь нам пригодятся токены от GitHub и Gitter. Мы напишем классы GithubClient и GitterClient, которые обращаются к GitHub API и Gitter API .
Сначала напишем GithubClient.
GithubClient
Для обращения к GitHub API технически можно использовать RestTemplate, но мы продемонстрируем реактивный подход и будем пользоваться:
import org.springframework.web.reactive.function.client.WebClient
Он создается так (в конструкторе):
@Component public class GithubClient { private final WebClient webClient; public GithubClient(DashboardProperties properties) { this.webClient = WebClient.builder() .baseUrl("https://api.github.com") .filter(ExchangeFilterFunctions .basicAuthentication(properties.getGithub().getUsername(), properties.getGithub().getToken())) .filter(userAgent()).build(); } // другие методы private ExchangeFilterFunction userAgent() { return (clientRequest, exchangeFunction) -> { ClientRequest newRequest = ClientRequest.from(clientRequest) .header("User-Agent", "Spring Framework WebClient").build(); return exchangeFunction.exchange(newRequest); }; } }
- Имя пользователя и токен от Github хранятся в классе свойств DashboardProperties, внедряем их через конструктор.
- webClient создается с помощью билдера, первый фильтр — метод filter() — добавляет basic authentication header
- второй фильтр добавляет User-Agent header, для этого у нас отдельный метод userAgent(), он показан в коде выше. Заголовок User-Agent тоже требует GitGub API.
- ClientRequest — immutable, то есть неизменяемый объект, поэтому чтобы добавить заголовок, мы создаем новый объект из старого.
Итак, webClient создан. Напишем теперь метод, возвращающий список задач, этот метод тоже в классе GithubClient:
public Flux<GithubIssue> findOpenIssues(String owner, String repo) { return this.webClient.get() .uri("/repos/{owner}/{repo}/issues?state=open", owner, repo) .accept(VND_GITHUB_V3).exchange() .flatMapMany(response -> response.bodyToFlux(GithubIssue.class)); }
Метод flatMapMany() превращает Mono во Flux.
Метод accept() добавляет соответствующий заголовок accept (значение константы — MediaType.valueOf(«application/vnd.github.v3+json»)
Мы получили список задач типа GithubIssue, но это пока не тот список. Конечной целью является получить список задач типа ReactorIssue, в котором помимо названия задачи и имени создавшего ее пользователя присутствует онлайн-статус пользователя. А для этого придется обратиться к Gitter API. А затем составить комбинацию обоих результатов.
GitterClient
Этот клиент аналогичный, только вместо basic authentication используется oauth token.
Список пользователей (канала Gitter) получаем методом getUsersInRoom():
@Component public class GitterClient { private final WebClient webClient; public GitterClient(DashboardProperties properties) { this.webClient = WebClient.builder() .filter(oAuthToken(properties.getGitter().getToken())).build(); } public Flux<GitterUser> getUsersInRoom(String roomId, int limit) { return this.webClient .get().uri("https://api.gitter.im/v1/rooms/{roomId}/users?limit={limit}", roomId, limit) .accept(MediaType.APPLICATION_JSON) .exchange() .flatMapMany(response -> response.bodyToFlux(GitterUser.class)); } //другие методы }
Здесь используется roomId — идентификатор реального Gitter-канала, он зашит в коде в свойствах DashboardProperties.
Комбинация результатов в сервисе DashboardDefaultService
Далее в DashboardDefaultService мы комбинируем результаты запросов к GitHub API и Gittter API в методе findReactorIssues():
@Service public class DefaultDashboardService implements DashboardService { //поля, конструктор @Override public Flux<ReactorIssue> findReactorIssues() { Flux<GithubIssue> issues = this.githubClient.findOpenIssues("reactor", "reactor-core"); Mono<List<GitterUser>> users = this.gitterClient .getUsersInRoom(this.properties.getReactor().getGitterRoomId(), 300) .collectList(); return users.flatMapMany(gitterUserList -> { return issues.map(issue -> { String userLogin = issue.getUser().getLogin(); Optional<GitterUser> gitterUser = gitterUserList.stream() .filter(gu -> gu.getUsername().equals(userLogin)).findFirst(); return new ReactorIssue(issue, gitterUser.isPresent()); }); }); } // другие методы }
Первые две команды — это обращения к API. Надо из Flux<GitHubIssue> получить Flux<ReactorIssue>, в которых присутствует еще и онлайн-статус пользователя — это наша цель. ReactorIssue создается так:
new ReactorIssue(issue, gitterUser.isPresent())
где issue имеет тип GitHubIssue.
Если в методе findReactorIssues() вернуть просто:
return this.githubClient.findOpenIssues("reactor", "reactor-core") .map(issue - > new ReactorIssue(issue, false));
то мы получим список задач, но при этом все пользователи будут иметь статус оффлайн (второй параметр false):
Теперь попытаемся скомбинировать результаты для получения реального онлайн-статуса.
Обратите внимание, что заранее пользователей Gitter делаем типом Mono<List…> с помощью collectList():
Mono<List<GitterUser>> users = this.gitterClient .getUsersInRoom(this.properties.getReactor().getGitterRoomId(), 300) .collectList();
Далее по комбинированию: оператор zip() нам не подходит — он берет за раз один GitHubIssue и один GitterUser, а нам надо для одного GitHubIssue брать целый список пользователей (чтобы найти нужного).
Вот этот оборот возвращает для каждого List<GitterUser> (а у нас он только один) Flux<ReactorIssue>:
return users.flatMapMany(gitterUserList -> { return issues.map(issue -> { ... }); });
Вернемся к контроллеру и добавим метод для вывода задач:
@Controller public class DashboardController { //поля/конструктор //другие методы @GetMapping("/issues") public String issues(Model model) { model.addAttribute("issues", this.dashboardService.findReactorIssues()); return "issues"; } }
Здесь мы выводим Flux<ReactorIssue> во Freemarker-шаблон issues.ftl.
В результате теперь виден реальный онлайн-статус пользователей:
На момент доклада 2016 в году они уже поддерживали Freemarker, поэтому мы просто поместили Publisher в Model и вернули имя view.
А Thymeleaf тогда еще не поддерживали. Поэтому для передачи Publisher в Thymeleaf-шаблон надо было делать так (2:12:46):
@GetMapping("/issues") public Mono<String> issues(Model model) { return this.dashboardService.findReactorIssues().collectList() .then(issues->{ model.addAttribute("issues", issues); return Mono.just("issues"); }); }
Страница сообщений чата
Чтобы вывести последние сообщения канала Gitter, добавим в GitterClient новый метод:
public Flux<GitterMessage> latestChatMessages(String roomId, int limit) { return this.webClient .get() .uri("https://api.gitter.im/v1/rooms/{roomId}/chatMessages?limit={limit}", roomId, limit) .accept(MediaType.APPLICATION_JSON) .exchange() .flatMapMany(response -> response.bodyToFlux(GitterMessage.class)); }
Затем добавим еще один метод в DefaultDashboardService:
@Override public Flux<GitterMessage> getLatestChatMessages(int limit) { return this.gitterClient .latestChatMessages(this.properties.getReactor().getGitterRoomId(), limit); }
И, наконец, перейдем к контроллеру:
@Controller public class DashboardController { // другие методы и конструктор, поля @GetMapping("/chat") public String chat() { return "chat"; } @GetMapping(path = "/chatMessages", produces = MediaType.APPLICATION_JSON_UTF8_VALUE) @ResponseBody public Flux<GitterMessage> chatMessages(@RequestParam(required = false, defaultValue = "10") String limit) { return this.dashboardService.getLatestChatMessages(Integer.parseInt(limit)); } }
Метод chat() возвращает view chat.ftl
Метод chatMessages() возвращает сообщения как JSON, к нему идет обращение уже непосредственно из шаблона chat.ftl на JavaScript — ajax-запрос:
$(function () { // еще код $.ajax("/chatMessages") .done(function (messages) { messages.forEach(function (msg) { appendChatMessage(msg); }); }); });
При запросе
http://localhost:8080/chat
получим страницу последних сообщений:
Сообщения чата как Server Sent Events
А теперь выведем сообщения как реальный чат, который обновляется push-ем со стороны сервера.
GitterClient
Для этого добавим в GitterClient новый метод:
public Flux<GitterMessage> streamChatMessages(String roomId) { return this.webClient .get() .uri("https://stream.gitter.im/v1/rooms/{roomId}/chatMessages", roomId) .accept(MediaType.TEXT_EVENT_STREAM) .exchange() .flatMapMany(response -> response.bodyToFlux(GitterMessage.class)); }
Обратите внимание, теперь мы обращаемся по другому uri, не к REST API, а Streaming API. Также мы задаем другой заголовок .accept(MediaType.TEXT_EVENT_STREAM) — это специальный заголовок для SSE (Server Sent Events).
DefaultDashboardService
Добавим соответствующий метод в DefaultDashboardService:
@Override public Flux<GitterMessage> streamChatMessages() { String roomId = this.properties.getReactor().getGitterRoomId(); return this.gitterClient.streamChatMessages(roomId); }
DashboardController
И добавим метод в контроллер:
@GetMapping(value = "/chatStream", produces = MediaType.TEXT_EVENT_STREAM_VALUE) @ResponseBody public Flux<GitterMessage> streamChatMessages() { return this.dashboardService.streamChatMessages(); }
Снова обратите внимание, что в контроллере требуется указать тип MediaType.TEXT_EVENT_STREAM_VALUE.
Теперь в шаблон chat.ftl надо добавить JavaScript код для работы с SSE:
var chatEventSource = new EventSource("/chatStream"); chatEventSource.onmessage = function (e) { appendChatMessage(JSON.parse(e.data)); }
Теперь чат обновляется при появлении новых сообщений в канале без перезагрузки страницы браузера.
Итог
Мы написали веб-приложение на реактивном стеке, но тут показаны не все возможности, более интересные см. тут.