Введение в Spring Batch: Job, JobInstance, ExecutionContext, JobLauncher и рестарт

Spring Batch предоставляет интерфейс для пакетной обработки данных. Например – конвертация данных из одной базы в другую, из файла в базу, из брокера сообщений в файл и т.д. и т.п. Эти задания могут выполняться разово или по расписанию.

В Spring Batch эти задания можно масштабировать: настроить выполнение в нескольких потоках или даже на нескольких jvm.

Также можно настраивать политики  пропуска элементов или же, наоборот, попыток их повторной обработки (тут подробнее).

Кроме того, Spring Batch позволяет в случае возникновения исключения перезапустить задание не с самого начала, а с проблемного места: есть много готовых способных на это реализаций (для File, JDBC, NoSQL, JMS). Ниже мы рассмотрим пример с FlatFileItemReader.

Чтение, обработка и запись элементов

FlatFileItemReader — это «считыватель» элементов, реализация ItemReader для файла. FlatFileItemReader считывает данные построчно. Если на какой-то строке выбросится исключение, то при перезапуске задания (Job) данные будут считываться не с нуля, а с проблемной строки (если быть точнее, то с начала порции (chunk), содержащей проблемную строку).

Вообще ItemReader — один из основных интерфейсов Spring Batch — он предназначен для чтения элементов. Есть также ItemProcessor и ItemWriter — для обработки и записи элементов.

Данные можно разбивать на порции. Согласно архитектуре Spring Batch, каждая порция обрабатывается в отдельной транзакции (порции — chunks, подробнее тут). Размер порции настраивается в зависимости от задачи.

Задача

Как говорилось, у нас для чтения данных из файла используется FlatFileItemReader, способный сохранять ход выполнения задания. Так что если для задания Job при попытке его выполнения JobExecution возникнет исключение, новая попытка начнется не с начала. Мы это увидим на примере.

Это входной файл input.csv:
1,dog
2,cat
3,fox
4,elephant
5,eagle
6,squirrel
7,tiger
8,shark

Наша задача — считать данные из файла input.csv, преобразовать название животного в верхний регистр и записать результат в файл output.json в JSON-формате:

[
 {"id":"1","name":"DOG"},
 {"id":"2","name":"CAT"},
 {"id":"3","name":"FOX"},
 {"id":"4","name":"ELEPHANT"},
 {"id":"5","name":"EAGLE"},
 {"id":"6","name":"SQUIRREL"},
 {"id":"7","name":"TIGER"},
 {"id":"8","name":"SHARK"}
]

Для записи будем использовать JsonFileItemWriter — тоже готовую реализацию, но ItemWriter.

У нас 8 строк, размер порции зададим равным 3. Тогда если исключение возникнет в 5 строке (а мы сделаем, что возникнет), будут обработаны все равно только 3 строки. Они сохранятся в выходном файле. Остальные будут дописаны только при перезапуске задания.

Подготовка

Maven-зависимость

Чтобы начать работать с Spring Batch, добавим в проект:

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-batch</artifactId>
</dependency>

Кроме того, нам понадобится база данных. Дело в том, что Spring Batch хранит  в служебных таблицах в базе задания Job, попытки их выполнения JobExecution и многое другое — всего 6 таблиц.

В нашем примере используется PostgreSQL, но можно добавить любую базу. Главное, чтобы после запуска программы можно было посмотреть содержимое служебных таблиц, мы будем на них ссылаться в статье.

Создание служебных таблиц

Spring Boot создает служебные таблицы Spring Batch автоматически, если в настройках указать:

spring.batch.initialize-schema=always

Теперь при первом запуске приложения в базе появятся таблицы:

Служебные таблицы Spring Batch
Служебные таблицы Spring Batch

Конфигурация задания Job

Ниже рассмотрим конфигурацию задания Spring Batch. По умолчанию сконфигурированное задание Job запускается на старте приложения, но часто это неудобно. Мы будем запускать задание с помощью JobLauncher — заодно рассмотрим, как задать параметры задания и какое значение они имеют. Чтобы отключить автозапуск, в настройках зададим:

spring.batch.job.enabled=false

Задание Job

Задание задается как бин:

@Bean
public Job job(Step step) {
    return jobs.get("job")
            .start(step)
            .build();
}

Оно состоит из шагов. Вообще можно задать несколько шагов, но у нас один.

Шаг Step

В шаге задается ItemReader, ItemWriter и ItemProcessor:

@Bean
public Step step(ItemReader<Animal> csvItemReader, ItemWriter<Animal> jsonItemWriter) throws IOException {
    // @formatter:off
    return steps
            .get("step1")
            .<Animal, Animal>chunk(3)
            .reader(csvItemReader)
            .processor(animalItemProcessor())
            .writer(jsonItemWriter)
            .build();
    // @formatter:on
}

Также задан размер порции равным три. По три элемента обрабатываются в одной транзакции.

ItemReader (FlatFileItemReader)

Здесь мы настраиваем FlatFileItemReader — указываем для него файл и mapper:

@Bean
@StepScope
public FlatFileItemReader<Animal> csvItemReader(@Value("#{jobParameters['file.input']}") String input) {
    FlatFileItemReaderBuilder<Animal> builder = new FlatFileItemReaderBuilder<>();
    FieldSetMapper<Animal> animalFieldSetMapper = new AnimalFieldSetMapper();

    // @formatter:off
    return builder
            .name("animalReader")
            .resource(new FileSystemResource(input))
            .delimited()
            .names(TOKENS)
            .fieldSetMapper(animalFieldSetMapper)
            .build();
    // @formatter:on
}

AnimalFieldSetMapper:

public class AnimalFieldSetMapper  implements FieldSetMapper<Animal> {

    @Override
    public Animal mapFieldSet(FieldSet fieldSet) throws BindException {
        Animal animal = new Animal();
        animal.setId(fieldSet.readString("id"));
        animal.setName(fieldSet.readString("name"));
        return animal;
    }
}

ItemProcessor

ItemProcessor у нас просто при водит имя к верхнему регистру:

@Bean
@StepScope
ItemProcessor<Animal, Animal> animalItemProcessor() {
    return animal -> {
        LOGGER.info("Processing " + animal.getName());
        animal.setName(animal.getName().toUpperCase());
        return animal;
    };
}

ItemWriter (JsonFileItemWriter)

В качестве ItemWriter тоже берем готовую реализацию — JsonFileItemWriter:

@Bean
@StepScope
public JsonFileItemWriter<Animal> jsonItemWriter(@Value("#{jobParameters['file.output']}") String output) throws IOException {
    JsonFileItemWriterBuilder<Animal> builder = new JsonFileItemWriterBuilder<>();
    JacksonJsonObjectMarshaller<Animal> marshaller = new JacksonJsonObjectMarshaller<>();

    // @formatter:off
    return builder
            .name("animalWriter")
            .jsonObjectMarshaller(marshaller)
            .resource(new FileSystemResource(output))
            .build();
    // @formatter:on
}

Запуск задания с помощью JobLauncher

Выше в настройках мы отключили автозапуск задания при старте приложения. Запускать мы будем его все равно при старте, но сами — с помощью JobLauncher:

@SpringBootApplication
public class Main implements CommandLineRunner {
    @Autowired
    private Job job;
    @Autowired
    private JobLauncher jobLauncher;
    @Value("${file.input}")
    private String input;

    @Value("${file.output}")
    private String output;

    public static void main(String[] args) {
        SpringApplication.run(Main.class, args);

    }

    @Override
    public void run(String... args) throws Exception {
        JobParametersBuilder jobParameters = new JobParametersBuilder();
        jobParameters.addString("file.input", input);
        jobParameters.addString("file.output", output);
        jobParameters.addString("trial","1");

        jobLauncher.run(job, jobParameters.toJobParameters());
    }
}

Как видно выше, в качестве параметров мы задаем входной и выходной файл, а также trial — номер попытки, ниже покажу зачем.

При запуске задание выполняется успешно, в базе мы получаем строку в таблице batch_job_instance:

и строку в batch_job_execution:

JobInstanceAlreadyCompleteException

Но если повторно запустить программу, сделано ничего не будет, вместо этого выбросится исключение:

Caused by: org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException: A job instance already exists and is complete for parameters={file.input=src/main/resources/input.csv, file.output=src/main/resources/output.json, trial=1}.  If you want to run this job again, change the parameters.

Потому что для данного JobInstance успешный JobExecution (то есть JobExecution со статусом COMPLETED) может быть только один.

Как создать новый JobInstance

Но что, если задание нужно выполнить неоднократно? Для этого нужен новый JobInstance. Он будет создан, если поменять хотя бы один параметр задания, например :

jobParameters.addString("trial","2");

Соотношение Job и JobInstance иллюстрируется картинкой из документации:

Часто (например, при запуске задания по расписанию с @Scheduled)  добавляют автоматически генерируемый уникальный параметр:

.addLong("launchTime", System.currentTimeMillis())

Но мы изменили параметр вручную (поставили другой номер для trial). Теперь приложение можно запустить вновь и задание выполнится с нуля повторно. В базе появится второй JobInstance, а для него —  новый JobExecution.

Может ли для одного JobInstance быть несколько JobExecution? Да: успешный JobExecution (со статусом COMPLETED) должен быть единственным, но со статусом  FAILED их может быть много. Если бы при предыдущем запуске возникло исключение и задание не выполнилось, результат JobExecution был бы FAILED. Тогда были бы разрешены новые попытки выполнения задания, и каждый раз в таблицу добавлялся бы новый JobExecution для старого JobInstance (до тех пор, пока не получится COMPLETED). Эти попытки называются рестартом задания — и это как раз та самая уникальная возможность Spring Batch, позволяющая повторить задание с проблемной точки, а не с нуля.

Рестарт

Давайте сделаем рестарт. Для этого надо сначала получить JobExecution в статусе FAILED, а потом исправить ошибку и попробовать снова.

Получение JobExecution в статусе FAILED

Чтобы вызвать исключение, исправим исходный файл, чтобы при парсинге 5 строки возникло исключение:

1,dog
2,cat
3,fox
4,elephant
,,,5,eagle
6,squirrel
7,tiger
8,shark

Теперь запустим приложение (только не забудьте поменять параметр trial, ведь задание у нас считается уже успешно выполненным и больше выполняться не будет. Нужно, чтобы создался новый JobInstance, который еще не выполнен (для него еще нет JobExecution в статусе COMPLETED). Так что цифру ставить надо ту, с которой еще не запускали приложение:

jobParameters.addString("trial","2");

В таблице появится еще один JobExecution, и он получит статус FAILED:

Обработается три элемента, поскольку .chunk(3):

Executing step: [step1]
ru.sysout.batch.BatchConfig              : Processing dog
ru.sysout.batch.BatchConfig              : Processing cat
ru.sysout.batch.BatchConfig              : Processing fox
o.s.batch.core.step.AbstractStep         : Encountered an error executing step step1 in job job

org.springframework.batch.item.file.FlatFileParseException: Parsing error at line: 5 in resource=[file [C:\Code\sysout\batch0\src\main\resources\input.csv]], input=[,,,5,eagle]

В выходном файле будет три элемента.

В таблице batch_step_execution видно, что считано 4 элемента, записано 3 и выполнен 1 commit:

Поскольку JobExecution для последнего JobInstance имеет статус FAILED, можно перезапустить задание с теми же параметрами (по сути выполнить тот же JobInstance снова). Будет создан новый JobExecution. Причем задание начнет выполняться не сначала, а с проблемной точки.

А теперь рестарт

Только сделаем входной файл снова нормальным, чтобы задание прошло успешно.

1,dog
2,cat
3,fox
4,elephant
5,eagle
6,squirrel
7,tiger
8,shark

И запустим приложение с тем же значением параметра trial.

Задание выполнится успешно. При этом:

  • В консоли видно, что обрабатываются 4-8 элементы. Первые три заново не обрабатываются:
Processing elephant
Processing eagle
Processing squirrel
Processing tiger
Processing shark
  • Новый batch_job_instance не появляется в базе.
  • А появляется новый batch_job_execution в статусе COMPLETED (для старого batch_job_instance).
  • В таблице batch_step_execution добавляется строка, где видно, что считаны и записаны оставшиеся 5 элементов, выполнено 2 commit:

Итоги

Мы рассмотрели основные понятия Spring Batch и что такое рестарт задания. Исходный код примера есть на GitHub.

Введение в Spring Batch: Job, JobInstance, ExecutionContext, JobLauncher и рестарт: 1 комментарий

  1. Подскажите, можно реализовать Spring Batch как бесконечный цикл? Т.е. на вход периодически поступает массив с данными, обрабатывается сохраняется в другом виде для отображения на фронте и возможно передается куда-то дальше. Конечно необходима возможности запуска такого цикла и временной остановки.
    Причем первый и третий шаги должны работать с фиксированной частотой, например 20мсек, второму а второму достаточно и периода в 1сек, чтобы сильно не нагружать систему.
    Т.е. получается , что третий шаг получает данные с первого и работает с такой же частотой. Второй шаг тоже берет данные у первого, но можно запускать его раз в 100 реже.
    Как считаете подойдет для реализации такой задачи Spring Batch? Или лучше использовать что-то другое от Spring? Или третий вариант реализовать собственные потоки.

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *