Агрегатор в Spring Integration

Агрегатор — конечная точка, которая может объединять несколько сообщений в одно. Часто используется в паре со Сплитером. Сплитер — конечная точка, которая наоборот, разбивает одно сообщение на несколько.
Рассмотрим типичный пример.

Пример Splitter+Aggregator

Отправим в канал список, разобьем его на элементы с помощью Сплитера и объединим их снова в единое сообщение с помощью Агрегатора.

Наш поток Spring Integration Flow выглядит так:

    @Bean
    public IntegrationFlow flow1() {
        return IntegrationFlows.from("input1")
                .log()
                .split()
                .log()
                .aggregate()
                .log()
                .channel("outputChannel").get();
    }

Подпишемся на выходной канал, в подписчике выводим содержимое сообщения:

outputChannel.subscribe(message -> System.out.println("OUTPUT CHANNEL: " + message.getPayload()));

Отправляем во входной канал потока одно сообщение-список:

List<String> list = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h");
this.input1.send(MessageBuilder.withPayload(list).build());

В логах консоли видно, что поступает одно сообщение-список:

GenericMessage [payload=[a, b, c, d, e, f, g, h], headers={id=ffb847c2-bd88-a482-a93d-8f1c4617e63e, timestamp=1578754294405}]

Затем оно разбивается на 8 таких сообщений (в консоли 8 штук с разными id, но одинаковыми correlationId):

GenericMessage [payload=d, headers={sequenceNumber=4, correlationId=ffb847c2-bd88-a482-a93d-8f1c4617e63e, id=b2234504-b9eb-5dcb-7ade-b681573882a3, sequenceSize=8, timestamp=1578754294444}]

Эти 8 сообщений в конце снова объединяются агрегатором в одно сообщение:

GenericMessage [payload=[a, b, c, d, e, f, g, h], headers={sequenceNumber=7, correlationId=ffb847c2-bd88-a482-a93d-8f1c4617e63e, id=d675e54d-d4dd-7539-26b2-0e17a3d154c6, sequenceSize=8, timestamp=1578754294451}]

Результат, который выводится в обработчике-подписчике на outputChannel:

OUTPUT CHANNEL: [a, b, c, d, e, f, g, h]

correlationId и sequenceSize

Обратите внимание на заголовки correlationId и sequenceSize — именно их агрегатор использует для объединения сообщений в группы. А добавляет эти значения заголовкам Сплиттер. Агрегатор устроен так, что по умолчанию использует заголовки correlationId для объединения сообщений в группу. Сообщения с одинаковыми correlationId идут в одну группу (групп может быть и несколько). А sequenceSize показывает сколько сообщений должно накопиться в группе, чтобы агрегатор уже мог выпустить группу.

В нашем случае все correlationId одинаковые, а sequenceSize равен количеству сообщений в списке, вот и получается, что все сообщения попадают в одну группу, и выпускаются когда все придут.

И это все благодаря конечной точке Splitter — именно она обеспечила эти  заголовки.

Если вставить еще и фильтр в середину потока, то в выходной канал outputChannel ничего не придет, так как 8 сообщений (значение sequenceSize) не набирается. А агрегатор ждет все восемь:

    @Bean
    public IntegrationFlow flow1() {
        return IntegrationFlows.from("input1")
                .log()
                .split()
                .log()
                .filter(l->l.equals("B"))
                .aggregate()
                .log()
                .channel("outputChannel").get();
    }

Теперь при запуске потока в выходной канал ничего не придет.

Пример Aggregaror без Splitter

А теперь попробуем отправить сообщения в такой же поток, но без сплиттера (и без фильтра):

    @Bean
    public IntegrationFlow flow2() {
        return IntegrationFlows.from("input2")
                .log()
                .aggregate()
                .log()
                .channel("outputChannel").get();
    }

Сделаем это в цикле, отправляя сообщения по одному:

List<String> list = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h");
for (int i = 0; i < list.size(); i++) {
    this.input2.send(MessageBuilder.withPayload(list.get(i)).build());
}

Сообщения придут по одному:

OUTPUT CHANNEL: [A]
....
OUTPUT CHANNEL: [H]

Получим исключение:

Exception in thread "main" org.springframework.messaging.MessageHandlingException: error occurred in message handler [bean 'flow2.aggregator#0' for component 'flow2.org.springframework.integration.config.ConsumerEndpointFactoryBean#2'; defined in: 'class path resource [ru/sysout/springintegration/integration/IntegrationConfig.class]'; from source: 'bean method flow2']; nested exception is java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing

Действительно, заголовков теперь нет, и как объединять сообщения, агрегатор не знает.

Зададим CorrelationStrategy и ReleaseStrategy самостоятельно.

Пример с CorrelationStrategy и ReleaseStrategy.

CorrelationStrategy задает, как группировать сообщения. ReleaseStrategy решает, пора ли выпускать группу сообщений.

Давайте сделаем так, чтобы a и b имели один ключ корреляции, а {c, d, e, f, g, h} — другой. Для этого реализуем свою CorrelationStrategy :

    static class MyCorrelationStrategy implements CorrelationStrategy {

        @Override
        public Object getCorrelationKey(Message<?> message) {
            String letter = (String) message.getPayload();
            if (letter.startsWith("a") || letter.startsWith("b"))
                return 1;
            else return 2;
        }
    }

Метод getCorrelationKey() возвращает ключ корреляции. При этом уже не нужен заголовок correlationId, который присутствовал в сообщениях в первом потоке. Он (correlationId) использовался «умолчательной» CorrelationStrategy, а мы задаем свою.

Также надо задать ReleaseStrategy, то есть при каком условии группа с одинаковыми ключами корреляции готова к выпуску. Сообщения приходят один за другим (в общем случае может быть и с паузами), и надо решить, на каком сообщении выпускать группу.

Мы зададим ReleaseStrategy, при которой группу из двух сообщений уже можно выпускать:

    @Bean
    public IntegrationFlow flow3() {
        return IntegrationFlows.from("input3")
                .log()
                .aggregate(a -> a
                        .correlationStrategy(new MyCorrelationStrategy())
                        .releaseStrategy(group -> group.size() == 2))                
                .log()
                .channel("outputChannel").get();
    }

Результат:

OUTPUT CHANNEL: [a, b]
OUTPUT CHANNEL: [c, d]

Куда же делить остальные буквы? Сейчас ключ корреляции считается исчерпанным после выпуска группы, и все последующие сообщения с тем же ключом корреляции отбрасываются. То есть когда набралась группа [c, d] из двух сообщений с ключом корреляции 2, ключ 2 становится неактуальным. Чтобы ключ со значением 2 снова шел в ход для новых сообщений e, f, g, h, надо включить свойство expireGroupsUponCompletion.

expireGroupsUponCompletion

Итак, теперь рассмотрим такой же поток с включенным свойством expireGroupsUponCompletion:

    @Bean
    public IntegrationFlow flow4() {
        return IntegrationFlows.from("input4")
                .log()
                .aggregate(a -> a
                        .correlationStrategy(new MyCorrelationStrategy())
                        .releaseStrategy(group -> group.size() == 2)
                        .expireGroupsUponCompletion(true))
                .log()
                .channel("outputChannel").get();
    }

Результат:

OUTPUT CHANNEL: [a, b]
OUTPUT CHANNEL: [c, d]
OUTPUT CHANNEL: [e, f]
OUTPUT CHANNEL: [g, h]

Как видите, сообщения не отбрасываются, а выпускаются до конца.

Добавление заголовков correlationId вручную

Теперь попробуем не задавать CorrelationStrategy, а задать сообщениям заголовки correlationId, как это делал сплиттер. В этом случае используется CorrelationStrategy по умолчанию, которая как раз таки опирается на заголовок correlationId. Поэтому поток заработает:

//такой же, как предыдущий, но без correlationStrategy    
@Bean
public IntegrationFlow flow5() {
    return IntegrationFlows.from("input5")
            .log()
            .aggregate(a -> a.releaseStrategy(group -> group.size() == 2)
                    .expireGroupsUponCompletion(true))
            .log()
            .channel("outputChannel").get();
}

А в заголовках зададим correlationId:

input5.send(MessageBuilder.withPayload("a").setHeader(CORRELATION_ID, 1).build());
input5.send(MessageBuilder.withPayload("b").setHeader(CORRELATION_ID, 1).build());
input5.send(MessageBuilder.withPayload("c").setHeader(CORRELATION_ID, 2).build());
input5.send(MessageBuilder.withPayload("d").setHeader(CORRELATION_ID, 2).build());
input5.send(MessageBuilder.withPayload("e").setHeader(CORRELATION_ID, 2).build());
input5.send(MessageBuilder.withPayload("f").setHeader(CORRELATION_ID, 2).build());
input5.send(MessageBuilder.withPayload("g").setHeader(CORRELATION_ID, 2).build());
input5.send(MessageBuilder.withPayload("h").setHeader(CORRELATION_ID, 2).build());

Результат тот же:

OUTPUT CHANNEL: [a, b] 
OUTPUT CHANNEL: [c, d] 
OUTPUT CHANNEL: [e, f]
OUTPUT CHANNEL: [g, h]

Группировка по времени: groupTimeout и sendPartialResultOnExpiry

Теперь рассмотрим отправку сообщений, растянутую во времени:

        input6.send(MessageBuilder.withPayload("a").setHeader(CORRELATION_ID, 1).build());
        input6.send(MessageBuilder.withPayload("b").setHeader(CORRELATION_ID, 1).build());

        Thread.sleep(1000);

        input6.send(MessageBuilder.withPayload("c").setHeader(CORRELATION_ID, 1).build());
        input6.send(MessageBuilder.withPayload("d").setHeader(CORRELATION_ID, 1).build());
        input6.send(MessageBuilder.withPayload("e").setHeader(CORRELATION_ID, 1).build());

        Thread.sleep(1000);

        input6.send(MessageBuilder.withPayload("f").setHeader(CORRELATION_ID, 1).build());
        input6.send(MessageBuilder.withPayload("g").setHeader(CORRELATION_ID, 1).build());
        input6.send(MessageBuilder.withPayload("h").setHeader(CORRELATION_ID, 1).build());

        Thread.sleep(1000);

Мы хотим объединить сообщения, выпущенные между паузами, то есть чтобы в итоге было [a, b] , [c, d ,e], [f, g, h]

Обратите внимание, что значение CORRELATION_ID=1 одинаково для всех сообщений, так что оно не способствует попаданию сообщений в разные группы.

И тем не менее, отправить сообщения в разные группы можно.

Чтобы это сделать, надо задать свойство groupTimeout() и разрешить выпускать сообщения по истечении этого таймаута с помощью sendPartialResultOnExpiry(true).

Зададим groupTimeout(500) — это означает, что после прихода каждого сообщения агрегатор ждет 500 мс, и если нового сообщения в данную группу за это время не поступает, то группа выпускается (либо отбрасывается) независимо от того, сколько сообщений в ней накопилось.

Свойство sendPartialResultOnExpiry() позволяет именно выпустить (а не отбросить в помойку) не полностью собранную группу.

Ниже мы задали в ReleaseStrategy условие group.size() >=100500 — это значит, что группа выпустится при накоплении 100500 сообщений. Но благодаря groupTimeout(500) и sendPartialResultOnExpiry, даже если это количество не набралось, группа все равно выпустится по истечении 500 миллисекунд.

Итак, наш поток:

@Bean
public IntegrationFlow flow6() {
    return IntegrationFlows.from("input6")
            .log()
            .aggregate(a -> a.releaseStrategy(group -> group.size() >=100500)
                    .expireGroupsUponCompletion(true)
            .groupTimeout(500)
            .sendPartialResultOnExpiry(true))
            .log()
            .channel("outputChannel").get();
}

Результат:

OUTPUT CHANNEL: [a, b]
OUTPUT CHANNEL: [c, d, e]
OUTPUT CHANNEL: [f, g, h]

TimeoutCountSequenceSizeReleaseStrategy

Есть еще одна стратегия группировки по времени, только время тут отсчитывается от первого элемента группы, а не от последнего.

Пусть элементы отправляются как в предыдущем примере 6, а поток задан с помощью new TimeoutCountSequenceSizeReleaseStrategy(100500, 500):

    @Bean
    public IntegrationFlow flow7() {
        return IntegrationFlows.from("input7")
                .log()
                .aggregate(a -> a.releaseStrategy
                        (new TimeoutCountSequenceSizeReleaseStrategy(100500, 500))
                        .expireGroupsUponCompletion(true))
                .log()
                .channel("outputChannel").get();
    }

100500 — количество элементов в группе, а 500 мс — то самое время. Группа выпускается, если хотя бы одно из условий настало — количество или время. Количество у нас не набирается (как и в предыдущем примере), а с учетом времени 500 мс результат группировки теперь такой:

OUTPUT CHANNEL: [a, b, c]
OUTPUT CHANNEL: [d, e, f]

Объяснение такое (с учетом того, что порядок поступления элементов как в предыдущем примере):

  • Приходят a, b — все они (и последующие) по ключу корреляции в одной группе, но 500 мс с момента прихода первого элемента группы a еще не прошло
  • через секунду приходит с — вот теперь от a до с есть 500 мс, выпускаем группу. Начинаем новую группу.
  • Приходят d, e -снова нет 500 мс
  • С приходом f 500 мс (от d до f) наступает — выпускаем [d, e, f]
  • Дальше через секунду приходят g, h, но последнего элемента, который бы отставал от первого g на 500 мс уже нет, так что эти элементы не выпускаются совсем.

Итог

Мы рассмотрели примеры настройки агрегатора для группировки сообщений. Исходный код проекта доступен на GitHub.

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *