์ฒ๋ฆฌํด์ผํ ๋ฐ์ดํฐ๊ฐ ์ฆ๊ฐํ์ฌ ์ผ์ ๊ท๋ชจ ์ด์์ด ๋๋ ๊ฒฝ์ฐ ๋ฐฐ์น๋ Scaling์ด ํ์ํ๋ค. Spring Batch์์ ์ ๊ณตํ๋ Scaling๊ธฐ๋ฅ์ ๋ค์๊ณผ ๊ฐ๋ค.
๋จ์ผ Step ์ํ์, ํด๋น Step๋ด์ ๊ฐ Chunk๋ฅผ ๋ณ๋์ ์ฌ๋ฌ ์ฐ๋ ๋์์ ์คํํ๋ ๋ฐฉ๋ฒ
์ฌ๋ฌ๊ฐ์ Step์ ๋ณ๋ ฌ๋ก ์คํํ๋ ๋ฐฉ๋ฒ์ผ๋ก, ๋จ์ผ Step๋ด์ ์ฑ๋ฅ ํฅ์์ ์๋ค.
Step์ฒ๋ฆฌ๊ฐ ์ฌ๋ฌ ํ๋ก์ธ์ค๋ก ๋ถํ ๋์ด ์ธ๋ถ์ ๋ค๋ฅธ ์๋ฒ๋ก ์ ์กํ์ฌ ์ฒ๋ฆฌํ๋ ๋ฐฉ์
์ด๋ ์๋ฒ์์ ์ด๋ค ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ๊ณ ์๋์ง ๊ด๋ฆฌํ์ง ์๊ธฐ ๋๋ฌธ์ ๋ฉ์ธ์ง ์ ์ค์ด ์๋๋ ๊ฒ์ด 100% ๋ณด์ฅ ๋์ด์ผํ๋ค.(AWS SQS, Kafka ๋ฑ MQ์ฌ์ฉ ๊ถ์ฅ)
๋งค๋์ (๋ง์คํฐ)๋ฅผ ์ด์ฉํด ๋ฐ์ดํฐ๋ฅผ ๋ ์์ ํํฐ์
์ผ๋ก ๋๋๊ณ , ํํฐ์
์์ ์ฌ๋ ์ด๋ธ๊ฐ ๋
๋ฆฝ์ ์ผ๋ก ์๋ํ๋ ๋ฐฉ์
AsyncItemProcessor
์ AsyncItemWriter
๋ณ๊ฐ์ ์ฐ๋ ๋๋ฅผ ํตํด ItemProcessor์ ItemWriter๋ฅผ ์ฒ๋ฆฌ
spring-batch-integration
์์กด์ฑ์์ ์ง์
์ด ์ค ์ผ๋ถ๋ฅผ ๊ตฌํํด๋ณผ ๊ฒ์ด๋ค.
Multi-threaded Step
์คํ๋ง ๋ฐฐ์น์ ๋ฉํฐ ์ฐ๋ ๋ Step์ TaskExecutor
๋ฅผ ์ด์ฉํด ๊ฐ ์ฐ๋ ๋๊ฐ Chunk๋จ์๋ก ์คํ๋๊ฒ ํ๋ ๋ฐฉ์ ์ด๋ค.
SimpleAsyncTaskExecutor
: chunk ๋จ์๋ณ๋ก ์ฐ๋ ๋ ์์ฑ
ThreadPoolTaskExecutor
: ์ฐ๋ ๋ํ ๋ด์์ ์ง์ ๋ ๊ฐฏ์์ ์ฐ๋ ๋๋ง์ ์ฌ์ฌ์ฉํ๋ฉด์ ์คํ(์ด์์์ ์ฌ์ฉํ ๋๋ ์ฐ๋ ๋ํ๋ก ์ฌ์ฉํ๋ ๊ฒ์ ๊ถ์ฅ)
๋ฉํฐ์ฐ๋ ๋ ํ๊ฒฝ ๊ตฌ์ฑ์ ์ฌ์ฉํ๊ณ ์ ํ๋ Reader์ Writer๊ฐ ๋ฉํฐ์ฐ๋ ๋๋ฅผ ์ง์ํ๋์ง ํ์
์ด ํ์ํ๋ค.
์๋ฅผ ๋ค์ด JpaPagingItemReader ์ ๊ฒฝ์ฐ๋ ์๋์ ๊ฐ์ด thread-safe๋ฅผ ์ ๊ณตํด์ฃผ๊ณ ์๋๊ฒ์ ๋ณผ ์ ์์ผ๋ฉฐ,
Copy public class JpaPagingItemReader<T>
extends AbstractPagingItemReader<T>
ItemReader for reading database records built on top of JPA.
It executes the JPQL setQueryString(String) to retrieve requested data. The query is executed using paged requests of a size specified in AbstractPagingItemReader.setPageSize(int). Additional pages are requested when needed as AbstractItemCountingItemStreamItemReader.read() method is called, returning an object corresponding to current position.
The performance of the paging depends on the JPA implementation and its use of database specific features to limit the number of returned rows.
Setting a fairly large page size and using a commit interval that matches the page size should provide better performance.
In order to reduce the memory usage for large results the persistence context is flushed and cleared after each page is read. This causes any entities read to be detached. If you make changes to the entities and want the changes persisted then you must explicitly merge the entities.
The reader must be configured with an EntityManagerFactory. All entity access is performed within a new transaction, independent of any existing Spring managed transactions.
The implementation is thread-safe in between calls to AbstractItemCountingItemStreamItemReader.open(ExecutionContext), but remember to use saveState=false if used in a multi-threaded client (no restart available).
JpaCursorItemReader ๋ thread-safeํ์ง ์์ ๊ฒ์ ๋ณผ ์ ์๋ค.
Copy public class JpaCursorItemReader<T>
extends AbstractItemCountingItemStreamItemReader<T>
implements org.springframework.beans.factory.InitializingBean
ItemStreamReader implementation based on JPA Query.getResultStream(). It executes the JPQL query when initialized and iterates over the result set as AbstractItemCountingItemStreamItemReader.read() method is called, returning an object corresponding to the current row. The query can be set directly using setQueryString(String), or using a query provider via setQueryProvider(JpaQueryProvider). The implementation is not thread-safe.
์ฌ๊ธฐ์ ๋ ํ๊ฐ์ง ์์๋ฌ์ผํ ์ ์ด ์๋ค. ๋ฉํฐ์ฐ๋ ๋๋ก ๊ฐ chunk๋ค์ด ๊ฐ๋ณ๋ก ์งํ๋๋ ๊ฒฝ์ฐ ์คํจ ์ง์ ์์ ์ฌ์์ํ๋ ๊ฒ์ด ๋ถ๊ฐ๋ฅํ๋ค. ์๋ํ๋ฉด, ๋ฉํฐ์ฐ๋ ๋์ ๊ฒฝ์ฐ 1n๊ฐ์ chunk๊ฐ ๋์์ ์คํ๋๋ฉฐ, 5๋ฒ์งธ chunk๊ฐ ์คํจํ๋ค๊ณ ํด์ 1 4 chunk๊ฐ ๋ชจ๋ ์ฑ๊ณตํ๋ค๋ ๋ณด์ฅ์ด ์๋ค.
๊ทธ๋์ ๋ฉํฐ์ฐ๋ ๋ ์ ์ฉ์ ์ผ๋ฐ์ ์ผ๋ก ItemReader์ saveState
์ต์
์ false๋ก ์ค์ ํ๋ค.
saveState : ItemStream#update(ExecutionContext)
๋ฉ์๋๋ก ExectuionContext
์ reader์ ์ํ๊ฐ์ ์ ์ฅํ ์ง ๊ฒฐ์ ํ๋ค. (Defualt : true)
Thread-safe
PagingItemReader
์์์ ๋ดค๋ฏ์ด PagingItemReader๋ thread-safeํ ๊ฒ์ ์ ์ ์๋ค. ๋ฉํฐ์ฐ๋ ๋๋ก ์ํํ๋ ๋ฐฐ์น๊ฐ ์๋ค๋ฉด, DB์ ๊ทผ์ PagingItemReader๋ฅผ ์ฌ์ฉํ๋ ๊ฒ์ ๊ถ์ฅํ๋ค.
application.yml
Copy spring :
datasource :
hikari :
driver-class-name : com.mysql.cj.jdbc.Driver
jdbc-url : jdbc:mysql://localhost:3306/spring_batch
username : spring
password : Springtest2021!
maximum-pool-size : 10 # pool์ ์ ์งํ ์ต๋ connection ์
auto-commit : false # ์๋ commit ์ฌ๋ถ
main() ๋ฉ์๋ ์ค์
Copy @ SpringBootApplication (exclude = DataSourceAutoConfiguration . class )
public class SpringBatchRealApplication {
public static void main ( String [] args) {
// main thread๊ฐ ์ข
๋ฃ๋๋ฉด jvm ๊ฐ์ ์ข
๋ฃ
// main thread๊ฐ ์ข
๋ฃ๋๋ค๋ ๊ฒ์ ์์ thread๋ ๋ชจ๋ ์ข
๋ฃ๋๋ค๋ ๊ฒ์ ๋ณด์ฅ
System . exit ( SpringApplication . exit ( SpringApplication . run ( SpringBatchRealApplication . class , args)));
}
}
Job ๊ตฌํ
Copy @ Bean (JOB_NAME)
public Job job() {
return this . jobBuilderFactory . get (JOB_NAME)
. incrementer ( new RunIdIncrementer() )
. start ( step() )
. build ();
}
@ Bean (JOB_NAME + "Step" )
public Step step() {
return this . stepBuilderFactory . get (JOB_NAME + "Step" )
. < Ncustomer , Ncustomer > chunk(chunkSize)
. reader ( reader( null ) )
. writer ( writer() )
. taskExecutor ( executor() )
. throttleLimit (poolSize) // default : 4, ์์ฑ๋ ์ฐ๋ ๋ ์ค ๋ช๊ฐ๋ฅผ ์ค์ ์์
์ ์ฌ์ฉํ ์ง ๊ฒฐ์
. build ();
}
@ Bean (JOB_NAME + "TaskPool" )
public TaskExecutor executor() {
// ์ฐ๋ ๋ ํ์ ์ด์ฉํ ์ฐ๋ ๋ ๊ด๋ฆฌ ๋ฐฉ์
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor() ;
executor . setCorePoolSize (poolSize); // ํ์ ๊ธฐ๋ณธ ์ฌ์ด์ฆ
executor . setMaxPoolSize (poolSize); // ํ์ ์ต๋ ์ฌ์ด์ฆ
executor . setThreadGroupName ( "multi-thread-" );
executor . setWaitForTasksToCompleteOnShutdown ( Boolean . TRUE );
// allowCoreThreadTimeOut์ true๋ก ์ค์ ํด
// core thread ๊ฐ ์ผ์ ์๊ฐ ํ์คํฌ๋ฅผ ๋ฐ์ง ์์ ๊ฒฝ์ฐ pool ์์ ์ ๋ฆฌํ๊ณ ,
// ๋ชจ๋ ์์ ์ค๋ ๋๊ฐ ์ ๋ฆฌ๋๋ฉด jvm ๋ ์ข
๋ฃ ๋๊ฒ ์ค์ ํ๋ค.
executor . setKeepAliveSeconds ( 30 );
executor . setAllowCoreThreadTimeOut ( true );
executor . initialize ();
return executor;
}
@ Bean (JOB_NAME + "Reader" )
public JdbcPagingItemReader< Ncustomer > reader( PagingQueryProvider pagingQueryProvider) {
return new JdbcPagingItemReaderBuilder < Ncustomer >()
. name ( "customerJdbcPagingItemReader" ) // Reader์ ์ด๋ฆ, ExecutionContext์ ์ ์ฅ๋์ด์ง ์ด๋ฆ
. dataSource (dataSource) // DB์ ์ ๊ทผํ๊ธฐ ์ํด ์ฌ์ฉํ DataSource๊ฐ์ฒด
. queryProvider (pagingQueryProvider) // PagingQueryProvider
. pageSize ( 10 ) // ๊ฐ ํ์ด์ง ํฌ๊ธฐ
. rowMapper ( new BeanPropertyRowMapper <>( Ncustomer . class )) // ์ฟผ๋ฆฌ ๊ฒฐ๊ณผ๋ฅผ ์ธ์คํด์ค๋ก ๋งคํํ๊ธฐ ์ํ ๋งคํผ
. saveState ( false ) // Reader๊ฐ ์คํจํ ์ง์ ์ ์ ์ฅํ์ง ์๋๋ก ์ค์
. build ();
}
์ฌ๊ธฐ์ ํต์ฌ์ TaskExecutor
์ ๊ตฌํํ๋ ๋ถ๋ถ๊ณผ Step์คํ์ .saveState(false)
๋ก ์ค์ ํ์ฌ, Reader๊ฐ ์คํจํ ์ง์ ์ ์ ์ฅํ์ง ์๊ณ , ์คํจ์ ๋ค์ ์ฒ์๋ถํฐ ์คํํ๋๋ก ํ๋ ๊ฒ์ด๋ค. (๋ค๋ฅธ thread๋ค์ ์ฑ๊ณต์ ๋ณด์ฅํ์ง ์์ผ๋ฏ๋ก!)
TaskExecutor
๊ตฌํ์์๋ allowCoreThreadTimeOut์ ์ค์ ํด ํน์ ์๊ฐ(KeepAliveSeconds)์ดํ์ ์ฌ์ฉํ์ง ์์ผ๋ฉด ์ข
๋ฃ๋๋๋ก ์ค์ ํ๋ค.
๋ค์ Job์ ์ํํ๋ฉด ๊ฐ thread๋ณ๋ก ๋ณ๋ ฌ๋ก ์ํ๋๋ ๊ฒ์ ํ์ธํ ์ ์๋ค.
Not Thread Safety
CursorItemReader
cursorItemReader์ ๊ฒฝ์ฐ์๋ thread safety๋ฅผ ๋ณด์ฅํ์ง ์๋๋ค. Reader ์์ญ์ SynchronizedItemStreamReader
๋ก wrappingํ์ฌ thread safetyํ๊ฒ ๊ตฌํํ ์ ์๋ค.
Copy @ Bean (JOB_NAME + "Reader" )
public SynchronizedItemStreamReader< Ncustomer > reader() {
String sql = "SELECT N.CUSTOMER_ID" +
", CONCAT(N.LAST_NAME, \" \", N.FIRST_NAME) AS FULL_NAME\n" +
" , N.ADDRESS1 AS ADDRESS\n" +
", N.POSTAL_CODE\n" +
"FROM NCUSTOMER N\n" +
"LIMIT 55" ;
JdbcCursorItemReader itemReader = new JdbcCursorItemReaderBuilder < Ncustomer >()
. name (JOB_NAME + "Reader" ) // Reader์ ์ด๋ฆ, ExecutionContext์ ์ ์ฅ๋์ด์ง ์ด๋ฆ
. dataSource (dataSource) // DB์ ์ ๊ทผํ๊ธฐ ์ํด ์ฌ์ฉํ DataSource๊ฐ์ฒด
. rowMapper ( new BeanPropertyRowMapper <>( Ncustomer . class )) // ์ฟผ๋ฆฌ ๊ฒฐ๊ณผ๋ฅผ ์ธ์คํด์ค๋ก ๋งคํํ๊ธฐ ์ํ ๋งคํผ
. sql (sql)
. saveState ( false ) // Reader๊ฐ ์คํจํ ์ง์ ์ ์ ์ฅํ์ง ์๋๋ก ์ค์
. build ();
return new SynchronizedItemStreamReaderBuilder < Ncustomer >()
. delegate (itemReader)
. build ();
}
SynchronizedItemStreamReader
์ delegate
์ ์ํํ๊ณ ์ถ์ CursorItemReader
๋ฅผ ๋ฑ๋กํด์ฃผ๋ฉด ๋๋ค.
Copy public class SynchronizedItemStreamReader < T > implements ItemStreamReader < T > , InitializingBean {
private ItemStreamReader < T > delegate;
public void setDelegate ( ItemStreamReader < T > delegate) {
this . delegate = delegate;
}
/**
* This delegates to the read method of the <code>delegate</code>
*/
@ Nullable
public synchronized T read () throws Exception , UnexpectedInputException , ParseException , NonTransientResourceException {
return this . delegate . read ();
}
SynchronizedItemStreamReader
์ read()
๋ฉ์๋๋ฅผ ๋ณด๋ฉด synchronized
๋ฉ์๋๋ก ๊ฐ์ธ ๋๊ธฐํ๋ ์ฝ๊ธฐ๊ฐ ๊ฐ๋ฅํ๋ค.
Copy 2021-12-12 23:02:25.062 INFO 31421 --- [ multi-thread-5] d.e.s.j.m.l.CursorItemReaderListener : Reading customer id=1
2021-12-12 23:02:25.062 INFO 31421 --- [ multi-thread-2] d.e.s.j.m.l.CursorItemReaderListener : Reading customer id=2
2021-12-12 23:02:25.062 INFO 31421 --- [ multi-thread-4] d.e.s.j.m.l.CursorItemReaderListener : Reading customer id=3
2021-12-12 23:02:25.062 INFO 31421 --- [ multi-thread-1] d.e.s.j.m.l.CursorItemReaderListener : Reading customer id=4
2021-12-12 23:02:25.062 INFO 31421 --- [ multi-thread-3] d.e.s.j.m.l.CursorItemReaderListener : Reading customer id=5
2021-12-12 23:02:25.063 INFO 31421 --- [ multi-thread-4] d.e.s.j.m.l.CursorItemReaderListener : Reading customer id=6
2021-12-12 23:02:25.063 INFO 31421 --- [ multi-thread-3] d.e.s.j.m.l.CursorItemReaderListener : Reading customer id=7
2021-12-12 23:02:25.063 INFO 31421 --- [ multi-thread-5] d.e.s.j.m.l.CursorItemReaderListener : Reading customer id=8
2021-12-12 23:02:25.063 INFO 31421 --- [ multi-thread-2] d.e.s.j.m.l.CursorItemReaderListener : Reading customer id=9
2021-12-12 23:02:25.063 INFO 31421 --- [ multi-thread-1] d.e.s.j.m.l.CursorItemReaderListener : Reading customer id=10
2021-12-12 23:02:25.064 INFO 31421 --- [ multi-thread-2] d.e.s.j.m.l.CursorItemReaderListener : Reading customer id=11
์ํ ๊ฒฐ๊ณผ๋ฅผ ๋ณด๋ฉด, ๊ฐ thread๊ฐ ์์ฐจ์ ์ผ๋ก ์ฝ์ด์ ๊ฐ๋ณ๋ก ์ฒ๋ฆฌํ๋ ๊ฒ์ ํ์ธํ ์ ์๋ค.
์ด๋ฏธ ๋คํธ์ํฌ/DISK IO/CPU/Memory ๋ฑ ์๋ฒ ์์์ด ์ด๋ฏธ ๋จ์ผ ์ฐ๋ ๋์์๋ ๋ฆฌ์์ค ์ฌ์ฉ๋์ด ํ๊ณ์น์ ๋ฌํ๋ค๋ฉด ๋ฉํฐ์ฐ๋ ๋๋ก ์งํํ๋ค๊ณ ํด์ ์ฑ๋ฅ ํฅ์์ ๊ธฐ๋ํ ์ ์์ผ๋ฉฐ, ์ค์ ์ด์ ํ๊ฒฝ ์ ์ฉ ์ด์ ์ ์ถฉ๋ถํ ํ
์คํธ๋ฅผ ์งํํด๋ณด๊ณ ํด์ผํ๋ค.
Parallel Steps
๊ฐ ์ญํ ์ ์ฌ๋ฌ step(Flow)์ผ๋ก ๋๋์ด ๋ณ๋ ฌํ ํ ์ ์๋ค.
Copy @ Bean (JOB_NAME)
public Job job() {
Flow flow1 = new FlowBuilder < Flow >( "flow1" )
. start ( step1() )
. build ();
Flow flow2 = new FlowBuilder < Flow >( "flow2" )
. start ( step2() )
. build ();
Flow flow3 = new FlowBuilder < Flow >( "flow3" )
. start ( step3() )
. build ();
Flow parelleFlow = new FlowBuilder < Flow >( "parelleFlow" )
. split ( executor() )
. add (flow1 , flow2 , flow3)
. build ();
return this . jobBuilderFactory . get (JOB_NAME)
. incrementer ( new UniqueRunIdIncrementer() )
. start (parelleFlow)
. end ()
. build ();
}
Flow
๋ ์์ฒด ์ค๋ ๋์์ ์งํ๋๋ฉฐ, ์ฌ๋ฌ Flow
๋ฅผ ๋ณ๋ ฌ๋ก ์คํํ ์ ์๋ค. FlowBuilder
์ .split()
๋ฉ์๋๋ TaskExecutor
๋ฅผ ๋ฐ์ ๊ฐ Flow๋ ์์ฒด ์ค๋ ๋์์ ์คํ๋๋ค. ์์ ๊ฐ์ด ๊ตฌํํ๋ฉด ๊ฐ step์ด๋ step์ flow๋ฅผ ๋ณ๋ ฌ๋ก ์ํํ ์ ์๋ค.
Copy 2021-12-13 18:30:08.267 INFO 35926 --- [ multi-thread-3] d.e.s.j.m.ParallelStepsConfiguration : [step] : 7910
Naccount(accountId=10, balance=24319.5, lastStatementDate=2018-05-02 22:25:15.0)
2021-12-13 18:30:08.267 INFO 35926 --- [ multi-thread-3] d.e.s.j.m.ParallelStepsConfiguration : [step] : 7911
2021-12-13 18:30:08.267 INFO 35926 --- [ multi-thread-3] d.e.s.j.m.ParallelStepsConfiguration : [step] : 7912
...(์๋ต)
2021-12-13 18:30:08.275 INFO 35926 --- [ multi-thread-3] d.e.s.j.m.ParallelStepsConfiguration : [step] : 9345
Ncustomer(customerId=141, fullName=Docket Jonah, address=92125 Crownhardt Junction, postalCode=08638)
Ncustomer(customerId=142, fullName=Fullerlove Lani, address=02750 Lindbergh Center, postalCode=84189)
Ncustomer(customerId=143, fullName=Boich Orin, address=928 Crownhardt Road, postalCode=31205)
Ncustomer(customerId=144, fullName=Yeoland Maximo, address=930 Prairie Rose Pass, postalCode=33129)
2021-12-13 18:30:08.275 INFO 35926 --- [ multi-thread-3] d.e.s.j.m.ParallelStepsConfiguration : [step] : 9346
2021-12-13 18:30:08.275 INFO 35926 --- [ multi-thread-3] d.e.s.j.m.ParallelStepsConfiguration : [step] : 9347
2021-12-13 18:30:08.275 INFO 35926 --- [ multi-thread-3] d.e.s.j.m.ParallelStepsConfiguration : [step] : 9348
2021-12-13 18:30:08.275 INFO 35926 --- [ multi-thread-3] d.e.s.j.m.ParallelStepsConfiguration : [step] : 9349
2021-12-13 18:30:08.275 INFO 35926 --- [ multi-thread-3] d.e.s.j.m.ParallelStepsConfiguration : [step] : 9350
Ncustomer(customerId=145, fullName=Nystrom Christoforo, address=52 Anhalt Circle, postalCode=64136)
2021-12-13 18:30:08.275 INFO 35926 --- [ multi-thread-3] d.e.s.j.m.ParallelStepsConfiguration : [step] : 9351
Ncustomer(customerId=146, fullName=Enderwick Karin, address=62338 Fieldstone Hill, postalCode=32309)
๊ฐ Step๋ค์ด ๋ณ๋ ฌ๋ก ์ํ๋๋ ๊ฒ์ ๋ณผ ์ ์์ผ๋ฉฐ, ์ด๋ ๋ชจ๋ Flow๊ฐ ์ข
๋ฃ๋๋ฉด Job์ด ์ข
๋ฃ๋๋ค.
AsyncItemProcessor
์ AsyncItemWriter
ํน์ ๋ฐฐ์น ์ฒ๋ฆฌ์ ๋ณ๋ชฉ๊ตฌ๊ฐ์ด ItemProcessor์ ์กด์ฌํ๋ ๊ฒฝ์ฐ๊ฐ ์๋๋ฐ, ๊ทธ๋ฐ ๊ฒฝ์ฐ Step์ ItemProcessor ๋ถ๋ถ๋ง ๋ณ๋ ์ค๋ ๋์์ ์คํํ์ฌ ์ฑ๋ฅ์ ํฅ์์ํฌ ์ ์๋ค. AsyncItemProcessor
๋ ItemProcessor
๊ตฌํ์ฒด๋ฅผ ๋ํํ๋ ๋ฐ์ฝ๋ ์ดํฐ๋ก, ์๋ก์ด ์ค๋ ๋์์ ItemProcessor ๋ก์ง์ ์ํํ๋ฉฐ, ์ฒ๋ฆฌ๋ฅผ ์๋ฃํ๊ณ ๋ ํ ๊ฒฐ๊ณผ๊ฐ์ผ๋ก ๋ฐํ๋ Future
๋ฅผ AsyncItemWriter
๋ก ์ ๋ฌํ๋ค. AsyncItemWriter
๋ํ, ItemWrite
๊ตฌํ์ฒด๋ฅผ ๋ํํ ๋ฐ์ฝ๋ ์ดํฐ์ด๋ค. AsyncItemWriter
๋ Future
๋ฅผ ์ฒ๋ฆฌํ ํ ๊ทธ ๊ฒฐ๊ณผ๋ฅผ ItemWriter
์ ์ ๋ฌํ๋ค. ๊ทธ๋ฌ๋ฏ๋ก ๋ฐ๋์ AsyncItemProcessor
์ AsyncItemWriter
๋ ํจ๊ป ์ฌ์ฉ ํด์ผํ๋ค.
dependency
Copy dependencies {
implementation 'org.springframework.batch:spring-batch-integration'
}
์ ์์กด์ฑ์ ์ถ๊ฐํด์ฃผ์ด์ผ AsyncItemProcessor
์ AsyncItemWriter
๋ฅผ ์ฌ์ฉํ ์ ์๋ค.
example
Copy @ Bean (JOB_NAME + "AccountProcessor" )
public ItemProcessor< Naccount , Naccount > processor() {
return (account) -> {
Thread . sleep ( 5 );; // ๋น๋๊ธฐ ์ฒ๋ฆฌ ์ด์ ์ผ๋ถ๋ฌ ๋ฆ์ถค
return account;
};
}
ํ๋ก์ธ์ ๋ณ๋ชฉํ์์ ๊ตฌํํ๊ธฐ ์ํด ์์๋ก 5๋ฐ๋ฆฌ์ด์ฉ ์ฌ๋๋ก ๋ง๋ค์๋ค. ๊ทธ ํ ๋ฐฐ์น๋ฅผ ์ํํด๋ณด๋ฉด,
Copy 2021-12-13 20:24:14.945 INFO 36579 --- [ main] d.e.s.SpringBatchRealApplication : Starting SpringBatchRealApplication using Java 17.0.1 on
...(์๋ต)
2021-12-13 20:24:23.155 INFO 36579 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown initiated...
2021-12-13 20:24:23.159 INFO 36579 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown completed.
๋ฐฐ์น ์ํ์์๋ถํฐ ์ข
๋ฃ๊น์ง ์ฝ 9์ด๊ฐ ๊ฑธ๋ฆฐ๊ฒ์ ๋ณผ ์ ์๋ค.
์ด์ ๋น๋๊ธฐ์ฒ๋ฆฌ๋ฅผ ํตํด ์ฑ๋ฅ์ ๊ฐ์ ํด๋ณผ ๊ฒ์ด๋ค.
Copy @ Bean (JOB_NAME + "AccountProcessor" )
public ItemProcessor< Naccount , Naccount > processor() {
return (account) -> {
Thread . sleep ( 5 );; // ๋น๋๊ธฐ ์ฒ๋ฆฌ ์ด์ ์ผ๋ถ๋ฌ ๋ฆ์ถค
return account;
};
}
@ Bean
public AsyncItemProcessor< Naccount , Naccount > asyncItemProcessor() {
AsyncItemProcessor < Naccount , Naccount > processor = new AsyncItemProcessor <>();
processor . setDelegate ( processor() );
processor . setTaskExecutor ( executor() );
return processor;
}
@ Bean
public AsyncItemWriter< Naccount > asyncItemWriter() {
AsyncItemWriter < Naccount > writer = new AsyncItemWriter <>();
writer . setDelegate ( naccountItemWriter() );
return writer;
}
@ Bean (JOB_NAME + "AccountWriter" )
public ItemWriter< Naccount > naccountItemWriter() {
return (items) -> items . forEach ( System . out :: println);
}
AysncItemProcessor
์ setDelegate()
๋ฉ์๋์ ๋ํํ ItemProcessor๋ฅผ ์ง์ ํด์ฃผ๊ณ , ๋ณ๋ ์ค๋ ๋์์ ์คํํ๊ธฐ ์ํ TaskExecutor๋ฅผ ์ค์ ํด์ค๋ค. ๊ทธ ํ AsyncItemWriter
๋ฅผ ๊ตฌํํ์ฌ Future
๋ฅผ ๋ฐ์์ ์ฐ๊ธฐ ์์
์ ํ ์ ์๋๋ก ๊ตฌํํ๋ค. ์ํํ ItemWriter๋ฅผ setDelegate
๋ฉ์๋์ ์ง์ ํด์ค๋ค.
Copy @ Bean (JOB_NAME + "AccountStep" )
public Step step2() {
return this . stepBuilderFactory . get (JOB_NAME + "AccountStep" )
. < Naccount , Future< Naccount >> chunk(chunkSize)
. reader ( jpaCursorItemReader() )
. processor ( asyncItemProcessor() )
. writer ( asyncItemWriter() )
. build ();
}
chunk๋ฉ์๋ ์ฒ๋ฆฌ์ AsyncItemProcessor
๊ฐ ๋ฐํํ๋ ํ์
์ธ Future<T>
๋ก ๋ฐํ ํ์
์ ๋ณ๊ฒฝํด์ฃผ๋ฉด๋๋ค. ๊ทธ ํ ๋ฐฐ์น๋ฅผ ์ํํด๋ณด๋ฉด,
Copy 2021-12-13 20:28:11.286 INFO 36626 --- [ main] d.e.s.SpringBatchRealApplication : Starting SpringBatchRealApplication using Java 17.0.1 on PID 36626
...(์๋ต)
2021-12-13 20:28:15.158 INFO 36626 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown initiated...
2021-12-13 20:28:15.162 INFO 36626 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown completed.
์ฝ 4์ด๋ง์ ์ํ์ด ์๋ฃ๋ ๊ฒ์ ํ์ธํ ์ ์๋ค.
Partitioning
๋ฐฐ์น ๊ธฐ๋ฐ ์ํฌ๋ก๋์ ๋๋ถ๋ถ์ I/O์ ์๋ค. ์คํ๋ง ๋ฐฐ์น๋ ์ฌ๋ฌ ๊ฐ์ ์์ปค๊ฐ ์์ ํ ์คํ
์ ์คํํ ์ ์๋๋ก ๊ธฐ๋ฅ์ ์ ๊ณตํ๋ค.
ํํฐ์
๋์ Master Step์ด ๋๋์ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ๊ธฐ ์ํด ์ง์ ๋ ์์ Worker Step์ผ๋ก ์ผ์ ๋ถํ ํ์ฌ ์ฒ๋ฆฌํ๋ ๋ฐฉ์์ ๋งํ๋ค. ๋๋์ ๋ฐ์ดํฐ๋ฅผ ๋ ์์ ํํฐ์
์ผ๋ก ๋๋์ด ๊ฐ ์์ปค๊ฐ ๋๋ ์ง ํํฐ์
์ ๋ณ๋ ฌ๋ก ์ฒ๋ฆฌํ๋ ๊ฒ์ด๋ค. ๊ฐ ์์ปค๋ ์์ฒด์ ์ผ๋ก ์ฝ๊ธฐ(ItemReader), ์ฒ๋ฆฌ(ItemProcessor), ์ฐ๊ธฐ(ItemWriter) ๋ฑ์ ๋ด๋นํ๋ ์จ์ ํ ๋ฐฐ์น Step์ด๋ฉฐ, ์ฌ์์๊ณผ ๊ฐ์ด ์คํ๋ง ๋ฐฐ์น๊ฐ ๊ธฐ๋ณธ์ ์ผ๋ก ์ ๊ณตํ๋ ๋ชจ๋ ๊ธฐ๋ฅ์ ์ฌ์ฉํ ์ ์๋ค.
Multi-threaded Step vs Partitioning
๋ฉํฐ์ฐ๋ ๋ Step์ ๋จ์ผ Step์ Chunk ๋จ์๋ก ์ฐ๋ ๋๋ฅผ ์์ฑํด ๋ถํ ์ฒ๋ฆฌ
์ด๋ค ์ฐ๋ ๋์์ ์ด๋ค ๋ฐ์ดํฐ๋ค์ ์ฒ๋ฆฌํ ์ง์ ๋ํ ์ธ๋ฐํ ์กฐ์ ์ด ๋ถ๊ฐ๋ฅ
ํด๋น Step์ ItemReader/ItemWriter๋ฑ์ด ๋ฉํฐ์ฐ๋ ๋ ํ๊ฒฝ์ ์ง์ํ๋์ง ์ ๋ฌด๊ฐ ์ค์ํจ
ํํฐ์
๋์ ๋
๋ฆฝ์ ์ธ Step(Worker Step)์ ๊ตฌ์ฑํ๊ณ , ๊ฐ๊ฐ ๋ณ๋์ StepExecution ํ๋ผ๋ฏธํฐ ํ๊ฒฝ์ ๊ฐ์ง๊ณ ์ฒ๋ฆฌ
๋ฉํฐ์ฐ๋ ๋๋ก ๋์ํ์ง๋ง, ItemReader/ItemWriter์ด ๋ฉํฐ์ฐ๋ ๋ ํ๊ฒฝ์ ์ง์ํ๋์ง ์ค์ํ์ง ์๋ค.
์ฃผ์ ์ธํฐํ์ด์ค
Partitioner
Partitioner
๋ ํํฐ์
๋ํ ๋ฐ์ดํฐ๋ฅผ ์ฌ๋ฌ ํํฐ์
์ผ๋ก ๋๋๋ ์ญํ ์ ํ๋ค. ์ฆ, ํํฐ์
๋๋ Worker Step์ ์ํ StepExecution์ ์์ฑํ๋ ์ธํฐํ์ด์ค๋ค.
Copy public interface Partitioner {
Map < String , ExecutionContext > partition ( int gridSize);
}
partition(int gridSize)
๋จ์ผ ๋ฉ์๋๋ก ๊ตฌํ๋์ด์์ผ๋ฉฐ, ์ฌ๊ธฐ์ gridSize๋ ๋ถํ ํ ์์ปค ๊ฐ์๊ฐ ๋ช๊ฐ์ธ์ง ๊ฒฐ์ ํ๋ ์ค์ ๊ฐ์ด๋ค. gridSize๋ฅผ ๊ณ์ฐํ๊ฑฐ๋ ์ค์ ํ๋๊ฒ์ ์ค๋ก์ง ๊ฐ๋ฐ์๋ค ๋ชซ์ด๋ค.
org.springframework.batch.core.partition.support.SimplePartitioner
๊ธฐ๋ณธ ๊ตฌํ์ผ๋ก, ๋น StepExecution์ ์์ฑ
org.springframework.batch.core.partition.support.MultiResourcePartitioner
์ฌ๋ฌ ๋ฆฌ์์ค์ ๋ฐฐ์ด์ ํ์ธํ๊ณ , ๋ฆฌ์์ค๋น ํํฐ์
์ ์์ฑ
PartitionHandler
PartitionHandler
๋ Master Step์ด Worker Step์ ์ด๋ป๊ฒ ๋ค๋ฃฐ์ง ์ ์ํ๋ ์ธํฐํ์ด์ค์ด๋ค. ์ฆ, ์์ปค์ ์์ฌ์ํต์ ํ๋๋ฐ ์ฌ์ฉํ๋ ์ธํฐํ์ด์ค์ด๋ค.
Copy public interface PartitionHandler {
Collection < StepExecution > handle ( StepExecutionSplitter stepSplitter , StepExecution stepExecution) throws Exception ;
}
์ฌ๊ธฐ์ ๋ค๋ฃจ๋ ๋ด์ฉ์ ๊ฐ ์์ปค์๊ฒ ์์
๋์์ ์ด๋ป๊ฒ ์๋ ค์ค์ง, ๋ณ๋ ฌ๋ก ์ํํ๊ฒ๋๋ ๊ฒฝ์ฐ ์ฐ๋ ๋ํ์ ์ด๋ป๊ฒ ๊ด๋ฆฌํ ์ง, ๋ชจ๋ ์์
์ด ์๋ฃ๋์๋์ง ์๋ณํ๋์ง ์ด๋ค.
์คํ๋ง ๋ฐฐ์น ์ฌ์ฉ์ Partitioner
๊ตฌํ์ฒด๋ฅผ ์ง์ ๊ตฌํํ๋ ๊ฒฝ์ฐ๋ ์์ง๋ง, PartitionHandler
๋ฅผ ๊ตฌํํ๋ ๊ฒฝ์ฐ๋ ์์ ๊ฒ์ด๋ค.
org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler
๋จ์ผ JVM ๋ด์์ ํํฐ์
๋ ๊ฐ๋
์ ์ฌ์ฉํ ์ ์๋๋ก, ๋์ผํ JVM๋ด์ ์ฌ๋ฌ ์ค๋ ๋์์ ์์ปค ๋ถํ ์คํ
org.springframework.batch.core.partition.support.MessageChannelPartitionHandler
์๊ฒฉ JVM์ ๋ฉํ๋ฐ์ดํฐ ์ ์ก
org.springframework.cloud.task.batch.partition.DeployerPartitionHandler
Spring Cloud Task ํ๋ก์ ํธ๊ฐ ์ ๊ณต
์ง์๋๋ ํ๋ซํผ์์์ ์จ๋๋งจ๋ ๋ฐฉ์์ผ๋ก ์์ปค ์คํ
TaskExecutorPartitionHandler (๋จ์ผ JVM) ์์
๋จ์ผ JVM๋ด์์ ์ฌ๋ฌ ์ค๋ ๋๋ฅผ ์ฌ์ฉํด ์์ปค๋ฅผ ์คํํ ์ ์๊ฒ ํด์ฃผ๋ ์ปดํฌ๋ํธ๋ก, ๋จ์ผ ์ฅ๋น๋ด์์ ์ํํ ์ ์๋ ์์
์๋ ์ ์ฝ(๋คํธ์ํฌ/DISK IO/CPU/Memory)์ด ์กด์ฌํ๋ค.
Copy @ Bean (JOB_NAME + "TaskPool" )
public TaskExecutor executor() {
// ์ฐ๋ ๋ ํ์ ์ด์ฉํ ์ฐ๋ ๋ ๊ด๋ฆฌ ๋ฐฉ์
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor() ;
executor . setCorePoolSize (poolSize); // ํ์ ๊ธฐ๋ณธ ์ฌ์ด์ฆ
executor . setMaxPoolSize (poolSize); // ํ์ ์ต๋ ์ฌ์ด์ฆ
executor . setThreadNamePrefix ( "partition-thread-" );
executor . setWaitForTasksToCompleteOnShutdown ( Boolean . TRUE );
executor . initialize ();
return executor;
}
@ Bean (name = JOB_NAME + "PartitionHandler" )
public TaskExecutorPartitionHandler partitionHandler() {
// ๋ก์ปฌ์์ ๋ฉํฐ์ฐ๋ ๋๋ก ํํฐ์
๋ ์ฌ์ฉ
TaskExecutorPartitionHandler partitionHandler = new TaskExecutorPartitionHandler() ;
// worker๋ก ์ํํ step
partitionHandler . setStep ( step() );
// ๋ฉํฐ์ฐ๋ ๋ ์ํ์ ์ํ TaskExecutor์ค์
partitionHandler . setTaskExecutor ( executor() );
// ์ฐ๋ ๋ ๊ฐ์์ gridSize๋ฅผ ๋ง์ถฐ์ค๋ค.
partitionHandler . setGridSize (poolSize);
return partitionHandler;
}
๋ค์๊ณผ ๊ฐ์ด TaskExecutorPartitionHandler
์ worker๋ก ์ํํ step, ๋ฉํฐ์ฐ๋ ๋ ์ฒ๋ฆฌ๋ฅผ ์ํ TaskExecutor
์ค์ , ํํฐ์
๋ ์(gridSize)๋ฅผ ์ง์ ํด์ค๋ค.
Partitioner ๊ตฌํ
Copy @ RequiredArgsConstructor
public class AccountIdRangePartitioner implements Partitioner {
private final NaccountRepository naccountRepository;
private final LocalDate startDate;
private final LocalDate endDate;
/**
* ํน์ ๊ธฐ๊ฐ ๋ด ์กด์ฌํ๋ ์ต์ accountId์ ์ต๋ accountId๋ฅผ ๊ฐ์ ธ์ gridSize๋ก ๋ถํ ์ฒ๋ฆฌ
* @param gridSize
* @return
*/
@ Override
public Map < String , ExecutionContext > partition ( int gridSize) {
long min = naccountRepository . findMinId (startDate , endDate);
long max = naccountRepository . findMaxId (startDate , endDate);
long targetSize = (max - min) / gridSize + 1 ;
Map < String , ExecutionContext > result = new HashMap <>();
long number = 0 ;
long start = min;
long end = start + targetSize - 1 ;
while (start <= max) {
ExecutionContext value = new ExecutionContext() ;
result . put ( "partition" + number , value);
if (end >= max) {
end = max;
}
value . putLong ( "minId" , start); // ๊ฐ ํํฐ์
๋ง๋ค ์ฌ์ฉ๋ minId
value . putLong ( "maxId" , end); // ๊ฐ ํํฐ์
๋ง๋ค ์ฌ์ฉ๋ maxId
start += targetSize;
end += targetSize;
number ++ ;
}
return result;
}
}
Copy @ Bean (name = JOB_NAME + "Partitioner" )
@ StepScope
public AccountIdRangePartitioner partitioner(
@ Value ( "#{jobParameters['startDate']}" ) String startDate ,
@ Value ( "#{jobParameters['endDate']}" ) String endDate) {
LocalDate startLocalDate = LocalDate . parse (startDate , DateTimeFormatter . ofPattern ( "yyyy-MM-dd" ));
LocalDate endLocalDate = LocalDate . parse (endDate , DateTimeFormatter . ofPattern ( "yyyy-MM-dd" ));
return new AccountIdRangePartitioner(naccountRepository , startLocalDate , endLocalDate) ;
}
์์ฑํ Worker Step ์ ๊ฒฐ์
long targetSize = (max - min) / gridSize + 1
๊ฐ Worker Step์ด ์ด๋ค ๋ณ์๋ฅผ ๊ฐ์ง๊ฒ ํ ์ง ๊ฒฐ์
value.putLong("minId", start)
value.putLong("maxId", end)
Copy 2021-12-13 22:48:47.105 INFO 38038 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step.manager]
Hibernate: select min(naccount0_.account_id) as col_0_0_ from naccount naccount0_ where naccount0_.last_statement_date between ? and ?
Hibernate: select max(naccount0_.account_id) as col_0_0_ from naccount naccount0_ where naccount0_.last_statement_date between ? and ?
2021-12-13 22:48:47.178 INFO 38038 --- [tition-thread-1] d.e.s.j.m.PartitioningConfiguration : reader minId=601, maxId=800
2021-12-13 22:48:47.179 INFO 38038 --- [tition-thread-4] d.e.s.j.m.PartitioningConfiguration : reader minId=801, maxId=1000
2021-12-13 22:48:47.179 INFO 38038 --- [tition-thread-2] d.e.s.j.m.PartitioningConfiguration : reader minId=201, maxId=400
2021-12-13 22:48:47.180 INFO 38038 --- [tition-thread-3] d.e.s.j.m.PartitioningConfiguration : reader minId=401, maxId=600
2021-12-13 22:48:47.180 INFO 38038 --- [tition-thread-5] d.e.s.j.m.PartitioningConfiguration : reader minId=1, maxId=200
H
๋ค์๊ณผ ๊ฐ์ด ๊ฐ ํํฐ์
๋ณ๋ก ์ํํ StepExecution์ด ์ ์์ ์ผ๋ก ์ ๊ตฌํ๋ ๊ฒ์ ํ์ธํ ์ ์๋ค.
Step ๊ตฌํ
Copy @ Bean (JOB_NAME + "StepManager" )
public Step stepManager() {
// ํํฐ์
๋ ๋์ step๊ณผ ์ด๋ฆ์ ์ฐ๊ด์ง์ด ์ง์ด์ค๋ค. (์ฌ๋ฌ๊ฐ์ step์ด ์์ ์๋ ์์ผ๋ฏ๋ก)
return this . stepBuilderFactory . get ( "step.manager" )
. partitioner ( "step" , partitioner( null , null ) ) // Partitioner ๊ตฌํ์ฒด ๋ฑ๋ก
. step ( step() ) // ํํฐ์
๋๋ Step๋ฑ๋ก
. partitionHandler ( partitionHandler() ) // PartitionHandler ๋ฑ๋ก
. build ();
}
@ Bean (JOB_NAME + "Step" )
public Step step() {
return this . stepBuilderFactory . get (JOB_NAME + "Step" )
. < Naccount , Naccount > chunk(chunkSize)
. reader ( reader( null , null ) )
. writer ( writer( null , null ) )
. build ();
}
partitioner()
๋ฉ์๋๋ฅผ ์ด์ฉํด step์ ์ฌ์ฉ๋ Partitioner
๊ตฌํ์ฒด๋ฅผ ๋ฑ๋กํด์ฃผ๊ณ , ํํฐ์
๋๋ Step์ .step()
๋ฉ์๋์ ๋ฑ๋กํด์ค๋ค. ๊ทธ๋ฆฌ๊ณ ์ฌ์ฉํ partitionHandler()
๋ฅผ ๋ฑ๋กํด์ฃผ๋ฉด master step๊ตฌํ์ด ์๋ฃ๋๋ค.
ItemReader/ItemWriter
Copy @ Bean (JOB_NAME + "Reader" )
@ StepScope
public JpaPagingItemReader< Naccount > reader(@ Value ( "#{stepExecutionContext[minId]}" ) Long minId ,
@ Value ( "#{stepExecutionContext[maxId]}" ) Long maxId) {
Map < String , Object > params = new HashMap <>();
params . put ( "minId" , minId);
params . put ( "maxId" , maxId);
log . info ( "reader minId={}, maxId={}" , minId , maxId);
return new JpaPagingItemReaderBuilder < Naccount >()
. name (JOB_NAME + "Reader" )
. entityManagerFactory (entityManagerFactory)
. pageSize (chunkSize)
. queryString ( "SELECT n FROM Naccount n WHERE n.accountId BETWEEN :minId AND :maxId" )
. parameterValues (params)
. build ();
}
@ Bean (JOB_NAME + "Writer" )
@ StepScope
public ItemWriter< Naccount > writer(@ Value ( "#{stepExecutionContext[minId]}" ) Long minId ,
@ Value ( "#{stepExecutionContext[maxId]}" ) Long maxId) {
log . info ( "minId : {}, maxId: {}" , minId , maxId);
return (items) -> items . forEach ( System . out :: println);
}
๋ค์๊ณผ ๊ฐ์ด #{stepExecutionContext['๋ณ์๋ช
']}
์ผ๋ก ์ Partitioner
์์ ๊ฐ ํํฐ์
๋ณ๋ก ์ง์ ํ StepExecution ๋ณ์๋ฅผ ๊ฐ์ ธ์ฌ ์ ์๋ค. ํด๋น ๋ณ์๋ฅผ ์ ๋ฌ๋ฐ์ ๊ฐ ํํฐ์
๋ณ๋ก ์ฒ๋ฆฌํ๋๋ก ๊ตฌํํ๋ฉด ๋๋ค.
Job
๊ทธ ํ ํด๋น Master Step์ Job์ ๋ฑ๋กํด์ฃผ๋ฉด ๊ตฌํ์ด ์๋ฃ๋๋ค.
Copy @ Bean (JOB_NAME)
public Job job() {
return this . jobBuilderFactory . get (JOB_NAME)
. incrementer ( new RunIdIncrementer() )
. start ( stepManager() )
. build ();
}
์ฐธ๊ณ