Агрегатор — конечная точка, которая может объединять несколько сообщений в одно. Часто используется в паре со Сплитером. Сплитер — конечная точка, которая наоборот, разбивает одно сообщение на несколько.
Рассмотрим типичный пример.
Пример Splitter+Aggregator
Отправим в канал список, разобьем его на элементы с помощью Сплитера и объединим их снова в единое сообщение с помощью Агрегатора.
Наш поток Spring Integration Flow выглядит так:
@Bean public IntegrationFlow flow1() { return IntegrationFlows.from("input1") .log() .split() .log() .aggregate() .log() .channel("outputChannel").get(); }
Подпишемся на выходной канал, в подписчике выводим содержимое сообщения:
outputChannel.subscribe(message -> System.out.println("OUTPUT CHANNEL: " + message.getPayload()));
Отправляем во входной канал потока одно сообщение-список:
List<String> list = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h"); this.input1.send(MessageBuilder.withPayload(list).build());
В логах консоли видно, что поступает одно сообщение-список:
GenericMessage [payload=[a, b, c, d, e, f, g, h], headers={id=ffb847c2-bd88-a482-a93d-8f1c4617e63e, timestamp=1578754294405}]
Затем оно разбивается на 8 таких сообщений (в консоли 8 штук с разными id, но одинаковыми correlationId):
GenericMessage [payload=d, headers={sequenceNumber=4, correlationId=ffb847c2-bd88-a482-a93d-8f1c4617e63e, id=b2234504-b9eb-5dcb-7ade-b681573882a3, sequenceSize=8, timestamp=1578754294444}]
Эти 8 сообщений в конце снова объединяются агрегатором в одно сообщение:
GenericMessage [payload=[a, b, c, d, e, f, g, h], headers={sequenceNumber=7, correlationId=ffb847c2-bd88-a482-a93d-8f1c4617e63e, id=d675e54d-d4dd-7539-26b2-0e17a3d154c6, sequenceSize=8, timestamp=1578754294451}]
Результат, который выводится в обработчике-подписчике на outputChannel:
OUTPUT CHANNEL: [a, b, c, d, e, f, g, h]
correlationId и sequenceSize
Обратите внимание на заголовки correlationId и sequenceSize — именно их агрегатор использует для объединения сообщений в группы. А добавляет эти значения заголовкам Сплиттер. Агрегатор устроен так, что по умолчанию использует заголовки correlationId для объединения сообщений в группу. Сообщения с одинаковыми correlationId идут в одну группу (групп может быть и несколько). А sequenceSize показывает сколько сообщений должно накопиться в группе, чтобы агрегатор уже мог выпустить группу.
В нашем случае все correlationId одинаковые, а sequenceSize равен количеству сообщений в списке, вот и получается, что все сообщения попадают в одну группу, и выпускаются когда все придут.
И это все благодаря конечной точке Splitter — именно она обеспечила эти заголовки.
Если вставить еще и фильтр в середину потока, то в выходной канал outputChannel ничего не придет, так как 8 сообщений (значение sequenceSize) не набирается. А агрегатор ждет все восемь:
@Bean public IntegrationFlow flow1() { return IntegrationFlows.from("input1") .log() .split() .log() .filter(l->l.equals("B")) .aggregate() .log() .channel("outputChannel").get(); }
Теперь при запуске потока в выходной канал ничего не придет.
Пример Aggregaror без Splitter
А теперь попробуем отправить сообщения в такой же поток, но без сплиттера (и без фильтра):
@Bean public IntegrationFlow flow2() { return IntegrationFlows.from("input2") .log() .aggregate() .log() .channel("outputChannel").get(); }
Сделаем это в цикле, отправляя сообщения по одному:
List<String> list = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h"); for (int i = 0; i < list.size(); i++) { this.input2.send(MessageBuilder.withPayload(list.get(i)).build()); }
Сообщения придут по одному:
OUTPUT CHANNEL: [A] .... OUTPUT CHANNEL: [H]
Получим исключение:
Exception in thread "main" org.springframework.messaging.MessageHandlingException: error occurred in message handler [bean 'flow2.aggregator#0' for component 'flow2.org.springframework.integration.config.ConsumerEndpointFactoryBean#2'; defined in: 'class path resource [ru/sysout/springintegration/integration/IntegrationConfig.class]'; from source: 'bean method flow2']; nested exception is java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing
Действительно, заголовков теперь нет, и как объединять сообщения, агрегатор не знает.
Зададим CorrelationStrategy и ReleaseStrategy самостоятельно.
Пример с CorrelationStrategy и ReleaseStrategy.
CorrelationStrategy задает, как группировать сообщения. ReleaseStrategy решает, пора ли выпускать группу сообщений.
Давайте сделаем так, чтобы a и b имели один ключ корреляции, а {c, d, e, f, g, h} — другой. Для этого реализуем свою CorrelationStrategy :
static class MyCorrelationStrategy implements CorrelationStrategy { @Override public Object getCorrelationKey(Message<?> message) { String letter = (String) message.getPayload(); if (letter.startsWith("a") || letter.startsWith("b")) return 1; else return 2; } }
Метод getCorrelationKey() возвращает ключ корреляции. При этом уже не нужен заголовок correlationId, который присутствовал в сообщениях в первом потоке. Он (correlationId) использовался «умолчательной» CorrelationStrategy, а мы задаем свою.
Также надо задать ReleaseStrategy, то есть при каком условии группа с одинаковыми ключами корреляции готова к выпуску. Сообщения приходят один за другим (в общем случае может быть и с паузами), и надо решить, на каком сообщении выпускать группу.
Мы зададим ReleaseStrategy, при которой группу из двух сообщений уже можно выпускать:
@Bean public IntegrationFlow flow3() { return IntegrationFlows.from("input3") .log() .aggregate(a -> a .correlationStrategy(new MyCorrelationStrategy()) .releaseStrategy(group -> group.size() == 2)) .log() .channel("outputChannel").get(); }
Результат:
OUTPUT CHANNEL: [a, b] OUTPUT CHANNEL: [c, d]
Куда же делить остальные буквы? Сейчас ключ корреляции считается исчерпанным после выпуска группы, и все последующие сообщения с тем же ключом корреляции отбрасываются. То есть когда набралась группа [c, d] из двух сообщений с ключом корреляции 2, ключ 2 становится неактуальным. Чтобы ключ со значением 2 снова шел в ход для новых сообщений e, f, g, h, надо включить свойство expireGroupsUponCompletion.
expireGroupsUponCompletion
Итак, теперь рассмотрим такой же поток с включенным свойством expireGroupsUponCompletion:
@Bean public IntegrationFlow flow4() { return IntegrationFlows.from("input4") .log() .aggregate(a -> a .correlationStrategy(new MyCorrelationStrategy()) .releaseStrategy(group -> group.size() == 2) .expireGroupsUponCompletion(true)) .log() .channel("outputChannel").get(); }
Результат:
OUTPUT CHANNEL: [a, b] OUTPUT CHANNEL: [c, d] OUTPUT CHANNEL: [e, f] OUTPUT CHANNEL: [g, h]
Как видите, сообщения не отбрасываются, а выпускаются до конца.
Добавление заголовков correlationId вручную
Теперь попробуем не задавать CorrelationStrategy, а задать сообщениям заголовки correlationId, как это делал сплиттер. В этом случае используется CorrelationStrategy по умолчанию, которая как раз таки опирается на заголовок correlationId. Поэтому поток заработает:
//такой же, как предыдущий, но без correlationStrategy @Bean public IntegrationFlow flow5() { return IntegrationFlows.from("input5") .log() .aggregate(a -> a.releaseStrategy(group -> group.size() == 2) .expireGroupsUponCompletion(true)) .log() .channel("outputChannel").get(); }
А в заголовках зададим correlationId:
input5.send(MessageBuilder.withPayload("a").setHeader(CORRELATION_ID, 1).build()); input5.send(MessageBuilder.withPayload("b").setHeader(CORRELATION_ID, 1).build()); input5.send(MessageBuilder.withPayload("c").setHeader(CORRELATION_ID, 2).build()); input5.send(MessageBuilder.withPayload("d").setHeader(CORRELATION_ID, 2).build()); input5.send(MessageBuilder.withPayload("e").setHeader(CORRELATION_ID, 2).build()); input5.send(MessageBuilder.withPayload("f").setHeader(CORRELATION_ID, 2).build()); input5.send(MessageBuilder.withPayload("g").setHeader(CORRELATION_ID, 2).build()); input5.send(MessageBuilder.withPayload("h").setHeader(CORRELATION_ID, 2).build());
Результат тот же:
OUTPUT CHANNEL: [a, b] OUTPUT CHANNEL: [c, d] OUTPUT CHANNEL: [e, f] OUTPUT CHANNEL: [g, h]
Группировка по времени: groupTimeout и sendPartialResultOnExpiry
Теперь рассмотрим отправку сообщений, растянутую во времени:
input6.send(MessageBuilder.withPayload("a").setHeader(CORRELATION_ID, 1).build()); input6.send(MessageBuilder.withPayload("b").setHeader(CORRELATION_ID, 1).build()); Thread.sleep(1000); input6.send(MessageBuilder.withPayload("c").setHeader(CORRELATION_ID, 1).build()); input6.send(MessageBuilder.withPayload("d").setHeader(CORRELATION_ID, 1).build()); input6.send(MessageBuilder.withPayload("e").setHeader(CORRELATION_ID, 1).build()); Thread.sleep(1000); input6.send(MessageBuilder.withPayload("f").setHeader(CORRELATION_ID, 1).build()); input6.send(MessageBuilder.withPayload("g").setHeader(CORRELATION_ID, 1).build()); input6.send(MessageBuilder.withPayload("h").setHeader(CORRELATION_ID, 1).build()); Thread.sleep(1000);
Мы хотим объединить сообщения, выпущенные между паузами, то есть чтобы в итоге было [a, b] , [c, d ,e], [f, g, h]
И тем не менее, отправить сообщения в разные группы можно.
Чтобы это сделать, надо задать свойство groupTimeout() и разрешить выпускать сообщения по истечении этого таймаута с помощью sendPartialResultOnExpiry(true).
Зададим groupTimeout(500) — это означает, что после прихода каждого сообщения агрегатор ждет 500 мс, и если нового сообщения в данную группу за это время не поступает, то группа выпускается (либо отбрасывается) независимо от того, сколько сообщений в ней накопилось.
Свойство sendPartialResultOnExpiry() позволяет именно выпустить (а не отбросить в помойку) не полностью собранную группу.
Ниже мы задали в ReleaseStrategy условие group.size() >=100500 — это значит, что группа выпустится при накоплении 100500 сообщений. Но благодаря groupTimeout(500) и sendPartialResultOnExpiry, даже если это количество не набралось, группа все равно выпустится по истечении 500 миллисекунд.
Итак, наш поток:
@Bean public IntegrationFlow flow6() { return IntegrationFlows.from("input6") .log() .aggregate(a -> a.releaseStrategy(group -> group.size() >=100500) .expireGroupsUponCompletion(true) .groupTimeout(500) .sendPartialResultOnExpiry(true)) .log() .channel("outputChannel").get(); }
Результат:
OUTPUT CHANNEL: [a, b] OUTPUT CHANNEL: [c, d, e] OUTPUT CHANNEL: [f, g, h]
TimeoutCountSequenceSizeReleaseStrategy
Есть еще одна стратегия группировки по времени, только время тут отсчитывается от первого элемента группы, а не от последнего.
Пусть элементы отправляются как в предыдущем примере 6, а поток задан с помощью new TimeoutCountSequenceSizeReleaseStrategy(100500, 500):
@Bean public IntegrationFlow flow7() { return IntegrationFlows.from("input7") .log() .aggregate(a -> a.releaseStrategy (new TimeoutCountSequenceSizeReleaseStrategy(100500, 500)) .expireGroupsUponCompletion(true)) .log() .channel("outputChannel").get(); }
100500 — количество элементов в группе, а 500 мс — то самое время. Группа выпускается, если хотя бы одно из условий настало — количество или время. Количество у нас не набирается (как и в предыдущем примере), а с учетом времени 500 мс результат группировки теперь такой:
OUTPUT CHANNEL: [a, b, c] OUTPUT CHANNEL: [d, e, f]
Объяснение такое (с учетом того, что порядок поступления элементов как в предыдущем примере):
- Приходят a, b — все они (и последующие) по ключу корреляции в одной группе, но 500 мс с момента прихода первого элемента группы a еще не прошло
- через секунду приходит с — вот теперь от a до с есть 500 мс, выпускаем группу. Начинаем новую группу.
- Приходят d, e -снова нет 500 мс
- С приходом f 500 мс (от d до f) наступает — выпускаем [d, e, f]
- Дальше через секунду приходят g, h, но последнего элемента, который бы отставал от первого g на 500 мс уже нет, так что эти элементы не выпускаются совсем.
Итог
Мы рассмотрели примеры настройки агрегатора для группировки сообщений. Исходный код проекта доступен на GitHub.