Step
μ μ€μ§μ μΈ λ°°μΉ μ²λ¦¬λ₯Ό μ μνκ³ μ μ΄νλ λ° νμν λͺ¨λ μ 보 κ° λ€μ΄μλ λλ©μΈ κ°μ²΄λ‘, Job
μ μ²λ¦¬νλ μ€μ§μ μΈ λ¨μλ‘ μ°μΈλ€.(Job:Step = 1:M )
Stepμ Jobμ ꡬμ±νλ λ
립λ μμ
λ¨μ
μμ°¨μ μΌλ‘ λ°°μΉ μ²λ¦¬ μν
Stepμ λͺ¨λ λ¨μ μμ
μ μ‘°κ°μΌλ‘ μ체μ μΌλ‘ μ
λ ₯, μ²λ¦¬κΈ°, μΆλ ₯μ λ€λ£¬λ€.
νΈλμμ
μ Step λ΄λΆμμ μ΄λ£¨μ΄μ§
org.springframework.batch.core.Step
StepExecution
Step
μ μ€ν μ 보λ₯Ό λ΄λ κ°μ²΄λ‘, κ°κ°μ Step
μ΄ μ€νλ λλ§λ€ StepExecution
μ΄ μμ±λλ€.
Copy public class StepExecution extends Entity {
private final JobExecution jobExecution; // νμ¬ JobExecution μ 보
private final String stepName; // Step μ΄λ¦
private volatile BatchStatus status; // Stepμ μ€ν μν(COMPLETED, STARTING, STARTED ...)
private volatile int readCount; // μ±κ³΅μ μΌλ‘ μ½μ λ μ½λ μ
private volatile int writeCount; // μ±κ³΅μ μΌλ‘ μ΄ λ μ½λ μ
private volatile int commitCount; // Stepμ μ€νμ λν΄ μ»€λ°λ νΈλμμ
μ
private volatile int rollbackCount; // Stepμ μ€νμ λν΄ λ‘€λ°±λ νΈλμμ
μ
private volatile int readSkipCount; // μ½κΈ°μ μ€ν¨ν΄ 건λ λ λ μ½λ μ
private volatile int processSkipCount; // νλ‘μΈμ€κ° μ€ν¨ν΄ 건λ λ λ μ½λ μ
private volatile int writeSkipCount;// μ°κΈ°μ μ€ν¨ν΄ 건λ λ λ μ½λ μ
private volatile Date startTime; // Stepμ΄ μ€νλ μκ°(null == μμλμ§ μμ)
private volatile Date endTime; // Stepμ μ€ν μ±κ³΅ μ¬λΆμ κ΄κ³ μμ΄ λλ μκ°
private volatile Date lastUpdated; // λ§μ§λ§μΌλ‘ μμ λ μκ°
private volatile ExecutionContext executionContext; // Step μ€ν μ¬μ΄μ μ μ§ν΄μΌνλ μ¬μ©μ λ°μ΄ν°
private volatile ExitStatus exitStatus; // Step μ€ν κ²°κ³Όμ λν μν κ°(UNKOWN, EXECUTING, COMPLETE, ...)
private volatile boolean terminateOnly; // Job μ€ν μ€μ§ μ¬λΆ
private volatile int filterCount; // μ€νμμ νν°λ§λ λ μ½λ μ
private transient volatile List<Throwable> failureExceptions; // Step μ€νμ€ λ°μν μμΈ λ¦¬μ€νΈ
...
}
Tasklet
κΈ°λ°
Tasklet
μ μμμ Step
μ μ€νν λ νλμ μμ
μ μ²λ¦¬νλ λ°©μ
μ½κΈ°, μ²λ¦¬, μ°κΈ°λ‘ λλ λ°©μμ΄ μ²ν¬ μ§ν₯ νλ‘μΈμ±μ΄λΌλ©΄ μ΄λ₯Ό λ¨μΌ μμ
μΌλ‘ λ§λλ κ°λ
μ΄ Tasklet
νΈλμμ
λ΄μμ λ‘μ§μ΄ μ€νλ μ μλ κΈ°λ₯μ μ 곡νλ μ λ΅ μΈν°νμ΄μ€
org.springframework.batch.core.step.tasklet.Tasklet
Copy public interface Tasklet {
// λ΄λΆμ μνλ λ¨μΌ μμ
μ ꡬννκ³ λλ©΄, RepeatStatus.FINISHED λ°νν μμ
μ΄ κ³μλλ©΄ RepeatStatus.CONTINUABLE λ°ν
@Nullable
RepeatStatus execute(StepContribution var1, ChunkContext var2) throws Exception;
}
Adapter
CallableTaskletAdapter
org.springframework.batch.core.step.tasklet.CallableTaskletAdapter
Callable<V>
μΈν°νμ΄μ€μ ꡬν체λ₯Ό ꡬμ±ν μ μκ² ν΄μ£Όλ Adapter
리ν΄κ°μ΄ μ‘΄μ¬νκΈ° λλ¬Έμ 곡μ κ°μ²΄λ₯Ό μ¬μ©νμ§ μλλ€.
μ²΄ν¬ μμΈλ₯Ό μΈλΆλ‘ λμ§ μ μλ€.
Stepμ νΉμ λ‘μ§μ ν΄λΉ Stepμ΄ μ€νλλ μ€λ λκ° μλ λ€λ₯Έ μ€λ λμμ μ€ννκ³ μΆμ λ μ¬μ©
Copy @EnableBatchProcessing
@SpringBootApplication
public class CallableTaskletConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Job callableJob() {
return this.jobBuilderFactory.get("callableJob")
.start(callableStep())
.build();
}
@Bean
public Step callableStep() {
return this.stepBuilderFactory.get("callableStep")
.tasklet(callableTasklet())
.build();
}
@Bean
public Callable<RepeatStatus> callableObject() {
return () -> {
System.out.println("This was executed in another thread");
return RepeatStatus.FINISHED;
};
}
@Bean
public CallableTaskletAdapter callableTasklet() {
// CallableTaskletAdapterλ Stepμ΄ μ€νλλ μ€λ λμ λ³κ°μ μ€λ λμμ μ€νλμ§λ§
// Stepκ³Ό λ³λ ¬λ‘ μ€νλλ κ²μ μλλ€.
CallableTaskletAdapter callableTaskletAdapter = new CallableTaskletAdapter();
callableTaskletAdapter.setCallable(callableObject());
return callableTaskletAdapter;
}
}
MethodInvokingTaskletAdapter
org.springframework.batch.core.step.tasklet.MethodInvokingTaskletAdapter
1
λ€λ₯Έ ν΄λμ€ λ΄μ λ©μλλ₯Ό Taskletμ²λΌ μ€ν κ°λ₯
Copy @EnableBatchProcessing
@Configuration
public class MethodInvokingTaskletConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Job methodInvokingJob() {
return this.jobBuilderFactory.get("methodInvokingJob")
.start(methodInvokingStep())
.build();
}
@Bean
public Step methodInvokingStep() {
return this.stepBuilderFactory.get("methodInvokingStep")
.tasklet(methodInvokingTasklet())
.build();
}
@StepScope
@Bean
public MethodInvokingTaskletAdapter methodInvokingTasklet(
@Value("#{jobParameters['message']}") String message) {
// λ€λ₯Έ ν΄λμ€ λ΄μ λ©μλλ₯Ό Taskletμ²λΌ μ€ν κ°λ₯
MethodInvokingTaskletAdapter methodInvokingTaskletAdapter = new MethodInvokingTaskletAdapter();
methodInvokingTaskletAdapter.setTargetObject(customerService()); // νΈμΆν λ©μλκ° μλ κ°μ²΄
methodInvokingTaskletAdapter.setTargetMethod("serviceMethod"); // νΈμΆν λ©μλλͺ
methodInvokingTaskletAdapter.setArguments(new String[] {message});
return methodInvokingTaskletAdapter;
}
@Bean
public CustomerService customerService() {
return new CustomerService();
}
}
TargetMethodλ ExitStatus.COMPLETED
defaultμ΄λ©°, ExitStatus
λ₯Ό λ°ννλ©΄ λ©μλκ° λ°νν κ°μ΄ Taskletμμ λ°νλλ€.
SystemCommandTasklet
org.springframework.batch.core.step.tasklet.SystemCommandTasklet
μμ€ν
λͺ
λ Ήμ μ€νν λ μ¬μ©νλ©°, μ§μ ν μμ€ν
λͺ
λ Ήμ λΉλκΈ°λ‘ μ€ννλ€.
Copy @EnableBatchProcessing
@Configuration
public class SystemCommandConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Job systemCommandJob() {
return this.jobBuilderFactory.get("systemCommandJob")
.start(systemCommandStep())
.build();
}
@Bean
public Step systemCommandStep() {
return this.stepBuilderFactory.get("systemCommandStep")
.tasklet(systemCommandTasklet())
.build();
}
@Bean
public SystemCommandTasklet systemCommandTasklet() {
SystemCommandTasklet systemCommandTasklet = new SystemCommandTasklet();
// λͺ
λ Ήμ΄
systemCommandTasklet.setCommand("touch tmp.txt");
systemCommandTasklet.setTimeout(5000);
// Jobμ΄ λΉμ μμ μΌλ‘ μ’
λ£λ λ μμ€ν
νλ‘μΈμ€μ κ΄λ ¨λ μ€λ λλ₯Ό κ°μ μ’
λ£ν μ§ μ¬λΆ
systemCommandTasklet.setInterruptOnCancel(true);
// μμ
λλ ν 리 μ€μ
systemCommandTasklet.setWorkingDirectory("/Users/dh0023/Develop/gitbook/TIL");
// μμ€ν
λ°ν μ½λλ₯Ό μ€νλ§ λ°°μΉ μν κ°μΌλ‘ 맀ν
systemCommandTasklet.setSystemProcessExitCodeMapper(touchCodeMapper());
// λΉλκΈ°λ‘ μ€ννλ μμ€ν
λͺ
λ Ήμ μ£ΌκΈ°μ μΌλ‘ μλ£ μ¬λΆ νμΈ, μλ£ μ¬λΆλ₯Ό νμΈνλ μ£ΌκΈ°(default=1μ΄)
systemCommandTasklet.setTerminationCheckInterval(5000);
// μμ€ν
λͺ
λ Ήμ μ€ννλ TaskExecutor κ΅¬μ± κ°λ₯
// λ¬Έμ κ° λ°μνλ©΄ λ½μ΄ λ°μν μ μμΌλ―λ‘, λκΈ°λ°©μμΌλ‘ ꡬννμ§ μλ κ²μ΄ μ’λ€.
systemCommandTasklet.setTaskExecutor(new SimpleAsyncTaskExecutor());
// λͺ
λ Ήμ μ€ννκΈ° μ μ μ€μ νλ νκ²½ νλΌλ―Έν° λͺ©λ‘
systemCommandTasklet.setEnvironmentParams(
new String[]{"BATCH_HOME=/Users/dh0023/Develop/spring/spring-practice/batch-practice"});
return systemCommandTasklet;
}
@Bean
public SimpleSystemProcessExitCodeMapper touchCodeMapper() {
// λ°νλ μμ€ν
μ½λκ° 0μ΄ ExitStatus.FINISHED
// 0μ΄ μλλ©΄ ExitStatus.FAILED
return new SimpleSystemProcessExitCodeMapper();
}
@Bean
public ConfigurableSystemProcessExitCodeMapper configurableSystemProcessExitCodeMapper() {
// μΌλ°μ μΈ κ΅¬μ± λ°©λ²μΌλ‘ 맀ν ꡬμ±μ ν μ μμ.
ConfigurableSystemProcessExitCodeMapper mapper = new ConfigurableSystemProcessExitCodeMapper();
Map<Object, ExitStatus> mappings = new HashMap<Object, ExitStatus>() {
{
put(0, ExitStatus.COMPLETED);
put(1, ExitStatus.FAILED);
put(2, ExitStatus.EXECUTING);
put(3, ExitStatus.NOOP);
put(4, ExitStatus.UNKNOWN);
put(ConfigurableSystemProcessExitCodeMapper.ELSE_KEY, ExitStatus.UNKNOWN);
}
};
mapper.setMappings(mappings);
return mapper;
}
}
SimpleSystemProcessExitCodeMapper
Copy public class SimpleSystemProcessExitCodeMapper implements SystemProcessExitCodeMapper {
@Override
public ExitStatus getExitStatus(int exitCode) {
if (exitCode == 0) {
return ExitStatus.COMPLETED;
} else {
return ExitStatus.FAILED;
}
}
}
Chunk
κΈ°λ°
Chunkλ μμ΄ν
μ΄ νΈλμμ
μ commitλλ μ λ₯Ό λ§νλ€.
μ¦, μ²ν¬ μ§ν₯ μ²λ¦¬λ ν λ²μ νλμ© λ°μ΄ν°λ₯Ό μ½μ΄ ChunkλΌλ λ©μ΄λ¦¬λ₯Ό λ§λ λ€, Chunk λ¨μλ‘ νΈλμμ
μ λ€λ£¨λ κ²μ μλ―Έ νλ€.
Chunk μ§ν₯ νλ‘μΈμ±μ 1000κ°μ λ°μ΄ν°μ λν΄ λ°°μΉ λ‘μ§μ μ€ννλ€κ³ κ°μ νλ©΄, Chunk λ¨μλ‘ λλμ§ μμμ κ²½μ°μλ νκ°λ§ μ€ν¨ν΄λ μ±κ³΅ν 999κ°μ λ°μ΄ν°κ° λ‘€λ°±λλ€. Chunk λ¨μλ₯Ό 10μΌλ‘ νλ€λ©΄, μμ
μ€μ λ€λ₯Έ Chunkλ μν₯μ λ°μ§ μλλ€.
μ΄λ, Chunkλ μ»€λ° κ°κ²©(commit interval)μ μν΄ μ μλκ³ μννλ―λ‘, μ»€λ° κ°κ²©μ λ°λΌ μ±λ₯μ΄ λ¬λΌμ§ μ μλ€. μ΅μμ μ±λ₯μ μ»κΈ° μν΄μλ μ»€λ° κ°κ²© μ€μ μ΄ μ€μνλ€.
ItemReader
, ItemProcessor
, ItemWriter
3λ¨κ³λ‘ λΉμ§λμ€ λ‘μ§μ λΆλ¦¬ν΄ μν μ λͺ
ννκ² λΆλ¦¬ν μ μλ€.
λΉμ¦λμ€ λ‘μ§ λΆλ¦¬
μ½μ΄μ¨ λ°°μΉ λ°μ΄ν°μ μ°μ¬μ§ λ°μ΄ν° νμ
μ΄ λ€λ₯Έ κ²½μ°μ λν λμ
κ° Chunkλ μ체 νΈλμμ
μΌλ‘ μ€νλλ©°, μ²λ¦¬μ μ€ν¨νλ©΄ μ±κ³΅ν νΈλμμ
μ΄νλΆν° λ€μ μμ κ°λ₯
κ·Έλ¬λ―λ‘ μ½μ΄μ¨ λ°°μΉμ λ°μ΄ν°μ μ μ₯ν λ°μ΄ν° νμ
μ΄ λ€λ₯Έ κ²½μ°μ λμν μ μλ€.
ItemReader
Step
μ λμμ΄ λλ λ°°μΉ λ°μ΄ν°(File, Xml, DB λ±)λ₯Ό μ½μ΄μ€λ μΈν°νμ΄μ€
org.springframework.batch.item.ItemReader<T>
Copy public interface ItemReader<T> {
// read λ©μλμ λ°ν νμ
μ T(μ λλ¦)μΌλ‘ ꡬννμ¬ μ§μ νμ
μ μ§μ ν μ μμ
@Nullable
T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}
ItemProcessor
ItemReader
λ‘ μ½μ΄ μ¨ λ°°μΉ λ°μ΄ν°λ₯Ό λ³ννλ μν μ μν
ItemProcessor
λ λ‘μ§ μ²λ¦¬λ§ μννμ¬ μν μ λΆλ¦¬νκ³ , λͺ
νν input/outputμ ItemProcessor
λ‘ κ΅¬νν΄λμΌλ©΄ λ μ§κ΄μ μΈ μ½λκ° λ κ²μ΄λ€.
org.springframework.batch.item.ItemProcessor<T>
Copy public interface ItemProcessor<I, O> {
@Nullable
O process(@NonNull I var1) throws Exception;
}
ItemWriter
λ°°μΉ λ°μ΄ν°(DB, File λ±)λ₯Ό μ μ₯ νλ€.
org.springframework.batch.item.ItemWriter<T>
Copy public interface ItemWriter<T> {
// T(μ λ€λ¦)μΌλ‘ μ§μ ν νμ
μ List 맀κ°λ³μλ‘ λ°λλ€.
void write(List<? extends T> var1) throws Exception;
}
리μ€νΈμ λ°μ΄ν° μλ μ€μ ν *μ²ν¬(Chunk) λ¨μλ‘ λΆλ¬μ¨λ€.
μ²ν¬ κΈ°λ° Job μμ
Copy @EnableBatchProcessing
@Configuration
public class ChunkBasedConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
/**
* μ€μ μ€νλ§ λ°°μΉ Job μμ±
*/
@Bean
public Job chunkBasedJob() {
return this.jobBuilderFactory.get("chunkBasedJob")
.start(chunkStep())
.build();
}
@Bean
public Step chunkStep() {
return this.stepBuilderFactory.get("chunkStep")
.<String, String> chunk(1000) // chunk κΈ°λ°, 컀λ°κ°κ²© 1000
.reader(itemReader())
.writer(itemWriter())
.build();
}
@Bean
public ListItemReader<String> itemReader(){
List<String> items = new ArrayList<>(100000);
for (int i = 0; i < 100000; i++) {
items.add(UUID.randomUUID().toString());
}
return new ListItemReader<>(items);
}
@Bean
public ItemWriter<String> itemWriter() {
return items -> {
for (String item : items) {
System.out.println(">> current item = " + item);
}
System.out.println(">> end itemWriter chunk " + items.size());
};
}
}
μΌλ°μ μΌλ‘λ μμ κ°μ΄ 컀λ°κ°κ²©μ νλ μ½λ©ν΄ ν¬κΈ°λ₯Ό μ μνμ§λ§, ν¬κΈ°κ° λμΌνμ§ μμ μ²ν¬λ₯Ό μ²λ¦¬ν΄μΌνλ κ²½μ°λ μλ€. μ€νλ§ λ°°μΉλ org.springframework.batch.repeat.CompletionPolicy
μΈν°νμ΄μ€λ₯Ό μ κ³΅ν΄ μ²ν¬κ° μλ£λλ μμ μ μ μν μ μλλ‘ μ 곡ν΄μ€λ€.
CompletionPolicy
μ²ν¬ μλ£ μ¬λΆλ₯Ό κ²°μ ν μ μλ κ²°μ λ‘μ§μ ꡬνν μ μλ μΈν°νμ΄μ€λ‘, CompletionPolicy
μΈν°νμ΄μ€μ ꡬν체μ λν΄μ μμ λ³Ό κ²μ΄λ€.
Copy package org.springframework.batch.repeat;
public interface CompletionPolicy {
// μ²ν¬ μλ£ μ¬λΆμ μνλ₯Ό κΈ°λ°μΌλ‘ κ²°μ λ‘μ§ μν
boolean isComplete(RepeatContext context, RepeatStatus result);
// λ΄λΆ μνλ₯Ό μ΄μ©ν΄ μ²ν¬ μλ£ μ¬λΆ νλ¨
boolean isComplete(RepeatContext context);
// μ²ν¬μ μμμ μ μ μλλ‘ μ μ±
μ μ΄κΈ°ν
RepeatContext start(RepeatContext parent);
// κ° itemμ΄ μ²λ¦¬κ°λλ©΄ update λ©μλκ° νΈμΆλλ©΄μ λ΄λΆ μν κ°±μ
void update(RepeatContext context);
}
μ§μ ꡬννλ λ°©λ²
CompletionPolicy
λ₯Ό ꡬννμ¬ νμ λ©μλλ€μ κ°κ° μλ§κ² λ‘μ§μ ꡬμ±νλ©΄λλ€.
Copy public class RandomChunkSizePolicy implements CompletionPolicy {
private int chunksize;
private int totalProcessed;
private Random random = new Random();
// μ²ν¬ μλ£ μ¬λΆμ μνλ₯Ό κΈ°λ°μΌλ‘ κ²°μ λ‘μ§ μν
@Override
public boolean isComplete(RepeatContext context, RepeatStatus result) {
if (RepeatStatus.FINISHED == result) {
return true;
} else {
return isComplete(context);
}
}
// λ΄λΆ μνλ₯Ό μ΄μ©ν΄ μ²ν¬ μλ£ μ¬λΆ νλ¨
@Override
public boolean isComplete(RepeatContext context) {
return this.totalProcessed >= chunksize;
}
// μ²ν¬μ μμμ μ μ μλλ‘ μ μ±
μ μ΄κΈ°ν
@Override
public RepeatContext start(RepeatContext parent) {
this.chunksize = random.nextInt(20);
this.totalProcessed = 0;
System.out.println("chunk size has been set to => " + this.chunksize);
return parent;
}
// κ° itemμ΄ μ²λ¦¬κ°λλ©΄ update λ©μλκ° νΈμΆλλ©΄μ λ΄λΆ μν κ°±μ
@Override
public void update(RepeatContext context) {
this.totalProcessed++;
}
}
Copy chunk size has been set to => 5
>> current item = b784cda0-961f-4faf-9737-4334e774f0d1
>> current item = 9fc3ec22-5d54-4632-94e3-fcc28cc00260
>> current item = 53452739-0122-4f8b-a52a-667e37cbf908
>> current item = de9ad8ec-1bf7-40d4-b991-72b6699594f9
>> current item = 352104aa-ae63-4928-96e2-460b3b145d43
>> end itemWriter chunk 5
chunk size has been set to => 10
>> current item = 4116f3e9-cad3-4387-8f46-a871d825d7d9
>> current item = 3e1f58b7-a1fa-45bd-b336-2484d1d543f3
>> current item = a58e2401-37e8-402a-8ac3-daddaf2e91a0
>> current item = 01162200-fc7d-4580-8533-d6deab7f3c65
>> current item = 31280b14-49bf-4a03-b1d4-210585e4da40
>> current item = 91b7d269-8105-41dc-ae6a-1d682913c168
>> current item = abeead5f-751b-4862-aa37-4f32eb09439a
>> current item = b6bf1a92-56e8-433a-a0ea-935e101cad6f
>> current item = 84af0778-d6e1-4aef-bd8f-347b047bb276
>> current item = b86477f3-b997-4b04-a3ca-27f517c9170f
>> end itemWriter chunk 10
chunk size has been set to => 11
λ€μκ³Ό κ°μ΄ RandomμΌλ‘ μ§μ λ μλ§νΌ chunkκ° μνλλ κ²μ νμΈ ν μ μλ€.
SimpleCompletionPolicy
κ°μ₯ κΈ°λ³Έμ μΈ κ΅¬ν체λ‘, 미리 ꡬμ±ν΄λ μκ³κ°μ λλ¬νλ©΄ μ²ν¬ μλ£λ‘ νμνλ€.
Copy public class SimpleCompletionPolicy extends DefaultResultCompletionPolicy {
public static final int DEFAULT_CHUNK_SIZE = 5;
int chunkSize = 0;
public SimpleCompletionPolicy() {
this(DEFAULT_CHUNK_SIZE);
}
public SimpleCompletionPolicy(int chunkSize) {
super();
this.chunkSize = chunkSize;
}
Copy @Bean
public Step chunkStep() {
return this.stepBuilderFactory.get("chunkStep")
.<String, String> chunk(completionPolicy()) // completePolicy νΈμΆ
.reader(itemReader())
.writer(itemWriter())
.build();
}
@Bean
public CompletionPolicy completionPolicy() {
// μ²λ¦¬λ ITEM κ°μλ₯Ό μΈμ΄, μ΄ κ°μκ° μκ³κ°μ λλ¬νλ©΄ chunk μλ£λ‘ νμ
SimpleCompletionPolicy simpleCompletionPolicy = new SimpleCompletionPolicy(1000);
return simpleCompletionPolicy;
}
TimeoutTerminationPolicy
νμμμ κ°μ ꡬμ±ν΄, μ²ν¬ λ΄μμ μ²λ¦¬ μκ°μ΄ μ§μ ν μκ°μ΄ λμΌλ©΄ μ²ν¬κ° μλ£λ κ²μΌλ‘ κ°μ£Όνκ³ , λͺ¨λ νΈλμμ
μ²λ¦¬λ₯Ό μ μμ μΌλ‘ νλ€λ κ²μ΄λ€. TimeoutTerminationPolicy
λ§μΌλ‘ μ²ν¬ μλ£ μμ μ κ²°μ νλ κ²½μ°λ κ±°μ μ‘΄μ¬νμ§ μμΌλ©°, CompositeCompletionPolicy
μ μΌλΆλ‘ μ¬μ©νλ κ²½μ°κ° λ§λ€.
Copy public class TimeoutTerminationPolicy extends CompletionPolicySupport {
/**
* Default timeout value in milliseconds (the value equivalent to 30 seconds).
*/
public static final long DEFAULT_TIMEOUT = 30000L;
private long timeout = DEFAULT_TIMEOUT;
/**
* Default constructor.
*/
public TimeoutTerminationPolicy() {
super();
}
/**
* Construct a {@link TimeoutTerminationPolicy} with the specified timeout
* value (in milliseconds).
*
* @param timeout duration of the timeout.
*/
public TimeoutTerminationPolicy(long timeout) {
super();
this.timeout = timeout;
}
Copy @Bean
public CompletionPolicy timeoutCompletionPolicy() {
TimeoutTerminationPolicy timeoutTerminationPolicy = new TimeoutTerminationPolicy(3);
return timeoutTerminationPolicy;
}
TimeoutTerminationPolicy
λ‘ μνν κ²½μ° κ° chunk λ¨μλ₯Ό νμΈν΄λ³΄λ©΄ λ€μκ³Ό κ°μ΄ μ κ°κ°μΈ κ²μ λ³Ό μμλ€.
Copy >> end itemWriter chunk 795
>> end itemWriter chunk 679
>> end itemWriter chunk 841
>> end itemWriter chunk 1153
>> end itemWriter chunk 1061
>> end itemWriter chunk 1916
>> end itemWriter chunk 1667
>> end itemWriter chunk 1719
>> end itemWriter chunk 931
>> end itemWriter chunk 1634
>> end itemWriter chunk 941
>> end itemWriter chunk 667
>> end itemWriter chunk 665
>> end itemWriter chunk 547
>> end itemWriter chunk 533
>> end itemWriter chunk 973
>> end itemWriter chunk 647
>> end itemWriter chunk 1632
>> end itemWriter chunk 676
CompositeCompletionPolicy
CompositeCompletionPolicy
λ μ²ν¬ μλ£ μ¬λΆλ₯Ό κ²°μ νλ μ¬λ¬ μ μ±
μ ν¨κ» ꡬμ±ν μ μλ€. ν¬ν¨νκ³ μλ μ¬λ¬ μ μ±
μ€ νλλΌλ μ²ν¬ μλ£λΌκ³ νλ¨λλ©΄ ν΄λΉ μ²ν¬κ° μλ£λ κ²μΌλ‘ νμνλ€.
Copy @Bean
public CompletionPolicy compositeCompletionPolicy() {
CompositeCompletionPolicy policy = new CompositeCompletionPolicy();
// μ¬λ¬ μ μ±
μ€μ
policy.setPolicies(
new CompletionPolicy[]{
new TimeoutTerminationPolicy(3),
new SimpleCompletionPolicy(1000)
}
);
return policy;
}
λ€μκ³Ό κ°μ΄ μνν κ²½μ°μλ chunk λ¨μκ° 1000κ°λ₯Ό λμ΄μ κ²½μ°κ° μλ κ²μ νμΈν μ μλ€.
Copy >> end itemWriter chunk 731
>> end itemWriter chunk 1000
>> end itemWriter chunk 690
>> end itemWriter chunk 1000
>> end itemWriter chunk 798
>> end itemWriter chunk 1000
>> end itemWriter chunk 980
>> end itemWriter chunk 838
>> end itemWriter chunk 850
>> end itemWriter chunk 1000
>> end itemWriter chunk 263
>> end itemWriter chunk 556
>> end itemWriter chunk 629
>> end itemWriter chunk 962
>> end itemWriter chunk 960
>> end itemWriter chunk 1000
>> end itemWriter chunk 1000
>> end itemWriter chunk 1000
>> end itemWriter chunk 898
>> end itemWriter chunk 1000
>> end itemWriter chunk 900
>> end itemWriter chunk 798
>> end itemWriter chunk 215
Step Listener
μ€νκ³Ό μ²ν¬μ μμκ³Ό λμμ νΉμ λ‘μ§μ μ²λ¦¬ν μ μκ² ν΄μ€λ€.
(StepListener
λ λͺ¨λ μ€ν 리μ€λκ° μμνλ λ§μ»€ μΈν°νμ΄μ€μ΄λ€.)
λͺ¨λ μμ€μ 리μ€λλ₯Ό μ μ©ν΄ Jobμ μ€λ¨ν μ μμΌλ©°, μΌλ°μ μΌλ‘ μ μ²λ¦¬λ₯Ό μννκ±°λ μ΄ν κ²°κ³Όλ₯Ό νκ°νκ±°λ, μΌλΆ μ€λ₯μ²λ¦¬μλ μ¬μ©λλ€.
StepExecutionListener
org.springframework.batch.core.StepExecutionListener
Copy public interface StepExecutionListener extends StepListener {
void beforeStep(StepExecution stepExecution);
// Listenerκ° μ€ν
μ΄ λ°νν ExitStatusλ₯Ό Jobμ μ λ¬νκΈ° μ μ μμ ν μ μμ.
@Nullable
ExitStatus afterStep(StepExecution stepExecution);
}
@BeforeStep
, @AfterStep
μ λν
μ΄μ
μ 곡
Copy public class LoggingStepStartStopListener {
@BeforeStep
public void beforeStep(StepExecution stepExecution) {
System.out.println(stepExecution.getStepName() + " μμ");
}
@AfterStep
public ExitStatus afterStep(StepExecution stepExecution) {
System.out.println(stepExecution.getStepName() + " μ’
λ£");
return stepExecution.getExitStatus();
}
}
Copy @Bean
public Step chunkStep() {
return this.stepBuilderFactory.get("chunkStep")
.<String, String> chunk(randomChunkSizePolicy())
.reader(itemReader())
.writer(itemWriter())
.listener(new LoggingStepStartStopListener()) // Listener μ€μ
.build();
}
ChunkListener
Copy public interface ChunkListener extends StepListener {
static final String ROLLBACK_EXCEPTION_KEY = "sb_rollback_exception";
void beforeChunk(ChunkContext context);
void afterChunk(ChunkContext context);
void afterChunkError(ChunkContext context);
}
@BeforeChunk
, @AfterChunk
μ λν
μ΄μ
μ 곡
Step Flow
쑰건 λ‘μ§
μ€νλ§ λ°°μΉμ Stepμ StepBuilder
μ .next()
λ©μλλ₯Ό μ¬μ©ν΄ μ§μ ν μμλλ‘ μ€νλλ€. μ μ΄(transition)λ₯Ό ꡬμ±ν΄ κ²°κ³Όμ λ°λ₯Έ λ€λ₯Έ μμλ‘ μ€ννλ κ²λ κ°λ₯νλ€.
Copy @Bean
public Job conditionalJob() {
return this.jobBuilderFactory.get("conditionalJob")
.start(firstStep())
.on("FAILED").to(failureStep())
.from(firstStep()).on("*").to(successStep())
.end()
.build();
}
μ€νλ§ λ°°μΉλ κΈ°μ€μ λ°λΌ λκ°μ μμΌλ μΉ΄λλ₯Ό νμ©νλ€.
*
: 0κ° μ΄μμ λ¬Έμλ₯Ό μΌμΉνλ κ²μ μλ―Έ
?
: 1κ°μ λ¬Έμλ₯Ό μΌμΉ μν€λ κ²μ μλ―Έ
?AT
: CAT, KATκ³Ό μΌμΉνμ§λ§, THATκ³Όλ λΆμΌμΉ
JobExecutionDecider
Job μ€ν μ 보(jobExecution
)μ μ€ν μ€νμ 보( stepExecution
)λ₯Ό μΈμλ‘ λ°μ λͺ¨λ μ 보λ₯Ό μ΄μ©ν΄ λ€μμ 무μμ μνν μ§μ λν΄ κ²°μ ν μ μλ€.
Copy public interface JobExecutionDecider {
/**
* Strategy for branching an execution based on the state of an ongoing
* {@link JobExecution}. The return value will be used as a status to
* determine the next step in the job.
*
* @param jobExecution a job execution
* @param stepExecution the latest step execution (may be {@code null})
* @return the exit status code
*/
FlowExecutionStatus decide(JobExecution jobExecution, @Nullable StepExecution stepExecution);
}
Copy public class RandomDecider implements JobExecutionDecider {
private Random random = new Random();
@Override
public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
if (random.nextBoolean()) {
return new FlowExecutionStatus(FlowExecutionStatus.COMPLETED.getName());
} else {
return new FlowExecutionStatus(FlowExecutionStatus.FAILED.getName());
}
}
}
Copy @Bean
public Job conditionalJob() {
return this.jobBuilderFactory.get("conditionalJob")
.start(firstStep())
.next(decider())
.from(decider())
.on("FAILED").to(failureStep())
.from(decider()).on("*").to(successStep())
.end()
.build();
}
Job μ’
λ£νκΈ°
μ€νλ§ λ°°μΉμμλ Jobμ μ’
λ£ν λ μλ 3κ°μ§ μνλ‘ μ’
λ£ν μ μλ€.
μ€νλ§ λ°°μΉ μ²λ¦¬κ° μ±κ³΅μ μΌλ‘ μ’
λ£λμμ μλ―Έ
JobInstance
κ° Completedλ‘ μ’
λ£λλ©΄ λμΌν νλΌλ―Έν°λ₯Ό μ¬μ©ν΄ λ€μ μ€νν μ μλ€.
μ‘μ΄ μ±κ³΅μ μΌλ‘ μλ£λμ§ μμμμ μλ―Έ
Failed μνλ‘ μ’
λ£λ μ‘μ μ€νλ§ λ°°μΉλ₯Ό μ¬μ©ν΄ λμΌν νλΌλ―Έν°λ‘ λ€μ μ€νν μ μλ€.
Stopped μνλ‘ μ’
λ£λ μ‘μ λ€μ μν κ°λ₯νλ€.
Jobμ μ€λ₯κ° λ°μνμ§ μμμ΄λ, μ€λ¨λ μμΉμμ μ‘μ λ€μ μμν μ μλ€.
μ¬λμ κ°μ
μ΄ νμνκ±°λ λ€λ₯Έ κ²μ¬/μ²λ¦¬κ° νμν μν©μ μ μ©νλ€.
BatchStatus
λ₯Ό νλ³ν λ, ExitStatus
λ₯Ό νκ°νλ©΄μ μλ³λλ€. ExitStatus
λ μ€ν
, μ²ν¬, μ‘μμ λ°νλ μ μμΌλ©°, BatchStatus
λ StepExecution
μ΄λ JobExecution
λ΄μ 보κ΄λλ©°, JobRepository
μ μ μ₯λλ€.
Completed μνλ‘ μ’
λ£νκΈ°
.end()
λ©μλ μ¬μ©
Copy return this.jobBuilderFactory.get("conditionalJob")
.start(firstStep())
.on("FAILED").end()
.from(firstStep()).on("*").to(successStep())
.end()
.build();
BATCH_STEP_EXECUTION
ν
μ΄λΈμ μ€ν
μ΄ λ°νν ExitStatus
κ° μ μ₯λλ©°, μ€ν
μ΄ λ°νν μνκ° λ¬΄μμ΄λ μκ΄μμ΄ BATCH_JOB_EXECUTION
μ COMPLETED
κ° μ μ₯λλ€.
Failed μνλ‘ μ’
λ£νκΈ°
fail()
λ©μλ μ¬μ©
Copy return this.jobBuilderFactory.get("conditionalJob")
.start(firstStep())
.on("FAILED").fail()
.from(firstStep()).on("*").to(successStep())
.end()
.build();
μ¬κΈ°μ firstStep()
μ΄ FAILEDλ‘ λλλ©΄, JobRepository
μ ν΄λΉ Jobμ΄ μ€ν¨ν κ²μΌλ‘ μ μ₯λλ©°, λμΌν νλΌλ―Έν°λ₯Ό μ¬μ©ν΄ λ€μ μ€νν μ μλ€.
Stopped μνλ‘ μ’
λ£νκΈ°
.stopAndRestart()
λ©μλλ‘ μ‘μ λ€μ μννλ€λ©΄, 미리 ꡬμ±ν΄λ μ€ν
λΆν° μμλλ€. μλ μμ μμλ μ¬μνμ successStep()
λΆν° μνλλκ²μ λ³Ό μ μλ€.
Copy return this.jobBuilderFactory.get("conditionalJob")
.start(firstStep())
.on("FAILED").stopAndRestart(successStep())
.from(firstStep()).on("*").to(successStep())
.end()
.build();