В этой части показывается, как использовать Reactive Streams в реальном приложении. Первая часть тут.
Код приложения есть на GitHub. Обратите внимание, что для запуска необходимо установить MongoDB, а также получить токены к API GitHub и API Gitter.
Итак, в примере:
- Обращение к внешним REST-сервисам (а именно GitHub и Gitter API)
- Работа с репозиторием
- Обработка ошибок
- Отображение html view и статических ресурсов
- Server Sent Events (SSE)
- и другое
И все это в реактивном стиле.
Пример реактивного контроллера
Ниже представлен обычный контроллер с обычным методом test(), написанном в традиционном стиле. Мы будем добавлять в контроллер новые методы, в которых используется реактивный подход.
Mono.just()
Первый метод hello():
@RestController
@RequestMapping("/demo")
public class DemoController {
// You should create and share a single instance, or even preferrably
// get the the factory from the underlying container
private DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
@RequestMapping("/test")
public String test() {
return "This is a test";
}
@RequestMapping("/hello")
public Mono<String> hello(@RequestParam String name) {
return Mono.just("Hello, " + name + "!");
}
// другие методы
}
Если ввести в браузере строку:
localhost:8080/hello?name=world
то пойдет запрос, который вызовет subscription chain (цепочку операторов).
ServerWebExchange
Создадим еще один метод (в том же контроллере DemoController). В нем мы покажем использование более низкоуровневого API.
С версии Spring 5 появилось понятие «обмена», а именно класс org.springframework.web.server.ServerWebExchange. Объект типа ServerWebExchange можно инжектировать как параметр метода контроллера.
ServerWebExchange содержит ServerHttpRequest и ServerHttpResponse — это тоже новые интерфейсы, не имеющие отношения к сервлетам.
В response мы передаем код ответа и заголовки. Для формирования body нам нужен Publisher, на который фреймворк может подписаться и использовать для записи в response. Мы его передаем в метод writeWith().
@RestController
@RequestMapping("/demo")
public class DemoController {
private DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
//другие методы
@RequestMapping("/exchange")
public Mono<Void> exchange(ServerWebExchange exchange) {
ServerHttpResponse response = exchange.getResponse();
response.setStatusCode(HttpStatus.OK);
response.getHeaders().setContentType(MediaType.TEXT_PLAIN);
DataBuffer buf = dataBufferFactory.wrap("Hello from exchange".getBytes(StandardCharsets.UTF_8));
return response.writeWith(Flux.just(buf));
}
//другие методы
}
Вместо обычной строки используется DataBuffer, поскольку в реактивном подходе мы стремимся к производительности.
Метод возвращает тип Mono<Void>. Как говорилось в 1 части, этот тип используется тогда, когда надо дать сигнал завершения. Больше он ничего не передает.
Mono.never()
В 1 части мы видели, что Publisher может более или менее быстро завершиться, а может и не завершиться вовсе. В следующем методе контроллера мы попробуем вернуть Publisher, который никогда не завершается. То есть добавим такой метод:
@RequestMapping("/waitforit")
public Mono<String> waiting() {
return Mono.never();
}
В браузере на запрос
localhost:8080/waitforit
мы увидим долгое ожидание, и все (1:40:50). Ответ не возвращается. Делать так в реальных приложениях не надо, это просто пример управления временем.
Mono.error()
Теперь вернем для примера ошибку:
@RequestMapping("/error")
public Mono<String> error() {
return Mono.error(new IllegalArgumentException("My custom error message"));
}
Результат:

Приложение для вывода пользователей, списка задач и чата Gitter
Теперь перейдем к реальному приложению.
Конфигурация
В нем будут шаблоны на Freemarker, поэтому сконфигурируем FreeMarkerConfigurer и ViewResolver:
@Configuration
public class WebConfig implements WebFluxConfigurer {
@Bean
public FreeMarkerConfigurer freeMarkerConfigurer(ReactiveWebApplicationContext applicationContext) {
FreeMarkerConfigurer configurer = new FreeMarkerConfigurer();
configurer.setTemplateLoaderPath("classpath:/templates/");
configurer.setResourceLoader(applicationContext);
return configurer;
}
@Override
public void configureViewResolvers(ViewResolverRegistry registry) {
registry.freeMarker();
}
}
Обратите внимание, что класс конфигурации реализует интерфейс WebFluxConfigurer.
Главная страница
Главная страница использует Freemarker шаблон home.ftl. Вот наш контроллер и метод для главной страницы:
@Controller
public class DashboardController {
//другие методы и конструктор
@GetMapping("/")
public String home() {
return "home";
}
}
Тут пока нет ничего реактивного.
Реактивный репозиторий
Далее выведем список людей. Тут уже используется реактивный репозиторий, который мы внедряем в контроллер:
@Controller
public class DashboardController {
private final ReactorPersonRepository repository;
@Autowired
public DashboardController(DashboardService dashboardService, ReactorPersonRepository repository) {
this.repository = repository;
}
@GetMapping("/reactor/people")
@ResponseBody
public Flux<ReactorPerson> findReactorPeople() {
return this.repository.findAll();
}
//другие методы
}
ReactorPerson — сущность MongoDB:
@Document
public class ReactorPerson {
private String id;
private String name;
//getters/setters/constructor
}
А ReactorPersonRepository расширяет org.springframework.data.repository.reactive.ReactiveCrudRepository:
public interface ReactorPersonRepository extends ReactiveCrudRepository<ReactorPerson, String> {
}
Заполнение базы данными
Базу данных заполняем в главном классе приложения с помощью бина CommandLineRunner, который позволяет выполнить команды на старте приложения:
@SpringBootApplication
@EnableConfigurationProperties({DashboardProperties.class, MongoProperties.class})
@EnableAutoConfiguration(exclude={MongoAutoConfiguration.class, MongoDataAutoConfiguration.class})
public class SpringReactiveDashboardApplication {
public static void main(String[] args) {
SpringApplication.run(SpringReactiveDashboardApplication.class, args);
}
@Bean
public CommandLineRunner initDatabase(ReactorPersonRepository repository) {
Flux<ReactorPerson> people = Flux.just(
new ReactorPerson("smaldini", "Stephane Maldini"),
new ReactorPerson("simonbasle", "Simon Basle"),
new ReactorPerson("akarnokd", "David Karnok"),
new ReactorPerson("rstoya05", "Rossen Stoyanchev"),
new ReactorPerson("sdeleuze", "Sebastien Deleuze"),
new ReactorPerson("poutsma", "Arjen Poutsma"),
new ReactorPerson("bclozel", "Brian Clozel")
);
return args -> {
repository.deleteAll().thenMany(repository.saveAll(people)).blockLast();
};
}
}
Мы сначала удаляем данные из репозитория, и затем добавляем в него пользователей. Это блокирующие команды.
При запросе:
http://localhost:8080/reactor/people
выведется список добавленных пользователей в формате JSON. Пользователи поступают и сериализуются по очереди, они не накапливаются в буфере. Потенциально тут может быть бесконечный поток (см. в примере тут)
Вывод одного пользователя
Давайте теперь напишем метод для вывода одного пользователя. Возвращаемый тип тут будет Mono<ReactorPerson>:
@GetMapping("/reactor/people/{id}")
@ResponseBody
public Mono<ReactorPerson> findReactorPerson(@PathVariable String id) {
return this.repository.findById(id)
.switchIfEmpty(Mono.error(new ReactorPersonNotFoundException(id)));
}
При запросе в браузере мы увидим одного пользователя в формате JSON.
Если пользователя с заданным id не существует, то this.repository.findById(id) вернет пустую страницу со статусом 200.
Можно вернуть какого-либо пользователя по умолчанию (если пользователя с заданным id не существует), делается это командой:
this.repository.findById(id).defaultIfEmpty(...)
В нашем примере выше при отсутствии пользователя выбрасывается и обрабатывается исключение:
@ExceptionHandler
public ResponseEntity handleNotFoundException(ReactorPersonNotFoundException exc) {
return ResponseEntity.notFound().build();
}
В консоли браузера будет ошибка 400.
Страница списка задач — обращение к REST-сервисам
Теперь нам пригодятся токены от GitHub и Gitter. Мы напишем классы GithubClient и GitterClient, которые обращаются к GitHub API и Gitter API .
Сначала напишем GithubClient.
GithubClient
Для обращения к GitHub API технически можно использовать RestTemplate, но мы продемонстрируем реактивный подход и будем пользоваться:
import org.springframework.web.reactive.function.client.WebClient
Он создается так (в конструкторе):
@Component
public class GithubClient {
private final WebClient webClient;
public GithubClient(DashboardProperties properties) {
this.webClient = WebClient.builder()
.baseUrl("https://api.github.com")
.filter(ExchangeFilterFunctions
.basicAuthentication(properties.getGithub().getUsername(), properties.getGithub().getToken()))
.filter(userAgent()).build();
}
// другие методы
private ExchangeFilterFunction userAgent() {
return (clientRequest, exchangeFunction) -> {
ClientRequest newRequest = ClientRequest.from(clientRequest)
.header("User-Agent", "Spring Framework WebClient").build();
return exchangeFunction.exchange(newRequest);
};
}
}
- Имя пользователя и токен от Github хранятся в классе свойств DashboardProperties, внедряем их через конструктор.
- webClient создается с помощью билдера, первый фильтр — метод filter() — добавляет basic authentication header
- второй фильтр добавляет User-Agent header, для этого у нас отдельный метод userAgent(), он показан в коде выше. Заголовок User-Agent тоже требует GitGub API.
- ClientRequest — immutable, то есть неизменяемый объект, поэтому чтобы добавить заголовок, мы создаем новый объект из старого.
Итак, webClient создан. Напишем теперь метод, возвращающий список задач, этот метод тоже в классе GithubClient:
public Flux<GithubIssue> findOpenIssues(String owner, String repo) {
return this.webClient.get()
.uri("/repos/{owner}/{repo}/issues?state=open", owner, repo)
.accept(VND_GITHUB_V3).exchange()
.flatMapMany(response -> response.bodyToFlux(GithubIssue.class));
}
Метод flatMapMany() превращает Mono во Flux.
Метод accept() добавляет соответствующий заголовок accept (значение константы — MediaType.valueOf(«application/vnd.github.v3+json»)
Мы получили список задач типа GithubIssue, но это пока не тот список. Конечной целью является получить список задач типа ReactorIssue, в котором помимо названия задачи и имени создавшего ее пользователя присутствует онлайн-статус пользователя. А для этого придется обратиться к Gitter API. А затем составить комбинацию обоих результатов.
GitterClient
Этот клиент аналогичный, только вместо basic authentication используется oauth token.
Список пользователей (канала Gitter) получаем методом getUsersInRoom():
@Component
public class GitterClient {
private final WebClient webClient;
public GitterClient(DashboardProperties properties) {
this.webClient = WebClient.builder()
.filter(oAuthToken(properties.getGitter().getToken())).build();
}
public Flux<GitterUser> getUsersInRoom(String roomId, int limit) {
return this.webClient
.get().uri("https://api.gitter.im/v1/rooms/{roomId}/users?limit={limit}", roomId, limit)
.accept(MediaType.APPLICATION_JSON)
.exchange()
.flatMapMany(response -> response.bodyToFlux(GitterUser.class));
}
//другие методы
}
Здесь используется roomId — идентификатор реального Gitter-канала, он зашит в коде в свойствах DashboardProperties.
Комбинация результатов в сервисе DashboardDefaultService
Далее в DashboardDefaultService мы комбинируем результаты запросов к GitHub API и Gittter API в методе findReactorIssues():
@Service
public class DefaultDashboardService implements DashboardService {
//поля, конструктор
@Override
public Flux<ReactorIssue> findReactorIssues() {
Flux<GithubIssue> issues = this.githubClient.findOpenIssues("reactor", "reactor-core");
Mono<List<GitterUser>> users = this.gitterClient
.getUsersInRoom(this.properties.getReactor().getGitterRoomId(), 300)
.collectList();
return users.flatMapMany(gitterUserList -> {
return issues.map(issue -> {
String userLogin = issue.getUser().getLogin();
Optional<GitterUser> gitterUser = gitterUserList.stream()
.filter(gu -> gu.getUsername().equals(userLogin)).findFirst();
return new ReactorIssue(issue, gitterUser.isPresent());
});
});
}
// другие методы
}
Первые две команды — это обращения к API. Надо из Flux<GitHubIssue> получить Flux<ReactorIssue>, в которых присутствует еще и онлайн-статус пользователя — это наша цель. ReactorIssue создается так:
new ReactorIssue(issue, gitterUser.isPresent())
где issue имеет тип GitHubIssue.
Если в методе findReactorIssues() вернуть просто:
return this.githubClient.findOpenIssues("reactor", "reactor-core")
.map(issue - > new ReactorIssue(issue, false));
то мы получим список задач, но при этом все пользователи будут иметь статус оффлайн (второй параметр false):

Теперь попытаемся скомбинировать результаты для получения реального онлайн-статуса.
Обратите внимание, что заранее пользователей Gitter делаем типом Mono<List…> с помощью collectList():
Mono<List<GitterUser>> users = this.gitterClient .getUsersInRoom(this.properties.getReactor().getGitterRoomId(), 300) .collectList();
Далее по комбинированию: оператор zip() нам не подходит — он берет за раз один GitHubIssue и один GitterUser, а нам надо для одного GitHubIssue брать целый список пользователей (чтобы найти нужного).
Вот этот оборот возвращает для каждого List<GitterUser> (а у нас он только один) Flux<ReactorIssue>:
return users.flatMapMany(gitterUserList -> {
return issues.map(issue -> {
...
});
});
Вернемся к контроллеру и добавим метод для вывода задач:
@Controller
public class DashboardController {
//поля/конструктор
//другие методы
@GetMapping("/issues")
public String issues(Model model) {
model.addAttribute("issues", this.dashboardService.findReactorIssues());
return "issues";
}
}
Здесь мы выводим Flux<ReactorIssue> во Freemarker-шаблон issues.ftl.
В результате теперь виден реальный онлайн-статус пользователей:

На момент доклада 2016 в году они уже поддерживали Freemarker, поэтому мы просто поместили Publisher в Model и вернули имя view.
А Thymeleaf тогда еще не поддерживали. Поэтому для передачи Publisher в Thymeleaf-шаблон надо было делать так (2:12:46):
@GetMapping("/issues")
public Mono<String> issues(Model model) {
return this.dashboardService.findReactorIssues().collectList()
.then(issues->{
model.addAttribute("issues", issues);
return Mono.just("issues");
});
}
Страница сообщений чата
Чтобы вывести последние сообщения канала Gitter, добавим в GitterClient новый метод:
public Flux<GitterMessage> latestChatMessages(String roomId, int limit) {
return this.webClient
.get()
.uri("https://api.gitter.im/v1/rooms/{roomId}/chatMessages?limit={limit}", roomId, limit)
.accept(MediaType.APPLICATION_JSON)
.exchange()
.flatMapMany(response -> response.bodyToFlux(GitterMessage.class));
}
Затем добавим еще один метод в DefaultDashboardService:
@Override
public Flux<GitterMessage> getLatestChatMessages(int limit) {
return this.gitterClient
.latestChatMessages(this.properties.getReactor().getGitterRoomId(), limit);
}
И, наконец, перейдем к контроллеру:
@Controller
public class DashboardController {
// другие методы и конструктор, поля
@GetMapping("/chat")
public String chat() {
return "chat";
}
@GetMapping(path = "/chatMessages", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
@ResponseBody
public Flux<GitterMessage> chatMessages(@RequestParam(required = false, defaultValue = "10") String limit) {
return this.dashboardService.getLatestChatMessages(Integer.parseInt(limit));
}
}
Метод chat() возвращает view chat.ftl
Метод chatMessages() возвращает сообщения как JSON, к нему идет обращение уже непосредственно из шаблона chat.ftl на JavaScript — ajax-запрос:
$(function () {
// еще код
$.ajax("/chatMessages")
.done(function (messages) {
messages.forEach(function (msg) {
appendChatMessage(msg);
});
});
});
При запросе
http://localhost:8080/chat
получим страницу последних сообщений:

Сообщения чата как Server Sent Events
А теперь выведем сообщения как реальный чат, который обновляется push-ем со стороны сервера.
GitterClient
Для этого добавим в GitterClient новый метод:
public Flux<GitterMessage> streamChatMessages(String roomId) {
return this.webClient
.get()
.uri("https://stream.gitter.im/v1/rooms/{roomId}/chatMessages", roomId)
.accept(MediaType.TEXT_EVENT_STREAM)
.exchange()
.flatMapMany(response -> response.bodyToFlux(GitterMessage.class));
}
Обратите внимание, теперь мы обращаемся по другому uri, не к REST API, а Streaming API. Также мы задаем другой заголовок .accept(MediaType.TEXT_EVENT_STREAM) — это специальный заголовок для SSE (Server Sent Events).
DefaultDashboardService
Добавим соответствующий метод в DefaultDashboardService:
@Override
public Flux<GitterMessage> streamChatMessages() {
String roomId = this.properties.getReactor().getGitterRoomId();
return this.gitterClient.streamChatMessages(roomId);
}
DashboardController
И добавим метод в контроллер:
@GetMapping(value = "/chatStream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Flux<GitterMessage> streamChatMessages() {
return this.dashboardService.streamChatMessages();
}
Снова обратите внимание, что в контроллере требуется указать тип MediaType.TEXT_EVENT_STREAM_VALUE.
Теперь в шаблон chat.ftl надо добавить JavaScript код для работы с SSE:
var chatEventSource = new EventSource("/chatStream");
chatEventSource.onmessage = function (e) {
appendChatMessage(JSON.parse(e.data));
}
Теперь чат обновляется при появлении новых сообщений в канале без перезагрузки страницы браузера.
Итог
Мы написали веб-приложение на реактивном стеке, но тут показаны не все возможности, более интересные см. тут.