ItemReader
Spring Batch의 Reader에서 읽어올 수 있는 데이터 유형은 다음과 같다.
입력 데이터에서 읽어오기
파일에서 읽어오기
DB에서 읽어오기
Java Message Service 등 다른 소스에서 읽어오기
커스텀한 Reader로 읽어오기
ItemReader의 read()를 호출하면, 해당 메서드는 스텝 내에서 처리할 Item한개를 반환하며, 스텝에서는 아이템 개수를 세어 청크 내 데이터가 몇개가 처리됐는지 관리한다. 해당 Item은 ItemProcessor로 전달되며, 그 뒤 ItemWriter로 전달된다.
가장 대표적인 구현체인 JdbcPagingItemReader의 클래스 계층 구조를 보면 다음과 같다.

여기서 ItemReader와 ItemStream 인터페이스도 같이 구현하고 있는 것을 볼 수 있다.
ItemStream은 주기적으로 상태를 저장하고, 오류가 발생하면 해당 상태에서 복원하기 위한 마커인터페이스이다. 즉, ItemReader 의 상태를 저장하고 실패한 곳에서 다시 실행할 수 있게 해주는 역할을 한다. ItemReader와 ItemStream 인터페이스를 직접 구현하여 원하는 형태의 ItemReader를 만들 수 있다.
파일 입력
FlatFileItemReader
org.springframework.batch.item.file.FlatFileItemReaderflat file
한 개 혹은 그 이상의 레코드가 포함된 특정 파일
파일의 내용을 봐도 데이터의 의미를 알 수 없다.
파일 내 데이터의 포맷이나 의미를 정의하는 메타 데이터가 없다.
comments
String[]
null
문자열 배열에 파일을 파싱할 떄 건너뛰어야할 주석 줄을 나타내는 접두어 지정
encoding
String
플랫폼의 기본 Charset
파일에 사용된 문자열 인코딩
lineMapper
LineMapper
null(필수)
파일 한줄을 String으로 읽은 뒤 처리 대상인 도메인 객체(Item)으로 변환
DefaultLineMapper
JsonLineMapper
PassThroughLineMapper
lineToSkip
int
0
파일을 읽어올 떄 몇 줄을 건너띄고 시작할지 지정
recordSeparatorPolicy
RecordSeparatorPolicy
DefaultRecordSeparatorPolicy
각 줄의 마지막을 정의하는데 사용 별도로 지정하지 않으면 개행 문자가 레코드의 끝 부분을 나타낸다.
resource
Resource
null(필수)
읽을 대상 리소스
skippedLinesCallback
LineCallbackHandler
null
줄을 건너뛸 떄 호출되는 콜백 인터페이스 건너띈 모든 줄은 이 콜백이 호출된다.
strict
boolean
false
true로 지정시, 리소스를 찾을 수 없는 경우 Exception을 던진다.
saveState
boolean
true
true : 재시작 가능하도록 각 청크 처리 후 ItemReader 상태 저장
false : 다중 스레드 환경에선 false 지정
고정된 너비 파일
구분자 파일
FieldSetMapper 커스텀
org.springframework.batch.item.file.mapping.FieldSetMapper를 구현하여 커스텀 매퍼를 만들 수 있다.
.fieldSetMapper()에 커스텀 매퍼를 지정하면 된다.
LineTokenizer 커스텀
org.springframework.batch.item.file.transform.LineTokenizer
LineMapper
org.springframework.batch.item.file.mapping.PatternMatchingCompositeLineMapper여러개의
LineTokenizer로 구성된 Map을 선언할 수 있음.PatternMatcher<LineTokenizer> tokenizers
각
LineTokenizer를 필요로 하는 여러개의FieldSetMapperMap 선언할 수 있음.PatternMatcher<FieldSetMapper<T>> patternMatcher
여러 형식으로 구성된 csv 파일이다.
TRANS로 시작하는 경우와 CUST로 시작하는 경우 각각 FieldSetMapper, LineTokenizer를 사용해 파싱 및 set을 할 수 있다.
ItemStreamReader 커스텀
두개의 다른 포맷의 데이터가 사실은 서로 연관이 있는 데이터일 수 있다. 그 경우에는 한개의 도메인이 다른 한개의 도메인의 내용을 포함하고 있을 수 있다.
거래내역(TRANS) 데이터는 그 위의 고객(CUST)의 계약 정보라고 가정해볼 것이다.
Custom 도메인 객체에 Transaction 거래내역 정보를 포함하게 변경해준다.
여기서 핵심은 read() 메서드이다. 한줄씩 읽어올 때 다음 고객정보가 나올때까지 거래내역 레코드를 읽어 해당 고객이 가지고 있게 한다.
Job에서 itemReader부분에 위에서 생성한 CustomerFileReader를 설정해주면 된다.
다음과 같이 고객이 가지고 있는 거래내역을 출력할 수 있다.
MultiResourceItemReader
동일한 포맷으로 작성된 여러개의 파일을 읽어들이는 ItemReader를 제공한다.
org.springframework.batch.item.file.MultiResourceItemReader
MultiResourceItemReader는 읽어야할 파일명의 패턴을 MultiResourceItemReader의 의존성으로 정의한다.
위에서 다룬 ItemStreamReader 와 다른 점은 Resource 주입부분이다. Resource를 주입하게 되면 필요한 각 파일을 스프링 배치가 생성해 ItemReader에 주입할 수 있다.
읽어야할 파일 목록(resources)을 설정해주고, delegate()에 실제로 작업을 수행할 위임 컴포넌트를 지정해주면된다.
여러 개의 파일을 다룰때는 재시작을 하게되는 상황에서 스프링배치가 추가적인 안정장치를 제공해주지 않는다. 예를들어 file1.csv, file2.csv, file3.csv가 있는데, file2.csv 처리하는 과정에서 오류가 발생하여 잡이 실패 된 이후 재시작을 할때 file4.csv를 추가한다면, 최초 실행시 file4.csv가 없었음에도 불구하고, 포함하여 실행한다.
이러한 문제점을 해결하기 위해서 배치 실행 시 사용할 디렉터리를 별도로 생성하는 것이 일반적이며, 새로 생성된 모든 파일은 새로운 디렉터리에 넣어주어 현재 수행중인 잡에 영향을 주지않게 할 수 있다.
XML
XML은 파일 내 데이터를 설명할 수 있는 태그를 사용해 파일에 포함된 데이터를 설명하므로, Flat file과는 다르다.
XML parser로 주로 DOM과 SAX를 많이 사용한다.
Dom vs SAX vs StAX
DOM(Document Object Model) 방식
XML문서 전체를 메모리에 로드하여 값을 읽는다.
XML문서를 읽으면 모든 Element, Text, Attribute 등에 대한 객체를 생성하고, 이를 Document 객체로 리턴한다.
Document 객체는 DOM API에 알맞는 트리 구조의 자바 객체로 표현되어 있다.
XML문서가 메모리에 모두 올라가 있어서 노드들의 검색, 수정, 구조변경이 빠르고 용이하다.
SAX 방식 보다 직관적이며 파싱이 단순하기 때문에 일반적으로 DOM 방식을 채택하여 개발하게 된다.
SAX(Simple API for XML) 방식
SAX 방식은 XML 문서를 하나의 긴 문자열로 간주한다.
XML문서를 앞에서 부터 순차적으로 읽어가면서 노드가 열리고 닫히는 과정에서 이벤트가 발생한다.
각각의 이벤트가 발생될 때마다 수행하고자 하는 기능을 이벤트 핸들러 기술을 이용하여 구현한다.
XML문서를 메모리에 전부 로딩하고 파싱하는 것이 아니기 때문에 메모리 사용량이 적고 단순히 읽기만 할때 속도가 빠르다.
발생한 이벤트를 핸들링하여 변수에 저장하고 활용하는 것이기 때문에 복잡하고 노드 수정이어렵다.
XML 오브젝트에 Random Access를 하지 못해, 지난 엘리먼트를 참조할 경우 다시 처음부터 읽어야한다.
StAX(Streaming API for XML)
StAX는 push 와 pull 방식을 동시에 제공하는 하이브리드한 형태
XML 문서를 파싱할때 하나의 Fragment로 구분
정해진 엘리먼트를 읽을때는 DOM 방식을 사용하며, Fragement로 처리하는 것은 SAX의 Push 방식을 사용
즉, 각 세션을 독립적으로 파싱하는 기능을 제공
스프링 배치에서는 StAX 파서를 사용한다.
StaxEventItemReader
org.springframework.batch.item.xml.StaxEventItemReader
위 예제 파일을 파싱하는 Reader를 구현해볼것이다.
.addFragmentRootElements():StaxEventItemReader를 사용하려면 XML 프래크먼트 루트 엘리먼트를 지정 XML내에서 Item으로 취급할 fragment의 root 엘리먼트를 식별하는데 사용.unmarshaller():org.springframework.oxm.Unmarshaller구현체를 전달 받으며, XML을 도메인 객체로 반환
이번 예제에서는 org.springframework.oxm.jaxb.Jaxb2Marshaller를 사용했으며, Jaxb2Marshaller를 사용하기 위해서는 의존성 추가가 필요하다.
build.gradle
JAXB 의존성과 Spring OXM 모듈로 JAXB를 사용하는 스프링 컴포넌트 의존성을 추가해준다.
XML을 파싱할 수 있게 하려면 도매인 객체에 JAXB 어노테이션을 추가해줄 것이다.
도메인 객체 설정이 끝났으면, 각 블록을 파싱하는데 사용할 Unmarshaller를 구현해주면 된다.
JSON
JsonItemReader
org.springframework.batch.item.json.JsonItemReader청크를 읽어 객체로 파싱한다.
실제 파싱 작업은
JsonObjectReader인터페이스 구현체에 위임한다.
JsonObjectReader
실제로 JSON 객체를 파싱하는 역할을 한다. 스프링 배치는 2개의 JsonObjectReader를 제공해준다.
Jackson
Gson
위 구조로되어있는 JSON을 파싱해 볼것이다.
ObjectMapper는 Jackson이 JSON을 읽고 쓰는데 사용하는 주요 클래스로 커스텀 데이터 포맷들을 설정하면된다.JacksonJsonObjectReader생성시 반환할 클래스를 설정하고, 커스텀한ObjectMapper를 설정해주면된다.JsonItemReaderBuilder는 파싱에 사용할JsonObjectReader를 설정해주면된다.
Database Reader
Cursor는 표준 java.sql.ResultSet으로 구현되며, ResultSet이 open되면 next() 메서드를 호출할 때마다 데이터베이스에서 배치 레코드를 가져와 반환한다.
즉, Cursor 방식은 DB와 커넥션을 맺은 후, Cursor를 한칸씩 옮기면서 지속적으로 데이터를 가져온다.
(CursorItemReader는 streaming으로 데이터를 처리)
Cursor는 하나의 Connection으로 Batch가 끝날때가지 사용되기 때문에 Batch가 끝나기전에 DB와 어플리케이션 Connection이 끊어질 수 있으므로, DB와 SocketTimeout을 충분히 큰 값으로 설정해야한다. (네트워크 오버헤드 추가) 추가로 ResultSet은 스레드 안전이 보장되지 않아 다중 스레드 환경에서는 사용할 수 없다.
JdbcCursorItemReader
HibernateCursorItemReader
StoredProcedureItemReader
Paging 방식은 한번에 지정한 PageSize만큼 데이터를 가져온다.
SpringBatch에서 offset과 limit을 PageSize에 맞게 자동으로 생성해준다. 다만 각 쿼리는 개별적으로 실행되므로, 페이징시 결과를 정렬(order by)하는 것이 중요하다.
Batch 수행시간이 오래 걸리는 경우에는 PagingItemReader를 사용하는 것이 좋다. Paging의 경우 한 페이지를 읽을때마다 Connection을 맺고 끊기 때문에 아무리 많은 데이터라도 타임아웃과 부하 없이 수행될 수 있다.
JdbcPagingItemReader
HibernatePagingItemReader
JpaPagingItemReader
JDBC
JdbcCursorItemReader
<T, T> chunk(int chunkSize): 첫번째 T는 Reader에서 반환할 타입, 두번째 T는 Writer에 파라미터로 넘어올 타입이다.fetchSize : DB에서 한번에 가져올 데이터 양을 나타낸다. Paging은 실제 쿼리를 limit, offset으로 분할 처리하는 반면, Cursor는 분할 처리없이 실행되나 내부적으로 가져온는 데이터는 FetchSize만큼 가져와
read()를 통해서 하나씩 가져온다.dataSource : DB에 접근하기 위해 사용할 DataSource객체
rowMapper : 쿼리 결과를 인스턴스로 매핑하기 위한 매퍼
BeanPropertyRowMapper를 사용해 도메인 객체와 매핑해준다.
sql : Reader에서 사용할 쿼리문
preparedStatementSetter: SQL문의 파라미터 설정
name : Reader의 이름, ExecutionContext에 저장되어질 이름
JdbcPagingItemReader
PagingItemReader는 PagingQueryProvider를 통해 쿼리를 생성한다. 이렇게 생성하는 이유는 각 DB에는 Paging을 지원하는 자체적인 전략이 있으며, Spring은 각 DB의 Paging 전략에 맞춰 구현되어야만 한다.
Spring Batch에서는 SqlPagingQueryProviderFactoryBean을 통해 DataSource 설정 값을 보고, 위 Provider중 하나를 자동 선택하도록 한다.
SpringBatch에서 offset과 limit을 PageSize에 맞게 자동으로 생성해준다. 다만 각 쿼리는 개별적으로 실행되므로, 동일한 레코드 정렬 순서를 보장하려면 페이징시 결과를 정렬(order by)하는 것이 중요하다.
또한, 이 정렬키가 ResultSet 내에서 중복되지 않아야한다.
실행된 쿼리 로그를 보면 Paging Size인 LIMIT 10이 들어간 것을 볼 수 있다.
Hibernate
자바 ORM 기술로, 애플리케이션에서 사용하는 객체 지향 모델을 관계형 데이터베이스로 매핑하는 기능을 제공
XML or Annotation을 사용해 객체를 데이터베이스 테이블로 매핑
객체를 사용해 데이터베이스에 질의하는 프레임워크를 제공
Hibernate 세션 구현체에 따라서 다르게 작동한다.
별도 설정없이 Hibernate를 사용하면 일반적인 stateful 세션 구현체를 사용
예를 들어 백만건의 아이템을 읽고 처리한다면 Hibernate 세션이 데이터베이스에서 조회할 때 아이템을 캐시에 쌓으며 OutOfMemoryException이 발생
Persistence로 사용하면, 직접 JDBC를 사용할 때보다 더 큰 부하를 유발
레코드 백만 건을 처리할 때는 한건당 ms 단위의 차이도 거대한 차이가 된다.
스프링 배치는 이러한 문제를 해결하도록
HibernateCursorItemReader,HibernatePagingItemReader를 개발커밋시 세션을 flush하며 배치 처리에 관계가 있는 추가 기능을 제공해준다.
build.gradle 의존성 추가
Domain 객체 수정
@Entity
매핑할 객체가 Entity임을 나타냄
@Table
Entity가 매핑되는 테이블 지정
@Id
PK값 지정
TransactionManager 커스텀
하이버네이트 세션과 Datasource를 합친 TransactionManager가 필요하다.
BatchConfigurer의 커스텀 구현체를 사용해 HibernateTransactionManager를 구성했다.
HibernateCursorItemReader
하이버네이트 쿼리를 수행하는 방법은 4가지가 존재한다.
queryName
String
하이버네이트 구성에 포함된 네임드 하이버네이트 쿼리 참조
https://www.baeldung.com/hibernate-named-query
queryString
String
스프링 구성에 추가하는 HQL 쿼리
.queryString("from Customer where city = :city")
queryProvider
HibernateQueryProvider
하이버네이트 쿼리(HQL)를 프로그래밍으로 빌드
nativeQuery
String
네이티브 SQL 쿼리를 실행한 뒤 결과를 하이버네이트로 매핑하는데 사용
https://data-make.tistory.com/616
HibernatePagingItemReader
Cursor 방법과 유일하게 다른 점은 .pageSize()로 사용할 페이지 크기를 지정해야하는 것이다.
JPA
JPA(Java Persisstence API)는 ORM 영역에서 표준화된 접근법을 제공한다. Hibernate가 초기 JPA에 영감을 줬으며, 현재는 Hibernate가 JPA 명세를 구현하고 있다.
build.gradle 의존성 추가
spring-boot-starter-data-jpa는 JPA를 사용하는데 필요한 모든 필수 컴포넌트가 포함돼있다.
JpaCursorItemReader
Spring Batch 4.3이 릴리즈 되면서 JpaCursorItemReader 가 도입되었다. 이전버전까지는 제공하지 않았다.
JpaPagingItemReader
.entityManagerFactory를 설정하는 것 이외에 Jdbc와 크게 다른 점은 없으며, Cursor와 다른점은 pageSize()를 설정하는 것이다.
JPA에서는 .queryProvider()로 Query 객체를 사용해 쿼리를 수행할 수도 있다.
MyBatisPagingItemReader
Spring Data Repository
Spring Data는 스프링 데이터가 제공하는 특정 인터페이스 중 하나를 상속하는 인터페이스를 사용자가 정의하기만 하면 스프링 데이터가 해당 인터페이스의 구현을 처리하는 기능을 제공한다.
스프링 배치는 스프링 데이터의 PagingAndSotringRepository를 활용하기 때문에, 스프링 데이터와 호환성이 좋다.
RepositoryItemReader
RepositoryItemReader는 JdbcPagingItemReader 나 HibernatePagingItemReader를 사용할때와 동일하게 PagingAndSotringRepository를 사용해서 Paging 쿼리를 실행한다.
RepositoryItemReader는 어떤 저장소건 상관없이 해당 데이터 저장소에 질의를 수행할 수 있다는 점에서 ItemReader와 차이가 있다.
PagingAndSotringRepository를 상속하는 Repository를 생성해 city 조건으로 조회하는 메서드를 정의하였다.
주의 사항
JpaRepository를ListItemReader,QueueItemReader에 사용하면 안된다.이렇게 구현하는 경우 Spring Batch의 장점인 Paging & Cursor 구현이 없어 대규모 데이터 처리가 불가능하다.
JpaRepository를 사용해야하는 경우RepositoryItemReader를 사용하는 것을 권장한다.
Hibernate, JPA등 영속성 컨텍스트가 필요한 Reader 사용시 fetch size와 chunk size는 동일한 값을 유지해야 한다.
ItemReaderAdapter
Adapter는 다른 엘리멘트와 래핑하여 스프링 배치가 해당 엘리먼트와 통신할 수 있게 하는데 사용한다.
org.springframework.batch.item.adapter.ItemReaderAdapter
호출 대상 서비스의 참조와 호출할 메서드의 이름을 의존성으로 받는다.
ItemReaderAdapter가 매번 호출할 때마다 반환되는 객체는ItemReader가 반환하는 객체이다.입력 데이터를 모두 처리하면 서비스 메서드는 반드시
null을 반환해야한다. 스프링 배치에게 해당 step의 입력을 모두 완료했음을 알리는 것이다.
Customer 객체의 목록을 무작위로 생성하는 서비스이다.
ItemReaderAdapter에 기존 서비스 오브젝트와 메서드명을 전달하면된다.
오류 처리
입력에서 레코드를 읽는 중에 오류가 발생한 경우 처리할 수 있는 방법은 여러가지이다.
예외를 던쳐 처리를 멈추기
특정 예외가 발생한 경우 레코드 건너띄기(skip)
Skip
어떤 조건에서 레코드를 skip할지(어떤 예외를 무시할 것인지)
얼마나 많은 레코드를 skip 할 수 있게 할것인지
레코드를 skip할지 여부를 결정할 때 위 두가지 요소를 고려해야한다.
Skip을 사용하기 위해서는 우선 faultToLerant()라는 메서드를 호출해야한다.
skipLimit()
skip 허용 회수. 허용 회수를 넘어가면 job은 실패한다. skip()과 반드시 같이 써야 한다
skip()
해당 exception이 발생했을때 skip
noSkip()
해당 exception이 발생하면 skip을 하지 않고 오류를 내겠다는 것
skipPolicy()
용자 정의로 skip에 대한 policy를 만들어서 적용하고 싶을때 사용
SkipPolicy
SkipPolicy 구현체는 skip할 예외와 허용 횟수를 판별할 수 있으며, boolean 값으로 내부 로직을 수정할 수 있다.
오류 로그 남기기
ItemListener를 사용해 잘못된 레코드를 기록하는 방법을 다룰 것이다.
잘못된 레코드를 읽었을 때 로그를 남기기 위해서 onReadError 메서드를 오바라이드해 ItemListenerSupport를 사용하여 에러를 기록한다.
단순히 파일을 읽어올 때는 위와 같이 처리하면 되지만, DB를 이용한 오류가 발생한 경우에는 실제 DB입출력을 스프링 자체나 하이버네이트와 같은 다른 프레임워크가 처리하므로, 스프링 배치에서 처리할 예외가 많지 않다.
onReadError 파일 오류 로그 남기기
Last updated
Was this helpful?