Введение в Spring Integration на примере использования Service Activator

В этой статье показан пример использования Service Activator.

Попутно показан IntegrationFlowDirectChannel и как отправлять сообщения через @Gateway.

Проблема

Допустим, у нас есть класс A, осуществляющий обработку объекта Animal, и в классе A имеются зависимости B, C..и так далее, тоже обрабатывающие Animal.

Набор зависимостей получается слишком большой, код некрасивый:

@Service
public class A {
    @Autowired
    private B b;
    @Autowired
    private C c;
    // ..другие зависимости

    public Animal process(Animal animal) {

        b.process(animal);
        c.process(animal);
        // ..другие обработки

        System.out.println("A is processing animal");
        return animal;
    }
}

Идея в том, чтобы упростить метод process() в классе A – пусть каждая зависимость B, С…обрабатывает Animal по очереди, а мы будем пересылать обработанный Animal из метода B.process() в метод C.process() … и так далее до A.process(). Делать это мы будем с помощью Spring Integration, а именно с помощью Service Activator (это конечная точка, которая может вызвать метод какого-либо бина).

Решение

Основа Spring Integration  – это сообщения, каналы (по которым идут сообщения) и конечные точки (в которых делается перенаправление, фильтрация, вызывается метод и т.п.). Приложение теперь выглядит так:

@SpringBootApplication
public class SpringIntegrationApplication {
    @Bean
    DirectChannel outputChannel() {
        return new DirectChannel();
    }

    @MessagingGateway
    public interface I {
        @Gateway(requestChannel = "animalFlow.input")
        void process(Animal animal);
    }

    // канал DirectChannel с именем animalFlow.input создается автоматически
    @Bean
    public IntegrationFlow animalFlow() {
        return flow -> flow
            .handle("bService", "process")
            .handle("cService", "process")
            .handle("aService", "process")
            .channel("outputChannel");
    }

    public static void main(String[] args) {
        SpringApplication.run(SpringIntegrationApplication.class, args);
        ConfigurableApplicationContext ctx = SpringApplication.run(SpringIntegrationApplication.class, args);

        DirectChannel outputChannel = ctx.getBean("outputChannel", DirectChannel.class);
        // обработчик внутри subscribe выполнится как только закончится выполнение flow
        outputChannel.subscribe(x -> System.out.println(x));
        // запускаем выполнение flow
        ctx.getBean(I.class)
            .process(new Animal("cat"));

        // можно было запустить flow отправкой сообщения во входной канал input:
        // MessageChannel inputChannel = ctx.getBean("animalFlow.input", MessageChannel.class);
        // inputChannel.send(MessageBuilder.withPayload(new Animal("cat")).build());
        ctx.close();
    }
}

IntegrationFlow – поток обработки

Главная часть – пересылка сообщения между сервисами B, C, А – задана в бине IntegrationFlow. Это поток, в данном случае он задан лямбда-выражением.

Service Activator – метод handle()

Метод handle() вызывает метод process() бинов  bService, cService и aService. Выглядят они теперь так:

@Service("aService")
public class A {

    public Animal process(Animal animal) {
        System.out.println("A is processing animal");
        return animal;
    }
}

@Service("bService")
public class B {
    public Animal process(Animal animal){
        System.out.println("B is processing animal");
        return animal;
    }
}

@Service("cService")
public class C {
    public Animal process(Animal animal) {
        System.out.println("C is processing animal");
        return animal;
    }
}

Как видите, A не перегружен никакими зависимости и в его методе process() не вызываются методы других бинов.
Это что касается потока.

GenericMessage – сообщение

Само сообщение – объект Animal – оборачивается при пересылке в тип GenericMessage, но это здесь не важно.

DirectChannel – канал

Чтобы его получить, мы выводим его в канал outputChannel с помощью метода channel(“outputChannel”) в самом конце. Это канал типа DirectChannel, что означает, что когда по потоку IntegrationFlow данные попадут в канал, запрашивать их из канала не надо, достаточно заранее подписаться на канал.

Есть еще PollableChannel, вот из него надо дергать данные с помощью метода recieve() – либо однократно, либо (как правило) регулярно.

Что касается DirectChannel, мы подписываемся на него еще до прогона потока и задаем обработчик, который выполнится как только данные попадут в канал. В нашем обработчике мы выводим пришедшие данные:

 outputChannel.subscribe(x -> System.out.println(x));

Результат выглядит так:

GenericMessage [payload=ru.sysout.springintegration.model.Animal@4a067c25, headers={id=5c9ac4d5-8e8d-05bf-a685-8703c4cdbf4e, timestamp=1540467581954}]

Видно, что наш Animal обернут в GenericMessage, а точнее Animal является пейлоадом – полезной нагрузкой сообщения. В сообщении есть еще заголовки, но в нашем примере они не используются.

@Gateway – запуск потока IntegrationFlow

Чтобы запустить поток и отправить сообщение мы используем аннотированный метод интерфейса I:

ctx.getBean(I.class).process(new Animal("cat"));

Этот интерфейс создан исключительно для отправки сообщения (первоначального необработанного Animal) во входной канал потока. Выглядит он так, обязательны аннотации @MessagingGateway и @Gateway:

@MessagingGateway
public interface I {

    @Gateway(requestChannel = "animalFlow.input")
    void process(Animal animal);
}
Входной канал называется animalFlow.input, он складывается из имени бина IntegrationFlow и через точку input. Этот канал создается неявно и тоже является DirectChannel.

Можно было бы отправить сообщение без всяких интерфейсов @MessagingGateway, задействуя методы SI, но @MessagingGateway позволяет нам не углубляться в методы SI, отделяя от внешнего кода SI. Если же углубиться, то делается это так:

MessageChannel inputChannel = ctx.getBean("animalFlow.input", MessageChannel.class);
inputChannel.send(MessageBuilder.withPayload(new Animal("cat")).build());

В inputChannel мы отправляем сообщение (обернутый в GenericMessage объект Animal) методом send().

Но первый способ предпочтительнее. Не всегда, конечно, мы отправляем сообщения напрямую, существует еще конечная точка Channel adapter, которая позволяет брать сообщения из внешнего источника – JMS, файловой системы, почты базы данных и т.п. кучи других мест, все их вы найдете в доках.

Gateway vs Adapter

У нас был частный случай гейтвея с методом типа void, но в общем случае Gateway  возвращает сообщения. То есть он может как получать сообщение, так и отправлять ответ. Чтобы показать это, давайте сменим тип void нашего метода  в гейтвее на тип Animal (у нас тип void мешал отправлять ответ):

    @MessagingGateway
    public interface I {
        @Gateway(requestChannel = "animalFlow.input")
        Animal process(Animal animal);
    }

Теперь больше не нужно отправлять сообщения в outputChannel, подписываться на него и ловить там результат. Сообщения будут сразу возвращаться методом process():

Animal animal = ctx.getBean(I.class).process(new Animal("cat"));

Поток же теперь без outputChannel (тоже принадлежащего SI) выглядит так:

    @Bean
    public IntegrationFlow animalFlow() {
        return flow -> flow.handle("aService", "process")
                .handle("bService", "process")
                .handle("cService", "process");      
    }

Получился полноценный гейтвей, получающий сообщения извне SI (Spring Integration) и отдающий их обратно, вовне SI. Но учтите, что метод process() теперь будет ждать ответа до победного конца, и если, например, мы в потоке отфильтруем сообщение с помощью конечной точки Filter, и оно не придет, то программа остановится в бесконечном ожидании ответа:

    @Bean
    public IntegrationFlow animalFlow() {
        return flow -> flow.handle("aService", "process")
                .handle("bService", "process")
                .handle("cService", "process")
                .<Animal>filter(a->!a.getName().startsWith("cat"));
    }

А в примере с void и outputChannel программа бы завершилась несмотря на фильтр, хотя сообщений в outputChannel бы тоже не было:

    //если метод process() снова  делать типа void, ожидания не будет, хоть фильтр и не позволяет дойти сообщению до конца потока.
    @Bean
    public IntegrationFlow animalFlow() {
        return flow -> flow.handle("aService", "process")
                .handle("bService", "process")
                .handle("cService", "process")
                .<Animal>filter(a->!a.getName().startsWith("cat"))
                .channel("outputChannel");
    }
Помимо гейтвея существуют адаптеры, стандартный поток в Spring Integration обычно подключен к внешним источникам и получателям через адаптеры и выглядит так:

Channel Inbound adapter – endpoint – endpoint -.. endpoint – Channel Outbound adaper.

В отличие от гейтвея, адаптер либо только получает сообщения, либо только отправляет.

Channel Inbound адаптер стоит в начале потока и получает сообщения (из почты, базы или другого внешнего источника). Далее они идут по потоку Spring Integration, никакого ответа во внешний источник Channel Inbound adapter не отдает.

Channel Outbound адаптер, наоборот, отправляет сообщения во внешнюю систему. Получать сообщения  извне он не может. (Он их получает только от Spring integration). Стоит он в конце потока.

С внешними системами SI может общаться не только через адаптеры, но и через гейтвей (например, Amqp.inboundGateway, Tcp.outboundGateway), но опять же, гейтвей остается точкой, которая как получает сообщение, так и обязательно отправляет ответ. В гейтвеях течение двунаправленное, а в адаптерах однонаправленное.

Итог

Полный код приложения есть на GitHub. Если что, задавайте вопросы.

В следующей статье рассмотрим пример с Inbound и Outbound адаптерами.

Вообще авторы Spring Integration реализовывали шаблоны, описанные в книге и на сайте Enterprise Integration Patterns

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *