Spring Batch: Tasklet vs Chunks

В этой статье рассмотрим разницу между Chunks и Tasklet.

В Spring Batch для обработки элементов есть два подхода: Chunks (основной) и Tasklet. Первый (Chunks)  мы уже использовали во вводной статье.

  • В случае Chunks элементы разбиваются на порции (chunks), и каждая порция обрабатывается в отдельной транзакции. Так что если возникнет исключение, обработанные порции сохранятся. (И можно даже перезапустить задание Job повторно с нужной точки, если мы запрограммировали сохранение состояния). Для обработки данных порциями предусмотрены интерфейсы ItemReader, ItemProcessor, ItemWriter.
    Кроме того, здесь можно настроить отказоустойчивость: сколько элементов разрешено пропустить, если они выбросили исключение при обработке. И сколько раз вообще пытаться обработать элемент.
  • Но бывает, что всё задание (чаще отдельный шаг Step задания, а иначе зачем Spring Batch) нужно выполнить целиком. Для этого предусмотрен другой интерфейс — Tasklet. Здесь если возникнет исключение, то транзакция будет полностью откатана.

Перейдем к примеру. В нем мы будем скачивать страницы с GitHub и записывать их в базу данных PostgreSQL. Сначала сделаем это с помощью Chunks, а потом в Tasklet.

Maven-зависимость

Чтобы воспользоваться Spring Batch, добавим в проект

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-batch</artifactId>
</dependency>

Задача

Мы воспользуемся GitHub API: найдем репозитории, в названии которых встречается «Spring».  Они доступны по ссылке типа:

https://api.github.com/search/repositories?q=spring&page=1

То есть результат поиска выдается постранично в зависимости от параметра &page=?. Каждая такая страница содержит 30 репозиториев.

Осторожно, без аутентификации доступно 60 запросов в час.

Модель данных

Под полученный по ссылке JSON-результат создадим класс ReposPage:

@JsonNaming(PropertyNamingStrategy.SnakeCaseStrategy.class)
@Data
public class ReposPage {
    private List<Repo> items;
}

Все поля получаемого JSON-объекта нас не интересуют — мы будем брать только items — массив репозиториев. В репозитории, в свою очередь, тоже игнорируем большую часть полей. Берем только три — название, ссылку и описание.

Класс Repo:

@Entity
@Data
@JsonNaming(PropertyNamingStrategy.SnakeCaseStrategy.class)
public class Repo {
    @Id
    @GeneratedValue(strategy = GenerationType.SEQUENCE)
    private long id;
    @Column(columnDefinition="text")
    private String name;
    @Column(columnDefinition="text")
    private String htmlUrl;
    @Column(columnDefinition="text")
    private String description;

}

Эти сущности Repo и будем записывать в базу.

Сначала рассмотрим подход с порциями (Chunks), а потом Tasklet.

Chunks

В одной порции можно обрабатывать несколько элементов. А элементом у нас будет страница (то, что возвращается API-запросом с GitHub).

Задание Job

Сначала определим бин Job:

@EnableBatchProcessing
@Configuration
@Profile("chunk")
public class BatchChunkConfig {
    @Autowired
    private JobBuilderFactory jobs;
    @Autowired
    private StepBuilderFactory steps;
  

    @Bean
    public Job job() {
        return jobs.get("GithubJob")
                .start(step())
                .build();
    }

  // шаг Step и ItemReader
}

Мы определили задание, состоящее из одного шага Step (ниже будет его описание). А вообще шагов может быть несколько, и можно даже построить логический набор из шагов (например: если первый шаг закончился успешно, то перейти к такому-то шагу, а иначе —  к другому). Но у нас все проще — шаг один.

Шаг Step

Шаг в случае обработки по частям (Chunks) состоит из трех действий: считать элемент, обработать элемент и записать элемент. Вот его описание:

@Bean
public Step step() {
    return steps.get("step")
            .<Integer, ReposPage>chunk(2)
            .reader(getItemReader())
            .processor(repoPageProcessor)
            .writer(repoPageWriter)
            .build();
}

Делается это по порциям. Запись

.<Integer, ReposPage>chunk(2)

означает, что порция состоит из двух элементов. При этом ItemReader выдает процессору элемент типа Integer (номер страницы), а ItemWriter получает на вход элементы ReposPage — то есть страницы (и записывает в базу объекты Repo).

ItemProcessor

Запрос к API выполняет процессор RepoPageProcessor — он получает на вход номер страницы, и передает райтеру ReposPage:

@Component
public class RepoPageProcessor implements ItemProcessor<Integer, ReposPage> {
    @Autowired
    private RestTemplate template;
    @Override
    public ReposPage process(Integer integer) throws Exception {
        ReposPage reposPage = template.getForObject(
                "https://api.github.com/search/repositories?q=spring&page=" + integer, ReposPage.class);

        Thread.sleep(1000);
        return reposPage;
    }
}

Как видите, процессор реализует интерфейс ItemProcessor, который состоит из единственного метода process(). Метод принимает один объект и возвращает другой. Мы превращаем номер страницы в объект ReposPage — в этом и состоит обработка.

ItemReader

ItemReader возьмем из коробки: ListItemReader.  Он выдает нам номера страниц, которые мы будем скачивать (эти номера передаются процессору):

@Bean
ItemReader<Integer> getItemReader() {
    return new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5));
}

ListItemReader используется, когда уже есть готовый список, из которого необходимо считать элементы. И он у нас есть — это просто номера страниц. За этими номерами не нужно обращаться в файл, базу или куда-то еще. Мы берем маленькое число 5, чтобы не превысить лимит запросов. То есть будем в процессоре загружать страницы с 1 по 5.

ItemWriter

Наконец, ItemWriter:

@JobScope
@Component
public class RepoPageWriter implements ItemWriter<ReposPage> {
    @Autowired
    private RepoService repoService;

    @Override
    public void write(List<? extends ReposPage> list) throws Exception {
        repoService.saveRepos((List<ReposPage>) list);
    }

   // ...
}

Здесь мы с помощью заранее написанного несложного сервиса RepoService (см. исходник) сохраняем в базе репозитории.

Интерфейс ItemWriter состоит из одного метода write().

Обратите внимание, что на вход метода write() передается не элемент, а массив — в нем столько элементов, сколько мы настроили в Step в в методе <Integer, ReposPage>chunk(2). Запись выполняется порциями по два элемента. Если на любом этапе возникнет исключение, то откатится только последний chunk из двух страниц.

@AfterChunk

Стоит заметить, что если аннотировать любой метод @AfterChunk, он будет выполняться после обработки каждой порции.

У нас в ItemWriter добавлен метод, который уведомляет об окончании обработки порции в консоли:

@AfterChunk 
public void afterChunk() { 
    System.out.println("AFTER CHUNK"); 
}

Аналогично можно создать методы, которые выполняются до и после каждого шага — в них можно передавать небольшие данные с шага на шаг, и сохранять эти данные в ExecutionContext задания. Но об этом в следующий раз.

@JobScope

Обратите внимание, что ItemReader у нас аннотирован @JobScope. Дело в том, что по умолчанию ItemReader (как и любой бин) — синглтон, но если аннотировать его @JobScope, то для каждого нового выполнения задания ItemReader будет создаваться заново. Это нужно, если задание выполняется неоднократно: например, по расписанию. Ведь после первого выполнения задания список чисел в ListItemReader будет исчерпан, и метод read() будет выдавать null. Поэтому при повторном выполнении задания нечего будет обрабатывать (без аннотации @JobScope).

Существует еще @StepScope — здесь новый бин создается при каждом выполнении шага.

Какую аннотацию выбрать, зависит от конкретного задания. Мы могли бы вообще ничего не ставить, поскольку у нас задание выполняется однократно.

Tasklet

Теперь выполним ту же задачу с помощью Tasklet. Снова создадим конфигурацию BatchTaskletConfig:

@EnableBatchProcessing
@Configuration
@Profile("tasklet")
public class BatchTaskletConfig {
    @Autowired
    private JobBuilderFactory jobs;
    @Autowired
    private StepBuilderFactory steps;

    @Autowired
    private RepoTasklet repoTasklet;

    @Bean
    public Step step() {
        return steps.get("step")
                .tasklet(repoTasklet)
                .build();
    }

    @Bean
    public Job job() {
        return jobs.get("GithubJob")
                .start(step())
                .build();
    }
}

Как видите, теперь в шаге нет поэлементного ItemReader, ItemProcessor и ItemWriter. Вместо этого есть Tasklet:

@Component
public class RepoTasklet implements Tasklet {
    @Autowired
    private RestTemplate template;

    @Autowired
    private RepoRepository repoRepository;

    @Override
    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
        for (int page = 1; page <= 5; page++) {
            ReposPage reposPage = template.getForObject(
                    "https://api.github.com/search/repositories?q=spring&page=" + page, ReposPage.class);
            repoRepository.saveAll(reposPage.getItems());
            Thread.sleep(1000);
        }
        return RepeatStatus.FINISHED;
    }
}

В его единственном методе execute() и выполняется вся работа. Но если возникнет любое исключение, то транзакция откатится, и в базе не будет ни одного элемента.

Дело в том, что несмотря на то, что методы репозитория по умолчанию транзакционные (в том числе метод saveAll):

public interface RepoRepository extends JpaRepository<Repo, Long> {
}

Spring Batch оборачивает execute() в свою более крупную транзакцию и она откатывает все вложенные транзакции при возникновении ошибки.

Исходный код

Исходный код примера доступен на GitHub.

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *