Step

Step은 μ‹€μ§ˆμ μΈ 배치 처리λ₯Ό μ •μ˜ν•˜κ³  μ œμ–΄ν•˜λŠ” 데 ν•„μš”ν•œ λͺ¨λ“  정보가 λ“€μ–΄μžˆλŠ” 도메인 객체둜, Job을 μ²˜λ¦¬ν•˜λŠ” μ‹€μ§ˆμ μΈ λ‹¨μœ„λ‘œ 쓰인닀.(Job:Step = 1:M)

  • Step은 Job을 κ΅¬μ„±ν•˜λŠ” λ…λ¦½λœ μž‘μ—… λ‹¨μœ„

  • 순차적으둜 배치 처리 μˆ˜ν–‰

  • Step은 λͺ¨λ“  λ‹¨μœ„ μž‘μ—…μ˜ 쑰각으둜 자체적으둜 μž…λ ₯, 처리기, 좜λ ₯을 닀룬닀.

  • νŠΈλžœμž­μ…˜μ€ Step λ‚΄λΆ€μ—μ„œ 이루어짐

  • org.springframework.batch.core.Step

StepExecution

Step의 μ‹€ν–‰ 정보λ₯Ό λ‹΄λŠ” 객체둜, 각각의 Step이 싀행될 λ•Œλ§ˆλ‹€ StepExecution이 μƒμ„±λœλ‹€.

public class StepExecution extends Entity {
    private final JobExecution jobExecution; // ν˜„μž¬ JobExecution 정보
    private final String stepName; // Step 이름
    private volatile BatchStatus status; // Step의 μ‹€ν–‰ μƒνƒœ(COMPLETED, STARTING, STARTED ...)
    private volatile int readCount; // μ„±κ³΅μ μœΌλ‘œ 읽은 λ ˆμ½”λ“œ 수
    private volatile int writeCount; // μ„±κ³΅μ μœΌλ‘œ μ“΄ λ ˆμ½”λ“œ 수
    private volatile int commitCount; // Step의 싀행에 λŒ€ν•΄ μ»€λ°‹λœ νŠΈλžœμž­μ…˜ 수
    private volatile int rollbackCount; // Step의 싀행에 λŒ€ν•΄ 둀백된 νŠΈλžœμž­μ…˜ 수
    private volatile int readSkipCount; // 읽기에 μ‹€νŒ¨ν•΄ κ±΄λ„ˆ 띈 λ ‰μ½”λ“œ 수
    private volatile int processSkipCount; // ν”„λ‘œμ„ΈμŠ€κ°€ μ‹€νŒ¨ν•΄ κ±΄λ„ˆ 띈 λ ‰μ½”λ“œ 수
    private volatile int writeSkipCount;// 쓰기에 μ‹€νŒ¨ν•΄ κ±΄λ„ˆ 띈 λ ‰μ½”λ“œ 수
    private volatile Date startTime; // Step이 μ‹€ν–‰λœ μ‹œκ°„(null == μ‹œμž‘λ˜μ§€ μ•ŠμŒ)
    private volatile Date endTime; // Step의 μ‹€ν–‰ 성곡 여뢀와 관계 없이 λλ‚œ μ‹œκ°„
    private volatile Date lastUpdated; // λ§ˆμ§€λ§‰μœΌλ‘œ μˆ˜μ •λœ μ‹œκ°„
    private volatile ExecutionContext executionContext; // Step μ‹€ν–‰ 사이에 μœ μ§€ν•΄μ•Όν•˜λŠ” μ‚¬μš©μž 데이터
    private volatile ExitStatus exitStatus; // Step μ‹€ν–‰ 결과에 λŒ€ν•œ μƒνƒœ κ°’(UNKOWN, EXECUTING, COMPLETE, ...)
    private volatile boolean terminateOnly; // Job μ‹€ν–‰ 쀑지 μ—¬λΆ€
    private volatile int filterCount; // μ‹€ν–‰μ—μ„œ ν•„ν„°λ§λœ λ ˆμ½”λ“œ 수
    private transient volatile List<Throwable> failureExceptions; // Step 싀행쀑 λ°œμƒν•œ μ˜ˆμ™Έ 리슀트
    ...
}

Tasklet 기반

  • Tasklet은 μž„μ˜μ˜ Step을 μ‹€ν–‰ν•  λ•Œ ν•˜λ‚˜μ˜ μž‘μ—…μ„ μ²˜λ¦¬ν•˜λŠ” 방식

  • 읽기, 처리, μ“°κΈ°λ‘œ λ‚˜λ‰œ 방식이 청크 지ν–₯ ν”„λ‘œμ„Έμ‹±μ΄λΌλ©΄ 이λ₯Ό 단일 μž‘μ—…μœΌλ‘œ λ§Œλ“œλŠ” κ°œλ…μ΄ Tasklet

  • νŠΈλžœμž­μ…˜ λ‚΄μ—μ„œ 둜직이 싀행될 수 μžˆλŠ” κΈ°λŠ₯을 μ œκ³΅ν•˜λŠ” μ „λž΅ μΈν„°νŽ˜μ΄μŠ€

  • org.springframework.batch.core.step.tasklet.Tasklet

public interface Tasklet {
  	// 내뢀에 μ›ν•˜λŠ” 단일 μž‘μ—…μ„ κ΅¬ν˜„ν•˜κ³  λ‚˜λ©΄, RepeatStatus.FINISHED λ°˜ν™˜ν›„ μž‘μ—…μ΄ κ³„μ†λ˜λ©΄ RepeatStatus.CONTINUABLE λ°˜ν™˜
    @Nullable
    RepeatStatus execute(StepContribution var1, ChunkContext var2) throws Exception;
}

Adapter

CallableTaskletAdapter

  • org.springframework.batch.core.step.tasklet.CallableTaskletAdapter

  • Callable<V> μΈν„°νŽ˜μ΄μŠ€μ˜ κ΅¬ν˜„μ²΄λ₯Ό ꡬ성할 수 있게 ν•΄μ£ΌλŠ” Adapter

    • 리턴값이 μ‘΄μž¬ν•˜κΈ° λ•Œλ¬Έμ— 곡유 객체λ₯Ό μ‚¬μš©ν•˜μ§€ μ•ŠλŠ”λ‹€.

    • 체크 μ˜ˆμ™Έλ₯Ό μ™ΈλΆ€λ‘œ 던질 수 μžˆλ‹€.

  • Step의 νŠΉμ • λ‘œμ§μ„ ν•΄λ‹Ή Step이 μ‹€ν–‰λ˜λŠ” μŠ€λ ˆλ“œκ°€ μ•„λ‹Œ λ‹€λ₯Έ μŠ€λ ˆλ“œμ—μ„œ μ‹€ν–‰ν•˜κ³  싢을 λ•Œ μ‚¬μš©

@EnableBatchProcessing
@SpringBootApplication
public class CallableTaskletConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job callableJob() {
        return this.jobBuilderFactory.get("callableJob")
                .start(callableStep())
                .build();
    }

    @Bean
    public Step callableStep() {
        return this.stepBuilderFactory.get("callableStep")
                .tasklet(callableTasklet())
                .build();
    }

    @Bean
    public Callable<RepeatStatus> callableObject() {
        return () -> {
            System.out.println("This was executed in another thread");
            return RepeatStatus.FINISHED;
        };
    }

    @Bean
    public CallableTaskletAdapter callableTasklet() {

        // CallableTaskletAdapterλŠ” Step이 μ‹€ν–‰λ˜λŠ” μŠ€λ ˆλ“œμ™€ λ³„κ°œμ˜ μŠ€λ ˆλ“œμ—μ„œ μ‹€ν–‰λ˜μ§€λ§Œ
        // Stepκ³Ό λ³‘λ ¬λ‘œ μ‹€ν–‰λ˜λŠ” 것은 μ•„λ‹ˆλ‹€.
        CallableTaskletAdapter callableTaskletAdapter = new CallableTaskletAdapter();

        callableTaskletAdapter.setCallable(callableObject());

        return callableTaskletAdapter;
    }

}

MethodInvokingTaskletAdapter

  • org.springframework.batch.core.step.tasklet.MethodInvokingTaskletAdapter1

  • λ‹€λ₯Έ 클래슀 λ‚΄μ˜ λ©”μ„œλ“œλ₯Ό Tasklet처럼 μ‹€ν–‰ κ°€λŠ₯

@EnableBatchProcessing
@Configuration
public class MethodInvokingTaskletConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job methodInvokingJob() {
        return this.jobBuilderFactory.get("methodInvokingJob")
                .start(methodInvokingStep())
                .build();
    }

    @Bean
    public Step methodInvokingStep() {
        return this.stepBuilderFactory.get("methodInvokingStep")
                .tasklet(methodInvokingTasklet())
                .build();
    }

  	@StepScope
    @Bean
    public MethodInvokingTaskletAdapter methodInvokingTasklet(
            @Value("#{jobParameters['message']}") String message) {
        // λ‹€λ₯Έ 클래슀 λ‚΄μ˜ λ©”μ„œλ“œλ₯Ό Tasklet처럼 μ‹€ν–‰ κ°€λŠ₯
        MethodInvokingTaskletAdapter methodInvokingTaskletAdapter = new MethodInvokingTaskletAdapter();

        methodInvokingTaskletAdapter.setTargetObject(customerService()); // ν˜ΈμΆœν•  λ©”μ„œλ“œκ°€ μžˆλŠ” 객체
        methodInvokingTaskletAdapter.setTargetMethod("serviceMethod"); // ν˜ΈμΆœν•  λ©”μ„œλ“œλͺ…
        methodInvokingTaskletAdapter.setArguments(new String[] {message});

        return methodInvokingTaskletAdapter;
    }

    @Bean
    public CustomerService customerService() {
        return new CustomerService();
    }

}

TargetMethodλŠ” ExitStatus.COMPLETED default이며, ExitStatusλ₯Ό λ°˜ν™˜ν•˜λ©΄ λ©”μ„œλ“œκ°€ λ°˜ν™˜ν•œ 값이 Taskletμ—μ„œ λ°˜ν™˜λœλ‹€.

SystemCommandTasklet

  • org.springframework.batch.core.step.tasklet.SystemCommandTasklet

  • μ‹œμŠ€ν…œ λͺ…령을 μ‹€ν–‰ν•  λ•Œ μ‚¬μš©ν•˜λ©°, μ§€μ •ν•œ μ‹œμŠ€ν…œ λͺ…령을 λΉ„λ™κΈ°λ‘œ μ‹€ν–‰ν•œλ‹€.

@EnableBatchProcessing
@Configuration
public class SystemCommandConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job systemCommandJob() {
        return this.jobBuilderFactory.get("systemCommandJob")
                .start(systemCommandStep())
                .build();
    }

    @Bean
    public Step systemCommandStep() {
        return this.stepBuilderFactory.get("systemCommandStep")
                .tasklet(systemCommandTasklet())
                .build();
    }

    @Bean
    public SystemCommandTasklet systemCommandTasklet() {
        SystemCommandTasklet systemCommandTasklet = new SystemCommandTasklet();

        // λͺ…λ Ήμ–΄
        systemCommandTasklet.setCommand("touch tmp.txt");
        systemCommandTasklet.setTimeout(5000);

        // Job이 λΉ„μ •μƒμ μœΌλ‘œ μ’…λ£Œλ  λ–„ μ‹œμŠ€ν…œ ν”„λ‘œμ„ΈμŠ€μ™€ κ΄€λ ¨λœ μŠ€λ ˆλ“œλ₯Ό κ°•μ œ μ’…λ£Œν• μ§€ μ—¬λΆ€
        systemCommandTasklet.setInterruptOnCancel(true);

        // μž‘μ—… 디렉토리 μ„€μ •
        systemCommandTasklet.setWorkingDirectory("/Users/dh0023/Develop/gitbook/TIL");

        // μ‹œμŠ€ν…œ λ°˜ν™˜ μ½”λ“œλ₯Ό μŠ€ν”„λ§ 배치 μƒνƒœ κ°’μœΌλ‘œ 맀핑
        systemCommandTasklet.setSystemProcessExitCodeMapper(touchCodeMapper());

        // λΉ„λ™κΈ°λ‘œ μ‹€ν–‰ν•˜λŠ” μ‹œμŠ€ν…œ λͺ…령을 주기적으둜 μ™„λ£Œ μ—¬λΆ€ 확인, μ™„λ£Œ μ—¬λΆ€λ₯Ό ν™•μΈν•˜λŠ” μ£ΌκΈ°(default=1초)
        systemCommandTasklet.setTerminationCheckInterval(5000);

        // μ‹œμŠ€ν…œ λͺ…령을 μ‹€ν–‰ν•˜λŠ” TaskExecutor ꡬ성 κ°€λŠ₯
        // λ¬Έμ œκ°€ λ°œμƒν•˜λ©΄ 락이 λ°œμƒν•  수 μžˆμœΌλ―€λ‘œ, λ™κΈ°λ°©μ‹μœΌλ‘œ κ΅¬ν˜„ν•˜μ§€ μ•ŠλŠ” 것이 μ’‹λ‹€.
        systemCommandTasklet.setTaskExecutor(new SimpleAsyncTaskExecutor());

        // λͺ…령을 μ‹€ν–‰ν•˜κΈ° 전에 μ„€μ •ν•˜λŠ” ν™˜κ²½ νŒŒλΌλ―Έν„° λͺ©λ‘
        systemCommandTasklet.setEnvironmentParams(
                new String[]{"BATCH_HOME=/Users/dh0023/Develop/spring/spring-practice/batch-practice"});

        return systemCommandTasklet;
    }

    @Bean
    public SimpleSystemProcessExitCodeMapper touchCodeMapper() {
        // λ°˜ν™˜λœ μ‹œμŠ€ν…œ μ½”λ“œκ°€ 0이 ExitStatus.FINISHED
        // 0이 μ•„λ‹ˆλ©΄ ExitStatus.FAILED
        return new SimpleSystemProcessExitCodeMapper();
    }

    @Bean
    public ConfigurableSystemProcessExitCodeMapper configurableSystemProcessExitCodeMapper() {
        // 일반적인 ꡬ성 λ°©λ²•μœΌλ‘œ 맀핑 ꡬ성을 ν•  수 있음.
        ConfigurableSystemProcessExitCodeMapper mapper = new ConfigurableSystemProcessExitCodeMapper();

        Map<Object, ExitStatus> mappings = new HashMap<Object, ExitStatus>() {
            {
                put(0, ExitStatus.COMPLETED);
                put(1, ExitStatus.FAILED);
                put(2, ExitStatus.EXECUTING);
                put(3, ExitStatus.NOOP);
                put(4, ExitStatus.UNKNOWN);
                put(ConfigurableSystemProcessExitCodeMapper.ELSE_KEY, ExitStatus.UNKNOWN);
            }
        };

        mapper.setMappings(mappings);

        return mapper;
    }
}
  • SimpleSystemProcessExitCodeMapper

    public class SimpleSystemProcessExitCodeMapper implements SystemProcessExitCodeMapper {
    	@Override
    	public ExitStatus getExitStatus(int exitCode) {
    		if (exitCode == 0) {
    			return ExitStatus.COMPLETED;
    		} else {
    			return ExitStatus.FAILED;
    		}
    	}
    
    }

Chunk 기반

Chunkλž€ μ•„μ΄ν…œμ΄ νŠΈλžœμž­μ…˜μ— commitλ˜λŠ” 수λ₯Ό λ§ν•œλ‹€.

즉, 청크 지ν–₯ μ²˜λ¦¬λž€ ν•œ λ²ˆμ— ν•˜λ‚˜μ”© 데이터λ₯Ό 읽어 ChunkλΌλŠ” 덩어리λ₯Ό λ§Œλ“  λ’€, Chunk λ‹¨μœ„λ‘œ νŠΈλžœμž­μ…˜μ„ λ‹€λ£¨λŠ” 것을 μ˜λ―Έν•œλ‹€.

Chunk 지ν–₯ ν”„λ‘œμ„Έμ‹±μ€ 1000개의 데이터에 λŒ€ν•΄ 배치 λ‘œμ§μ„ μ‹€ν–‰ν•œλ‹€κ³  κ°€μ •ν•˜λ©΄, Chunk λ‹¨μœ„λ‘œ λ‚˜λˆ„μ§€ μ•Šμ•˜μ„ κ²½μš°μ—λŠ” ν•œκ°œλ§Œ μ‹€νŒ¨ν•΄λ„ μ„±κ³΅ν•œ 999개의 데이터가 λ‘€λ°±λœλ‹€. Chunk λ‹¨μœ„λ₯Ό 10으둜 ν•œλ‹€λ©΄, μž‘μ—… 쀑에 λ‹€λ₯Έ ChunkλŠ” 영ν–₯을 받지 μ•ŠλŠ”λ‹€.

μ΄λ•Œ, ChunkλŠ” 컀밋 간격(commit interval)에 μ˜ν•΄ μ •μ˜λ˜κ³  μˆ˜ν–‰ν•˜λ―€λ‘œ, 컀밋 간격에 따라 μ„±λŠ₯이 λ‹¬λΌμ§ˆ 수 μžˆλ‹€. μ΅œμƒμ˜ μ„±λŠ₯을 μ–»κΈ° μœ„ν•΄μ„œλŠ” 컀밋 간격 섀정이 μ€‘μš”ν•˜λ‹€.

ItemReader, ItemProcessor, ItemWriter 3λ‹¨κ³„λ‘œ λΉ„μ§€λ‹ˆμŠ€ λ‘œμ§μ„ 뢄리해 역할을 λͺ…ν™•ν•˜κ²Œ 뢄리할 수 μžˆλ‹€.

  • λΉ„μ¦ˆλ‹ˆμŠ€ 둜직 뢄리

  • μ½μ–΄μ˜¨ 배치 데이터와 μ“°μ—¬μ§ˆ 데이터 νƒ€μž…μ΄ λ‹€λ₯Έ κ²½μš°μ— λŒ€ν•œ λŒ€μ‘

  • 각 ChunkλŠ” 자체 νŠΈλžœμž­μ…˜μœΌλ‘œ μ‹€ν–‰λ˜λ©°, μ²˜λ¦¬μ— μ‹€νŒ¨ν•˜λ©΄ μ„±κ³΅ν•œ νŠΈλžœμž­μ…˜ 이후뢀터 λ‹€μ‹œ μ‹œμž‘ κ°€λŠ₯

κ·ΈλŸ¬λ―€λ‘œ μ½μ–΄μ˜¨ 배치의 데이터와 μ €μž₯ν•  데이터 νƒ€μž…μ΄ λ‹€λ₯Έ κ²½μš°μ— λŒ€μ‘ν•  수 μžˆλ‹€.

ItemReader

  • Step의 λŒ€μƒμ΄ λ˜λŠ” 배치 데이터(File, Xml, DB λ“±)λ₯Ό μ½μ–΄μ˜€λŠ” μΈν„°νŽ˜μ΄μŠ€

  • org.springframework.batch.item.ItemReader<T>

public interface ItemReader<T> {
    // read λ©”μ„œλ“œμ˜ λ°˜ν™˜ νƒ€μž…μ„ T(μ œλ„ˆλ¦­)으둜 κ΅¬ν˜„ν•˜μ—¬ 직접 νƒ€μž…μ„ 지정할 수 있음
    @Nullable
    T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}

ItemProcessor

  • ItemReader둜 읽어 온 배치 데이터λ₯Ό λ³€ν™˜ν•˜λŠ” 역할을 μˆ˜ν–‰

  • ItemProcessorλŠ” 둜직 처리만 μˆ˜ν–‰ν•˜μ—¬ 역할을 λΆ„λ¦¬ν•˜κ³ , λͺ…ν™•ν•œ input/output을 ItemProcessor둜 κ΅¬ν˜„ν•΄λ†“μœΌλ©΄ 더 직관적인 μ½”λ“œκ°€ 될 것이닀.

  • org.springframework.batch.item.ItemProcessor<T>

    public interface ItemProcessor<I, O> {
        @Nullable
        O process(@NonNull I var1) throws Exception;
    }

ItemWriter

  • 배치 데이터(DB, File λ“±)λ₯Ό μ €μž₯ν•œλ‹€.

  • org.springframework.batch.item.ItemWriter<T>

public interface ItemWriter<T> {
    // T(μ œλ„€λ¦­)으둜 μ§€μ •ν•œ νƒ€μž…μ„ List λ§€κ°œλ³€μˆ˜λ‘œ λ°›λŠ”λ‹€.
    void write(List<? extends T> var1) throws Exception;
}

리슀트의 데이터 μˆ˜λŠ” μ„€μ •ν•œ *청크(Chunk) λ‹¨μœ„λ‘œ λΆˆλŸ¬μ˜¨λ‹€.

청크 기반 Job μ˜ˆμ‹œ

@EnableBatchProcessing
@Configuration
public class ChunkBasedConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * μ‹€μ œ μŠ€ν”„λ§ 배치 Job 생성
     */
    @Bean
    public Job chunkBasedJob() {
        return this.jobBuilderFactory.get("chunkBasedJob")
                .start(chunkStep())
                .build();
    }

    @Bean
    public Step chunkStep() {
        return this.stepBuilderFactory.get("chunkStep")
                .<String, String> chunk(1000) // chunk 기반, 컀밋간격 1000
                .reader(itemReader())
                .writer(itemWriter())
                .build();
    }

    @Bean
    public ListItemReader<String> itemReader(){

        List<String> items = new ArrayList<>(100000);

        for (int i = 0; i < 100000; i++) {
            items.add(UUID.randomUUID().toString());
        }

        return new ListItemReader<>(items);
    }

    @Bean
    public ItemWriter<String> itemWriter() {
        return items -> {
            for (String item : items) {
                System.out.println(">> current item = " + item);
            }
            System.out.println(">> end itemWriter chunk " + items.size());
        };
    }
}

μΌλ°˜μ μœΌλ‘œλŠ” μœ„μ™€ 같이 컀밋간격을 ν•˜λ“œ μ½”λ”©ν•΄ 크기λ₯Ό μ •μ˜ν•˜μ§€λ§Œ, 크기가 λ™μΌν•˜μ§€ μ•Šμ€ 청크λ₯Ό μ²˜λ¦¬ν•΄μ•Όν•˜λŠ” κ²½μš°λ„ μžˆλ‹€. μŠ€ν”„λ§ λ°°μΉ˜λŠ” org.springframework.batch.repeat.CompletionPolicy μΈν„°νŽ˜μ΄μŠ€λ₯Ό μ œκ³΅ν•΄ 청크가 μ™„λ£Œλ˜λŠ” μ‹œμ μ„ μ •μ˜ν•  수 μžˆλ„λ‘ μ œκ³΅ν•΄μ€€λ‹€.

CompletionPolicy

청크 μ™„λ£Œ μ—¬λΆ€λ₯Ό κ²°μ •ν•  수 μžˆλŠ” κ²°μ •λ‘œμ§μ„ κ΅¬ν˜„ν•  수 μžˆλŠ” μΈν„°νŽ˜μ΄μŠ€λ‘œ, CompletionPolicy μΈν„°νŽ˜μ΄μŠ€μ˜ κ΅¬ν˜„μ²΄μ— λŒ€ν•΄μ„œ μ•Œμ•„ λ³Ό 것이닀.

package org.springframework.batch.repeat;

public interface CompletionPolicy {

  // 청크 μ™„λ£Œ μ—¬λΆ€μ˜ μƒνƒœλ₯Ό 기반으둜 κ²°μ • 둜직 μˆ˜ν–‰
	boolean isComplete(RepeatContext context, RepeatStatus result);

  // λ‚΄λΆ€ μƒνƒœλ₯Ό μ΄μš©ν•΄ 청크 μ™„λ£Œ μ—¬λΆ€ νŒλ‹¨
	boolean isComplete(RepeatContext context);

  // 청크의 μ‹œμž‘μ„ μ•Œ 수 μžˆλ„λ‘ 정책을 μ΄ˆκΈ°ν™”
	RepeatContext start(RepeatContext parent);

 	// 각 item이 μ²˜λ¦¬κ°€λ˜λ©΄ update λ©”μ„œλ“œκ°€ ν˜ΈμΆœλ˜λ©΄μ„œ λ‚΄λΆ€ μƒνƒœ κ°±μ‹ 
	void update(RepeatContext context);
}

직접 κ΅¬ν˜„ν•˜λŠ” 방법

CompletionPolicyλ₯Ό κ΅¬ν˜„ν•˜μ—¬ ν•„μˆ˜ λ©”μ„œλ“œλ“€μ„ 각각 μ•Œλ§žκ²Œ λ‘œμ§μ„ κ΅¬μ„±ν•˜λ©΄λœλ‹€.

public class RandomChunkSizePolicy implements CompletionPolicy {

    private int chunksize;
    private int totalProcessed;
    private Random random = new Random();

  	// 청크 μ™„λ£Œ μ—¬λΆ€μ˜ μƒνƒœλ₯Ό 기반으둜 κ²°μ • 둜직 μˆ˜ν–‰
    @Override
    public boolean isComplete(RepeatContext context, RepeatStatus result) {
        if (RepeatStatus.FINISHED == result) {
            return true;
        } else {
            return isComplete(context);
        }
    }

  	// λ‚΄λΆ€ μƒνƒœλ₯Ό μ΄μš©ν•΄ 청크 μ™„λ£Œ μ—¬λΆ€ νŒλ‹¨
    @Override
    public boolean isComplete(RepeatContext context) {
        return this.totalProcessed >= chunksize;
    }

  	// 청크의 μ‹œμž‘μ„ μ•Œ 수 μžˆλ„λ‘ 정책을 μ΄ˆκΈ°ν™”
    @Override
    public RepeatContext start(RepeatContext parent) {
        this.chunksize = random.nextInt(20);
        this.totalProcessed = 0;

        System.out.println("chunk size has been set to => " + this.chunksize);
        return parent;
    }

  	// 각 item이 μ²˜λ¦¬κ°€λ˜λ©΄ update λ©”μ„œλ“œκ°€ ν˜ΈμΆœλ˜λ©΄μ„œ λ‚΄λΆ€ μƒνƒœ κ°±μ‹ 
    @Override
    public void update(RepeatContext context) {
        this.totalProcessed++;
    }
}
chunk size has been set to => 5
>> current item = b784cda0-961f-4faf-9737-4334e774f0d1
>> current item = 9fc3ec22-5d54-4632-94e3-fcc28cc00260
>> current item = 53452739-0122-4f8b-a52a-667e37cbf908
>> current item = de9ad8ec-1bf7-40d4-b991-72b6699594f9
>> current item = 352104aa-ae63-4928-96e2-460b3b145d43
>> end itemWriter chunk 5
chunk size has been set to => 10
>> current item = 4116f3e9-cad3-4387-8f46-a871d825d7d9
>> current item = 3e1f58b7-a1fa-45bd-b336-2484d1d543f3
>> current item = a58e2401-37e8-402a-8ac3-daddaf2e91a0
>> current item = 01162200-fc7d-4580-8533-d6deab7f3c65
>> current item = 31280b14-49bf-4a03-b1d4-210585e4da40
>> current item = 91b7d269-8105-41dc-ae6a-1d682913c168
>> current item = abeead5f-751b-4862-aa37-4f32eb09439a
>> current item = b6bf1a92-56e8-433a-a0ea-935e101cad6f
>> current item = 84af0778-d6e1-4aef-bd8f-347b047bb276
>> current item = b86477f3-b997-4b04-a3ca-27f517c9170f
>> end itemWriter chunk 10
chunk size has been set to => 11

λ‹€μŒκ³Ό 같이 Random으둜 μ§€μ •λœ 수만큼 chunkκ°€ μˆ˜ν–‰λ˜λŠ” 것을 확인 ν•  수 μžˆλ‹€.

SimpleCompletionPolicy

κ°€μž₯ 기본적인 κ΅¬ν˜„μ²΄λ‘œ, 미리 ꡬ성해둔 μž„κ³—κ°’μ— λ„λ‹¬ν•˜λ©΄ 청크 μ™„λ£Œλ‘œ ν‘œμ‹œν•œλ‹€.

public class SimpleCompletionPolicy extends DefaultResultCompletionPolicy {

	public static final int DEFAULT_CHUNK_SIZE = 5;

	int chunkSize = 0;

	public SimpleCompletionPolicy() {
		this(DEFAULT_CHUNK_SIZE);
	}

	public SimpleCompletionPolicy(int chunkSize) {
		super();
		this.chunkSize = chunkSize;
	}
@Bean
public Step chunkStep() {
  return this.stepBuilderFactory.get("chunkStep")
                .<String, String> chunk(completionPolicy()) // completePolicy 호좜
                .reader(itemReader())
                .writer(itemWriter())
                .build();
}


@Bean
public CompletionPolicy completionPolicy() {
    // 처리된 ITEM 개수λ₯Ό μ„Έμ–΄, 이 κ°œμˆ˜κ°€ μž„κ³„κ°’μ— λ„λ‹¬ν•˜λ©΄ chunk μ™„λ£Œλ‘œ ν‘œμ‹œ
    SimpleCompletionPolicy simpleCompletionPolicy = new SimpleCompletionPolicy(1000);

    return simpleCompletionPolicy;
}

TimeoutTerminationPolicy

νƒ€μž„μ•„μ›ƒ 값을 ꡬ성해, 청크 λ‚΄μ—μ„œ 처리 μ‹œκ°„μ΄ μ§€μ •ν•œ μ‹œκ°„μ΄ λ„˜μœΌλ©΄ 청크가 μ™„λ£Œλœ κ²ƒμœΌλ‘œ κ°„μ£Όν•˜κ³ , λͺ¨λ“  νŠΈλžœμž­μ…˜ 처리λ₯Ό μ •μƒμ μœΌλ‘œ ν•œλ‹€λŠ” 것이닀. TimeoutTerminationPolicy 만으둜 청크 μ™„λ£Œ μ‹œμ μ„ κ²°μ •ν•˜λŠ” κ²½μš°λŠ” 거의 μ‘΄μž¬ν•˜μ§€ μ•ŠμœΌλ©°, CompositeCompletionPolicy의 μΌλΆ€λ‘œ μ‚¬μš©ν•˜λŠ” κ²½μš°κ°€ λ§Žλ‹€.

public class TimeoutTerminationPolicy extends CompletionPolicySupport {

	/**
	 * Default timeout value in milliseconds (the value equivalent to 30 seconds).
	 */
	public static final long DEFAULT_TIMEOUT = 30000L;

	private long timeout = DEFAULT_TIMEOUT;

	/**
	 * Default constructor.
	 */
	public TimeoutTerminationPolicy() {
		super();
	}

	/**
	 * Construct a {@link TimeoutTerminationPolicy} with the specified timeout
	 * value (in milliseconds).
	 * 
	 * @param timeout duration of the timeout.
	 */
	public TimeoutTerminationPolicy(long timeout) {
		super();
		this.timeout = timeout;
	}
    @Bean
    public CompletionPolicy timeoutCompletionPolicy() {
        TimeoutTerminationPolicy timeoutTerminationPolicy = new TimeoutTerminationPolicy(3);
        return timeoutTerminationPolicy;
    }

TimeoutTerminationPolicy둜 μˆ˜ν–‰ν•œ 경우 각 chunk λ‹¨μœ„λ₯Ό 확인해보면 λ‹€μŒκ³Ό 같이 제각각인 것을 λ³Ό μˆ˜μžˆλ‹€.

>> end itemWriter chunk 795
>> end itemWriter chunk 679
>> end itemWriter chunk 841
>> end itemWriter chunk 1153
>> end itemWriter chunk 1061
>> end itemWriter chunk 1916
>> end itemWriter chunk 1667
>> end itemWriter chunk 1719
>> end itemWriter chunk 931
>> end itemWriter chunk 1634
>> end itemWriter chunk 941
>> end itemWriter chunk 667
>> end itemWriter chunk 665
>> end itemWriter chunk 547
>> end itemWriter chunk 533
>> end itemWriter chunk 973
>> end itemWriter chunk 647
>> end itemWriter chunk 1632
>> end itemWriter chunk 676

CompositeCompletionPolicy

CompositeCompletionPolicyλŠ” 청크 μ™„λ£Œ μ—¬λΆ€λ₯Ό κ²°μ •ν•˜λŠ” μ—¬λŸ¬ 정책을 ν•¨κ»˜ ꡬ성할 수 μžˆλ‹€. ν¬ν•¨ν•˜κ³  μžˆλŠ” μ—¬λŸ¬ μ •μ±… 쀑 ν•˜λ‚˜λΌλ„ 청크 μ™„λ£ŒλΌκ³  νŒλ‹¨λ˜λ©΄ ν•΄λ‹Ή 청크가 μ™„λ£Œλœ κ²ƒμœΌλ‘œ ν‘œμ‹œν•œλ‹€.

		@Bean
    public CompletionPolicy compositeCompletionPolicy() {
        CompositeCompletionPolicy policy = new CompositeCompletionPolicy();

        // μ—¬λŸ¬ μ •μ±… μ„€μ •
        policy.setPolicies(
                new CompletionPolicy[]{
                        new TimeoutTerminationPolicy(3),
                        new SimpleCompletionPolicy(1000)
                }
        );
      	return policy;
    }

λ‹€μŒκ³Ό 같이 μˆ˜ν–‰ν•œ κ²½μš°μ—λŠ” chunk λ‹¨μœ„κ°€ 1000개λ₯Ό λ„˜μ–΄μ„  κ²½μš°κ°€ μ—†λŠ” 것을 확인할 수 μžˆλ‹€.

>> end itemWriter chunk 731
>> end itemWriter chunk 1000
>> end itemWriter chunk 690
>> end itemWriter chunk 1000
>> end itemWriter chunk 798
>> end itemWriter chunk 1000
>> end itemWriter chunk 980
>> end itemWriter chunk 838
>> end itemWriter chunk 850
>> end itemWriter chunk 1000
>> end itemWriter chunk 263
>> end itemWriter chunk 556
>> end itemWriter chunk 629
>> end itemWriter chunk 962
>> end itemWriter chunk 960
>> end itemWriter chunk 1000
>> end itemWriter chunk 1000
>> end itemWriter chunk 1000
>> end itemWriter chunk 898
>> end itemWriter chunk 1000
>> end itemWriter chunk 900
>> end itemWriter chunk 798
>> end itemWriter chunk 215

Step Listener

μŠ€νƒ­κ³Ό 청크의 μ‹œμž‘κ³Ό λμ—μ„œ νŠΉμ • λ‘œμ§μ„ μ²˜λ¦¬ν•  수 있게 ν•΄μ€€λ‹€.

(StepListenerλŠ” λͺ¨λ“  μŠ€νƒ­ λ¦¬μŠ€λ„ˆκ°€ μƒμ†ν•˜λŠ” 마컀 μΈν„°νŽ˜μ΄μŠ€μ΄λ‹€.)

λͺ¨λ“  μˆ˜μ€€μ— λ¦¬μŠ€λ„ˆλ₯Ό μ μš©ν•΄ Job을 쀑단할 수 있으며, 일반적으둜 μ „μ²˜λ¦¬λ₯Ό μˆ˜ν–‰ν•˜κ±°λ‚˜ 이후 κ²°κ³Όλ₯Ό ν‰κ°€ν•˜κ±°λ‚˜, 일뢀 였λ₯˜μ²˜λ¦¬μ—λ„ μ‚¬μš©λœλ‹€.

StepExecutionListener

  • org.springframework.batch.core.StepExecutionListener

public interface StepExecutionListener extends StepListener {

	void beforeStep(StepExecution stepExecution);

  // Listenerκ°€ μŠ€ν…μ΄ λ°˜ν™˜ν•œ ExitStatusλ₯Ό Job에 μ „λ‹¬ν•˜κΈ° 전에 μˆ˜μ •ν•  수 있음.
	@Nullable
	ExitStatus afterStep(StepExecution stepExecution);
}

@BeforeStep, @AfterStep μ• λ„ˆν…Œμ΄μ…˜ 제곡

public class LoggingStepStartStopListener {

    @BeforeStep
    public void beforeStep(StepExecution stepExecution) {
        System.out.println(stepExecution.getStepName() + " μ‹œμž‘");
    }


    @AfterStep
    public ExitStatus afterStep(StepExecution stepExecution) {
        System.out.println(stepExecution.getStepName() + " μ’…λ£Œ");
        return stepExecution.getExitStatus();
    }
}
    @Bean
    public Step chunkStep() {
        return this.stepBuilderFactory.get("chunkStep")
                .<String, String> chunk(randomChunkSizePolicy())
                .reader(itemReader())
                .writer(itemWriter())
                .listener(new LoggingStepStartStopListener()) // Listener μ„€μ •
                .build();
    }

ChunkListener

public interface ChunkListener extends StepListener {

	static final String ROLLBACK_EXCEPTION_KEY = "sb_rollback_exception";

	void beforeChunk(ChunkContext context);

	void afterChunk(ChunkContext context);

	void afterChunkError(ChunkContext context);
}

@BeforeChunk, @AfterChunk μ• λ„ˆν…Œμ΄μ…˜ 제곡

Step Flow

쑰건 둜직

μŠ€ν”„λ§ 배치의 Step은 StepBuilder의 .next() λ©”μ„œλ“œλ₯Ό μ‚¬μš©ν•΄ μ§€μ •ν•œ μˆœμ„œλŒ€λ‘œ μ‹€ν–‰λœλ‹€. 전이(transition)λ₯Ό ꡬ성해 결과에 λ”°λ₯Έ λ‹€λ₯Έ μˆœμ„œλ‘œ μ‹€ν–‰ν•˜λŠ” 것도 κ°€λŠ₯ν•˜λ‹€.

    @Bean
    public Job conditionalJob() {
        return this.jobBuilderFactory.get("conditionalJob")
                .start(firstStep())
                .on("FAILED").to(failureStep())
                .from(firstStep()).on("*").to(successStep())
                .end()
                .build();
    }

μŠ€ν”„λ§ λ°°μΉ˜λŠ” 기쀀에 따라 λ‘κ°œμ˜ μ™€μΌλ“œ μΉ΄λ“œλ₯Ό ν—ˆμš©ν•œλ‹€.

  • * : 0개 μ΄μƒμ˜ 문자λ₯Ό μΌμΉ˜ν•˜λŠ” 것을 의미

    • C* : COMPLETE, CORRECT

  • ? : 1개의 문자λ₯Ό 일치 μ‹œν‚€λŠ” 것을 의미

    • ?AT : CAT, KATκ³Ό μΌμΉ˜ν•˜μ§€λ§Œ, THATκ³ΌλŠ” 뢈일치

JobExecutionDecider

Job μ‹€ν–‰ 정보(jobExecution)와 μŠ€νƒ­ 싀행정보( stepExecution)λ₯Ό 인자둜 λ°›μ•„ λͺ¨λ“  정보λ₯Ό μ΄μš©ν•΄ λ‹€μŒμ— 무엇을 μˆ˜ν–‰ν• μ§€μ— λŒ€ν•΄ κ²°μ •ν•  수 μžˆλ‹€.

public interface JobExecutionDecider {

	/**
	 * Strategy for branching an execution based on the state of an ongoing
	 * {@link JobExecution}. The return value will be used as a status to
	 * determine the next step in the job.
	 * 
	 * @param jobExecution a job execution
	 * @param stepExecution the latest step execution (may be {@code null})
	 * @return the exit status code
	 */
	FlowExecutionStatus decide(JobExecution jobExecution, @Nullable StepExecution stepExecution);

}
public class RandomDecider implements JobExecutionDecider {

    private Random random = new Random();

    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        if (random.nextBoolean()) {
            return new FlowExecutionStatus(FlowExecutionStatus.COMPLETED.getName());
        } else {
            return new FlowExecutionStatus(FlowExecutionStatus.FAILED.getName());
        }
    }
}
    @Bean
    public Job conditionalJob() {
        return this.jobBuilderFactory.get("conditionalJob")
                .start(firstStep())
                .next(decider())
                .from(decider())
                .on("FAILED").to(failureStep())
                .from(decider()).on("*").to(successStep())
                .end()
                .build();
    }

Job μ’…λ£Œν•˜κΈ°

μŠ€ν”„λ§ λ°°μΉ˜μ—μ„œλŠ” Job을 μ’…λ£Œν•  λ•Œ μ•„λž˜ 3가지 μƒνƒœλ‘œ μ’…λ£Œν•  수 μžˆλ‹€.

μƒνƒœ
μ„€λͺ…

Completed(μ™„λ£Œ)

μŠ€ν”„λ§ 배치 μ²˜λ¦¬κ°€ μ„±κ³΅μ μœΌλ‘œ μ’…λ£ŒλμŒμ„ 의미 JobInstanceκ°€ Completed둜 μ’…λ£Œλ˜λ©΄ λ™μΌν•œ νŒŒλΌλ―Έν„°λ₯Ό μ‚¬μš©ν•΄ λ‹€μ‹œ μ‹€ν–‰ν•  수 μ—†λ‹€.

Failed(μ‹€νŒ¨)

작이 μ„±κ³΅μ μœΌλ‘œ μ™„λ£Œλ˜μ§€ μ•Šμ•˜μŒμ„ 의미 Failed μƒνƒœλ‘œ μ’…λ£Œλœ μž‘μ€ μŠ€ν”„λ§ 배치λ₯Ό μ‚¬μš©ν•΄ λ™μΌν•œ νŒŒλΌλ―Έν„°λ‘œ λ‹€μ‹œ μ‹€ν–‰ν•  수 μžˆλ‹€.

Stopped(쀑지)

Stopped μƒνƒœλ‘œ μ’…λ£Œλœ μž‘μ€ λ‹€μ‹œ μˆ˜ν–‰ κ°€λŠ₯ν•˜λ‹€. Job에 였λ₯˜κ°€ λ°œμƒν•˜μ§€ μ•Šμ•˜μ–΄λ„, μ€‘λ‹¨λœ μœ„μΉ˜μ—μ„œ μž‘μ„ λ‹€μ‹œ μ‹œμž‘ν•  수 μžˆλ‹€. μ‚¬λžŒμ˜ κ°œμž…μ΄ ν•„μš”ν•˜κ±°λ‚˜ λ‹€λ₯Έ 검사/μ²˜λ¦¬κ°€ ν•„μš”ν•œ 상황에 μœ μš©ν•˜λ‹€.

BatchStatusλ₯Ό νŒλ³„ν•  λ•Œ, ExitStatusλ₯Ό ν‰κ°€ν•˜λ©΄μ„œ μ‹λ³„λœλ‹€. ExitStatusλŠ” μŠ€ν…, 청크, μž‘μ—μ„œ λ°˜ν™˜λ  수 있으며, BatchStatusλŠ” StepExecution μ΄λ‚˜ JobExecution 내에 λ³΄κ΄€λ˜λ©°, JobRepository에 μ €μž₯λœλ‹€.

Completed μƒνƒœλ‘œ μ’…λ£Œν•˜κΈ°

.end() λ©”μ„œλ“œ μ‚¬μš©

return this.jobBuilderFactory.get("conditionalJob")
                .start(firstStep())
                .on("FAILED").end()
                .from(firstStep()).on("*").to(successStep())
                .end()
                .build();

BATCH_STEP_EXECUTION ν…Œμ΄λΈ”μ— μŠ€ν…μ΄ λ°˜ν™˜ν•œ ExitStatusκ°€ μ €μž₯되며, μŠ€ν…μ΄ λ°˜ν™˜ν•œ μƒνƒœκ°€ 무엇이든 상관없이 BATCH_JOB_EXECUTION에 COMPLETEDκ°€ μ €μž₯λœλ‹€.

Failed μƒνƒœλ‘œ μ’…λ£Œν•˜κΈ°

fail() λ©”μ„œλ“œ μ‚¬μš©

return this.jobBuilderFactory.get("conditionalJob")
                .start(firstStep())
                .on("FAILED").fail()
                .from(firstStep()).on("*").to(successStep())
                .end()
                .build();

μ—¬κΈ°μ„œ firstStep() 이 FAILED둜 λλ‚˜λ©΄, JobRepository 에 ν•΄λ‹Ή Job이 μ‹€νŒ¨ν•œ κ²ƒμœΌλ‘œ μ €μž₯되며, λ™μΌν•œ νŒŒλΌλ―Έν„°λ₯Ό μ‚¬μš©ν•΄ λ‹€μ‹œ μ‹€ν–‰ν•  수 μžˆλ‹€.

Stopped μƒνƒœλ‘œ μ’…λ£Œν•˜κΈ°

.stopAndRestart() λ©”μ„œλ“œλ‘œ μž‘μ„ λ‹€μ‹œ μˆ˜ν–‰ν•œλ‹€λ©΄, 미리 ꡬ성해둔 μŠ€ν…λΆ€ν„° μ‹œμž‘λœλ‹€. μ•„λž˜ μ˜ˆμ œμ—μ„œλŠ” μž¬μˆ˜ν–‰μ‹œ successStep()λΆ€ν„° μˆ˜ν–‰λ˜λŠ”κ²ƒμ„ λ³Ό 수 μžˆλ‹€.

return this.jobBuilderFactory.get("conditionalJob")
                .start(firstStep())
                .on("FAILED").stopAndRestart(successStep())
                .from(firstStep()).on("*").to(successStep())
                .end()
                .build();

Last updated

Was this helpful?