package org.springframework.batch.item;
public interface ItemReader<T> {
@Nullable
T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}
ItemReader의 read()를 호출하면, 해당 메서드는 스텝 내에서 처리할 Item한개를 반환하며, 스텝에서는 아이템 개수를 세어 청크 내 데이터가 몇개가 처리됐는지 관리한다. 해당 Item은 ItemProcessor로 전달되며, 그 뒤 ItemWriter로 전달된다.
가장 대표적인 구현체인 JdbcPagingItemReader의 클래스 계층 구조를 보면 다음과 같다.
여기서 ItemReader와 ItemStream 인터페이스도 같이 구현하고 있는 것을 볼 수 있다.
public interface ItemStream {
void open(ExecutionContext var1) throws ItemStreamException;
// Batch의 처리 상태 업데이트
void update(ExecutionContext var1) throws ItemStreamException;
void close() throws ItemStreamException;
}
ItemStream은 주기적으로 상태를 저장하고, 오류가 발생하면 해당 상태에서 복원하기 위한 마커인터페이스이다. 즉, ItemReader 의 상태를 저장하고 실패한 곳에서 다시 실행할 수 있게 해주는 역할을 한다. ItemReader와 ItemStream 인터페이스를 직접 구현하여 원하는 형태의 ItemReader를 만들 수 있다.
각 줄의 마지막을 정의하는데 사용
별도로 지정하지 않으면 개행 문자가 레코드의 끝 부분을 나타낸다.
resource
Resource
null(필수)
읽을 대상 리소스
skippedLinesCallback
LineCallbackHandler
null
줄을 건너뛸 떄 호출되는 콜백 인터페이스
건너띈 모든 줄은 이 콜백이 호출된다.
strict
boolean
false
true로 지정시, 리소스를 찾을 수 없는 경우 Exception을 던진다.
saveState
boolean
true
true : 재시작 가능하도록 각 청크 처리 후 ItemReader 상태 저장
false : 다중 스레드 환경에선 false 지정
고정된 너비 파일
Aimee CHoover 7341Vel Avenue Mobile AL35928
Jonas UGilbert 8852In St. Saint Paul MN57321
Regan MBaxter 4851Nec Av. Gulfport MS33193
Octavius TJohnson 7418Cum Road Houston TX51507
Sydnee NRobinson 894 Ornare. Ave Olathe KS25606
Stuart KMckenzie 5529Orci Av. Nampa ID18562
@Bean
@StepScope
public FlatFileItemReader<Customer> customerItemReader(@Value("#{jobParameters['customerFile']}") PathResource inputFile) {
return new FlatFileItemReaderBuilder<Customer>()
.name("customerItemReader") // 각 스텝의 ExecutionContext에 추가되는 특정키의 접두문자로 사용될 이름(saveState false인 경우 지정할 필요X)
.resource(inputFile)
.fixedLength() // FixedLengthBuilder
.columns(new Range[]{new Range(1,11), new Range(12,12), new Range(13,22), new Range(23,26)
, new Range(27,46), new Range(47,62), new Range(63,64), new Range(65,69)}) // 고정너비
.names(new String[]{"firstName", "middleInitial", "lastName", "addressNumber", "street"
, "city", "state", "zipCode"}) // 각 컬럼명
// .strict(false) // 정의된 파싱 정보 보다 많은 항목이 레코드에 있는 경우(true 예외)
.targetType(Customer.class) // BeanWrapperFieldSetMapper 생성해 도메인 클레스에 값을 채움
.build();
}
MultiResourceItemReader는 읽어야할 파일명의 패턴을 MultiResourceItemReader의 의존성으로 정의한다.
public class MultiResourceCustomerFileReader implements ResourceAwareItemReaderItemStream<Customer> {
private Object curItem = null;
private ResourceAwareItemReaderItemStream<Object> delegate;
public MultiResourceCustomerFileReader(ResourceAwareItemReaderItemStream<Object> delegate) {
this.delegate = delegate;
}
/**
* Resource를 주입함으로 ItemReader가 파일 관리하는 대신
* 각 파일을 스프링 배치가 생성해 주입
* @param resource
*/
@Override
public void setResource(Resource resource) {
System.out.println(resource);
this.delegate.setResource(resource);
}
@Override
public Customer read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
if (curItem == null) {
curItem = delegate.read(); // 고객 정보를 읽음.
}
Customer item = (Customer) curItem;
curItem = null;
if (item != null) {
item.setTransactions(new ArrayList<>());
// 다음 고객 레코드를 만나기 전까지 거래내역 레코드를 읽는다.
while (peek() instanceof Transaction) {
item.getTransactions().add((Transaction) curItem);
curItem = null;
}
}
return item;
}
private Object peek() throws Exception {
if (curItem == null) {
curItem = delegate.read();
}
return curItem;
}
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
delegate.open(executionContext);
}
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
delegate.update(executionContext);
}
@Override
public void close() throws ItemStreamException {
delegate.close();
}
}
위에서 다룬 ItemStreamReader 와 다른 점은 Resource 주입부분이다. Resource를 주입하게 되면 필요한 각 파일을 스프링 배치가 생성해 ItemReader에 주입할 수 있다.
@Bean
@StepScope
public MultiResourceItemReader multiResourceItemReader(@Value("#{jobParameters['customFile']}") Resource[] resources) {
return new MultiResourceItemReaderBuilder<>()
.name("multiResourceItemReader")
.resources(resources) // resources 배열
.delegate(multiResourceCustomerFileReader()) // 실제 작업을 수행할 위임 컴포넌트
.build();
}
@Bean
public MultiResourceCustomerFileReader multiResourceCustomerFileReader() {
return new MultiResourceCustomerFileReader(multiResourceCustomerItemReader());
}
@Bean
@StepScope
public FlatFileItemReader multiResourceCustomerItemReader() {
return new FlatFileItemReaderBuilder<Customer>()
.name("multiResourceCustomerItemReader")
.lineMapper(multiResourceTokenizer())
.build();
}
읽어야할 파일 목록(resources)을 설정해주고, delegate()에 실제로 작업을 수행할 위임 컴포넌트를 지정해주면된다.
여러 개의 파일을 다룰때는 재시작을 하게되는 상황에서 스프링배치가 추가적인 안정장치를 제공해주지 않는다. 예를들어 file1.csv, file2.csv, file3.csv가 있는데, file2.csv 처리하는 과정에서 오류가 발생하여 잡이 실패 된 이후 재시작을 할때 file4.csv를 추가한다면, 최초 실행시 file4.csv가 없었음에도 불구하고, 포함하여 실행한다.
이러한 문제점을 해결하기 위해서 배치 실행 시 사용할 디렉터리를 별도로 생성하는 것이 일반적이며, 새로 생성된 모든 파일은 새로운 디렉터리에 넣어주어 현재 수행중인 잡에 영향을 주지않게 할 수 있다.
XML
XML은 파일 내 데이터를 설명할 수 있는 태그를 사용해 파일에 포함된 데이터를 설명하므로, Flat file과는 다르다.
@Bean
@StepScope
public JsonItemReader<Customer> jsonFileReader(@Value("#{jobParameters['customFile']}") Resource resource) {
// Jackson이 JSON을 읽고 쓰는데 사용하는 주요 클래스
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setDateFormat(new SimpleDateFormat("yyyy-MM-dd hh:mm:ss"));
JacksonJsonObjectReader<Customer> jsonObjectReader = new JacksonJsonObjectReader<>(Customer.class); // 반환할 클래스 설정
jsonObjectReader.setMapper(objectMapper); // ObjectMapper
return new JsonItemReaderBuilder<Customer>()
.name("jsonFileReader")
.jsonObjectReader(jsonObjectReader) // 파싱에 사용
.resource(resource)
.build();
}
ObjectMapper는 Jackson이 JSON을 읽고 쓰는데 사용하는 주요 클래스로 커스텀 데이터 포맷들을 설정하면된다.
JsonItemReaderBuilder는 파싱에 사용할 JsonObjectReader를 설정해주면된다.
Database Reader
Cursor는 표준 java.sql.ResultSet으로 구현되며, ResultSet이 open되면 next() 메서드를 호출할 때마다 데이터베이스에서 배치 레코드를 가져와 반환한다.
즉, Cursor 방식은 DB와 커넥션을 맺은 후, Cursor를 한칸씩 옮기면서 지속적으로 데이터를 가져온다.
(CursorItemReader는 streaming으로 데이터를 처리)
Cursor는 하나의 Connection으로 Batch가 끝날때가지 사용되기 때문에 Batch가 끝나기전에 DB와 어플리케이션 Connection이 끊어질 수 있으므로, DB와 SocketTimeout을 충분히 큰 값으로 설정해야한다. (네트워크 오버헤드 추가) 추가로 ResultSet은 스레드 안전이 보장되지 않아 다중 스레드 환경에서는 사용할 수 없다.
JdbcCursorItemReader
HibernateCursorItemReader
StoredProcedureItemReader
Paging 방식은 한번에 지정한 PageSize만큼 데이터를 가져온다.
SpringBatch에서 offset과 limit을 PageSize에 맞게 자동으로 생성해준다. 다만 각 쿼리는 개별적으로 실행되므로, 페이징시 결과를 정렬(order by)하는 것이 중요하다.
Batch 수행시간이 오래 걸리는 경우에는 PagingItemReader를 사용하는 것이 좋다. Paging의 경우 한 페이지를 읽을때마다 Connection을 맺고 끊기 때문에 아무리 많은 데이터라도 타임아웃과 부하 없이 수행될 수 있다.
/**
* --job.name=jdbcCursorItemReaderJob city=Chicago
*/
@RequiredArgsConstructor
@Configuration
public class JdbcCursorCustomerJob {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final DataSource dataSource;
@Bean
public Job jdbcCursorItemReaderJob(){
return jobBuilderFactory.get("jdbcCursorItemReaderJob")
.start(jdbcCursorItemReaderStep())
.build();
}
@Bean
public Step jdbcCursorItemReaderStep(){
return stepBuilderFactory.get("jdbcCursorItemReaderStep")
.<Customer, Customer>chunk(10)
.reader(customerJdbcCursorItemReader())
.writer(customerJdbcCursorItemWriter())
.build();
}
@Bean
public JdbcCursorItemReader<Customer> customerJdbcCursorItemReader() {
return new JdbcCursorItemReaderBuilder<Customer>()
.name("customerJdbcCursorItemReader")
.dataSource(dataSource)
.sql("select * from customer where city = ?")
.rowMapper(new BeanPropertyRowMapper<>(Customer.class))
.preparedStatementSetter(citySetter(null)) // 파라미터 설정
.build();
}
/**
* 파라미터를 SQL문에 매핑
* ArgumentPreparedStatementSetter는 객체 배열에 담긴 순서대로 ?의 위치에 값으로 설정
* @param city
* @return
*/
@Bean
@StepScope
public ArgumentPreparedStatementSetter citySetter(@Value("#{jobParameters['city']}") String city) {
return new ArgumentPreparedStatementSetter(new Object[]{city});
}
@Bean
public ItemWriter customerJdbcCursorItemWriter() {
return (items) -> items.forEach(System.out::println);
}
}
<T, T> chunk(int chunkSize) : 첫번째 T는 Reader에서 반환할 타입, 두번째 T는 Writer에 파라미터로 넘어올 타입이다.
fetchSize : DB에서 한번에 가져올 데이터 양을 나타낸다. Paging은 실제 쿼리를 limit, offset으로 분할 처리하는 반면, Cursor는 분할 처리없이 실행되나 내부적으로 가져온는 데이터는 FetchSize만큼 가져와 read()를 통해서 하나씩 가져온다.
dataSource : DB에 접근하기 위해 사용할 DataSource객체
rowMapper : 쿼리 결과를 인스턴스로 매핑하기 위한 매퍼
BeanPropertyRowMapper 를 사용해 도메인 객체와 매핑해준다.
sql : Reader에서 사용할 쿼리문
preparedStatementSetter: SQL문의 파라미터 설정
name : Reader의 이름, ExecutionContext에 저장되어질 이름
JdbcPagingItemReader
/**
* --job.name=jdbcPagingItemReaderJob city=Chicago
*/
@RequiredArgsConstructor
@Configuration
public class JdbcPagingCustomerJob {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final DataSource dataSource;
@Bean
public Job jdbcPagingItemReaderJob(){
return jobBuilderFactory.get("jdbcPagingItemReaderJob")
.start(jdbcPagingItemReaderStep())
.build();
}
@Bean
public Step jdbcPagingItemReaderStep(){
return stepBuilderFactory.get("jdbcPagingItemReaderStep")
.<Customer, Customer>chunk(10)
.reader(customerJdbcPagingItemReader(null, null))
.writer(customerJdbcPagingItemWriter())
.build();
}
@Bean
@StepScope
public JdbcPagingItemReader<Customer> customerJdbcPagingItemReader(
PagingQueryProvider pagingQueryProvider, @Value("#{jobParameters['city']}") String city) {
Map<String, Object> params = new HashMap<>(1);
params.put("city", city);
return new JdbcPagingItemReaderBuilder<Customer>()
.name("customerJdbcPagingItemReader") // Reader의 이름, ExecutionContext에 저장되어질 이름
.dataSource(dataSource) // DB에 접근하기 위해 사용할 DataSource객체
.queryProvider(pagingQueryProvider) // PagingQueryProvider
.parameterValues(params) // SQL 문에 주입해야할 파라미터
.pageSize(10) // 각 페이지 크
.rowMapper(new BeanPropertyRowMapper<>(Customer.class)) // 쿼리 결과를 인스턴스로 매핑하기 위한 매퍼
.build();
}
@Bean
public ItemWriter customerJdbcPagingItemWriter() {
return (items) -> items.forEach(System.out::println);
}
@Bean
public SqlPagingQueryProviderFactoryBean pagingQueryProvider(){
SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
queryProvider.setDataSource(dataSource); // 제공된 데이터베이스의 타입을 결정(setDatabaseType 으로 데이터베이스 타입 설정도 가능)
queryProvider.setSelectClause("*");
queryProvider.setFromClause("from customer");
queryProvider.setWhereClause("where city = :city");
Map<String, Order> sortKeys = new HashMap<>();
sortKeys.put("lastName", Order.ASCENDING);
queryProvider.setSortKeys(sortKeys);
return queryProvider;
}
}
PagingItemReader는 PagingQueryProvider를 통해 쿼리를 생성한다. 이렇게 생성하는 이유는 각 DB에는 Paging을 지원하는 자체적인 전략이 있으며, Spring은 각 DB의 Paging 전략에 맞춰 구현되어야만 한다.
public SqlPagingQueryProviderFactoryBean() {
this.providers.put(DatabaseType.DB2, new Db2PagingQueryProvider());
this.providers.put(DatabaseType.DB2VSE, new Db2PagingQueryProvider());
this.providers.put(DatabaseType.DB2ZOS, new Db2PagingQueryProvider());
this.providers.put(DatabaseType.DB2AS400, new Db2PagingQueryProvider());
this.providers.put(DatabaseType.DERBY, new DerbyPagingQueryProvider());
this.providers.put(DatabaseType.HSQL, new HsqlPagingQueryProvider());
this.providers.put(DatabaseType.H2, new H2PagingQueryProvider());
this.providers.put(DatabaseType.MYSQL, new MySqlPagingQueryProvider());
this.providers.put(DatabaseType.ORACLE, new OraclePagingQueryProvider());
this.providers.put(DatabaseType.POSTGRES, new PostgresPagingQueryProvider());
this.providers.put(DatabaseType.SQLITE, new SqlitePagingQueryProvider());
this.providers.put(DatabaseType.SQLSERVER, new SqlServerPagingQueryProvider());
this.providers.put(DatabaseType.SYBASE, new SybasePagingQueryProvider());
}
Spring Batch에서는 SqlPagingQueryProviderFactoryBean을 통해 DataSource 설정 값을 보고, 위 Provider중 하나를 자동 선택하도록 한다.
SpringBatch에서 offset과 limit을 PageSize에 맞게 자동으로 생성해준다. 다만 각 쿼리는 개별적으로 실행되므로, 동일한 레코드 정렬 순서를 보장하려면 페이징시 결과를 정렬(order by)하는 것이 중요하다.
또한, 이 정렬키가 ResultSet 내에서 중복되지 않아야한다.
SELECT * FROM customer WHERE city = :city ORDER BY lastName ASC LIMIT 10
실행된 쿼리 로그를 보면 Paging Size인 LIMIT 10이 들어간 것을 볼 수 있다.
Hibernate
자바 ORM 기술로, 애플리케이션에서 사용하는 객체 지향 모델을 관계형 데이터베이스로 매핑하는 기능을 제공
XML or Annotation을 사용해 객체를 데이터베이스 테이블로 매핑
객체를 사용해 데이터베이스에 질의하는 프레임워크를 제공
Hibernate 세션 구현체에 따라서 다르게 작동한다.
별도 설정없이 Hibernate를 사용하면 일반적인 stateful 세션 구현체를 사용
예를 들어 백만건의 아이템을 읽고 처리한다면 Hibernate 세션이 데이터베이스에서 조회할 때 아이템을 캐시에 쌓으며 OutOfMemoryException이 발생
Persistence로 사용하면, 직접 JDBC를 사용할 때보다 더 큰 부하를 유발
레코드 백만 건을 처리할 때는 한건당 ms 단위의 차이도 거대한 차이가 된다.
스프링 배치는 이러한 문제를 해결하도록 HibernateCursorItemReader, HibernatePagingItemReader를 개발
public class ItemReaderAdapter<T> extends AbstractMethodInvokingDelegator<T> implements ItemReader<T> {
/**
* @return return value of the target method.
*/
@Nullable
@Override
public T read() throws Exception {
return invokeDelegateMethod();
}
}
호출 대상 서비스의 참조와 호출할 메서드의 이름을 의존성으로 받는다.
ItemReaderAdapter가 매번 호출할 때마다 반환되는 객체는 ItemReader가 반환하는 객체이다.
입력 데이터를 모두 처리하면 서비스 메서드는 반드시 null을 반환해야한다. 스프링 배치에게 해당 step의 입력을 모두 완료했음을 알리는 것이다.