Разработка реактивных приложений с Reactive Streams и Java 8. Часть 2.

В этой части показывается, как использовать Reactive Streams в реальном приложении. Первая часть тут.
Код приложения есть на GitHub. Обратите внимание, что для запуска необходимо установить MongoDB, а также получить токены к API GitHub и API Gitter.

Итак, в примере:

  • Обращение к внешним REST-сервисам (а именно GitHub и Gitter API)
  • Работа с репозиторием
  • Обработка ошибок
  • Отображение html view и статических ресурсов
  • Server Sent Events (SSE)
  •  и другое

И все это в реактивном стиле.

Пример реактивного контроллера

Ниже представлен обычный контроллер с обычным методом test(), написанном в традиционном стиле. Мы будем добавлять в контроллер новые методы, в которых используется реактивный подход.

Mono.just()

Первый метод hello():

@RestController
@RequestMapping("/demo")
public class DemoController {

	// You should create and share a single instance, or even preferrably
	// get the the factory from the underlying container
	private DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();

	@RequestMapping("/test")
	public String test() {
		return "This is a test";
	}

	@RequestMapping("/hello")
	public Mono<String> hello(@RequestParam String name) {
		return Mono.just("Hello, " + name + "!");
	}

        // другие методы
}

Если ввести в браузере строку:

localhost:8080/hello?name=world

то пойдет запрос, который вызовет subscription chain (цепочку операторов).

ServerWebExchange

Создадим еще один метод (в том же контроллере DemoController). В нем мы покажем использование более низкоуровневого API.

С версии Spring 5 появилось понятие «обмена», а именно класс org.springframework.web.server.ServerWebExchange. Объект типа ServerWebExchange можно инжектировать как параметр метода контроллера.

ServerWebExchange содержит ServerHttpRequest и ServerHttpResponse — это тоже новые интерфейсы, не имеющие отношения к сервлетам.

В response мы передаем код ответа и заголовки. Для формирования body нам нужен Publisher, на который фреймворк может подписаться и использовать для записи в response. Мы его передаем в метод writeWith().

@RestController
@RequestMapping("/demo")
public class DemoController {

     private DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
    
    //другие методы 

    @RequestMapping("/exchange")
    public Mono<Void> exchange(ServerWebExchange exchange) {
        ServerHttpResponse response = exchange.getResponse();
        response.setStatusCode(HttpStatus.OK);
        response.getHeaders().setContentType(MediaType.TEXT_PLAIN);
        DataBuffer buf = dataBufferFactory.wrap("Hello from exchange".getBytes(StandardCharsets.UTF_8));
        return response.writeWith(Flux.just(buf));
    }

   //другие методы 
}

Вместо обычной строки используется DataBuffer, поскольку в реактивном подходе мы стремимся к производительности.

Метод возвращает тип Mono<Void>. Как говорилось в 1 части, этот тип используется тогда, когда надо дать сигнал завершения. Больше он ничего не передает.

Mono.never()

В 1 части мы видели, что Publisher может более или менее быстро завершиться, а может и не завершиться вовсе. В следующем методе контроллера мы попробуем вернуть Publisher, который никогда не завершается. То есть добавим такой метод:

@RequestMapping("/waitforit")
public Mono<String> waiting() {
   return Mono.never();
}

В браузере на запрос

localhost:8080/waitforit

мы увидим долгое ожидание, и все (1:40:50). Ответ не возвращается. Делать так в реальных приложениях не надо, это просто пример управления временем.

Mono.error()

Теперь вернем для примера ошибку:

@RequestMapping("/error")
public Mono<String> error() {
    return Mono.error(new IllegalArgumentException("My custom error message"));
}

Результат:

return Mono.error() - результат в браузере
return Mono.error() — результат в браузере

Приложение для вывода пользователей, списка задач и чата Gitter

Теперь перейдем к реальному приложению.

Конфигурация

В нем будут шаблоны на Freemarker, поэтому сконфигурируем FreeMarkerConfigurer  и ViewResolver:

@Configuration
public class WebConfig implements WebFluxConfigurer {

	@Bean
	public FreeMarkerConfigurer freeMarkerConfigurer(ReactiveWebApplicationContext applicationContext) {
		FreeMarkerConfigurer configurer = new FreeMarkerConfigurer();
		configurer.setTemplateLoaderPath("classpath:/templates/");
		configurer.setResourceLoader(applicationContext);
		return configurer;
	}

	@Override
	public void configureViewResolvers(ViewResolverRegistry registry) {
		registry.freeMarker();
	}
}

Обратите внимание, что класс конфигурации реализует интерфейс WebFluxConfigurer.

Главная страница

Главная страница использует Freemarker шаблон home.ftl. Вот наш контроллер и метод для главной страницы:

@Controller
public class DashboardController {

	//другие методы и конструктор
 
	@GetMapping("/")
	public String home() {
		return "home";
	}
}

Тут пока нет ничего реактивного.

Реактивный репозиторий

Далее выведем список людей. Тут уже используется реактивный репозиторий, который мы внедряем в контроллер:

@Controller
public class DashboardController {


	private final ReactorPersonRepository repository;

	@Autowired
	public DashboardController(DashboardService dashboardService, ReactorPersonRepository repository) {
		this.repository = repository;
	}

	@GetMapping("/reactor/people")
	@ResponseBody
	public Flux<ReactorPerson> findReactorPeople() {
		return this.repository.findAll();
	}
        
        //другие методы
}

ReactorPerson — сущность MongoDB:

@Document
public class ReactorPerson {

    private String id;

    private String name;
    
    //getters/setters/constructor 
}

А ReactorPersonRepository расширяет org.springframework.data.repository.reactive.ReactiveCrudRepository:

public interface ReactorPersonRepository extends ReactiveCrudRepository<ReactorPerson, String> {

}
Заполнение базы данными

Базу данных заполняем в главном классе приложения с помощью бина CommandLineRunner, который позволяет выполнить команды на старте приложения:

@SpringBootApplication
@EnableConfigurationProperties({DashboardProperties.class, MongoProperties.class})
@EnableAutoConfiguration(exclude={MongoAutoConfiguration.class, MongoDataAutoConfiguration.class})
public class SpringReactiveDashboardApplication {

	public static void main(String[] args) {
		SpringApplication.run(SpringReactiveDashboardApplication.class, args);
	}

	@Bean
	public CommandLineRunner initDatabase(ReactorPersonRepository repository) {

		Flux<ReactorPerson> people = Flux.just(
						new ReactorPerson("smaldini", "Stephane Maldini"),
						new ReactorPerson("simonbasle", "Simon Basle"),
						new ReactorPerson("akarnokd", "David Karnok"),
						new ReactorPerson("rstoya05", "Rossen Stoyanchev"),
						new ReactorPerson("sdeleuze", "Sebastien Deleuze"),
						new ReactorPerson("poutsma", "Arjen Poutsma"),
						new ReactorPerson("bclozel", "Brian Clozel")
				);

		return args -> {
			repository.deleteAll().thenMany(repository.saveAll(people)).blockLast();
		};
	}

}

Мы сначала удаляем данные из репозитория, и затем добавляем в него пользователей. Это блокирующие команды.

При запросе:

http://localhost:8080/reactor/people

выведется список добавленных пользователей в формате JSON. Пользователи поступают и сериализуются по очереди, они не накапливаются в буфере. Потенциально тут может быть бесконечный поток  (см. в примере тут)

Вывод одного пользователя

Давайте теперь напишем метод для вывода одного пользователя. Возвращаемый тип тут будет Mono<ReactorPerson>:

@GetMapping("/reactor/people/{id}")
@ResponseBody
public Mono<ReactorPerson> findReactorPerson(@PathVariable String id) {
    return this.repository.findById(id)
        .switchIfEmpty(Mono.error(new ReactorPersonNotFoundException(id)));
}

При запросе в браузере мы увидим одного пользователя в формате JSON.

Если пользователя с заданным id не существует, то this.repository.findById(id) вернет пустую страницу со статусом 200.

Можно вернуть какого-либо пользователя по умолчанию (если пользователя с заданным id не существует), делается это командой:

this.repository.findById(id).defaultIfEmpty(...)

В нашем примере выше при отсутствии пользователя выбрасывается и обрабатывается исключение:

@ExceptionHandler
public ResponseEntity handleNotFoundException(ReactorPersonNotFoundException exc) {
    return ResponseEntity.notFound().build();
}

В консоли браузера будет ошибка 400.

Страница списка задач — обращение к REST-сервисам

Теперь нам пригодятся токены от GitHub и Gitter. Мы напишем классы GithubClient и GitterClient, которые обращаются к GitHub API и Gitter API .

Сначала напишем GithubClient.

GithubClient

Для обращения к GitHub API технически можно использовать RestTemplate, но мы продемонстрируем реактивный подход и будем пользоваться:

import org.springframework.web.reactive.function.client.WebClient

Он создается так (в конструкторе):

@Component
public class GithubClient {

    private final WebClient webClient;

    public GithubClient(DashboardProperties properties) {
        this.webClient = WebClient.builder()
	    .baseUrl("https://api.github.com")
            .filter(ExchangeFilterFunctions
 						            .basicAuthentication(properties.getGithub().getUsername(), properties.getGithub().getToken()))
	    .filter(userAgent()).build();
	}

      // другие методы
     
      private ExchangeFilterFunction userAgent() {
		return (clientRequest, exchangeFunction) -> {
			ClientRequest newRequest = ClientRequest.from(clientRequest)
					.header("User-Agent", "Spring Framework WebClient").build();
			return exchangeFunction.exchange(newRequest);
		};
      }

}
  • Имя пользователя и токен от Github хранятся в классе свойств DashboardProperties, внедряем их через конструктор.
  • webClient создается с помощью билдера, первый фильтр — метод filter() — добавляет basic authentication header
  • второй фильтр добавляет User-Agent header, для этого у нас отдельный метод userAgent(), он показан в коде выше. Заголовок User-Agent тоже требует GitGub API.
  • ClientRequest — immutable, то есть неизменяемый объект, поэтому чтобы добавить заголовок, мы создаем новый объект из старого.

Итак, webClient создан. Напишем теперь метод, возвращающий список задач, этот метод тоже в классе GithubClient:

public Flux<GithubIssue> findOpenIssues(String owner, String repo) {
	return this.webClient.get()
            .uri("/repos/{owner}/{repo}/issues?state=open", owner, repo)
            .accept(VND_GITHUB_V3).exchange()
            .flatMapMany(response -> response.bodyToFlux(GithubIssue.class));
}

Метод flatMapMany() превращает Mono во Flux.

Метод accept() добавляет соответствующий заголовок accept (значение константы  — MediaType.valueOf(«application/vnd.github.v3+json»)

Мы получили список задач типа GithubIssue, но это пока не тот список. Конечной целью является получить список задач типа ReactorIssue, в котором помимо названия задачи и имени создавшего ее пользователя присутствует онлайн-статус пользователя. А для этого придется обратиться к Gitter API. А затем составить комбинацию обоих результатов.

GitterClient

Этот клиент аналогичный, только вместо basic authentication используется oauth token.

Список пользователей (канала Gitter) получаем методом getUsersInRoom():

@Component
public class GitterClient {

	private final WebClient webClient;

	public GitterClient(DashboardProperties properties) {
		this.webClient = WebClient.builder()
				.filter(oAuthToken(properties.getGitter().getToken())).build();
	}

	public Flux<GitterUser> getUsersInRoom(String roomId, int limit) {
		return this.webClient
				.get().uri("https://api.gitter.im/v1/rooms/{roomId}/users?limit={limit}", roomId, limit)
				.accept(MediaType.APPLICATION_JSON)
				.exchange()
				.flatMapMany(response -> response.bodyToFlux(GitterUser.class));
	}

   //другие методы
}

Здесь используется roomId — идентификатор реального Gitter-канала, он зашит в коде в свойствах DashboardProperties.

Комбинация результатов в сервисе DashboardDefaultService

Далее в DashboardDefaultService мы комбинируем результаты запросов к GitHub API и Gittter API в методе findReactorIssues():

@Service
public class DefaultDashboardService implements DashboardService {

        //поля, конструктор

	@Override
	public Flux<ReactorIssue> findReactorIssues() {
		Flux<GithubIssue> issues = this.githubClient.findOpenIssues("reactor", "reactor-core");
		Mono<List<GitterUser>> users = this.gitterClient
				.getUsersInRoom(this.properties.getReactor().getGitterRoomId(), 300)
				.collectList();

		return users.flatMapMany(gitterUserList -> {
			return issues.map(issue -> {
				String userLogin = issue.getUser().getLogin();
				Optional<GitterUser> gitterUser = gitterUserList.stream()
						.filter(gu -> gu.getUsername().equals(userLogin)).findFirst();

				return new ReactorIssue(issue, gitterUser.isPresent());
			});
		});
	}

 
     // другие методы

}

Первые две команды — это обращения к API. Надо из Flux<GitHubIssue> получить Flux<ReactorIssue>, в которых присутствует еще и онлайн-статус пользователя — это наша цель. ReactorIssue создается так:

new ReactorIssue(issue, gitterUser.isPresent())

где issue имеет тип GitHubIssue.

Если в методе findReactorIssues() вернуть просто:

 return this.githubClient.findOpenIssues("reactor", "reactor-core")
            .map(issue - > new ReactorIssue(issue, false));

то мы получим список задач, но при этом все пользователи будут иметь статус оффлайн (второй параметр false):

Третья колонка — все оффлайн

Теперь попытаемся скомбинировать результаты для получения реального онлайн-статуса.

Обратите внимание, что заранее пользователей Gitter делаем типом Mono<List…> с помощью collectList():

Mono<List<GitterUser>> users = this.gitterClient
			.getUsersInRoom(this.properties.getReactor().getGitterRoomId(), 300)
			.collectList();

Далее по комбинированию: оператор zip() нам не подходит — он берет за раз один GitHubIssue и один GitterUser, а нам надо для одного GitHubIssue брать целый список пользователей (чтобы найти нужного).

Вот этот оборот возвращает для каждого List<GitterUser> (а у нас он только один) Flux<ReactorIssue>:

return users.flatMapMany(gitterUserList -> {
    return issues.map(issue -> {
	  ...
    });
});

Вернемся к контроллеру и добавим метод для вывода задач:

@Controller
public class DashboardController {

    //поля/конструктор
       
    //другие методы

    @GetMapping("/issues")
    public String issues(Model model) {
        model.addAttribute("issues", this.dashboardService.findReactorIssues());
	return "issues";
    }
}

Здесь мы выводим Flux<ReactorIssue> во Freemarker-шаблон issues.ftl.

В результате теперь виден реальный онлайн-статус пользователей:

Список задач с реальным статутом пользователей

На момент доклада 2016 в году они уже поддерживали Freemarker, поэтому мы просто поместили Publisher в Model и вернули имя view.

А Thymeleaf  тогда еще не поддерживали. Поэтому для передачи Publisher в Thymeleaf-шаблон надо было делать так (2:12:46):

@GetMapping("/issues") 
public Mono<String> issues(Model model) { 
    return  this.dashboardService.findReactorIssues().collectList()
            .then(issues->{
                 model.addAttribute("issues", issues);
                 return Mono.just("issues");
              });
}

Страница сообщений чата

Чтобы вывести последние сообщения канала Gitter, добавим в GitterClient новый метод:

public Flux<GitterMessage> latestChatMessages(String roomId, int limit) {
	return this.webClient
			.get()
			.uri("https://api.gitter.im/v1/rooms/{roomId}/chatMessages?limit={limit}", roomId, limit)
			.accept(MediaType.APPLICATION_JSON)
			.exchange()
			.flatMapMany(response -> response.bodyToFlux(GitterMessage.class));
}

Затем добавим еще один метод в DefaultDashboardService:

@Override
public Flux<GitterMessage> getLatestChatMessages(int limit) {
    return this.gitterClient
	.latestChatMessages(this.properties.getReactor().getGitterRoomId(), limit);
}

И, наконец, перейдем к контроллеру:

@Controller
public class DashboardController {

    // другие методы и конструктор, поля

    @GetMapping("/chat")
    public String chat() {
        return "chat";
    }

    @GetMapping(path = "/chatMessages", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
    @ResponseBody
    public Flux<GitterMessage> chatMessages(@RequestParam(required = false, defaultValue = "10") String limit) {
        return this.dashboardService.getLatestChatMessages(Integer.parseInt(limit));
    }


}

Метод chat() возвращает view chat.ftl

Метод chatMessages() возвращает сообщения как JSON, к нему идет обращение уже непосредственно из шаблона chat.ftl на JavaScript — ajax-запрос:

    $(function () {
        
        // еще код
        
        $.ajax("/chatMessages")
		.done(function (messages) {
			messages.forEach(function (msg) {
					appendChatMessage(msg);
	                });
	        });
    });

 

При запросе

http://localhost:8080/chat

получим страницу последних сообщений:

Страница сообщений. Обновляется только по F5

Сообщения чата как Server Sent Events

А теперь выведем сообщения как реальный чат, который обновляется push-ем со стороны сервера.

GitterClient

Для этого добавим в GitterClient новый метод:

public Flux<GitterMessage> streamChatMessages(String roomId) {
	return this.webClient
			.get()
			.uri("https://stream.gitter.im/v1/rooms/{roomId}/chatMessages", roomId)
			.accept(MediaType.TEXT_EVENT_STREAM)
			.exchange()
			.flatMapMany(response -> response.bodyToFlux(GitterMessage.class));
}

Обратите внимание, теперь мы обращаемся по другому uri, не к REST API, а Streaming API. Также мы задаем другой заголовок  .accept(MediaType.TEXT_EVENT_STREAM) — это специальный заголовок для SSE (Server Sent Events).

DefaultDashboardService

Добавим соответствующий метод в DefaultDashboardService:

@Override
public Flux<GitterMessage> streamChatMessages() {
	String roomId = this.properties.getReactor().getGitterRoomId();
	return this.gitterClient.streamChatMessages(roomId);
}
 DashboardController

И добавим метод в контроллер:

@GetMapping(value = "/chatStream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Flux<GitterMessage> streamChatMessages() {
    return this.dashboardService.streamChatMessages();
}

Снова обратите внимание, что в контроллере требуется указать тип MediaType.TEXT_EVENT_STREAM_VALUE.

Теперь в шаблон chat.ftl надо добавить JavaScript код для работы с SSE:

	var chatEventSource = new EventSource("/chatStream");
	chatEventSource.onmessage = function (e) {
		appendChatMessage(JSON.parse(e.data));
	}

Теперь чат обновляется при появлении новых сообщений в канале без перезагрузки страницы браузера.

Итог

Мы написали веб-приложение на реактивном стеке, но тут показаны не все возможности, более интересные см. тут.

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *