Spring Batch 快速入门

这篇文章中,我们将从如下四个方面对Spring Batch进行简要介绍, 旨在使读者清楚什么是Spring Batch,以及如何使用它。

  • Spring Batch简介
  • 批处理中的基础概念
  • 简单示例
  • 并行处理

1. Spring Batch简介

Spring Batch是一个轻量级的、完善的批处理框架,旨在帮助企业建立健壮、高效的批处理应用。Spring Batch提供了大量可重用的组件,包括了日志、追踪、事务、任务作业统计、任务重启、跳过、重试、资源管理。

对于数据量大、需要较高处理性能的批处理任务,Spring Batch同样提供了高级功能和特性来支持,比如远程分区、远程分块功能。

Spring Batch是一个批处理应用框架,不是调度框架,需要和调度框架(如 Quartz、Tivoli、Control-M、Cron 等)合作来构建完成的批处理任务。

1.1 为什么要使用Spring Batch

假设我们有这样一个批处理需求:它需要处理大量的数据,处理较多的步骤,同时希望处理时长越短越好;第一期需求做完,第二期需求很可能就是:支持重试、事务、监控等功能。

通常,批处理的需求都是类似的,在没有Spring Batch之前,由于没有统一的标准,大量企业重复开发类似的功能。而Accenture和SpringSource紧密合作后诞生的Spring Batch,使批处理框架和工具更加标准化了。

注意

只有当场景足够复杂、数据量足够大时,使用Spring Batch才有意义;如果仅仅是简单业务处理,数据量也不大,就没有必要使用Spring Batch了。

1.2 架构分层及常见策略

下图是Spring Batch分层体系结构的草图:

architecture

  • Application部分是开发人员根据特定业务逻辑编写的。
  • Batch Core包含启动和控制批处理作业所需的核心运行时类,它包含JobLauncher,Job和Step实现。
  • Batch Infrastructure提供一些基础的通用功能,Application和Batch Core都建立在Batch Infrastructure之上。

开发人员设计批处理的Application时,业务逻辑应该可以拆分为一系列步骤,这些步骤可以使用如下标准模型实现:

策略名称 说明
转换程序(Conversion Applications) 对于外部系统提供的文件(如Excel),必须创建转换程序,用来转换这些文件。
验证程序(Validation Applications) 对数据做正确性、一致性的验证,验证通常发生开头或结尾。
提取程序(Extract Applications) 从数据库或输入文件中读取一组记录,按照既定规则选择记录,然后将其写入到输出文件中。
提取/更新程序(Extract/Update Applications) 从数据库或输入文件中读取一组记录,根据每条记录的特点,对数据进行更改,然后将其写入到输出文件中。
处理和更新程序(Processing and Updating Applications) 对来自验证程序或提取程序的输入事务,执行相应的处理,通常包括读取所需数据,再更新处理结果。
输出/格式化程序(Output/Format Applications) 应用程序读取输入文件,根据标准格式从该记录中重组数据,并生成输出文件以便打印或传输到另一个程序或系统。

每个Application可以使一些实用工具,例如排序、拆分、合并。

2. 批处理中的基础概念

下图是使用了几十年的批处理参考体系结构的简化版本,它包含了批处理中涉及的概念。

Batch Stereotypes

概念 子类型 说明
JobRepository 为JobLauncher、Job和Step实现提供CRUD操作
JobLauncher 运行Job的入口
Job 封装整个批处理过程的实体
JobInstance 代表一个Job的运行实例,不同的运行实例,其关联的JobParameters不相同的,例如日期不同
JobParameters 获得一个JobInstance时,必须指定对应的JobParameters
JobExecution 代表Job的一次实际执行情况,它可能执行成功或失败
Step 封装了批处理作业的一个独立的、顺序的阶段,多个Step按顺序构成了一个Job
StepExecution 代表Step的一次实际执行情况
Item Reader Step中使用Item Reader输入数据
Item Writer Step中使用Item Writer输出数据
Item Processor Step中使用Item Processor处理数据

其中Job和Step之间的关系如下:

Step

除了上述的7个基础概念,还有个名为ExecutionContext的封装类,ExecutionContext是一个key/value形式的数据存储,开发人员可以向其中放入自定义的运行时数据;它有两个作用域StepExecution和JobExecution,相互之间是隔离的。

3. 简单示例

使用Spring Boot配置一个批处理Job比较简单,添加pom依赖后,再新增一个BatchConfiguration类即可,其内容如下:

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
    @Autowired
    public JobBuilderFactory jobBuilderFactory;
    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    // tag::readerwriterprocessor[]
    @Bean
    public FlatFileItemReader<Person> reader() {
        return new FlatFileItemReaderBuilder<Person>()
            .name("personItemReader")
            .resource(new ClassPathResource("sample-data.csv"))
            .delimited()
            .names(new String[]{"firstName", "lastName"})
            .fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
                setTargetType(Person.class);
            }})
            .build();
    }

    @Bean
    public PersonItemProcessor processor() {
        return new PersonItemProcessor();
    }

    @Bean
    public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
        return new JdbcBatchItemWriterBuilder<Person>()
            .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
            .sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)")
            .dataSource(dataSource)
            .build();
    }
    // end::readerwriterprocessor[]

    // tag::jobstep[]
    @Bean
    public Job importUserJob(JobCompletionNotificationListener listener, Step step1) {
        return jobBuilderFactory.get("importUserJob")
            .incrementer(new RunIdIncrementer())
            .listener(listener)
            .flow(step1)
            .end()
            .build();
    }

    @Bean
    public Step step1(JdbcBatchItemWriter<Person> writer) {
        return stepBuilderFactory.get("step1")
            .<Person, Person> chunk(10)
            .reader(reader())
            .processor(processor())
            .writer(writer)
            .build();
    }
    // end::jobstep[]
}

不难看出,配置一个简单的批处理任务需要的创建的Bean依次是:ItemReaderItemProcessorItemWriterStepJob,这与上文提到概念大体相符(查看完整示例)。

SpringApplication.run(Application.class, args)的方式启动服务后,Job将通过JobLauncherCommandLineRunner自动运行已配置的Job,因此不需要再使用JobLauncher显式启动任务。

3.1 分片处理(Chunk-oriented Processing)

Spring batch在配置Step时采用的是面向Chunk处理的机制,即每次读取一条数据,再处理一条数据,累积到一定数量后再一次性交给writer进行写入操作。这样可以最大化的优化写入效率,整个事务也是基于Chunk来进行。其流程如下:

chunk-oriented

3.2 监听(Listener)

可以实现Spring Batch提供的Listener接口(位于org.springframework.batch.core.listener包内),监听任务的运行情况。上述示例中使用了JobExecutionListenerSupport,它用于监听Job的开始与结束。

4. 并行处理

为了加快处理速度,批处理任务通常需要并行处理,一般情况下,这也是在设计批处理任务时最复杂的地方。 Spring Batch为任务的并行处理提供了强有力的支持。

并行处理在顶层可以分为以下两类:

  • 单进程,多线程
  • 多进程

这两类可以继续细分为如下四类:

名称 说明
多线程Step(单进程) 在一个Step内部,使用多线程并行执行。
并行Step(单进程) 在多个Step之间,使用多线程并行执行。
远程分块(多进程) 分割一个Step,使其在多个进程中同时运行。
远程分区(多进程) 为Step分区,每个区是一个单独的进程,各个分区内都包含ItemReader、ItemWriter、ItemProcessor。

4.1 多线程Step(单进程)

在一个Step内部使用多线程的方式运行,通过指定TaskExecutor开启多线程处理。另外,使用Spring Batch提供的ItemWriter和ItemReader的一些实现时,需要留意是否是线程安全的。

@Bean
public TaskExecutor taskExecutor(){
    return new SimpleAsyncTaskExecutor("spring_batch");
}

@Bean
public Step sampleStep(TaskExecutor taskExecutor) {
        return this.stepBuilderFactory.get("sampleStep")
                                .<String, String>chunk(10)
                                .reader(itemReader())
                                .writer(itemWriter())
                                .taskExecutor(taskExecutor)
                                .build();
}

4.2 并行Step(单进程)

通过Flow提供的机制,开发人员可以方便地组合Step,使多个Step并行执行。

@Bean
public Job job() {
    return jobBuilderFactory.get("job")
        .start(splitFlow())
        .next(step4())
        .build()        //builds FlowJobBuilder instance
        .build();       //builds Job instance
}

@Bean
public Flow splitFlow() {
    return new FlowBuilder<SimpleFlow>("splitFlow")
        .split(taskExecutor())
        .add(flow1(), flow2())
        .build();
}

@Bean
public Flow flow1() {
    return new FlowBuilder<SimpleFlow>("flow1")
        .start(step1())
        .next(step2())
        .build();
}

@Bean
public Flow flow2() {
    return new FlowBuilder<SimpleFlow>("flow2")
        .start(step3())
        .build();
}

@Bean
public TaskExecutor taskExecutor(){
    return new SimpleAsyncTaskExecutor("spring_batch");
}

4.3 远程分块(多进程)

有时单进程并不能满足性能需求,尤其是ItemProcessor处理耗时较长时,Spring Batch提供了所谓“远程分块”的功能,即将Step处理分割为多个进程,通过一些中间件相互通信。

remote-chunk

4.4 远程分区(多进程)

当ItemProcessor不是导致瓶颈的相关I/O时,可以使用远程分区,它将Step拆分到了不同的进程中。

remote-chunk2

5. 小结

本文介绍了Spring Batch的基本概念、基础使用方式以及Spring Batch对并行处理的支持。除此之外,Spring Batch还有很多其它功能,例如事务管理、重启、重试、跳过、日志等功能。

希望通过阅读本文,读者可以对Spring Batch有一个整体的印象,以便在需要解决复杂批处理问题时,能够联想到该框架。

参考的文章

Spring Batch_百度百科
Spring Batch - Reference Documentation


文章作者: 沉迷思考的鱼
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 沉迷思考的鱼 !
评论
 上一篇
认识CPU 认识CPU
以前听说过一个面试题:如何使CPU使用率达到100%的JAVA代码应该如何编写?笔者心想开几个死循环的线程也就差不多了,启动6个死循环线程,在mac系统中使用top查看,发现cpu达到了600%+,机器CPU是4核的,跑到400%还能接受,
2019-04-17
下一篇 
工作流引擎Activiti快速入门 工作流引擎Activiti快速入门
Activiti是一个用Java编写的开源工作流引擎,可以执行BPMN 2.0中描述的业务流程。Activiti是Alfresco的Alfresco Process Services (APS)的基础,而Alfresco是Activiti项
2019-02-24
  目录