Step은 실질적인 배치 처리를 정의하고 제어하는 데 필요한 모든 정보가 들어있는 도메인 객체로, Job을 처리하는 실질적인 단위로 쓰인다.(Job:Step = 1:M)
Step은 Job을 구성하는 독립된 작업 단위
순차적으로 배치 처리 수행
Step은 모든 단위 작업의 조각으로 자체적으로 입력, 처리기, 출력을 다룬다.
트랜잭션은 Step 내부에서 이루어짐
org.springframework.batch.core.Step
StepExecution
Step의 실행 정보를 담는 객체로, 각각의 Step이 실행될 때마다 StepExecution이 생성된다.
publicclassStepExecutionextendsEntity {privatefinalJobExecution jobExecution; // 현재 JobExecution 정보privatefinalString stepName; // Step 이름privatevolatileBatchStatus status; // Step의 실행 상태(COMPLETED, STARTING, STARTED ...)privatevolatileint readCount; // 성공적으로 읽은 레코드 수privatevolatileint writeCount; // 성공적으로 쓴 레코드 수privatevolatileint commitCount; // Step의 실행에 대해 커밋된 트랜잭션 수privatevolatileint rollbackCount; // Step의 실행에 대해 롤백된 트랜잭션 수privatevolatileint readSkipCount; // 읽기에 실패해 건너 띈 렉코드 수privatevolatileint processSkipCount; // 프로세스가 실패해 건너 띈 렉코드 수privatevolatileint writeSkipCount;// 쓰기에 실패해 건너 띈 렉코드 수privatevolatileDate startTime; // Step이 실행된 시간(null == 시작되지 않음)privatevolatileDate endTime; // Step의 실행 성공 여부와 관계 없이 끝난 시간privatevolatileDate lastUpdated; // 마지막으로 수정된 시간privatevolatileExecutionContext executionContext; // Step 실행 사이에 유지해야하는 사용자 데이터privatevolatileExitStatus exitStatus; // Step 실행 결과에 대한 상태 값(UNKOWN, EXECUTING, COMPLETE, ...)privatevolatileboolean terminateOnly; // Job 실행 중지 여부privatevolatileint filterCount; // 실행에서 필터링된 레코드 수privatetransientvolatileList<Throwable> failureExceptions; // Step 실행중 발생한 예외 리스트...}
Tasklet 기반
Tasklet은 임의의 Step을 실행할 때 하나의 작업을 처리하는 방식
읽기, 처리, 쓰기로 나뉜 방식이 청크 지향 프로세싱이라면 이를 단일 작업으로 만드는 개념이 Tasklet
publicinterfaceTasklet {// 내부에 원하는 단일 작업을 구현하고 나면, RepeatStatus.FINISHED 반환후 작업이 계속되면 RepeatStatus.CONTINUABLE 반환 @NullableRepeatStatusexecute(StepContribution var1,ChunkContext var2) throwsException;}
@EnableBatchProcessing@ConfigurationpublicclassSystemCommandConfiguration { @AutowiredprivateJobBuilderFactory jobBuilderFactory; @AutowiredprivateStepBuilderFactory stepBuilderFactory; @BeanpublicJobsystemCommandJob() {returnthis.jobBuilderFactory.get("systemCommandJob").start(systemCommandStep()).build(); } @BeanpublicStepsystemCommandStep() {returnthis.stepBuilderFactory.get("systemCommandStep").tasklet(systemCommandTasklet()).build(); } @BeanpublicSystemCommandTaskletsystemCommandTasklet() {SystemCommandTasklet systemCommandTasklet =newSystemCommandTasklet();// 명령어systemCommandTasklet.setCommand("touch tmp.txt");systemCommandTasklet.setTimeout(5000);// Job이 비정상적으로 종료될 떄 시스템 프로세스와 관련된 스레드를 강제 종료할지 여부systemCommandTasklet.setInterruptOnCancel(true);// 작업 디렉토리 설정systemCommandTasklet.setWorkingDirectory("/Users/dh0023/Develop/gitbook/TIL");// 시스템 반환 코드를 스프링 배치 상태 값으로 매핑systemCommandTasklet.setSystemProcessExitCodeMapper(touchCodeMapper());// 비동기로 실행하는 시스템 명령을 주기적으로 완료 여부 확인, 완료 여부를 확인하는 주기(default=1초)systemCommandTasklet.setTerminationCheckInterval(5000);// 시스템 명령을 실행하는 TaskExecutor 구성 가능// 문제가 발생하면 락이 발생할 수 있으므로, 동기방식으로 구현하지 않는 것이 좋다.systemCommandTasklet.setTaskExecutor(newSimpleAsyncTaskExecutor());// 명령을 실행하기 전에 설정하는 환경 파라미터 목록systemCommandTasklet.setEnvironmentParams(newString[]{"BATCH_HOME=/Users/dh0023/Develop/spring/spring-practice/batch-practice"});return systemCommandTasklet; } @BeanpublicSimpleSystemProcessExitCodeMappertouchCodeMapper() {// 반환된 시스템 코드가 0이 ExitStatus.FINISHED// 0이 아니면 ExitStatus.FAILEDreturnnewSimpleSystemProcessExitCodeMapper(); } @BeanpublicConfigurableSystemProcessExitCodeMapperconfigurableSystemProcessExitCodeMapper() {// 일반적인 구성 방법으로 매핑 구성을 할 수 있음.ConfigurableSystemProcessExitCodeMapper mapper =newConfigurableSystemProcessExitCodeMapper();Map<Object,ExitStatus> mappings =newHashMap<Object,ExitStatus>() { {put(0,ExitStatus.COMPLETED);put(1,ExitStatus.FAILED);put(2,ExitStatus.EXECUTING);put(3,ExitStatus.NOOP);put(4,ExitStatus.UNKNOWN);put(ConfigurableSystemProcessExitCodeMapper.ELSE_KEY,ExitStatus.UNKNOWN); } };mapper.setMappings(mappings);return mapper; }}
일반적으로는 위와 같이 커밋간격을 하드 코딩해 크기를 정의하지만, 크기가 동일하지 않은 청크를 처리해야하는 경우도 있다. 스프링 배치는 org.springframework.batch.repeat.CompletionPolicy 인터페이스를 제공해 청크가 완료되는 시점을 정의할 수 있도록 제공해준다.
CompletionPolicy
청크 완료 여부를 결정할 수 있는 결정로직을 구현할 수 있는 인터페이스로, CompletionPolicy 인터페이스의 구현체에 대해서 알아 볼 것이다.
packageorg.springframework.batch.repeat;publicinterfaceCompletionPolicy {// 청크 완료 여부의 상태를 기반으로 결정 로직 수행booleanisComplete(RepeatContext context,RepeatStatus result);// 내부 상태를 이용해 청크 완료 여부 판단booleanisComplete(RepeatContext context);// 청크의 시작을 알 수 있도록 정책을 초기화RepeatContextstart(RepeatContext parent);// 각 item이 처리가되면 update 메서드가 호출되면서 내부 상태 갱신voidupdate(RepeatContext context);}
직접 구현하는 방법
CompletionPolicy를 구현하여 필수 메서드들을 각각 알맞게 로직을 구성하면된다.
publicclassRandomChunkSizePolicyimplementsCompletionPolicy {privateint chunksize;privateint totalProcessed;privateRandom random =newRandom();// 청크 완료 여부의 상태를 기반으로 결정 로직 수행 @OverridepublicbooleanisComplete(RepeatContext context,RepeatStatus result) {if (RepeatStatus.FINISHED== result) {returntrue; } else {returnisComplete(context); } }// 내부 상태를 이용해 청크 완료 여부 판단 @OverridepublicbooleanisComplete(RepeatContext context) {returnthis.totalProcessed>= chunksize; }// 청크의 시작을 알 수 있도록 정책을 초기화 @OverridepublicRepeatContextstart(RepeatContext parent) {this.chunksize=random.nextInt(20);this.totalProcessed=0;System.out.println("chunk size has been set to => "+this.chunksize);return parent; }// 각 item이 처리가되면 update 메서드가 호출되면서 내부 상태 갱신 @Overridepublicvoidupdate(RepeatContext context) {this.totalProcessed++; }}
chunk size has been set to => 5
>> current item = b784cda0-961f-4faf-9737-4334e774f0d1
>> current item = 9fc3ec22-5d54-4632-94e3-fcc28cc00260
>> current item = 53452739-0122-4f8b-a52a-667e37cbf908
>> current item = de9ad8ec-1bf7-40d4-b991-72b6699594f9
>> current item = 352104aa-ae63-4928-96e2-460b3b145d43
>> end itemWriter chunk 5
chunk size has been set to => 10
>> current item = 4116f3e9-cad3-4387-8f46-a871d825d7d9
>> current item = 3e1f58b7-a1fa-45bd-b336-2484d1d543f3
>> current item = a58e2401-37e8-402a-8ac3-daddaf2e91a0
>> current item = 01162200-fc7d-4580-8533-d6deab7f3c65
>> current item = 31280b14-49bf-4a03-b1d4-210585e4da40
>> current item = 91b7d269-8105-41dc-ae6a-1d682913c168
>> current item = abeead5f-751b-4862-aa37-4f32eb09439a
>> current item = b6bf1a92-56e8-433a-a0ea-935e101cad6f
>> current item = 84af0778-d6e1-4aef-bd8f-347b047bb276
>> current item = b86477f3-b997-4b04-a3ca-27f517c9170f
>> end itemWriter chunk 10
chunk size has been set to => 11
타임아웃 값을 구성해, 청크 내에서 처리 시간이 지정한 시간이 넘으면 청크가 완료된 것으로 간주하고, 모든 트랜잭션 처리를 정상적으로 한다는 것이다. TimeoutTerminationPolicy 만으로 청크 완료 시점을 결정하는 경우는 거의 존재하지 않으며, CompositeCompletionPolicy의 일부로 사용하는 경우가 많다.
publicclassTimeoutTerminationPolicyextendsCompletionPolicySupport { /** * Default timeout value in milliseconds (the value equivalent to 30 seconds). */publicstaticfinallong DEFAULT_TIMEOUT =30000L;privatelong timeout = DEFAULT_TIMEOUT; /** * Default constructor. */publicTimeoutTerminationPolicy() { super(); } /** * Construct a {@link TimeoutTerminationPolicy} with the specified timeout * value (in milliseconds). * * @param timeout duration of the timeout. */publicTimeoutTerminationPolicy(long timeout) { super();this.timeout= timeout; }
TimeoutTerminationPolicy로 수행한 경우 각 chunk 단위를 확인해보면 다음과 같이 제각각인 것을 볼 수있다.
>> end itemWriter chunk 795
>> end itemWriter chunk 679
>> end itemWriter chunk 841
>> end itemWriter chunk 1153
>> end itemWriter chunk 1061
>> end itemWriter chunk 1916
>> end itemWriter chunk 1667
>> end itemWriter chunk 1719
>> end itemWriter chunk 931
>> end itemWriter chunk 1634
>> end itemWriter chunk 941
>> end itemWriter chunk 667
>> end itemWriter chunk 665
>> end itemWriter chunk 547
>> end itemWriter chunk 533
>> end itemWriter chunk 973
>> end itemWriter chunk 647
>> end itemWriter chunk 1632
>> end itemWriter chunk 676
CompositeCompletionPolicy
CompositeCompletionPolicy는 청크 완료 여부를 결정하는 여러 정책을 함께 구성할 수 있다. 포함하고 있는 여러 정책 중 하나라도 청크 완료라고 판단되면 해당 청크가 완료된 것으로 표시한다.
@BeanpublicCompletionPolicycompositeCompletionPolicy() {CompositeCompletionPolicy policy =newCompositeCompletionPolicy();// 여러 정책 설정policy.setPolicies(newCompletionPolicy[]{newTimeoutTerminationPolicy(3),newSimpleCompletionPolicy(1000) } );return policy; }
다음과 같이 수행한 경우에는 chunk 단위가 1000개를 넘어선 경우가 없는 것을 확인할 수 있다.
>> end itemWriter chunk 731
>> end itemWriter chunk 1000
>> end itemWriter chunk 690
>> end itemWriter chunk 1000
>> end itemWriter chunk 798
>> end itemWriter chunk 1000
>> end itemWriter chunk 980
>> end itemWriter chunk 838
>> end itemWriter chunk 850
>> end itemWriter chunk 1000
>> end itemWriter chunk 263
>> end itemWriter chunk 556
>> end itemWriter chunk 629
>> end itemWriter chunk 962
>> end itemWriter chunk 960
>> end itemWriter chunk 1000
>> end itemWriter chunk 1000
>> end itemWriter chunk 1000
>> end itemWriter chunk 898
>> end itemWriter chunk 1000
>> end itemWriter chunk 900
>> end itemWriter chunk 798
>> end itemWriter chunk 215
Step Listener
스탭과 청크의 시작과 끝에서 특정 로직을 처리할 수 있게 해준다.
(StepListener는 모든 스탭 리스너가 상속하는 마커 인터페이스이다.)
모든 수준에 리스너를 적용해 Job을 중단할 수 있으며, 일반적으로 전처리를 수행하거나 이후 결과를 평가하거나, 일부 오류처리에도 사용된다.
Job 실행 정보(jobExecution)와 스탭 실행정보( stepExecution)를 인자로 받아 모든 정보를 이용해 다음에 무엇을 수행할지에 대해 결정할 수 있다.
publicinterfaceJobExecutionDecider { /** * Strategy for branching an execution based on the state of an ongoing * {@link JobExecution}. The return value will be used as a status to * determine the next step in the job. * * @param jobExecution a job execution * @param stepExecution the latest step execution (may be {@code null}) * @return the exit status code */FlowExecutionStatusdecide(JobExecution jobExecution, @NullableStepExecution stepExecution);}
BatchStatus를 판별할 때, ExitStatus를 평가하면서 식별된다. ExitStatus는 스텝, 청크, 잡에서 반환될 수 있으며, BatchStatus는 StepExecution 이나 JobExecution 내에 보관되며, JobRepository에 저장된다.