Step
์ ์ค์ง์ ์ธ ๋ฐฐ์น ์ฒ๋ฆฌ๋ฅผ ์ ์ํ๊ณ ์ ์ดํ๋ ๋ฐ ํ์ํ ๋ชจ๋ ์ ๋ณด ๊ฐ ๋ค์ด์๋ ๋๋ฉ์ธ ๊ฐ์ฒด๋ก, Job
์ ์ฒ๋ฆฌํ๋ ์ค์ง์ ์ธ ๋จ์๋ก ์ฐ์ธ๋ค.(Job:Step = 1:M )
Step์ Job์ ๊ตฌ์ฑํ๋ ๋
๋ฆฝ๋ ์์
๋จ์
์์ฐจ์ ์ผ๋ก ๋ฐฐ์น ์ฒ๋ฆฌ ์ํ
Step์ ๋ชจ๋ ๋จ์ ์์
์ ์กฐ๊ฐ์ผ๋ก ์์ฒด์ ์ผ๋ก ์
๋ ฅ, ์ฒ๋ฆฌ๊ธฐ, ์ถ๋ ฅ์ ๋ค๋ฃฌ๋ค.
ํธ๋์ญ์
์ Step ๋ด๋ถ์์ ์ด๋ฃจ์ด์ง
org.springframework.batch.core.Step
StepExecution
Step
์ ์คํ ์ ๋ณด๋ฅผ ๋ด๋ ๊ฐ์ฒด๋ก, ๊ฐ๊ฐ์ Step
์ด ์คํ๋ ๋๋ง๋ค StepExecution
์ด ์์ฑ๋๋ค.
Copy public class StepExecution extends Entity {
private final JobExecution jobExecution; // ํ์ฌ JobExecution ์ ๋ณด
private final String stepName; // Step ์ด๋ฆ
private volatile BatchStatus status; // Step์ ์คํ ์ํ(COMPLETED, STARTING, STARTED ...)
private volatile int readCount; // ์ฑ๊ณต์ ์ผ๋ก ์ฝ์ ๋ ์ฝ๋ ์
private volatile int writeCount; // ์ฑ๊ณต์ ์ผ๋ก ์ด ๋ ์ฝ๋ ์
private volatile int commitCount; // Step์ ์คํ์ ๋ํด ์ปค๋ฐ๋ ํธ๋์ญ์
์
private volatile int rollbackCount; // Step์ ์คํ์ ๋ํด ๋กค๋ฐฑ๋ ํธ๋์ญ์
์
private volatile int readSkipCount; // ์ฝ๊ธฐ์ ์คํจํด ๊ฑด๋ ๋ ๋ ์ฝ๋ ์
private volatile int processSkipCount; // ํ๋ก์ธ์ค๊ฐ ์คํจํด ๊ฑด๋ ๋ ๋ ์ฝ๋ ์
private volatile int writeSkipCount; // ์ฐ๊ธฐ์ ์คํจํด ๊ฑด๋ ๋ ๋ ์ฝ๋ ์
private volatile Date startTime; // Step์ด ์คํ๋ ์๊ฐ(null == ์์๋์ง ์์)
private volatile Date endTime; // Step์ ์คํ ์ฑ๊ณต ์ฌ๋ถ์ ๊ด๊ณ ์์ด ๋๋ ์๊ฐ
private volatile Date lastUpdated; // ๋ง์ง๋ง์ผ๋ก ์์ ๋ ์๊ฐ
private volatile ExecutionContext executionContext; // Step ์คํ ์ฌ์ด์ ์ ์งํด์ผํ๋ ์ฌ์ฉ์ ๋ฐ์ดํฐ
private volatile ExitStatus exitStatus; // Step ์คํ ๊ฒฐ๊ณผ์ ๋ํ ์ํ ๊ฐ(UNKOWN, EXECUTING, COMPLETE, ...)
private volatile boolean terminateOnly; // Job ์คํ ์ค์ง ์ฌ๋ถ
private volatile int filterCount; // ์คํ์์ ํํฐ๋ง๋ ๋ ์ฝ๋ ์
private transient volatile List < Throwable > failureExceptions; // Step ์คํ์ค ๋ฐ์ํ ์์ธ ๋ฆฌ์คํธ
...
}
Tasklet
๊ธฐ๋ฐ
Tasklet
์ ์์์ Step
์ ์คํํ ๋ ํ๋์ ์์
์ ์ฒ๋ฆฌํ๋ ๋ฐฉ์
์ฝ๊ธฐ, ์ฒ๋ฆฌ, ์ฐ๊ธฐ๋ก ๋๋ ๋ฐฉ์์ด ์ฒญํฌ ์งํฅ ํ๋ก์ธ์ฑ์ด๋ผ๋ฉด ์ด๋ฅผ ๋จ์ผ ์์
์ผ๋ก ๋ง๋๋ ๊ฐ๋
์ด Tasklet
ํธ๋์ญ์
๋ด์์ ๋ก์ง์ด ์คํ๋ ์ ์๋ ๊ธฐ๋ฅ์ ์ ๊ณตํ๋ ์ ๋ต ์ธํฐํ์ด์ค
org.springframework.batch.core.step.tasklet.Tasklet
Copy public interface Tasklet {
// ๋ด๋ถ์ ์ํ๋ ๋จ์ผ ์์
์ ๊ตฌํํ๊ณ ๋๋ฉด, RepeatStatus.FINISHED ๋ฐํํ ์์
์ด ๊ณ์๋๋ฉด RepeatStatus.CONTINUABLE ๋ฐํ
@ Nullable
RepeatStatus execute ( StepContribution var1 , ChunkContext var2) throws Exception ;
}
Adapter
CallableTaskletAdapter
org.springframework.batch.core.step.tasklet.CallableTaskletAdapter
Callable<V>
์ธํฐํ์ด์ค์ ๊ตฌํ์ฒด๋ฅผ ๊ตฌ์ฑํ ์ ์๊ฒ ํด์ฃผ๋ Adapter
๋ฆฌํด๊ฐ์ด ์กด์ฌํ๊ธฐ ๋๋ฌธ์ ๊ณต์ ๊ฐ์ฒด๋ฅผ ์ฌ์ฉํ์ง ์๋๋ค.
์ฒดํฌ ์์ธ๋ฅผ ์ธ๋ถ๋ก ๋์ง ์ ์๋ค.
Step์ ํน์ ๋ก์ง์ ํด๋น Step์ด ์คํ๋๋ ์ค๋ ๋๊ฐ ์๋ ๋ค๋ฅธ ์ค๋ ๋์์ ์คํํ๊ณ ์ถ์ ๋ ์ฌ์ฉ
Copy @ EnableBatchProcessing
@ SpringBootApplication
public class CallableTaskletConfiguration {
@ Autowired
private JobBuilderFactory jobBuilderFactory;
@ Autowired
private StepBuilderFactory stepBuilderFactory;
@ Bean
public Job callableJob () {
return this . jobBuilderFactory . get ( "callableJob" )
. start ( callableStep() )
. build ();
}
@ Bean
public Step callableStep () {
return this . stepBuilderFactory . get ( "callableStep" )
. tasklet ( callableTasklet() )
. build ();
}
@ Bean
public Callable < RepeatStatus > callableObject () {
return () -> {
System . out . println ( "This was executed in another thread" );
return RepeatStatus . FINISHED ;
};
}
@ Bean
public CallableTaskletAdapter callableTasklet () {
// CallableTaskletAdapter๋ Step์ด ์คํ๋๋ ์ค๋ ๋์ ๋ณ๊ฐ์ ์ค๋ ๋์์ ์คํ๋์ง๋ง
// Step๊ณผ ๋ณ๋ ฌ๋ก ์คํ๋๋ ๊ฒ์ ์๋๋ค.
CallableTaskletAdapter callableTaskletAdapter = new CallableTaskletAdapter() ;
callableTaskletAdapter . setCallable ( callableObject() );
return callableTaskletAdapter;
}
}
MethodInvokingTaskletAdapter
org.springframework.batch.core.step.tasklet.MethodInvokingTaskletAdapter
1
๋ค๋ฅธ ํด๋์ค ๋ด์ ๋ฉ์๋๋ฅผ Tasklet์ฒ๋ผ ์คํ ๊ฐ๋ฅ
Copy @ EnableBatchProcessing
@ Configuration
public class MethodInvokingTaskletConfiguration {
@ Autowired
private JobBuilderFactory jobBuilderFactory;
@ Autowired
private StepBuilderFactory stepBuilderFactory;
@ Bean
public Job methodInvokingJob () {
return this . jobBuilderFactory . get ( "methodInvokingJob" )
. start ( methodInvokingStep() )
. build ();
}
@ Bean
public Step methodInvokingStep () {
return this . stepBuilderFactory . get ( "methodInvokingStep" )
. tasklet ( methodInvokingTasklet() )
. build ();
}
@ StepScope
@ Bean
public MethodInvokingTaskletAdapter methodInvokingTasklet (
@ Value ( "#{jobParameters['message']}" ) String message) {
// ๋ค๋ฅธ ํด๋์ค ๋ด์ ๋ฉ์๋๋ฅผ Tasklet์ฒ๋ผ ์คํ ๊ฐ๋ฅ
MethodInvokingTaskletAdapter methodInvokingTaskletAdapter = new MethodInvokingTaskletAdapter() ;
methodInvokingTaskletAdapter . setTargetObject ( customerService() ); // ํธ์ถํ ๋ฉ์๋๊ฐ ์๋ ๊ฐ์ฒด
methodInvokingTaskletAdapter . setTargetMethod ( "serviceMethod" ); // ํธ์ถํ ๋ฉ์๋๋ช
methodInvokingTaskletAdapter . setArguments ( new String [] {message});
return methodInvokingTaskletAdapter;
}
@ Bean
public CustomerService customerService () {
return new CustomerService() ;
}
}
TargetMethod๋ ExitStatus.COMPLETED
default์ด๋ฉฐ, ExitStatus
๋ฅผ ๋ฐํํ๋ฉด ๋ฉ์๋๊ฐ ๋ฐํํ ๊ฐ์ด Tasklet์์ ๋ฐํ๋๋ค.
SystemCommandTasklet
org.springframework.batch.core.step.tasklet.SystemCommandTasklet
์์คํ
๋ช
๋ น์ ์คํํ ๋ ์ฌ์ฉํ๋ฉฐ, ์ง์ ํ ์์คํ
๋ช
๋ น์ ๋น๋๊ธฐ๋ก ์คํํ๋ค.
Copy @ EnableBatchProcessing
@ Configuration
public class SystemCommandConfiguration {
@ Autowired
private JobBuilderFactory jobBuilderFactory;
@ Autowired
private StepBuilderFactory stepBuilderFactory;
@ Bean
public Job systemCommandJob () {
return this . jobBuilderFactory . get ( "systemCommandJob" )
. start ( systemCommandStep() )
. build ();
}
@ Bean
public Step systemCommandStep () {
return this . stepBuilderFactory . get ( "systemCommandStep" )
. tasklet ( systemCommandTasklet() )
. build ();
}
@ Bean
public SystemCommandTasklet systemCommandTasklet () {
SystemCommandTasklet systemCommandTasklet = new SystemCommandTasklet() ;
// ๋ช
๋ น์ด
systemCommandTasklet . setCommand ( "touch tmp.txt" );
systemCommandTasklet . setTimeout ( 5000 );
// Job์ด ๋น์ ์์ ์ผ๋ก ์ข
๋ฃ๋ ๋ ์์คํ
ํ๋ก์ธ์ค์ ๊ด๋ จ๋ ์ค๋ ๋๋ฅผ ๊ฐ์ ์ข
๋ฃํ ์ง ์ฌ๋ถ
systemCommandTasklet . setInterruptOnCancel ( true );
// ์์
๋๋ ํ ๋ฆฌ ์ค์
systemCommandTasklet . setWorkingDirectory ( "/Users/dh0023/Develop/gitbook/TIL" );
// ์์คํ
๋ฐํ ์ฝ๋๋ฅผ ์คํ๋ง ๋ฐฐ์น ์ํ ๊ฐ์ผ๋ก ๋งคํ
systemCommandTasklet . setSystemProcessExitCodeMapper ( touchCodeMapper() );
// ๋น๋๊ธฐ๋ก ์คํํ๋ ์์คํ
๋ช
๋ น์ ์ฃผ๊ธฐ์ ์ผ๋ก ์๋ฃ ์ฌ๋ถ ํ์ธ, ์๋ฃ ์ฌ๋ถ๋ฅผ ํ์ธํ๋ ์ฃผ๊ธฐ(default=1์ด)
systemCommandTasklet . setTerminationCheckInterval ( 5000 );
// ์์คํ
๋ช
๋ น์ ์คํํ๋ TaskExecutor ๊ตฌ์ฑ ๊ฐ๋ฅ
// ๋ฌธ์ ๊ฐ ๋ฐ์ํ๋ฉด ๋ฝ์ด ๋ฐ์ํ ์ ์์ผ๋ฏ๋ก, ๋๊ธฐ๋ฐฉ์์ผ๋ก ๊ตฌํํ์ง ์๋ ๊ฒ์ด ์ข๋ค.
systemCommandTasklet . setTaskExecutor ( new SimpleAsyncTaskExecutor() );
// ๋ช
๋ น์ ์คํํ๊ธฐ ์ ์ ์ค์ ํ๋ ํ๊ฒฝ ํ๋ผ๋ฏธํฐ ๋ชฉ๋ก
systemCommandTasklet . setEnvironmentParams (
new String []{ "BATCH_HOME=/Users/dh0023/Develop/spring/spring-practice/batch-practice" });
return systemCommandTasklet;
}
@ Bean
public SimpleSystemProcessExitCodeMapper touchCodeMapper () {
// ๋ฐํ๋ ์์คํ
์ฝ๋๊ฐ 0์ด ExitStatus.FINISHED
// 0์ด ์๋๋ฉด ExitStatus.FAILED
return new SimpleSystemProcessExitCodeMapper() ;
}
@ Bean
public ConfigurableSystemProcessExitCodeMapper configurableSystemProcessExitCodeMapper () {
// ์ผ๋ฐ์ ์ธ ๊ตฌ์ฑ ๋ฐฉ๋ฒ์ผ๋ก ๋งคํ ๊ตฌ์ฑ์ ํ ์ ์์.
ConfigurableSystemProcessExitCodeMapper mapper = new ConfigurableSystemProcessExitCodeMapper() ;
Map < Object , ExitStatus > mappings = new HashMap < Object , ExitStatus >() {
{
put( 0 , ExitStatus . COMPLETED ) ;
put( 1 , ExitStatus . FAILED ) ;
put( 2 , ExitStatus . EXECUTING ) ;
put( 3 , ExitStatus . NOOP ) ;
put( 4 , ExitStatus . UNKNOWN ) ;
put( ConfigurableSystemProcessExitCodeMapper . ELSE_KEY , ExitStatus . UNKNOWN ) ;
}
};
mapper . setMappings (mappings);
return mapper;
}
}
SimpleSystemProcessExitCodeMapper
Copy public class SimpleSystemProcessExitCodeMapper implements SystemProcessExitCodeMapper {
@ Override
public ExitStatus getExitStatus ( int exitCode) {
if (exitCode == 0 ) {
return ExitStatus . COMPLETED ;
} else {
return ExitStatus . FAILED ;
}
}
}
Chunk
๊ธฐ๋ฐ
Chunk๋ ์์ดํ
์ด ํธ๋์ญ์
์ commit๋๋ ์ ๋ฅผ ๋งํ๋ค.
์ฆ, ์ฒญํฌ ์งํฅ ์ฒ๋ฆฌ๋ ํ ๋ฒ์ ํ๋์ฉ ๋ฐ์ดํฐ๋ฅผ ์ฝ์ด Chunk๋ผ๋ ๋ฉ์ด๋ฆฌ๋ฅผ ๋ง๋ ๋ค, Chunk ๋จ์๋ก ํธ๋์ญ์
์ ๋ค๋ฃจ๋ ๊ฒ์ ์๋ฏธ ํ๋ค.
Chunk ์งํฅ ํ๋ก์ธ์ฑ์ 1000๊ฐ์ ๋ฐ์ดํฐ์ ๋ํด ๋ฐฐ์น ๋ก์ง์ ์คํํ๋ค๊ณ ๊ฐ์ ํ๋ฉด, Chunk ๋จ์๋ก ๋๋์ง ์์์ ๊ฒฝ์ฐ์๋ ํ๊ฐ๋ง ์คํจํด๋ ์ฑ๊ณตํ 999๊ฐ์ ๋ฐ์ดํฐ๊ฐ ๋กค๋ฐฑ๋๋ค. Chunk ๋จ์๋ฅผ 10์ผ๋ก ํ๋ค๋ฉด, ์์
์ค์ ๋ค๋ฅธ Chunk๋ ์ํฅ์ ๋ฐ์ง ์๋๋ค.
์ด๋, Chunk๋ ์ปค๋ฐ ๊ฐ๊ฒฉ(commit interval)์ ์ํด ์ ์๋๊ณ ์ํํ๋ฏ๋ก, ์ปค๋ฐ ๊ฐ๊ฒฉ์ ๋ฐ๋ผ ์ฑ๋ฅ์ด ๋ฌ๋ผ์ง ์ ์๋ค. ์ต์์ ์ฑ๋ฅ์ ์ป๊ธฐ ์ํด์๋ ์ปค๋ฐ ๊ฐ๊ฒฉ ์ค์ ์ด ์ค์ํ๋ค.
ItemReader
, ItemProcessor
, ItemWriter
3๋จ๊ณ๋ก ๋น์ง๋์ค ๋ก์ง์ ๋ถ๋ฆฌํด ์ญํ ์ ๋ช
ํํ๊ฒ ๋ถ๋ฆฌํ ์ ์๋ค.
๋น์ฆ๋์ค ๋ก์ง ๋ถ๋ฆฌ
์ฝ์ด์จ ๋ฐฐ์น ๋ฐ์ดํฐ์ ์ฐ์ฌ์ง ๋ฐ์ดํฐ ํ์
์ด ๋ค๋ฅธ ๊ฒฝ์ฐ์ ๋ํ ๋์
๊ฐ Chunk๋ ์์ฒด ํธ๋์ญ์
์ผ๋ก ์คํ๋๋ฉฐ, ์ฒ๋ฆฌ์ ์คํจํ๋ฉด ์ฑ๊ณตํ ํธ๋์ญ์
์ดํ๋ถํฐ ๋ค์ ์์ ๊ฐ๋ฅ
๊ทธ๋ฌ๋ฏ๋ก ์ฝ์ด์จ ๋ฐฐ์น์ ๋ฐ์ดํฐ์ ์ ์ฅํ ๋ฐ์ดํฐ ํ์
์ด ๋ค๋ฅธ ๊ฒฝ์ฐ์ ๋์ํ ์ ์๋ค.
ItemReader
Step
์ ๋์์ด ๋๋ ๋ฐฐ์น ๋ฐ์ดํฐ(File, Xml, DB ๋ฑ)๋ฅผ ์ฝ์ด์ค๋ ์ธํฐํ์ด์ค
org.springframework.batch.item.ItemReader<T>
Copy public interface ItemReader < T > {
// read ๋ฉ์๋์ ๋ฐํ ํ์
์ T(์ ๋๋ฆญ)์ผ๋ก ๊ตฌํํ์ฌ ์ง์ ํ์
์ ์ง์ ํ ์ ์์
@ Nullable
T read () throws Exception , UnexpectedInputException , ParseException , NonTransientResourceException ;
}
ItemProcessor
ItemReader
๋ก ์ฝ์ด ์จ ๋ฐฐ์น ๋ฐ์ดํฐ๋ฅผ ๋ณํํ๋ ์ญํ ์ ์ํ
ItemProcessor
๋ ๋ก์ง ์ฒ๋ฆฌ๋ง ์ํํ์ฌ ์ญํ ์ ๋ถ๋ฆฌํ๊ณ , ๋ช
ํํ input/output์ ItemProcessor
๋ก ๊ตฌํํด๋์ผ๋ฉด ๋ ์ง๊ด์ ์ธ ์ฝ๋๊ฐ ๋ ๊ฒ์ด๋ค.
org.springframework.batch.item.ItemProcessor<T>
Copy public interface ItemProcessor < I , O > {
@ Nullable
O process (@ NonNull I var1) throws Exception ;
}
ItemWriter
๋ฐฐ์น ๋ฐ์ดํฐ(DB, File ๋ฑ)๋ฅผ ์ ์ฅ ํ๋ค.
org.springframework.batch.item.ItemWriter<T>
Copy public interface ItemWriter < T > {
// T(์ ๋ค๋ฆญ)์ผ๋ก ์ง์ ํ ํ์
์ List ๋งค๊ฐ๋ณ์๋ก ๋ฐ๋๋ค.
void write ( List < ? extends T > var1) throws Exception ;
}
๋ฆฌ์คํธ์ ๋ฐ์ดํฐ ์๋ ์ค์ ํ *์ฒญํฌ(Chunk) ๋จ์๋ก ๋ถ๋ฌ์จ๋ค.
์ฒญํฌ ๊ธฐ๋ฐ Job ์์
Copy @ EnableBatchProcessing
@ Configuration
public class ChunkBasedConfiguration {
@ Autowired
private JobBuilderFactory jobBuilderFactory;
@ Autowired
private StepBuilderFactory stepBuilderFactory;
/**
* ์ค์ ์คํ๋ง ๋ฐฐ์น Job ์์ฑ
*/
@ Bean
public Job chunkBasedJob () {
return this . jobBuilderFactory . get ( "chunkBasedJob" )
. start ( chunkStep() )
. build ();
}
@ Bean
public Step chunkStep () {
return this . stepBuilderFactory . get ( "chunkStep" )
. < String , String > chunk( 1000 ) // chunk ๊ธฐ๋ฐ, ์ปค๋ฐ๊ฐ๊ฒฉ 1000
. reader ( itemReader() )
. writer ( itemWriter() )
. build ();
}
@ Bean
public ListItemReader < String > itemReader (){
List < String > items = new ArrayList <>( 100000 );
for ( int i = 0 ; i < 100000 ; i ++ ) {
items . add ( UUID . randomUUID () . toString ());
}
return new ListItemReader <>(items);
}
@ Bean
public ItemWriter < String > itemWriter () {
return items -> {
for ( String item : items) {
System . out . println ( ">> current item = " + item);
}
System . out . println ( ">> end itemWriter chunk " + items . size ());
};
}
}
์ผ๋ฐ์ ์ผ๋ก๋ ์์ ๊ฐ์ด ์ปค๋ฐ๊ฐ๊ฒฉ์ ํ๋ ์ฝ๋ฉํด ํฌ๊ธฐ๋ฅผ ์ ์ํ์ง๋ง, ํฌ๊ธฐ๊ฐ ๋์ผํ์ง ์์ ์ฒญํฌ๋ฅผ ์ฒ๋ฆฌํด์ผํ๋ ๊ฒฝ์ฐ๋ ์๋ค. ์คํ๋ง ๋ฐฐ์น๋ org.springframework.batch.repeat.CompletionPolicy
์ธํฐํ์ด์ค๋ฅผ ์ ๊ณตํด ์ฒญํฌ๊ฐ ์๋ฃ๋๋ ์์ ์ ์ ์ํ ์ ์๋๋ก ์ ๊ณตํด์ค๋ค.
CompletionPolicy
์ฒญํฌ ์๋ฃ ์ฌ๋ถ๋ฅผ ๊ฒฐ์ ํ ์ ์๋ ๊ฒฐ์ ๋ก์ง์ ๊ตฌํํ ์ ์๋ ์ธํฐํ์ด์ค๋ก, CompletionPolicy
์ธํฐํ์ด์ค์ ๊ตฌํ์ฒด์ ๋ํด์ ์์ ๋ณผ ๊ฒ์ด๋ค.
Copy package org . springframework . batch . repeat ;
public interface CompletionPolicy {
// ์ฒญํฌ ์๋ฃ ์ฌ๋ถ์ ์ํ๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ๊ฒฐ์ ๋ก์ง ์ํ
boolean isComplete ( RepeatContext context , RepeatStatus result);
// ๋ด๋ถ ์ํ๋ฅผ ์ด์ฉํด ์ฒญํฌ ์๋ฃ ์ฌ๋ถ ํ๋จ
boolean isComplete ( RepeatContext context);
// ์ฒญํฌ์ ์์์ ์ ์ ์๋๋ก ์ ์ฑ
์ ์ด๊ธฐํ
RepeatContext start ( RepeatContext parent);
// ๊ฐ item์ด ์ฒ๋ฆฌ๊ฐ๋๋ฉด update ๋ฉ์๋๊ฐ ํธ์ถ๋๋ฉด์ ๋ด๋ถ ์ํ ๊ฐฑ์
void update ( RepeatContext context);
}
์ง์ ๊ตฌํํ๋ ๋ฐฉ๋ฒ
CompletionPolicy
๋ฅผ ๊ตฌํํ์ฌ ํ์ ๋ฉ์๋๋ค์ ๊ฐ๊ฐ ์๋ง๊ฒ ๋ก์ง์ ๊ตฌ์ฑํ๋ฉด๋๋ค.
Copy public class RandomChunkSizePolicy implements CompletionPolicy {
private int chunksize;
private int totalProcessed;
private Random random = new Random() ;
// ์ฒญํฌ ์๋ฃ ์ฌ๋ถ์ ์ํ๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ๊ฒฐ์ ๋ก์ง ์ํ
@ Override
public boolean isComplete ( RepeatContext context , RepeatStatus result) {
if ( RepeatStatus . FINISHED == result) {
return true ;
} else {
return isComplete(context) ;
}
}
// ๋ด๋ถ ์ํ๋ฅผ ์ด์ฉํด ์ฒญํฌ ์๋ฃ ์ฌ๋ถ ํ๋จ
@ Override
public boolean isComplete ( RepeatContext context) {
return this . totalProcessed >= chunksize;
}
// ์ฒญํฌ์ ์์์ ์ ์ ์๋๋ก ์ ์ฑ
์ ์ด๊ธฐํ
@ Override
public RepeatContext start ( RepeatContext parent) {
this . chunksize = random . nextInt ( 20 );
this . totalProcessed = 0 ;
System . out . println ( "chunk size has been set to => " + this . chunksize );
return parent;
}
// ๊ฐ item์ด ์ฒ๋ฆฌ๊ฐ๋๋ฉด update ๋ฉ์๋๊ฐ ํธ์ถ๋๋ฉด์ ๋ด๋ถ ์ํ ๊ฐฑ์
@ Override
public void update ( RepeatContext context) {
this . totalProcessed ++ ;
}
}
Copy chunk size has been set to => 5
>> current item = b784cda0-961f-4faf-9737-4334e774f0d1
>> current item = 9fc3ec22-5d54-4632-94e3-fcc28cc00260
>> current item = 53452739-0122-4f8b-a52a-667e37cbf908
>> current item = de9ad8ec-1bf7-40d4-b991-72b6699594f9
>> current item = 352104aa-ae63-4928-96e2-460b3b145d43
>> end itemWriter chunk 5
chunk size has been set to => 10
>> current item = 4116f3e9-cad3-4387-8f46-a871d825d7d9
>> current item = 3e1f58b7-a1fa-45bd-b336-2484d1d543f3
>> current item = a58e2401-37e8-402a-8ac3-daddaf2e91a0
>> current item = 01162200-fc7d-4580-8533-d6deab7f3c65
>> current item = 31280b14-49bf-4a03-b1d4-210585e4da40
>> current item = 91b7d269-8105-41dc-ae6a-1d682913c168
>> current item = abeead5f-751b-4862-aa37-4f32eb09439a
>> current item = b6bf1a92-56e8-433a-a0ea-935e101cad6f
>> current item = 84af0778-d6e1-4aef-bd8f-347b047bb276
>> current item = b86477f3-b997-4b04-a3ca-27f517c9170f
>> end itemWriter chunk 10
chunk size has been set to => 11
๋ค์๊ณผ ๊ฐ์ด Random์ผ๋ก ์ง์ ๋ ์๋งํผ chunk๊ฐ ์ํ๋๋ ๊ฒ์ ํ์ธ ํ ์ ์๋ค.
SimpleCompletionPolicy
๊ฐ์ฅ ๊ธฐ๋ณธ์ ์ธ ๊ตฌํ์ฒด๋ก, ๋ฏธ๋ฆฌ ๊ตฌ์ฑํด๋ ์๊ณ๊ฐ์ ๋๋ฌํ๋ฉด ์ฒญํฌ ์๋ฃ๋ก ํ์ํ๋ค.
Copy public class SimpleCompletionPolicy extends DefaultResultCompletionPolicy {
public static final int DEFAULT_CHUNK_SIZE = 5 ;
int chunkSize = 0 ;
public SimpleCompletionPolicy () {
this (DEFAULT_CHUNK_SIZE);
}
public SimpleCompletionPolicy ( int chunkSize) {
super();
this . chunkSize = chunkSize;
}
Copy @ Bean
public Step chunkStep() {
return this . stepBuilderFactory . get ( "chunkStep" )
. < String , String > chunk(completionPolicy()) // completePolicy ํธ์ถ
. reader ( itemReader() )
. writer ( itemWriter() )
. build ();
}
@ Bean
public CompletionPolicy completionPolicy() {
// ์ฒ๋ฆฌ๋ ITEM ๊ฐ์๋ฅผ ์ธ์ด, ์ด ๊ฐ์๊ฐ ์๊ณ๊ฐ์ ๋๋ฌํ๋ฉด chunk ์๋ฃ๋ก ํ์
SimpleCompletionPolicy simpleCompletionPolicy = new SimpleCompletionPolicy( 1000 ) ;
return simpleCompletionPolicy;
}
TimeoutTerminationPolicy
ํ์์์ ๊ฐ์ ๊ตฌ์ฑํด, ์ฒญํฌ ๋ด์์ ์ฒ๋ฆฌ ์๊ฐ์ด ์ง์ ํ ์๊ฐ์ด ๋์ผ๋ฉด ์ฒญํฌ๊ฐ ์๋ฃ๋ ๊ฒ์ผ๋ก ๊ฐ์ฃผํ๊ณ , ๋ชจ๋ ํธ๋์ญ์
์ฒ๋ฆฌ๋ฅผ ์ ์์ ์ผ๋ก ํ๋ค๋ ๊ฒ์ด๋ค. TimeoutTerminationPolicy
๋ง์ผ๋ก ์ฒญํฌ ์๋ฃ ์์ ์ ๊ฒฐ์ ํ๋ ๊ฒฝ์ฐ๋ ๊ฑฐ์ ์กด์ฌํ์ง ์์ผ๋ฉฐ, CompositeCompletionPolicy
์ ์ผ๋ถ๋ก ์ฌ์ฉํ๋ ๊ฒฝ์ฐ๊ฐ ๋ง๋ค.
Copy public class TimeoutTerminationPolicy extends CompletionPolicySupport {
/**
* Default timeout value in milliseconds (the value equivalent to 30 seconds).
*/
public static final long DEFAULT_TIMEOUT = 30000L ;
private long timeout = DEFAULT_TIMEOUT;
/**
* Default constructor.
*/
public TimeoutTerminationPolicy () {
super();
}
/**
* Construct a {@link TimeoutTerminationPolicy} with the specified timeout
* value (in milliseconds).
*
* @param timeout duration of the timeout.
*/
public TimeoutTerminationPolicy ( long timeout) {
super();
this . timeout = timeout;
}
Copy @ Bean
public CompletionPolicy timeoutCompletionPolicy() {
TimeoutTerminationPolicy timeoutTerminationPolicy = new TimeoutTerminationPolicy( 3 ) ;
return timeoutTerminationPolicy;
}
TimeoutTerminationPolicy
๋ก ์ํํ ๊ฒฝ์ฐ ๊ฐ chunk ๋จ์๋ฅผ ํ์ธํด๋ณด๋ฉด ๋ค์๊ณผ ๊ฐ์ด ์ ๊ฐ๊ฐ์ธ ๊ฒ์ ๋ณผ ์์๋ค.
Copy >> end itemWriter chunk 795
>> end itemWriter chunk 679
>> end itemWriter chunk 841
>> end itemWriter chunk 1153
>> end itemWriter chunk 1061
>> end itemWriter chunk 1916
>> end itemWriter chunk 1667
>> end itemWriter chunk 1719
>> end itemWriter chunk 931
>> end itemWriter chunk 1634
>> end itemWriter chunk 941
>> end itemWriter chunk 667
>> end itemWriter chunk 665
>> end itemWriter chunk 547
>> end itemWriter chunk 533
>> end itemWriter chunk 973
>> end itemWriter chunk 647
>> end itemWriter chunk 1632
>> end itemWriter chunk 676
CompositeCompletionPolicy
CompositeCompletionPolicy
๋ ์ฒญํฌ ์๋ฃ ์ฌ๋ถ๋ฅผ ๊ฒฐ์ ํ๋ ์ฌ๋ฌ ์ ์ฑ
์ ํจ๊ป ๊ตฌ์ฑํ ์ ์๋ค. ํฌํจํ๊ณ ์๋ ์ฌ๋ฌ ์ ์ฑ
์ค ํ๋๋ผ๋ ์ฒญํฌ ์๋ฃ๋ผ๊ณ ํ๋จ๋๋ฉด ํด๋น ์ฒญํฌ๊ฐ ์๋ฃ๋ ๊ฒ์ผ๋ก ํ์ํ๋ค.
Copy @ Bean
public CompletionPolicy compositeCompletionPolicy() {
CompositeCompletionPolicy policy = new CompositeCompletionPolicy() ;
// ์ฌ๋ฌ ์ ์ฑ
์ค์
policy . setPolicies (
new CompletionPolicy []{
new TimeoutTerminationPolicy( 3 ) ,
new SimpleCompletionPolicy( 1000 )
}
);
return policy;
}
๋ค์๊ณผ ๊ฐ์ด ์ํํ ๊ฒฝ์ฐ์๋ chunk ๋จ์๊ฐ 1000๊ฐ๋ฅผ ๋์ด์ ๊ฒฝ์ฐ๊ฐ ์๋ ๊ฒ์ ํ์ธํ ์ ์๋ค.
Copy >> end itemWriter chunk 731
>> end itemWriter chunk 1000
>> end itemWriter chunk 690
>> end itemWriter chunk 1000
>> end itemWriter chunk 798
>> end itemWriter chunk 1000
>> end itemWriter chunk 980
>> end itemWriter chunk 838
>> end itemWriter chunk 850
>> end itemWriter chunk 1000
>> end itemWriter chunk 263
>> end itemWriter chunk 556
>> end itemWriter chunk 629
>> end itemWriter chunk 962
>> end itemWriter chunk 960
>> end itemWriter chunk 1000
>> end itemWriter chunk 1000
>> end itemWriter chunk 1000
>> end itemWriter chunk 898
>> end itemWriter chunk 1000
>> end itemWriter chunk 900
>> end itemWriter chunk 798
>> end itemWriter chunk 215
Step Listener
์คํญ๊ณผ ์ฒญํฌ์ ์์๊ณผ ๋์์ ํน์ ๋ก์ง์ ์ฒ๋ฆฌํ ์ ์๊ฒ ํด์ค๋ค.
(StepListener
๋ ๋ชจ๋ ์คํญ ๋ฆฌ์ค๋๊ฐ ์์ํ๋ ๋ง์ปค ์ธํฐํ์ด์ค์ด๋ค.)
๋ชจ๋ ์์ค์ ๋ฆฌ์ค๋๋ฅผ ์ ์ฉํด Job์ ์ค๋จํ ์ ์์ผ๋ฉฐ, ์ผ๋ฐ์ ์ผ๋ก ์ ์ฒ๋ฆฌ๋ฅผ ์ํํ๊ฑฐ๋ ์ดํ ๊ฒฐ๊ณผ๋ฅผ ํ๊ฐํ๊ฑฐ๋, ์ผ๋ถ ์ค๋ฅ์ฒ๋ฆฌ์๋ ์ฌ์ฉ๋๋ค.
StepExecutionListener
org.springframework.batch.core.StepExecutionListener
Copy public interface StepExecutionListener extends StepListener {
void beforeStep ( StepExecution stepExecution);
// Listener๊ฐ ์คํ
์ด ๋ฐํํ ExitStatus๋ฅผ Job์ ์ ๋ฌํ๊ธฐ ์ ์ ์์ ํ ์ ์์.
@ Nullable
ExitStatus afterStep ( StepExecution stepExecution);
}
@BeforeStep
, @AfterStep
์ ๋ํ
์ด์
์ ๊ณต
Copy public class LoggingStepStartStopListener {
@ BeforeStep
public void beforeStep ( StepExecution stepExecution) {
System . out . println ( stepExecution . getStepName () + " ์์" );
}
@ AfterStep
public ExitStatus afterStep ( StepExecution stepExecution) {
System . out . println ( stepExecution . getStepName () + " ์ข
๋ฃ" );
return stepExecution . getExitStatus ();
}
}
Copy @ Bean
public Step chunkStep() {
return this . stepBuilderFactory . get ( "chunkStep" )
. < String , String > chunk(randomChunkSizePolicy())
. reader ( itemReader() )
. writer ( itemWriter() )
. listener ( new LoggingStepStartStopListener() ) // Listener ์ค์
. build ();
}
ChunkListener
Copy public interface ChunkListener extends StepListener {
static final String ROLLBACK_EXCEPTION_KEY = "sb_rollback_exception" ;
void beforeChunk ( ChunkContext context);
void afterChunk ( ChunkContext context);
void afterChunkError ( ChunkContext context);
}
@BeforeChunk
, @AfterChunk
์ ๋ํ
์ด์
์ ๊ณต
Step Flow
์กฐ๊ฑด ๋ก์ง
์คํ๋ง ๋ฐฐ์น์ Step์ StepBuilder
์ .next()
๋ฉ์๋๋ฅผ ์ฌ์ฉํด ์ง์ ํ ์์๋๋ก ์คํ๋๋ค. ์ ์ด(transition)๋ฅผ ๊ตฌ์ฑํด ๊ฒฐ๊ณผ์ ๋ฐ๋ฅธ ๋ค๋ฅธ ์์๋ก ์คํํ๋ ๊ฒ๋ ๊ฐ๋ฅํ๋ค.
Copy @ Bean
public Job conditionalJob() {
return this . jobBuilderFactory . get ( "conditionalJob" )
. start ( firstStep() )
. on ( "FAILED" ) . to ( failureStep() )
. from ( firstStep() ) . on ( "*" ) . to ( successStep() )
. end ()
. build ();
}
์คํ๋ง ๋ฐฐ์น๋ ๊ธฐ์ค์ ๋ฐ๋ผ ๋๊ฐ์ ์์ผ๋ ์นด๋๋ฅผ ํ์ฉํ๋ค.
*
: 0๊ฐ ์ด์์ ๋ฌธ์๋ฅผ ์ผ์นํ๋ ๊ฒ์ ์๋ฏธ
?
: 1๊ฐ์ ๋ฌธ์๋ฅผ ์ผ์น ์ํค๋ ๊ฒ์ ์๋ฏธ
?AT
: CAT, KAT๊ณผ ์ผ์นํ์ง๋ง, THAT๊ณผ๋ ๋ถ์ผ์น
JobExecutionDecider
Job ์คํ ์ ๋ณด(jobExecution
)์ ์คํญ ์คํ์ ๋ณด( stepExecution
)๋ฅผ ์ธ์๋ก ๋ฐ์ ๋ชจ๋ ์ ๋ณด๋ฅผ ์ด์ฉํด ๋ค์์ ๋ฌด์์ ์ํํ ์ง์ ๋ํด ๊ฒฐ์ ํ ์ ์๋ค.
Copy public interface JobExecutionDecider {
/**
* Strategy for branching an execution based on the state of an ongoing
* {@link JobExecution}. The return value will be used as a status to
* determine the next step in the job.
*
* @param jobExecution a job execution
* @param stepExecution the latest step execution (may be {@code null})
* @return the exit status code
*/
FlowExecutionStatus decide ( JobExecution jobExecution , @ Nullable StepExecution stepExecution);
}
Copy public class RandomDecider implements JobExecutionDecider {
private Random random = new Random() ;
@ Override
public FlowExecutionStatus decide ( JobExecution jobExecution , StepExecution stepExecution) {
if ( random . nextBoolean ()) {
return new FlowExecutionStatus( FlowExecutionStatus . COMPLETED . getName()) ;
} else {
return new FlowExecutionStatus( FlowExecutionStatus . FAILED . getName()) ;
}
}
}
Copy @ Bean
public Job conditionalJob() {
return this . jobBuilderFactory . get ( "conditionalJob" )
. start ( firstStep() )
. next ( decider() )
. from ( decider() )
. on ( "FAILED" ) . to ( failureStep() )
. from ( decider() ) . on ( "*" ) . to ( successStep() )
. end ()
. build ();
}
Job ์ข
๋ฃํ๊ธฐ
์คํ๋ง ๋ฐฐ์น์์๋ Job์ ์ข
๋ฃํ ๋ ์๋ 3๊ฐ์ง ์ํ๋ก ์ข
๋ฃํ ์ ์๋ค.
์ํ ์ค๋ช
์คํ๋ง ๋ฐฐ์น ์ฒ๋ฆฌ๊ฐ ์ฑ๊ณต์ ์ผ๋ก ์ข
๋ฃ๋์์ ์๋ฏธ
JobInstance
๊ฐ Completed๋ก ์ข
๋ฃ๋๋ฉด ๋์ผํ ํ๋ผ๋ฏธํฐ๋ฅผ ์ฌ์ฉํด ๋ค์ ์คํํ ์ ์๋ค.
์ก์ด ์ฑ๊ณต์ ์ผ๋ก ์๋ฃ๋์ง ์์์์ ์๋ฏธ
Failed ์ํ๋ก ์ข
๋ฃ๋ ์ก์ ์คํ๋ง ๋ฐฐ์น๋ฅผ ์ฌ์ฉํด ๋์ผํ ํ๋ผ๋ฏธํฐ๋ก ๋ค์ ์คํํ ์ ์๋ค.
Stopped ์ํ๋ก ์ข
๋ฃ๋ ์ก์ ๋ค์ ์ํ ๊ฐ๋ฅํ๋ค.
Job์ ์ค๋ฅ๊ฐ ๋ฐ์ํ์ง ์์์ด๋, ์ค๋จ๋ ์์น์์ ์ก์ ๋ค์ ์์ํ ์ ์๋ค.
์ฌ๋์ ๊ฐ์
์ด ํ์ํ๊ฑฐ๋ ๋ค๋ฅธ ๊ฒ์ฌ/์ฒ๋ฆฌ๊ฐ ํ์ํ ์ํฉ์ ์ ์ฉํ๋ค.
BatchStatus
๋ฅผ ํ๋ณํ ๋, ExitStatus
๋ฅผ ํ๊ฐํ๋ฉด์ ์๋ณ๋๋ค. ExitStatus
๋ ์คํ
, ์ฒญํฌ, ์ก์์ ๋ฐํ๋ ์ ์์ผ๋ฉฐ, BatchStatus
๋ StepExecution
์ด๋ JobExecution
๋ด์ ๋ณด๊ด๋๋ฉฐ, JobRepository
์ ์ ์ฅ๋๋ค.
Completed ์ํ๋ก ์ข
๋ฃํ๊ธฐ
.end()
๋ฉ์๋ ์ฌ์ฉ
Copy return this . jobBuilderFactory . get ( "conditionalJob" )
. start ( firstStep() )
. on ( "FAILED" ) . end ()
. from ( firstStep() ) . on ( "*" ) . to ( successStep() )
. end ()
. build ();
BATCH_STEP_EXECUTION
ํ
์ด๋ธ์ ์คํ
์ด ๋ฐํํ ExitStatus
๊ฐ ์ ์ฅ๋๋ฉฐ, ์คํ
์ด ๋ฐํํ ์ํ๊ฐ ๋ฌด์์ด๋ ์๊ด์์ด BATCH_JOB_EXECUTION
์ COMPLETED
๊ฐ ์ ์ฅ๋๋ค.
Failed ์ํ๋ก ์ข
๋ฃํ๊ธฐ
fail()
๋ฉ์๋ ์ฌ์ฉ
Copy return this . jobBuilderFactory . get ( "conditionalJob" )
. start ( firstStep() )
. on ( "FAILED" ) . fail ()
. from ( firstStep() ) . on ( "*" ) . to ( successStep() )
. end ()
. build ();
์ฌ๊ธฐ์ firstStep()
์ด FAILED๋ก ๋๋๋ฉด, JobRepository
์ ํด๋น Job์ด ์คํจํ ๊ฒ์ผ๋ก ์ ์ฅ๋๋ฉฐ, ๋์ผํ ํ๋ผ๋ฏธํฐ๋ฅผ ์ฌ์ฉํด ๋ค์ ์คํํ ์ ์๋ค.
Stopped ์ํ๋ก ์ข
๋ฃํ๊ธฐ
.stopAndRestart()
๋ฉ์๋๋ก ์ก์ ๋ค์ ์ํํ๋ค๋ฉด, ๋ฏธ๋ฆฌ ๊ตฌ์ฑํด๋ ์คํ
๋ถํฐ ์์๋๋ค. ์๋ ์์ ์์๋ ์ฌ์ํ์ successStep()
๋ถํฐ ์ํ๋๋๊ฒ์ ๋ณผ ์ ์๋ค.
Copy return this . jobBuilderFactory . get ( "conditionalJob" )
. start ( firstStep() )
. on ( "FAILED" ) . stopAndRestart ( successStep() )
. from ( firstStep() ) . on ( "*" ) . to ( successStep() )
. end ()
. build ();