В этой статье показан пример использования Service Activator.
Попутно показан IntegrationFlow, DirectChannel и как отправлять сообщения через @Gateway.
Проблема
Допустим, у нас есть класс A, осуществляющий обработку объекта Animal, и в классе A имеются зависимости B, C..и так далее, тоже обрабатывающие Animal.
Набор зависимостей получается слишком большой, код некрасивый:
@Service public class A { @Autowired private B b; @Autowired private C c; // ..другие зависимости public Animal process(Animal animal) { b.process(animal); c.process(animal); // ..другие обработки System.out.println("A is processing animal"); return animal; } }
Идея в том, чтобы упростить метод process() в классе A — пусть каждая зависимость B, С…обрабатывает Animal по очереди, а мы будем пересылать обработанный Animal из метода B.process() в метод C.process() … и так далее до A.process(). Делать это мы будем с помощью Spring Integration, а именно с помощью Service Activator (это конечная точка, которая может вызвать метод какого-либо бина).
Решение
Основа Spring Integration — это сообщения, каналы (по которым идут сообщения) и конечные точки (в которых делается перенаправление, фильтрация, вызывается метод и т.п.). Приложение теперь выглядит так:
@SpringBootApplication public class SpringIntegrationApplication { @Bean DirectChannel outputChannel() { return new DirectChannel(); } @MessagingGateway public interface I { @Gateway(requestChannel = "animalFlow.input") void process(Animal animal); } // канал DirectChannel с именем animalFlow.input создается автоматически @Bean public IntegrationFlow animalFlow() { return flow -> flow .handle("bService", "process") .handle("cService", "process") .handle("aService", "process") .channel("outputChannel"); } public static void main(String[] args) { SpringApplication.run(SpringIntegrationApplication.class, args); ConfigurableApplicationContext ctx = SpringApplication.run(SpringIntegrationApplication.class, args); DirectChannel outputChannel = ctx.getBean("outputChannel", DirectChannel.class); // обработчик внутри subscribe выполнится как только закончится выполнение flow outputChannel.subscribe(x -> System.out.println(x)); // запускаем выполнение flow ctx.getBean(I.class) .process(new Animal("cat")); // можно было запустить flow отправкой сообщения во входной канал input: // MessageChannel inputChannel = ctx.getBean("animalFlow.input", MessageChannel.class); // inputChannel.send(MessageBuilder.withPayload(new Animal("cat")).build()); ctx.close(); } }
IntegrationFlow — поток обработки
Главная часть — пересылка сообщения между сервисами B, C, А — задана в бине IntegrationFlow. Это поток, в данном случае он задан лямбда-выражением.
Service Activator — метод handle()
Метод handle() вызывает метод process() бинов bService, cService и aService. Выглядят они теперь так:
@Service("aService") public class A { public Animal process(Animal animal) { System.out.println("A is processing animal"); return animal; } } @Service("bService") public class B { public Animal process(Animal animal){ System.out.println("B is processing animal"); return animal; } } @Service("cService") public class C { public Animal process(Animal animal) { System.out.println("C is processing animal"); return animal; } }
Как видите, A не перегружен никакими зависимости и в его методе process() не вызываются методы других бинов.
Это что касается потока.
GenericMessage — сообщение
Само сообщение — объект Animal — оборачивается при пересылке в тип GenericMessage, но это здесь не важно.
DirectChannel — канал
Чтобы его получить, мы выводим его в канал outputChannel с помощью метода channel(«outputChannel») в самом конце. Это канал типа DirectChannel, что означает, что когда по потоку IntegrationFlow данные попадут в канал, запрашивать их из канала не надо, достаточно заранее подписаться на канал.
Что касается DirectChannel, мы подписываемся на него еще до прогона потока и задаем обработчик, который выполнится как только данные попадут в канал. В нашем обработчике мы выводим пришедшие данные:
outputChannel.subscribe(x -> System.out.println(x));
Результат выглядит так:
GenericMessage [payload=ru.sysout.springintegration.model.Animal@4a067c25, headers={id=5c9ac4d5-8e8d-05bf-a685-8703c4cdbf4e, timestamp=1540467581954}]
Видно, что наш Animal обернут в GenericMessage, а точнее Animal является пейлоадом — полезной нагрузкой сообщения. В сообщении есть еще заголовки, но в нашем примере они не используются.
@Gateway — запуск потока IntegrationFlow
Чтобы запустить поток и отправить сообщение мы используем аннотированный метод интерфейса I:
ctx.getBean(I.class).process(new Animal("cat"));
Этот интерфейс создан исключительно для отправки сообщения (первоначального необработанного Animal) во входной канал потока. Выглядит он так, обязательны аннотации @MessagingGateway и @Gateway:
@MessagingGateway public interface I { @Gateway(requestChannel = "animalFlow.input") void process(Animal animal); }
Можно было бы отправить сообщение без всяких интерфейсов @MessagingGateway, задействуя методы SI, но @MessagingGateway позволяет нам не углубляться в методы SI, отделяя от внешнего кода SI. Если же углубиться, то делается это так:
MessageChannel inputChannel = ctx.getBean("animalFlow.input", MessageChannel.class); inputChannel.send(MessageBuilder.withPayload(new Animal("cat")).build());
В inputChannel мы отправляем сообщение (обернутый в GenericMessage объект Animal) методом send().
Но первый способ предпочтительнее. Не всегда, конечно, мы отправляем сообщения напрямую, существует еще конечная точка Channel adapter, которая позволяет брать сообщения из внешнего источника — JMS, файловой системы, почты базы данных и т.п. кучи других мест, все их вы найдете в доках.
Gateway vs Adapter
У нас был частный случай гейтвея с методом типа void, но в общем случае Gateway возвращает сообщения. То есть он может как получать сообщение, так и отправлять ответ. Чтобы показать это, давайте сменим тип void нашего метода в гейтвее на тип Animal (у нас тип void мешал отправлять ответ):
@MessagingGateway public interface I { @Gateway(requestChannel = "animalFlow.input") Animal process(Animal animal); }
Теперь больше не нужно отправлять сообщения в outputChannel, подписываться на него и ловить там результат. Сообщения будут сразу возвращаться методом process():
Animal animal = ctx.getBean(I.class).process(new Animal("cat"));
Поток же теперь без outputChannel (тоже принадлежащего SI) выглядит так:
@Bean public IntegrationFlow animalFlow() { return flow -> flow.handle("aService", "process") .handle("bService", "process") .handle("cService", "process"); }
Получился полноценный гейтвей, получающий сообщения извне SI (Spring Integration) и отдающий их обратно, вовне SI. Но учтите, что метод process() теперь будет ждать ответа до победного конца, и если, например, мы в потоке отфильтруем сообщение с помощью конечной точки Filter, и оно не придет, то программа остановится в бесконечном ожидании ответа:
@Bean public IntegrationFlow animalFlow() { return flow -> flow.handle("aService", "process") .handle("bService", "process") .handle("cService", "process") .<Animal>filter(a->!a.getName().startsWith("cat")); }
А в примере с void и outputChannel программа бы завершилась несмотря на фильтр, хотя сообщений в outputChannel бы тоже не было:
//если метод process() снова делать типа void, ожидания не будет, хоть фильтр и не позволяет дойти сообщению до конца потока. @Bean public IntegrationFlow animalFlow() { return flow -> flow.handle("aService", "process") .handle("bService", "process") .handle("cService", "process") .<Animal>filter(a->!a.getName().startsWith("cat")) .channel("outputChannel"); }
Channel Inbound adapter — endpoint — endpoint -.. endpoint — Channel Outbound adaper.
В отличие от гейтвея, адаптер либо только получает сообщения, либо только отправляет.
Channel Inbound адаптер стоит в начале потока и получает сообщения (из почты, базы или другого внешнего источника). Далее они идут по потоку Spring Integration, никакого ответа во внешний источник Channel Inbound adapter не отдает.
Channel Outbound адаптер, наоборот, отправляет сообщения во внешнюю систему. Получать сообщения извне он не может. (Он их получает только от Spring integration). Стоит он в конце потока.
С внешними системами SI может общаться не только через адаптеры, но и через гейтвей (например, Amqp.inboundGateway, Tcp.outboundGateway), но опять же, гейтвей остается точкой, которая как получает сообщение, так и обязательно отправляет ответ. В гейтвеях течение двунаправленное, а в адаптерах однонаправленное.
Итог
Полный код приложения есть на GitHub. Если что, задавайте вопросы.
В следующей статье рассмотрим пример с Inbound и Outbound адаптерами.
Надо бы ещё добавить роутер и фильтровать ответы — нормально обработанные возвращать в один output, а с ошибками в другой. Получится более реальный пример)