Введение в Spring Integration на примере использования Service Activator

В этой статье показан пример использования Service Activator.

Попутно показан IntegrationFlowDirectChannel и как отправлять сообщения через @Gateway.

Проблема

Допустим, у нас есть класс A, осуществляющий обработку объекта Animal, и в классе A имеются зависимости B, C..и так далее, тоже обрабатывающие Animal.

Набор зависимостей получается слишком большой (больше одной – это уже много), код некрасивый:

@Component
public class A {
    @Autowired
    private B b;
    @Autowired
    private C c;
    // ..другие зависимости

    public Animal process(Animal animal) {

        b.process(animal);
        c.process(animal);
        // ..другие обработки

        System.out.println("A is processing animal");
        return animal;
    }
}

Идея в том, чтобы упостить метод process() в классе A – пусть каждая зависимость B, С…обрабатывает Animal по очереди, а мы будем пересылать обработанный Animal из метода B.process() в метод C.process() … и так далее до A.process(). Делать это мы будем с помощью Spring Integration, а именно с помощью Service Activator (это конечная точка, которая может вызвать метод какого-либо бина).

Решение

Основа Spring Integration  – это сообщения, каналы (по которым идут сообщения) и конечные точки (в которых делается перенаправление, фильтрация, вызывается метод и т.п.). Приложение теперь выглядит так:

@SpringBootApplication
public class SpringIntegrationApplication {
    @Bean
    DirectChannel outputChannel() {
        return new DirectChannel();
    }

    @MessagingGateway
    public interface I {
        @Gateway(requestChannel = "personFlow.input")
        void process(Animal animal);
    }

    // канал DirectChannel с именем personFlow.input создается автоматически
    @Bean
    public IntegrationFlow personFlow() {
        return flow -> flow
            .handle("bService", "process")
            .handle("cService", "process")
            .handle("aService", "process")
            .channel("outputChannel");
    }

    public static void main(String[] args) {
        SpringApplication.run(SpringIntegrationApplication.class, args);
        ConfigurableApplicationContext ctx = SpringApplication.run(SpringIntegrationApplication.class, args);

        DirectChannel outputChannel = ctx.getBean("outputChannel", DirectChannel.class);
        // обработчик внутри subscribe выполнится как только занокнчится выполнение flow
        outputChannel.subscribe(x -> System.out.println(x));
        // запускаем выполнение flow
        ctx.getBean(I.class)
            .process(new Animal("cat"));

        // можно было запустить flow отправкой сообщения во входной канал input:
        // MessageChannel inputChannel = ctx.getBean("personFlow.input", MessageChannel.class);
        // inputChannel.send(MessageBuilder.withPayload(new Animal("cat")).build());
        ctx.close();
    }
}

IntegrationFlow – поток обработки

Главная часть – пересылка сообщения между сервисами B, C, А – задана в бине IntegrationFlow. Это поток, в данном случае он задан лямбда-выражением.

Service Activator – метод handle()

Метод handle() вызывает метод process() бинов  bService, cService и aService. Выглядят они теперь так:

@Component("aService")
public class A {

    public Animal process(Animal animal) {
        System.out.println("A is processing animal");
        return animal;
    }
}

@Component("bService")
public class B {
    public Animal process(Animal animal){
        System.out.println("B is processing animal");
        return animal;
    }
}

@Component("cService")
public class C {
    public Animal process(Animal animal) {
        System.out.println("C is processing animal");
        return animal;
    }
}

Как видите, A не перегружен никакими зависимости и в его методе process() не вызываются методы других бинов.
Это что касается потока.

GenericMessage – сообщение

Само сообщение – объект Animal – оборачивается при пересылке в тип GenericMessage, но это здесь не важно.

DirectChannel – канал

Чтобы его получить, мы выводим его в канал outputChannel с помощью метода channel(“outputChannel”) в самом конце. Это канал типа DirectChannel, что означает, что данные оказываются в нем сразу же, как только отработает выполнение потока IntegrationFlow, специально этот канал запрашивать их не должен (есть еще PollableChannel, вот он делает запросы – либо однократно либо как правило регулярно – для получения данных, у него для этого есть метод recieve).

Что касается DirectChannel, мы подписываемся на него еще до прогона потока и задаем обработчик, который выполнится как только данные попадут в канал. В нашем обработчике мы выводим пришедшие данные:

 outputChannel.subscribe(x -> System.out.println(x));

Результат выглядит так:

GenericMessage [payload=ru.sysout.springintegration.model.Animal@4a067c25, headers={id=5c9ac4d5-8e8d-05bf-a685-8703c4cdbf4e, timestamp=1540467581954}]

Видно, что наш Animal обернут в GenericMessage, а точнее Animal является пейлоадом – полезной нагрузкой сообщения. В сообщении есть еще заголовки, но в нашем примере они не используются.

@Gateway – запуск потока IntegrationFlow

Чтобы запустить поток и отправить сообщение мы используем аннотированный метод интерфейса I:

ctx.getBean(I.class).process(new Animal("cat"));

Этот интерфейс создан исключительно для отправки сообщения (первоначального необработанного Animal) во входной канал потока. Выглядит он так, обязательны аннотации @MessagingGateway и @Gateway:

@MessagingGateway
public interface I {

    @Gateway(requestChannel = "personFlow.input")
    void process(Animal animal);
}

Входной канал называется personFlow.input, он складывается из имени бина IntegrationFlow и через точку input. Этот канал создается неявно и тоже является DirectChannel.

Можно было бы отправить сообщение без всяких интерфейсов @MessagingGateway, задействуя методы SI, но @MessagingGateway позволяет нам не углублятся в методы SI, отделяя от внешнего кода SI. Если же углубиться, то делается это так:

MessageChannel inputChannel = ctx.getBean("personFlow.input", MessageChannel.class);
inputChannel.send(MessageBuilder.withPayload(new Animal("cat")).build());

В inputChannel мы отправляем сообщение (обернутый в GenericMessage объект Animal) методом send().

Но первый способ предпочтительнее. Не всегда, конечно, мы отправляем сообщения напрямую, существует еще конечная точка Channel adapter, которая позволяет брать сообщения из внешнего источника – JMS, файловой системы и т.п. кучи других мест, все их вы найдете в доках.

Итог

Вроде все в примере рассмотрели, полный код приложения есть на GitHub. Если что, задавайте вопросы.

 

Добавить комментарий

Ваш e-mail не будет опубликован. Обязательные поля помечены *