ItemReader
Spring Batch์ Reader์์ ์ฝ์ด์ฌ ์ ์๋ ๋ฐ์ดํฐ ์ ํ์ ๋ค์๊ณผ ๊ฐ๋ค.
์ ๋ ฅ ๋ฐ์ดํฐ์์ ์ฝ์ด์ค๊ธฐ
ํ์ผ์์ ์ฝ์ด์ค๊ธฐ
DB์์ ์ฝ์ด์ค๊ธฐ
Java Message Service ๋ฑ ๋ค๋ฅธ ์์ค์์ ์ฝ์ด์ค๊ธฐ
์ปค์คํ ํ Reader๋ก ์ฝ์ด์ค๊ธฐ
package org.springframework.batch.item;
public interface ItemReader<T> {
@Nullable
T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}
ItemReader
์ read()
๋ฅผ ํธ์ถํ๋ฉด, ํด๋น ๋ฉ์๋๋ ์คํ
๋ด์์ ์ฒ๋ฆฌํ Itemํ๊ฐ๋ฅผ ๋ฐํํ๋ฉฐ, ์คํ
์์๋ ์์ดํ
๊ฐ์๋ฅผ ์ธ์ด ์ฒญํฌ ๋ด ๋ฐ์ดํฐ๊ฐ ๋ช๊ฐ๊ฐ ์ฒ๋ฆฌ๋๋์ง ๊ด๋ฆฌํ๋ค. ํด๋น Item์ ItemProcessor
๋ก ์ ๋ฌ๋๋ฉฐ, ๊ทธ ๋ค ItemWriter
๋ก ์ ๋ฌ๋๋ค.
๊ฐ์ฅ ๋ํ์ ์ธ ๊ตฌํ์ฒด์ธ JdbcPagingItemReader
์ ํด๋์ค ๊ณ์ธต ๊ตฌ์กฐ๋ฅผ ๋ณด๋ฉด ๋ค์๊ณผ ๊ฐ๋ค.

์ฌ๊ธฐ์ ItemReader
์ ItemStream
์ธํฐํ์ด์ค๋ ๊ฐ์ด ๊ตฌํํ๊ณ ์๋ ๊ฒ์ ๋ณผ ์ ์๋ค.
public interface ItemStream {
void open(ExecutionContext var1) throws ItemStreamException;
// Batch์ ์ฒ๋ฆฌ ์ํ ์
๋ฐ์ดํธ
void update(ExecutionContext var1) throws ItemStreamException;
void close() throws ItemStreamException;
}
ItemStream
์ ์ฃผ๊ธฐ์ ์ผ๋ก ์ํ๋ฅผ ์ ์ฅํ๊ณ , ์ค๋ฅ๊ฐ ๋ฐ์ํ๋ฉด ํด๋น ์ํ์์ ๋ณต์ํ๊ธฐ ์ํ ๋ง์ปค์ธํฐํ์ด์ค์ด๋ค. ์ฆ, ItemReader
์ ์ํ๋ฅผ ์ ์ฅํ๊ณ ์คํจํ ๊ณณ์์ ๋ค์ ์คํํ ์ ์๊ฒ ํด์ฃผ๋ ์ญํ ์ ํ๋ค. ItemReader
์ ItemStream
์ธํฐํ์ด์ค๋ฅผ ์ง์ ๊ตฌํํ์ฌ ์ํ๋ ํํ์ ItemReader๋ฅผ ๋ง๋ค ์ ์๋ค.
ํ์ผ ์
๋ ฅ
FlatFileItemReader
org.springframework.batch.item.file.FlatFileItemReader
flat file
ํ ๊ฐ ํน์ ๊ทธ ์ด์์ ๋ ์ฝ๋๊ฐ ํฌํจ๋ ํน์ ํ์ผ
ํ์ผ์ ๋ด์ฉ์ ๋ด๋ ๋ฐ์ดํฐ์ ์๋ฏธ๋ฅผ ์ ์ ์๋ค.
ํ์ผ ๋ด ๋ฐ์ดํฐ์ ํฌ๋งท์ด๋ ์๋ฏธ๋ฅผ ์ ์ํ๋ ๋ฉํ ๋ฐ์ดํฐ๊ฐ ์๋ค.
comments
String[]
null
๋ฌธ์์ด ๋ฐฐ์ด์ ํ์ผ์ ํ์ฑํ ๋ ๊ฑด๋๋ฐ์ด์ผํ ์ฃผ์ ์ค์ ๋ํ๋ด๋ ์ ๋์ด ์ง์
encoding
String
ํ๋ซํผ์ ๊ธฐ๋ณธ Charset
ํ์ผ์ ์ฌ์ฉ๋ ๋ฌธ์์ด ์ธ์ฝ๋ฉ
lineMapper
LineMapper
null(ํ์)
ํ์ผ ํ์ค์ String์ผ๋ก ์ฝ์ ๋ค ์ฒ๋ฆฌ ๋์์ธ ๋๋ฉ์ธ ๊ฐ์ฒด(Item)์ผ๋ก ๋ณํ
DefaultLineMapper
JsonLineMapper
PassThroughLineMapper
lineToSkip
int
0
ํ์ผ์ ์ฝ์ด์ฌ ๋ ๋ช ์ค์ ๊ฑด๋๋๊ณ ์์ํ ์ง ์ง์
recordSeparatorPolicy
RecordSeparatorPolicy
DefaultRecordSeparatorPolicy
๊ฐ ์ค์ ๋ง์ง๋ง์ ์ ์ํ๋๋ฐ ์ฌ์ฉ ๋ณ๋๋ก ์ง์ ํ์ง ์์ผ๋ฉด ๊ฐํ ๋ฌธ์๊ฐ ๋ ์ฝ๋์ ๋ ๋ถ๋ถ์ ๋ํ๋ธ๋ค.
resource
Resource
null(ํ์)
์ฝ์ ๋์ ๋ฆฌ์์ค
skippedLinesCallback
LineCallbackHandler
null
์ค์ ๊ฑด๋๋ธ ๋ ํธ์ถ๋๋ ์ฝ๋ฐฑ ์ธํฐํ์ด์ค ๊ฑด๋๋ ๋ชจ๋ ์ค์ ์ด ์ฝ๋ฐฑ์ด ํธ์ถ๋๋ค.
strict
boolean
false
true๋ก ์ง์ ์, ๋ฆฌ์์ค๋ฅผ ์ฐพ์ ์ ์๋ ๊ฒฝ์ฐ Exception์ ๋์ง๋ค.
saveState
boolean
true
true : ์ฌ์์ ๊ฐ๋ฅํ๋๋ก ๊ฐ ์ฒญํฌ ์ฒ๋ฆฌ ํ ItemReader
์ํ ์ ์ฅ
false : ๋ค์ค ์ค๋ ๋ ํ๊ฒฝ์์ false ์ง์
๊ณ ์ ๋ ๋๋น ํ์ผ
Aimee CHoover 7341Vel Avenue Mobile AL35928
Jonas UGilbert 8852In St. Saint Paul MN57321
Regan MBaxter 4851Nec Av. Gulfport MS33193
Octavius TJohnson 7418Cum Road Houston TX51507
Sydnee NRobinson 894 Ornare. Ave Olathe KS25606
Stuart KMckenzie 5529Orci Av. Nampa ID18562
@Bean
@StepScope
public FlatFileItemReader<Customer> customerItemReader(@Value("#{jobParameters['customerFile']}") PathResource inputFile) {
return new FlatFileItemReaderBuilder<Customer>()
.name("customerItemReader") // ๊ฐ ์คํ
์ ExecutionContext์ ์ถ๊ฐ๋๋ ํน์ ํค์ ์ ๋๋ฌธ์๋ก ์ฌ์ฉ๋ ์ด๋ฆ(saveState false์ธ ๊ฒฝ์ฐ ์ง์ ํ ํ์X)
.resource(inputFile)
.fixedLength() // FixedLengthBuilder
.columns(new Range[]{new Range(1,11), new Range(12,12), new Range(13,22), new Range(23,26)
, new Range(27,46), new Range(47,62), new Range(63,64), new Range(65,69)}) // ๊ณ ์ ๋๋น
.names(new String[]{"firstName", "middleInitial", "lastName", "addressNumber", "street"
, "city", "state", "zipCode"}) // ๊ฐ ์ปฌ๋ผ๋ช
// .strict(false) // ์ ์๋ ํ์ฑ ์ ๋ณด ๋ณด๋ค ๋ง์ ํญ๋ชฉ์ด ๋ ์ฝ๋์ ์๋ ๊ฒฝ์ฐ(true ์์ธ)
.targetType(Customer.class) // BeanWrapperFieldSetMapper ์์ฑํด ๋๋ฉ์ธ ํด๋ ์ค์ ๊ฐ์ ์ฑ์
.build();
}
๊ตฌ๋ถ์ ํ์ผ
Aimee,C,Hoover,7341,Vel Avenue,Mobile,AL,35928
Jonas,U,Gilbert,8852,In St.,Saint Paul,MN,57321
Regan,M,Baxter,4851,Nec Av.,Gulfport,MS,33193
Octavius,T,Johnson,7418,Cum Road,Houston,TX,51507
Sydnee,N,Robinson,894,Ornare. Ave,Olathe,KS,25606
Stuart,K,Mckenzie,5529,Orci Av.,Nampa,ID,18562
@Bean
@StepScope
public FlatFileItemReader<Customer> delimitedCustomerItemReader(@Value("#{jobParameters['customerFile']}") PathResource inputFile) {
return new FlatFileItemReaderBuilder<Customer>()
.name("delimitedCustomerItemReader") // ๊ฐ ์คํ
์ ExecutionContext์ ์ถ๊ฐ๋๋ ํน์ ํค์ ์ ๋๋ฌธ์๋ก ์ฌ์ฉ๋ ์ด๋ฆ(saveState false์ธ ๊ฒฝ์ฐ ์ง์ ํ ํ์X)
.resource(inputFile)
.delimited() // default(,) DelimitedLineTokenizer๋ฅผ ์ฌ์ฉํด ๊ฐ ๋ ์ฝ๋๋ฅผ FieldSet์ผ๋ก ๋ณํ
.names(new String[]{"firstName", "middleInitial", "lastName", "addressNumber", "street"
, "city", "state", "zipCode"}) // ๊ฐ ์ปฌ๋ผ๋ช
.targetType(Customer.class) // BeanWrapperFieldSetMapper ์์ฑํด ๋๋ฉ์ธ ํด๋ ์ค์ ๊ฐ์ ์ฑ์
.build();
}
FieldSetMapper ์ปค์คํ
public interface FieldSetMapper<T> {
T mapFieldSet(FieldSet fieldSet) throws BindException;
}
org.springframework.batch.item.file.mapping.FieldSetMapper
๋ฅผ ๊ตฌํํ์ฌ ์ปค์คํ
๋งคํผ๋ฅผ ๋ง๋ค ์ ์๋ค.
public class CustomFieldSetMapper implements FieldSetMapper<Customer> {
@Override
public Customer mapFieldSet(FieldSet fieldSet) throws BindException {
Customer customer = new Customer();
customer.setAddress(fieldSet.readString("addressNumber") + " " + fieldSet.readString("street"));
customer.setCity(fieldSet.readString("city"));
customer.setFirstName(fieldSet.readString("firstName"));
customer.setLastName(fieldSet.readString("lastName"));
customer.setMiddleInitial(fieldSet.readString("middleInitial"));
customer.setState(fieldSet.readString("state"));
customer.setZipCode(fieldSet.readString("zipCode"));
return customer;
}
}
@Bean
@StepScope
public FlatFileItemReader<Customer> delimitedCustomerItemReader(@Value("#{jobParameters['customerFile']}") PathResource inputFile) {
return new FlatFileItemReaderBuilder<Customer>()
.name("delimitedCustomerItemReader")
.resource(inputFile)
.delimited()
.names(new String[]{"firstName", "middleInitial", "lastName", "addressNumber", "street", "city", "state", "zipCode"})
.fieldSetMapper(new CustomFieldSetMapper()) // customMapper ์ค์
.build();
}
.fieldSetMapper()
์ ์ปค์คํ
๋งคํผ๋ฅผ ์ง์ ํ๋ฉด ๋๋ค.
LineTokenizer ์ปค์คํ
org.springframework.batch.item.file.transform.LineTokenizer
public interface LineTokenizer {
FieldSet tokenize(@Nullable String line);
}
public class CustomFileLineTokenizer implements LineTokenizer {
@Setter
private String delimiter = ",";
private String[] names = new String[]{
"firstName"
, "middleInitial"
, "lastName"
, "address"
, "city"
, "state"
, "zipCode"
};
private FieldSetFactory fieldSetFactory = new DefaultFieldSetFactory();
@Override
public FieldSet tokenize(String line) {
// ๊ตฌ๋ถ์๋ก ํ๋ ๊ตฌ๋ถ
String[] fields = line.split(delimiter);
List<String> parsedFields = new ArrayList<>();
for (int i = 0; i < fields.length; i++) {
if (i == 4) {
// 3,4๋ฒ์จฐ ํ๋ ๋จ์ผ ํ๋๋ก ๊ตฌ์ฑ
parsedFields.set(i - 1, parsedFields.get(i - 1) + " " + fields[i]);
} else {
parsedFields.add(fields[i]);
}
}
// ๊ฐ์ ๋ฐฐ์ด & ํ๋ ์ด๋ฆ ๋ฐฐ์ด์ ๋๊ฒจ ํ๋๋ฅผ ์์ฑ
return fieldSetFactory.create(parsedFields.toArray(new String[0]), names);
}
}
@Bean
@StepScope
public FlatFileItemReader<Customer> lineTokenizerCustomerItemReader(@Value("#{jobParameters['customerFile']}") PathResource inputFile) {
return new FlatFileItemReaderBuilder<Customer>()
.name("lineTokenizerCustomerItemReader")
.resource(inputFile)
.lineTokenizer(new CustomFileLineTokenizer()) // lineTokenzier Custom
.targetType(Customer.class) // BeanWrapperFieldSetMapper ์์ฑํด ๋๋ฉ์ธ ํด๋ ์ค์ ๊ฐ์ ์ฑ์
.build();
}
LineMapper
org.springframework.batch.item.file.mapping.PatternMatchingCompositeLineMapper
์ฌ๋ฌ๊ฐ์
LineTokenizer
๋ก ๊ตฌ์ฑ๋ Map์ ์ ์ธํ ์ ์์.PatternMatcher<LineTokenizer> tokenizers
๊ฐ
LineTokenizer
๋ฅผ ํ์๋ก ํ๋ ์ฌ๋ฌ๊ฐ์FieldSetMapper
Map ์ ์ธํ ์ ์์.PatternMatcher<FieldSetMapper<T>> patternMatcher
CUST,Warren,Q,Darrow,8272 4th Street,New York,IL,76091
TRANS,1165965,2011-01-22 00:13:29,51.43
CUST,Ann,V,Gates,9247 Infinite Loop Drive,Hollywood,NE,37612
CUST,Erica,I,Jobs,8875 Farnam Street,Aurora,IL,36314
TRANS,8116369,2011-01-21 20:40:52,-14.83
TRANS,8116369,2011-01-21 15:50:17,-45.45
TRANS,8116369,2011-01-21 16:52:46,-74.6
TRANS,8116369,2011-01-22 13:51:05,48.55
TRANS,8116369,2011-01-21 16:51:59,98.53
์ฌ๋ฌ ํ์์ผ๋ก ๊ตฌ์ฑ๋ csv ํ์ผ์ด๋ค.
@Bean
public PatternMatchingCompositeLineMapper lineTokenizer() {
Map<String, LineTokenizer> lineTokenizerMap = new HashMap<>(2);
lineTokenizerMap.put("TRANS*", transactionLineTokenizer()); // TRANS๋ก ์์ํ๋ฉด transactionLineTokenizer
lineTokenizerMap.put("CUST*", customerLineTokenizer()); // CUST๋ก ์์ํ๋ฉด, customerLineTokenizer
Map<String, FieldSetMapper> fieldSetMapperMap = new HashMap<>(2);
BeanWrapperFieldSetMapper<Customer> customerFieldSetMapper = new BeanWrapperFieldSetMapper<>();
customerFieldSetMapper.setTargetType(Customer.class);
fieldSetMapperMap.put("TRANS*", new TransactionFieldSetMapper()); // ์ผ๋ฐ์ ์ด์ง ์์ ํ์
ํ๋ ๋ณํ์ FieldSetMapper ํ์(Date, Double)
fieldSetMapperMap.put("CUST*", customerFieldSetMapper);
PatternMatchingCompositeLineMapper lineMappers = new PatternMatchingCompositeLineMapper();
lineMappers.setTokenizers(lineTokenizerMap);
lineMappers.setFieldSetMappers(fieldSetMapperMap);
return lineMappers;
}
TRANS๋ก ์์ํ๋ ๊ฒฝ์ฐ์ CUST๋ก ์์ํ๋ ๊ฒฝ์ฐ ๊ฐ๊ฐ FieldSetMapper
, LineTokenizer
๋ฅผ ์ฌ์ฉํด ํ์ฑ ๋ฐ set์ ํ ์ ์๋ค.
@Bean
public DelimitedLineTokenizer transactionLineTokenizer() {
DelimitedLineTokenizer delimitedLineTokenizer = new DelimitedLineTokenizer();
delimitedLineTokenizer.setNames("prefix", "accountNumber", "transactionDate", "amount");
return delimitedLineTokenizer;
}
@Bean
public DelimitedLineTokenizer customerLineTokenizer() {
DelimitedLineTokenizer delimitedLineTokenizer = new DelimitedLineTokenizer();
delimitedLineTokenizer.setNames("firstName", "middleInitial", "lastName", "address", "city", "state", "zipCode");
delimitedLineTokenizer.setIncludedFields(1,2,3,4,5,6,7); // prefix์ ์ธํ ๋ชจ๋ ํ๋
return delimitedLineTokenizer;
}
ItemStreamReader ์ปค์คํ
๋๊ฐ์ ๋ค๋ฅธ ํฌ๋งท์ ๋ฐ์ดํฐ๊ฐ ์ฌ์ค์ ์๋ก ์ฐ๊ด์ด ์๋ ๋ฐ์ดํฐ์ผ ์ ์๋ค. ๊ทธ ๊ฒฝ์ฐ์๋ ํ๊ฐ์ ๋๋ฉ์ธ์ด ๋ค๋ฅธ ํ๊ฐ์ ๋๋ฉ์ธ์ ๋ด์ฉ์ ํฌํจํ๊ณ ์์ ์ ์๋ค.
CUST,Warren,Q,Darrow,8272 4th Street,New York,IL,76091
TRANS,1165965,2011-01-22 00:13:29,51.43
CUST,Ann,V,Gates,9247 Infinite Loop Drive,Hollywood,NE,37612
CUST,Erica,I,Jobs,8875 Farnam Street,Aurora,IL,36314
TRANS,8116369,2011-01-21 20:40:52,-14.83
TRANS,8116369,2011-01-21 15:50:17,-45.45
TRANS,8116369,2011-01-21 16:52:46,-74.6
TRANS,8116369,2011-01-22 13:51:05,48.55
TRANS,8116369,2011-01-21 16:51:59,98.53
๊ฑฐ๋๋ด์ญ(TRANS) ๋ฐ์ดํฐ๋ ๊ทธ ์์ ๊ณ ๊ฐ(CUST)์ ๊ณ์ฝ ์ ๋ณด๋ผ๊ณ ๊ฐ์ ํด๋ณผ ๊ฒ์ด๋ค.
@AllArgsConstructor
@NoArgsConstructor
@Getter
@Setter
@ToString
public class Customer {
private Long id;
private String firstName;
private String middleInitial;
private String lastName;
private String addressNumber;
private String street;
private String city;
private String state;
private String zipCode;
private String address;
private List<Transaction> transactions; // ๊ณ ๊ฐ์ ๊ฐ์ฝ์ ๋ณด
}
Custom ๋๋ฉ์ธ ๊ฐ์ฒด์ Transaction ๊ฑฐ๋๋ด์ญ ์ ๋ณด๋ฅผ ํฌํจํ๊ฒ ๋ณ๊ฒฝํด์ค๋ค.
public class CustomerFileReader implements ItemStreamReader<Customer> {
private Object curItem = null;
private ItemStreamReader<Object> delegate;
public CustomerFileReader(ItemStreamReader<Object> delegate) {
this.delegate = delegate;
}
@Override
public Customer read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
if (curItem == null) {
curItem = delegate.read(); // ๊ณ ๊ฐ ์ ๋ณด๋ฅผ ์ฝ์.
}
Customer item = (Customer) curItem;
curItem = null;
if (item != null) {
item.setTransactions(new ArrayList<>());
// ๋ค์ ๊ณ ๊ฐ ๋ ์ฝ๋๋ฅผ ๋ง๋๊ธฐ ์ ๊น์ง ๊ฑฐ๋๋ด์ญ ๋ ์ฝ๋๋ฅผ ์ฝ๋๋ค.
while (peek() instanceof Transaction) {
item.getTransactions().add((Transaction) curItem);
curItem = null;
}
}
return item;
}
private Object peek() throws Exception {
if (curItem == null) {
curItem = delegate.read();
}
return curItem;
}
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
delegate.open(executionContext);
}
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
delegate.update(executionContext);
}
@Override
public void close() throws ItemStreamException {
delegate.close();
}
}
์ฌ๊ธฐ์ ํต์ฌ์ read()
๋ฉ์๋์ด๋ค. ํ์ค์ฉ ์ฝ์ด์ฌ ๋ ๋ค์ ๊ณ ๊ฐ์ ๋ณด๊ฐ ๋์ฌ๋๊น์ง ๊ฑฐ๋๋ด์ญ ๋ ์ฝ๋๋ฅผ ์ฝ์ด ํด๋น ๊ณ ๊ฐ์ด ๊ฐ์ง๊ณ ์๊ฒ ํ๋ค.
@Bean
@StepScope
public FlatFileItemReader multiLineItemReader(@Value("#{jobParameters['customFile']}") PathResource resource) {
return new FlatFileItemReaderBuilder<Customer>()
.name("multiLineItemReader")
.lineMapper(multiLineTokenizer())
.resource(resource)
.build();
}
@Bean
public CustomerFileReader customerFileReader() {
return new CustomerFileReader(multiLineItemReader(null));
}
Job์์ itemReader๋ถ๋ถ์ ์์์ ์์ฑํ CustomerFileReader
๋ฅผ ์ค์ ํด์ฃผ๋ฉด ๋๋ค.
Customer(id=null, firstName=Warren, middleInitial=Q, lastName=Darrow, addressNumber=null, street=null, city=New York, state=IL, zipCode=76091, address=8272 4th Street, transactions=[Transaction(accountNumber=1165965, transactionDate=Sat Jan 22 00:13:29 KST 2011, amount=51.43, dateFormat=java.text.SimpleDateFormat@7c669100)])
Customer(id=null, firstName=Ann, middleInitial=V, lastName=Gates, addressNumber=null, street=null, city=Hollywood, state=NE, zipCode=37612, address=9247 Infinite Loop Drive, transactions=[])
Customer(id=null, firstName=Erica, middleInitial=I, lastName=Jobs, addressNumber=null, street=null, city=Aurora, state=IL, zipCode=36314, address=8875 Farnam Street, transactions=[Transaction(accountNumber=8116369, transactionDate=Fri Jan 21 20:40:52 KST 2011, amount=-14.83, dateFormat=java.text.SimpleDateFormat@7c669100), Transaction(accountNumber=8116369, transactionDate=Fri Jan 21 15:50:17 KST 2011, amount=-45.45, dateFormat=java.text.SimpleDateFormat@7c669100), Transaction(accountNumber=8116369, transactionDate=Fri Jan 21 16:52:46 KST 2011, amount=-74.6, dateFormat=java.text.SimpleDateFormat@7c669100), Transaction(accountNumber=8116369, transactionDate=Sat Jan 22 13:51:05 KST 2011, amount=48.55, dateFormat=java.text.SimpleDateFormat@7c669100), Transaction(accountNumber=8116369, transactionDate=Fri Jan 21 16:51:59 KST 2011, amount=98.53, dateFormat=java.text.SimpleDateFormat@7c669100)])
๋ค์๊ณผ ๊ฐ์ด ๊ณ ๊ฐ์ด ๊ฐ์ง๊ณ ์๋ ๊ฑฐ๋๋ด์ญ์ ์ถ๋ ฅํ ์ ์๋ค.
MultiResourceItemReader
๋์ผํ ํฌ๋งท์ผ๋ก ์์ฑ๋ ์ฌ๋ฌ๊ฐ์ ํ์ผ์ ์ฝ์ด๋ค์ด๋ ItemReader
๋ฅผ ์ ๊ณตํ๋ค.
org.springframework.batch.item.file.MultiResourceItemReader
MultiResourceItemReader
๋ ์ฝ์ด์ผํ ํ์ผ๋ช
์ ํจํด์ MultiResourceItemReader
์ ์์กด์ฑ์ผ๋ก ์ ์ํ๋ค.
public class MultiResourceCustomerFileReader implements ResourceAwareItemReaderItemStream<Customer> {
private Object curItem = null;
private ResourceAwareItemReaderItemStream<Object> delegate;
public MultiResourceCustomerFileReader(ResourceAwareItemReaderItemStream<Object> delegate) {
this.delegate = delegate;
}
/**
* Resource๋ฅผ ์ฃผ์
ํจ์ผ๋ก ItemReader๊ฐ ํ์ผ ๊ด๋ฆฌํ๋ ๋์
* ๊ฐ ํ์ผ์ ์คํ๋ง ๋ฐฐ์น๊ฐ ์์ฑํด ์ฃผ์
* @param resource
*/
@Override
public void setResource(Resource resource) {
System.out.println(resource);
this.delegate.setResource(resource);
}
@Override
public Customer read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
if (curItem == null) {
curItem = delegate.read(); // ๊ณ ๊ฐ ์ ๋ณด๋ฅผ ์ฝ์.
}
Customer item = (Customer) curItem;
curItem = null;
if (item != null) {
item.setTransactions(new ArrayList<>());
// ๋ค์ ๊ณ ๊ฐ ๋ ์ฝ๋๋ฅผ ๋ง๋๊ธฐ ์ ๊น์ง ๊ฑฐ๋๋ด์ญ ๋ ์ฝ๋๋ฅผ ์ฝ๋๋ค.
while (peek() instanceof Transaction) {
item.getTransactions().add((Transaction) curItem);
curItem = null;
}
}
return item;
}
private Object peek() throws Exception {
if (curItem == null) {
curItem = delegate.read();
}
return curItem;
}
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
delegate.open(executionContext);
}
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
delegate.update(executionContext);
}
@Override
public void close() throws ItemStreamException {
delegate.close();
}
}
์์์ ๋ค๋ฃฌ ItemStreamReader
์ ๋ค๋ฅธ ์ ์ Resource
์ฃผ์
๋ถ๋ถ์ด๋ค. Resource
๋ฅผ ์ฃผ์
ํ๊ฒ ๋๋ฉด ํ์ํ ๊ฐ ํ์ผ์ ์คํ๋ง ๋ฐฐ์น๊ฐ ์์ฑํด ItemReader
์ ์ฃผ์
ํ ์ ์๋ค.
@Bean
@StepScope
public MultiResourceItemReader multiResourceItemReader(@Value("#{jobParameters['customFile']}") Resource[] resources) {
return new MultiResourceItemReaderBuilder<>()
.name("multiResourceItemReader")
.resources(resources) // resources ๋ฐฐ์ด
.delegate(multiResourceCustomerFileReader()) // ์ค์ ์์
์ ์ํํ ์์ ์ปดํฌ๋ํธ
.build();
}
@Bean
public MultiResourceCustomerFileReader multiResourceCustomerFileReader() {
return new MultiResourceCustomerFileReader(multiResourceCustomerItemReader());
}
@Bean
@StepScope
public FlatFileItemReader multiResourceCustomerItemReader() {
return new FlatFileItemReaderBuilder<Customer>()
.name("multiResourceCustomerItemReader")
.lineMapper(multiResourceTokenizer())
.build();
}
์ฝ์ด์ผํ ํ์ผ ๋ชฉ๋ก(resources)์ ์ค์ ํด์ฃผ๊ณ , delegate()
์ ์ค์ ๋ก ์์
์ ์ํํ ์์ ์ปดํฌ๋ํธ๋ฅผ ์ง์ ํด์ฃผ๋ฉด๋๋ค.
์ฌ๋ฌ ๊ฐ์ ํ์ผ์ ๋ค๋ฃฐ๋๋ ์ฌ์์์ ํ๊ฒ๋๋ ์ํฉ์์ ์คํ๋ง๋ฐฐ์น๊ฐ ์ถ๊ฐ์ ์ธ ์์ ์ฅ์น๋ฅผ ์ ๊ณตํด์ฃผ์ง ์๋๋ค. ์๋ฅผ๋ค์ด file1.csv, file2.csv, file3.csv๊ฐ ์๋๋ฐ, file2.csv ์ฒ๋ฆฌํ๋ ๊ณผ์ ์์ ์ค๋ฅ๊ฐ ๋ฐ์ํ์ฌ ์ก์ด ์คํจ ๋ ์ดํ ์ฌ์์์ ํ ๋ file4.csv๋ฅผ ์ถ๊ฐํ๋ค๋ฉด, ์ต์ด ์คํ์ file4.csv๊ฐ ์์์์๋ ๋ถ๊ตฌํ๊ณ , ํฌํจํ์ฌ ์คํํ๋ค.
์ด๋ฌํ ๋ฌธ์ ์ ์ ํด๊ฒฐํ๊ธฐ ์ํด์ ๋ฐฐ์น ์คํ ์ ์ฌ์ฉํ ๋๋ ํฐ๋ฆฌ๋ฅผ ๋ณ๋๋ก ์์ฑํ๋ ๊ฒ์ด ์ผ๋ฐ์ ์ด๋ฉฐ, ์๋ก ์์ฑ๋ ๋ชจ๋ ํ์ผ์ ์๋ก์ด ๋๋ ํฐ๋ฆฌ์ ๋ฃ์ด์ฃผ์ด ํ์ฌ ์ํ์ค์ธ ์ก์ ์ํฅ์ ์ฃผ์ง์๊ฒ ํ ์ ์๋ค.
XML
XML์ ํ์ผ ๋ด ๋ฐ์ดํฐ๋ฅผ ์ค๋ช ํ ์ ์๋ ํ๊ทธ๋ฅผ ์ฌ์ฉํด ํ์ผ์ ํฌํจ๋ ๋ฐ์ดํฐ๋ฅผ ์ค๋ช ํ๋ฏ๋ก, Flat file๊ณผ๋ ๋ค๋ฅด๋ค.
XML parser๋ก ์ฃผ๋ก DOM๊ณผ SAX๋ฅผ ๋ง์ด ์ฌ์ฉํ๋ค.
Dom vs SAX vs StAX
์ฐธ๊ณ : https://gohlab2017.tistory.com/3
DOM(Document Object Model) ๋ฐฉ์
XML๋ฌธ์ ์ ์ฒด๋ฅผ ๋ฉ๋ชจ๋ฆฌ์ ๋ก๋ํ์ฌ ๊ฐ์ ์ฝ๋๋ค.
XML๋ฌธ์๋ฅผ ์ฝ์ผ๋ฉด ๋ชจ๋ Element, Text, Attribute ๋ฑ์ ๋ํ ๊ฐ์ฒด๋ฅผ ์์ฑํ๊ณ , ์ด๋ฅผ Document ๊ฐ์ฒด๋ก ๋ฆฌํดํ๋ค.
Document ๊ฐ์ฒด๋ DOM API์ ์๋ง๋ ํธ๋ฆฌ ๊ตฌ์กฐ์ ์๋ฐ ๊ฐ์ฒด๋ก ํํ๋์ด ์๋ค.
XML๋ฌธ์๊ฐ ๋ฉ๋ชจ๋ฆฌ์ ๋ชจ๋ ์ฌ๋ผ๊ฐ ์์ด์ ๋ ธ๋๋ค์ ๊ฒ์, ์์ , ๊ตฌ์กฐ๋ณ๊ฒฝ์ด ๋น ๋ฅด๊ณ ์ฉ์ดํ๋ค.
SAX ๋ฐฉ์ ๋ณด๋ค ์ง๊ด์ ์ด๋ฉฐ ํ์ฑ์ด ๋จ์ํ๊ธฐ ๋๋ฌธ์ ์ผ๋ฐ์ ์ผ๋ก DOM ๋ฐฉ์์ ์ฑํํ์ฌ ๊ฐ๋ฐํ๊ฒ ๋๋ค.
SAX(Simple API for XML) ๋ฐฉ์
SAX ๋ฐฉ์์ XML ๋ฌธ์๋ฅผ ํ๋์ ๊ธด ๋ฌธ์์ด๋ก ๊ฐ์ฃผํ๋ค.
XML๋ฌธ์๋ฅผ ์์์ ๋ถํฐ ์์ฐจ์ ์ผ๋ก ์ฝ์ด๊ฐ๋ฉด์ ๋ ธ๋๊ฐ ์ด๋ฆฌ๊ณ ๋ซํ๋ ๊ณผ์ ์์ ์ด๋ฒคํธ๊ฐ ๋ฐ์ํ๋ค.
๊ฐ๊ฐ์ ์ด๋ฒคํธ๊ฐ ๋ฐ์๋ ๋๋ง๋ค ์ํํ๊ณ ์ ํ๋ ๊ธฐ๋ฅ์ ์ด๋ฒคํธ ํธ๋ค๋ฌ ๊ธฐ์ ์ ์ด์ฉํ์ฌ ๊ตฌํํ๋ค.
XML๋ฌธ์๋ฅผ ๋ฉ๋ชจ๋ฆฌ์ ์ ๋ถ ๋ก๋ฉํ๊ณ ํ์ฑํ๋ ๊ฒ์ด ์๋๊ธฐ ๋๋ฌธ์ ๋ฉ๋ชจ๋ฆฌ ์ฌ์ฉ๋์ด ์ ๊ณ ๋จ์ํ ์ฝ๊ธฐ๋ง ํ ๋ ์๋๊ฐ ๋น ๋ฅด๋ค.
๋ฐ์ํ ์ด๋ฒคํธ๋ฅผ ํธ๋ค๋งํ์ฌ ๋ณ์์ ์ ์ฅํ๊ณ ํ์ฉํ๋ ๊ฒ์ด๊ธฐ ๋๋ฌธ์ ๋ณต์กํ๊ณ ๋ ธ๋ ์์ ์ด์ด๋ ต๋ค.
XML ์ค๋ธ์ ํธ์ Random Access๋ฅผ ํ์ง ๋ชปํด, ์ง๋ ์๋ฆฌ๋จผํธ๋ฅผ ์ฐธ์กฐํ ๊ฒฝ์ฐ ๋ค์ ์ฒ์๋ถํฐ ์ฝ์ด์ผํ๋ค.
StAX(Streaming API for XML)
StAX๋ push ์ pull ๋ฐฉ์์ ๋์์ ์ ๊ณตํ๋ ํ์ด๋ธ๋ฆฌ๋ํ ํํ
XML ๋ฌธ์๋ฅผ ํ์ฑํ ๋ ํ๋์ Fragment๋ก ๊ตฌ๋ถ
์ ํด์ง ์๋ฆฌ๋จผํธ๋ฅผ ์ฝ์๋๋ DOM ๋ฐฉ์์ ์ฌ์ฉํ๋ฉฐ, Fragement๋ก ์ฒ๋ฆฌํ๋ ๊ฒ์ SAX์ Push ๋ฐฉ์์ ์ฌ์ฉ
์ฆ, ๊ฐ ์ธ์ ์ ๋ ๋ฆฝ์ ์ผ๋ก ํ์ฑํ๋ ๊ธฐ๋ฅ์ ์ ๊ณต
์คํ๋ง ๋ฐฐ์น์์๋ StAX ํ์๋ฅผ ์ฌ์ฉํ๋ค.
StaxEventItemReader
org.springframework.batch.item.xml.StaxEventItemReader
<customers>
<customer>
<firstName>Laura</firstName>
<middleInitial>O</middleInitial>
<lastName>Minella</lastName>
<address>2039 Wall Street</address>
<city>Omaha</city>
<state>IL</state>
<zipCode>35446</zipCode>
<transactions>
<transaction>
<accountNumber>829433</accountNumber>
<transactionDate>2010-10-14 05:49:58</transactionDate>
<amount>26.08</amount>
</transaction>
</transactions>
</customer>
...
</customers>
์ ์์ ํ์ผ์ ํ์ฑํ๋ Reader๋ฅผ ๊ตฌํํด๋ณผ๊ฒ์ด๋ค.
@Bean
@StepScope
public StaxEventItemReader<Customer> staxCustomerFileReader(@Value("#{jobParameters['customFile']}")Resource resource) {
return new StaxEventItemReaderBuilder<Customer>()
.name("staxCustomerFileReader")
.resource(resource)
.addFragmentRootElements("customer") // ํ๋ ๊ทธ๋จผํธ ๋ฃจํธ ์๋ฆฌ๋จผํธ
.unmarshaller(customerMarshaller()) // XML์ ๋๋ฉ์ธ ๊ฐ์ฒด๋ก ๋ฐํ JAXB ์ฌ์ฉ
.build();
}
.addFragmentRootElements()
:StaxEventItemReader
๋ฅผ ์ฌ์ฉํ๋ ค๋ฉด XML ํ๋ํฌ๋จผํธ ๋ฃจํธ ์๋ฆฌ๋จผํธ๋ฅผ ์ง์ XML๋ด์์ Item์ผ๋ก ์ทจ๊ธํ fragment์ root ์๋ฆฌ๋จผํธ๋ฅผ ์๋ณํ๋๋ฐ ์ฌ์ฉ.unmarshaller()
:org.springframework.oxm.Unmarshaller
๊ตฌํ์ฒด๋ฅผ ์ ๋ฌ ๋ฐ์ผ๋ฉฐ, XML์ ๋๋ฉ์ธ ๊ฐ์ฒด๋ก ๋ฐํ
์ด๋ฒ ์์ ์์๋ org.springframework.oxm.jaxb.Jaxb2Marshaller
๋ฅผ ์ฌ์ฉํ์ผ๋ฉฐ, Jaxb2Marshaller
๋ฅผ ์ฌ์ฉํ๊ธฐ ์ํด์๋ ์์กด์ฑ ์ถ๊ฐ๊ฐ ํ์ํ๋ค.
build.gradle
dependencies {
implementation 'org.springframework:spring-oxm'
implementation 'javax.xml.bind:jaxb-api:2.3.1'
implementation 'javax.activation:activation:1.1'
implementation 'com.sun.xml.bind:jaxb-core:2.3.0.1'
implementation 'com.sun.xml.bind:jaxb-impl:2.3.1'
}
JAXB ์์กด์ฑ๊ณผ Spring OXM ๋ชจ๋๋ก JAXB๋ฅผ ์ฌ์ฉํ๋ ์คํ๋ง ์ปดํฌ๋ํธ ์์กด์ฑ์ ์ถ๊ฐํด์ค๋ค.
XML์ ํ์ฑํ ์ ์๊ฒ ํ๋ ค๋ฉด ๋๋งค์ธ ๊ฐ์ฒด์ JAXB ์ด๋ ธํ ์ด์ ์ ์ถ๊ฐํด์ค ๊ฒ์ด๋ค.
@NoArgsConstructor
@Getter
@Setter
@ToString
@XmlRootElement
public class Customer {
private Long id;
private String firstName;
private String middleInitial;
private String lastName;
private String addressNumber;
private String street;
private String city;
private String state;
private String zipCode;
private String address; // customAddressMapper
private List<Transaction> transactions;
@XmlElementWrapper(name = "transactions")
@XmlElement(name = "transaction")
public void setTransactions(List<Transaction> transactions) {
this.transactions = transactions;
}
}
@Getter
@Setter
@ToString
@XmlType(name = "transaction")
public class Transaction {
private String accountNumber;
private Date transactionDate;
private Double amount;
@Setter(value = AccessLevel.NONE)
private DateFormat dateFormat = new SimpleDateFormat("MM/dd/yyyy");
}
๋๋ฉ์ธ ๊ฐ์ฒด ์ค์ ์ด ๋๋ฌ์ผ๋ฉด, ๊ฐ ๋ธ๋ก์ ํ์ฑํ๋๋ฐ ์ฌ์ฉํ Unmarshaller๋ฅผ ๊ตฌํํด์ฃผ๋ฉด ๋๋ค.
@Bean
public Jaxb2Marshaller customerMarshaller() {
Jaxb2Marshaller jaxb2Marshaller = new Jaxb2Marshaller();
jaxb2Marshaller.setClassesToBeBound(Customer.class, Transaction.class); // ๋๋ฉ์ธ ๊ฐ์ฒด
return jaxb2Marshaller;
}
JSON
JsonItemReader
org.springframework.batch.item.json.JsonItemReader
์ฒญํฌ๋ฅผ ์ฝ์ด ๊ฐ์ฒด๋ก ํ์ฑํ๋ค.
์ค์ ํ์ฑ ์์ ์
JsonObjectReader
์ธํฐํ์ด์ค ๊ตฌํ์ฒด์ ์์ํ๋ค.
JsonObjectReader
์ค์ ๋ก JSON ๊ฐ์ฒด๋ฅผ ํ์ฑํ๋ ์ญํ ์ ํ๋ค. ์คํ๋ง ๋ฐฐ์น๋ 2๊ฐ์ JsonObjectReader
๋ฅผ ์ ๊ณตํด์ค๋ค.
Jackson
Gson
[
{
"firstName": "Laura",
"middleInitial": "O",
"lastName": "Minella",
"address": "2039 Wall Street",
"city": "Omaha",
"state": "IL",
"zipCode": "35446",
"transactions": [
{
"accountNumber": 829433,
"transactionDate": "2010-10-14 05:49:58",
"amount": 26.08
}
]
},
...
]
์ ๊ตฌ์กฐ๋ก๋์ด์๋ JSON์ ํ์ฑํด ๋ณผ๊ฒ์ด๋ค.
@Bean
@StepScope
public JsonItemReader<Customer> jsonFileReader(@Value("#{jobParameters['customFile']}") Resource resource) {
// Jackson์ด JSON์ ์ฝ๊ณ ์ฐ๋๋ฐ ์ฌ์ฉํ๋ ์ฃผ์ ํด๋์ค
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setDateFormat(new SimpleDateFormat("yyyy-MM-dd hh:mm:ss"));
JacksonJsonObjectReader<Customer> jsonObjectReader = new JacksonJsonObjectReader<>(Customer.class); // ๋ฐํํ ํด๋์ค ์ค์
jsonObjectReader.setMapper(objectMapper); // ObjectMapper
return new JsonItemReaderBuilder<Customer>()
.name("jsonFileReader")
.jsonObjectReader(jsonObjectReader) // ํ์ฑ์ ์ฌ์ฉ
.resource(resource)
.build();
}
ObjectMapper
๋ Jackson์ด JSON์ ์ฝ๊ณ ์ฐ๋๋ฐ ์ฌ์ฉํ๋ ์ฃผ์ ํด๋์ค๋ก ์ปค์คํ ๋ฐ์ดํฐ ํฌ๋งท๋ค์ ์ค์ ํ๋ฉด๋๋ค.JacksonJsonObjectReader
์์ฑ์ ๋ฐํํ ํด๋์ค๋ฅผ ์ค์ ํ๊ณ , ์ปค์คํ ํObjectMapper
๋ฅผ ์ค์ ํด์ฃผ๋ฉด๋๋ค.JsonItemReaderBuilder
๋ ํ์ฑ์ ์ฌ์ฉํJsonObjectReader
๋ฅผ ์ค์ ํด์ฃผ๋ฉด๋๋ค.
Database Reader
Cursor๋ ํ์ค java.sql.ResultSet
์ผ๋ก ๊ตฌํ๋๋ฉฐ, ResultSet
์ด open๋๋ฉด next()
๋ฉ์๋๋ฅผ ํธ์ถํ ๋๋ง๋ค ๋ฐ์ดํฐ๋ฒ ์ด์ค์์ ๋ฐฐ์น ๋ ์ฝ๋๋ฅผ ๊ฐ์ ธ์ ๋ฐํํ๋ค.
์ฆ, Cursor ๋ฐฉ์์ DB์ ์ปค๋ฅ์ ์ ๋งบ์ ํ, Cursor๋ฅผ ํ์นธ์ฉ ์ฎ๊ธฐ๋ฉด์ ์ง์์ ์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์จ๋ค.
(CursorItemReader๋ streaming์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌ)
Cursor๋ ํ๋์ Connection์ผ๋ก Batch๊ฐ ๋๋ ๋๊ฐ์ง ์ฌ์ฉ๋๊ธฐ ๋๋ฌธ์ Batch๊ฐ ๋๋๊ธฐ์ ์ DB์ ์ดํ๋ฆฌ์ผ์ด์
Connection์ด ๋์ด์ง ์ ์์ผ๋ฏ๋ก, DB์ SocketTimeout์ ์ถฉ๋ถํ ํฐ ๊ฐ์ผ๋ก ์ค์ ํด์ผํ๋ค. (๋คํธ์ํฌ ์ค๋ฒํค๋ ์ถ๊ฐ) ์ถ๊ฐ๋ก ResultSet
์ ์ค๋ ๋ ์์ ์ด ๋ณด์ฅ๋์ง ์์ ๋ค์ค ์ค๋ ๋ ํ๊ฒฝ์์๋ ์ฌ์ฉํ ์ ์๋ค.
JdbcCursorItemReader
HibernateCursorItemReader
StoredProcedureItemReader
Paging ๋ฐฉ์์ ํ๋ฒ์ ์ง์ ํ PageSize๋งํผ ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์จ๋ค.
SpringBatch์์ offset๊ณผ limit์ PageSize์ ๋ง๊ฒ ์๋์ผ๋ก ์์ฑํด์ค๋ค. ๋ค๋ง ๊ฐ ์ฟผ๋ฆฌ๋ ๊ฐ๋ณ์ ์ผ๋ก ์คํ๋๋ฏ๋ก, ํ์ด์ง์ ๊ฒฐ๊ณผ๋ฅผ ์ ๋ ฌ(order by)ํ๋ ๊ฒ์ด ์ค์ํ๋ค.
Batch ์ํ์๊ฐ์ด ์ค๋ ๊ฑธ๋ฆฌ๋ ๊ฒฝ์ฐ์๋ PagingItemReader
๋ฅผ ์ฌ์ฉํ๋ ๊ฒ์ด ์ข๋ค. Paging์ ๊ฒฝ์ฐ ํ ํ์ด์ง๋ฅผ ์ฝ์๋๋ง๋ค Connection์ ๋งบ๊ณ ๋๊ธฐ ๋๋ฌธ์ ์๋ฌด๋ฆฌ ๋ง์ ๋ฐ์ดํฐ๋ผ๋ ํ์์์๊ณผ ๋ถํ ์์ด ์ํ๋ ์ ์๋ค.
JdbcPagingItemReader
HibernatePagingItemReader
JpaPagingItemReader
JDBC
JdbcCursorItemReader
/**
* --job.name=jdbcCursorItemReaderJob city=Chicago
*/
@RequiredArgsConstructor
@Configuration
public class JdbcCursorCustomerJob {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final DataSource dataSource;
@Bean
public Job jdbcCursorItemReaderJob(){
return jobBuilderFactory.get("jdbcCursorItemReaderJob")
.start(jdbcCursorItemReaderStep())
.build();
}
@Bean
public Step jdbcCursorItemReaderStep(){
return stepBuilderFactory.get("jdbcCursorItemReaderStep")
.<Customer, Customer>chunk(10)
.reader(customerJdbcCursorItemReader())
.writer(customerJdbcCursorItemWriter())
.build();
}
@Bean
public JdbcCursorItemReader<Customer> customerJdbcCursorItemReader() {
return new JdbcCursorItemReaderBuilder<Customer>()
.name("customerJdbcCursorItemReader")
.dataSource(dataSource)
.sql("select * from customer where city = ?")
.rowMapper(new BeanPropertyRowMapper<>(Customer.class))
.preparedStatementSetter(citySetter(null)) // ํ๋ผ๋ฏธํฐ ์ค์
.build();
}
/**
* ํ๋ผ๋ฏธํฐ๋ฅผ SQL๋ฌธ์ ๋งคํ
* ArgumentPreparedStatementSetter๋ ๊ฐ์ฒด ๋ฐฐ์ด์ ๋ด๊ธด ์์๋๋ก ?์ ์์น์ ๊ฐ์ผ๋ก ์ค์
* @param city
* @return
*/
@Bean
@StepScope
public ArgumentPreparedStatementSetter citySetter(@Value("#{jobParameters['city']}") String city) {
return new ArgumentPreparedStatementSetter(new Object[]{city});
}
@Bean
public ItemWriter customerJdbcCursorItemWriter() {
return (items) -> items.forEach(System.out::println);
}
}
<T, T> chunk(int chunkSize)
: ์ฒซ๋ฒ์งธ T๋ Reader์์ ๋ฐํํ ํ์ , ๋๋ฒ์งธ T๋ Writer์ ํ๋ผ๋ฏธํฐ๋ก ๋์ด์ฌ ํ์ ์ด๋ค.fetchSize : DB์์ ํ๋ฒ์ ๊ฐ์ ธ์ฌ ๋ฐ์ดํฐ ์์ ๋ํ๋ธ๋ค. Paging์ ์ค์ ์ฟผ๋ฆฌ๋ฅผ limit, offset์ผ๋ก ๋ถํ ์ฒ๋ฆฌํ๋ ๋ฐ๋ฉด, Cursor๋ ๋ถํ ์ฒ๋ฆฌ์์ด ์คํ๋๋ ๋ด๋ถ์ ์ผ๋ก ๊ฐ์ ธ์จ๋ ๋ฐ์ดํฐ๋ FetchSize๋งํผ ๊ฐ์ ธ์
read()
๋ฅผ ํตํด์ ํ๋์ฉ ๊ฐ์ ธ์จ๋ค.dataSource : DB์ ์ ๊ทผํ๊ธฐ ์ํด ์ฌ์ฉํ DataSource๊ฐ์ฒด
rowMapper : ์ฟผ๋ฆฌ ๊ฒฐ๊ณผ๋ฅผ ์ธ์คํด์ค๋ก ๋งคํํ๊ธฐ ์ํ ๋งคํผ
BeanPropertyRowMapper
๋ฅผ ์ฌ์ฉํด ๋๋ฉ์ธ ๊ฐ์ฒด์ ๋งคํํด์ค๋ค.
sql : Reader์์ ์ฌ์ฉํ ์ฟผ๋ฆฌ๋ฌธ
preparedStatementSetter: SQL๋ฌธ์ ํ๋ผ๋ฏธํฐ ์ค์
name : Reader์ ์ด๋ฆ, ExecutionContext์ ์ ์ฅ๋์ด์ง ์ด๋ฆ
JdbcPagingItemReader
/**
* --job.name=jdbcPagingItemReaderJob city=Chicago
*/
@RequiredArgsConstructor
@Configuration
public class JdbcPagingCustomerJob {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final DataSource dataSource;
@Bean
public Job jdbcPagingItemReaderJob(){
return jobBuilderFactory.get("jdbcPagingItemReaderJob")
.start(jdbcPagingItemReaderStep())
.build();
}
@Bean
public Step jdbcPagingItemReaderStep(){
return stepBuilderFactory.get("jdbcPagingItemReaderStep")
.<Customer, Customer>chunk(10)
.reader(customerJdbcPagingItemReader(null, null))
.writer(customerJdbcPagingItemWriter())
.build();
}
@Bean
@StepScope
public JdbcPagingItemReader<Customer> customerJdbcPagingItemReader(
PagingQueryProvider pagingQueryProvider, @Value("#{jobParameters['city']}") String city) {
Map<String, Object> params = new HashMap<>(1);
params.put("city", city);
return new JdbcPagingItemReaderBuilder<Customer>()
.name("customerJdbcPagingItemReader") // Reader์ ์ด๋ฆ, ExecutionContext์ ์ ์ฅ๋์ด์ง ์ด๋ฆ
.dataSource(dataSource) // DB์ ์ ๊ทผํ๊ธฐ ์ํด ์ฌ์ฉํ DataSource๊ฐ์ฒด
.queryProvider(pagingQueryProvider) // PagingQueryProvider
.parameterValues(params) // SQL ๋ฌธ์ ์ฃผ์
ํด์ผํ ํ๋ผ๋ฏธํฐ
.pageSize(10) // ๊ฐ ํ์ด์ง ํฌ
.rowMapper(new BeanPropertyRowMapper<>(Customer.class)) // ์ฟผ๋ฆฌ ๊ฒฐ๊ณผ๋ฅผ ์ธ์คํด์ค๋ก ๋งคํํ๊ธฐ ์ํ ๋งคํผ
.build();
}
@Bean
public ItemWriter customerJdbcPagingItemWriter() {
return (items) -> items.forEach(System.out::println);
}
@Bean
public SqlPagingQueryProviderFactoryBean pagingQueryProvider(){
SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
queryProvider.setDataSource(dataSource); // ์ ๊ณต๋ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ํ์
์ ๊ฒฐ์ (setDatabaseType ์ผ๋ก ๋ฐ์ดํฐ๋ฒ ์ด์ค ํ์
์ค์ ๋ ๊ฐ๋ฅ)
queryProvider.setSelectClause("*");
queryProvider.setFromClause("from customer");
queryProvider.setWhereClause("where city = :city");
Map<String, Order> sortKeys = new HashMap<>();
sortKeys.put("lastName", Order.ASCENDING);
queryProvider.setSortKeys(sortKeys);
return queryProvider;
}
}
PagingItemReader
๋ PagingQueryProvider
๋ฅผ ํตํด ์ฟผ๋ฆฌ๋ฅผ ์์ฑํ๋ค. ์ด๋ ๊ฒ ์์ฑํ๋ ์ด์ ๋ ๊ฐ DB์๋ Paging์ ์ง์ํ๋ ์์ฒด์ ์ธ ์ ๋ต์ด ์์ผ๋ฉฐ, Spring์ ๊ฐ DB์ Paging ์ ๋ต์ ๋ง์ถฐ ๊ตฌํ๋์ด์ผ๋ง ํ๋ค.
public SqlPagingQueryProviderFactoryBean() {
this.providers.put(DatabaseType.DB2, new Db2PagingQueryProvider());
this.providers.put(DatabaseType.DB2VSE, new Db2PagingQueryProvider());
this.providers.put(DatabaseType.DB2ZOS, new Db2PagingQueryProvider());
this.providers.put(DatabaseType.DB2AS400, new Db2PagingQueryProvider());
this.providers.put(DatabaseType.DERBY, new DerbyPagingQueryProvider());
this.providers.put(DatabaseType.HSQL, new HsqlPagingQueryProvider());
this.providers.put(DatabaseType.H2, new H2PagingQueryProvider());
this.providers.put(DatabaseType.MYSQL, new MySqlPagingQueryProvider());
this.providers.put(DatabaseType.ORACLE, new OraclePagingQueryProvider());
this.providers.put(DatabaseType.POSTGRES, new PostgresPagingQueryProvider());
this.providers.put(DatabaseType.SQLITE, new SqlitePagingQueryProvider());
this.providers.put(DatabaseType.SQLSERVER, new SqlServerPagingQueryProvider());
this.providers.put(DatabaseType.SYBASE, new SybasePagingQueryProvider());
}
Spring Batch์์๋ SqlPagingQueryProviderFactoryBean
์ ํตํด DataSource ์ค์ ๊ฐ์ ๋ณด๊ณ , ์ Provider์ค ํ๋๋ฅผ ์๋ ์ ํํ๋๋ก ํ๋ค.
SpringBatch์์ offset๊ณผ limit์ PageSize์ ๋ง๊ฒ ์๋์ผ๋ก ์์ฑํด์ค๋ค. ๋ค๋ง ๊ฐ ์ฟผ๋ฆฌ๋ ๊ฐ๋ณ์ ์ผ๋ก ์คํ๋๋ฏ๋ก, ๋์ผํ ๋ ์ฝ๋ ์ ๋ ฌ ์์๋ฅผ ๋ณด์ฅํ๋ ค๋ฉด ํ์ด์ง์ ๊ฒฐ๊ณผ๋ฅผ ์ ๋ ฌ(order by)ํ๋ ๊ฒ์ด ์ค์ํ๋ค.
๋ํ, ์ด ์ ๋ ฌํค๊ฐ ResultSet
๋ด์์ ์ค๋ณต๋์ง ์์์ผํ๋ค.
SELECT * FROM customer WHERE city = :city ORDER BY lastName ASC LIMIT 10
์คํ๋ ์ฟผ๋ฆฌ ๋ก๊ทธ๋ฅผ ๋ณด๋ฉด Paging Size์ธ LIMIT 10์ด ๋ค์ด๊ฐ ๊ฒ์ ๋ณผ ์ ์๋ค.
Hibernate
์๋ฐ ORM ๊ธฐ์ ๋ก, ์ ํ๋ฆฌ์ผ์ด์ ์์ ์ฌ์ฉํ๋ ๊ฐ์ฒด ์งํฅ ๋ชจ๋ธ์ ๊ด๊ณํ ๋ฐ์ดํฐ๋ฒ ์ด์ค๋ก ๋งคํํ๋ ๊ธฐ๋ฅ์ ์ ๊ณต
XML or Annotation์ ์ฌ์ฉํด ๊ฐ์ฒด๋ฅผ ๋ฐ์ดํฐ๋ฒ ์ด์ค ํ ์ด๋ธ๋ก ๋งคํ
๊ฐ์ฒด๋ฅผ ์ฌ์ฉํด ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ง์ํ๋ ํ๋ ์์ํฌ๋ฅผ ์ ๊ณต
Hibernate ์ธ์ ๊ตฌํ์ฒด์ ๋ฐ๋ผ์ ๋ค๋ฅด๊ฒ ์๋ํ๋ค.
๋ณ๋ ์ค์ ์์ด Hibernate๋ฅผ ์ฌ์ฉํ๋ฉด ์ผ๋ฐ์ ์ธ stateful ์ธ์ ๊ตฌํ์ฒด๋ฅผ ์ฌ์ฉ
์๋ฅผ ๋ค์ด ๋ฐฑ๋ง๊ฑด์ ์์ดํ ์ ์ฝ๊ณ ์ฒ๋ฆฌํ๋ค๋ฉด Hibernate ์ธ์ ์ด ๋ฐ์ดํฐ๋ฒ ์ด์ค์์ ์กฐํํ ๋ ์์ดํ ์ ์บ์์ ์์ผ๋ฉฐ OutOfMemoryException์ด ๋ฐ์
Persistence๋ก ์ฌ์ฉํ๋ฉด, ์ง์ JDBC๋ฅผ ์ฌ์ฉํ ๋๋ณด๋ค ๋ ํฐ ๋ถํ๋ฅผ ์ ๋ฐ
๋ ์ฝ๋ ๋ฐฑ๋ง ๊ฑด์ ์ฒ๋ฆฌํ ๋๋ ํ๊ฑด๋น ms ๋จ์์ ์ฐจ์ด๋ ๊ฑฐ๋ํ ์ฐจ์ด๊ฐ ๋๋ค.
์คํ๋ง ๋ฐฐ์น๋ ์ด๋ฌํ ๋ฌธ์ ๋ฅผ ํด๊ฒฐํ๋๋ก
HibernateCursorItemReader
,HibernatePagingItemReader
๋ฅผ ๊ฐ๋ฐ์ปค๋ฐ์ ์ธ์ ์ flushํ๋ฉฐ ๋ฐฐ์น ์ฒ๋ฆฌ์ ๊ด๊ณ๊ฐ ์๋ ์ถ๊ฐ ๊ธฐ๋ฅ์ ์ ๊ณตํด์ค๋ค.
build.gradle ์์กด์ฑ ์ถ๊ฐ
compileOnly 'org.springframework.boot:spring-boot-starter-data-jpa'
Domain ๊ฐ์ฒด ์์
@Entity
@Table(name = "customer")
public class Customer {
@Id
private Long id; // pk
private String firstName;
private String middleInitial;
private String lastName;
private String address;
private String city;
private String state;
private String zipCode;
}
@Entity
๋งคํํ ๊ฐ์ฒด๊ฐ Entity์์ ๋ํ๋
@Table
Entity๊ฐ ๋งคํ๋๋ ํ ์ด๋ธ ์ง์
@Id
PK๊ฐ ์ง์
TransactionManager ์ปค์คํ
ํ์ด๋ฒ๋ค์ดํธ ์ธ์
๊ณผ Datasource๋ฅผ ํฉ์น TransactionManager
๊ฐ ํ์ํ๋ค.
@Component
public class HibernateBatchConfigurer extends DefaultBatchConfigurer {
private DataSource dataSource;
private SessionFactory sessionFactory;
private PlatformTransactionManager transactionManager;
/**
* Datasource connection๊ณผ ํ์ด๋ฒ๋ค์ดํธ ์ธ์
์ค์
* @param dataSource
* @param entityManagerFactory
*/
public HibernateBatchConfigurer(DataSource dataSource,
EntityManagerFactory entityManagerFactory) {
super(dataSource);
this.dataSource = dataSource;
this.sessionFactory = entityManagerFactory.unwrap(SessionFactory.class);
// ํ์ด๋ฒ๋ค์ดํธ ํธ๋์ญ์
์ค์
this.transactionManager = new HibernateTransactionManager(this.sessionFactory);
}
@Override
public PlatformTransactionManager getTransactionManager() {
return this.transactionManager;
}
}
BatchConfigurer
์ ์ปค์คํ
๊ตฌํ์ฒด๋ฅผ ์ฌ์ฉํด HibernateTransactionManager
๋ฅผ ๊ตฌ์ฑํ๋ค.
HibernateCursorItemReader
/**
* --job.name=hibernateCursorItemReaderJob city=Chicago
*/
@RequiredArgsConstructor
@Configuration
public class HibernateCursorCustomerJob {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final EntityManagerFactory entityManagerFactory;
@Bean
public Job hibernateCursorItemReaderJob(){
return jobBuilderFactory.get("hibernateCursorItemReaderJob")
.start(hibernateCursorItemReaderStep())
.build();
}
@Bean
public Step hibernateCursorItemReaderStep(){
return stepBuilderFactory.get("hibernateCursorItemReaderStep")
.<Customer, Customer>chunk(10)
.reader(customerHibernateCursorItemReader(null))
.writer(customerHibernateCursorItemWriter())
.build();
}
@Bean
@StepScope
public HibernateCursorItemReader<Customer> customerHibernateCursorItemReader(@Value("#{jobParameters['city']}") String city) {
return new HibernateCursorItemReaderBuilder<Customer>()
.name("customerHibernateCursorItemReader") // Reader์ ์ด๋ฆ, ExecutionContext์ ์ ์ฅ๋์ด์ง ์ด๋ฆ
.sessionFactory(entityManagerFactory.unwrap(SessionFactory.class))
.queryString("from Customer where city = :city") // HQL ์ฟผ๋ฆฌ
.parameterValues(Collections.singletonMap("city", city)) // SQL ๋ฌธ์ ์ฃผ์
ํด์ผํ ํ๋ผ๋ฏธํฐ
.build();
}
@Bean
public ItemWriter customerHibernateCursorItemWriter() {
return (items) -> items.forEach(System.out::println);
}
}
ํ์ด๋ฒ๋ค์ดํธ ์ฟผ๋ฆฌ๋ฅผ ์ํํ๋ ๋ฐฉ๋ฒ์ 4๊ฐ์ง๊ฐ ์กด์ฌํ๋ค.
queryName
String
ํ์ด๋ฒ๋ค์ดํธ ๊ตฌ์ฑ์ ํฌํจ๋ ๋ค์๋ ํ์ด๋ฒ๋ค์ดํธ ์ฟผ๋ฆฌ ์ฐธ์กฐ
https://www.baeldung.com/hibernate-named-query
queryString
String
์คํ๋ง ๊ตฌ์ฑ์ ์ถ๊ฐํ๋ HQL ์ฟผ๋ฆฌ
.queryString("from Customer where city = :city")
queryProvider
HibernateQueryProvider
ํ์ด๋ฒ๋ค์ดํธ ์ฟผ๋ฆฌ(HQL)๋ฅผ ํ๋ก๊ทธ๋๋ฐ์ผ๋ก ๋น๋
nativeQuery
String
๋ค์ดํฐ๋ธ SQL ์ฟผ๋ฆฌ๋ฅผ ์คํํ ๋ค ๊ฒฐ๊ณผ๋ฅผ ํ์ด๋ฒ๋ค์ดํธ๋ก ๋งคํํ๋๋ฐ ์ฌ์ฉ
https://data-make.tistory.com/616
HibernatePagingItemReader
/**
* --job.name=hibernatePagingItemReaderJob city=Chicago
*/
@RequiredArgsConstructor
@Configuration
public class HibernatePagingCustomerJob {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final EntityManagerFactory entityManagerFactory;
@Bean
public Job hibernatePagingItemReaderJob() {
return jobBuilderFactory.get("hibernatePagingItemReaderJob")
.start(hibernatePagingItemReaderStep())
.build();
}
@Bean
public Step hibernatePagingItemReaderStep() {
return stepBuilderFactory.get("hibernatePagingItemReaderStep")
.<Customer, Customer>chunk(10)
.reader(customerHibernatePagingItemReader(null))
.writer(customerHibernatePagingItemWriter())
.build();
}
@Bean
@StepScope
public HibernatePagingItemReader<Customer> customerHibernatePagingItemReader(@Value("#{jobParameters['city']}") String city) {
return new HibernatePagingItemReaderBuilder<Customer>()
.name("customerHibernatePagingItemReader") // Reader์ ์ด๋ฆ, ExecutionContext์ ์ ์ฅ๋์ด์ง ์ด๋ฆ
.sessionFactory(entityManagerFactory.unwrap(SessionFactory.class))
.queryString("from Customer where city = :city") // HQL ์ฟผ๋ฆฌ
.parameterValues(Collections.singletonMap("city", city)) // SQL ๋ฌธ์ ์ฃผ์
ํด์ผํ ํ๋ผ๋ฏธํฐ
.pageSize(10) // Cursor์ ์ ์ผํ ์ฐจ์ด์ ! pageSize ์ค์
.build();
}
@Bean
public ItemWriter customerHibernatePagingItemWriter() {
return (items) -> items.forEach(System.out::println);
}
}
Cursor ๋ฐฉ๋ฒ๊ณผ ์ ์ผํ๊ฒ ๋ค๋ฅธ ์ ์ .pageSize()
๋ก ์ฌ์ฉํ ํ์ด์ง ํฌ๊ธฐ๋ฅผ ์ง์ ํด์ผํ๋ ๊ฒ์ด๋ค.
JPA
JPA(Java Persisstence API)๋ ORM ์์ญ์์ ํ์คํ๋ ์ ๊ทผ๋ฒ์ ์ ๊ณตํ๋ค. Hibernate๊ฐ ์ด๊ธฐ JPA์ ์๊ฐ์ ์คฌ์ผ๋ฉฐ, ํ์ฌ๋ Hibernate๊ฐ JPA ๋ช ์ธ๋ฅผ ๊ตฌํํ๊ณ ์๋ค.
build.gradle ์์กด์ฑ ์ถ๊ฐ
compileOnly 'org.springframework.boot:spring-boot-starter-data-jpa'
spring-boot-starter-data-jpa
๋ JPA๋ฅผ ์ฌ์ฉํ๋๋ฐ ํ์ํ ๋ชจ๋ ํ์ ์ปดํฌ๋ํธ๊ฐ ํฌํจ๋ผ์๋ค.
JpaCursorItemReader
Spring Batch 4.3์ด ๋ฆด๋ฆฌ์ฆ ๋๋ฉด์ JpaCursorItemReader ๊ฐ ๋์ ๋์๋ค. ์ด์ ๋ฒ์ ๊น์ง๋ ์ ๊ณตํ์ง ์์๋ค.
/**
* --job.name=jpaCursorItemReaderJob city=Chicago
*/
@RequiredArgsConstructor
@Configuration
public class JpaCursorCustomerJob {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final EntityManagerFactory entityManagerFactory;
@Bean
public Job jpaCursorItemReaderJob(){
return jobBuilderFactory.get("jpaCursorItemReaderJob")
.start(jpaCursorItemReaderStep())
.build();
}
@Bean
public Step jpaCursorItemReaderStep(){
return stepBuilderFactory.get("jpaCursorItemReaderStep")
.<Customer, Customer>chunk(10)
.reader(customerJpaCursorItemReader(null))
.writer(customerJpaCursorItemWriter())
.build();
}
@Bean
@StepScope
public JpaCursorItemReader<Customer> customerJpaCursorItemReader(@Value("#{jobParameters['city']}") String city) {
return new JpaCursorItemReaderBuilder<Customer>()
.name("customerJpaCursorItemReader")
.entityManagerFactory(entityManagerFactory)
.queryString("select c from Customer c where c.city = :city")
.parameterValues(Collections.singletonMap("city", city))
.build();
}
@Bean
public ItemWriter customerJpaCursorItemWriter() {
return (items) -> items.forEach(System.out::println);
}
}
JpaPagingItemReader
/**
* --job.name=jpaPagingItemReaderJob city=Chicago
*/
@RequiredArgsConstructor
@Configuration
public class JpaPagingCustomerJob {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final EntityManagerFactory entityManagerFactory;
@Bean
public Job jpaPagingItemReaderJob(){
return jobBuilderFactory.get("jpaPagingItemReaderJob")
.start(jpaPagingItemReaderStep())
.build();
}
@Bean
public Step jpaPagingItemReaderStep(){
return stepBuilderFactory.get("jpaPagingItemReaderStep")
.<Customer, Customer>chunk(10)
.reader(customerJpaPagingItemReader(null))
.writer(customerJpaPagingItemWriter())
.build();
}
@Bean
@StepScope
public JpaPagingItemReader<Customer> customerJpaPagingItemReader(@Value("#{jobParameters['city']}") String city) {
return new JpaPagingItemReaderBuilder<Customer>()
.name("customerJpaPagingItemReader")
.entityManagerFactory(entityManagerFactory)
.queryString("select c from Customer c where c.city = :city")
.parameterValues(Collections.singletonMap("city", city))
.pageSize(10)
.build();
}
@Bean
public ItemWriter customerJpaPagingItemWriter() {
return (items) -> items.forEach(System.out::println);
}
}
.entityManagerFactory
๋ฅผ ์ค์ ํ๋ ๊ฒ ์ด์ธ์ Jdbc์ ํฌ๊ฒ ๋ค๋ฅธ ์ ์ ์์ผ๋ฉฐ, Cursor์ ๋ค๋ฅธ์ ์ pageSize()
๋ฅผ ์ค์ ํ๋ ๊ฒ์ด๋ค.
JPA์์๋ .queryProvider()
๋ก Query ๊ฐ์ฒด๋ฅผ ์ฌ์ฉํด ์ฟผ๋ฆฌ๋ฅผ ์ํํ ์๋ ์๋ค.
MyBatisPagingItemReader
<!--mybatis-config.xml-->
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration
PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>
<settings>
<setting name="defaultStatementTimeout" value="25"/>
</settings>
</configuration>
package spring.batch.practice.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.ConfigurationPropertiesScan;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import javax.sql.DataSource;
@Slf4j
@Configuration
public class MysqlMybatisConfig {
@Value("${mybatis.mapper-locations}")
private String mapperLocations;
@Bean(name = "mybatisDataSource")
@ConfigurationProperties(prefix = "spring.datasource.hikari")
public DataSource dataSource(){
return DataSourceBuilder.create().build();
}
@Bean(name = "mybatisSqlSessionFactory")
public SqlSessionFactory sqlSessionFactory(@Qualifier("mybatisDataSource") DataSource dataSource, ApplicationContext applicationContext) throws Exception{
SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
sqlSessionFactoryBean.setDataSource(dataSource);
sqlSessionFactoryBean.setConfigLocation(applicationContext.getResource("classpath:mybatis/mybatis-config.xml"));
Resource[] resources = new PathMatchingResourcePatternResolver().getResources(mapperLocations);
System.out.println(resources[0].getURL());
sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mapperLocations));
return sqlSessionFactoryBean.getObject();
}
@Bean
public SqlSessionTemplate sqlSessionTemplate(@Qualifier("mybatisSqlSessionFactory") SqlSessionFactory sqlSessionFactory) {
return new SqlSessionTemplate(sqlSessionFactory);
}
}
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="spring.batch.practice.dao.PayMapper">
<select id="selectPayList" parameterType="hashmap" resultType="spring.batch.practice.domain.Pay">
<![CDATA[
SELECT ID, AMOUNT, TX_NAME, TX_DATE_TIME
FROM PAY
WHERE AMOUNT <= #{amount}
]]>
</select>
</mapper>
@Slf4j
@RequiredArgsConstructor
@Configuration
public class MybatisPagingItemReaderJobConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Autowired
@Qualifier("mybatisSqlSessionFactory")
private SqlSessionFactory sqlSessionFactory;
private static final int CHUNK_SIZE = 10;
@Bean
public Job mybatisPagingItemReaderJob() throws Exception {
return jobBuilderFactory.get("mybatisPagingItemReaderJob")
.start(mybatisPagingItemReaderStep())
.build();
}
@Bean
public Step mybatisPagingItemReaderStep() throws Exception {
return stepBuilderFactory.get("mybatisPagingItemReaderStep")
.<Pay, Pay>chunk(CHUNK_SIZE)
.reader(mybatisPagingItemReader())
.writer(mybatisPagingItemWriter())
.build();
}
@Bean
public MyBatisPagingItemReader<Pay> mybatisPagingItemReader() throws Exception {
Map<String, Object> params = new HashMap<>();
params.put("amount", 2000);
return new MyBatisPagingItemReaderBuilder<Pay>()
.pageSize(CHUNK_SIZE)
.sqlSessionFactory(sqlSessionFactory)
.queryId("spring.batch.practice.dao.PayMapper.selectPayList")
.parameterValues(params)
.build();
}
@Bean
public ItemWriter<Pay> mybatisPagingItemWriter() {
return list -> {
for (Pay pay : list) {
log.info("Current Pay={}", pay);
}
};
}
}
Spring Data Repository
Spring Data๋ ์คํ๋ง ๋ฐ์ดํฐ๊ฐ ์ ๊ณตํ๋ ํน์ ์ธํฐํ์ด์ค ์ค ํ๋๋ฅผ ์์ํ๋ ์ธํฐํ์ด์ค๋ฅผ ์ฌ์ฉ์๊ฐ ์ ์ํ๊ธฐ๋ง ํ๋ฉด ์คํ๋ง ๋ฐ์ดํฐ๊ฐ ํด๋น ์ธํฐํ์ด์ค์ ๊ตฌํ์ ์ฒ๋ฆฌํ๋ ๊ธฐ๋ฅ์ ์ ๊ณตํ๋ค.
์คํ๋ง ๋ฐฐ์น๋ ์คํ๋ง ๋ฐ์ดํฐ์ PagingAndSotringRepository
๋ฅผ ํ์ฉํ๊ธฐ ๋๋ฌธ์, ์คํ๋ง ๋ฐ์ดํฐ์ ํธํ์ฑ์ด ์ข๋ค.
RepositoryItemReader
RepositoryItemReader
๋ JdbcPagingItemReader
๋ HibernatePagingItemReader
๋ฅผ ์ฌ์ฉํ ๋์ ๋์ผํ๊ฒ PagingAndSotringRepository
๋ฅผ ์ฌ์ฉํด์ Paging ์ฟผ๋ฆฌ๋ฅผ ์คํํ๋ค.
RepositoryItemReader
๋ ์ด๋ค ์ ์ฅ์๊ฑด ์๊ด์์ด ํด๋น ๋ฐ์ดํฐ ์ ์ฅ์์ ์ง์๋ฅผ ์ํํ ์ ์๋ค๋ ์ ์์ ItemReader
์ ์ฐจ์ด๊ฐ ์๋ค.
public interface CustomerRepository extends JpaRepository<Customer, Long> {
Page<Customer> findByCity(String city, Pageable pageRequest);
}
PagingAndSotringRepository
๋ฅผ ์์ํ๋ Repository๋ฅผ ์์ฑํด city ์กฐ๊ฑด์ผ๋ก ์กฐํํ๋ ๋ฉ์๋๋ฅผ ์ ์ํ์๋ค.
@Bean
@StepScope
public RepositoryItemReader<Customer> customerRepositoryItemReader(@Value("#{jobParameters['city']}") String city) {
return new RepositoryItemReaderBuilder<Customer>()
.name("customerRepositoryItemReader")
.arguments(Collections.singletonList(city)) // pageable ํ๋ผ๋ฏธํฐ๋ฅผ ์ ์ธํ arguments
.methodName("findByCity") // ํธ์ถํ ๋ฉ์๋๋ช
.repository(customerRepository) // Repository ๊ตฌํ์ฒด
.sorts(Collections.singletonMap("lastName", Sort.Direction.ASC))
.build();
}
์ฃผ์ ์ฌํญ
JpaRepository
๋ฅผListItemReader
,QueueItemReader
์ ์ฌ์ฉํ๋ฉด ์๋๋ค.์ด๋ ๊ฒ ๊ตฌํํ๋ ๊ฒฝ์ฐ Spring Batch์ ์ฅ์ ์ธ Paging & Cursor ๊ตฌํ์ด ์์ด ๋๊ท๋ชจ ๋ฐ์ดํฐ ์ฒ๋ฆฌ๊ฐ ๋ถ๊ฐ๋ฅํ๋ค.
JpaRepository
๋ฅผ ์ฌ์ฉํด์ผํ๋ ๊ฒฝ์ฐRepositoryItemReader
๋ฅผ ์ฌ์ฉํ๋ ๊ฒ์ ๊ถ์ฅํ๋ค.
Hibernate, JPA๋ฑ ์์์ฑ ์ปจํ ์คํธ๊ฐ ํ์ํ Reader ์ฌ์ฉ์ fetch size์ chunk size๋ ๋์ผํ ๊ฐ์ ์ ์งํด์ผ ํ๋ค.
ItemReaderAdapter
Adapter๋ ๋ค๋ฅธ ์๋ฆฌ๋ฉํธ์ ๋ํํ์ฌ ์คํ๋ง ๋ฐฐ์น๊ฐ ํด๋น ์๋ฆฌ๋จผํธ์ ํต์ ํ ์ ์๊ฒ ํ๋๋ฐ ์ฌ์ฉํ๋ค.
org.springframework.batch.item.adapter.ItemReaderAdapter
public class ItemReaderAdapter<T> extends AbstractMethodInvokingDelegator<T> implements ItemReader<T> {
/**
* @return return value of the target method.
*/
@Nullable
@Override
public T read() throws Exception {
return invokeDelegateMethod();
}
}
ํธ์ถ ๋์ ์๋น์ค์ ์ฐธ์กฐ์ ํธ์ถํ ๋ฉ์๋์ ์ด๋ฆ์ ์์กด์ฑ์ผ๋ก ๋ฐ๋๋ค.
ItemReaderAdapter
๊ฐ ๋งค๋ฒ ํธ์ถํ ๋๋ง๋ค ๋ฐํ๋๋ ๊ฐ์ฒด๋ItemReader
๊ฐ ๋ฐํํ๋ ๊ฐ์ฒด์ด๋ค.์ ๋ ฅ ๋ฐ์ดํฐ๋ฅผ ๋ชจ๋ ์ฒ๋ฆฌํ๋ฉด ์๋น์ค ๋ฉ์๋๋ ๋ฐ๋์
null
์ ๋ฐํํด์ผํ๋ค. ์คํ๋ง ๋ฐฐ์น์๊ฒ ํด๋น step์ ์ ๋ ฅ์ ๋ชจ๋ ์๋ฃํ์์ ์๋ฆฌ๋ ๊ฒ์ด๋ค.
@Component
public class CustomerService {
private List<Customer> customers;
private int curIndex;
private String [] firstNames = {"Michael", "Warren", "Ann", "Terrence",
"Erica", "Laura", "Steve", "Larry"};
private String middleInitial = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
private String [] lastNames = {"Gates", "Darrow", "Donnelly", "Jobs",
"Buffett", "Ellison", "Obama"};
private String [] streets = {"4th Street", "Wall Street", "Fifth Avenue",
"Mt. Lee Drive", "Jeopardy Lane",
"Infinite Loop Drive", "Farnam Street",
"Isabella Ave", "S. Greenwood Ave"};
private String [] cities = {"Chicago", "New York", "Hollywood", "Aurora",
"Omaha", "Atherton"};
private String [] states = {"IL", "NY", "CA", "NE"};
private Random generator = new Random();
public CustomerService() {
curIndex = 0;
customers = new ArrayList<>();
for(int i = 0; i < 100; i++) {
customers.add(buildCustomer());
}
}
private Customer buildCustomer() {
Customer customer = new Customer();
customer.setId((long) generator.nextInt(Integer.MAX_VALUE));
customer.setFirstName(
firstNames[generator.nextInt(firstNames.length - 1)]);
customer.setMiddleInitial(
String.valueOf(middleInitial.charAt(
generator.nextInt(middleInitial.length() - 1))));
customer.setLastName(
lastNames[generator.nextInt(lastNames.length - 1)]);
customer.setAddress(generator.nextInt(9999) + " " +
streets[generator.nextInt(streets.length - 1)]);
customer.setCity(cities[generator.nextInt(cities.length - 1)]);
customer.setState(states[generator.nextInt(states.length - 1)]);
customer.setZipCode(String.valueOf(generator.nextInt(99999)));
return customer;
}
public Customer getCustomer() {
Customer cust = null;
if(curIndex < customers.size()) {
cust = customers.get(curIndex);
curIndex++;
}
return cust;
}
}
Customer ๊ฐ์ฒด์ ๋ชฉ๋ก์ ๋ฌด์์๋ก ์์ฑํ๋ ์๋น์ค์ด๋ค.
@Bean
public ItemReaderAdapter<Customer> customerServiceItemReader() {
ItemReaderAdapter<Customer> adapter = new ItemReaderAdapter<>();
adapter.setTargetObject(customerService);
adapter.setTargetMethod("getCustomer");
return adapter;
}
ItemReaderAdapter
์ ๊ธฐ์กด ์๋น์ค ์ค๋ธ์ ํธ์ ๋ฉ์๋๋ช
์ ์ ๋ฌํ๋ฉด๋๋ค.
์ค๋ฅ ์ฒ๋ฆฌ
์ ๋ ฅ์์ ๋ ์ฝ๋๋ฅผ ์ฝ๋ ์ค์ ์ค๋ฅ๊ฐ ๋ฐ์ํ ๊ฒฝ์ฐ ์ฒ๋ฆฌํ ์ ์๋ ๋ฐฉ๋ฒ์ ์ฌ๋ฌ๊ฐ์ง์ด๋ค.
์์ธ๋ฅผ ๋์ณ ์ฒ๋ฆฌ๋ฅผ ๋ฉ์ถ๊ธฐ
ํน์ ์์ธ๊ฐ ๋ฐ์ํ ๊ฒฝ์ฐ ๋ ์ฝ๋ ๊ฑด๋๋๊ธฐ(skip)
Skip
์ด๋ค ์กฐ๊ฑด์์ ๋ ์ฝ๋๋ฅผ skipํ ์ง(์ด๋ค ์์ธ๋ฅผ ๋ฌด์ํ ๊ฒ์ธ์ง)
์ผ๋ง๋ ๋ง์ ๋ ์ฝ๋๋ฅผ skip ํ ์ ์๊ฒ ํ ๊ฒ์ธ์ง
๋ ์ฝ๋๋ฅผ skipํ ์ง ์ฌ๋ถ๋ฅผ ๊ฒฐ์ ํ ๋ ์ ๋๊ฐ์ง ์์๋ฅผ ๊ณ ๋ คํด์ผํ๋ค.
Skip์ ์ฌ์ฉํ๊ธฐ ์ํด์๋ ์ฐ์ faultToLerant()
๋ผ๋ ๋ฉ์๋๋ฅผ ํธ์ถํด์ผํ๋ค.
skipLimit()
skip ํ์ฉ ํ์. ํ์ฉ ํ์๋ฅผ ๋์ด๊ฐ๋ฉด job์ ์คํจํ๋ค. skip()๊ณผ ๋ฐ๋์ ๊ฐ์ด ์จ์ผ ํ๋ค
skip()
ํด๋น exception์ด ๋ฐ์ํ์๋ skip
noSkip()
ํด๋น exception์ด ๋ฐ์ํ๋ฉด skip์ ํ์ง ์๊ณ ์ค๋ฅ๋ฅผ ๋ด๊ฒ ๋ค๋ ๊ฒ
skipPolicy()
์ฉ์ ์ ์๋ก skip์ ๋ํ policy๋ฅผ ๋ง๋ค์ด์ ์ ์ฉํ๊ณ ์ถ์๋ ์ฌ์ฉ
@Bean
public Step skipRecordCopyFileStep() {
return this.stepBuilderFactory.get("skipRecordCopyFileStep")
.<Customer, Customer>chunk(10)
.reader(null)
.writer(null)
.faultTolerant()
.skip(Exception.class)
.noSkip(ParseException.class)
.skipLimit(10)
.build();
}
SkipPolicy
public interface SkipPolicy {
boolean shouldSkip(Throwable t, int skipCount) throws SkipLimitExceededException;
}
SkipPolicy
๊ตฌํ์ฒด๋ skipํ ์์ธ์ ํ์ฉ ํ์๋ฅผ ํ๋ณํ ์ ์์ผ๋ฉฐ, boolean
๊ฐ์ผ๋ก ๋ด๋ถ ๋ก์ง์ ์์ ํ ์ ์๋ค.
public class FileVerificationSkipper implements SkipPolicy {
@Override
public boolean shouldSkip(Throwable t, int skipCount) throws SkipLimitExceededException {
if (t instanceof FileNotFoundException) {
return false;
} else if (t instanceof ParseException && skipCount < 10) {
return true;
} else {
return false;
}
}
}
์ค๋ฅ ๋ก๊ทธ ๋จ๊ธฐ๊ธฐ
ItemListener
๋ฅผ ์ฌ์ฉํด ์๋ชป๋ ๋ ์ฝ๋๋ฅผ ๊ธฐ๋กํ๋ ๋ฐฉ๋ฒ์ ๋ค๋ฃฐ ๊ฒ์ด๋ค.
public interface ItemReadListener<T> extends StepListener {
void beforeRead();
void afterRead(T item);
void onReadError(Exception ex);
}
์๋ชป๋ ๋ ์ฝ๋๋ฅผ ์ฝ์์ ๋ ๋ก๊ทธ๋ฅผ ๋จ๊ธฐ๊ธฐ ์ํด์ onReadError
๋ฉ์๋๋ฅผ ์ค๋ฐ๋ผ์ด๋ํด ItemListenerSupport
๋ฅผ ์ฌ์ฉํ์ฌ ์๋ฌ๋ฅผ ๊ธฐ๋กํ๋ค.
๋จ์ํ ํ์ผ์ ์ฝ์ด์ฌ ๋๋ ์์ ๊ฐ์ด ์ฒ๋ฆฌํ๋ฉด ๋์ง๋ง, DB๋ฅผ ์ด์ฉํ ์ค๋ฅ๊ฐ ๋ฐ์ํ ๊ฒฝ์ฐ์๋ ์ค์ DB์ ์ถ๋ ฅ์ ์คํ๋ง ์์ฒด๋ ํ์ด๋ฒ๋ค์ดํธ์ ๊ฐ์ ๋ค๋ฅธ ํ๋ ์์ํฌ๊ฐ ์ฒ๋ฆฌํ๋ฏ๋ก, ์คํ๋ง ๋ฐฐ์น์์ ์ฒ๋ฆฌํ ์์ธ๊ฐ ๋ง์ง ์๋ค.
onReadError ํ์ผ ์ค๋ฅ ๋ก๊ทธ ๋จ๊ธฐ๊ธฐ
@Slf4j
public class CustomerItemListener {
@OnReadError
public void onReadError(Exception e) {
if (e instanceof FlatFileParseException) {
FlatFileParseException ffpe = (FlatFileParseException) e;
StringBuilder sb = new StringBuilder();
sb.append("์ค๋ฅ ๋ฐ์ ๋ผ์ธ : ");
sb.append(ffpe.getLineNumber());
sb.append("์
๋ ฅ๊ฐ : ");
sb.append(ffpe.getInput());
log.error(sb.toString(), ffpe);
} else {
log.error("์ค๋ฅ ๋ฐ์", e);
}
}
}
@Bean
public CustomerItemListener customerItemListener(){
return new CustomerItemListener();
}
@Bean
public Step copyFileStep() {
return this.stepBuilderFactory.get("skipRecordCopyFileStep")
.<Customer, Customer>chunk(10)
.reader(null)
.writer(null)
.faultTolerant()
.skip(Exception.class)
.skipLimit(10)
.listener(customerItemListener())
.build();
}
Last updated
Was this helpful?