В этой статье рассмотрим разницу между Chunks и Tasklet.
В Spring Batch для обработки элементов есть два подхода: Chunks (основной) и Tasklet. Первый (Chunks) мы уже использовали во вводной статье.
- В случае Chunks элементы разбиваются на порции (chunks), и каждая порция обрабатывается в отдельной транзакции. Так что если возникнет исключение, обработанные порции сохранятся. (И можно даже перезапустить задание Job повторно с нужной точки, если мы запрограммировали сохранение состояния). Для обработки данных порциями предусмотрены интерфейсы ItemReader, ItemProcessor, ItemWriter.
Кроме того, здесь можно настроить отказоустойчивость: сколько элементов разрешено пропустить, если они выбросили исключение при обработке. И сколько раз вообще пытаться обработать элемент. - Но бывает, что всё задание (чаще отдельный шаг Step задания, а иначе зачем Spring Batch) нужно выполнить целиком. Для этого предусмотрен другой интерфейс — Tasklet. Здесь если возникнет исключение, то транзакция будет полностью откатана.
Перейдем к примеру. В нем мы будем скачивать страницы с GitHub и записывать их в базу данных PostgreSQL. Сначала сделаем это с помощью Chunks, а потом в Tasklet.
Maven-зависимость
Чтобы воспользоваться Spring Batch, добавим в проект
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency>
Задача
Мы воспользуемся GitHub API: найдем репозитории, в названии которых встречается «Spring». Они доступны по ссылке типа:
https://api.github.com/search/repositories?q=spring&page=1
То есть результат поиска выдается постранично в зависимости от параметра &page=?. Каждая такая страница содержит 30 репозиториев.
Модель данных
Под полученный по ссылке JSON-результат создадим класс ReposPage:
@JsonNaming(PropertyNamingStrategy.SnakeCaseStrategy.class) @Data public class ReposPage { private List<Repo> items; }
Все поля получаемого JSON-объекта нас не интересуют — мы будем брать только items — массив репозиториев. В репозитории, в свою очередь, тоже игнорируем большую часть полей. Берем только три — название, ссылку и описание.
Класс Repo:
@Entity @Data @JsonNaming(PropertyNamingStrategy.SnakeCaseStrategy.class) public class Repo { @Id @GeneratedValue(strategy = GenerationType.SEQUENCE) private long id; @Column(columnDefinition="text") private String name; @Column(columnDefinition="text") private String htmlUrl; @Column(columnDefinition="text") private String description; }
Эти сущности Repo и будем записывать в базу.
Сначала рассмотрим подход с порциями (Chunks), а потом Tasklet.
Chunks
В одной порции можно обрабатывать несколько элементов. А элементом у нас будет страница (то, что возвращается API-запросом с GitHub).
Задание Job
Сначала определим бин Job:
@EnableBatchProcessing @Configuration @Profile("chunk") public class BatchChunkConfig { @Autowired private JobBuilderFactory jobs; @Autowired private StepBuilderFactory steps; @Bean public Job job() { return jobs.get("GithubJob") .start(step()) .build(); } // шаг Step и ItemReader }
Мы определили задание, состоящее из одного шага Step (ниже будет его описание). А вообще шагов может быть несколько, и можно даже построить логический набор из шагов (например: если первый шаг закончился успешно, то перейти к такому-то шагу, а иначе — к другому). Но у нас все проще — шаг один.
Шаг Step
Шаг в случае обработки по частям (Chunks) состоит из трех действий: считать элемент, обработать элемент и записать элемент. Вот его описание:
@Bean public Step step() { return steps.get("step") .<Integer, ReposPage>chunk(2) .reader(getItemReader()) .processor(repoPageProcessor) .writer(repoPageWriter) .build(); }
Делается это по порциям. Запись
.<Integer, ReposPage>chunk(2)
означает, что порция состоит из двух элементов. При этом ItemReader выдает процессору элемент типа Integer (номер страницы), а ItemWriter получает на вход элементы ReposPage — то есть страницы (и записывает в базу объекты Repo).
ItemProcessor
Запрос к API выполняет процессор RepoPageProcessor — он получает на вход номер страницы, и передает райтеру ReposPage:
@Component public class RepoPageProcessor implements ItemProcessor<Integer, ReposPage> { @Autowired private RestTemplate template; @Override public ReposPage process(Integer integer) throws Exception { ReposPage reposPage = template.getForObject( "https://api.github.com/search/repositories?q=spring&page=" + integer, ReposPage.class); Thread.sleep(1000); return reposPage; } }
Как видите, процессор реализует интерфейс ItemProcessor, который состоит из единственного метода process(). Метод принимает один объект и возвращает другой. Мы превращаем номер страницы в объект ReposPage — в этом и состоит обработка.
ItemReader
ItemReader возьмем из коробки: ListItemReader. Он выдает нам номера страниц, которые мы будем скачивать (эти номера передаются процессору):
@Bean ItemReader<Integer> getItemReader() { return new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5)); }
ListItemReader используется, когда уже есть готовый список, из которого необходимо считать элементы. И он у нас есть — это просто номера страниц. За этими номерами не нужно обращаться в файл, базу или куда-то еще. Мы берем маленькое число 5, чтобы не превысить лимит запросов. То есть будем в процессоре загружать страницы с 1 по 5.
ItemWriter
Наконец, ItemWriter:
@JobScope @Component public class RepoPageWriter implements ItemWriter<ReposPage> { @Autowired private RepoService repoService; @Override public void write(List<? extends ReposPage> list) throws Exception { repoService.saveRepos((List<ReposPage>) list); } // ... }
Здесь мы с помощью заранее написанного несложного сервиса RepoService (см. исходник) сохраняем в базе репозитории.
Интерфейс ItemWriter состоит из одного метода write().
Обратите внимание, что на вход метода write() передается не элемент, а массив — в нем столько элементов, сколько мы настроили в Step в в методе <Integer, ReposPage>chunk(2). Запись выполняется порциями по два элемента. Если на любом этапе возникнет исключение, то откатится только последний chunk из двух страниц.
@AfterChunk
Стоит заметить, что если аннотировать любой метод @AfterChunk, он будет выполняться после обработки каждой порции.
У нас в ItemWriter добавлен метод, который уведомляет об окончании обработки порции в консоли:
@AfterChunk public void afterChunk() { System.out.println("AFTER CHUNK"); }
Аналогично можно создать методы, которые выполняются до и после каждого шага — в них можно передавать небольшие данные с шага на шаг, и сохранять эти данные в ExecutionContext задания. Но об этом в следующий раз.
@JobScope
Обратите внимание, что ItemReader у нас аннотирован @JobScope. Дело в том, что по умолчанию ItemReader (как и любой бин) — синглтон, но если аннотировать его @JobScope, то для каждого нового выполнения задания ItemReader будет создаваться заново. Это нужно, если задание выполняется неоднократно: например, по расписанию. Ведь после первого выполнения задания список чисел в ListItemReader будет исчерпан, и метод read() будет выдавать null. Поэтому при повторном выполнении задания нечего будет обрабатывать (без аннотации @JobScope).
Существует еще @StepScope — здесь новый бин создается при каждом выполнении шага.
Какую аннотацию выбрать, зависит от конкретного задания. Мы могли бы вообще ничего не ставить, поскольку у нас задание выполняется однократно.
Tasklet
Теперь выполним ту же задачу с помощью Tasklet. Снова создадим конфигурацию BatchTaskletConfig:
@EnableBatchProcessing @Configuration @Profile("tasklet") public class BatchTaskletConfig { @Autowired private JobBuilderFactory jobs; @Autowired private StepBuilderFactory steps; @Autowired private RepoTasklet repoTasklet; @Bean public Step step() { return steps.get("step") .tasklet(repoTasklet) .build(); } @Bean public Job job() { return jobs.get("GithubJob") .start(step()) .build(); } }
Как видите, теперь в шаге нет поэлементного ItemReader, ItemProcessor и ItemWriter. Вместо этого есть Tasklet:
@Component public class RepoTasklet implements Tasklet { @Autowired private RestTemplate template; @Autowired private RepoRepository repoRepository; @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { for (int page = 1; page <= 5; page++) { ReposPage reposPage = template.getForObject( "https://api.github.com/search/repositories?q=spring&page=" + page, ReposPage.class); repoRepository.saveAll(reposPage.getItems()); Thread.sleep(1000); } return RepeatStatus.FINISHED; } }
В его единственном методе execute() и выполняется вся работа. Но если возникнет любое исключение, то транзакция откатится, и в базе не будет ни одного элемента.
Дело в том, что несмотря на то, что методы репозитория по умолчанию транзакционные (в том числе метод saveAll):
public interface RepoRepository extends JpaRepository<Repo, Long> { }
Spring Batch оборачивает execute() в свою более крупную транзакцию и она откатывает все вложенные транзакции при возникновении ошибки.
Исходный код
Исходный код примера доступен на GitHub.