Job, JobParameters와 함께 배치를 실행하는 인터페이스로, 메서드는 run 한개이다.
Job을 실행하는 역할
Job.execute() 호출
Job재실행 가능 여부 검증
Job 실행 방법(현재 스레드에서 수행할지, 스레드 풀을 통해 실행할지 등)
파라미터 유효성 검증
스프링부트를 사용하면 스프링 부트가 즉시 Job를 시작하는 기능을 제공해, 일반적으로는 직접 다룰 필요 없음
publicinterfaceJobLauncher {// Job과 JobParameters를 매개변수로 받아 JobExecution 반환// 매개변수가 동일하며, 이전 JobExecution가 중단된 적이 있으면 동일한 JobExecution 반환JobExecutionrun(Job var1,JobParameters var2) throwsJobExecutionAlreadyRunningException,JobRestartException,JobInstanceAlreadyCompleteException,JobParametersInvalidException;}
JobRegistryBackgroundJobRunner : 스프링을 부트스트랩에서 기동한 자바 프로세스 내에서 Quartz나 Jmx 후크와 같은 스케줄러를 사용해 잡을 실행하면 JobReistry를 생성하게 된다. JobRegistryBackgroundJobRunner는 JobRegistry를 생성하는 데 사용
JobBuilderFactory의 get 메서드를 호출할 때마다 새로운 JobBuilder를 생성되며, 새로운 JobBuilder를 생성할 때마다 JobBuilderFactory가 생성될 때 주입받은 JobRepository를 설정하는 것을 볼 수 있다. 해당 JobBuilderFactory에서 생성되는 모든 JobBuilder가 동일한 JobRepository를 사용하는 것이다.
즉, JobBuilderFactory는 JobBuilder를 생성하는 역할만 수행한다.
JobBuilder
publicclassJobBuilderextendsJobBuilderHelper<JobBuilder> {publicJobBuilder(String name) { super(name); }// 1. step을 추가하여 가장 기본이 되는 SimpleJobBuilder 생성publicSimpleJobBuilderstart(Step step) {return (newSimpleJobBuilder(this)).start(step); }// 2. Flow를 실행할 JobFlowBuilder 생성publicJobFlowBuilderstart(Flow flow) {return (newFlowJobBuilder(this)).start(flow); }// 3. Step을 실행할 JobFlowBuilder 생성publicJobFlowBuilderflow(Step step) {return (newFlowJobBuilder(this)).start(step); }}
JobBuilder는 직접적으로 Job을 생성하는 것이 아니라 별도의 구체적인 빌더를 생성해 반환한다. 왜냐하면 경우에 따라 Job 생성 방법이 모두 다르기 떄문에 별도의 구체적인 빌더를 구현하고, 이를 통해 Job 생성이 이루어지게 한다.
중간에 빌더를 한번 더 반환하여 사용해야하지만, 메서드 체인 방식을 활용하면 손쉽게 처리할 수 있다. Job은 Step / Flow 인스턴스의 컨테이너 역할을 하기 때문에 생성 이전에 인스턴스를 전달 받는다.
SimpleJobBuilder로 Job 생성하기
@AutowiredprivateJobBuilderFactory jobBuilderFactory; @BeanpublicJobsimpleJob(){returnjobBuilderFactory.get("simpleJob") // "simpleJob" 이름을 가진 JobBuilder instance 반환.start(simpleStep()) // step을 생성하는 메서드로 생성되는 SimpleJobBuilder.build(); // build 메서드 호출로 Job 반환 }
JobInstance
배치 처리에서 Job이 실행될 때 하나의 Job 실행단위이다. 예를들어 하루에 한번 배치 Job이 실행된다면, 어제 오늘 각각 실행된 Job을 JobInstance라 부른다.
JobInstance는 이름과 논리적 실행을 위해 제공되는 고유한 식별 파라미터 모음으로 유일하게 존재한다.
ExampleGenerator 이름의 Job이 다른 파라미터로 실행될 때마다 새로운 JobInstace가 생성된다.
BATCH_JOB_INSTANCE 테이블로 관리
BATCH_JOB_EXECUTION_PARAMS 에서 실제 식별 파라미터 관리
Job을 처음 실행하면 새로운 JobInstance를 얻는다. 하지만 실행에 실패한 이후 다시 실행하면, 여전히 동일한 논리적 실행(파라미터 동일)이므로 새로운 JobInstance를 얻지 못하며, 실제 실행을 추적하기 위한 새로운 JobExecution을 얻을 것이다.
즉, JobInstance는 실패한 JobExecution과 새로 수행한 JobExecution과 같이 JobExecution을 여러 개 가질 수 있다.
JobExecution
JobInstance에 대한 한 번의 실행(실제 시도)을 나타내는 객체이다. JobExecution은 Job 실행에 대한 정보를 담고 있는 도메인 객체이며, JobInstance, 배치 실행 상태, 시작 시간, 끝난 시간, 오류 메세지 등의 정보를 담고 있다.
JobExecution은 Job이 구동될 때 마다 매번 새로운 JobExecution을 얻게된다.
BATCH_JOB_EXECUTION 테이블에 각 레코드로 저장
BATCH_JOB_EXECUTION_CONTEXT 테이블에 상태 값 저장
publicclassJobExecutionextendsEntity {privatefinalJobParameters jobParameters; //Job 실행에 필요한 매개 변수 데이터privateJobInstance jobInstance; // Job 실행의 단위가 되는 객체privatevolatileCollection<StepExecution> stepExecutions; // StepExecution을 여러개 가질 수 있는 Collection 타입privatevolatileBatchStatus status; // Job의 실행 상태(COMPLETED, STARTING, STARTED ...)privatevolatileDate startTime; // Job이 실행된 시간(null은 시작하지 않은 것)privatevolatileDate createTime; // JobExecution이 생성된 시간privatevolatileDate endTime; // JobExecution 종료 시간privatevolatileDate lastUpdated; // 마지막 수정시간privatevolatileExitStatus exitStatus; // Job 실행 결과에 대한 상태값(UNKOWN, EXECUTING, COMPLETE, ...)privatevolatileExecutionContext executionContext;// Job 실행 사이에 유지해야하는 사용자 데이터privatetransientvolatileList<Throwable> failureExceptions; // Job 실행 중 발생한 예외 privatefinalString jobConfigurationName; // Job 설정 이름...}
JobParameters
Job이 실행될 때 필요한 파라미터들을 Map 타입으로 지정하는 객체로 JobInstance(1:1 관계)를 구분하는 기준이 되기도 한다.
하나의 Job을 생성할 때 시작 시간 등의 정보를 파라미터로 해서 하나의 JobInstance를 생성한다. 즉, 1:1 관계이다.
파라미터는 key=value로 이루어져있다.
JobParameters는 Map<String,JobParameter>의 wrapper에 불과하다.
식별에 사용하지 않는 파라미터도 있을 수 있다. 식별에 사용하고 싶지 않는 파라미터는 앞에 -를 다음과 같이 붙여주면 된다.
executionDate(date)=2021/11/27-filename=test
Caused by: org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException: A job instance already exists and is complete for parameters={executionDate=1637938800000, filename=test}. If you want to run this job again, change the parameters.
at org.springframework.batch.core.repository.support.SimpleJobRepository.createJobExecution(SimpleJobRepository.java:139) ~[spring-batch-core-4.3.3.jar:4.3.3]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.12.jar:5.3.12]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.3.12.jar:5.3.12]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.12.jar:5.3.12]
다음과 같이 -를 붙인 파라미터는 식별 파라미터로 사용하지 않아, 기존에 이미 수행된 Job으로 실패한 것을 알 수 있다.
파라미터에 접근하는 방법
ChunkContext 인스턴스
실행 시점의 Job 상태 제공
tasklet 내에서 처리중인 chunk와 관련된 정보(스탭 및 잡과 관련된 정보 포함) 제공
publicinterfaceJobParametersValidator { /** * Check the parameters meet whatever requirements are appropriate, and * throw an exception if not. * * @param parameters some {@link JobParameters} (can be {@code null}) * @throwsJobParametersInvalidException if the parameters are invalid */voidvalidate(@NullableJobParameters parameters) throwsJobParametersInvalidException;}
JobParametersValidator 인터페이스를 구현하고, 해당 구현체를 잡 내에 구성해 파라미터 유효성 검증을 할 수 있다.
스프링은 필수 파라미터가 누락없이 전달됐는지 확인하는 DefaultJobParametersValidator를 기본적으로 제공해준다.
DefaultJobParametersValidator 사용한 유효성 검증 : 파라미터 존재 여부를 제외한 다른 유효성 검증은 수행하지 않음.
@BeanpublicJobParametersValidatorvalidator() {DefaultJobParametersValidator validator =newDefaultJobParametersValidator();validator.setRequiredKeys(newString[] {"executionDate","fileName"}); // 필수 파라미터 확인validator.setOptionalKeys(newString[] {"name"}); // 선택 파라미터validator.afterPropertiesSet(); // 선택 파라미터에 필수 파라미터가 포함되지 않았는지 확인return validator; }
필수 파라미터 미포함시 오류
Caused by: org.springframework.batch.core.JobParametersInvalidException: The JobParameters do not contain required keys: [fileName]
at org.springframework.batch.core.job.DefaultJobParametersValidator.validate(DefaultJobParametersValidator.java:120) ~[spring-batch-core-4.3.3.jar:4.3.3]
선택 파라미터와 필수 파라미터 겹치는 경우 오류
Caused by: java.lang.IllegalStateException: Optional keys cannot be required: fileName
at org.springframework.util.Assert.state(Assert.java:76) ~[spring-core-5.3.12.jar:5.3.12]
at org.springframework.batch.core.job.DefaultJobParametersValidator.afterPropertiesSet(DefaultJobParametersValidator.java:73) ~[spring-batch-core-4.3.3.jar:4.3.3]
선택 파라미터와 필수 파라미터에 포함되지 않은 파라미터 전송시 오류
Caused by: org.springframework.batch.core.JobParametersInvalidException: The JobParameters contains keys that are not explicitly optional or required: [displyYn]
at org.springframework.batch.core.job.DefaultJobParametersValidator.validate(DefaultJobParametersValidator.java:107) ~[spring-batch-core-4.3.3.jar:4.3.3]
커스텀 유효성 검증 : 파라미터 존재 여부 외의 추가 유효성 검증이 필요한 경우 커스텀 JobParametersValidator 구현 필요
publicinterfaceJobParametersIncrementer { /** * Increment the provided parameters. If the input is empty, then this * should return a bootstrap or initial value to be used on the first * instance of a job. * * @param parameters the last value used * @return the next value to use (never {@code null}) */JobParametersgetNext(@NullableJobParameters parameters);}
JobParametersIncrementer는 사용할 파라미터를 고유하게 생성할 수 있게 스프링 배치가 제공하는 인터페이스로 매 실행 시 timestamp를 추가하거나 파라미터를 증가시켜야하는 경우에 사용하기 적합하다.
@BeanpublicJobjob() {// jobBuilderFactory.get("잡이름")returnthis.jobBuilderFactory.get("basicJob").start(step1()).validator(validator()).incrementer(newRunIdIncrementer()).next(step2()).build(); // 실제 job 생성 }
JobListener
모든 Job은 생명주기를 가지고 있으며, 스프링 배치는 생명주기의 특정 시점에서 로직을 추가할 수 있게 기능을 제공해준다.
JobExecutionListener : Job 실행과 관련된 리스너 인터페이스
publicinterfaceJobExecutionListener { /** * Callback before a job executes. * * @param jobExecution the current {@link JobExecution} */voidbeforeJob(JobExecution jobExecution); /** * Callback after completion of a job. Called after both both successful and * failed executions. To perform logic on a particular status, use * "if (jobExecution.getStatus() == BatchStatus.X)". * * @param jobExecution the current {@link JobExecution} */voidafterJob(JobExecution jobExecution);}
beforeJob : Job 수행 이전에 수행
afterJob : Job 수행완료 후 수행하며 Job의 완료 상태와 상관 없이 호출된다.
Job Listener를 작성하는데 두가지 방법이 있다.
JobExecutionListener 인터페이스 구현
publicclassJobLoggerListenerimplementsJobExecutionListener {privatestaticString START_MESSAGE ="%s is beginning execution";privatestaticString END_MESSAGE ="%s has completed with the status %s"; @OverridepublicvoidbeforeJob(JobExecution jobExecution) {System.out.println(String.format(START_MESSAGE,jobExecution.getJobInstance().getJobName())); } @OverridepublicvoidafterJob(JobExecution jobExecution) {System.out.println(String.format(END_MESSAGE,jobExecution.getJobInstance().getJobName(),jobExecution.getStatus())); }}
JobBuilder 의 listener 메서드를 호출하면 다음과 같이 Job수행 전후로 수행된 것을 볼 수 있다.
returnthis.jobBuilderFactory.get("basicJob").start(step1()).validator(validator()).incrementer(newDailyJobTimestamper()).listener(newJobLoggerListener()).next(step2()).build(); // 실제 job 생성
2021-11-16 23:35:57.266 INFO 80890 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=basicJob]] launched with the following parameters: [{name=faker, executionDate=1637073357082, fileName=test4.csv, run.id=3}]
basicJob is beginning execution
...
2021-11-16 23:35:57.656 INFO 80890 --- [ main] o.s.batch.core.step.AbstractStep : Step: [step2] executed in 129ms
basicJob has completed with the status COMPLETED
어노테이션 사용(@BeforeJob, @AfterJob) : 인터페이스를 구현할 필요 없이 어노테이션만으로 구현하면 된다.
publicclassJobLoggerListener {privatestaticString START_MESSAGE ="%s is beginning execution";privatestaticString END_MESSAGE ="%s has completed with the status %s"; @BeforeJobpublicvoidbeforeJob(JobExecution jobExecution) {System.out.println(String.format(START_MESSAGE,jobExecution.getJobInstance().getJobName())); } @AfterJobpublicvoidafterJob(JobExecution jobExecution) {System.out.println(String.format(END_MESSAGE,jobExecution.getJobInstance().getJobName(),jobExecution.getStatus())); }}
어노테이션으로 구현하는 경우 JobListenerFactoryBean 으로 리스너를 주입할 수 있다.
returnthis.jobBuilderFactory.get("basicJob").start(step1()).validator(validator()).incrementer(newDailyJobTimestamper()).listener(JobListenerFactoryBean.getListener(newJobLoggerListener())).next(step2()).build(); // 실제 job 생성
ExecutionContext
ExecutionContext는 배치 잡의 세션으로, 간단한 키-값을 보관한다. 이때 Job의 상태를 안전하게 보관할 수 있게 제공해준다. Job을 수행하는 과정에서 여러개의 ExecutionContext가 존재할 수 있다.
Job에 대한 상태 : JobExecution의 ExecutionContext에 저장
Step에 대한 상태 : StepExecution의 ExecutionContext
이렇게 각 Step용 데이터와 Job 전체용 데이터와 같이 데이터 사용 범위를 지정할 수 있다.
ExecutionContext가 담고있는 모든 데이터는 JopRepository에 저장된다.