public class JpaPagingItemReader<T>
extends AbstractPagingItemReader<T>
ItemReader for reading database records built on top of JPA.
It executes the JPQL setQueryString(String) to retrieve requested data. The query is executed using paged requests of a size specified in AbstractPagingItemReader.setPageSize(int). Additional pages are requested when needed as AbstractItemCountingItemStreamItemReader.read() method is called, returning an object corresponding to current position.
The performance of the paging depends on the JPA implementation and its use of database specific features to limit the number of returned rows.
Setting a fairly large page size and using a commit interval that matches the page size should provide better performance.
In order to reduce the memory usage for large results the persistence context is flushed and cleared after each page is read. This causes any entities read to be detached. If you make changes to the entities and want the changes persisted then you must explicitly merge the entities.
The reader must be configured with an EntityManagerFactory. All entity access is performed within a new transaction, independent of any existing Spring managed transactions.
The implementation is thread-safe in between calls to AbstractItemCountingItemStreamItemReader.open(ExecutionContext), but remember to use saveState=false if used in a multi-threaded client (no restart available).
public class JpaCursorItemReader<T>
extends AbstractItemCountingItemStreamItemReader<T>
implements org.springframework.beans.factory.InitializingBean
ItemStreamReader implementation based on JPA Query.getResultStream(). It executes the JPQL query when initialized and iterates over the result set as AbstractItemCountingItemStreamItemReader.read() method is called, returning an object corresponding to the current row. The query can be set directly using setQueryString(String), or using a query provider via setQueryProvider(JpaQueryProvider). The implementation is not thread-safe.
여기서 또 한가지 알아둬야할 점이 있다. 멀티쓰레드로 각 chunk들이 개별로 진행되는 경우 실패 지점에서 재시작하는 것이 불가능하다. 왜냐하면, 멀티쓰레드의 경우 1n개의 chunk가 동시에 실행되며, 5번째 chunk가 실패했다고 해서 14 chunk가 모두 성공했다는 보장이 없다.
그래서 멀티쓰레드 적용시 일반적으로 ItemReader의 saveState 옵션을 false로 설정한다.
위에서 봤듯이 PagingItemReader는 thread-safe한 것을 알 수 있다. 멀티쓰레드로 수행하는 배치가 있다면, DB접근시 PagingItemReader를 사용하는 것을 권장한다.
application.yml
spring:datasource:hikari:driver-class-name:com.mysql.cj.jdbc.Driverjdbc-url:jdbc:mysql://localhost:3306/spring_batchusername:springpassword:Springtest2021!maximum-pool-size:10# pool에 유지할 최대 connection 수auto-commit:false# 자동 commit 여부
main() 메서드 설정
@SpringBootApplication(exclude =DataSourceAutoConfiguration.class)publicclassSpringBatchRealApplication {publicstaticvoidmain(String[] args) {// main thread가 종료되면 jvm 강제 종료// main thread가 종료됐다는 것은 자식 thread도 모두 종료됐다는 것을 보장System.exit(SpringApplication.exit(SpringApplication.run(SpringBatchRealApplication.class, args))); }}
Job 구현
@Bean(JOB_NAME)publicJobjob() {returnthis.jobBuilderFactory.get(JOB_NAME).incrementer(newRunIdIncrementer()).start(step()).build(); } @Bean(JOB_NAME +"Step")publicStepstep() {returnthis.stepBuilderFactory.get(JOB_NAME +"Step").<Ncustomer, Ncustomer>chunk(chunkSize).reader(reader(null)).writer(writer()).taskExecutor(executor()).throttleLimit(poolSize) // default : 4, 생성된 쓰레드 중 몇개를 실제 작업에 사용할지 결정.build(); } @Bean(JOB_NAME +"TaskPool")publicTaskExecutorexecutor() {// 쓰레드 풀을 이용한 쓰레드 관리 방식ThreadPoolTaskExecutor executor =newThreadPoolTaskExecutor();executor.setCorePoolSize(poolSize); // 풀의 기본 사이즈executor.setMaxPoolSize(poolSize); // 풀의 최대 사이즈executor.setThreadGroupName("multi-thread-");executor.setWaitForTasksToCompleteOnShutdown(Boolean.TRUE);// allowCoreThreadTimeOut을 true로 설정해// core thread 가 일정시간 태스크를 받지 않을 경우 pool 에서 정리하고,// 모든 자식 스레드가 정리되면 jvm 도 종료 되게 설정한다.executor.setKeepAliveSeconds(30);executor.setAllowCoreThreadTimeOut(true);executor.initialize();return executor; } @Bean(JOB_NAME +"Reader")publicJdbcPagingItemReader<Ncustomer>reader(PagingQueryProvider pagingQueryProvider) {returnnewJdbcPagingItemReaderBuilder<Ncustomer>().name("customerJdbcPagingItemReader") // Reader의 이름, ExecutionContext에 저장되어질 이름.dataSource(dataSource) // DB에 접근하기 위해 사용할 DataSource객체.queryProvider(pagingQueryProvider) // PagingQueryProvider.pageSize(10) // 각 페이지 크기.rowMapper(newBeanPropertyRowMapper<>(Ncustomer.class)) // 쿼리 결과를 인스턴스로 매핑하기 위한 매퍼.saveState(false) // Reader가 실패한 지점을 저장하지 않도록 설정.build(); }
여기서 핵심은 TaskExecutor을 구현하는 부분과 Step실행시 .saveState(false)로 설정하여, Reader가 실패한 지점을 저장하지 않고, 실패시 다시 처음부터 실행하도록 하는 것이다. (다른 thread들의 성공을 보장하지 않으므로!)
TaskExecutor 구현시에는 allowCoreThreadTimeOut을 설정해 특정시간(KeepAliveSeconds)이후에 사용하지 않으면 종료되도록 설정한다.
다음 Job을 수행하면 각 thread별로 병렬로 수행되는 것을 확인할 수 있다.
Not Thread Safety
CursorItemReader
cursorItemReader의 경우에는 thread safety를 보장하지 않는다. Reader 영역을 SynchronizedItemStreamReader로 wrapping하여 thread safety하게 구현할 수 있다.
@Bean(JOB_NAME +"Reader")publicSynchronizedItemStreamReader<Ncustomer>reader() {String sql ="SELECT N.CUSTOMER_ID"+", CONCAT(N.LAST_NAME, \" \", N.FIRST_NAME) AS FULL_NAME\n"+" , N.ADDRESS1 AS ADDRESS\n"+", N.POSTAL_CODE\n"+"FROM NCUSTOMER N\n"+"LIMIT 55";JdbcCursorItemReader itemReader =newJdbcCursorItemReaderBuilder<Ncustomer>().name(JOB_NAME +"Reader") // Reader의 이름, ExecutionContext에 저장되어질 이름.dataSource(dataSource) // DB에 접근하기 위해 사용할 DataSource객체.rowMapper(newBeanPropertyRowMapper<>(Ncustomer.class)) // 쿼리 결과를 인스턴스로 매핑하기 위한 매퍼.sql(sql).saveState(false) // Reader가 실패한 지점을 저장하지 않도록 설정.build();returnnewSynchronizedItemStreamReaderBuilder<Ncustomer>().delegate(itemReader).build(); }
SynchronizedItemStreamReader의 delegate에 수행하고 싶은 CursorItemReader를 등록해주면 된다.
publicclassSynchronizedItemStreamReader<T> implementsItemStreamReader<T>,InitializingBean {privateItemStreamReader<T> delegate;publicvoidsetDelegate(ItemStreamReader<T> delegate) {this.delegate= delegate; } /** * This delegates to the read method of the <code>delegate</code> */ @NullablepublicsynchronizedTread() throwsException,UnexpectedInputException,ParseException,NonTransientResourceException {returnthis.delegate.read(); }
SynchronizedItemStreamReader의 read() 메서드를 보면 synchronized 메서드로 감싸 동기화된 읽기가 가능하다.
Flow는 자체 스레드에서 진행되며, 여러 Flow를 병렬로 실행할 수 있다. FlowBuilder의 .split() 메서드는 TaskExecutor를 받아 각 Flow는 자체 스레드에서 실행된다. 위와 같이 구현하면 각 step이나 step의 flow를 병렬로 수행할 수 있다.
각 Step들이 병렬로 수행되는 것을 볼 수 있으며, 이때 모든 Flow가 종료되면 Job이 종료된다.
AsyncItemProcessor와 AsyncItemWriter
특정 배치 처리의 병목구간이 ItemProcessor에 존재하는 경우가 있는데, 그런 경우 Step의 ItemProcessor 부분만 별도 스레드에서 실행하여 성능을 향상시킬 수 있다. AsyncItemProcessor는 ItemProcessor 구현체를 래핑하는 데코레이터로, 새로운 스레드에서 ItemProcessor 로직을 수행하며, 처리를 완료하고 난 후 결과값으로 반환된 Future를 AsyncItemWriter로 전달한다. AsyncItemWriter 또한, ItemWrite 구현체를 래핑한 데코레이터이다. AsyncItemWriter는 Future를 처리한 후 그 결과를 ItemWriter에 전달한다. 그러므로 반드시 AsyncItemProcessor와 AsyncItemWriter는 함께 사용해야한다.
위 의존성을 추가해주어야 AsyncItemProcessor와 AsyncItemWriter를 사용할 수 있다.
example
@Bean(JOB_NAME +"AccountProcessor")publicItemProcessor<Naccount, Naccount>processor() {return (account)->{Thread.sleep(5);; // 비동기 처리 이전 일부러 늦춤return account; }; }
프로세서 병목현상을 구현하기 위해 임의로 5밀리초씩 쉬도록 만들었다. 그 후 배치를 수행해보면,
2021-12-13 20:24:14.945 INFO 36579 --- [ main] d.e.s.SpringBatchRealApplication : Starting SpringBatchRealApplication using Java 17.0.1 on
...(생략)
2021-12-13 20:24:23.155 INFO 36579 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown initiated...
2021-12-13 20:24:23.159 INFO 36579 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown completed.
AysncItemProcessor의 setDelegate() 메서드에 래핑할 ItemProcessor를 지정해주고, 별도 스레드에서 실행하기 위한 TaskExecutor를 설정해준다. 그 후 AsyncItemWriter를 구현하여 Future 를 받아와 쓰기 작업을 할 수 있도록 구현한다. 수행할 ItemWriter를 setDelegate 메서드에 지정해준다.
chunk메서드 처리시 AsyncItemProcessor가 반환하는 타입인 Future<T>로 반환 타입을 변경해주면된다. 그 후 배치를 수행해보면,
2021-12-13 20:28:11.286 INFO 36626 --- [ main] d.e.s.SpringBatchRealApplication : Starting SpringBatchRealApplication using Java 17.0.1 on PID 36626
...(생략)
2021-12-13 20:28:15.158 INFO 36626 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown initiated...
2021-12-13 20:28:15.162 INFO 36626 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown completed.
약 4초만에 수행이 완료된 것을 확인할 수 있다.
Partitioning
배치 기반 워크로드의 대부분은 I/O에 있다. 스프링 배치는 여러 개의 워커가 완전한 스텝을 실행할 수 있도록 기능을 제공한다.
파티셔닝은 Master Step이 대량의 데이터를 처리하기 위해 지정된 수의 Worker Step으로 일을 분할하여 처리하는 방식을 말한다. 대량의 데이터를 더 작은 파티션으로 나누어 각 워커가 나눠진 파티션을 병렬로 처리하는 것이다. 각 워커는 자체적으로 읽기(ItemReader), 처리(ItemProcessor), 쓰기(ItemWriter) 등을 담당하는 온전한 배치 Step이며, 재시작과 같이 스프링 배치가 기본적으로 제공하는 모든 기능을 사용할 수 있다.
Multi-threaded Step vs Partitioning
멀티쓰레드 Step은 단일 Step을 Chunk 단위로 쓰레드를 생성해 분할 처리
어떤 쓰레드에서 어떤 데이터들을 처리할지에 대한 세밀한 조정이 불가능
해당 Step의 ItemReader/ItemWriter등이 멀티쓰레드 환경을 지원하는지 유무가 중요함
파티셔닝은 독립적인 Step(Worker Step)을 구성하고, 각각 별도의 StepExecution 파라미터 환경을 가지고 처리
멀티쓰레드로 동작하지만, ItemReader/ItemWriter이 멀티쓰레드 환경을 지원하는지 중요하지 않다.
주요 인터페이스
Partitioner
Partitioner는 파티셔닝할 데이터를 여러 파티션으로 나누는 역할을 한다. 즉, 파티셔닝된 Worker Step을 위한 StepExecution을 생성하는 인터페이스다.
2021-12-13 22:48:47.105 INFO 38038 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step.manager]
Hibernate: select min(naccount0_.account_id) as col_0_0_ from naccount naccount0_ where naccount0_.last_statement_date between ? and ?
Hibernate: select max(naccount0_.account_id) as col_0_0_ from naccount naccount0_ where naccount0_.last_statement_date between ? and ?
2021-12-13 22:48:47.178 INFO 38038 --- [tition-thread-1] d.e.s.j.m.PartitioningConfiguration : reader minId=601, maxId=800
2021-12-13 22:48:47.179 INFO 38038 --- [tition-thread-4] d.e.s.j.m.PartitioningConfiguration : reader minId=801, maxId=1000
2021-12-13 22:48:47.179 INFO 38038 --- [tition-thread-2] d.e.s.j.m.PartitioningConfiguration : reader minId=201, maxId=400
2021-12-13 22:48:47.180 INFO 38038 --- [tition-thread-3] d.e.s.j.m.PartitioningConfiguration : reader minId=401, maxId=600
2021-12-13 22:48:47.180 INFO 38038 --- [tition-thread-5] d.e.s.j.m.PartitioningConfiguration : reader minId=1, maxId=200
H
다음과 같이 각 파티션별로 수행할 StepExecution이 정상적으로 잘 구현된 것을 확인할 수 있다.
Step 구현
@Bean(JOB_NAME +"StepManager")publicStepstepManager() {// 파티셔닝 대상 step과 이름을 연관지어 지어준다. (여러개의 step이 있을 수도 있으므로)returnthis.stepBuilderFactory.get("step.manager").partitioner("step",partitioner(null,null)) // Partitioner 구현체 등록.step(step()) // 파티셔닝될 Step등록.partitionHandler(partitionHandler()) // PartitionHandler 등록.build(); } @Bean(JOB_NAME +"Step")publicStepstep() {returnthis.stepBuilderFactory.get(JOB_NAME +"Step").<Naccount, Naccount>chunk(chunkSize).reader(reader(null,null)).writer(writer(null,null)).build(); }
partitioner() 메서드를 이용해 step에 사용될 Partitioner 구현체를 등록해주고, 파티셔닝될 Step을 .step() 메서드에 등록해준다. 그리고 사용할 partitionHandler()를 등록해주면 master step구현이 완료된다.
ItemReader/ItemWriter
@Bean(JOB_NAME +"Reader") @StepScopepublicJpaPagingItemReader<Naccount>reader(@Value("#{stepExecutionContext[minId]}") Long minId, @Value("#{stepExecutionContext[maxId]}") Long maxId) {Map<String,Object> params =newHashMap<>();params.put("minId", minId);params.put("maxId", maxId);log.info("reader minId={}, maxId={}", minId, maxId);returnnewJpaPagingItemReaderBuilder<Naccount>().name(JOB_NAME +"Reader").entityManagerFactory(entityManagerFactory).pageSize(chunkSize).queryString("SELECT n FROM Naccount n WHERE n.accountId BETWEEN :minId AND :maxId").parameterValues(params).build(); } @Bean(JOB_NAME +"Writer") @StepScopepublicItemWriter<Naccount>writer(@Value("#{stepExecutionContext[minId]}") Long minId, @Value("#{stepExecutionContext[maxId]}") Long maxId) {log.info("minId : {}, maxId: {}", minId, maxId);return (items) ->items.forEach(System.out::println); }
다음과 같이 #{stepExecutionContext['변수명']}으로 위 Partitioner에서 각 파티션 별로 지정한 StepExecution 변수를 가져올 수 있다. 해당 변수를 전달받아 각 파티션 별로 처리하도록 구현하면 된다.