Step

Step์€ ์‹ค์งˆ์ ์ธ ๋ฐฐ์น˜ ์ฒ˜๋ฆฌ๋ฅผ ์ •์˜ํ•˜๊ณ  ์ œ์–ดํ•˜๋Š” ๋ฐ ํ•„์š”ํ•œ ๋ชจ๋“  ์ •๋ณด๊ฐ€ ๋“ค์–ด์žˆ๋Š” ๋„๋ฉ”์ธ ๊ฐ์ฒด๋กœ, Job์„ ์ฒ˜๋ฆฌํ•˜๋Š” ์‹ค์งˆ์ ์ธ ๋‹จ์œ„๋กœ ์“ฐ์ธ๋‹ค.(Job:Step = 1:M)

  • Step์€ Job์„ ๊ตฌ์„ฑํ•˜๋Š” ๋…๋ฆฝ๋œ ์ž‘์—… ๋‹จ์œ„

  • ์ˆœ์ฐจ์ ์œผ๋กœ ๋ฐฐ์น˜ ์ฒ˜๋ฆฌ ์ˆ˜ํ–‰

  • Step์€ ๋ชจ๋“  ๋‹จ์œ„ ์ž‘์—…์˜ ์กฐ๊ฐ์œผ๋กœ ์ž์ฒด์ ์œผ๋กœ ์ž…๋ ฅ, ์ฒ˜๋ฆฌ๊ธฐ, ์ถœ๋ ฅ์„ ๋‹ค๋ฃฌ๋‹ค.

  • ํŠธ๋žœ์žญ์…˜์€ Step ๋‚ด๋ถ€์—์„œ ์ด๋ฃจ์–ด์ง

  • org.springframework.batch.core.Step

StepExecution

Step์˜ ์‹คํ–‰ ์ •๋ณด๋ฅผ ๋‹ด๋Š” ๊ฐ์ฒด๋กœ, ๊ฐ๊ฐ์˜ Step์ด ์‹คํ–‰๋  ๋•Œ๋งˆ๋‹ค StepExecution์ด ์ƒ์„ฑ๋œ๋‹ค.

public class StepExecution extends Entity {
    private final JobExecution jobExecution; // ํ˜„์žฌ JobExecution ์ •๋ณด
    private final String stepName; // Step ์ด๋ฆ„
    private volatile BatchStatus status; // Step์˜ ์‹คํ–‰ ์ƒํƒœ(COMPLETED, STARTING, STARTED ...)
    private volatile int readCount; // ์„ฑ๊ณต์ ์œผ๋กœ ์ฝ์€ ๋ ˆ์ฝ”๋“œ ์ˆ˜
    private volatile int writeCount; // ์„ฑ๊ณต์ ์œผ๋กœ ์“ด ๋ ˆ์ฝ”๋“œ ์ˆ˜
    private volatile int commitCount; // Step์˜ ์‹คํ–‰์— ๋Œ€ํ•ด ์ปค๋ฐ‹๋œ ํŠธ๋žœ์žญ์…˜ ์ˆ˜
    private volatile int rollbackCount; // Step์˜ ์‹คํ–‰์— ๋Œ€ํ•ด ๋กค๋ฐฑ๋œ ํŠธ๋žœ์žญ์…˜ ์ˆ˜
    private volatile int readSkipCount; // ์ฝ๊ธฐ์— ์‹คํŒจํ•ด ๊ฑด๋„ˆ ๋ˆ ๋ ‰์ฝ”๋“œ ์ˆ˜
    private volatile int processSkipCount; // ํ”„๋กœ์„ธ์Šค๊ฐ€ ์‹คํŒจํ•ด ๊ฑด๋„ˆ ๋ˆ ๋ ‰์ฝ”๋“œ ์ˆ˜
    private volatile int writeSkipCount;// ์“ฐ๊ธฐ์— ์‹คํŒจํ•ด ๊ฑด๋„ˆ ๋ˆ ๋ ‰์ฝ”๋“œ ์ˆ˜
    private volatile Date startTime; // Step์ด ์‹คํ–‰๋œ ์‹œ๊ฐ„(null == ์‹œ์ž‘๋˜์ง€ ์•Š์Œ)
    private volatile Date endTime; // Step์˜ ์‹คํ–‰ ์„ฑ๊ณต ์—ฌ๋ถ€์™€ ๊ด€๊ณ„ ์—†์ด ๋๋‚œ ์‹œ๊ฐ„
    private volatile Date lastUpdated; // ๋งˆ์ง€๋ง‰์œผ๋กœ ์ˆ˜์ •๋œ ์‹œ๊ฐ„
    private volatile ExecutionContext executionContext; // Step ์‹คํ–‰ ์‚ฌ์ด์— ์œ ์ง€ํ•ด์•ผํ•˜๋Š” ์‚ฌ์šฉ์ž ๋ฐ์ดํ„ฐ
    private volatile ExitStatus exitStatus; // Step ์‹คํ–‰ ๊ฒฐ๊ณผ์— ๋Œ€ํ•œ ์ƒํƒœ ๊ฐ’(UNKOWN, EXECUTING, COMPLETE, ...)
    private volatile boolean terminateOnly; // Job ์‹คํ–‰ ์ค‘์ง€ ์—ฌ๋ถ€
    private volatile int filterCount; // ์‹คํ–‰์—์„œ ํ•„ํ„ฐ๋ง๋œ ๋ ˆ์ฝ”๋“œ ์ˆ˜
    private transient volatile List<Throwable> failureExceptions; // Step ์‹คํ–‰์ค‘ ๋ฐœ์ƒํ•œ ์˜ˆ์™ธ ๋ฆฌ์ŠคํŠธ
    ...
}

Tasklet ๊ธฐ๋ฐ˜

  • Tasklet์€ ์ž„์˜์˜ Step์„ ์‹คํ–‰ํ•  ๋•Œ ํ•˜๋‚˜์˜ ์ž‘์—…์„ ์ฒ˜๋ฆฌํ•˜๋Š” ๋ฐฉ์‹

  • ์ฝ๊ธฐ, ์ฒ˜๋ฆฌ, ์“ฐ๊ธฐ๋กœ ๋‚˜๋‰œ ๋ฐฉ์‹์ด ์ฒญํฌ ์ง€ํ–ฅ ํ”„๋กœ์„ธ์‹ฑ์ด๋ผ๋ฉด ์ด๋ฅผ ๋‹จ์ผ ์ž‘์—…์œผ๋กœ ๋งŒ๋“œ๋Š” ๊ฐœ๋…์ด Tasklet

  • ํŠธ๋žœ์žญ์…˜ ๋‚ด์—์„œ ๋กœ์ง์ด ์‹คํ–‰๋  ์ˆ˜ ์žˆ๋Š” ๊ธฐ๋Šฅ์„ ์ œ๊ณตํ•˜๋Š” ์ „๋žต ์ธํ„ฐํŽ˜์ด์Šค

  • org.springframework.batch.core.step.tasklet.Tasklet

public interface Tasklet {
  	// ๋‚ด๋ถ€์— ์›ํ•˜๋Š” ๋‹จ์ผ ์ž‘์—…์„ ๊ตฌํ˜„ํ•˜๊ณ  ๋‚˜๋ฉด, RepeatStatus.FINISHED ๋ฐ˜ํ™˜ํ›„ ์ž‘์—…์ด ๊ณ„์†๋˜๋ฉด RepeatStatus.CONTINUABLE ๋ฐ˜ํ™˜
    @Nullable
    RepeatStatus execute(StepContribution var1, ChunkContext var2) throws Exception;
}

Adapter

CallableTaskletAdapter

  • org.springframework.batch.core.step.tasklet.CallableTaskletAdapter

  • Callable<V> ์ธํ„ฐํŽ˜์ด์Šค์˜ ๊ตฌํ˜„์ฒด๋ฅผ ๊ตฌ์„ฑํ•  ์ˆ˜ ์žˆ๊ฒŒ ํ•ด์ฃผ๋Š” Adapter

    • ๋ฆฌํ„ด๊ฐ’์ด ์กด์žฌํ•˜๊ธฐ ๋•Œ๋ฌธ์— ๊ณต์œ  ๊ฐ์ฒด๋ฅผ ์‚ฌ์šฉํ•˜์ง€ ์•Š๋Š”๋‹ค.

    • ์ฒดํฌ ์˜ˆ์™ธ๋ฅผ ์™ธ๋ถ€๋กœ ๋˜์งˆ ์ˆ˜ ์žˆ๋‹ค.

  • Step์˜ ํŠน์ • ๋กœ์ง์„ ํ•ด๋‹น Step์ด ์‹คํ–‰๋˜๋Š” ์Šค๋ ˆ๋“œ๊ฐ€ ์•„๋‹Œ ๋‹ค๋ฅธ ์Šค๋ ˆ๋“œ์—์„œ ์‹คํ–‰ํ•˜๊ณ  ์‹ถ์„ ๋•Œ ์‚ฌ์šฉ

@EnableBatchProcessing
@SpringBootApplication
public class CallableTaskletConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job callableJob() {
        return this.jobBuilderFactory.get("callableJob")
                .start(callableStep())
                .build();
    }

    @Bean
    public Step callableStep() {
        return this.stepBuilderFactory.get("callableStep")
                .tasklet(callableTasklet())
                .build();
    }

    @Bean
    public Callable<RepeatStatus> callableObject() {
        return () -> {
            System.out.println("This was executed in another thread");
            return RepeatStatus.FINISHED;
        };
    }

    @Bean
    public CallableTaskletAdapter callableTasklet() {

        // CallableTaskletAdapter๋Š” Step์ด ์‹คํ–‰๋˜๋Š” ์Šค๋ ˆ๋“œ์™€ ๋ณ„๊ฐœ์˜ ์Šค๋ ˆ๋“œ์—์„œ ์‹คํ–‰๋˜์ง€๋งŒ
        // Step๊ณผ ๋ณ‘๋ ฌ๋กœ ์‹คํ–‰๋˜๋Š” ๊ฒƒ์€ ์•„๋‹ˆ๋‹ค.
        CallableTaskletAdapter callableTaskletAdapter = new CallableTaskletAdapter();

        callableTaskletAdapter.setCallable(callableObject());

        return callableTaskletAdapter;
    }

}

MethodInvokingTaskletAdapter

  • org.springframework.batch.core.step.tasklet.MethodInvokingTaskletAdapter1

  • ๋‹ค๋ฅธ ํด๋ž˜์Šค ๋‚ด์˜ ๋ฉ”์„œ๋“œ๋ฅผ Tasklet์ฒ˜๋Ÿผ ์‹คํ–‰ ๊ฐ€๋Šฅ

@EnableBatchProcessing
@Configuration
public class MethodInvokingTaskletConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job methodInvokingJob() {
        return this.jobBuilderFactory.get("methodInvokingJob")
                .start(methodInvokingStep())
                .build();
    }

    @Bean
    public Step methodInvokingStep() {
        return this.stepBuilderFactory.get("methodInvokingStep")
                .tasklet(methodInvokingTasklet())
                .build();
    }

  	@StepScope
    @Bean
    public MethodInvokingTaskletAdapter methodInvokingTasklet(
            @Value("#{jobParameters['message']}") String message) {
        // ๋‹ค๋ฅธ ํด๋ž˜์Šค ๋‚ด์˜ ๋ฉ”์„œ๋“œ๋ฅผ Tasklet์ฒ˜๋Ÿผ ์‹คํ–‰ ๊ฐ€๋Šฅ
        MethodInvokingTaskletAdapter methodInvokingTaskletAdapter = new MethodInvokingTaskletAdapter();

        methodInvokingTaskletAdapter.setTargetObject(customerService()); // ํ˜ธ์ถœํ•  ๋ฉ”์„œ๋“œ๊ฐ€ ์žˆ๋Š” ๊ฐ์ฒด
        methodInvokingTaskletAdapter.setTargetMethod("serviceMethod"); // ํ˜ธ์ถœํ•  ๋ฉ”์„œ๋“œ๋ช…
        methodInvokingTaskletAdapter.setArguments(new String[] {message});

        return methodInvokingTaskletAdapter;
    }

    @Bean
    public CustomerService customerService() {
        return new CustomerService();
    }

}

TargetMethod๋Š” ExitStatus.COMPLETED default์ด๋ฉฐ, ExitStatus๋ฅผ ๋ฐ˜ํ™˜ํ•˜๋ฉด ๋ฉ”์„œ๋“œ๊ฐ€ ๋ฐ˜ํ™˜ํ•œ ๊ฐ’์ด Tasklet์—์„œ ๋ฐ˜ํ™˜๋œ๋‹ค.

SystemCommandTasklet

  • org.springframework.batch.core.step.tasklet.SystemCommandTasklet

  • ์‹œ์Šคํ…œ ๋ช…๋ น์„ ์‹คํ–‰ํ•  ๋•Œ ์‚ฌ์šฉํ•˜๋ฉฐ, ์ง€์ •ํ•œ ์‹œ์Šคํ…œ ๋ช…๋ น์„ ๋น„๋™๊ธฐ๋กœ ์‹คํ–‰ํ•œ๋‹ค.

@EnableBatchProcessing
@Configuration
public class SystemCommandConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job systemCommandJob() {
        return this.jobBuilderFactory.get("systemCommandJob")
                .start(systemCommandStep())
                .build();
    }

    @Bean
    public Step systemCommandStep() {
        return this.stepBuilderFactory.get("systemCommandStep")
                .tasklet(systemCommandTasklet())
                .build();
    }

    @Bean
    public SystemCommandTasklet systemCommandTasklet() {
        SystemCommandTasklet systemCommandTasklet = new SystemCommandTasklet();

        // ๋ช…๋ น์–ด
        systemCommandTasklet.setCommand("touch tmp.txt");
        systemCommandTasklet.setTimeout(5000);

        // Job์ด ๋น„์ •์ƒ์ ์œผ๋กœ ์ข…๋ฃŒ๋  ๋–„ ์‹œ์Šคํ…œ ํ”„๋กœ์„ธ์Šค์™€ ๊ด€๋ จ๋œ ์Šค๋ ˆ๋“œ๋ฅผ ๊ฐ•์ œ ์ข…๋ฃŒํ• ์ง€ ์—ฌ๋ถ€
        systemCommandTasklet.setInterruptOnCancel(true);

        // ์ž‘์—… ๋””๋ ‰ํ† ๋ฆฌ ์„ค์ •
        systemCommandTasklet.setWorkingDirectory("/Users/dh0023/Develop/gitbook/TIL");

        // ์‹œ์Šคํ…œ ๋ฐ˜ํ™˜ ์ฝ”๋“œ๋ฅผ ์Šคํ”„๋ง ๋ฐฐ์น˜ ์ƒํƒœ ๊ฐ’์œผ๋กœ ๋งคํ•‘
        systemCommandTasklet.setSystemProcessExitCodeMapper(touchCodeMapper());

        // ๋น„๋™๊ธฐ๋กœ ์‹คํ–‰ํ•˜๋Š” ์‹œ์Šคํ…œ ๋ช…๋ น์„ ์ฃผ๊ธฐ์ ์œผ๋กœ ์™„๋ฃŒ ์—ฌ๋ถ€ ํ™•์ธ, ์™„๋ฃŒ ์—ฌ๋ถ€๋ฅผ ํ™•์ธํ•˜๋Š” ์ฃผ๊ธฐ(default=1์ดˆ)
        systemCommandTasklet.setTerminationCheckInterval(5000);

        // ์‹œ์Šคํ…œ ๋ช…๋ น์„ ์‹คํ–‰ํ•˜๋Š” TaskExecutor ๊ตฌ์„ฑ ๊ฐ€๋Šฅ
        // ๋ฌธ์ œ๊ฐ€ ๋ฐœ์ƒํ•˜๋ฉด ๋ฝ์ด ๋ฐœ์ƒํ•  ์ˆ˜ ์žˆ์œผ๋ฏ€๋กœ, ๋™๊ธฐ๋ฐฉ์‹์œผ๋กœ ๊ตฌํ˜„ํ•˜์ง€ ์•Š๋Š” ๊ฒƒ์ด ์ข‹๋‹ค.
        systemCommandTasklet.setTaskExecutor(new SimpleAsyncTaskExecutor());

        // ๋ช…๋ น์„ ์‹คํ–‰ํ•˜๊ธฐ ์ „์— ์„ค์ •ํ•˜๋Š” ํ™˜๊ฒฝ ํŒŒ๋ผ๋ฏธํ„ฐ ๋ชฉ๋ก
        systemCommandTasklet.setEnvironmentParams(
                new String[]{"BATCH_HOME=/Users/dh0023/Develop/spring/spring-practice/batch-practice"});

        return systemCommandTasklet;
    }

    @Bean
    public SimpleSystemProcessExitCodeMapper touchCodeMapper() {
        // ๋ฐ˜ํ™˜๋œ ์‹œ์Šคํ…œ ์ฝ”๋“œ๊ฐ€ 0์ด ExitStatus.FINISHED
        // 0์ด ์•„๋‹ˆ๋ฉด ExitStatus.FAILED
        return new SimpleSystemProcessExitCodeMapper();
    }

    @Bean
    public ConfigurableSystemProcessExitCodeMapper configurableSystemProcessExitCodeMapper() {
        // ์ผ๋ฐ˜์ ์ธ ๊ตฌ์„ฑ ๋ฐฉ๋ฒ•์œผ๋กœ ๋งคํ•‘ ๊ตฌ์„ฑ์„ ํ•  ์ˆ˜ ์žˆ์Œ.
        ConfigurableSystemProcessExitCodeMapper mapper = new ConfigurableSystemProcessExitCodeMapper();

        Map<Object, ExitStatus> mappings = new HashMap<Object, ExitStatus>() {
            {
                put(0, ExitStatus.COMPLETED);
                put(1, ExitStatus.FAILED);
                put(2, ExitStatus.EXECUTING);
                put(3, ExitStatus.NOOP);
                put(4, ExitStatus.UNKNOWN);
                put(ConfigurableSystemProcessExitCodeMapper.ELSE_KEY, ExitStatus.UNKNOWN);
            }
        };

        mapper.setMappings(mappings);

        return mapper;
    }
}
  • SimpleSystemProcessExitCodeMapper

    public class SimpleSystemProcessExitCodeMapper implements SystemProcessExitCodeMapper {
    	@Override
    	public ExitStatus getExitStatus(int exitCode) {
    		if (exitCode == 0) {
    			return ExitStatus.COMPLETED;
    		} else {
    			return ExitStatus.FAILED;
    		}
    	}
    
    }

Chunk ๊ธฐ๋ฐ˜

Chunk๋ž€ ์•„์ดํ…œ์ด ํŠธ๋žœ์žญ์…˜์— commit๋˜๋Š” ์ˆ˜๋ฅผ ๋งํ•œ๋‹ค.

์ฆ‰, ์ฒญํฌ ์ง€ํ–ฅ ์ฒ˜๋ฆฌ๋ž€ ํ•œ ๋ฒˆ์— ํ•˜๋‚˜์”ฉ ๋ฐ์ดํ„ฐ๋ฅผ ์ฝ์–ด Chunk๋ผ๋Š” ๋ฉ์–ด๋ฆฌ๋ฅผ ๋งŒ๋“  ๋’ค, Chunk ๋‹จ์œ„๋กœ ํŠธ๋žœ์žญ์…˜์„ ๋‹ค๋ฃจ๋Š” ๊ฒƒ์„ ์˜๋ฏธํ•œ๋‹ค.

Chunk ์ง€ํ–ฅ ํ”„๋กœ์„ธ์‹ฑ์€ 1000๊ฐœ์˜ ๋ฐ์ดํ„ฐ์— ๋Œ€ํ•ด ๋ฐฐ์น˜ ๋กœ์ง์„ ์‹คํ–‰ํ•œ๋‹ค๊ณ  ๊ฐ€์ •ํ•˜๋ฉด, Chunk ๋‹จ์œ„๋กœ ๋‚˜๋ˆ„์ง€ ์•Š์•˜์„ ๊ฒฝ์šฐ์—๋Š” ํ•œ๊ฐœ๋งŒ ์‹คํŒจํ•ด๋„ ์„ฑ๊ณตํ•œ 999๊ฐœ์˜ ๋ฐ์ดํ„ฐ๊ฐ€ ๋กค๋ฐฑ๋œ๋‹ค. Chunk ๋‹จ์œ„๋ฅผ 10์œผ๋กœ ํ•œ๋‹ค๋ฉด, ์ž‘์—… ์ค‘์— ๋‹ค๋ฅธ Chunk๋Š” ์˜ํ–ฅ์„ ๋ฐ›์ง€ ์•Š๋Š”๋‹ค.

์ด๋•Œ, Chunk๋Š” ์ปค๋ฐ‹ ๊ฐ„๊ฒฉ(commit interval)์— ์˜ํ•ด ์ •์˜๋˜๊ณ  ์ˆ˜ํ–‰ํ•˜๋ฏ€๋กœ, ์ปค๋ฐ‹ ๊ฐ„๊ฒฉ์— ๋”ฐ๋ผ ์„ฑ๋Šฅ์ด ๋‹ฌ๋ผ์งˆ ์ˆ˜ ์žˆ๋‹ค. ์ตœ์ƒ์˜ ์„ฑ๋Šฅ์„ ์–ป๊ธฐ ์œ„ํ•ด์„œ๋Š” ์ปค๋ฐ‹ ๊ฐ„๊ฒฉ ์„ค์ •์ด ์ค‘์š”ํ•˜๋‹ค.

ItemReader, ItemProcessor, ItemWriter 3๋‹จ๊ณ„๋กœ ๋น„์ง€๋‹ˆ์Šค ๋กœ์ง์„ ๋ถ„๋ฆฌํ•ด ์—ญํ• ์„ ๋ช…ํ™•ํ•˜๊ฒŒ ๋ถ„๋ฆฌํ•  ์ˆ˜ ์žˆ๋‹ค.

  • ๋น„์ฆˆ๋‹ˆ์Šค ๋กœ์ง ๋ถ„๋ฆฌ

  • ์ฝ์–ด์˜จ ๋ฐฐ์น˜ ๋ฐ์ดํ„ฐ์™€ ์“ฐ์—ฌ์งˆ ๋ฐ์ดํ„ฐ ํƒ€์ž…์ด ๋‹ค๋ฅธ ๊ฒฝ์šฐ์— ๋Œ€ํ•œ ๋Œ€์‘

  • ๊ฐ Chunk๋Š” ์ž์ฒด ํŠธ๋žœ์žญ์…˜์œผ๋กœ ์‹คํ–‰๋˜๋ฉฐ, ์ฒ˜๋ฆฌ์— ์‹คํŒจํ•˜๋ฉด ์„ฑ๊ณตํ•œ ํŠธ๋žœ์žญ์…˜ ์ดํ›„๋ถ€ํ„ฐ ๋‹ค์‹œ ์‹œ์ž‘ ๊ฐ€๋Šฅ

๊ทธ๋Ÿฌ๋ฏ€๋กœ ์ฝ์–ด์˜จ ๋ฐฐ์น˜์˜ ๋ฐ์ดํ„ฐ์™€ ์ €์žฅํ•  ๋ฐ์ดํ„ฐ ํƒ€์ž…์ด ๋‹ค๋ฅธ ๊ฒฝ์šฐ์— ๋Œ€์‘ํ•  ์ˆ˜ ์žˆ๋‹ค.

ItemReader

  • Step์˜ ๋Œ€์ƒ์ด ๋˜๋Š” ๋ฐฐ์น˜ ๋ฐ์ดํ„ฐ(File, Xml, DB ๋“ฑ)๋ฅผ ์ฝ์–ด์˜ค๋Š” ์ธํ„ฐํŽ˜์ด์Šค

  • org.springframework.batch.item.ItemReader<T>

public interface ItemReader<T> {
    // read ๋ฉ”์„œ๋“œ์˜ ๋ฐ˜ํ™˜ ํƒ€์ž…์„ T(์ œ๋„ˆ๋ฆญ)์œผ๋กœ ๊ตฌํ˜„ํ•˜์—ฌ ์ง์ ‘ ํƒ€์ž…์„ ์ง€์ •ํ•  ์ˆ˜ ์žˆ์Œ
    @Nullable
    T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}

ItemProcessor

  • ItemReader๋กœ ์ฝ์–ด ์˜จ ๋ฐฐ์น˜ ๋ฐ์ดํ„ฐ๋ฅผ ๋ณ€ํ™˜ํ•˜๋Š” ์—ญํ• ์„ ์ˆ˜ํ–‰

  • ItemProcessor๋Š” ๋กœ์ง ์ฒ˜๋ฆฌ๋งŒ ์ˆ˜ํ–‰ํ•˜์—ฌ ์—ญํ• ์„ ๋ถ„๋ฆฌํ•˜๊ณ , ๋ช…ํ™•ํ•œ input/output์„ ItemProcessor๋กœ ๊ตฌํ˜„ํ•ด๋†“์œผ๋ฉด ๋” ์ง๊ด€์ ์ธ ์ฝ”๋“œ๊ฐ€ ๋  ๊ฒƒ์ด๋‹ค.

  • org.springframework.batch.item.ItemProcessor<T>

    public interface ItemProcessor<I, O> {
        @Nullable
        O process(@NonNull I var1) throws Exception;
    }

ItemWriter

  • ๋ฐฐ์น˜ ๋ฐ์ดํ„ฐ(DB, File ๋“ฑ)๋ฅผ ์ €์žฅํ•œ๋‹ค.

  • org.springframework.batch.item.ItemWriter<T>

public interface ItemWriter<T> {
    // T(์ œ๋„ค๋ฆญ)์œผ๋กœ ์ง€์ •ํ•œ ํƒ€์ž…์„ List ๋งค๊ฐœ๋ณ€์ˆ˜๋กœ ๋ฐ›๋Š”๋‹ค.
    void write(List<? extends T> var1) throws Exception;
}

๋ฆฌ์ŠคํŠธ์˜ ๋ฐ์ดํ„ฐ ์ˆ˜๋Š” ์„ค์ •ํ•œ *์ฒญํฌ(Chunk) ๋‹จ์œ„๋กœ ๋ถˆ๋Ÿฌ์˜จ๋‹ค.

์ฒญํฌ ๊ธฐ๋ฐ˜ Job ์˜ˆ์‹œ

@EnableBatchProcessing
@Configuration
public class ChunkBasedConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * ์‹ค์ œ ์Šคํ”„๋ง ๋ฐฐ์น˜ Job ์ƒ์„ฑ
     */
    @Bean
    public Job chunkBasedJob() {
        return this.jobBuilderFactory.get("chunkBasedJob")
                .start(chunkStep())
                .build();
    }

    @Bean
    public Step chunkStep() {
        return this.stepBuilderFactory.get("chunkStep")
                .<String, String> chunk(1000) // chunk ๊ธฐ๋ฐ˜, ์ปค๋ฐ‹๊ฐ„๊ฒฉ 1000
                .reader(itemReader())
                .writer(itemWriter())
                .build();
    }

    @Bean
    public ListItemReader<String> itemReader(){

        List<String> items = new ArrayList<>(100000);

        for (int i = 0; i < 100000; i++) {
            items.add(UUID.randomUUID().toString());
        }

        return new ListItemReader<>(items);
    }

    @Bean
    public ItemWriter<String> itemWriter() {
        return items -> {
            for (String item : items) {
                System.out.println(">> current item = " + item);
            }
            System.out.println(">> end itemWriter chunk " + items.size());
        };
    }
}

์ผ๋ฐ˜์ ์œผ๋กœ๋Š” ์œ„์™€ ๊ฐ™์ด ์ปค๋ฐ‹๊ฐ„๊ฒฉ์„ ํ•˜๋“œ ์ฝ”๋”ฉํ•ด ํฌ๊ธฐ๋ฅผ ์ •์˜ํ•˜์ง€๋งŒ, ํฌ๊ธฐ๊ฐ€ ๋™์ผํ•˜์ง€ ์•Š์€ ์ฒญํฌ๋ฅผ ์ฒ˜๋ฆฌํ•ด์•ผํ•˜๋Š” ๊ฒฝ์šฐ๋„ ์žˆ๋‹ค. ์Šคํ”„๋ง ๋ฐฐ์น˜๋Š” org.springframework.batch.repeat.CompletionPolicy ์ธํ„ฐํŽ˜์ด์Šค๋ฅผ ์ œ๊ณตํ•ด ์ฒญํฌ๊ฐ€ ์™„๋ฃŒ๋˜๋Š” ์‹œ์ ์„ ์ •์˜ํ•  ์ˆ˜ ์žˆ๋„๋ก ์ œ๊ณตํ•ด์ค€๋‹ค.

CompletionPolicy

์ฒญํฌ ์™„๋ฃŒ ์—ฌ๋ถ€๋ฅผ ๊ฒฐ์ •ํ•  ์ˆ˜ ์žˆ๋Š” ๊ฒฐ์ •๋กœ์ง์„ ๊ตฌํ˜„ํ•  ์ˆ˜ ์žˆ๋Š” ์ธํ„ฐํŽ˜์ด์Šค๋กœ, CompletionPolicy ์ธํ„ฐํŽ˜์ด์Šค์˜ ๊ตฌํ˜„์ฒด์— ๋Œ€ํ•ด์„œ ์•Œ์•„ ๋ณผ ๊ฒƒ์ด๋‹ค.

package org.springframework.batch.repeat;

public interface CompletionPolicy {

  // ์ฒญํฌ ์™„๋ฃŒ ์—ฌ๋ถ€์˜ ์ƒํƒœ๋ฅผ ๊ธฐ๋ฐ˜์œผ๋กœ ๊ฒฐ์ • ๋กœ์ง ์ˆ˜ํ–‰
	boolean isComplete(RepeatContext context, RepeatStatus result);

  // ๋‚ด๋ถ€ ์ƒํƒœ๋ฅผ ์ด์šฉํ•ด ์ฒญํฌ ์™„๋ฃŒ ์—ฌ๋ถ€ ํŒ๋‹จ
	boolean isComplete(RepeatContext context);

  // ์ฒญํฌ์˜ ์‹œ์ž‘์„ ์•Œ ์ˆ˜ ์žˆ๋„๋ก ์ •์ฑ…์„ ์ดˆ๊ธฐํ™”
	RepeatContext start(RepeatContext parent);

 	// ๊ฐ item์ด ์ฒ˜๋ฆฌ๊ฐ€๋˜๋ฉด update ๋ฉ”์„œ๋“œ๊ฐ€ ํ˜ธ์ถœ๋˜๋ฉด์„œ ๋‚ด๋ถ€ ์ƒํƒœ ๊ฐฑ์‹ 
	void update(RepeatContext context);
}

์ง์ ‘ ๊ตฌํ˜„ํ•˜๋Š” ๋ฐฉ๋ฒ•

CompletionPolicy๋ฅผ ๊ตฌํ˜„ํ•˜์—ฌ ํ•„์ˆ˜ ๋ฉ”์„œ๋“œ๋“ค์„ ๊ฐ๊ฐ ์•Œ๋งž๊ฒŒ ๋กœ์ง์„ ๊ตฌ์„ฑํ•˜๋ฉด๋œ๋‹ค.

public class RandomChunkSizePolicy implements CompletionPolicy {

    private int chunksize;
    private int totalProcessed;
    private Random random = new Random();

  	// ์ฒญํฌ ์™„๋ฃŒ ์—ฌ๋ถ€์˜ ์ƒํƒœ๋ฅผ ๊ธฐ๋ฐ˜์œผ๋กœ ๊ฒฐ์ • ๋กœ์ง ์ˆ˜ํ–‰
    @Override
    public boolean isComplete(RepeatContext context, RepeatStatus result) {
        if (RepeatStatus.FINISHED == result) {
            return true;
        } else {
            return isComplete(context);
        }
    }

  	// ๋‚ด๋ถ€ ์ƒํƒœ๋ฅผ ์ด์šฉํ•ด ์ฒญํฌ ์™„๋ฃŒ ์—ฌ๋ถ€ ํŒ๋‹จ
    @Override
    public boolean isComplete(RepeatContext context) {
        return this.totalProcessed >= chunksize;
    }

  	// ์ฒญํฌ์˜ ์‹œ์ž‘์„ ์•Œ ์ˆ˜ ์žˆ๋„๋ก ์ •์ฑ…์„ ์ดˆ๊ธฐํ™”
    @Override
    public RepeatContext start(RepeatContext parent) {
        this.chunksize = random.nextInt(20);
        this.totalProcessed = 0;

        System.out.println("chunk size has been set to => " + this.chunksize);
        return parent;
    }

  	// ๊ฐ item์ด ์ฒ˜๋ฆฌ๊ฐ€๋˜๋ฉด update ๋ฉ”์„œ๋“œ๊ฐ€ ํ˜ธ์ถœ๋˜๋ฉด์„œ ๋‚ด๋ถ€ ์ƒํƒœ ๊ฐฑ์‹ 
    @Override
    public void update(RepeatContext context) {
        this.totalProcessed++;
    }
}
chunk size has been set to => 5
>> current item = b784cda0-961f-4faf-9737-4334e774f0d1
>> current item = 9fc3ec22-5d54-4632-94e3-fcc28cc00260
>> current item = 53452739-0122-4f8b-a52a-667e37cbf908
>> current item = de9ad8ec-1bf7-40d4-b991-72b6699594f9
>> current item = 352104aa-ae63-4928-96e2-460b3b145d43
>> end itemWriter chunk 5
chunk size has been set to => 10
>> current item = 4116f3e9-cad3-4387-8f46-a871d825d7d9
>> current item = 3e1f58b7-a1fa-45bd-b336-2484d1d543f3
>> current item = a58e2401-37e8-402a-8ac3-daddaf2e91a0
>> current item = 01162200-fc7d-4580-8533-d6deab7f3c65
>> current item = 31280b14-49bf-4a03-b1d4-210585e4da40
>> current item = 91b7d269-8105-41dc-ae6a-1d682913c168
>> current item = abeead5f-751b-4862-aa37-4f32eb09439a
>> current item = b6bf1a92-56e8-433a-a0ea-935e101cad6f
>> current item = 84af0778-d6e1-4aef-bd8f-347b047bb276
>> current item = b86477f3-b997-4b04-a3ca-27f517c9170f
>> end itemWriter chunk 10
chunk size has been set to => 11

๋‹ค์Œ๊ณผ ๊ฐ™์ด Random์œผ๋กœ ์ง€์ •๋œ ์ˆ˜๋งŒํผ chunk๊ฐ€ ์ˆ˜ํ–‰๋˜๋Š” ๊ฒƒ์„ ํ™•์ธ ํ•  ์ˆ˜ ์žˆ๋‹ค.

SimpleCompletionPolicy

๊ฐ€์žฅ ๊ธฐ๋ณธ์ ์ธ ๊ตฌํ˜„์ฒด๋กœ, ๋ฏธ๋ฆฌ ๊ตฌ์„ฑํ•ด๋‘” ์ž„๊ณ—๊ฐ’์— ๋„๋‹ฌํ•˜๋ฉด ์ฒญํฌ ์™„๋ฃŒ๋กœ ํ‘œ์‹œํ•œ๋‹ค.

public class SimpleCompletionPolicy extends DefaultResultCompletionPolicy {

	public static final int DEFAULT_CHUNK_SIZE = 5;

	int chunkSize = 0;

	public SimpleCompletionPolicy() {
		this(DEFAULT_CHUNK_SIZE);
	}

	public SimpleCompletionPolicy(int chunkSize) {
		super();
		this.chunkSize = chunkSize;
	}
@Bean
public Step chunkStep() {
  return this.stepBuilderFactory.get("chunkStep")
                .<String, String> chunk(completionPolicy()) // completePolicy ํ˜ธ์ถœ
                .reader(itemReader())
                .writer(itemWriter())
                .build();
}


@Bean
public CompletionPolicy completionPolicy() {
    // ์ฒ˜๋ฆฌ๋œ ITEM ๊ฐœ์ˆ˜๋ฅผ ์„ธ์–ด, ์ด ๊ฐœ์ˆ˜๊ฐ€ ์ž„๊ณ„๊ฐ’์— ๋„๋‹ฌํ•˜๋ฉด chunk ์™„๋ฃŒ๋กœ ํ‘œ์‹œ
    SimpleCompletionPolicy simpleCompletionPolicy = new SimpleCompletionPolicy(1000);

    return simpleCompletionPolicy;
}

TimeoutTerminationPolicy

ํƒ€์ž„์•„์›ƒ ๊ฐ’์„ ๊ตฌ์„ฑํ•ด, ์ฒญํฌ ๋‚ด์—์„œ ์ฒ˜๋ฆฌ ์‹œ๊ฐ„์ด ์ง€์ •ํ•œ ์‹œ๊ฐ„์ด ๋„˜์œผ๋ฉด ์ฒญํฌ๊ฐ€ ์™„๋ฃŒ๋œ ๊ฒƒ์œผ๋กœ ๊ฐ„์ฃผํ•˜๊ณ , ๋ชจ๋“  ํŠธ๋žœ์žญ์…˜ ์ฒ˜๋ฆฌ๋ฅผ ์ •์ƒ์ ์œผ๋กœ ํ•œ๋‹ค๋Š” ๊ฒƒ์ด๋‹ค. TimeoutTerminationPolicy ๋งŒ์œผ๋กœ ์ฒญํฌ ์™„๋ฃŒ ์‹œ์ ์„ ๊ฒฐ์ •ํ•˜๋Š” ๊ฒฝ์šฐ๋Š” ๊ฑฐ์˜ ์กด์žฌํ•˜์ง€ ์•Š์œผ๋ฉฐ, CompositeCompletionPolicy์˜ ์ผ๋ถ€๋กœ ์‚ฌ์šฉํ•˜๋Š” ๊ฒฝ์šฐ๊ฐ€ ๋งŽ๋‹ค.

public class TimeoutTerminationPolicy extends CompletionPolicySupport {

	/**
	 * Default timeout value in milliseconds (the value equivalent to 30 seconds).
	 */
	public static final long DEFAULT_TIMEOUT = 30000L;

	private long timeout = DEFAULT_TIMEOUT;

	/**
	 * Default constructor.
	 */
	public TimeoutTerminationPolicy() {
		super();
	}

	/**
	 * Construct a {@link TimeoutTerminationPolicy} with the specified timeout
	 * value (in milliseconds).
	 * 
	 * @param timeout duration of the timeout.
	 */
	public TimeoutTerminationPolicy(long timeout) {
		super();
		this.timeout = timeout;
	}
    @Bean
    public CompletionPolicy timeoutCompletionPolicy() {
        TimeoutTerminationPolicy timeoutTerminationPolicy = new TimeoutTerminationPolicy(3);
        return timeoutTerminationPolicy;
    }

TimeoutTerminationPolicy๋กœ ์ˆ˜ํ–‰ํ•œ ๊ฒฝ์šฐ ๊ฐ chunk ๋‹จ์œ„๋ฅผ ํ™•์ธํ•ด๋ณด๋ฉด ๋‹ค์Œ๊ณผ ๊ฐ™์ด ์ œ๊ฐ๊ฐ์ธ ๊ฒƒ์„ ๋ณผ ์ˆ˜์žˆ๋‹ค.

>> end itemWriter chunk 795
>> end itemWriter chunk 679
>> end itemWriter chunk 841
>> end itemWriter chunk 1153
>> end itemWriter chunk 1061
>> end itemWriter chunk 1916
>> end itemWriter chunk 1667
>> end itemWriter chunk 1719
>> end itemWriter chunk 931
>> end itemWriter chunk 1634
>> end itemWriter chunk 941
>> end itemWriter chunk 667
>> end itemWriter chunk 665
>> end itemWriter chunk 547
>> end itemWriter chunk 533
>> end itemWriter chunk 973
>> end itemWriter chunk 647
>> end itemWriter chunk 1632
>> end itemWriter chunk 676

CompositeCompletionPolicy

CompositeCompletionPolicy๋Š” ์ฒญํฌ ์™„๋ฃŒ ์—ฌ๋ถ€๋ฅผ ๊ฒฐ์ •ํ•˜๋Š” ์—ฌ๋Ÿฌ ์ •์ฑ…์„ ํ•จ๊ป˜ ๊ตฌ์„ฑํ•  ์ˆ˜ ์žˆ๋‹ค. ํฌํ•จํ•˜๊ณ  ์žˆ๋Š” ์—ฌ๋Ÿฌ ์ •์ฑ… ์ค‘ ํ•˜๋‚˜๋ผ๋„ ์ฒญํฌ ์™„๋ฃŒ๋ผ๊ณ  ํŒ๋‹จ๋˜๋ฉด ํ•ด๋‹น ์ฒญํฌ๊ฐ€ ์™„๋ฃŒ๋œ ๊ฒƒ์œผ๋กœ ํ‘œ์‹œํ•œ๋‹ค.

		@Bean
    public CompletionPolicy compositeCompletionPolicy() {
        CompositeCompletionPolicy policy = new CompositeCompletionPolicy();

        // ์—ฌ๋Ÿฌ ์ •์ฑ… ์„ค์ •
        policy.setPolicies(
                new CompletionPolicy[]{
                        new TimeoutTerminationPolicy(3),
                        new SimpleCompletionPolicy(1000)
                }
        );
      	return policy;
    }

๋‹ค์Œ๊ณผ ๊ฐ™์ด ์ˆ˜ํ–‰ํ•œ ๊ฒฝ์šฐ์—๋Š” chunk ๋‹จ์œ„๊ฐ€ 1000๊ฐœ๋ฅผ ๋„˜์–ด์„  ๊ฒฝ์šฐ๊ฐ€ ์—†๋Š” ๊ฒƒ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

>> end itemWriter chunk 731
>> end itemWriter chunk 1000
>> end itemWriter chunk 690
>> end itemWriter chunk 1000
>> end itemWriter chunk 798
>> end itemWriter chunk 1000
>> end itemWriter chunk 980
>> end itemWriter chunk 838
>> end itemWriter chunk 850
>> end itemWriter chunk 1000
>> end itemWriter chunk 263
>> end itemWriter chunk 556
>> end itemWriter chunk 629
>> end itemWriter chunk 962
>> end itemWriter chunk 960
>> end itemWriter chunk 1000
>> end itemWriter chunk 1000
>> end itemWriter chunk 1000
>> end itemWriter chunk 898
>> end itemWriter chunk 1000
>> end itemWriter chunk 900
>> end itemWriter chunk 798
>> end itemWriter chunk 215

Step Listener

์Šคํƒญ๊ณผ ์ฒญํฌ์˜ ์‹œ์ž‘๊ณผ ๋์—์„œ ํŠน์ • ๋กœ์ง์„ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๊ฒŒ ํ•ด์ค€๋‹ค.

(StepListener๋Š” ๋ชจ๋“  ์Šคํƒญ ๋ฆฌ์Šค๋„ˆ๊ฐ€ ์ƒ์†ํ•˜๋Š” ๋งˆ์ปค ์ธํ„ฐํŽ˜์ด์Šค์ด๋‹ค.)

๋ชจ๋“  ์ˆ˜์ค€์— ๋ฆฌ์Šค๋„ˆ๋ฅผ ์ ์šฉํ•ด Job์„ ์ค‘๋‹จํ•  ์ˆ˜ ์žˆ์œผ๋ฉฐ, ์ผ๋ฐ˜์ ์œผ๋กœ ์ „์ฒ˜๋ฆฌ๋ฅผ ์ˆ˜ํ–‰ํ•˜๊ฑฐ๋‚˜ ์ดํ›„ ๊ฒฐ๊ณผ๋ฅผ ํ‰๊ฐ€ํ•˜๊ฑฐ๋‚˜, ์ผ๋ถ€ ์˜ค๋ฅ˜์ฒ˜๋ฆฌ์—๋„ ์‚ฌ์šฉ๋œ๋‹ค.

StepExecutionListener

  • org.springframework.batch.core.StepExecutionListener

public interface StepExecutionListener extends StepListener {

	void beforeStep(StepExecution stepExecution);

  // Listener๊ฐ€ ์Šคํ…์ด ๋ฐ˜ํ™˜ํ•œ ExitStatus๋ฅผ Job์— ์ „๋‹ฌํ•˜๊ธฐ ์ „์— ์ˆ˜์ •ํ•  ์ˆ˜ ์žˆ์Œ.
	@Nullable
	ExitStatus afterStep(StepExecution stepExecution);
}

@BeforeStep, @AfterStep ์• ๋„ˆํ…Œ์ด์…˜ ์ œ๊ณต

public class LoggingStepStartStopListener {

    @BeforeStep
    public void beforeStep(StepExecution stepExecution) {
        System.out.println(stepExecution.getStepName() + " ์‹œ์ž‘");
    }


    @AfterStep
    public ExitStatus afterStep(StepExecution stepExecution) {
        System.out.println(stepExecution.getStepName() + " ์ข…๋ฃŒ");
        return stepExecution.getExitStatus();
    }
}
    @Bean
    public Step chunkStep() {
        return this.stepBuilderFactory.get("chunkStep")
                .<String, String> chunk(randomChunkSizePolicy())
                .reader(itemReader())
                .writer(itemWriter())
                .listener(new LoggingStepStartStopListener()) // Listener ์„ค์ •
                .build();
    }

ChunkListener

public interface ChunkListener extends StepListener {

	static final String ROLLBACK_EXCEPTION_KEY = "sb_rollback_exception";

	void beforeChunk(ChunkContext context);

	void afterChunk(ChunkContext context);

	void afterChunkError(ChunkContext context);
}

@BeforeChunk, @AfterChunk ์• ๋„ˆํ…Œ์ด์…˜ ์ œ๊ณต

Step Flow

์กฐ๊ฑด ๋กœ์ง

์Šคํ”„๋ง ๋ฐฐ์น˜์˜ Step์€ StepBuilder์˜ .next() ๋ฉ”์„œ๋“œ๋ฅผ ์‚ฌ์šฉํ•ด ์ง€์ •ํ•œ ์ˆœ์„œ๋Œ€๋กœ ์‹คํ–‰๋œ๋‹ค. ์ „์ด(transition)๋ฅผ ๊ตฌ์„ฑํ•ด ๊ฒฐ๊ณผ์— ๋”ฐ๋ฅธ ๋‹ค๋ฅธ ์ˆœ์„œ๋กœ ์‹คํ–‰ํ•˜๋Š” ๊ฒƒ๋„ ๊ฐ€๋Šฅํ•˜๋‹ค.

    @Bean
    public Job conditionalJob() {
        return this.jobBuilderFactory.get("conditionalJob")
                .start(firstStep())
                .on("FAILED").to(failureStep())
                .from(firstStep()).on("*").to(successStep())
                .end()
                .build();
    }

์Šคํ”„๋ง ๋ฐฐ์น˜๋Š” ๊ธฐ์ค€์— ๋”ฐ๋ผ ๋‘๊ฐœ์˜ ์™€์ผ๋“œ ์นด๋“œ๋ฅผ ํ—ˆ์šฉํ•œ๋‹ค.

  • * : 0๊ฐœ ์ด์ƒ์˜ ๋ฌธ์ž๋ฅผ ์ผ์น˜ํ•˜๋Š” ๊ฒƒ์„ ์˜๋ฏธ

    • C* : COMPLETE, CORRECT

  • ? : 1๊ฐœ์˜ ๋ฌธ์ž๋ฅผ ์ผ์น˜ ์‹œํ‚ค๋Š” ๊ฒƒ์„ ์˜๋ฏธ

    • ?AT : CAT, KAT๊ณผ ์ผ์น˜ํ•˜์ง€๋งŒ, THAT๊ณผ๋Š” ๋ถˆ์ผ์น˜

JobExecutionDecider

Job ์‹คํ–‰ ์ •๋ณด(jobExecution)์™€ ์Šคํƒญ ์‹คํ–‰์ •๋ณด( stepExecution)๋ฅผ ์ธ์ž๋กœ ๋ฐ›์•„ ๋ชจ๋“  ์ •๋ณด๋ฅผ ์ด์šฉํ•ด ๋‹ค์Œ์— ๋ฌด์—‡์„ ์ˆ˜ํ–‰ํ• ์ง€์— ๋Œ€ํ•ด ๊ฒฐ์ •ํ•  ์ˆ˜ ์žˆ๋‹ค.

public interface JobExecutionDecider {

	/**
	 * Strategy for branching an execution based on the state of an ongoing
	 * {@link JobExecution}. The return value will be used as a status to
	 * determine the next step in the job.
	 * 
	 * @param jobExecution a job execution
	 * @param stepExecution the latest step execution (may be {@code null})
	 * @return the exit status code
	 */
	FlowExecutionStatus decide(JobExecution jobExecution, @Nullable StepExecution stepExecution);

}
public class RandomDecider implements JobExecutionDecider {

    private Random random = new Random();

    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        if (random.nextBoolean()) {
            return new FlowExecutionStatus(FlowExecutionStatus.COMPLETED.getName());
        } else {
            return new FlowExecutionStatus(FlowExecutionStatus.FAILED.getName());
        }
    }
}
    @Bean
    public Job conditionalJob() {
        return this.jobBuilderFactory.get("conditionalJob")
                .start(firstStep())
                .next(decider())
                .from(decider())
                .on("FAILED").to(failureStep())
                .from(decider()).on("*").to(successStep())
                .end()
                .build();
    }

Job ์ข…๋ฃŒํ•˜๊ธฐ

์Šคํ”„๋ง ๋ฐฐ์น˜์—์„œ๋Š” Job์„ ์ข…๋ฃŒํ•  ๋•Œ ์•„๋ž˜ 3๊ฐ€์ง€ ์ƒํƒœ๋กœ ์ข…๋ฃŒํ•  ์ˆ˜ ์žˆ๋‹ค.

์ƒํƒœ
์„ค๋ช…

Completed(์™„๋ฃŒ)

์Šคํ”„๋ง ๋ฐฐ์น˜ ์ฒ˜๋ฆฌ๊ฐ€ ์„ฑ๊ณต์ ์œผ๋กœ ์ข…๋ฃŒ๋์Œ์„ ์˜๋ฏธ JobInstance๊ฐ€ Completed๋กœ ์ข…๋ฃŒ๋˜๋ฉด ๋™์ผํ•œ ํŒŒ๋ผ๋ฏธํ„ฐ๋ฅผ ์‚ฌ์šฉํ•ด ๋‹ค์‹œ ์‹คํ–‰ํ•  ์ˆ˜ ์—†๋‹ค.

Failed(์‹คํŒจ)

์žก์ด ์„ฑ๊ณต์ ์œผ๋กœ ์™„๋ฃŒ๋˜์ง€ ์•Š์•˜์Œ์„ ์˜๋ฏธ Failed ์ƒํƒœ๋กœ ์ข…๋ฃŒ๋œ ์žก์€ ์Šคํ”„๋ง ๋ฐฐ์น˜๋ฅผ ์‚ฌ์šฉํ•ด ๋™์ผํ•œ ํŒŒ๋ผ๋ฏธํ„ฐ๋กœ ๋‹ค์‹œ ์‹คํ–‰ํ•  ์ˆ˜ ์žˆ๋‹ค.

Stopped(์ค‘์ง€)

Stopped ์ƒํƒœ๋กœ ์ข…๋ฃŒ๋œ ์žก์€ ๋‹ค์‹œ ์ˆ˜ํ–‰ ๊ฐ€๋Šฅํ•˜๋‹ค. Job์— ์˜ค๋ฅ˜๊ฐ€ ๋ฐœ์ƒํ•˜์ง€ ์•Š์•˜์–ด๋„, ์ค‘๋‹จ๋œ ์œ„์น˜์—์„œ ์žก์„ ๋‹ค์‹œ ์‹œ์ž‘ํ•  ์ˆ˜ ์žˆ๋‹ค. ์‚ฌ๋žŒ์˜ ๊ฐœ์ž…์ด ํ•„์š”ํ•˜๊ฑฐ๋‚˜ ๋‹ค๋ฅธ ๊ฒ€์‚ฌ/์ฒ˜๋ฆฌ๊ฐ€ ํ•„์š”ํ•œ ์ƒํ™ฉ์— ์œ ์šฉํ•˜๋‹ค.

BatchStatus๋ฅผ ํŒ๋ณ„ํ•  ๋•Œ, ExitStatus๋ฅผ ํ‰๊ฐ€ํ•˜๋ฉด์„œ ์‹๋ณ„๋œ๋‹ค. ExitStatus๋Š” ์Šคํ…, ์ฒญํฌ, ์žก์—์„œ ๋ฐ˜ํ™˜๋  ์ˆ˜ ์žˆ์œผ๋ฉฐ, BatchStatus๋Š” StepExecution ์ด๋‚˜ JobExecution ๋‚ด์— ๋ณด๊ด€๋˜๋ฉฐ, JobRepository์— ์ €์žฅ๋œ๋‹ค.

Completed ์ƒํƒœ๋กœ ์ข…๋ฃŒํ•˜๊ธฐ

.end() ๋ฉ”์„œ๋“œ ์‚ฌ์šฉ

return this.jobBuilderFactory.get("conditionalJob")
                .start(firstStep())
                .on("FAILED").end()
                .from(firstStep()).on("*").to(successStep())
                .end()
                .build();

BATCH_STEP_EXECUTION ํ…Œ์ด๋ธ”์— ์Šคํ…์ด ๋ฐ˜ํ™˜ํ•œ ExitStatus๊ฐ€ ์ €์žฅ๋˜๋ฉฐ, ์Šคํ…์ด ๋ฐ˜ํ™˜ํ•œ ์ƒํƒœ๊ฐ€ ๋ฌด์—‡์ด๋“  ์ƒ๊ด€์—†์ด BATCH_JOB_EXECUTION์— COMPLETED๊ฐ€ ์ €์žฅ๋œ๋‹ค.

Failed ์ƒํƒœ๋กœ ์ข…๋ฃŒํ•˜๊ธฐ

fail() ๋ฉ”์„œ๋“œ ์‚ฌ์šฉ

return this.jobBuilderFactory.get("conditionalJob")
                .start(firstStep())
                .on("FAILED").fail()
                .from(firstStep()).on("*").to(successStep())
                .end()
                .build();

์—ฌ๊ธฐ์„œ firstStep() ์ด FAILED๋กœ ๋๋‚˜๋ฉด, JobRepository ์— ํ•ด๋‹น Job์ด ์‹คํŒจํ•œ ๊ฒƒ์œผ๋กœ ์ €์žฅ๋˜๋ฉฐ, ๋™์ผํ•œ ํŒŒ๋ผ๋ฏธํ„ฐ๋ฅผ ์‚ฌ์šฉํ•ด ๋‹ค์‹œ ์‹คํ–‰ํ•  ์ˆ˜ ์žˆ๋‹ค.

Stopped ์ƒํƒœ๋กœ ์ข…๋ฃŒํ•˜๊ธฐ

.stopAndRestart() ๋ฉ”์„œ๋“œ๋กœ ์žก์„ ๋‹ค์‹œ ์ˆ˜ํ–‰ํ•œ๋‹ค๋ฉด, ๋ฏธ๋ฆฌ ๊ตฌ์„ฑํ•ด๋‘” ์Šคํ…๋ถ€ํ„ฐ ์‹œ์ž‘๋œ๋‹ค. ์•„๋ž˜ ์˜ˆ์ œ์—์„œ๋Š” ์žฌ์ˆ˜ํ–‰์‹œ successStep()๋ถ€ํ„ฐ ์ˆ˜ํ–‰๋˜๋Š”๊ฒƒ์„ ๋ณผ ์ˆ˜ ์žˆ๋‹ค.

return this.jobBuilderFactory.get("conditionalJob")
                .start(firstStep())
                .on("FAILED").stopAndRestart(successStep())
                .from(firstStep()).on("*").to(successStep())
                .end()
                .build();

Last updated