Spring Batch Scaling

์ฒ˜๋ฆฌํ•ด์•ผํ•  ๋ฐ์ดํ„ฐ๊ฐ€ ์ฆ๊ฐ€ํ•˜์—ฌ ์ผ์ • ๊ทœ๋ชจ ์ด์ƒ์ด ๋˜๋Š” ๊ฒฝ์šฐ ๋ฐฐ์น˜๋„ Scaling์ด ํ•„์š”ํ•˜๋‹ค. Spring Batch์—์„œ ์ œ๊ณตํ•˜๋Š” Scaling๊ธฐ๋Šฅ์€ ๋‹ค์Œ๊ณผ ๊ฐ™๋‹ค.

| Feature | Process | Description |
|---|---|---|
| Multi-threaded Step | Single process, Local | Within a single Step, each Chunk runs on its own thread. |
| Parallel Steps | Single process, Local | Runs multiple Steps in parallel. It does not speed up any individual Step. |
| Remote Chunking | Multi process, Remote | Step processing is split across multiple processes and sent to other, external servers. Because it does not track which server is processing which data, message delivery must be guaranteed 100% with no loss (an MQ such as AWS SQS or Kafka is recommended). |
| Partitioning | Single/Multi process, Local/Remote | A manager (master) splits the data into smaller partitions, and a worker (slave) operates independently on each partition. |
| AsyncItemProcessor and AsyncItemWriter | Local | Runs the ItemProcessor and ItemWriter on separate threads. Provided by the spring-batch-integration dependency. |

This post implements some of them.

Multi-threaded Step

The thread model | Spring Batch Essentials

A multi-threaded Step in Spring Batch uses a TaskExecutor so that each thread executes one Chunk at a time.

  • SimpleAsyncTaskExecutor : creates a new thread for each chunk

  • ThreadPoolTaskExecutor : reuses a fixed number of threads from a thread pool (a thread pool is recommended for production use)
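The difference between the two strategies can be sketched in plain Java, without Spring. This is only an analogy: `ExecutorSketch` and its method names are hypothetical, a hand-started `Thread` per task stands in for the SimpleAsyncTaskExecutor behaviour, and `Executors.newFixedThreadPool` stands in for ThreadPoolTaskExecutor.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ExecutorSketch {

    /** Starts one brand-new thread per chunk and returns how many distinct threads did the work. */
    static int threadsUsedPerTask(int chunks) {
        Set<Thread> seen = ConcurrentHashMap.newKeySet();
        Thread[] threads = new Thread[chunks];
        for (int i = 0; i < chunks; i++) {
            threads[i] = new Thread(() -> seen.add(Thread.currentThread()));
            threads[i].start();
        }
        try {
            for (Thread t : threads) {
                t.join(); // wait for every chunk to finish
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.size();
    }

    /** Runs the same chunks on a fixed pool and returns how many distinct threads did the work. */
    static int threadsUsedWithPool(int chunks, int poolSize) {
        Set<Thread> seen = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < chunks; i++) {
            pool.execute(() -> seen.add(Thread.currentThread()));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println("thread per chunk: " + threadsUsedPerTask(20));
        System.out.println("pool of 4: " + threadsUsedWithPool(20, 4));
    }
}
```

With 20 chunks, the first method uses 20 threads while the pooled version never uses more than 4, which is why the pool is the safer choice when the number of chunks is large.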

When setting up a multi-threaded environment, you need to check whether the Reader and Writer you plan to use support multi-threading.

For example, the Javadoc of JpaPagingItemReader states that it is thread-safe:

public class JpaPagingItemReader<T>
extends AbstractPagingItemReader<T>
ItemReader for reading database records built on top of JPA.

It executes the JPQL setQueryString(String) to retrieve requested data. The query is executed using paged requests of a size specified in AbstractPagingItemReader.setPageSize(int). Additional pages are requested when needed as AbstractItemCountingItemStreamItemReader.read() method is called, returning an object corresponding to current position.

The performance of the paging depends on the JPA implementation and its use of database specific features to limit the number of returned rows.

Setting a fairly large page size and using a commit interval that matches the page size should provide better performance.

In order to reduce the memory usage for large results the persistence context is flushed and cleared after each page is read. This causes any entities read to be detached. If you make changes to the entities and want the changes persisted then you must explicitly merge the entities.

The reader must be configured with an EntityManagerFactory. All entity access is performed within a new transaction, independent of any existing Spring managed transactions.

The implementation is thread-safe in between calls to AbstractItemCountingItemStreamItemReader.open(ExecutionContext), but remember to use saveState=false if used in a multi-threaded client (no restart available).

JpaCursorItemReader, on the other hand, is documented as not thread-safe.

public class JpaCursorItemReader<T>
extends AbstractItemCountingItemStreamItemReader<T>
implements org.springframework.beans.factory.InitializingBean
ItemStreamReader implementation based on JPA Query.getResultStream(). It executes the JPQL query when initialized and iterates over the result set as AbstractItemCountingItemStreamItemReader.read() method is called, returning an object corresponding to the current row. The query can be set directly using setQueryString(String), or using a query provider via setQueryProvider(JpaQueryProvider). The implementation is not thread-safe.

There is one more thing to know. When chunks run independently on multiple threads, restarting from the point of failure is not possible. With multiple threads, chunks 1 through n run at the same time, so the failure of the 5th chunk does not guarantee that chunks 1 through 4 all succeeded.
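The point can be illustrated with a small plain-Java simulation. It is a sketch under stated assumptions, not Spring Batch code: `RestartSketch` is a hypothetical name, the chunks are just tasks on a pool, and a latch deliberately keeps chunk 3 in flight until chunk 5 has failed.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class RestartSketch {

    /** Returns the chunk numbers that were already committed at the moment chunk 5 failed. */
    static Set<Integer> committedWhenChunk5Fails() {
        Set<Integer> committed = ConcurrentHashMap.newKeySet();
        Set<Integer> snapshot = ConcurrentHashMap.newKeySet();
        CountDownLatch chunk5Failed = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(5); // all 5 chunks run concurrently

        for (int chunk = 1; chunk <= 5; chunk++) {
            final int n = chunk;
            pool.execute(() -> {
                try {
                    if (n == 5) {
                        // Chunk 5 "fails": record what has been committed so far.
                        snapshot.addAll(committed);
                        chunk5Failed.countDown();
                        return;
                    }
                    if (n == 3) {
                        // Chunk 3 is slow: it is still running when chunk 5 fails.
                        chunk5Failed.await();
                    }
                    committed.add(n);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return snapshot;
    }

    public static void main(String[] args) {
        // Chunk 3 is never part of the snapshot, even though its number is lower than 5.
        System.out.println("committed when chunk 5 failed: " + committedWhenChunk5Fails());
    }
}
```

Since a lower-numbered chunk can still be unfinished when a higher-numbered one fails, a single saved read position cannot describe what has and has not been processed.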

For this reason, the saveState option of the ItemReader is usually set to false when using multiple threads.

saveState : decides whether ItemStream#update(ExecutionContext) stores the reader's state in the ExecutionContext. (Default : true)

Thread-safe

PagingItemReader

As seen above, PagingItemReader is thread-safe. For a batch that runs with multiple threads, PagingItemReader is the recommended way to read from the DB.

  • application.yml

    spring:
      datasource:
        hikari:
          driver-class-name: com.mysql.cj.jdbc.Driver
          jdbc-url: jdbc:mysql://localhost:3306/spring_batch
          username: spring
          password: Springtest2021!
          maximum-pool-size: 10 # maximum number of connections kept in the pool
          auto-commit: false # whether to auto-commit

  • main() method setup

    @SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
    public class SpringBatchRealApplication {
    
        public static void main(String[] args) {
            // Force the JVM to exit once the main thread finishes.
            // The main thread finishing here means all child threads have finished as well.
            System.exit(SpringApplication.exit(SpringApplication.run(SpringBatchRealApplication.class, args)));
        }
    }

  • Job implementation

        @Bean(JOB_NAME)
        public Job job() {
            return this.jobBuilderFactory.get(JOB_NAME)
                    .incrementer(new RunIdIncrementer())
                    .start(step())
                    .build();
    
        }
    
        @Bean(JOB_NAME + "Step")
        public Step step() {
            return this.stepBuilderFactory.get(JOB_NAME + "Step")
                    .<Ncustomer, Ncustomer> chunk(chunkSize)
                    .reader(reader(null))
                    .writer(writer())
                    .taskExecutor(executor())
                    .throttleLimit(poolSize) // default : 4, how many of the created threads actually work on the step
                    .build();
        }
    
        @Bean(JOB_NAME + "TaskPool")
        public TaskExecutor executor() {
            // Manage threads with a thread pool
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(poolSize); // core pool size
            executor.setMaxPoolSize(poolSize); // maximum pool size
            executor.setThreadNamePrefix("multi-thread-"); // prefix of the thread names shown in the logs
            executor.setWaitForTasksToCompleteOnShutdown(Boolean.TRUE);
    
            // Set allowCoreThreadTimeOut to true so that core threads
            // that receive no task for a while are removed from the pool,
            // and the JVM can exit once all child threads are cleaned up.
            executor.setKeepAliveSeconds(30);
            executor.setAllowCoreThreadTimeOut(true);
    
            executor.initialize();
            return executor;
        }
    
        @Bean(JOB_NAME + "Reader")
        public JdbcPagingItemReader<Ncustomer> reader(PagingQueryProvider pagingQueryProvider) {
    
            return new JdbcPagingItemReaderBuilder<Ncustomer>()
                    .name("customerJdbcPagingItemReader")   // Reader name, used as the key in the ExecutionContext
                    .dataSource(dataSource)                 // DataSource used to access the DB
                    .queryProvider(pagingQueryProvider)     // PagingQueryProvider
                    .pageSize(10)                           // size of each page
                    .rowMapper(new BeanPropertyRowMapper<>(Ncustomer.class)) // maps each row of the result to an instance
                    .saveState(false)                       // do not store the point where the Reader failed
                    .build();
        }

    The key points here are the TaskExecutor and the .saveState(false) setting on the Reader: the Reader does not store the point of failure, and after a failure the Step runs again from the beginning. (The success of the other threads is not guaranteed!)

    When configuring the TaskExecutor, allowCoreThreadTimeOut is enabled so that threads left unused for the configured time (KeepAliveSeconds) are terminated.

    Running this Job shows the chunks being processed in parallel on separate threads.

Not Thread-safe

CursorItemReader

A CursorItemReader does not guarantee thread safety. Wrapping the Reader in a SynchronizedItemStreamReader makes it thread-safe.

    @Bean(JOB_NAME + "Reader")
    public SynchronizedItemStreamReader<Ncustomer> reader() {

        String sql = "SELECT N.CUSTOMER_ID" +
                ", CONCAT(N.LAST_NAME, \" \", N.FIRST_NAME) AS FULL_NAME\n" +
                " , N.ADDRESS1 AS ADDRESS\n" +
                ", N.POSTAL_CODE\n" +
                "FROM NCUSTOMER N\n" +
                "LIMIT 55";

        // Typed (not raw) reader so that it can be passed to delegate() without an unchecked warning
        JdbcCursorItemReader<Ncustomer> itemReader = new JdbcCursorItemReaderBuilder<Ncustomer>()
                .name(JOB_NAME + "Reader")              // Reader name, used as the key in the ExecutionContext
                .dataSource(dataSource)                 // DataSource used to access the DB
                .rowMapper(new BeanPropertyRowMapper<>(Ncustomer.class)) // maps each row of the result to an instance
                .sql(sql)
                .saveState(false)                       // do not store the point where the Reader failed
                .build();

        return new SynchronizedItemStreamReaderBuilder<Ncustomer>()
                .delegate(itemReader)
                .build();
    }

Register the CursorItemReader you want to run as the delegate of the SynchronizedItemStreamReader.

public class SynchronizedItemStreamReader<T> implements ItemStreamReader<T>, InitializingBean {

	private ItemStreamReader<T> delegate;

	public void setDelegate(ItemStreamReader<T> delegate) {
		this.delegate = delegate;
	}

	/**
	 * This delegates to the read method of the <code>delegate</code>
	 */
	@Nullable
	public synchronized T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
		return this.delegate.read();
	}

The read() method of SynchronizedItemStreamReader is declared synchronized, so calls to the delegate's read() are serialized across threads.
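The same wrapping idea can be tried out in plain Java. The sketch below is an illustration only: `CursorLikeReader` and `SynchronizedReader` are hypothetical stand-ins for a cursor-based reader and for SynchronizedItemStreamReader, and the "rows" are just integers.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class SynchronizedReaderSketch {

    /** Hypothetical non-thread-safe reader: a plain iterator over the rows. */
    static class CursorLikeReader {
        private final Iterator<Integer> cursor;
        CursorLikeReader(List<Integer> rows) { this.cursor = rows.iterator(); }
        Integer read() { return cursor.hasNext() ? cursor.next() : null; } // null = no more data
    }

    /** Wrapper that lets only one thread at a time call the delegate's read(). */
    static class SynchronizedReader {
        private final CursorLikeReader delegate;
        SynchronizedReader(CursorLikeReader delegate) { this.delegate = delegate; }
        synchronized Integer read() { return delegate.read(); }
    }

    /** Reads all rows from several threads and returns every item that was read. */
    static List<Integer> readAll(int rows, int threads) {
        List<Integer> data = IntStream.rangeClosed(1, rows).boxed().collect(Collectors.toList());
        SynchronizedReader reader = new SynchronizedReader(new CursorLikeReader(data));
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());

        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                Integer item;
                while ((item = reader.read()) != null) {
                    seen.add(item); // each thread processes the items it read
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("items read: " + readAll(55, 5).size());
    }
}
```

Every row is handed out exactly once, but the order in which threads receive rows is not fixed. Only the read is serialized; the processing of each item still happens in parallel.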

2021-12-12 23:02:25.062  INFO 31421 --- [ multi-thread-5] d.e.s.j.m.l.CursorItemReaderListener     : Reading customer id=1
2021-12-12 23:02:25.062  INFO 31421 --- [ multi-thread-2] d.e.s.j.m.l.CursorItemReaderListener     : Reading customer id=2
2021-12-12 23:02:25.062  INFO 31421 --- [ multi-thread-4] d.e.s.j.m.l.CursorItemReaderListener     : Reading customer id=3
2021-12-12 23:02:25.062  INFO 31421 --- [ multi-thread-1] d.e.s.j.m.l.CursorItemReaderListener     : Reading customer id=4
2021-12-12 23:02:25.062  INFO 31421 --- [ multi-thread-3] d.e.s.j.m.l.CursorItemReaderListener     : Reading customer id=5
2021-12-12 23:02:25.063  INFO 31421 --- [ multi-thread-4] d.e.s.j.m.l.CursorItemReaderListener     : Reading customer id=6
2021-12-12 23:02:25.063  INFO 31421 --- [ multi-thread-3] d.e.s.j.m.l.CursorItemReaderListener     : Reading customer id=7
2021-12-12 23:02:25.063  INFO 31421 --- [ multi-thread-5] d.e.s.j.m.l.CursorItemReaderListener     : Reading customer id=8
2021-12-12 23:02:25.063  INFO 31421 --- [ multi-thread-2] d.e.s.j.m.l.CursorItemReaderListener     : Reading customer id=9
2021-12-12 23:02:25.063  INFO 31421 --- [ multi-thread-1] d.e.s.j.m.l.CursorItemReaderListener     : Reading customer id=10
2021-12-12 23:02:25.064  INFO 31421 --- [ multi-thread-2] d.e.s.j.m.l.CursorItemReaderListener     : Reading customer id=11

The output shows the threads reading items one after another and then processing them individually.

If server resources such as network, disk I/O, CPU, or memory are already at their limit with a single thread, switching to multiple threads will not improve performance. Test thoroughly before applying this to a production environment.

Parallel Steps

Work can be split by role into several steps (Flows) and run in parallel.

    @Bean(JOB_NAME)
    public Job job() {
        Flow flow1 = new FlowBuilder<Flow>("flow1")
                .start(step1())
                .build();

        Flow flow2 = new FlowBuilder<Flow>("flow2")
                .start(step2())
                .build();


        Flow flow3 = new FlowBuilder<Flow>("flow3")
                .start(step3())
                .build();

        Flow parallelFlow = new FlowBuilder<Flow>("parallelFlow")
                .split(executor())
                .add(flow1, flow2, flow3)
                .build();

        return this.jobBuilderFactory.get(JOB_NAME)
                .incrementer(new UniqueRunIdIncrementer())
                .start(parallelFlow)
                .end()
                .build();
    }

Each Flow runs on its own thread, so several Flows can run in parallel. The .split() method of FlowBuilder takes a TaskExecutor, and each Flow added to the split runs on its own thread. Implemented as above, individual steps or flows of steps run in parallel.

2021-12-13 18:30:08.267  INFO 35926 --- [ multi-thread-3] d.e.s.j.m.ParallelStepsConfiguration     : [step] : 7910
Naccount(accountId=10, balance=24319.5, lastStatementDate=2018-05-02 22:25:15.0)
2021-12-13 18:30:08.267  INFO 35926 --- [ multi-thread-3] d.e.s.j.m.ParallelStepsConfiguration     : [step] : 7911
2021-12-13 18:30:08.267  INFO 35926 --- [ multi-thread-3] d.e.s.j.m.ParallelStepsConfiguration     : [step] : 7912
...(omitted)
2021-12-13 18:30:08.275  INFO 35926 --- [ multi-thread-3] d.e.s.j.m.ParallelStepsConfiguration     : [step] : 9345
Ncustomer(customerId=141, fullName=Docket Jonah, address=92125 Crownhardt Junction, postalCode=08638)
Ncustomer(customerId=142, fullName=Fullerlove Lani, address=02750 Lindbergh Center, postalCode=84189)
Ncustomer(customerId=143, fullName=Boich Orin, address=928 Crownhardt Road, postalCode=31205)
Ncustomer(customerId=144, fullName=Yeoland Maximo, address=930 Prairie Rose Pass, postalCode=33129)
2021-12-13 18:30:08.275  INFO 35926 --- [ multi-thread-3] d.e.s.j.m.ParallelStepsConfiguration     : [step] : 9346
2021-12-13 18:30:08.275  INFO 35926 --- [ multi-thread-3] d.e.s.j.m.ParallelStepsConfiguration     : [step] : 9347
2021-12-13 18:30:08.275  INFO 35926 --- [ multi-thread-3] d.e.s.j.m.ParallelStepsConfiguration     : [step] : 9348
2021-12-13 18:30:08.275  INFO 35926 --- [ multi-thread-3] d.e.s.j.m.ParallelStepsConfiguration     : [step] : 9349
2021-12-13 18:30:08.275  INFO 35926 --- [ multi-thread-3] d.e.s.j.m.ParallelStepsConfiguration     : [step] : 9350
Ncustomer(customerId=145, fullName=Nystrom Christoforo, address=52 Anhalt Circle, postalCode=64136)
2021-12-13 18:30:08.275  INFO 35926 --- [ multi-thread-3] d.e.s.j.m.ParallelStepsConfiguration     : [step] : 9351
Ncustomer(customerId=146, fullName=Enderwick Karin, address=62338 Fieldstone Hill, postalCode=32309)

The output of the Steps is interleaved, which shows them running in parallel. The Job ends once all Flows have finished.
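The "run every flow on its own thread, finish when all are done" behaviour can be sketched in plain Java. This is an analogy rather than the FlowBuilder implementation: `SplitSketch`, `runSplit`, and `demo` are hypothetical names, and each flow is reduced to a `Callable` that returns its own name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SplitSketch {

    /** Runs every flow on its own pool thread and returns only after all of them have finished. */
    static List<String> runSplit(List<Callable<String>> flows) {
        ExecutorService executor = Executors.newFixedThreadPool(flows.size());
        try {
            List<String> results = new ArrayList<>();
            // invokeAll blocks until every flow has completed
            for (Future<String> future : executor.invokeAll(flows)) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    /** Three hypothetical flows, mirroring flow1, flow2 and flow3 above. */
    static List<String> demo() {
        List<Callable<String>> flows = new ArrayList<>();
        flows.add(() -> "flow1 done");
        flows.add(() -> "flow2 done");
        flows.add(() -> "flow3 done");
        return runSplit(flows);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The caller gets control back only after the slowest flow has finished, which matches the observation that the Job ends when all Flows are done.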

AsyncItemProcessor and AsyncItemWriter

Sometimes the bottleneck of a batch job is in the ItemProcessor. In that case, running only the ItemProcessor part of the Step on separate threads can improve performance. AsyncItemProcessor is a decorator that wraps an ItemProcessor implementation: it runs the ItemProcessor logic on a new thread and passes a Future for the result to AsyncItemWriter. AsyncItemWriter is likewise a decorator that wraps an ItemWriter implementation. It resolves the Futures and passes the results to the ItemWriter. For this reason, AsyncItemProcessor and AsyncItemWriter must always be used together.
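The hand-off between the two decorators can be sketched with plain `java.util.concurrent` types. This is a simplified illustration, not the spring-batch-integration classes: `AsyncProcessingSketch` and its methods are hypothetical, and items are plain integers turned into strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class AsyncProcessingSketch {

    /** Slow delegate "processor": sleeps to simulate a bottleneck, then transforms the item. */
    static String process(int item) {
        try {
            Thread.sleep(5);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "processed-" + item;
    }

    /** Processor side: submits every item to the executor and returns the Futures immediately. */
    static List<Future<String>> processAsync(List<Integer> chunk, ExecutorService executor) {
        List<Future<String>> futures = new ArrayList<>();
        for (Integer item : chunk) {
            Callable<String> task = () -> process(item);
            futures.add(executor.submit(task));
        }
        return futures;
    }

    /** Writer side: resolves each Future, then hands the plain items to the delegate writer. */
    static List<String> writeAsync(List<Future<String>> futures) {
        List<String> written = new ArrayList<>();
        try {
            for (Future<String> future : futures) {
                written.add(future.get()); // blocks until this item's processing is done
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
        return written;
    }

    /** Processes one chunk with the given number of threads and returns what was "written". */
    static List<String> runChunk(List<Integer> chunk, int threads) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            return writeAsync(processAsync(chunk, executor));
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runChunk(List.of(1, 2, 3), 2));
    }
}
```

The writer side only makes sense with Futures as input, and the Futures are only useful if something resolves them, which is why the two are used as a pair.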

dependency

dependencies {
    implementation 'org.springframework.batch:spring-batch-integration'
}

This dependency must be added to use AsyncItemProcessor and AsyncItemWriter.

example

    @Bean(JOB_NAME + "AccountProcessor")
    public ItemProcessor<Naccount, Naccount> processor() {
        return (account)->{
            Thread.sleep(5); // deliberate delay, before switching to async processing
            return account;
        };
    }

To simulate a processor bottleneck, the processor sleeps for 5 milliseconds per item. Running the batch gives:

2021-12-13 20:24:14.945  INFO 36579 --- [           main] d.e.s.SpringBatchRealApplication         : Starting SpringBatchRealApplication using Java 17.0.1 on

...(omitted)

2021-12-13 20:24:23.155  INFO 36579 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
2021-12-13 20:24:23.159  INFO 36579 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.

From start to finish the batch took roughly 8 to 9 seconds (20:24:14 to 20:24:23 in the log).

Now let's improve performance with asynchronous processing.

    @Bean(JOB_NAME + "AccountProcessor")
    public ItemProcessor<Naccount, Naccount> processor() {
        return (account)->{
            Thread.sleep(5); // deliberate delay that the async processor will run in parallel
            return account;
        };
    }

    @Bean
    public AsyncItemProcessor<Naccount, Naccount> asyncItemProcessor() {
        AsyncItemProcessor<Naccount, Naccount> processor = new AsyncItemProcessor<>();
        processor.setDelegate(processor());
        processor.setTaskExecutor(executor());
        return processor;
    }

    @Bean
    public AsyncItemWriter<Naccount> asyncItemWriter() {
        AsyncItemWriter<Naccount> writer = new AsyncItemWriter<>();
        writer.setDelegate(naccountItemWriter());
        return writer;
    }

    @Bean(JOB_NAME + "AccountWriter")
    public ItemWriter<Naccount> naccountItemWriter() {
        return (items) -> items.forEach(System.out::println);
    }

Pass the ItemProcessor to wrap to AsyncItemProcessor's setDelegate() method, and set the TaskExecutor that will run it on separate threads. Then configure an AsyncItemWriter so that it receives the Futures and performs the write; the ItemWriter to run is passed to its setDelegate() method.

    @Bean(JOB_NAME + "AccountStep")
    public Step step2() {
        return this.stepBuilderFactory.get(JOB_NAME + "AccountStep")
                .<Naccount, Future<Naccount>>chunk(chunkSize)
                .reader(jpaCursorItemReader())
                .processor(asyncItemProcessor())
                .writer(asyncItemWriter())
                .build();
    }

In the chunk() call, change the output type to Future<T>, the type that AsyncItemProcessor returns. Running the batch again gives:

2021-12-13 20:28:11.286  INFO 36626 --- [           main] d.e.s.SpringBatchRealApplication         : Starting SpringBatchRealApplication using Java 17.0.1 on PID 36626 

...(omitted)

2021-12-13 20:28:15.158  INFO 36626 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
2021-12-13 20:28:15.162  INFO 36626 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.

This time the batch completed in about 4 seconds.

Partitioning

Most of the cost of a batch workload lies in I/O. Spring Batch provides a feature that lets multiple workers each execute a complete step.

https://docs.spring.io/spring-batch/docs/current/reference/html/images/partitioning-overview.png

Partitioning means that a Master Step divides the work among a given number of Worker Steps in order to process a large volume of data. The data is split into smaller partitions, and each worker processes its partition in parallel. Each worker is a complete batch Step with its own reading (ItemReader), processing (ItemProcessor), and writing (ItemWriter), and it can use everything Spring Batch provides out of the box, such as restart.

Multi-threaded Step vs Partitioning

  • A multi-threaded Step splits a single Step by Chunk, with each chunk handled on its own thread

    • There is no fine-grained control over which thread processes which data

    • Whether the Step's ItemReader/ItemWriter support a multi-threaded environment matters

  • Partitioning builds independent Steps (Worker Steps), each running with its own StepExecution and parameters

    • It also runs on multiple threads, but whether the ItemReader/ItemWriter support a multi-threaded environment does not matter

Key interfaces

Partitioner

A Partitioner splits the data to be partitioned into multiple partitions. In other words, it is the interface that produces the ExecutionContext for the StepExecution of each partitioned Worker Step.

public interface Partitioner {
	Map<String, ExecutionContext> partition(int gridSize);
}

It consists of the single method partition(int gridSize), where gridSize is the setting that determines how many workers to split into. Calculating or configuring gridSize is entirely up to the developer.

  • org.springframework.batch.core.partition.support.SimplePartitioner

    • The default implementation; creates empty ExecutionContext instances

  • org.springframework.batch.core.partition.support.MultiResourcePartitioner

    • Takes an array of resources and creates one partition per resource

PartitionHandler

PartitionHandler is the interface that defines how the Master Step deals with the Worker Steps. In other words, it is the interface used to communicate with the workers.

public interface PartitionHandler {
	Collection<StepExecution> handle(StepExecutionSplitter stepSplitter, StepExecution stepExecution) throws Exception;
}

It covers how each worker is told what to work on, how the thread pool is managed when running in parallel, and how to tell that all the work has completed.

When using Spring Batch you may write your own Partitioner implementation, but you are unlikely to implement a PartitionHandler yourself.

  • org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler

    • Runs the workers on multiple threads within the same JVM, so that partitioning can be used inside a single JVM

  • org.springframework.batch.integration.partition.MessageChannelPartitionHandler

    • Sends the metadata to remote JVMs

  • org.springframework.cloud.task.batch.partition.DeployerPartitionHandler

    • Provided by the Spring Cloud Task project

    • Launches workers on demand on a supported platform

TaskExecutorPartitionHandler (single JVM) example

This component runs the workers on multiple threads within a single JVM. Work done on a single machine is still bounded by that machine's limits (network, disk I/O, CPU, memory).

    @Bean(JOB_NAME + "TaskPool")
    public TaskExecutor executor() {
        // Manage threads with a thread pool
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(poolSize); // core pool size
        executor.setMaxPoolSize(poolSize); // maximum pool size
        executor.setThreadNamePrefix("partition-thread-");
        executor.setWaitForTasksToCompleteOnShutdown(Boolean.TRUE);
        executor.initialize();
        return executor;
    }

    @Bean(name = JOB_NAME + "PartitionHandler")
    public TaskExecutorPartitionHandler partitionHandler() {
        // Use partitioning locally, with multiple threads
        TaskExecutorPartitionHandler partitionHandler = new TaskExecutorPartitionHandler();

        // Step to run as the worker
        partitionHandler.setStep(step());

        // TaskExecutor used for multi-threaded execution
        partitionHandler.setTaskExecutor(executor());

        // Keep gridSize equal to the number of threads
        partitionHandler.setGridSize(poolSize);

        return partitionHandler;
    }

As shown, the TaskExecutorPartitionHandler is given the step to run as the worker, the TaskExecutor for multi-threaded processing, and the number of partitions (gridSize).

Partitioner implementation

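import java.time.LocalDate;
import java.util.HashMap;
import java.util.Map;

import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;

@RequiredArgsConstructor
public class AccountIdRangePartitioner implements Partitioner {

    private final NaccountRepository naccountRepository;
    private final LocalDate startDate;
    private final LocalDate endDate;

    /**
     * Fetches the minimum and maximum accountId within the given period and splits that range by gridSize.
     * @param gridSize requested number of partitions
     * @return partition name mapped to its ExecutionContext (minId, maxId)
     */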
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {

        long min = naccountRepository.findMinId(startDate, endDate);
        long max = naccountRepository.findMaxId(startDate, endDate);
        long targetSize = (max - min) / gridSize + 1;

        Map<String, ExecutionContext> result = new HashMap<>();
        long number = 0;
        long start = min;
        long end = start + targetSize - 1;

        while (start <= max) {
            ExecutionContext value = new ExecutionContext();
            result.put("partition" + number, value);

            if (end >= max) {
                end = max;
            }

            value.putLong("minId", start); // minId used by this partition
            value.putLong("maxId", end); // maxId used by this partition

            start += targetSize;
            end += targetSize;
            number++;
        }


        return result;
    }
}

    @Bean(name = JOB_NAME + "Partitioner")
    @StepScope
    public AccountIdRangePartitioner partitioner(
            @Value("#{jobParameters['startDate']}") String startDate,
            @Value("#{jobParameters['endDate']}") String endDate) {
        LocalDate startLocalDate = LocalDate.parse(startDate, DateTimeFormatter.ofPattern("yyyy-MM-dd"));
        LocalDate endLocalDate = LocalDate.parse(endDate, DateTimeFormatter.ofPattern("yyyy-MM-dd"));

        return new AccountIdRangePartitioner(naccountRepository, startLocalDate, endLocalDate);
    }

  • Determine the ID range covered by each partition, and with it how many Worker Steps are created

    • long targetSize = (max - min) / gridSize + 1

  • Determine which variables each Worker Step receives

    • value.putLong("minId", start)

    • value.putLong("maxId", end)

2021-12-13 22:48:47.105  INFO 38038 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step.manager]
Hibernate: select min(naccount0_.account_id) as col_0_0_ from naccount naccount0_ where naccount0_.last_statement_date between ? and ?
Hibernate: select max(naccount0_.account_id) as col_0_0_ from naccount naccount0_ where naccount0_.last_statement_date between ? and ?
2021-12-13 22:48:47.178  INFO 38038 --- [tition-thread-1] d.e.s.j.m.PartitioningConfiguration      : reader minId=601, maxId=800
2021-12-13 22:48:47.179  INFO 38038 --- [tition-thread-4] d.e.s.j.m.PartitioningConfiguration      : reader minId=801, maxId=1000
2021-12-13 22:48:47.179  INFO 38038 --- [tition-thread-2] d.e.s.j.m.PartitioningConfiguration      : reader minId=201, maxId=400
2021-12-13 22:48:47.180  INFO 38038 --- [tition-thread-3] d.e.s.j.m.PartitioningConfiguration      : reader minId=401, maxId=600
2021-12-13 22:48:47.180  INFO 38038 --- [tition-thread-5] d.e.s.j.m.PartitioningConfiguration      : reader minId=1, maxId=200

As shown, a StepExecution with its own ID range was created for each partition as expected.
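The ranges in the log can be reproduced with the partitioner's arithmetic alone. The sketch below isolates that calculation in plain Java; `RangePartitionSketch` and `ranges` are hypothetical names, and the values min = 1, max = 1000, gridSize = 5 are taken from the log above rather than from a database.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class RangePartitionSketch {

    /** Returns partition name -> {minId, maxId}, using the same arithmetic as the partitioner above. */
    static Map<String, long[]> ranges(long min, long max, int gridSize) {
        long targetSize = (max - min) / gridSize + 1; // number of IDs per partition
        Map<String, long[]> result = new LinkedHashMap<>();
        int number = 0;
        for (long start = min; start <= max; start += targetSize) {
            long end = Math.min(start + targetSize - 1, max); // the last partition is capped at max
            result.put("partition" + number++, new long[] {start, end});
        }
        return result;
    }

    public static void main(String[] args) {
        // prints:
        // partition0: minId=1, maxId=200
        // partition1: minId=201, maxId=400
        // partition2: minId=401, maxId=600
        // partition3: minId=601, maxId=800
        // partition4: minId=801, maxId=1000
        ranges(1, 1000, 5).forEach((name, r) ->
                System.out.println(name + ": minId=" + r[0] + ", maxId=" + r[1]));
    }
}
```

Note that gridSize is only an upper bound with this formula: when there are fewer IDs than gridSize, for example `ranges(1, 3, 5)`, only three partitions are produced.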

Step implementation

    @Bean(JOB_NAME + "StepManager")
    public Step stepManager() {
        // Associate a name with the step being partitioned (there may be several steps)
        return this.stepBuilderFactory.get("step.manager")
                .partitioner("step", partitioner(null, null)) // register the Partitioner implementation
                .step(step()) // register the Step to be partitioned
                .partitionHandler(partitionHandler()) // register the PartitionHandler
                .build();
    }

    @Bean(JOB_NAME + "Step")
    public Step step() {
        return this.stepBuilderFactory.get(JOB_NAME + "Step")
                .<Naccount, Naccount>chunk(chunkSize)
                .reader(reader(null, null))
                .writer(writer(null, null))
                .build();
    }

Register the Partitioner implementation for the step with .partitioner(), and register the Step to be partitioned with .step(). Then register the partitionHandler() to use, and the master step is complete.

ItemReader/ItemWriter

    @Bean(JOB_NAME + "Reader")
    @StepScope
    public JpaPagingItemReader<Naccount> reader(@Value("#{stepExecutionContext[minId]}") Long minId,
                                                @Value("#{stepExecutionContext[maxId]}") Long maxId) {

        Map<String, Object> params = new HashMap<>();
        params.put("minId", minId);
        params.put("maxId", maxId);

        log.info("reader minId={}, maxId={}", minId, maxId);

        return new JpaPagingItemReaderBuilder<Naccount>()
                .name(JOB_NAME + "Reader")
                .entityManagerFactory(entityManagerFactory)
                .pageSize(chunkSize)
                .queryString("SELECT n FROM Naccount n WHERE n.accountId BETWEEN :minId AND :maxId")
                .parameterValues(params)
                .build();
    }

    @Bean(JOB_NAME + "Writer")
    @StepScope
    public ItemWriter<Naccount> writer(@Value("#{stepExecutionContext[minId]}") Long minId,
                                       @Value("#{stepExecutionContext[maxId]}") Long maxId) {
        log.info("minId : {}, maxId: {}", minId, maxId);
        return (items) -> items.forEach(System.out::println);
    }

As shown, #{stepExecutionContext['variable name']} retrieves the values that the Partitioner above put into each partition's ExecutionContext. Receive those values and implement the processing for each partition.

Job

Finally, register the Master Step in the Job and the implementation is complete.

    @Bean(JOB_NAME)
    public Job job() {
        return this.jobBuilderFactory.get(JOB_NAME)
                .incrementer(new RunIdIncrementer())
                .start(stepManager())
                .build();
    }
