Step

Step실질적인 배치 처리를 정의하고 제어하는 데 필요한 모든 정보가 들어있는 도메인 객체로, Job을 처리하는 실질적인 단위로 쓰인다.(Job:Step = 1:M)

  • Step은 Job을 구성하는 독립된 작업 단위

  • 순차적으로 배치 처리 수행

  • Step은 모든 단위 작업의 조각으로 자체적으로 입력, 처리기, 출력을 다룬다.

  • 트랜잭션은 Step 내부에서 이루어짐

  • org.springframework.batch.core.Step

StepExecution

Step의 실행 정보를 담는 객체로, 각각의 Step이 실행될 때마다 StepExecution이 생성된다.

public class StepExecution extends Entity {
    private final JobExecution jobExecution; // 현재 JobExecution 정보
    private final String stepName; // Step 이름
    private volatile BatchStatus status; // Step의 실행 상태(COMPLETED, STARTING, STARTED ...)
    private volatile int readCount; // 성공적으로 읽은 레코드 수
    private volatile int writeCount; // 성공적으로 쓴 레코드 수
    private volatile int commitCount; // Step의 실행에 대해 커밋된 트랜잭션 수
    private volatile int rollbackCount; // Step의 실행에 대해 롤백된 트랜잭션 수
    private volatile int readSkipCount; // 읽기에 실패해 건너 띈 렉코드 수
    private volatile int processSkipCount; // 프로세스가 실패해 건너 띈 렉코드 수
    private volatile int writeSkipCount;// 쓰기에 실패해 건너 띈 렉코드 수
    private volatile Date startTime; // Step이 실행된 시간(null == 시작되지 않음)
    private volatile Date endTime; // Step의 실행 성공 여부와 관계 없이 끝난 시간
    private volatile Date lastUpdated; // 마지막으로 수정된 시간
    private volatile ExecutionContext executionContext; // Step 실행 사이에 유지해야하는 사용자 데이터
    private volatile ExitStatus exitStatus; // Step 실행 결과에 대한 상태 값(UNKOWN, EXECUTING, COMPLETE, ...)
    private volatile boolean terminateOnly; // Job 실행 중지 여부
    private volatile int filterCount; // 실행에서 필터링된 레코드 수
    private transient volatile List<Throwable> failureExceptions; // Step 실행중 발생한 예외 리스트
    ...
}

Tasklet 기반

  • Tasklet은 임의의 Step을 실행할 때 하나의 작업을 처리하는 방식

  • 읽기, 처리, 쓰기로 나뉜 방식이 청크 지향 프로세싱이라면 이를 단일 작업으로 만드는 개념Tasklet

  • 트랜잭션 내에서 로직이 실행될 수 있는 기능을 제공하는 전략 인터페이스

  • org.springframework.batch.core.step.tasklet.Tasklet

Adapter

CallableTaskletAdapter

  • org.springframework.batch.core.step.tasklet.CallableTaskletAdapter

  • Callable<V> 인터페이스의 구현체를 구성할 수 있게 해주는 Adapter

    • 리턴값이 존재하기 때문에 공유 객체를 사용하지 않는다.

    • 체크 예외를 외부로 던질 수 있다.

  • Step의 특정 로직을 해당 Step이 실행되는 스레드가 아닌 다른 스레드에서 실행하고 싶을 때 사용

MethodInvokingTaskletAdapter

  • org.springframework.batch.core.step.tasklet.MethodInvokingTaskletAdapter1

  • 다른 클래스 내의 메서드를 Tasklet처럼 실행 가능

TargetMethod는 ExitStatus.COMPLETED default이며, ExitStatus를 반환하면 메서드가 반환한 값이 Tasklet에서 반환된다.

SystemCommandTasklet

  • org.springframework.batch.core.step.tasklet.SystemCommandTasklet

  • 시스템 명령을 실행할 때 사용하며, 지정한 시스템 명령을 비동기로 실행한다.

  • SimpleSystemProcessExitCodeMapper

Chunk 기반

https://github.com/cheese10yun/TIL/raw/master/assets/chun-process.png

Chunk란 아이템이 트랜잭션에 commit되는 수를 말한다.

즉, 청크 지향 처리란 한 번에 하나씩 데이터를 읽어 Chunk라는 덩어리를 만든 뒤, Chunk 단위로 트랜잭션을 다루는 것을 의미한다.

Chunk 지향 프로세싱은 1000개의 데이터에 대해 배치 로직을 실행한다고 가정하면, Chunk 단위로 나누지 않았을 경우에는 한개만 실패해도 성공한 999개의 데이터가 롤백된다. Chunk 단위를 10으로 한다면, 작업 중에 다른 Chunk는 영향을 받지 않는다.

이때, Chunk는 커밋 간격(commit interval)에 의해 정의되고 수행하므로, 커밋 간격에 따라 성능이 달라질 수 있다. 최상의 성능을 얻기 위해서는 커밋 간격 설정이 중요하다.

ItemReader, ItemProcessor, ItemWriter 3단계로 비지니스 로직을 분리해 역할을 명확하게 분리할 수 있다.

  • 비즈니스 로직 분리

  • 읽어온 배치 데이터와 쓰여질 데이터 타입이 다른 경우에 대한 대응

  • 각 Chunk는 자체 트랜잭션으로 실행되며, 처리에 실패하면 성공한 트랜잭션 이후부터 다시 시작 가능

그러므로 읽어온 배치의 데이터와 저장할 데이터 타입이 다른 경우에 대응할 수 있다.

ItemReader

  • Step의 대상이 되는 배치 데이터(File, Xml, DB 등)를 읽어오는 인터페이스

  • org.springframework.batch.item.ItemReader<T>

ItemProcessor

  • ItemReader로 읽어 온 배치 데이터를 변환하는 역할을 수행

  • ItemProcessor는 로직 처리만 수행하여 역할을 분리하고, 명확한 input/output을 ItemProcessor로 구현해놓으면 더 직관적인 코드가 될 것이다.

  • org.springframework.batch.item.ItemProcessor<T>

ItemWriter

  • 배치 데이터(DB, File 등)를 저장한다.

  • org.springframework.batch.item.ItemWriter<T>

리스트의 데이터 수는 설정한 *청크(Chunk) 단위로 불러온다.

청크 기반 Job 예시

일반적으로는 위와 같이 커밋간격을 하드 코딩해 크기를 정의하지만, 크기가 동일하지 않은 청크를 처리해야하는 경우도 있다. 스프링 배치는 org.springframework.batch.repeat.CompletionPolicy 인터페이스를 제공해 청크가 완료되는 시점을 정의할 수 있도록 제공해준다.

CompletionPolicy

청크 완료 여부를 결정할 수 있는 결정로직을 구현할 수 있는 인터페이스로, CompletionPolicy 인터페이스의 구현체에 대해서 알아 볼 것이다.

직접 구현하는 방법

CompletionPolicy를 구현하여 필수 메서드들을 각각 알맞게 로직을 구성하면된다.

다음과 같이 Random으로 지정된 수만큼 chunk가 수행되는 것을 확인 할 수 있다.

SimpleCompletionPolicy

가장 기본적인 구현체로, 미리 구성해둔 임곗값에 도달하면 청크 완료로 표시한다.

TimeoutTerminationPolicy

타임아웃 값을 구성해, 청크 내에서 처리 시간이 지정한 시간이 넘으면 청크가 완료된 것으로 간주하고, 모든 트랜잭션 처리를 정상적으로 한다는 것이다. TimeoutTerminationPolicy 만으로 청크 완료 시점을 결정하는 경우는 거의 존재하지 않으며, CompositeCompletionPolicy의 일부로 사용하는 경우가 많다.

TimeoutTerminationPolicy로 수행한 경우 각 chunk 단위를 확인해보면 다음과 같이 제각각인 것을 볼 수있다.

CompositeCompletionPolicy

CompositeCompletionPolicy는 청크 완료 여부를 결정하는 여러 정책을 함께 구성할 수 있다. 포함하고 있는 여러 정책 중 하나라도 청크 완료라고 판단되면 해당 청크가 완료된 것으로 표시한다.

다음과 같이 수행한 경우에는 chunk 단위가 1000개를 넘어선 경우가 없는 것을 확인할 수 있다.

Step Listener

스탭과 청크의 시작과 끝에서 특정 로직을 처리할 수 있게 해준다.

(StepListener는 모든 스탭 리스너가 상속하는 마커 인터페이스이다.)

모든 수준에 리스너를 적용해 Job을 중단할 수 있으며, 일반적으로 전처리를 수행하거나 이후 결과를 평가하거나, 일부 오류처리에도 사용된다.

StepExecutionListener

  • org.springframework.batch.core.StepExecutionListener

@BeforeStep, @AfterStep 애너테이션 제공

ChunkListener

@BeforeChunk, @AfterChunk 애너테이션 제공

Step Flow

조건 로직

스프링 배치의 Step은 StepBuilder.next() 메서드를 사용해 지정한 순서대로 실행된다. 전이(transition)를 구성해 결과에 따른 다른 순서로 실행하는 것도 가능하다.

스프링 배치는 기준에 따라 두개의 와일드 카드를 허용한다.

  • * : 0개 이상의 문자를 일치하는 것을 의미

    • C* : COMPLETE, CORRECT

  • ? : 1개의 문자를 일치 시키는 것을 의미

    • ?AT : CAT, KAT과 일치하지만, THAT과는 불일치

JobExecutionDecider

Job 실행 정보(jobExecution)와 스탭 실행정보( stepExecution)를 인자로 받아 모든 정보를 이용해 다음에 무엇을 수행할지에 대해 결정할 수 있다.

Job 종료하기

스프링 배치에서는 Job을 종료할 때 아래 3가지 상태로 종료할 수 있다.

상태
설명

Completed(완료)

스프링 배치 처리가 성공적으로 종료됐음을 의미 JobInstance가 Completed로 종료되면 동일한 파라미터를 사용해 다시 실행할 수 없다.

Failed(실패)

잡이 성공적으로 완료되지 않았음을 의미 Failed 상태로 종료된 잡은 스프링 배치를 사용해 동일한 파라미터로 다시 실행할 수 있다.

Stopped(중지)

Stopped 상태로 종료된 잡은 다시 수행 가능하다. Job에 오류가 발생하지 않았어도, 중단된 위치에서 잡을 다시 시작할 수 있다. 사람의 개입이 필요하거나 다른 검사/처리가 필요한 상황에 유용하다.

BatchStatus를 판별할 때, ExitStatus를 평가하면서 식별된다. ExitStatus는 스텝, 청크, 잡에서 반환될 수 있으며, BatchStatusStepExecution 이나 JobExecution 내에 보관되며, JobRepository에 저장된다.

Completed 상태로 종료하기

.end() 메서드 사용

BATCH_STEP_EXECUTION 테이블에 스텝이 반환한 ExitStatus가 저장되며, 스텝이 반환한 상태가 무엇이든 상관없이 BATCH_JOB_EXECUTIONCOMPLETED가 저장된다.

Failed 상태로 종료하기

fail() 메서드 사용

여기서 firstStep() 이 FAILED로 끝나면, JobRepository 에 해당 Job이 실패한 것으로 저장되며, 동일한 파라미터를 사용해 다시 실행할 수 있다.

Stopped 상태로 종료하기

.stopAndRestart() 메서드로 잡을 다시 수행한다면, 미리 구성해둔 스텝부터 시작된다. 아래 예제에서는 재수행시 successStep()부터 수행되는것을 볼 수 있다.

Last updated

Was this helpful?