Step

Step은 μ‹€μ§ˆμ μΈ 배치 처리λ₯Ό μ •μ˜ν•˜κ³  μ œμ–΄ν•˜λŠ” 데 ν•„μš”ν•œ λͺ¨λ“  정보가 λ“€μ–΄μžˆλŠ” 도메인 객체둜, Job을 μ²˜λ¦¬ν•˜λŠ” μ‹€μ§ˆμ μΈ λ‹¨μœ„λ‘œ 쓰인닀.(Job:Step = 1:M)

  • Step은 Job을 κ΅¬μ„±ν•˜λŠ” λ…λ¦½λœ μž‘μ—… λ‹¨μœ„

  • 순차적으둜 배치 처리 μˆ˜ν–‰

  • Step은 λͺ¨λ“  λ‹¨μœ„ μž‘μ—…μ˜ 쑰각으둜 자체적으둜 μž…λ ₯, 처리기, 좜λ ₯을 닀룬닀.

  • νŠΈλžœμž­μ…˜μ€ Step λ‚΄λΆ€μ—μ„œ 이루어짐

  • org.springframework.batch.core.Step

StepExecution

Step의 μ‹€ν–‰ 정보λ₯Ό λ‹΄λŠ” 객체둜, 각각의 Step이 싀행될 λ•Œλ§ˆλ‹€ StepExecution이 μƒμ„±λœλ‹€.

public class StepExecution extends Entity {
    private final JobExecution jobExecution; // ν˜„μž¬ JobExecution 정보
    private final String stepName; // Step 이름
    private volatile BatchStatus status; // Step의 μ‹€ν–‰ μƒνƒœ(COMPLETED, STARTING, STARTED ...)
    private volatile int readCount; // μ„±κ³΅μ μœΌλ‘œ 읽은 λ ˆμ½”λ“œ 수
    private volatile int writeCount; // μ„±κ³΅μ μœΌλ‘œ μ“΄ λ ˆμ½”λ“œ 수
    private volatile int commitCount; // Step의 싀행에 λŒ€ν•΄ μ»€λ°‹λœ νŠΈλžœμž­μ…˜ 수
    private volatile int rollbackCount; // Step의 싀행에 λŒ€ν•΄ 둀백된 νŠΈλžœμž­μ…˜ 수
    private volatile int readSkipCount; // 읽기에 μ‹€νŒ¨ν•΄ κ±΄λ„ˆ 띈 λ ‰μ½”λ“œ 수
    private volatile int processSkipCount; // ν”„λ‘œμ„ΈμŠ€κ°€ μ‹€νŒ¨ν•΄ κ±΄λ„ˆ 띈 λ ‰μ½”λ“œ 수
    private volatile int writeSkipCount;// 쓰기에 μ‹€νŒ¨ν•΄ κ±΄λ„ˆ 띈 λ ‰μ½”λ“œ 수
    private volatile Date startTime; // Step이 μ‹€ν–‰λœ μ‹œκ°„(null == μ‹œμž‘λ˜μ§€ μ•ŠμŒ)
    private volatile Date endTime; // Step의 μ‹€ν–‰ 성곡 여뢀와 관계 없이 λλ‚œ μ‹œκ°„
    private volatile Date lastUpdated; // λ§ˆμ§€λ§‰μœΌλ‘œ μˆ˜μ •λœ μ‹œκ°„
    private volatile ExecutionContext executionContext; // Step μ‹€ν–‰ 사이에 μœ μ§€ν•΄μ•Όν•˜λŠ” μ‚¬μš©μž 데이터
    private volatile ExitStatus exitStatus; // Step μ‹€ν–‰ 결과에 λŒ€ν•œ μƒνƒœ κ°’(UNKOWN, EXECUTING, COMPLETE, ...)
    private volatile boolean terminateOnly; // Job μ‹€ν–‰ 쀑지 μ—¬λΆ€
    private volatile int filterCount; // μ‹€ν–‰μ—μ„œ ν•„ν„°λ§λœ λ ˆμ½”λ“œ 수
    private transient volatile List<Throwable> failureExceptions; // Step 싀행쀑 λ°œμƒν•œ μ˜ˆμ™Έ 리슀트
    ...
}

Tasklet 기반

  • Tasklet은 μž„μ˜μ˜ Step을 μ‹€ν–‰ν•  λ•Œ ν•˜λ‚˜μ˜ μž‘μ—…μ„ μ²˜λ¦¬ν•˜λŠ” 방식

  • 읽기, 처리, μ“°κΈ°λ‘œ λ‚˜λ‰œ 방식이 청크 μ§€ν–₯ ν”„λ‘œμ„Έμ‹±μ΄λΌλ©΄ 이λ₯Ό 단일 μž‘μ—…μœΌλ‘œ λ§Œλ“œλŠ” κ°œλ…μ΄ Tasklet

  • νŠΈλžœμž­μ…˜ λ‚΄μ—μ„œ 둜직이 싀행될 수 μžˆλŠ” κΈ°λŠ₯을 μ œκ³΅ν•˜λŠ” μ „λž΅ μΈν„°νŽ˜μ΄μŠ€

  • org.springframework.batch.core.step.tasklet.Tasklet

Adapter

CallableTaskletAdapter

  • org.springframework.batch.core.step.tasklet.CallableTaskletAdapter

  • Callable<V> μΈν„°νŽ˜μ΄μŠ€μ˜ κ΅¬ν˜„μ²΄λ₯Ό ꡬ성할 수 있게 ν•΄μ£ΌλŠ” Adapter

    • 리턴값이 μ‘΄μž¬ν•˜κΈ° λ•Œλ¬Έμ— 곡유 객체λ₯Ό μ‚¬μš©ν•˜μ§€ μ•ŠλŠ”λ‹€.

    • 체크 μ˜ˆμ™Έλ₯Ό μ™ΈλΆ€λ‘œ 던질 수 μžˆλ‹€.

  • Step의 νŠΉμ • λ‘œμ§μ„ ν•΄λ‹Ή Step이 μ‹€ν–‰λ˜λŠ” μŠ€λ ˆλ“œκ°€ μ•„λ‹Œ λ‹€λ₯Έ μŠ€λ ˆλ“œμ—μ„œ μ‹€ν–‰ν•˜κ³  싢을 λ•Œ μ‚¬μš©

MethodInvokingTaskletAdapter

  • org.springframework.batch.core.step.tasklet.MethodInvokingTaskletAdapter1

  • λ‹€λ₯Έ 클래슀 λ‚΄μ˜ λ©”μ„œλ“œλ₯Ό Tasklet처럼 μ‹€ν–‰ κ°€λŠ₯

TargetMethodλŠ” ExitStatus.COMPLETED default이며, ExitStatusλ₯Ό λ°˜ν™˜ν•˜λ©΄ λ©”μ„œλ“œκ°€ λ°˜ν™˜ν•œ 값이 Taskletμ—μ„œ λ°˜ν™˜λœλ‹€.

SystemCommandTasklet

  • org.springframework.batch.core.step.tasklet.SystemCommandTasklet

  • μ‹œμŠ€ν…œ λͺ…령을 μ‹€ν–‰ν•  λ•Œ μ‚¬μš©ν•˜λ©°, μ§€μ •ν•œ μ‹œμŠ€ν…œ λͺ…령을 λΉ„λ™κΈ°λ‘œ μ‹€ν–‰ν•œλ‹€.

  • SimpleSystemProcessExitCodeMapper

Chunk 기반

https://github.com/cheese10yun/TIL/raw/master/assets/chun-process.png

Chunkλž€ μ•„μ΄ν…œμ΄ νŠΈλžœμž­μ…˜μ— commitλ˜λŠ” 수λ₯Ό λ§ν•œλ‹€.

즉, 청크 μ§€ν–₯ μ²˜λ¦¬λž€ ν•œ λ²ˆμ— ν•˜λ‚˜μ”© 데이터λ₯Ό 읽어 ChunkλΌλŠ” 덩어리λ₯Ό λ§Œλ“  λ’€, Chunk λ‹¨μœ„λ‘œ νŠΈλžœμž­μ…˜μ„ λ‹€λ£¨λŠ” 것을 μ˜λ―Έν•œλ‹€.

Chunk μ§€ν–₯ ν”„λ‘œμ„Έμ‹±μ€ 1000개의 데이터에 λŒ€ν•΄ 배치 λ‘œμ§μ„ μ‹€ν–‰ν•œλ‹€κ³  κ°€μ •ν•˜λ©΄, Chunk λ‹¨μœ„λ‘œ λ‚˜λˆ„μ§€ μ•Šμ•˜μ„ κ²½μš°μ—λŠ” ν•œκ°œλ§Œ μ‹€νŒ¨ν•΄λ„ μ„±κ³΅ν•œ 999개의 데이터가 λ‘€λ°±λœλ‹€. Chunk λ‹¨μœ„λ₯Ό 10으둜 ν•œλ‹€λ©΄, μž‘μ—… 쀑에 λ‹€λ₯Έ ChunkλŠ” 영ν–₯을 λ°›μ§€ μ•ŠλŠ”λ‹€.

μ΄λ•Œ, ChunkλŠ” 컀밋 간격(commit interval)에 μ˜ν•΄ μ •μ˜λ˜κ³  μˆ˜ν–‰ν•˜λ―€λ‘œ, 컀밋 간격에 따라 μ„±λŠ₯이 λ‹¬λΌμ§ˆ 수 μžˆλ‹€. μ΅œμƒμ˜ μ„±λŠ₯을 μ–»κΈ° μœ„ν•΄μ„œλŠ” 컀밋 간격 섀정이 μ€‘μš”ν•˜λ‹€.

ItemReader, ItemProcessor, ItemWriter 3λ‹¨κ³„λ‘œ λΉ„μ§€λ‹ˆμŠ€ λ‘œμ§μ„ 뢄리해 역할을 λͺ…ν™•ν•˜κ²Œ 뢄리할 수 μžˆλ‹€.

  • λΉ„μ¦ˆλ‹ˆμŠ€ 둜직 뢄리

  • μ½μ–΄μ˜¨ 배치 데이터와 μ“°μ—¬μ§ˆ 데이터 νƒ€μž…μ΄ λ‹€λ₯Έ κ²½μš°μ— λŒ€ν•œ λŒ€μ‘

  • 각 ChunkλŠ” 자체 νŠΈλžœμž­μ…˜μœΌλ‘œ μ‹€ν–‰λ˜λ©°, μ²˜λ¦¬μ— μ‹€νŒ¨ν•˜λ©΄ μ„±κ³΅ν•œ νŠΈλžœμž­μ…˜ 이후뢀터 λ‹€μ‹œ μ‹œμž‘ κ°€λŠ₯

κ·ΈλŸ¬λ―€λ‘œ μ½μ–΄μ˜¨ 배치의 데이터와 μ €μž₯ν•  데이터 νƒ€μž…μ΄ λ‹€λ₯Έ κ²½μš°μ— λŒ€μ‘ν•  수 μžˆλ‹€.

ItemReader

  • Step의 λŒ€μƒμ΄ λ˜λŠ” 배치 데이터(File, Xml, DB λ“±)λ₯Ό μ½μ–΄μ˜€λŠ” μΈν„°νŽ˜μ΄μŠ€

  • org.springframework.batch.item.ItemReader<T>

ItemProcessor

  • ItemReader둜 읽어 온 배치 데이터λ₯Ό λ³€ν™˜ν•˜λŠ” 역할을 μˆ˜ν–‰

  • ItemProcessorλŠ” 둜직 처리만 μˆ˜ν–‰ν•˜μ—¬ 역할을 λΆ„λ¦¬ν•˜κ³ , λͺ…ν™•ν•œ input/output을 ItemProcessor둜 κ΅¬ν˜„ν•΄λ†“μœΌλ©΄ 더 직관적인 μ½”λ“œκ°€ 될 것이닀.

  • org.springframework.batch.item.ItemProcessor<T>

ItemWriter

  • 배치 데이터(DB, File λ“±)λ₯Ό μ €μž₯ν•œλ‹€.

  • org.springframework.batch.item.ItemWriter<T>

리슀트의 데이터 μˆ˜λŠ” μ„€μ •ν•œ *청크(Chunk) λ‹¨μœ„λ‘œ λΆˆλŸ¬μ˜¨λ‹€.

청크 기반 Job μ˜ˆμ‹œ

μΌλ°˜μ μœΌλ‘œλŠ” μœ„μ™€ 같이 컀밋간격을 ν•˜λ“œ μ½”λ”©ν•΄ 크기λ₯Ό μ •μ˜ν•˜μ§€λ§Œ, 크기가 λ™μΌν•˜μ§€ μ•Šμ€ 청크λ₯Ό μ²˜λ¦¬ν•΄μ•Όν•˜λŠ” κ²½μš°λ„ μžˆλ‹€. μŠ€ν”„λ§ λ°°μΉ˜λŠ” org.springframework.batch.repeat.CompletionPolicy μΈν„°νŽ˜μ΄μŠ€λ₯Ό μ œκ³΅ν•΄ 청크가 μ™„λ£Œλ˜λŠ” μ‹œμ μ„ μ •μ˜ν•  수 μžˆλ„λ‘ μ œκ³΅ν•΄μ€€λ‹€.

CompletionPolicy

청크 μ™„λ£Œ μ—¬λΆ€λ₯Ό κ²°μ •ν•  수 μžˆλŠ” κ²°μ •λ‘œμ§μ„ κ΅¬ν˜„ν•  수 μžˆλŠ” μΈν„°νŽ˜μ΄μŠ€λ‘œ, CompletionPolicy μΈν„°νŽ˜μ΄μŠ€μ˜ κ΅¬ν˜„μ²΄μ— λŒ€ν•΄μ„œ μ•Œμ•„ λ³Ό 것이닀.

직접 κ΅¬ν˜„ν•˜λŠ” 방법

CompletionPolicyλ₯Ό κ΅¬ν˜„ν•˜μ—¬ ν•„μˆ˜ λ©”μ„œλ“œλ“€μ„ 각각 μ•Œλ§žκ²Œ λ‘œμ§μ„ κ΅¬μ„±ν•˜λ©΄λœλ‹€.

λ‹€μŒκ³Ό 같이 Random으둜 μ§€μ •λœ 수만큼 chunkκ°€ μˆ˜ν–‰λ˜λŠ” 것을 확인 ν•  수 μžˆλ‹€.

SimpleCompletionPolicy

κ°€μž₯ 기본적인 κ΅¬ν˜„μ²΄λ‘œ, 미리 ꡬ성해둔 μž„κ³—κ°’μ— λ„λ‹¬ν•˜λ©΄ 청크 μ™„λ£Œλ‘œ ν‘œμ‹œν•œλ‹€.

TimeoutTerminationPolicy

νƒ€μž„μ•„μ›ƒ 값을 ꡬ성해, 청크 λ‚΄μ—μ„œ 처리 μ‹œκ°„μ΄ μ§€μ •ν•œ μ‹œκ°„μ΄ λ„˜μœΌλ©΄ 청크가 μ™„λ£Œλœ κ²ƒμœΌλ‘œ κ°„μ£Όν•˜κ³ , λͺ¨λ“  νŠΈλžœμž­μ…˜ 처리λ₯Ό μ •μƒμ μœΌλ‘œ ν•œλ‹€λŠ” 것이닀. TimeoutTerminationPolicy 만으둜 청크 μ™„λ£Œ μ‹œμ μ„ κ²°μ •ν•˜λŠ” κ²½μš°λŠ” 거의 μ‘΄μž¬ν•˜μ§€ μ•ŠμœΌλ©°, CompositeCompletionPolicy의 μΌλΆ€λ‘œ μ‚¬μš©ν•˜λŠ” κ²½μš°κ°€ λ§Žλ‹€.

TimeoutTerminationPolicy둜 μˆ˜ν–‰ν•œ 경우 각 chunk λ‹¨μœ„λ₯Ό 확인해보면 λ‹€μŒκ³Ό 같이 제각각인 것을 λ³Ό μˆ˜μžˆλ‹€.

CompositeCompletionPolicy

CompositeCompletionPolicyλŠ” 청크 μ™„λ£Œ μ—¬λΆ€λ₯Ό κ²°μ •ν•˜λŠ” μ—¬λŸ¬ 정책을 ν•¨κ»˜ ꡬ성할 수 μžˆλ‹€. ν¬ν•¨ν•˜κ³  μžˆλŠ” μ—¬λŸ¬ μ •μ±… 쀑 ν•˜λ‚˜λΌλ„ 청크 μ™„λ£ŒλΌκ³  νŒλ‹¨λ˜λ©΄ ν•΄λ‹Ή 청크가 μ™„λ£Œλœ κ²ƒμœΌλ‘œ ν‘œμ‹œν•œλ‹€.

λ‹€μŒκ³Ό 같이 μˆ˜ν–‰ν•œ κ²½μš°μ—λŠ” chunk λ‹¨μœ„κ°€ 1000개λ₯Ό λ„˜μ–΄μ„  κ²½μš°κ°€ μ—†λŠ” 것을 확인할 수 μžˆλ‹€.

Step Listener

μŠ€νƒ­κ³Ό 청크의 μ‹œμž‘κ³Ό λμ—μ„œ νŠΉμ • λ‘œμ§μ„ μ²˜λ¦¬ν•  수 있게 ν•΄μ€€λ‹€.

(StepListenerλŠ” λͺ¨λ“  μŠ€νƒ­ λ¦¬μŠ€λ„ˆκ°€ μƒμ†ν•˜λŠ” 마컀 μΈν„°νŽ˜μ΄μŠ€μ΄λ‹€.)

λͺ¨λ“  μˆ˜μ€€μ— λ¦¬μŠ€λ„ˆλ₯Ό μ μš©ν•΄ Job을 쀑단할 수 있으며, 일반적으둜 μ „μ²˜λ¦¬λ₯Ό μˆ˜ν–‰ν•˜κ±°λ‚˜ 이후 κ²°κ³Όλ₯Ό ν‰κ°€ν•˜κ±°λ‚˜, 일뢀 였λ₯˜μ²˜λ¦¬μ—λ„ μ‚¬μš©λœλ‹€.

StepExecutionListener

  • org.springframework.batch.core.StepExecutionListener

@BeforeStep, @AfterStep μ• λ„ˆν…Œμ΄μ…˜ 제곡

ChunkListener

@BeforeChunk, @AfterChunk μ• λ„ˆν…Œμ΄μ…˜ 제곡

Step Flow

쑰건 둜직

μŠ€ν”„λ§ 배치의 Step은 StepBuilder의 .next() λ©”μ„œλ“œλ₯Ό μ‚¬μš©ν•΄ μ§€μ •ν•œ μˆœμ„œλŒ€λ‘œ μ‹€ν–‰λœλ‹€. 전이(transition)λ₯Ό ꡬ성해 결과에 λ”°λ₯Έ λ‹€λ₯Έ μˆœμ„œλ‘œ μ‹€ν–‰ν•˜λŠ” 것도 κ°€λŠ₯ν•˜λ‹€.

μŠ€ν”„λ§ λ°°μΉ˜λŠ” 기쀀에 따라 λ‘κ°œμ˜ μ™€μΌλ“œ μΉ΄λ“œλ₯Ό ν—ˆμš©ν•œλ‹€.

  • * : 0개 μ΄μƒμ˜ 문자λ₯Ό μΌμΉ˜ν•˜λŠ” 것을 의미

    • C* : COMPLETE, CORRECT

  • ? : 1개의 문자λ₯Ό 일치 μ‹œν‚€λŠ” 것을 의미

    • ?AT : CAT, KATκ³Ό μΌμΉ˜ν•˜μ§€λ§Œ, THATκ³ΌλŠ” 뢈일치

JobExecutionDecider

Job μ‹€ν–‰ 정보(jobExecution)와 μŠ€νƒ­ 싀행정보( stepExecution)λ₯Ό 인자둜 λ°›μ•„ λͺ¨λ“  정보λ₯Ό μ΄μš©ν•΄ λ‹€μŒμ— 무엇을 μˆ˜ν–‰ν• μ§€μ— λŒ€ν•΄ κ²°μ •ν•  수 μžˆλ‹€.

Job μ’…λ£Œν•˜κΈ°

μŠ€ν”„λ§ λ°°μΉ˜μ—μ„œλŠ” Job을 μ’…λ£Œν•  λ•Œ μ•„λž˜ 3κ°€μ§€ μƒνƒœλ‘œ μ’…λ£Œν•  수 μžˆλ‹€.

μƒνƒœ
μ„€λͺ…

Completed(μ™„λ£Œ)

μŠ€ν”„λ§ 배치 μ²˜λ¦¬κ°€ μ„±κ³΅μ μœΌλ‘œ μ’…λ£ŒλμŒμ„ 의미 JobInstanceκ°€ Completed둜 μ’…λ£Œλ˜λ©΄ λ™μΌν•œ νŒŒλΌλ―Έν„°λ₯Ό μ‚¬μš©ν•΄ λ‹€μ‹œ μ‹€ν–‰ν•  수 μ—†λ‹€.

Failed(μ‹€νŒ¨)

작이 μ„±κ³΅μ μœΌλ‘œ μ™„λ£Œλ˜μ§€ μ•Šμ•˜μŒμ„ 의미 Failed μƒνƒœλ‘œ μ’…λ£Œλœ μž‘μ€ μŠ€ν”„λ§ 배치λ₯Ό μ‚¬μš©ν•΄ λ™μΌν•œ νŒŒλΌλ―Έν„°λ‘œ λ‹€μ‹œ μ‹€ν–‰ν•  수 μžˆλ‹€.

Stopped(쀑지)

Stopped μƒνƒœλ‘œ μ’…λ£Œλœ μž‘μ€ λ‹€μ‹œ μˆ˜ν–‰ κ°€λŠ₯ν•˜λ‹€. Job에 였λ₯˜κ°€ λ°œμƒν•˜μ§€ μ•Šμ•˜μ–΄λ„, μ€‘λ‹¨λœ μœ„μΉ˜μ—μ„œ μž‘μ„ λ‹€μ‹œ μ‹œμž‘ν•  수 μžˆλ‹€. μ‚¬λžŒμ˜ κ°œμž…μ΄ ν•„μš”ν•˜κ±°λ‚˜ λ‹€λ₯Έ 검사/μ²˜λ¦¬κ°€ ν•„μš”ν•œ 상황에 μœ μš©ν•˜λ‹€.

BatchStatusλ₯Ό νŒλ³„ν•  λ•Œ, ExitStatusλ₯Ό ν‰κ°€ν•˜λ©΄μ„œ μ‹λ³„λœλ‹€. ExitStatusλŠ” μŠ€ν…, 청크, μž‘μ—μ„œ λ°˜ν™˜λ  수 있으며, BatchStatusλŠ” StepExecution μ΄λ‚˜ JobExecution 내에 λ³΄κ΄€λ˜λ©°, JobRepository에 μ €μž₯λœλ‹€.

Completed μƒνƒœλ‘œ μ’…λ£Œν•˜κΈ°

.end() λ©”μ„œλ“œ μ‚¬μš©

BATCH_STEP_EXECUTION ν…Œμ΄λΈ”μ— μŠ€ν…μ΄ λ°˜ν™˜ν•œ ExitStatusκ°€ μ €μž₯되며, μŠ€ν…μ΄ λ°˜ν™˜ν•œ μƒνƒœκ°€ 무엇이든 상관없이 BATCH_JOB_EXECUTION에 COMPLETEDκ°€ μ €μž₯λœλ‹€.

Failed μƒνƒœλ‘œ μ’…λ£Œν•˜κΈ°

fail() λ©”μ„œλ“œ μ‚¬μš©

μ—¬κΈ°μ„œ firstStep() 이 FAILED둜 λλ‚˜λ©΄, JobRepository 에 ν•΄λ‹Ή Job이 μ‹€νŒ¨ν•œ κ²ƒμœΌλ‘œ μ €μž₯되며, λ™μΌν•œ νŒŒλΌλ―Έν„°λ₯Ό μ‚¬μš©ν•΄ λ‹€μ‹œ μ‹€ν–‰ν•  수 μžˆλ‹€.

Stopped μƒνƒœλ‘œ μ’…λ£Œν•˜κΈ°

.stopAndRestart() λ©”μ„œλ“œλ‘œ μž‘μ„ λ‹€μ‹œ μˆ˜ν–‰ν•œλ‹€λ©΄, 미리 ꡬ성해둔 μŠ€ν…λΆ€ν„° μ‹œμž‘λœλ‹€. μ•„λž˜ μ˜ˆμ œμ—μ„œλŠ” μž¬μˆ˜ν–‰μ‹œ successStep()λΆ€ν„° μˆ˜ν–‰λ˜λŠ”κ²ƒμ„ λ³Ό 수 μžˆλ‹€.

Last updated

Was this helpful?