Spring Batch์ Reader์์ ์ฝ์ด์ฌ ์ ์๋ ๋ฐ์ดํฐ ์ ํ์ ๋ค์๊ณผ ๊ฐ๋ค.
์
๋ ฅ ๋ฐ์ดํฐ์์ ์ฝ์ด์ค๊ธฐ
ํ์ผ์์ ์ฝ์ด์ค๊ธฐ
Java Message Service ๋ฑ ๋ค๋ฅธ ์์ค์์ ์ฝ์ด์ค๊ธฐ
์ปค์คํ
ํ Reader๋ก ์ฝ์ด์ค๊ธฐ
Copy package org . springframework . batch . item ;
public interface ItemReader < T > {
@ Nullable
T read () throws Exception , UnexpectedInputException , ParseException , NonTransientResourceException ;
}
ItemReader
์ read()
๋ฅผ ํธ์ถํ๋ฉด, ํด๋น ๋ฉ์๋๋ ์คํ
๋ด์์ ์ฒ๋ฆฌํ Itemํ๊ฐ๋ฅผ ๋ฐํํ๋ฉฐ, ์คํ
์์๋ ์์ดํ
๊ฐ์๋ฅผ ์ธ์ด ์ฒญํฌ ๋ด ๋ฐ์ดํฐ๊ฐ ๋ช๊ฐ๊ฐ ์ฒ๋ฆฌ๋๋์ง ๊ด๋ฆฌํ๋ค. ํด๋น Item์ ItemProcessor
๋ก ์ ๋ฌ๋๋ฉฐ, ๊ทธ ๋ค ItemWriter
๋ก ์ ๋ฌ๋๋ค.
๊ฐ์ฅ ๋ํ์ ์ธ ๊ตฌํ์ฒด์ธ JdbcPagingItemReader
์ ํด๋์ค ๊ณ์ธต ๊ตฌ์กฐ๋ฅผ ๋ณด๋ฉด ๋ค์๊ณผ ๊ฐ๋ค.
์ฌ๊ธฐ์ ItemReader
์ ItemStream
์ธํฐํ์ด์ค๋ ๊ฐ์ด ๊ตฌํํ๊ณ ์๋ ๊ฒ์ ๋ณผ ์ ์๋ค.
Copy public interface ItemStream {
void open ( ExecutionContext var1) throws ItemStreamException ;
// Batch์ ์ฒ๋ฆฌ ์ํ ์
๋ฐ์ดํธ
void update ( ExecutionContext var1) throws ItemStreamException ;
void close () throws ItemStreamException ;
}
ItemStream
์ ์ฃผ๊ธฐ์ ์ผ๋ก ์ํ๋ฅผ ์ ์ฅํ๊ณ , ์ค๋ฅ๊ฐ ๋ฐ์ํ๋ฉด ํด๋น ์ํ์์ ๋ณต์ํ๊ธฐ ์ํ ๋ง์ปค์ธํฐํ์ด์ค์ด๋ค. ์ฆ, ItemReader
์ ์ํ๋ฅผ ์ ์ฅํ๊ณ ์คํจํ ๊ณณ์์ ๋ค์ ์คํํ ์ ์๊ฒ ํด์ฃผ๋ ์ญํ ์ ํ๋ค. ItemReader
์ ItemStream
์ธํฐํ์ด์ค๋ฅผ ์ง์ ๊ตฌํํ์ฌ ์ํ๋ ํํ์ ItemReader๋ฅผ ๋ง๋ค ์ ์๋ค.
ํ์ผ ์
๋ ฅ
FlatFileItemReader
org.springframework.batch.item.file.FlatFileItemReader
flat file
ํ ๊ฐ ํน์ ๊ทธ ์ด์์ ๋ ์ฝ๋๊ฐ ํฌํจ๋ ํน์ ํ์ผ
ํ์ผ์ ๋ด์ฉ์ ๋ด๋ ๋ฐ์ดํฐ์ ์๋ฏธ๋ฅผ ์ ์ ์๋ค.
ํ์ผ ๋ด ๋ฐ์ดํฐ์ ํฌ๋งท์ด๋ ์๋ฏธ๋ฅผ ์ ์ํ๋ ๋ฉํ ๋ฐ์ดํฐ๊ฐ ์๋ค.
์ต์
ํ์
default
์ค๋ช
๊ตฌํ์ฒด
๋ฌธ์์ด ๋ฐฐ์ด์ ํ์ผ์ ํ์ฑํ ๋ ๊ฑด๋๋ฐ์ด์ผํ ์ฃผ์ ์ค์ ๋ํ๋ด๋ ์ ๋์ด ์ง์
ํ๋ซํผ์ ๊ธฐ๋ณธ Charset
ํ์ผ์ ์ฌ์ฉ๋ ๋ฌธ์์ด ์ธ์ฝ๋ฉ
ํ์ผ ํ์ค์ String์ผ๋ก ์ฝ์ ๋ค ์ฒ๋ฆฌ ๋์์ธ ๋๋ฉ์ธ ๊ฐ์ฒด(Item)์ผ๋ก ๋ณํ
DefaultLineMapper
JsonLineMapper
PassThroughLineMapper
ํ์ผ์ ์ฝ์ด์ฌ ๋ ๋ช ์ค์ ๊ฑด๋๋๊ณ ์์ํ ์ง ์ง์
DefaultRecordSeparatorPolicy
๊ฐ ์ค์ ๋ง์ง๋ง์ ์ ์ํ๋๋ฐ ์ฌ์ฉ
๋ณ๋๋ก ์ง์ ํ์ง ์์ผ๋ฉด ๊ฐํ ๋ฌธ์๊ฐ ๋ ์ฝ๋์ ๋ ๋ถ๋ถ์ ๋ํ๋ธ๋ค.
์ค์ ๊ฑด๋๋ธ ๋ ํธ์ถ๋๋ ์ฝ๋ฐฑ ์ธํฐํ์ด์ค
๊ฑด๋๋ ๋ชจ๋ ์ค์ ์ด ์ฝ๋ฐฑ์ด ํธ์ถ๋๋ค.
true๋ก ์ง์ ์, ๋ฆฌ์์ค๋ฅผ ์ฐพ์ ์ ์๋ ๊ฒฝ์ฐ Exception์ ๋์ง๋ค.
true : ์ฌ์์ ๊ฐ๋ฅํ๋๋ก ๊ฐ ์ฒญํฌ ์ฒ๋ฆฌ ํ ItemReader
์ํ ์ ์ฅ
false : ๋ค์ค ์ค๋ ๋ ํ๊ฒฝ์์ false ์ง์
๊ณ ์ ๋ ๋๋น ํ์ผ
Copy Aimee CHoover 7341Vel Avenue Mobile AL35928
Jonas UGilbert 8852In St. Saint Paul MN57321
Regan MBaxter 4851Nec Av. Gulfport MS33193
Octavius TJohnson 7418Cum Road Houston TX51507
Sydnee NRobinson 894 Ornare. Ave Olathe KS25606
Stuart KMckenzie 5529Orci Av. Nampa ID18562
Copy @ Bean
@ StepScope
public FlatFileItemReader< Customer > customerItemReader(@ Value ( "#{jobParameters['customerFile']}" ) PathResource inputFile) {
return new FlatFileItemReaderBuilder < Customer >()
. name ( "customerItemReader" ) // ๊ฐ ์คํ
์ ExecutionContext์ ์ถ๊ฐ๋๋ ํน์ ํค์ ์ ๋๋ฌธ์๋ก ์ฌ์ฉ๋ ์ด๋ฆ(saveState false์ธ ๊ฒฝ์ฐ ์ง์ ํ ํ์X)
. resource (inputFile)
. fixedLength () // FixedLengthBuilder
. columns ( new Range []{ new Range( 1 , 11 ) , new Range( 12 , 12 ) , new Range( 13 , 22 ) , new Range( 23 , 26 )
, new Range( 27 , 46 ) , new Range( 47 , 62 ) , new Range( 63 , 64 ) , new Range( 65 , 69 ) }) // ๊ณ ์ ๋๋น
. names ( new String []{ "firstName" , "middleInitial" , "lastName" , "addressNumber" , "street"
, "city" , "state" , "zipCode" }) // ๊ฐ ์ปฌ๋ผ๋ช
// .strict(false) // ์ ์๋ ํ์ฑ ์ ๋ณด ๋ณด๋ค ๋ง์ ํญ๋ชฉ์ด ๋ ์ฝ๋์ ์๋ ๊ฒฝ์ฐ(true ์์ธ)
. targetType ( Customer . class ) // BeanWrapperFieldSetMapper ์์ฑํด ๋๋ฉ์ธ ํด๋ ์ค์ ๊ฐ์ ์ฑ์
. build ();
}
๊ตฌ๋ถ์ ํ์ผ
Copy Aimee,C,Hoover,7341,Vel Avenue,Mobile,AL,35928
Jonas,U,Gilbert,8852,In St.,Saint Paul,MN,57321
Regan,M,Baxter,4851,Nec Av.,Gulfport,MS,33193
Octavius,T,Johnson,7418,Cum Road,Houston,TX,51507
Sydnee,N,Robinson,894,Ornare. Ave,Olathe,KS,25606
Stuart,K,Mckenzie,5529,Orci Av.,Nampa,ID,18562
Copy @ Bean
@ StepScope
public FlatFileItemReader< Customer > delimitedCustomerItemReader(@ Value ( "#{jobParameters['customerFile']}" ) PathResource inputFile) {
return new FlatFileItemReaderBuilder < Customer >()
. name ( "delimitedCustomerItemReader" ) // ๊ฐ ์คํ
์ ExecutionContext์ ์ถ๊ฐ๋๋ ํน์ ํค์ ์ ๋๋ฌธ์๋ก ์ฌ์ฉ๋ ์ด๋ฆ(saveState false์ธ ๊ฒฝ์ฐ ์ง์ ํ ํ์X)
. resource (inputFile)
. delimited () // default(,) DelimitedLineTokenizer๋ฅผ ์ฌ์ฉํด ๊ฐ ๋ ์ฝ๋๋ฅผ FieldSet์ผ๋ก ๋ณํ
. names ( new String []{ "firstName" , "middleInitial" , "lastName" , "addressNumber" , "street"
, "city" , "state" , "zipCode" }) // ๊ฐ ์ปฌ๋ผ๋ช
. targetType ( Customer . class ) // BeanWrapperFieldSetMapper ์์ฑํด ๋๋ฉ์ธ ํด๋ ์ค์ ๊ฐ์ ์ฑ์
. build ();
}
FieldSetMapper ์ปค์คํ
Copy public interface FieldSetMapper < T > {
T mapFieldSet ( FieldSet fieldSet) throws BindException ;
}
org.springframework.batch.item.file.mapping.FieldSetMapper
๋ฅผ ๊ตฌํํ์ฌ ์ปค์คํ
๋งคํผ๋ฅผ ๋ง๋ค ์ ์๋ค.
Copy public class CustomFieldSetMapper implements FieldSetMapper < Customer > {
@ Override
public Customer mapFieldSet ( FieldSet fieldSet) throws BindException {
Customer customer = new Customer() ;
customer . setAddress ( fieldSet . readString ( "addressNumber" ) + " " + fieldSet . readString ( "street" ));
customer . setCity ( fieldSet . readString ( "city" ));
customer . setFirstName ( fieldSet . readString ( "firstName" ));
customer . setLastName ( fieldSet . readString ( "lastName" ));
customer . setMiddleInitial ( fieldSet . readString ( "middleInitial" ));
customer . setState ( fieldSet . readString ( "state" ));
customer . setZipCode ( fieldSet . readString ( "zipCode" ));
return customer;
}
}
Copy @ Bean
@ StepScope
public FlatFileItemReader< Customer > delimitedCustomerItemReader(@ Value ( "#{jobParameters['customerFile']}" ) PathResource inputFile) {
return new FlatFileItemReaderBuilder < Customer >()
. name ( "delimitedCustomerItemReader" )
. resource (inputFile)
. delimited ()
. names ( new String []{ "firstName" , "middleInitial" , "lastName" , "addressNumber" , "street" , "city" , "state" , "zipCode" })
. fieldSetMapper ( new CustomFieldSetMapper() ) // customMapper ์ค์
. build ();
}
.fieldSetMapper()
์ ์ปค์คํ
๋งคํผ๋ฅผ ์ง์ ํ๋ฉด ๋๋ค.
LineTokenizer ์ปค์คํ
org.springframework.batch.item.file.transform.LineTokenizer
Copy public interface LineTokenizer {
FieldSet tokenize (@ Nullable String line);
}
Copy public class CustomFileLineTokenizer implements LineTokenizer {
@ Setter
private String delimiter = "," ;
private String [] names = new String []{
"firstName"
, "middleInitial"
, "lastName"
, "address"
, "city"
, "state"
, "zipCode"
};
private FieldSetFactory fieldSetFactory = new DefaultFieldSetFactory() ;
@ Override
public FieldSet tokenize ( String line) {
// ๊ตฌ๋ถ์๋ก ํ๋ ๊ตฌ๋ถ
String [] fields = line . split (delimiter);
List < String > parsedFields = new ArrayList <>();
for ( int i = 0 ; i < fields . length ; i ++ ) {
if (i == 4 ) {
// 3,4๋ฒ์จฐ ํ๋ ๋จ์ผ ํ๋๋ก ๊ตฌ์ฑ
parsedFields . set (i - 1 , parsedFields . get (i - 1 ) + " " + fields[i]);
} else {
parsedFields . add (fields[i]);
}
}
// ๊ฐ์ ๋ฐฐ์ด & ํ๋ ์ด๋ฆ ๋ฐฐ์ด์ ๋๊ฒจ ํ๋๋ฅผ ์์ฑ
return fieldSetFactory . create ( parsedFields . toArray ( new String [ 0 ]) , names);
}
}
Copy @ Bean
@ StepScope
public FlatFileItemReader< Customer > lineTokenizerCustomerItemReader(@ Value ( "#{jobParameters['customerFile']}" ) PathResource inputFile) {
return new FlatFileItemReaderBuilder < Customer >()
. name ( "lineTokenizerCustomerItemReader" )
. resource (inputFile)
. lineTokenizer ( new CustomFileLineTokenizer() ) // lineTokenzier Custom
. targetType ( Customer . class ) // BeanWrapperFieldSetMapper ์์ฑํด ๋๋ฉ์ธ ํด๋ ์ค์ ๊ฐ์ ์ฑ์
. build ();
}
LineMapper
org.springframework.batch.item.file.mapping.PatternMatchingCompositeLineMapper
์ฌ๋ฌ๊ฐ์ LineTokenizer
๋ก ๊ตฌ์ฑ๋ Map์ ์ ์ธํ ์ ์์.
PatternMatcher<LineTokenizer> tokenizers
๊ฐ LineTokenizer
๋ฅผ ํ์๋ก ํ๋ ์ฌ๋ฌ๊ฐ์ FieldSetMapper
Map ์ ์ธํ ์ ์์.
PatternMatcher<FieldSetMapper<T>> patternMatcher
Copy CUST,Warren,Q,Darrow,8272 4th Street,New York,IL,76091
TRANS,1165965,2011-01-22 00:13:29,51.43
CUST,Ann,V,Gates,9247 Infinite Loop Drive,Hollywood,NE,37612
CUST,Erica,I,Jobs,8875 Farnam Street,Aurora,IL,36314
TRANS,8116369,2011-01-21 20:40:52,-14.83
TRANS,8116369,2011-01-21 15:50:17,-45.45
TRANS,8116369,2011-01-21 16:52:46,-74.6
TRANS,8116369,2011-01-22 13:51:05,48.55
TRANS,8116369,2011-01-21 16:51:59,98.53
์ฌ๋ฌ ํ์์ผ๋ก ๊ตฌ์ฑ๋ csv ํ์ผ์ด๋ค.
Copy @ Bean
public PatternMatchingCompositeLineMapper lineTokenizer() {
Map < String , LineTokenizer > lineTokenizerMap = new HashMap <>( 2 );
lineTokenizerMap . put ( "TRANS*" , transactionLineTokenizer() ); // TRANS๋ก ์์ํ๋ฉด transactionLineTokenizer
lineTokenizerMap . put ( "CUST*" , customerLineTokenizer() ); // CUST๋ก ์์ํ๋ฉด, customerLineTokenizer
Map < String , FieldSetMapper > fieldSetMapperMap = new HashMap <>( 2 );
BeanWrapperFieldSetMapper < Customer > customerFieldSetMapper = new BeanWrapperFieldSetMapper <>();
customerFieldSetMapper . setTargetType ( Customer . class );
fieldSetMapperMap . put ( "TRANS*" , new TransactionFieldSetMapper() ); // ์ผ๋ฐ์ ์ด์ง ์์ ํ์
ํ๋ ๋ณํ์ FieldSetMapper ํ์(Date, Double)
fieldSetMapperMap . put ( "CUST*" , customerFieldSetMapper);
PatternMatchingCompositeLineMapper lineMappers = new PatternMatchingCompositeLineMapper() ;
lineMappers . setTokenizers (lineTokenizerMap);
lineMappers . setFieldSetMappers (fieldSetMapperMap);
return lineMappers;
}
TRANS๋ก ์์ํ๋ ๊ฒฝ์ฐ์ CUST๋ก ์์ํ๋ ๊ฒฝ์ฐ ๊ฐ๊ฐ FieldSetMapper
, LineTokenizer
๋ฅผ ์ฌ์ฉํด ํ์ฑ ๋ฐ set์ ํ ์ ์๋ค.
Copy @ Bean
public DelimitedLineTokenizer transactionLineTokenizer() {
DelimitedLineTokenizer delimitedLineTokenizer = new DelimitedLineTokenizer() ;
delimitedLineTokenizer . setNames ( "prefix" , "accountNumber" , "transactionDate" , "amount" );
return delimitedLineTokenizer;
}
@ Bean
public DelimitedLineTokenizer customerLineTokenizer() {
DelimitedLineTokenizer delimitedLineTokenizer = new DelimitedLineTokenizer() ;
delimitedLineTokenizer . setNames ( "firstName" , "middleInitial" , "lastName" , "address" , "city" , "state" , "zipCode" );
delimitedLineTokenizer . setIncludedFields ( 1 , 2 , 3 , 4 , 5 , 6 , 7 ); // prefix์ ์ธํ ๋ชจ๋ ํ๋
return delimitedLineTokenizer;
}
ItemStreamReader ์ปค์คํ
๋๊ฐ์ ๋ค๋ฅธ ํฌ๋งท์ ๋ฐ์ดํฐ๊ฐ ์ฌ์ค์ ์๋ก ์ฐ๊ด์ด ์๋ ๋ฐ์ดํฐ์ผ ์ ์๋ค. ๊ทธ ๊ฒฝ์ฐ์๋ ํ๊ฐ์ ๋๋ฉ์ธ์ด ๋ค๋ฅธ ํ๊ฐ์ ๋๋ฉ์ธ์ ๋ด์ฉ์ ํฌํจํ๊ณ ์์ ์ ์๋ค.
Copy CUST,Warren,Q,Darrow,8272 4th Street,New York,IL,76091
TRANS,1165965,2011-01-22 00:13:29,51.43
CUST,Ann,V,Gates,9247 Infinite Loop Drive,Hollywood,NE,37612
CUST,Erica,I,Jobs,8875 Farnam Street,Aurora,IL,36314
TRANS,8116369,2011-01-21 20:40:52,-14.83
TRANS,8116369,2011-01-21 15:50:17,-45.45
TRANS,8116369,2011-01-21 16:52:46,-74.6
TRANS,8116369,2011-01-22 13:51:05,48.55
TRANS,8116369,2011-01-21 16:51:59,98.53
๊ฑฐ๋๋ด์ญ(TRANS) ๋ฐ์ดํฐ๋ ๊ทธ ์์ ๊ณ ๊ฐ(CUST)์ ๊ณ์ฝ ์ ๋ณด๋ผ๊ณ ๊ฐ์ ํด๋ณผ ๊ฒ์ด๋ค.
Copy @ AllArgsConstructor
@ NoArgsConstructor
@ Getter
@ Setter
@ ToString
public class Customer {
private Long id;
private String firstName;
private String middleInitial;
private String lastName;
private String addressNumber;
private String street;
private String city;
private String state;
private String zipCode;
private String address;
private List < Transaction > transactions; // ๊ณ ๊ฐ์ ๊ฐ์ฝ์ ๋ณด
}
Custom ๋๋ฉ์ธ ๊ฐ์ฒด์ Transaction ๊ฑฐ๋๋ด์ญ ์ ๋ณด๋ฅผ ํฌํจํ๊ฒ ๋ณ๊ฒฝํด์ค๋ค.
Copy public class CustomerFileReader implements ItemStreamReader < Customer > {
private Object curItem = null ;
private ItemStreamReader < Object > delegate;
public CustomerFileReader ( ItemStreamReader < Object > delegate) {
this . delegate = delegate;
}
@ Override
public Customer read () throws Exception , UnexpectedInputException , ParseException , NonTransientResourceException {
if (curItem == null ) {
curItem = delegate . read (); // ๊ณ ๊ฐ ์ ๋ณด๋ฅผ ์ฝ์.
}
Customer item = (Customer) curItem;
curItem = null ;
if (item != null ) {
item . setTransactions ( new ArrayList <>());
// ๋ค์ ๊ณ ๊ฐ ๋ ์ฝ๋๋ฅผ ๋ง๋๊ธฐ ์ ๊น์ง ๊ฑฐ๋๋ด์ญ ๋ ์ฝ๋๋ฅผ ์ฝ๋๋ค.
while ( peek() instanceof Transaction) {
item . getTransactions () . add ((Transaction) curItem);
curItem = null ;
}
}
return item;
}
private Object peek () throws Exception {
if (curItem == null ) {
curItem = delegate . read ();
}
return curItem;
}
@ Override
public void open ( ExecutionContext executionContext) throws ItemStreamException {
delegate . open (executionContext);
}
@ Override
public void update ( ExecutionContext executionContext) throws ItemStreamException {
delegate . update (executionContext);
}
@ Override
public void close () throws ItemStreamException {
delegate . close ();
}
}
์ฌ๊ธฐ์ ํต์ฌ์ read()
๋ฉ์๋์ด๋ค. ํ์ค์ฉ ์ฝ์ด์ฌ ๋ ๋ค์ ๊ณ ๊ฐ์ ๋ณด๊ฐ ๋์ฌ๋๊น์ง ๊ฑฐ๋๋ด์ญ ๋ ์ฝ๋๋ฅผ ์ฝ์ด ํด๋น ๊ณ ๊ฐ์ด ๊ฐ์ง๊ณ ์๊ฒ ํ๋ค.
Copy @ Bean
@ StepScope
public FlatFileItemReader multiLineItemReader(@ Value ( "#{jobParameters['customFile']}" ) PathResource resource) {
return new FlatFileItemReaderBuilder < Customer >()
. name ( "multiLineItemReader" )
. lineMapper ( multiLineTokenizer() )
. resource (resource)
. build ();
}
@ Bean
public CustomerFileReader customerFileReader() {
return new CustomerFileReader(multiLineItemReader( null )) ;
}
Job์์ itemReader๋ถ๋ถ์ ์์์ ์์ฑํ CustomerFileReader
๋ฅผ ์ค์ ํด์ฃผ๋ฉด ๋๋ค.
Copy Customer(id=null, firstName=Warren, middleInitial=Q, lastName=Darrow, addressNumber=null, street=null, city=New York, state=IL, zipCode=76091, address=8272 4th Street, transactions=[Transaction(accountNumber=1165965, transactionDate=Sat Jan 22 00:13:29 KST 2011, amount=51.43, dateFormat=java.text.SimpleDateFormat@7c669100)])
Customer(id=null, firstName=Ann, middleInitial=V, lastName=Gates, addressNumber=null, street=null, city=Hollywood, state=NE, zipCode=37612, address=9247 Infinite Loop Drive, transactions=[])
Customer(id=null, firstName=Erica, middleInitial=I, lastName=Jobs, addressNumber=null, street=null, city=Aurora, state=IL, zipCode=36314, address=8875 Farnam Street, transactions=[Transaction(accountNumber=8116369, transactionDate=Fri Jan 21 20:40:52 KST 2011, amount=-14.83, dateFormat=java.text.SimpleDateFormat@7c669100), Transaction(accountNumber=8116369, transactionDate=Fri Jan 21 15:50:17 KST 2011, amount=-45.45, dateFormat=java.text.SimpleDateFormat@7c669100), Transaction(accountNumber=8116369, transactionDate=Fri Jan 21 16:52:46 KST 2011, amount=-74.6, dateFormat=java.text.SimpleDateFormat@7c669100), Transaction(accountNumber=8116369, transactionDate=Sat Jan 22 13:51:05 KST 2011, amount=48.55, dateFormat=java.text.SimpleDateFormat@7c669100), Transaction(accountNumber=8116369, transactionDate=Fri Jan 21 16:51:59 KST 2011, amount=98.53, dateFormat=java.text.SimpleDateFormat@7c669100)])
๋ค์๊ณผ ๊ฐ์ด ๊ณ ๊ฐ์ด ๊ฐ์ง๊ณ ์๋ ๊ฑฐ๋๋ด์ญ์ ์ถ๋ ฅํ ์ ์๋ค.
MultiResourceItemReader
๋์ผํ ํฌ๋งท์ผ๋ก ์์ฑ๋ ์ฌ๋ฌ๊ฐ์ ํ์ผ์ ์ฝ์ด๋ค์ด๋ ItemReader
๋ฅผ ์ ๊ณตํ๋ค.
org.springframework.batch.item.file.MultiResourceItemReader
MultiResourceItemReader
๋ ์ฝ์ด์ผํ ํ์ผ๋ช
์ ํจํด์ MultiResourceItemReader
์ ์์กด์ฑ์ผ๋ก ์ ์ํ๋ค.
Copy public class MultiResourceCustomerFileReader implements ResourceAwareItemReaderItemStream < Customer > {
private Object curItem = null ;
private ResourceAwareItemReaderItemStream < Object > delegate;
public MultiResourceCustomerFileReader ( ResourceAwareItemReaderItemStream < Object > delegate) {
this . delegate = delegate;
}
/**
* Resource๋ฅผ ์ฃผ์
ํจ์ผ๋ก ItemReader๊ฐ ํ์ผ ๊ด๋ฆฌํ๋ ๋์
* ๊ฐ ํ์ผ์ ์คํ๋ง ๋ฐฐ์น๊ฐ ์์ฑํด ์ฃผ์
* @param resource
*/
@ Override
public void setResource ( Resource resource) {
System . out . println (resource);
this . delegate . setResource (resource);
}
@ Override
public Customer read () throws Exception , UnexpectedInputException , ParseException , NonTransientResourceException {
if (curItem == null ) {
curItem = delegate . read (); // ๊ณ ๊ฐ ์ ๋ณด๋ฅผ ์ฝ์.
}
Customer item = (Customer) curItem;
curItem = null ;
if (item != null ) {
item . setTransactions ( new ArrayList <>());
// ๋ค์ ๊ณ ๊ฐ ๋ ์ฝ๋๋ฅผ ๋ง๋๊ธฐ ์ ๊น์ง ๊ฑฐ๋๋ด์ญ ๋ ์ฝ๋๋ฅผ ์ฝ๋๋ค.
while ( peek() instanceof Transaction) {
item . getTransactions () . add ((Transaction) curItem);
curItem = null ;
}
}
return item;
}
private Object peek () throws Exception {
if (curItem == null ) {
curItem = delegate . read ();
}
return curItem;
}
@ Override
public void open ( ExecutionContext executionContext) throws ItemStreamException {
delegate . open (executionContext);
}
@ Override
public void update ( ExecutionContext executionContext) throws ItemStreamException {
delegate . update (executionContext);
}
@ Override
public void close () throws ItemStreamException {
delegate . close ();
}
}
์์์ ๋ค๋ฃฌ ItemStreamReader
์ ๋ค๋ฅธ ์ ์ Resource
์ฃผ์
๋ถ๋ถ์ด๋ค. Resource
๋ฅผ ์ฃผ์
ํ๊ฒ ๋๋ฉด ํ์ํ ๊ฐ ํ์ผ์ ์คํ๋ง ๋ฐฐ์น๊ฐ ์์ฑํด ItemReader
์ ์ฃผ์
ํ ์ ์๋ค.
Copy @ Bean
@ StepScope
public MultiResourceItemReader multiResourceItemReader(@ Value ( "#{jobParameters['customFile']}" ) Resource [] resources) {
return new MultiResourceItemReaderBuilder <>()
. name ( "multiResourceItemReader" )
. resources (resources) // resources ๋ฐฐ์ด
. delegate ( multiResourceCustomerFileReader() ) // ์ค์ ์์
์ ์ํํ ์์ ์ปดํฌ๋ํธ
. build ();
}
@ Bean
public MultiResourceCustomerFileReader multiResourceCustomerFileReader() {
return new MultiResourceCustomerFileReader(multiResourceCustomerItemReader()) ;
}
@ Bean
@ StepScope
public FlatFileItemReader multiResourceCustomerItemReader() {
return new FlatFileItemReaderBuilder < Customer >()
. name ( "multiResourceCustomerItemReader" )
. lineMapper ( multiResourceTokenizer() )
. build ();
}
์ฝ์ด์ผํ ํ์ผ ๋ชฉ๋ก(resources)์ ์ค์ ํด์ฃผ๊ณ , delegate()
์ ์ค์ ๋ก ์์
์ ์ํํ ์์ ์ปดํฌ๋ํธ๋ฅผ ์ง์ ํด์ฃผ๋ฉด๋๋ค.
์ฌ๋ฌ ๊ฐ์ ํ์ผ์ ๋ค๋ฃฐ๋๋ ์ฌ์์์ ํ๊ฒ๋๋ ์ํฉ์์ ์คํ๋ง๋ฐฐ์น๊ฐ ์ถ๊ฐ์ ์ธ ์์ ์ฅ์น๋ฅผ ์ ๊ณตํด์ฃผ์ง ์๋๋ค. ์๋ฅผ๋ค์ด file1.csv, file2.csv, file3.csv๊ฐ ์๋๋ฐ, file2.csv ์ฒ๋ฆฌํ๋ ๊ณผ์ ์์ ์ค๋ฅ๊ฐ ๋ฐ์ํ์ฌ ์ก์ด ์คํจ ๋ ์ดํ ์ฌ์์์ ํ ๋ file4.csv๋ฅผ ์ถ๊ฐํ๋ค๋ฉด, ์ต์ด ์คํ์ file4.csv๊ฐ ์์์์๋ ๋ถ๊ตฌํ๊ณ , ํฌํจํ์ฌ ์คํํ๋ค.
์ด๋ฌํ ๋ฌธ์ ์ ์ ํด๊ฒฐํ๊ธฐ ์ํด์ ๋ฐฐ์น ์คํ ์ ์ฌ์ฉํ ๋๋ ํฐ๋ฆฌ๋ฅผ ๋ณ๋๋ก ์์ฑํ๋ ๊ฒ์ด ์ผ๋ฐ์ ์ด๋ฉฐ, ์๋ก ์์ฑ๋ ๋ชจ๋ ํ์ผ์ ์๋ก์ด ๋๋ ํฐ๋ฆฌ์ ๋ฃ์ด์ฃผ์ด ํ์ฌ ์ํ์ค์ธ ์ก์ ์ํฅ์ ์ฃผ์ง์๊ฒ ํ ์ ์๋ค.
XML
XML์ ํ์ผ ๋ด ๋ฐ์ดํฐ๋ฅผ ์ค๋ช
ํ ์ ์๋ ํ๊ทธ๋ฅผ ์ฌ์ฉํด ํ์ผ์ ํฌํจ๋ ๋ฐ์ดํฐ๋ฅผ ์ค๋ช
ํ๋ฏ๋ก, Flat file๊ณผ๋ ๋ค๋ฅด๋ค.
XML parser๋ก ์ฃผ๋ก DOM๊ณผ SAX๋ฅผ ๋ง์ด ์ฌ์ฉํ๋ค.
Dom vs SAX vs StAX
DOM(Document Object Model) ๋ฐฉ์
XML๋ฌธ์ ์ ์ฒด๋ฅผ ๋ฉ๋ชจ๋ฆฌ์ ๋ก๋ํ์ฌ ๊ฐ์ ์ฝ๋๋ค .
XML๋ฌธ์๋ฅผ ์ฝ์ผ๋ฉด ๋ชจ๋ Element, Text, Attribute ๋ฑ์ ๋ํ ๊ฐ์ฒด๋ฅผ ์์ฑํ๊ณ , ์ด๋ฅผ Document ๊ฐ์ฒด๋ก ๋ฆฌํดํ๋ค.
Document ๊ฐ์ฒด๋ DOM API์ ์๋ง๋ ํธ๋ฆฌ ๊ตฌ์กฐ์ ์๋ฐ ๊ฐ์ฒด๋ก ํํ๋์ด ์๋ค.
XML๋ฌธ์๊ฐ ๋ฉ๋ชจ๋ฆฌ์ ๋ชจ๋ ์ฌ๋ผ๊ฐ ์์ด์ ๋
ธ๋๋ค์ ๊ฒ์, ์์ , ๊ตฌ์กฐ๋ณ๊ฒฝ์ด ๋น ๋ฅด๊ณ ์ฉ์ด ํ๋ค.
SAX ๋ฐฉ์ ๋ณด๋ค ์ง๊ด์ ์ด๋ฉฐ ํ์ฑ์ด ๋จ์ํ๊ธฐ ๋๋ฌธ์ ์ผ๋ฐ์ ์ผ๋ก DOM ๋ฐฉ์์ ์ฑํํ์ฌ ๊ฐ๋ฐํ๊ฒ ๋๋ค.
SAX(Simple API for XML) ๋ฐฉ์
SAX ๋ฐฉ์์ XML ๋ฌธ์๋ฅผ ํ๋์ ๊ธด ๋ฌธ์์ด๋ก ๊ฐ์ฃผํ๋ค.
XML๋ฌธ์๋ฅผ ์์์ ๋ถํฐ ์์ฐจ์ ์ผ๋ก ์ฝ์ด๊ฐ๋ฉด์ ๋
ธ๋๊ฐ ์ด๋ฆฌ๊ณ ๋ซํ๋ ๊ณผ์ ์์ ์ด๋ฒคํธ๊ฐ ๋ฐ์ํ๋ค.
๊ฐ๊ฐ์ ์ด๋ฒคํธ๊ฐ ๋ฐ์๋ ๋๋ง๋ค ์ํํ๊ณ ์ ํ๋ ๊ธฐ๋ฅ์ ์ด๋ฒคํธ ํธ๋ค๋ฌ ๊ธฐ์ ์ ์ด์ฉํ์ฌ ๊ตฌํํ๋ค.
XML๋ฌธ์๋ฅผ ๋ฉ๋ชจ๋ฆฌ์ ์ ๋ถ ๋ก๋ฉํ๊ณ ํ์ฑํ๋ ๊ฒ์ด ์๋๊ธฐ ๋๋ฌธ์ ๋ฉ๋ชจ๋ฆฌ ์ฌ์ฉ๋์ด ์ ๊ณ ๋จ์ํ ์ฝ๊ธฐ๋ง ํ ๋ ์๋๊ฐ ๋น ๋ฅด๋ค .
๋ฐ์ํ ์ด๋ฒคํธ๋ฅผ ํธ๋ค๋งํ์ฌ ๋ณ์์ ์ ์ฅํ๊ณ ํ์ฉํ๋ ๊ฒ์ด๊ธฐ ๋๋ฌธ์ ๋ณต์กํ๊ณ ๋
ธ๋ ์์ ์ด์ด๋ ต๋ค.
XML ์ค๋ธ์ ํธ์ Random Access๋ฅผ ํ์ง ๋ชปํด, ์ง๋ ์๋ฆฌ๋จผํธ๋ฅผ ์ฐธ์กฐํ ๊ฒฝ์ฐ ๋ค์ ์ฒ์๋ถํฐ ์ฝ์ด์ผํ๋ค.
StAX(Streaming API for XML)
StAX๋ push ์ pull ๋ฐฉ์์ ๋์์ ์ ๊ณตํ๋ ํ์ด๋ธ๋ฆฌ๋ํ ํํ
XML ๋ฌธ์๋ฅผ ํ์ฑํ ๋ ํ๋์ Fragment๋ก ๊ตฌ๋ถ
์ ํด์ง ์๋ฆฌ๋จผํธ๋ฅผ ์ฝ์๋๋ DOM ๋ฐฉ์์ ์ฌ์ฉํ๋ฉฐ, Fragement๋ก ์ฒ๋ฆฌํ๋ ๊ฒ์ SAX์ Push ๋ฐฉ์์ ์ฌ์ฉ
์ฆ, ๊ฐ ์ธ์
์ ๋
๋ฆฝ์ ์ผ๋ก ํ์ฑํ๋ ๊ธฐ๋ฅ์ ์ ๊ณต
์คํ๋ง ๋ฐฐ์น์์๋ StAX ํ์๋ฅผ ์ฌ์ฉํ๋ค.
StaxEventItemReader
org.springframework.batch.item.xml.StaxEventItemReader
Copy < customers >
< customer >
< firstName >Laura</ firstName >
< middleInitial >O</ middleInitial >
< lastName >Minella</ lastName >
< address >2039 Wall Street</ address >
< city >Omaha</ city >
< state >IL</ state >
< zipCode >35446</ zipCode >
< transactions >
< transaction >
< accountNumber >829433</ accountNumber >
< transactionDate >2010-10-14 05:49:58</ transactionDate >
< amount >26.08</ amount >
</ transaction >
</ transactions >
</ customer >
...
</ customers >
์ ์์ ํ์ผ์ ํ์ฑํ๋ Reader๋ฅผ ๊ตฌํํด๋ณผ๊ฒ์ด๋ค.
Copy @ Bean
@ StepScope
public StaxEventItemReader< Customer > staxCustomerFileReader(@ Value ( "#{jobParameters['customFile']}" ) Resource resource) {
return new StaxEventItemReaderBuilder < Customer >()
. name ( "staxCustomerFileReader" )
. resource (resource)
. addFragmentRootElements ( "customer" ) // ํ๋ ๊ทธ๋จผํธ ๋ฃจํธ ์๋ฆฌ๋จผํธ
. unmarshaller ( customerMarshaller() ) // XML์ ๋๋ฉ์ธ ๊ฐ์ฒด๋ก ๋ฐํ JAXB ์ฌ์ฉ
. build ();
}
.addFragmentRootElements()
: StaxEventItemReader
๋ฅผ ์ฌ์ฉํ๋ ค๋ฉด XML ํ๋ํฌ๋จผํธ ๋ฃจํธ ์๋ฆฌ๋จผํธ๋ฅผ ์ง์ XML๋ด์์ Item์ผ๋ก ์ทจ๊ธํ fragment์ root ์๋ฆฌ๋จผํธ๋ฅผ ์๋ณํ๋๋ฐ ์ฌ์ฉ
.unmarshaller()
: org.springframework.oxm.Unmarshaller
๊ตฌํ์ฒด๋ฅผ ์ ๋ฌ ๋ฐ์ผ๋ฉฐ, XML์ ๋๋ฉ์ธ ๊ฐ์ฒด๋ก ๋ฐํ
์ด๋ฒ ์์ ์์๋ org.springframework.oxm.jaxb.Jaxb2Marshaller
๋ฅผ ์ฌ์ฉํ์ผ๋ฉฐ, Jaxb2Marshaller
๋ฅผ ์ฌ์ฉํ๊ธฐ ์ํด์๋ ์์กด์ฑ ์ถ๊ฐ๊ฐ ํ์ํ๋ค.
build.gradle
Copy dependencies {
implementation 'org.springframework:spring-oxm'
implementation 'javax.xml.bind:jaxb-api:2.3.1'
implementation 'javax.activation:activation:1.1'
implementation 'com.sun.xml.bind:jaxb-core:2.3.0.1'
implementation 'com.sun.xml.bind:jaxb-impl:2.3.1'
}
JAXB ์์กด์ฑ๊ณผ Spring OXM ๋ชจ๋๋ก JAXB๋ฅผ ์ฌ์ฉํ๋ ์คํ๋ง ์ปดํฌ๋ํธ ์์กด์ฑ์ ์ถ๊ฐํด์ค๋ค.
XML์ ํ์ฑํ ์ ์๊ฒ ํ๋ ค๋ฉด ๋๋งค์ธ ๊ฐ์ฒด์ JAXB ์ด๋
ธํ
์ด์
์ ์ถ๊ฐํด์ค ๊ฒ์ด๋ค.
Copy @ NoArgsConstructor
@ Getter
@ Setter
@ ToString
@ XmlRootElement
public class Customer {
private Long id;
private String firstName;
private String middleInitial;
private String lastName;
private String addressNumber;
private String street;
private String city;
private String state;
private String zipCode;
private String address; // customAddressMapper
private List < Transaction > transactions;
@ XmlElementWrapper (name = "transactions" )
@ XmlElement (name = "transaction" )
public void setTransactions ( List < Transaction > transactions) {
this . transactions = transactions;
}
}
Copy @ Getter
@ Setter
@ ToString
@ XmlType (name = "transaction" )
public class Transaction {
private String accountNumber;
private Date transactionDate;
private Double amount;
@ Setter (value = AccessLevel . NONE )
private DateFormat dateFormat = new SimpleDateFormat( "MM/dd/yyyy" ) ;
}
๋๋ฉ์ธ ๊ฐ์ฒด ์ค์ ์ด ๋๋ฌ์ผ๋ฉด, ๊ฐ ๋ธ๋ก์ ํ์ฑํ๋๋ฐ ์ฌ์ฉํ Unmarshaller๋ฅผ ๊ตฌํํด์ฃผ๋ฉด ๋๋ค.
Copy @ Bean
public Jaxb2Marshaller customerMarshaller() {
Jaxb2Marshaller jaxb2Marshaller = new Jaxb2Marshaller() ;
jaxb2Marshaller . setClassesToBeBound ( Customer . class , Transaction . class ); // ๋๋ฉ์ธ ๊ฐ์ฒด
return jaxb2Marshaller;
}
JSON
JsonItemReader
org.springframework.batch.item.json.JsonItemReader
์ฒญํฌ๋ฅผ ์ฝ์ด ๊ฐ์ฒด๋ก ํ์ฑํ๋ค.
์ค์ ํ์ฑ ์์
์ JsonObjectReader
์ธํฐํ์ด์ค ๊ตฌํ์ฒด์ ์์ํ๋ค.
JsonObjectReader
์ค์ ๋ก JSON ๊ฐ์ฒด๋ฅผ ํ์ฑํ๋ ์ญํ ์ ํ๋ค. ์คํ๋ง ๋ฐฐ์น๋ 2๊ฐ์ JsonObjectReader
๋ฅผ ์ ๊ณตํด์ค๋ค.
Copy [
{
"firstName" : "Laura" ,
"middleInitial" : "O" ,
"lastName" : "Minella" ,
"address" : "2039 Wall Street" ,
"city" : "Omaha" ,
"state" : "IL" ,
"zipCode" : "35446" ,
"transactions" : [
{
"accountNumber" : 829433 ,
"transactionDate" : "2010-10-14 05:49:58" ,
"amount" : 26.08
}
]
} ,
...
]
์ ๊ตฌ์กฐ๋ก๋์ด์๋ JSON์ ํ์ฑํด ๋ณผ๊ฒ์ด๋ค.
Copy @ Bean
@ StepScope
public JsonItemReader< Customer > jsonFileReader(@ Value ( "#{jobParameters['customFile']}" ) Resource resource) {
// Jackson์ด JSON์ ์ฝ๊ณ ์ฐ๋๋ฐ ์ฌ์ฉํ๋ ์ฃผ์ ํด๋์ค
ObjectMapper objectMapper = new ObjectMapper() ;
objectMapper . setDateFormat ( new SimpleDateFormat( "yyyy-MM-dd hh:mm:ss" ) );
JacksonJsonObjectReader < Customer > jsonObjectReader = new JacksonJsonObjectReader <>( Customer . class ); // ๋ฐํํ ํด๋์ค ์ค์
jsonObjectReader . setMapper (objectMapper); // ObjectMapper
return new JsonItemReaderBuilder < Customer >()
. name ( "jsonFileReader" )
. jsonObjectReader (jsonObjectReader) // ํ์ฑ์ ์ฌ์ฉ
. resource (resource)
. build ();
}
ObjectMapper
๋ Jackson์ด JSON์ ์ฝ๊ณ ์ฐ๋๋ฐ ์ฌ์ฉํ๋ ์ฃผ์ ํด๋์ค๋ก ์ปค์คํ
๋ฐ์ดํฐ ํฌ๋งท๋ค์ ์ค์ ํ๋ฉด๋๋ค.
JacksonJsonObjectReader
์์ฑ์ ๋ฐํํ ํด๋์ค๋ฅผ ์ค์ ํ๊ณ , ์ปค์คํ
ํ ObjectMapper
๋ฅผ ์ค์ ํด์ฃผ๋ฉด๋๋ค.
JsonItemReaderBuilder
๋ ํ์ฑ์ ์ฌ์ฉํ JsonObjectReader
๋ฅผ ์ค์ ํด์ฃผ๋ฉด๋๋ค.
Database Reader
Cursor๋ ํ์ค java.sql.ResultSet
์ผ๋ก ๊ตฌํ๋๋ฉฐ, ResultSet
์ด open๋๋ฉด next()
๋ฉ์๋๋ฅผ ํธ์ถํ ๋๋ง๋ค ๋ฐ์ดํฐ๋ฒ ์ด์ค์์ ๋ฐฐ์น ๋ ์ฝ๋๋ฅผ ๊ฐ์ ธ์ ๋ฐํํ๋ค.
์ฆ, Cursor ๋ฐฉ์์ DB์ ์ปค๋ฅ์
์ ๋งบ์ ํ, Cursor๋ฅผ ํ์นธ์ฉ ์ฎ๊ธฐ๋ฉด์ ์ง์์ ์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์จ๋ค.
(CursorItemReader๋ streaming์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌ)
Cursor๋ ํ๋์ Connection์ผ๋ก Batch๊ฐ ๋๋ ๋๊ฐ์ง ์ฌ์ฉ๋๊ธฐ ๋๋ฌธ ์ Batch๊ฐ ๋๋๊ธฐ์ ์ DB์ ์ดํ๋ฆฌ์ผ์ด์
Connection์ด ๋์ด์ง ์ ์์ผ๋ฏ๋ก, DB์ SocketTimeout์ ์ถฉ๋ถํ ํฐ ๊ฐ์ผ๋ก ์ค์ ํด์ผํ๋ค. (๋คํธ์ํฌ ์ค๋ฒํค๋ ์ถ๊ฐ) ์ถ๊ฐ๋ก ResultSet
์ ์ค๋ ๋ ์์ ์ด ๋ณด์ฅ๋์ง ์์ ๋ค์ค ์ค๋ ๋ ํ๊ฒฝ์์๋ ์ฌ์ฉํ ์ ์๋ค.
HibernateCursorItemReader
StoredProcedureItemReader
Paging ๋ฐฉ์์ ํ๋ฒ์ ์ง์ ํ PageSize๋งํผ ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์จ๋ค.
SpringBatch์์ offset๊ณผ limit์ PageSize์ ๋ง๊ฒ ์๋์ผ๋ก ์์ฑํด์ค๋ค. ๋ค๋ง ๊ฐ ์ฟผ๋ฆฌ๋ ๊ฐ๋ณ์ ์ผ๋ก ์คํ๋๋ฏ๋ก, ํ์ด์ง์ ๊ฒฐ๊ณผ๋ฅผ ์ ๋ ฌ(order by)ํ๋ ๊ฒ์ด ์ค์ํ๋ค.
Batch ์ํ์๊ฐ์ด ์ค๋ ๊ฑธ๋ฆฌ๋ ๊ฒฝ์ฐ์๋ PagingItemReader
๋ฅผ ์ฌ์ฉ ํ๋ ๊ฒ์ด ์ข๋ค. Paging์ ๊ฒฝ์ฐ ํ ํ์ด์ง๋ฅผ ์ฝ์๋๋ง๋ค Connection์ ๋งบ๊ณ ๋๊ธฐ ๋๋ฌธ์ ์๋ฌด๋ฆฌ ๋ง์ ๋ฐ์ดํฐ๋ผ๋ ํ์์์๊ณผ ๋ถํ ์์ด ์ํ ๋ ์ ์๋ค.
HibernatePagingItemReader
JDBC
JdbcCursorItemReader
Copy /**
* --job.name=jdbcCursorItemReaderJob city=Chicago
*/
@ RequiredArgsConstructor
@ Configuration
public class JdbcCursorCustomerJob {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final DataSource dataSource;
@ Bean
public Job jdbcCursorItemReaderJob (){
return jobBuilderFactory . get ( "jdbcCursorItemReaderJob" )
. start ( jdbcCursorItemReaderStep() )
. build ();
}
@ Bean
public Step jdbcCursorItemReaderStep (){
return stepBuilderFactory . get ( "jdbcCursorItemReaderStep" )
. < Customer , Customer > chunk( 10 )
. reader ( customerJdbcCursorItemReader() )
. writer ( customerJdbcCursorItemWriter() )
. build ();
}
@ Bean
public JdbcCursorItemReader < Customer > customerJdbcCursorItemReader () {
return new JdbcCursorItemReaderBuilder < Customer >()
. name ( "customerJdbcCursorItemReader" )
. dataSource (dataSource)
. sql ( "select * from customer where city = ?" )
. rowMapper ( new BeanPropertyRowMapper <>( Customer . class ))
. preparedStatementSetter ( citySetter( null ) ) // ํ๋ผ๋ฏธํฐ ์ค์
. build ();
}
/**
* ํ๋ผ๋ฏธํฐ๋ฅผ SQL๋ฌธ์ ๋งคํ
* ArgumentPreparedStatementSetter๋ ๊ฐ์ฒด ๋ฐฐ์ด์ ๋ด๊ธด ์์๋๋ก ?์ ์์น์ ๊ฐ์ผ๋ก ์ค์
* @param city
* @return
*/
@ Bean
@ StepScope
public ArgumentPreparedStatementSetter citySetter (@ Value ( "#{jobParameters['city']}" ) String city) {
return new ArgumentPreparedStatementSetter( new Object []{city}) ;
}
@ Bean
public ItemWriter customerJdbcCursorItemWriter () {
return (items) -> items . forEach ( System . out :: println);
}
}
<T, T> chunk(int chunkSize)
: ์ฒซ๋ฒ์งธ T๋ Reader์์ ๋ฐํํ ํ์
, ๋๋ฒ์งธ T๋ Writer์ ํ๋ผ๋ฏธํฐ๋ก ๋์ด์ฌ ํ์
์ด๋ค.
fetchSize : DB์์ ํ๋ฒ์ ๊ฐ์ ธ์ฌ ๋ฐ์ดํฐ ์์ ๋ํ๋ธ๋ค. Paging์ ์ค์ ์ฟผ๋ฆฌ๋ฅผ limit, offset์ผ๋ก ๋ถํ ์ฒ๋ฆฌํ๋ ๋ฐ๋ฉด, Cursor๋ ๋ถํ ์ฒ๋ฆฌ์์ด ์คํ๋๋ ๋ด๋ถ์ ์ผ๋ก ๊ฐ์ ธ์จ๋ ๋ฐ์ดํฐ๋ FetchSize๋งํผ ๊ฐ์ ธ์ read()
๋ฅผ ํตํด์ ํ๋์ฉ ๊ฐ์ ธ์จ๋ค.
dataSource : DB์ ์ ๊ทผํ๊ธฐ ์ํด ์ฌ์ฉํ DataSource๊ฐ์ฒด
rowMapper : ์ฟผ๋ฆฌ ๊ฒฐ๊ณผ๋ฅผ ์ธ์คํด์ค๋ก ๋งคํํ๊ธฐ ์ํ ๋งคํผ
BeanPropertyRowMapper
๋ฅผ ์ฌ์ฉํด ๋๋ฉ์ธ ๊ฐ์ฒด์ ๋งคํํด์ค๋ค.
sql : Reader์์ ์ฌ์ฉํ ์ฟผ๋ฆฌ๋ฌธ
preparedStatementSetter: SQL๋ฌธ์ ํ๋ผ๋ฏธํฐ ์ค์
name : Reader์ ์ด๋ฆ, ExecutionContext์ ์ ์ฅ๋์ด์ง ์ด๋ฆ
JdbcPagingItemReader
Copy /**
* --job.name=jdbcPagingItemReaderJob city=Chicago
*/
@ RequiredArgsConstructor
@ Configuration
public class JdbcPagingCustomerJob {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final DataSource dataSource;
@ Bean
public Job jdbcPagingItemReaderJob (){
return jobBuilderFactory . get ( "jdbcPagingItemReaderJob" )
. start ( jdbcPagingItemReaderStep() )
. build ();
}
@ Bean
public Step jdbcPagingItemReaderStep (){
return stepBuilderFactory . get ( "jdbcPagingItemReaderStep" )
. < Customer , Customer > chunk( 10 )
. reader ( customerJdbcPagingItemReader( null , null ) )
. writer ( customerJdbcPagingItemWriter() )
. build ();
}
@ Bean
@ StepScope
public JdbcPagingItemReader < Customer > customerJdbcPagingItemReader (
PagingQueryProvider pagingQueryProvider , @ Value ( "#{jobParameters['city']}" ) String city) {
Map < String , Object > params = new HashMap <>( 1 );
params . put ( "city" , city);
return new JdbcPagingItemReaderBuilder < Customer >()
. name ( "customerJdbcPagingItemReader" ) // Reader์ ์ด๋ฆ, ExecutionContext์ ์ ์ฅ๋์ด์ง ์ด๋ฆ
. dataSource (dataSource) // DB์ ์ ๊ทผํ๊ธฐ ์ํด ์ฌ์ฉํ DataSource๊ฐ์ฒด
. queryProvider (pagingQueryProvider) // PagingQueryProvider
. parameterValues (params) // SQL ๋ฌธ์ ์ฃผ์
ํด์ผํ ํ๋ผ๋ฏธํฐ
. pageSize ( 10 ) // ๊ฐ ํ์ด์ง ํฌ
. rowMapper ( new BeanPropertyRowMapper <>( Customer . class )) // ์ฟผ๋ฆฌ ๊ฒฐ๊ณผ๋ฅผ ์ธ์คํด์ค๋ก ๋งคํํ๊ธฐ ์ํ ๋งคํผ
. build ();
}
@ Bean
public ItemWriter customerJdbcPagingItemWriter () {
return (items) -> items . forEach ( System . out :: println);
}
@ Bean
public SqlPagingQueryProviderFactoryBean pagingQueryProvider (){
SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean() ;
queryProvider . setDataSource (dataSource); // ์ ๊ณต๋ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ํ์
์ ๊ฒฐ์ (setDatabaseType ์ผ๋ก ๋ฐ์ดํฐ๋ฒ ์ด์ค ํ์
์ค์ ๋ ๊ฐ๋ฅ)
queryProvider . setSelectClause ( "*" );
queryProvider . setFromClause ( "from customer" );
queryProvider . setWhereClause ( "where city = :city" );
Map < String , Order > sortKeys = new HashMap <>();
sortKeys . put ( "lastName" , Order . ASCENDING );
queryProvider . setSortKeys (sortKeys);
return queryProvider;
}
}
PagingItemReader
๋ PagingQueryProvider
๋ฅผ ํตํด ์ฟผ๋ฆฌ๋ฅผ ์์ฑํ๋ค. ์ด๋ ๊ฒ ์์ฑํ๋ ์ด์ ๋ ๊ฐ DB์๋ Paging์ ์ง์ํ๋ ์์ฒด์ ์ธ ์ ๋ต์ด ์์ผ๋ฉฐ, Spring์ ๊ฐ DB์ Paging ์ ๋ต์ ๋ง์ถฐ ๊ตฌํ๋์ด์ผ๋ง ํ๋ค.
Copy public SqlPagingQueryProviderFactoryBean() {
this . providers . put ( DatabaseType . DB2 , new Db2PagingQueryProvider() );
this . providers . put ( DatabaseType . DB2VSE , new Db2PagingQueryProvider() );
this . providers . put ( DatabaseType . DB2ZOS , new Db2PagingQueryProvider() );
this . providers . put ( DatabaseType . DB2AS400 , new Db2PagingQueryProvider() );
this . providers . put ( DatabaseType . DERBY , new DerbyPagingQueryProvider() );
this . providers . put ( DatabaseType . HSQL , new HsqlPagingQueryProvider() );
this . providers . put ( DatabaseType . H2 , new H2PagingQueryProvider() );
this . providers . put ( DatabaseType . MYSQL , new MySqlPagingQueryProvider() );
this . providers . put ( DatabaseType . ORACLE , new OraclePagingQueryProvider() );
this . providers . put ( DatabaseType . POSTGRES , new PostgresPagingQueryProvider() );
this . providers . put ( DatabaseType . SQLITE , new SqlitePagingQueryProvider() );
this . providers . put ( DatabaseType . SQLSERVER , new SqlServerPagingQueryProvider() );
this . providers . put ( DatabaseType . SYBASE , new SybasePagingQueryProvider() );
}
Spring Batch์์๋ SqlPagingQueryProviderFactoryBean
์ ํตํด DataSource ์ค์ ๊ฐ์ ๋ณด๊ณ , ์ Provider์ค ํ๋๋ฅผ ์๋ ์ ํํ๋๋ก ํ๋ค.
SpringBatch์์ offset๊ณผ limit์ PageSize์ ๋ง๊ฒ ์๋์ผ๋ก ์์ฑํด์ค๋ค. ๋ค๋ง ๊ฐ ์ฟผ๋ฆฌ๋ ๊ฐ๋ณ์ ์ผ๋ก ์คํ๋๋ฏ๋ก, ๋์ผํ ๋ ์ฝ๋ ์ ๋ ฌ ์์๋ฅผ ๋ณด์ฅํ๋ ค๋ฉด ํ์ด์ง์ ๊ฒฐ๊ณผ๋ฅผ ์ ๋ ฌ(order by)ํ๋ ๊ฒ์ด ์ค์ํ๋ค.
๋ํ, ์ด ์ ๋ ฌํค๊ฐ ResultSet
๋ด์์ ์ค๋ณต๋์ง ์์์ผํ๋ค.
Copy SELECT * FROM customer WHERE city = :city ORDER BY lastName ASC LIMIT 10
์คํ๋ ์ฟผ๋ฆฌ ๋ก๊ทธ๋ฅผ ๋ณด๋ฉด Paging Size์ธ LIMIT 10์ด ๋ค์ด๊ฐ ๊ฒ์ ๋ณผ ์ ์๋ค.
Hibernate
์๋ฐ ORM ๊ธฐ์ ๋ก, ์ ํ๋ฆฌ์ผ์ด์
์์ ์ฌ์ฉํ๋ ๊ฐ์ฒด ์งํฅ ๋ชจ๋ธ์ ๊ด๊ณํ ๋ฐ์ดํฐ๋ฒ ์ด์ค๋ก ๋งคํํ๋ ๊ธฐ๋ฅ์ ์ ๊ณต
XML or Annotation์ ์ฌ์ฉํด ๊ฐ์ฒด๋ฅผ ๋ฐ์ดํฐ๋ฒ ์ด์ค ํ
์ด๋ธ๋ก ๋งคํ
๊ฐ์ฒด๋ฅผ ์ฌ์ฉํด ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ง์ํ๋ ํ๋ ์์ํฌ๋ฅผ ์ ๊ณต
Hibernate ์ธ์
๊ตฌํ์ฒด์ ๋ฐ๋ผ์ ๋ค๋ฅด๊ฒ ์๋ํ๋ค.
๋ณ๋ ์ค์ ์์ด Hibernate๋ฅผ ์ฌ์ฉํ๋ฉด ์ผ๋ฐ์ ์ธ stateful ์ธ์
๊ตฌํ์ฒด๋ฅผ ์ฌ์ฉ
์๋ฅผ ๋ค์ด ๋ฐฑ๋ง๊ฑด์ ์์ดํ
์ ์ฝ๊ณ ์ฒ๋ฆฌํ๋ค๋ฉด Hibernate ์ธ์
์ด ๋ฐ์ดํฐ๋ฒ ์ด์ค์์ ์กฐํํ ๋ ์์ดํ
์ ์บ์์ ์์ผ๋ฉฐ OutOfMemoryException ์ด ๋ฐ์
Persistence๋ก ์ฌ์ฉํ๋ฉด, ์ง์ JDBC๋ฅผ ์ฌ์ฉํ ๋๋ณด๋ค ๋ ํฐ ๋ถํ๋ฅผ ์ ๋ฐ
๋ ์ฝ๋ ๋ฐฑ๋ง ๊ฑด์ ์ฒ๋ฆฌํ ๋๋ ํ๊ฑด๋น ms ๋จ์์ ์ฐจ์ด๋ ๊ฑฐ๋ํ ์ฐจ์ด๊ฐ ๋๋ค.
์คํ๋ง ๋ฐฐ์น๋ ์ด๋ฌํ ๋ฌธ์ ๋ฅผ ํด๊ฒฐํ๋๋ก HibernateCursorItemReader
, HibernatePagingItemReader
๋ฅผ ๊ฐ๋ฐ
์ปค๋ฐ์ ์ธ์
์ flushํ๋ฉฐ ๋ฐฐ์น ์ฒ๋ฆฌ์ ๊ด๊ณ๊ฐ ์๋ ์ถ๊ฐ ๊ธฐ๋ฅ์ ์ ๊ณตํด์ค๋ค.
build.gradle ์์กด์ฑ ์ถ๊ฐ
Copy compileOnly 'org.springframework.boot:spring-boot-starter-data-jpa'
Domain ๊ฐ์ฒด ์์
Copy @ Entity
@ Table (name = "customer" )
public class Customer {
@ Id
private Long id; // pk
private String firstName;
private String middleInitial;
private String lastName;
private String address;
private String city;
private String state;
private String zipCode;
}
๋งคํํ ๊ฐ์ฒด๊ฐ Entity์์ ๋ํ๋
Entity๊ฐ ๋งคํ๋๋ ํ
์ด๋ธ ์ง์
TransactionManager ์ปค์คํ
ํ์ด๋ฒ๋ค์ดํธ ์ธ์
๊ณผ Datasource๋ฅผ ํฉ์น TransactionManager
๊ฐ ํ์ํ๋ค.
Copy @ Component
public class HibernateBatchConfigurer extends DefaultBatchConfigurer {
private DataSource dataSource;
private SessionFactory sessionFactory;
private PlatformTransactionManager transactionManager;
/**
* Datasource connection๊ณผ ํ์ด๋ฒ๋ค์ดํธ ์ธ์
์ค์
* @param dataSource
* @param entityManagerFactory
*/
public HibernateBatchConfigurer ( DataSource dataSource ,
EntityManagerFactory entityManagerFactory) {
super(dataSource);
this . dataSource = dataSource;
this . sessionFactory = entityManagerFactory . unwrap ( SessionFactory . class );
// ํ์ด๋ฒ๋ค์ดํธ ํธ๋์ญ์
์ค์
this . transactionManager = new HibernateTransactionManager( this . sessionFactory ) ;
}
@ Override
public PlatformTransactionManager getTransactionManager () {
return this . transactionManager ;
}
}
BatchConfigurer
์ ์ปค์คํ
๊ตฌํ์ฒด๋ฅผ ์ฌ์ฉํด HibernateTransactionManager
๋ฅผ ๊ตฌ์ฑํ๋ค.
HibernateCursorItemReader
Copy /**
* --job.name=hibernateCursorItemReaderJob city=Chicago
*/
@ RequiredArgsConstructor
@ Configuration
public class HibernateCursorCustomerJob {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final EntityManagerFactory entityManagerFactory;
@ Bean
public Job hibernateCursorItemReaderJob (){
return jobBuilderFactory . get ( "hibernateCursorItemReaderJob" )
. start ( hibernateCursorItemReaderStep() )
. build ();
}
@ Bean
public Step hibernateCursorItemReaderStep (){
return stepBuilderFactory . get ( "hibernateCursorItemReaderStep" )
. < Customer , Customer > chunk( 10 )
. reader ( customerHibernateCursorItemReader( null ) )
. writer ( customerHibernateCursorItemWriter() )
. build ();
}
@ Bean
@ StepScope
public HibernateCursorItemReader < Customer > customerHibernateCursorItemReader (@ Value ( "#{jobParameters['city']}" ) String city) {
return new HibernateCursorItemReaderBuilder < Customer >()
. name ( "customerHibernateCursorItemReader" ) // Reader์ ์ด๋ฆ, ExecutionContext์ ์ ์ฅ๋์ด์ง ์ด๋ฆ
. sessionFactory ( entityManagerFactory . unwrap ( SessionFactory . class ))
. queryString ( "from Customer where city = :city" ) // HQL ์ฟผ๋ฆฌ
. parameterValues ( Collections . singletonMap ( "city" , city)) // SQL ๋ฌธ์ ์ฃผ์
ํด์ผํ ํ๋ผ๋ฏธํฐ
. build ();
}
@ Bean
public ItemWriter customerHibernateCursorItemWriter () {
return (items) -> items . forEach ( System . out :: println);
}
}
ํ์ด๋ฒ๋ค์ดํธ ์ฟผ๋ฆฌ๋ฅผ ์ํํ๋ ๋ฐฉ๋ฒ์ 4๊ฐ์ง๊ฐ ์กด์ฌํ๋ค.
ํ์ด๋ฒ๋ค์ดํธ ๊ตฌ์ฑ์ ํฌํจ๋ ๋ค์๋ ํ์ด๋ฒ๋ค์ดํธ ์ฟผ๋ฆฌ ์ฐธ์กฐ
https://www.baeldung.com/hibernate-named-query
์คํ๋ง ๊ตฌ์ฑ์ ์ถ๊ฐํ๋ HQL ์ฟผ๋ฆฌ
.queryString("from Customer where city = :city")
ํ์ด๋ฒ๋ค์ดํธ ์ฟผ๋ฆฌ(HQL)๋ฅผ ํ๋ก๊ทธ๋๋ฐ์ผ๋ก ๋น๋
๋ค์ดํฐ๋ธ SQL ์ฟผ๋ฆฌ๋ฅผ ์คํํ ๋ค ๊ฒฐ๊ณผ๋ฅผ ํ์ด๋ฒ๋ค์ดํธ๋ก ๋งคํํ๋๋ฐ ์ฌ์ฉ
https://data-make.tistory.com/616
HibernatePagingItemReader
Copy /**
* --job.name=hibernatePagingItemReaderJob city=Chicago
*/
@ RequiredArgsConstructor
@ Configuration
public class HibernatePagingCustomerJob {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final EntityManagerFactory entityManagerFactory;
@ Bean
public Job hibernatePagingItemReaderJob () {
return jobBuilderFactory . get ( "hibernatePagingItemReaderJob" )
. start ( hibernatePagingItemReaderStep() )
. build ();
}
@ Bean
public Step hibernatePagingItemReaderStep () {
return stepBuilderFactory . get ( "hibernatePagingItemReaderStep" )
. < Customer , Customer > chunk( 10 )
. reader ( customerHibernatePagingItemReader( null ) )
. writer ( customerHibernatePagingItemWriter() )
. build ();
}
@ Bean
@ StepScope
public HibernatePagingItemReader < Customer > customerHibernatePagingItemReader (@ Value ( "#{jobParameters['city']}" ) String city) {
return new HibernatePagingItemReaderBuilder < Customer >()
. name ( "customerHibernatePagingItemReader" ) // Reader์ ์ด๋ฆ, ExecutionContext์ ์ ์ฅ๋์ด์ง ์ด๋ฆ
. sessionFactory ( entityManagerFactory . unwrap ( SessionFactory . class ))
. queryString ( "from Customer where city = :city" ) // HQL ์ฟผ๋ฆฌ
. parameterValues ( Collections . singletonMap ( "city" , city)) // SQL ๋ฌธ์ ์ฃผ์
ํด์ผํ ํ๋ผ๋ฏธํฐ
. pageSize ( 10 ) // Cursor์ ์ ์ผํ ์ฐจ์ด์ ! pageSize ์ค์
. build ();
}
@ Bean
public ItemWriter customerHibernatePagingItemWriter () {
return (items) -> items . forEach ( System . out :: println);
}
}
Cursor ๋ฐฉ๋ฒ๊ณผ ์ ์ผํ๊ฒ ๋ค๋ฅธ ์ ์ .pageSize()
๋ก ์ฌ์ฉํ ํ์ด์ง ํฌ๊ธฐ๋ฅผ ์ง์ ํด์ผํ๋ ๊ฒ์ด๋ค.
JPA
JPA(Java Persisstence API)๋ ORM ์์ญ์์ ํ์คํ๋ ์ ๊ทผ๋ฒ์ ์ ๊ณตํ๋ค. Hibernate๊ฐ ์ด๊ธฐ JPA์ ์๊ฐ์ ์คฌ์ผ๋ฉฐ, ํ์ฌ๋ Hibernate๊ฐ JPA ๋ช
์ธ๋ฅผ ๊ตฌํํ๊ณ ์๋ค.
build.gradle ์์กด์ฑ ์ถ๊ฐ
Copy compileOnly 'org.springframework.boot:spring-boot-starter-data-jpa'
spring-boot-starter-data-jpa
๋ JPA๋ฅผ ์ฌ์ฉํ๋๋ฐ ํ์ํ ๋ชจ๋ ํ์ ์ปดํฌ๋ํธ๊ฐ ํฌํจ๋ผ์๋ค.
JpaCursorItemReader
Spring Batch 4.3์ด ๋ฆด๋ฆฌ์ฆ ๋๋ฉด์ JpaCursorItemReader ๊ฐ ๋์
๋์๋ค. ์ด์ ๋ฒ์ ๊น์ง๋ ์ ๊ณตํ์ง ์์๋ค.
Copy /**
* --job.name=jpaCursorItemReaderJob city=Chicago
*/
@ RequiredArgsConstructor
@ Configuration
public class JpaCursorCustomerJob {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final EntityManagerFactory entityManagerFactory;
@ Bean
public Job jpaCursorItemReaderJob (){
return jobBuilderFactory . get ( "jpaCursorItemReaderJob" )
. start ( jpaCursorItemReaderStep() )
. build ();
}
@ Bean
public Step jpaCursorItemReaderStep (){
return stepBuilderFactory . get ( "jpaCursorItemReaderStep" )
. < Customer , Customer > chunk( 10 )
. reader ( customerJpaCursorItemReader( null ) )
. writer ( customerJpaCursorItemWriter() )
. build ();
}
@ Bean
@ StepScope
public JpaCursorItemReader < Customer > customerJpaCursorItemReader (@ Value ( "#{jobParameters['city']}" ) String city) {
return new JpaCursorItemReaderBuilder < Customer >()
. name ( "customerJpaCursorItemReader" )
. entityManagerFactory (entityManagerFactory)
. queryString ( "select c from Customer c where c.city = :city" )
. parameterValues ( Collections . singletonMap ( "city" , city))
. build ();
}
@ Bean
public ItemWriter customerJpaCursorItemWriter () {
return (items) -> items . forEach ( System . out :: println);
}
}
JpaPagingItemReader
Copy /**
* --job.name=jpaPagingItemReaderJob city=Chicago
*/
@ RequiredArgsConstructor
@ Configuration
public class JpaPagingCustomerJob {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final EntityManagerFactory entityManagerFactory;
@ Bean
public Job jpaPagingItemReaderJob (){
return jobBuilderFactory . get ( "jpaPagingItemReaderJob" )
. start ( jpaPagingItemReaderStep() )
. build ();
}
@ Bean
public Step jpaPagingItemReaderStep (){
return stepBuilderFactory . get ( "jpaPagingItemReaderStep" )
. < Customer , Customer > chunk( 10 )
. reader ( customerJpaPagingItemReader( null ) )
. writer ( customerJpaPagingItemWriter() )
. build ();
}
@ Bean
@ StepScope
public JpaPagingItemReader < Customer > customerJpaPagingItemReader (@ Value ( "#{jobParameters['city']}" ) String city) {
return new JpaPagingItemReaderBuilder < Customer >()
. name ( "customerJpaPagingItemReader" )
. entityManagerFactory (entityManagerFactory)
. queryString ( "select c from Customer c where c.city = :city" )
. parameterValues ( Collections . singletonMap ( "city" , city))
. pageSize ( 10 )
. build ();
}
@ Bean
public ItemWriter customerJpaPagingItemWriter () {
return (items) -> items . forEach ( System . out :: println);
}
}
.entityManagerFactory
๋ฅผ ์ค์ ํ๋ ๊ฒ ์ด์ธ์ Jdbc์ ํฌ๊ฒ ๋ค๋ฅธ ์ ์ ์์ผ๋ฉฐ, Cursor์ ๋ค๋ฅธ์ ์ pageSize()
๋ฅผ ์ค์ ํ๋ ๊ฒ์ด๋ค.
JPA์์๋ .queryProvider()
๋ก Query ๊ฐ์ฒด๋ฅผ ์ฌ์ฉํด ์ฟผ๋ฆฌ๋ฅผ ์ํํ ์๋ ์๋ค.
MyBatisPagingItemReader
Copy <!--mybatis-config.xml-->
<? xml version = "1.0" encoding = "UTF-8" ?>
<! DOCTYPE configuration
PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-config.dtd">
< configuration >
< settings >
< setting name = "defaultStatementTimeout" value = "25" />
</ settings >
</ configuration >
Copy package spring . batch . practice . config ;
import lombok . extern . slf4j . Slf4j ;
import org . apache . ibatis . session . SqlSessionFactory ;
import org . mybatis . spring . SqlSessionFactoryBean ;
import org . mybatis . spring . SqlSessionTemplate ;
import org . mybatis . spring . annotation . MapperScan ;
import org . springframework . beans . factory . annotation . Qualifier ;
import org . springframework . beans . factory . annotation . Value ;
import org . springframework . boot . context . properties . ConfigurationProperties ;
import org . springframework . boot . context . properties . ConfigurationPropertiesScan ;
import org . springframework . boot . jdbc . DataSourceBuilder ;
import org . springframework . context . ApplicationContext ;
import org . springframework . context . annotation . Bean ;
import org . springframework . context . annotation . Configuration ;
import org . springframework . core . io . Resource ;
import org . springframework . core . io . support . PathMatchingResourcePatternResolver ;
import javax . sql . DataSource ;
@ Slf4j
@ Configuration
public class MysqlMybatisConfig {
@ Value ( "${mybatis.mapper-locations}" )
private String mapperLocations;
@ Bean (name = "mybatisDataSource" )
@ ConfigurationProperties (prefix = "spring.datasource.hikari" )
public DataSource dataSource (){
return DataSourceBuilder . create () . build ();
}
@ Bean (name = "mybatisSqlSessionFactory" )
public SqlSessionFactory sqlSessionFactory (@ Qualifier ( "mybatisDataSource" ) DataSource dataSource , ApplicationContext applicationContext) throws Exception {
SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean() ;
sqlSessionFactoryBean . setDataSource (dataSource);
sqlSessionFactoryBean . setConfigLocation ( applicationContext . getResource ( "classpath:mybatis/mybatis-config.xml" ));
Resource [] resources = new PathMatchingResourcePatternResolver() . getResources (mapperLocations);
System . out . println (resources[ 0 ] . getURL ());
sqlSessionFactoryBean . setMapperLocations ( new PathMatchingResourcePatternResolver() . getResources (mapperLocations));
return sqlSessionFactoryBean . getObject ();
}
@ Bean
public SqlSessionTemplate sqlSessionTemplate (@ Qualifier ( "mybatisSqlSessionFactory" ) SqlSessionFactory sqlSessionFactory) {
return new SqlSessionTemplate(sqlSessionFactory) ;
}
}
Copy <? xml version = "1.0" encoding = "UTF-8" ?>
<! DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
< mapper namespace = "spring.batch.practice.dao.PayMapper" >
< select id = "selectPayList" parameterType = "hashmap" resultType = "spring.batch.practice.domain.Pay" >
<![CDATA[
SELECT ID, AMOUNT, TX_NAME, TX_DATE_TIME
FROM PAY
WHERE AMOUNT <= #{amount}
]]>
</ select >
</ mapper >
Copy @ Slf4j
@ RequiredArgsConstructor
@ Configuration
public class MybatisPagingItemReaderJobConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@ Autowired
@ Qualifier ( "mybatisSqlSessionFactory" )
private SqlSessionFactory sqlSessionFactory;
private static final int CHUNK_SIZE = 10 ;
@ Bean
public Job mybatisPagingItemReaderJob () throws Exception {
return jobBuilderFactory . get ( "mybatisPagingItemReaderJob" )
. start ( mybatisPagingItemReaderStep() )
. build ();
}
@ Bean
public Step mybatisPagingItemReaderStep () throws Exception {
return stepBuilderFactory . get ( "mybatisPagingItemReaderStep" )
. < Pay , Pay > chunk(CHUNK_SIZE)
. reader ( mybatisPagingItemReader() )
. writer ( mybatisPagingItemWriter() )
. build ();
}
@ Bean
public MyBatisPagingItemReader < Pay > mybatisPagingItemReader () throws Exception {
Map < String , Object > params = new HashMap <>();
params . put ( "amount" , 2000 );
return new MyBatisPagingItemReaderBuilder < Pay >()
. pageSize (CHUNK_SIZE)
. sqlSessionFactory (sqlSessionFactory)
. queryId ( "spring.batch.practice.dao.PayMapper.selectPayList" )
. parameterValues (params)
. build ();
}
@ Bean
public ItemWriter < Pay > mybatisPagingItemWriter () {
return list -> {
for ( Pay pay : list) {
log . info ( "Current Pay={}" , pay);
}
};
}
}
Spring Data Repository
Spring Data๋ ์คํ๋ง ๋ฐ์ดํฐ๊ฐ ์ ๊ณตํ๋ ํน์ ์ธํฐํ์ด์ค ์ค ํ๋๋ฅผ ์์ํ๋ ์ธํฐํ์ด์ค๋ฅผ ์ฌ์ฉ์๊ฐ ์ ์ํ๊ธฐ๋ง ํ๋ฉด ์คํ๋ง ๋ฐ์ดํฐ๊ฐ ํด๋น ์ธํฐํ์ด์ค์ ๊ตฌํ์ ์ฒ๋ฆฌํ๋ ๊ธฐ๋ฅ์ ์ ๊ณตํ๋ค.
์คํ๋ง ๋ฐฐ์น๋ ์คํ๋ง ๋ฐ์ดํฐ์ PagingAndSotringRepository
๋ฅผ ํ์ฉํ๊ธฐ ๋๋ฌธ์, ์คํ๋ง ๋ฐ์ดํฐ์ ํธํ์ฑ์ด ์ข๋ค.
RepositoryItemReader
RepositoryItemReader
๋ JdbcPagingItemReader
๋ HibernatePagingItemReader
๋ฅผ ์ฌ์ฉํ ๋์ ๋์ผํ๊ฒ PagingAndSotringRepository
๋ฅผ ์ฌ์ฉํด์ Paging ์ฟผ๋ฆฌ๋ฅผ ์คํํ๋ค.
RepositoryItemReader
๋ ์ด๋ค ์ ์ฅ์๊ฑด ์๊ด์์ด ํด๋น ๋ฐ์ดํฐ ์ ์ฅ์์ ์ง์๋ฅผ ์ํํ ์ ์๋ค๋ ์ ์์ ItemReader
์ ์ฐจ์ด๊ฐ ์๋ค.
Copy public interface CustomerRepository extends JpaRepository < Customer , Long > {
Page < Customer > findByCity ( String city , Pageable pageRequest);
}
PagingAndSotringRepository
๋ฅผ ์์ํ๋ Repository๋ฅผ ์์ฑํด city ์กฐ๊ฑด์ผ๋ก ์กฐํํ๋ ๋ฉ์๋๋ฅผ ์ ์ํ์๋ค.
Copy @ Bean
@ StepScope
public RepositoryItemReader< Customer > customerRepositoryItemReader(@ Value ( "#{jobParameters['city']}" ) String city) {
return new RepositoryItemReaderBuilder < Customer >()
. name ( "customerRepositoryItemReader" )
. arguments ( Collections . singletonList (city)) // pageable ํ๋ผ๋ฏธํฐ๋ฅผ ์ ์ธํ arguments
. methodName ( "findByCity" ) // ํธ์ถํ ๋ฉ์๋๋ช
. repository (customerRepository) // Repository ๊ตฌํ์ฒด
. sorts ( Collections . singletonMap ( "lastName" , Sort . Direction . ASC ))
. build ();
}
์ฃผ์ ์ฌํญ
JpaRepository
๋ฅผ ListItemReader
, QueueItemReader
์ ์ฌ์ฉํ๋ฉด ์๋๋ค.
์ด๋ ๊ฒ ๊ตฌํํ๋ ๊ฒฝ์ฐ Spring Batch์ ์ฅ์ ์ธ Paging & Cursor ๊ตฌํ์ด ์์ด ๋๊ท๋ชจ ๋ฐ์ดํฐ ์ฒ๋ฆฌ๊ฐ ๋ถ๊ฐ๋ฅํ๋ค.
JpaRepository
๋ฅผ ์ฌ์ฉํด์ผํ๋ ๊ฒฝ์ฐ RepositoryItemReader
๋ฅผ ์ฌ์ฉํ๋ ๊ฒ์ ๊ถ์ฅํ๋ค.
Hibernate, JPA๋ฑ ์์์ฑ ์ปจํ
์คํธ๊ฐ ํ์ํ Reader ์ฌ์ฉ์ fetch size์ chunk size๋ ๋์ผํ ๊ฐ์ ์ ์งํด์ผ ํ๋ค.
ItemReaderAdapter
Adapter๋ ๋ค๋ฅธ ์๋ฆฌ๋ฉํธ์ ๋ํํ์ฌ ์คํ๋ง ๋ฐฐ์น๊ฐ ํด๋น ์๋ฆฌ๋จผํธ์ ํต์ ํ ์ ์๊ฒ ํ๋๋ฐ ์ฌ์ฉํ๋ค.
org.springframework.batch.item.adapter.ItemReaderAdapter
Copy public class ItemReaderAdapter < T > extends AbstractMethodInvokingDelegator < T > implements ItemReader < T > {
/**
* @return return value of the target method.
*/
@ Nullable
@ Override
public T read () throws Exception {
return invokeDelegateMethod() ;
}
}
ํธ์ถ ๋์ ์๋น์ค์ ์ฐธ์กฐ์ ํธ์ถํ ๋ฉ์๋์ ์ด๋ฆ์ ์์กด์ฑ์ผ๋ก ๋ฐ๋๋ค.
ItemReaderAdapter
๊ฐ ๋งค๋ฒ ํธ์ถํ ๋๋ง๋ค ๋ฐํ๋๋ ๊ฐ์ฒด๋ ItemReader
๊ฐ ๋ฐํํ๋ ๊ฐ์ฒด์ด๋ค.
์
๋ ฅ ๋ฐ์ดํฐ๋ฅผ ๋ชจ๋ ์ฒ๋ฆฌํ๋ฉด ์๋น์ค ๋ฉ์๋๋ ๋ฐ๋์ null
์ ๋ฐํํด์ผํ๋ค. ์คํ๋ง ๋ฐฐ์น์๊ฒ ํด๋น step์ ์
๋ ฅ์ ๋ชจ๋ ์๋ฃํ์์ ์๋ฆฌ๋ ๊ฒ์ด๋ค.
Copy @ Component
public class CustomerService {
private List < Customer > customers;
private int curIndex;
private String [] firstNames = { "Michael" , "Warren" , "Ann" , "Terrence" ,
"Erica" , "Laura" , "Steve" , "Larry" };
private String middleInitial = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" ;
private String [] lastNames = { "Gates" , "Darrow" , "Donnelly" , "Jobs" ,
"Buffett" , "Ellison" , "Obama" };
private String [] streets = { "4th Street" , "Wall Street" , "Fifth Avenue" ,
"Mt. Lee Drive" , "Jeopardy Lane" ,
"Infinite Loop Drive" , "Farnam Street" ,
"Isabella Ave" , "S. Greenwood Ave" };
private String [] cities = { "Chicago" , "New York" , "Hollywood" , "Aurora" ,
"Omaha" , "Atherton" };
private String [] states = { "IL" , "NY" , "CA" , "NE" };
private Random generator = new Random() ;
public CustomerService () {
curIndex = 0 ;
customers = new ArrayList <>();
for ( int i = 0 ; i < 100 ; i ++ ) {
customers . add ( buildCustomer() );
}
}
private Customer buildCustomer () {
Customer customer = new Customer() ;
customer . setId (( long ) generator . nextInt ( Integer . MAX_VALUE ));
customer . setFirstName (
firstNames[ generator . nextInt ( firstNames . length - 1 )]);
customer . setMiddleInitial (
String . valueOf ( middleInitial . charAt (
generator . nextInt ( middleInitial . length () - 1 ))));
customer . setLastName (
lastNames[ generator . nextInt ( lastNames . length - 1 )]);
customer . setAddress ( generator . nextInt ( 9999 ) + " " +
streets[ generator . nextInt ( streets . length - 1 )]);
customer . setCity (cities[ generator . nextInt ( cities . length - 1 )]);
customer . setState (states[ generator . nextInt ( states . length - 1 )]);
customer . setZipCode ( String . valueOf ( generator . nextInt ( 99999 )));
return customer;
}
public Customer getCustomer () {
Customer cust = null ;
if (curIndex < customers . size ()) {
cust = customers . get (curIndex);
curIndex ++ ;
}
return cust;
}
}
Customer ๊ฐ์ฒด์ ๋ชฉ๋ก์ ๋ฌด์์๋ก ์์ฑํ๋ ์๋น์ค์ด๋ค.
Copy @ Bean
public ItemReaderAdapter< Customer > customerServiceItemReader() {
ItemReaderAdapter < Customer > adapter = new ItemReaderAdapter <>();
adapter . setTargetObject (customerService);
adapter . setTargetMethod ( "getCustomer" );
return adapter;
}
ItemReaderAdapter
์ ๊ธฐ์กด ์๋น์ค ์ค๋ธ์ ํธ์ ๋ฉ์๋๋ช
์ ์ ๋ฌํ๋ฉด๋๋ค.
์ค๋ฅ ์ฒ๋ฆฌ
์
๋ ฅ์์ ๋ ์ฝ๋๋ฅผ ์ฝ๋ ์ค์ ์ค๋ฅ๊ฐ ๋ฐ์ํ ๊ฒฝ์ฐ ์ฒ๋ฆฌํ ์ ์๋ ๋ฐฉ๋ฒ์ ์ฌ๋ฌ๊ฐ์ง์ด๋ค.
์์ธ๋ฅผ ๋์ณ ์ฒ๋ฆฌ๋ฅผ ๋ฉ์ถ๊ธฐ
ํน์ ์์ธ๊ฐ ๋ฐ์ํ ๊ฒฝ์ฐ ๋ ์ฝ๋ ๊ฑด๋๋๊ธฐ(skip)
Skip
์ด๋ค ์กฐ๊ฑด์์ ๋ ์ฝ๋๋ฅผ skipํ ์ง(์ด๋ค ์์ธ๋ฅผ ๋ฌด์ํ ๊ฒ์ธ์ง)
์ผ๋ง๋ ๋ง์ ๋ ์ฝ๋๋ฅผ skip ํ ์ ์๊ฒ ํ ๊ฒ์ธ์ง
๋ ์ฝ๋๋ฅผ skipํ ์ง ์ฌ๋ถ๋ฅผ ๊ฒฐ์ ํ ๋ ์ ๋๊ฐ์ง ์์๋ฅผ ๊ณ ๋ คํด์ผํ๋ค.
Skip์ ์ฌ์ฉํ๊ธฐ ์ํด์๋ ์ฐ์ faultToLerant()
๋ผ๋ ๋ฉ์๋๋ฅผ ํธ์ถํด์ผํ๋ค.
skip ํ์ฉ ํ์. ํ์ฉ ํ์๋ฅผ ๋์ด๊ฐ๋ฉด job์ ์คํจํ๋ค. skip()๊ณผ ๋ฐ๋์ ๊ฐ์ด ์จ์ผ ํ๋ค
ํด๋น exception์ด ๋ฐ์ํ์๋ skip
ํด๋น exception์ด ๋ฐ์ํ๋ฉด skip์ ํ์ง ์๊ณ ์ค๋ฅ๋ฅผ ๋ด๊ฒ ๋ค๋ ๊ฒ
์ฉ์ ์ ์๋ก skip์ ๋ํ policy๋ฅผ ๋ง๋ค์ด์ ์ ์ฉํ๊ณ ์ถ์๋ ์ฌ์ฉ
Copy @ Bean
public Step skipRecordCopyFileStep() {
return this . stepBuilderFactory . get ( "skipRecordCopyFileStep" )
. < Customer , Customer > chunk( 10 )
. reader ( null )
. writer ( null )
. faultTolerant ()
. skip ( Exception . class )
. noSkip ( ParseException . class )
. skipLimit ( 10 )
. build ();
}
SkipPolicy
Copy public interface SkipPolicy {
boolean shouldSkip ( Throwable t , int skipCount) throws SkipLimitExceededException ;
}
SkipPolicy
๊ตฌํ์ฒด๋ skipํ ์์ธ์ ํ์ฉ ํ์๋ฅผ ํ๋ณํ ์ ์์ผ๋ฉฐ, boolean
๊ฐ์ผ๋ก ๋ด๋ถ ๋ก์ง์ ์์ ํ ์ ์๋ค.
Copy public class FileVerificationSkipper implements SkipPolicy {
@ Override
public boolean shouldSkip ( Throwable t , int skipCount) throws SkipLimitExceededException {
if (t instanceof FileNotFoundException) {
return false ;
} else if (t instanceof ParseException && skipCount < 10 ) {
return true ;
} else {
return false ;
}
}
}
์ค๋ฅ ๋ก๊ทธ ๋จ๊ธฐ๊ธฐ
ItemListener
๋ฅผ ์ฌ์ฉํด ์๋ชป๋ ๋ ์ฝ๋๋ฅผ ๊ธฐ๋กํ๋ ๋ฐฉ๋ฒ์ ๋ค๋ฃฐ ๊ฒ์ด๋ค.
Copy public interface ItemReadListener < T > extends StepListener {
void beforeRead ();
void afterRead ( T item);
void onReadError ( Exception ex);
}
์๋ชป๋ ๋ ์ฝ๋๋ฅผ ์ฝ์์ ๋ ๋ก๊ทธ๋ฅผ ๋จ๊ธฐ๊ธฐ ์ํด์ onReadError
๋ฉ์๋๋ฅผ ์ค๋ฐ๋ผ์ด๋ํด ItemListenerSupport
๋ฅผ ์ฌ์ฉํ์ฌ ์๋ฌ๋ฅผ ๊ธฐ๋กํ๋ค.
๋จ์ํ ํ์ผ์ ์ฝ์ด์ฌ ๋๋ ์์ ๊ฐ์ด ์ฒ๋ฆฌํ๋ฉด ๋์ง๋ง, DB๋ฅผ ์ด์ฉํ ์ค๋ฅ๊ฐ ๋ฐ์ํ ๊ฒฝ์ฐ์๋ ์ค์ DB์
์ถ๋ ฅ์ ์คํ๋ง ์์ฒด๋ ํ์ด๋ฒ๋ค์ดํธ์ ๊ฐ์ ๋ค๋ฅธ ํ๋ ์์ํฌ๊ฐ ์ฒ๋ฆฌํ๋ฏ๋ก, ์คํ๋ง ๋ฐฐ์น์์ ์ฒ๋ฆฌํ ์์ธ๊ฐ ๋ง์ง ์๋ค.
onReadError ํ์ผ ์ค๋ฅ ๋ก๊ทธ ๋จ๊ธฐ๊ธฐ
Copy @ Slf4j
public class CustomerItemListener {
@ OnReadError
public void onReadError ( Exception e) {
if (e instanceof FlatFileParseException) {
FlatFileParseException ffpe = (FlatFileParseException) e;
StringBuilder sb = new StringBuilder() ;
sb . append ( "์ค๋ฅ ๋ฐ์ ๋ผ์ธ : " );
sb . append ( ffpe . getLineNumber ());
sb . append ( "์
๋ ฅ๊ฐ : " );
sb . append ( ffpe . getInput ());
log . error ( sb . toString () , ffpe);
} else {
log . error ( "์ค๋ฅ ๋ฐ์" , e);
}
}
}
Copy @ Bean
public CustomerItemListener customerItemListener() {
return new CustomerItemListener() ;
}
@ Bean
public Step copyFileStep() {
return this . stepBuilderFactory . get ( "skipRecordCopyFileStep" )
. < Customer , Customer > chunk( 10 )
. reader ( null )
. writer ( null )
. faultTolerant ()
. skip ( Exception . class )
. skipLimit ( 10 )
. listener ( customerItemListener() )
. build ();
}