В этой статье рассмотрим разницу между Chunks и Tasklet.
В Spring Batch для обработки элементов есть два подхода: Chunks (основной) и Tasklet. Первый (Chunks) мы уже использовали во вводной статье.
- В случае Chunks элементы разбиваются на порции (chunks), и каждая порция обрабатывается в отдельной транзакции. Так что если возникнет исключение, обработанные порции сохранятся. (И можно даже перезапустить задание Job повторно с нужной точки, если мы запрограммировали сохранение состояния). Для обработки данных порциями предусмотрены интерфейсы ItemReader, ItemProcessor, ItemWriter.
Кроме того, здесь можно настроить отказоустойчивость: сколько элементов разрешено пропустить, если они выбросили исключение при обработке. И сколько раз вообще пытаться обработать элемент. - Но бывает, что всё задание (чаще отдельный шаг Step задания, а иначе зачем Spring Batch) нужно выполнить целиком. Для этого предусмотрен другой интерфейс — Tasklet. Здесь если возникнет исключение, то транзакция будет полностью откатана.
Перейдем к примеру. В нем мы будем скачивать страницы с GitHub и записывать их в базу данных PostgreSQL. Сначала сделаем это с помощью Chunks, а потом в Tasklet.
Maven-зависимость
Чтобы воспользоваться Spring Batch, добавим в проект
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency>
Задача
Мы воспользуемся GitHub API: найдем репозитории, в названии которых встречается «Spring». Они доступны по ссылке типа:
https://api.github.com/search/repositories?q=spring&page=1
То есть результат поиска выдается постранично в зависимости от параметра &page=?. Каждая такая страница содержит 30 репозиториев.
Модель данных
Под полученный по ссылке JSON-результат создадим класс ReposPage:
@JsonNaming(PropertyNamingStrategy.SnakeCaseStrategy.class)
@Data
public class ReposPage {
private List<Repo> items;
}
Все поля получаемого JSON-объекта нас не интересуют — мы будем брать только items — массив репозиториев. В репозитории, в свою очередь, тоже игнорируем большую часть полей. Берем только три — название, ссылку и описание.
Класс Repo:
@Entity
@Data
@JsonNaming(PropertyNamingStrategy.SnakeCaseStrategy.class)
public class Repo {
@Id
@GeneratedValue(strategy = GenerationType.SEQUENCE)
private long id;
@Column(columnDefinition="text")
private String name;
@Column(columnDefinition="text")
private String htmlUrl;
@Column(columnDefinition="text")
private String description;
}
Эти сущности Repo и будем записывать в базу.
Сначала рассмотрим подход с порциями (Chunks), а потом Tasklet.
Chunks
В одной порции можно обрабатывать несколько элементов. А элементом у нас будет страница (то, что возвращается API-запросом с GitHub).
Задание Job
Сначала определим бин Job:
@EnableBatchProcessing
@Configuration
@Profile("chunk")
public class BatchChunkConfig {
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
@Bean
public Job job() {
return jobs.get("GithubJob")
.start(step())
.build();
}
// шаг Step и ItemReader
}
Мы определили задание, состоящее из одного шага Step (ниже будет его описание). А вообще шагов может быть несколько, и можно даже построить логический набор из шагов (например: если первый шаг закончился успешно, то перейти к такому-то шагу, а иначе — к другому). Но у нас все проще — шаг один.
Шаг Step
Шаг в случае обработки по частям (Chunks) состоит из трех действий: считать элемент, обработать элемент и записать элемент. Вот его описание:
@Bean
public Step step() {
return steps.get("step")
.<Integer, ReposPage>chunk(2)
.reader(getItemReader())
.processor(repoPageProcessor)
.writer(repoPageWriter)
.build();
}
Делается это по порциям. Запись
.<Integer, ReposPage>chunk(2)
означает, что порция состоит из двух элементов. При этом ItemReader выдает процессору элемент типа Integer (номер страницы), а ItemWriter получает на вход элементы ReposPage — то есть страницы (и записывает в базу объекты Repo).
ItemProcessor
Запрос к API выполняет процессор RepoPageProcessor — он получает на вход номер страницы, и передает райтеру ReposPage:
@Component
public class RepoPageProcessor implements ItemProcessor<Integer, ReposPage> {
@Autowired
private RestTemplate template;
@Override
public ReposPage process(Integer integer) throws Exception {
ReposPage reposPage = template.getForObject(
"https://api.github.com/search/repositories?q=spring&page=" + integer, ReposPage.class);
Thread.sleep(1000);
return reposPage;
}
}
Как видите, процессор реализует интерфейс ItemProcessor, который состоит из единственного метода process(). Метод принимает один объект и возвращает другой. Мы превращаем номер страницы в объект ReposPage — в этом и состоит обработка.
ItemReader
ItemReader возьмем из коробки: ListItemReader. Он выдает нам номера страниц, которые мы будем скачивать (эти номера передаются процессору):
@Bean
ItemReader<Integer> getItemReader() {
return new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5));
}
ListItemReader используется, когда уже есть готовый список, из которого необходимо считать элементы. И он у нас есть — это просто номера страниц. За этими номерами не нужно обращаться в файл, базу или куда-то еще. Мы берем маленькое число 5, чтобы не превысить лимит запросов. То есть будем в процессоре загружать страницы с 1 по 5.
ItemWriter
Наконец, ItemWriter:
@JobScope
@Component
public class RepoPageWriter implements ItemWriter<ReposPage> {
@Autowired
private RepoService repoService;
@Override
public void write(List<? extends ReposPage> list) throws Exception {
repoService.saveRepos((List<ReposPage>) list);
}
// ...
}
Здесь мы с помощью заранее написанного несложного сервиса RepoService (см. исходник) сохраняем в базе репозитории.
Интерфейс ItemWriter состоит из одного метода write().
Обратите внимание, что на вход метода write() передается не элемент, а массив — в нем столько элементов, сколько мы настроили в Step в в методе <Integer, ReposPage>chunk(2). Запись выполняется порциями по два элемента. Если на любом этапе возникнет исключение, то откатится только последний chunk из двух страниц.
@AfterChunk
Стоит заметить, что если аннотировать любой метод @AfterChunk, он будет выполняться после обработки каждой порции.
У нас в ItemWriter добавлен метод, который уведомляет об окончании обработки порции в консоли:
@AfterChunk
public void afterChunk() {
System.out.println("AFTER CHUNK");
}
Аналогично можно создать методы, которые выполняются до и после каждого шага — в них можно передавать небольшие данные с шага на шаг, и сохранять эти данные в ExecutionContext задания. Но об этом в следующий раз.
@JobScope
Обратите внимание, что ItemReader у нас аннотирован @JobScope. Дело в том, что по умолчанию ItemReader (как и любой бин) — синглтон, но если аннотировать его @JobScope, то для каждого нового выполнения задания ItemReader будет создаваться заново. Это нужно, если задание выполняется неоднократно: например, по расписанию. Ведь после первого выполнения задания список чисел в ListItemReader будет исчерпан, и метод read() будет выдавать null. Поэтому при повторном выполнении задания нечего будет обрабатывать (без аннотации @JobScope).
Существует еще @StepScope — здесь новый бин создается при каждом выполнении шага.
Какую аннотацию выбрать, зависит от конкретного задания. Мы могли бы вообще ничего не ставить, поскольку у нас задание выполняется однократно.
Tasklet
Теперь выполним ту же задачу с помощью Tasklet. Снова создадим конфигурацию BatchTaskletConfig:
@EnableBatchProcessing
@Configuration
@Profile("tasklet")
public class BatchTaskletConfig {
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
@Autowired
private RepoTasklet repoTasklet;
@Bean
public Step step() {
return steps.get("step")
.tasklet(repoTasklet)
.build();
}
@Bean
public Job job() {
return jobs.get("GithubJob")
.start(step())
.build();
}
}
Как видите, теперь в шаге нет поэлементного ItemReader, ItemProcessor и ItemWriter. Вместо этого есть Tasklet:
@Component
public class RepoTasklet implements Tasklet {
@Autowired
private RestTemplate template;
@Autowired
private RepoRepository repoRepository;
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
for (int page = 1; page <= 5; page++) {
ReposPage reposPage = template.getForObject(
"https://api.github.com/search/repositories?q=spring&page=" + page, ReposPage.class);
repoRepository.saveAll(reposPage.getItems());
Thread.sleep(1000);
}
return RepeatStatus.FINISHED;
}
}
В его единственном методе execute() и выполняется вся работа. Но если возникнет любое исключение, то транзакция откатится, и в базе не будет ни одного элемента.
Дело в том, что несмотря на то, что методы репозитория по умолчанию транзакционные (в том числе метод saveAll):
public interface RepoRepository extends JpaRepository<Repo, Long> {
}
Spring Batch оборачивает execute() в свою более крупную транзакцию и она откатывает все вложенные транзакции при возникновении ошибки.
Исходный код
Исходный код примера доступен на GitHub.